source: libfaim/txqueue.c @ 453bd70

barnowl_perlaimdebianowlrelease-1.4release-1.5release-1.6release-1.7release-1.8release-1.9
Last change on this file since 453bd70 was 862371b, checked in by James M. Kretchmar <kretch@mit.edu>, 18 years ago
*** empty log message ***
  • Property mode set to 100644
File size: 10.0 KB
Line 
1/*
2 * txqueue.c
3 *
4 * Herein lies all the mangement routines for the transmit (Tx) queue.
5 *
6 */
7
8#define FAIM_INTERNAL
9#include <aim.h>
10
11#ifndef _WIN32
12#include <sys/socket.h>
13#else
14#include "win32dep.h"
15#endif
16
17/*
18 * Allocate a new tx frame.
19 *
20 * This is more for looks than anything else.
21 *
22 * Right now, that is.  If/when we implement a pool of transmit
23 * frames, this will become the request-an-unused-frame part.
24 *
25 * framing = AIM_FRAMETYPE_OFT/FLAP
26 * chan = channel for FLAP, hdrtype for OFT
27 *
28 */
29faim_internal aim_frame_t *aim_tx_new(aim_session_t *sess, aim_conn_t *conn, fu8_t framing, fu16_t chan, int datalen)
30{
31        aim_frame_t *fr;
32
33        if (!conn) {
34                faimdprintf(sess, 0, "aim_tx_new: ERROR: no connection specified\n");
35                return NULL;
36        }
37
38        /* For sanity... */
39        if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) || 
40                        (conn->type == AIM_CONN_TYPE_RENDEZVOUS_OUT)) {
41                if (framing != AIM_FRAMETYPE_OFT) {
42                        faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for rendezvous connection\n");
43                        return NULL;
44                }
45        } else {
46                if (framing != AIM_FRAMETYPE_FLAP) {
47                        faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for FLAP connection\n");
48                        return NULL;
49                }
50        }
51
52        if (!(fr = (aim_frame_t *)malloc(sizeof(aim_frame_t))))
53                return NULL;
54        memset(fr, 0, sizeof(aim_frame_t));
55
56        fr->conn = conn; 
57
58        fr->hdrtype = framing;
59
60        if (fr->hdrtype == AIM_FRAMETYPE_FLAP) {
61
62                fr->hdr.flap.type = chan;
63
64        } else if (fr->hdrtype == AIM_FRAMETYPE_OFT) {
65
66                fr->hdr.rend.type = chan;
67
68        } else 
69                faimdprintf(sess, 0, "tx_new: unknown framing\n");
70
71        if (datalen > 0) {
72                fu8_t *data;
73
74                if (!(data = (unsigned char *)malloc(datalen))) {
75                        aim_frame_destroy(fr);
76                        return NULL;
77                }
78
79                aim_bstream_init(&fr->data, data, datalen);
80        }
81
82        return fr;
83}
84
85/*
86 * aim_tx_enqeue__queuebased()
87 *
88 * The overall purpose here is to enqueue the passed in command struct
89 * into the outgoing (tx) queue.  Basically...
90 *   1) Make a scope-irrelevent copy of the struct
91 *   3) Mark as not-sent-yet
92 *   4) Enqueue the struct into the list
93 *   6) Return
94 *
95 * Note that this is only used when doing queue-based transmitting;
96 * that is, when sess->tx_enqueue is set to &aim_tx_enqueue__queuebased.
97 *
98 */
99static int aim_tx_enqueue__queuebased(aim_session_t *sess, aim_frame_t *fr)
100{
101
102        if (!fr->conn) {
103                faimdprintf(sess, 1, "aim_tx_enqueue: WARNING: enqueueing packet with no connecetion\n");
104                fr->conn = aim_getconn_type(sess, AIM_CONN_TYPE_BOS);
105        }
106
107        if (fr->hdrtype == AIM_FRAMETYPE_FLAP) {
108                /* assign seqnum -- XXX should really not assign until hardxmit */
109                fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);
110        }
111
112        fr->handled = 0; /* not sent yet */
113
114        /* see overhead note in aim_rxqueue counterpart */
115        if (!sess->queue_outgoing)
116                sess->queue_outgoing = fr;
117        else {
118                aim_frame_t *cur;
119
120                for (cur = sess->queue_outgoing; cur->next; cur = cur->next)
121                        ;
122                cur->next = fr;
123        }
124
125        return 0;
126}
127
128/*
129 * aim_tx_enqueue__immediate()
130 *
131 * Parallel to aim_tx_enqueue__queuebased, however, this bypasses
132 * the whole queue mess when you want immediate writes to happen.
133 *
134 * Basically the same as its __queuebased couterpart, however
135 * instead of doing a list append, it just calls aim_tx_sendframe()
136 * right here.
137 *
138 */
139static int aim_tx_enqueue__immediate(aim_session_t *sess, aim_frame_t *fr)
140{
141
142        if (!fr->conn) {
143                faimdprintf(sess, 1, "aim_tx_enqueue: ERROR: packet has no connection\n");
144                aim_frame_destroy(fr);
145                return 0;
146        }
147
148        if (fr->hdrtype == AIM_FRAMETYPE_FLAP)
149                fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);
150
151        fr->handled = 0; /* not sent yet */
152
153        aim_tx_sendframe(sess, fr);
154
155        aim_frame_destroy(fr);
156
157        return 0;
158}
159
160faim_export int aim_tx_setenqueue(aim_session_t *sess, int what, int (*func)(aim_session_t *, aim_frame_t *))
161{
162       
163        if (what == AIM_TX_QUEUED)
164                sess->tx_enqueue = &aim_tx_enqueue__queuebased;
165        else if (what == AIM_TX_IMMEDIATE) 
166                sess->tx_enqueue = &aim_tx_enqueue__immediate;
167        else if (what == AIM_TX_USER) {
168                if (!func)
169                        return -EINVAL;
170                sess->tx_enqueue = func;
171        } else
172                return -EINVAL; /* unknown action */
173
174        return 0;
175}
176
177faim_internal int aim_tx_enqueue(aim_session_t *sess, aim_frame_t *fr)
178{
179       
180        /*
181         * If we want to send a connection thats inprogress, we have to force
182         * them to use the queue based version. Otherwise, use whatever they
183         * want.
184         */
185        if (fr && fr->conn && 
186                        (fr->conn->status & AIM_CONN_STATUS_INPROGRESS)) {
187                return aim_tx_enqueue__queuebased(sess, fr);
188        }
189
190        return (*sess->tx_enqueue)(sess, fr);
191}
192
193/*
194 *  aim_get_next_txseqnum()
195 *
196 *   This increments the tx command count, and returns the seqnum
197 *   that should be stamped on the next FLAP packet sent.  This is
198 *   normally called during the final step of packet preparation
199 *   before enqueuement (in aim_tx_enqueue()).
200 *
201 */
202faim_internal flap_seqnum_t aim_get_next_txseqnum(aim_conn_t *conn)
203{
204        flap_seqnum_t ret;
205       
206        ret = ++conn->seqnum;
207
208        return ret;
209}
210
211static int aim_send(int fd, const void *buf, size_t count)
212{
213        int left, cur;
214
215        for (cur = 0, left = count; left; ) {
216                int ret;
217
218                ret = send(fd, ((unsigned char *)buf)+cur, left, 0);
219                if (ret == -1)
220                        return -1;
221                else if (ret == 0)
222                        return cur;
223
224                cur += ret;
225                left -= ret;
226        }
227
228        return cur;
229}
230
231static int aim_bstream_send(aim_bstream_t *bs, aim_conn_t *conn, size_t count)
232{
233        int wrote = 0;
234        if (!bs || !conn || (count < 0))
235                return -EINVAL;
236
237        if (count > aim_bstream_empty(bs))
238                count = aim_bstream_empty(bs); /* truncate to remaining space */
239
240        if (count) {
241                if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) && 
242                    (conn->subtype == AIM_CONN_SUBTYPE_OFT_DIRECTIM)) {
243                        /* I strongly suspect that this is a horrible thing to do
244                         * and I feel really guilty doing it. */
245                        const char *sn = aim_directim_getsn(conn);
246                        aim_rxcallback_t userfunc;
247                        while (count - wrote > 1024) {
248                                wrote = wrote + aim_send(conn->fd, bs->data + bs->offset + wrote, 1024);
249                                if ((userfunc=aim_callhandler(conn->sessv, conn, 
250                                                              AIM_CB_FAM_SPECIAL, 
251                                                              AIM_CB_SPECIAL_IMAGETRANSFER)))
252                                  userfunc(conn->sessv, NULL, sn, 
253                                           count-wrote>1024 ? ((double)wrote / count) : 1);
254                        }
255                }
256                if (count - wrote) {
257                        wrote = wrote + aim_send(conn->fd, bs->data + bs->offset + wrote, count - wrote);
258                }
259               
260        }
261       
262
263        if (((aim_session_t *)conn->sessv)->debug >= 2) {
264                int i;
265                aim_session_t *sess = (aim_session_t *)conn->sessv;
266
267                faimdprintf(sess, 2, "\nOutgoing data: (%d bytes)", wrote);
268                for (i = 0; i < wrote; i++) {
269                        if (!(i % 8)) 
270                                faimdprintf(sess, 2, "\n\t");
271                        faimdprintf(sess, 2, "0x%02x ", *(bs->data + bs->offset + i));
272                }
273                faimdprintf(sess, 2, "\n");
274        }
275
276
277        bs->offset += wrote;
278
279        return wrote;   
280}
281
282static int sendframe_flap(aim_session_t *sess, aim_frame_t *fr)
283{
284        aim_bstream_t obs;
285        fu8_t *obs_raw;
286        int payloadlen, err = 0, obslen;
287
288        payloadlen = aim_bstream_curpos(&fr->data);
289
290        if (!(obs_raw = malloc(6 + payloadlen)))
291                return -ENOMEM;
292
293        aim_bstream_init(&obs, obs_raw, 6 + payloadlen);
294
295        /* FLAP header */
296        aimbs_put8(&obs, 0x2a);
297        aimbs_put8(&obs, fr->hdr.flap.type);
298        aimbs_put16(&obs, fr->hdr.flap.seqnum);
299        aimbs_put16(&obs, payloadlen);
300
301        /* payload */
302        aim_bstream_rewind(&fr->data);
303        aimbs_putbs(&obs, &fr->data, payloadlen);
304
305        obslen = aim_bstream_curpos(&obs);
306        aim_bstream_rewind(&obs);
307        if (aim_bstream_send(&obs, fr->conn, obslen) != obslen)
308                err = -errno;
309       
310        free(obs_raw); /* XXX aim_bstream_free */
311
312        fr->handled = 1;
313        fr->conn->lastactivity = time(NULL);
314
315        return err;
316}
317
318static int sendframe_rendezvous(aim_session_t *sess, aim_frame_t *fr)
319{
320        aim_bstream_t bs;
321        fu8_t *bs_raw;
322        int err = 0;
323        int totlen = 8 + aim_bstream_curpos(&fr->data);
324
325        if (!(bs_raw = malloc(totlen)))
326                return -1;
327
328        aim_bstream_init(&bs, bs_raw, totlen);
329
330        aimbs_putraw(&bs, fr->hdr.rend.magic, 4);
331        aimbs_put16(&bs, 8 + fr->hdr.rend.hdrlen);
332        aimbs_put16(&bs, fr->hdr.rend.type);
333
334        /* payload */
335        aim_bstream_rewind(&fr->data);
336        aimbs_putbs(&bs, &fr->data, totlen - 8);
337
338        aim_bstream_rewind(&bs);
339
340        if (aim_bstream_send(&bs, fr->conn, totlen) != totlen)
341                err = -errno;
342
343        free(bs_raw); /* XXX aim_bstream_free */
344
345        fr->handled = 1;
346        fr->conn->lastactivity = time(NULL);
347
348        return err;
349}
350
351faim_internal int aim_tx_sendframe(aim_session_t *sess, aim_frame_t *fr)
352{
353        if (fr->hdrtype == AIM_FRAMETYPE_FLAP)
354                return sendframe_flap(sess, fr);
355        else if (fr->hdrtype == AIM_FRAMETYPE_OFT)
356                return sendframe_rendezvous(sess, fr);
357        return -1;
358}
359
360faim_export int aim_tx_flushqueue(aim_session_t *sess)
361{
362        aim_frame_t *cur;
363
364        for (cur = sess->queue_outgoing; cur; cur = cur->next) {
365
366                if (cur->handled)
367                        continue; /* already been sent */
368
369                if (cur->conn && (cur->conn->status & AIM_CONN_STATUS_INPROGRESS))
370                        continue;
371
372                /*
373                 * And now for the meager attempt to force transmit
374                 * latency and avoid missed messages.
375                 */
376                if ((cur->conn->lastactivity + cur->conn->forcedlatency) >= time(NULL)) {
377                        /*
378                         * XXX should be a break! we dont want to block the
379                         * upper layers
380                         *
381                         * XXX or better, just do this right.
382                         *
383                         */
384                        sleep((cur->conn->lastactivity + cur->conn->forcedlatency) - time(NULL));
385                }
386
387                /* XXX this should call the custom "queuing" function!! */
388                aim_tx_sendframe(sess, cur);
389        }
390
391        /* purge sent commands from queue */
392        aim_tx_purgequeue(sess);
393
394        return 0;
395}
396
397/*
398 *  aim_tx_purgequeue()
399 * 
400 *  This is responsable for removing sent commands from the transmit
401 *  queue. This is not a required operation, but it of course helps
402 *  reduce memory footprint at run time! 
403 *
404 */
405faim_export void aim_tx_purgequeue(aim_session_t *sess)
406{
407        aim_frame_t *cur, **prev;
408
409        for (prev = &sess->queue_outgoing; (cur = *prev); ) {
410
411                if (cur->handled) {
412                        *prev = cur->next;
413
414                        aim_frame_destroy(cur);
415
416                } else
417                        prev = &cur->next;
418        }
419
420        return;
421}
422
423/**
424 * aim_tx_cleanqueue - get rid of packets waiting for tx on a dying conn
425 * @sess: session
426 * @conn: connection that's dying
427 *
428 * for now this simply marks all packets as sent and lets them
429 * disappear without warning.
430 *
431 */
432faim_internal void aim_tx_cleanqueue(aim_session_t *sess, aim_conn_t *conn)
433{
434        aim_frame_t *cur;
435
436        for (cur = sess->queue_outgoing; cur; cur = cur->next) {
437                if (cur->conn == conn)
438                        cur->handled = 1;
439        }
440
441        return;
442}
Note: See TracBrowser for help on using the repository browser.