source: libfaim/txqueue.c @ f7cf6c2

barnowl_perlaimdebianowlrelease-1.10release-1.4release-1.5release-1.6release-1.7release-1.8release-1.9
Last change on this file since f7cf6c2 was cf02dd6, checked in by James M. Kretchmar <kretch@mit.edu>, 21 years ago
*** empty log message ***
  • Property mode set to 100644
File size: 9.9 KB
Line 
1/*
2 * txqueue.c
3 *
4 * Herein lies all the mangement routines for the transmit (Tx) queue.
5 *
6 */
7
8#define FAIM_INTERNAL
9#include <aim.h>
10
11#ifndef _WIN32
12#include <sys/socket.h>
13#else
14#include "win32dep.h"
15#endif
16
17/*
18 * Allocate a new tx frame.
19 *
20 * This is more for looks than anything else.
21 *
22 * Right now, that is.  If/when we implement a pool of transmit
23 * frames, this will become the request-an-unused-frame part.
24 *
25 * framing = AIM_FRAMETYPE_OFT/FLAP
26 * chan = channel for FLAP, hdrtype for OFT
27 *
28 */
29faim_internal aim_frame_t *aim_tx_new(aim_session_t *sess, aim_conn_t *conn, fu8_t framing, fu16_t chan, int datalen)
30{
31        aim_frame_t *fr;
32
33        if (!conn) {
34                faimdprintf(sess, 0, "aim_tx_new: ERROR: no connection specified\n");
35                return NULL;
36        }
37
38        /* For sanity... */
39        if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) || (conn->type == AIM_CONN_TYPE_LISTENER)) {
40                if (framing != AIM_FRAMETYPE_OFT) {
41                        faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for rendezvous connection\n");
42                        return NULL;
43                }
44        } else {
45                if (framing != AIM_FRAMETYPE_FLAP) {
46                        faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for FLAP connection\n");
47                        return NULL;
48                }
49        }
50
51        if (!(fr = (aim_frame_t *)malloc(sizeof(aim_frame_t))))
52                return NULL;
53        memset(fr, 0, sizeof(aim_frame_t));
54
55        fr->conn = conn; 
56
57        fr->hdrtype = framing;
58
59        if (fr->hdrtype == AIM_FRAMETYPE_FLAP) {
60
61                fr->hdr.flap.type = chan;
62
63        } else if (fr->hdrtype == AIM_FRAMETYPE_OFT) {
64
65                fr->hdr.rend.type = chan;
66
67        } else 
68                faimdprintf(sess, 0, "tx_new: unknown framing\n");
69
70        if (datalen > 0) {
71                fu8_t *data;
72
73                if (!(data = (unsigned char *)malloc(datalen))) {
74                        aim_frame_destroy(fr);
75                        return NULL;
76                }
77
78                aim_bstream_init(&fr->data, data, datalen);
79        }
80
81        return fr;
82}
83
84/*
85 * aim_tx_enqeue__queuebased()
86 *
87 * The overall purpose here is to enqueue the passed in command struct
88 * into the outgoing (tx) queue.  Basically...
89 *   1) Make a scope-irrelevent copy of the struct
90 *   3) Mark as not-sent-yet
91 *   4) Enqueue the struct into the list
92 *   6) Return
93 *
94 * Note that this is only used when doing queue-based transmitting;
95 * that is, when sess->tx_enqueue is set to &aim_tx_enqueue__queuebased.
96 *
97 */
98static int aim_tx_enqueue__queuebased(aim_session_t *sess, aim_frame_t *fr)
99{
100
101        if (!fr->conn) {
102                faimdprintf(sess, 1, "aim_tx_enqueue: WARNING: enqueueing packet with no connecetion\n");
103                fr->conn = aim_getconn_type(sess, AIM_CONN_TYPE_BOS);
104        }
105
106        if (fr->hdrtype == AIM_FRAMETYPE_FLAP) {
107                /* assign seqnum -- XXX should really not assign until hardxmit */
108                fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);
109        }
110
111        fr->handled = 0; /* not sent yet */
112
113        /* see overhead note in aim_rxqueue counterpart */
114        if (!sess->queue_outgoing)
115                sess->queue_outgoing = fr;
116        else {
117                aim_frame_t *cur;
118
119                for (cur = sess->queue_outgoing; cur->next; cur = cur->next)
120                        ;
121                cur->next = fr;
122        }
123
124        return 0;
125}
126
127/*
128 * aim_tx_enqueue__immediate()
129 *
130 * Parallel to aim_tx_enqueue__queuebased, however, this bypasses
131 * the whole queue mess when you want immediate writes to happen.
132 *
133 * Basically the same as its __queuebased couterpart, however
134 * instead of doing a list append, it just calls aim_tx_sendframe()
135 * right here.
136 *
137 */
138static int aim_tx_enqueue__immediate(aim_session_t *sess, aim_frame_t *fr)
139{
140
141        if (!fr->conn) {
142                faimdprintf(sess, 1, "aim_tx_enqueue: ERROR: packet has no connection\n");
143                aim_frame_destroy(fr);
144                return 0;
145        }
146
147        if (fr->hdrtype == AIM_FRAMETYPE_FLAP)
148                fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);
149
150        fr->handled = 0; /* not sent yet */
151
152        aim_tx_sendframe(sess, fr);
153
154        aim_frame_destroy(fr);
155
156        return 0;
157}
158
159faim_export int aim_tx_setenqueue(aim_session_t *sess, int what, int (*func)(aim_session_t *, aim_frame_t *))
160{
161       
162        if (what == AIM_TX_QUEUED)
163                sess->tx_enqueue = &aim_tx_enqueue__queuebased;
164        else if (what == AIM_TX_IMMEDIATE) 
165                sess->tx_enqueue = &aim_tx_enqueue__immediate;
166        else if (what == AIM_TX_USER) {
167                if (!func)
168                        return -EINVAL;
169                sess->tx_enqueue = func;
170        } else
171                return -EINVAL; /* unknown action */
172
173        return 0;
174}
175
176faim_internal int aim_tx_enqueue(aim_session_t *sess, aim_frame_t *fr)
177{
178       
179        /*
180         * If we want to send a connection thats inprogress, we have to force
181         * them to use the queue based version. Otherwise, use whatever they
182         * want.
183         */
184        if (fr && fr->conn && 
185                        (fr->conn->status & AIM_CONN_STATUS_INPROGRESS)) {
186                return aim_tx_enqueue__queuebased(sess, fr);
187        }
188
189        return (*sess->tx_enqueue)(sess, fr);
190}
191
192/*
193 *  aim_get_next_txseqnum()
194 *
195 *   This increments the tx command count, and returns the seqnum
196 *   that should be stamped on the next FLAP packet sent.  This is
197 *   normally called during the final step of packet preparation
198 *   before enqueuement (in aim_tx_enqueue()).
199 *
200 */
201faim_internal flap_seqnum_t aim_get_next_txseqnum(aim_conn_t *conn)
202{
203        flap_seqnum_t ret;
204       
205        ret = ++conn->seqnum;
206
207        return ret;
208}
209
210static int aim_send(int fd, const void *buf, size_t count)
211{
212        int left, cur;
213
214        for (cur = 0, left = count; left; ) {
215                int ret;
216
217                ret = send(fd, ((unsigned char *)buf)+cur, left, 0);
218                if (ret == -1)
219                        return -1;
220                else if (ret == 0)
221                        return cur;
222
223                cur += ret;
224                left -= ret;
225        }
226
227        return cur;
228}
229
230static int aim_bstream_send(aim_bstream_t *bs, aim_conn_t *conn, size_t count)
231{
232        int wrote = 0;
233        if (!bs || !conn || (count < 0))
234                return -EINVAL;
235
236        if (count > aim_bstream_empty(bs))
237                count = aim_bstream_empty(bs); /* truncate to remaining space */
238
239        if (count) {
240                if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) && 
241                    (conn->subtype == AIM_CONN_SUBTYPE_OFT_DIRECTIM)) {
242                        /* I strongly suspect that this is a horrible thing to do
243                         * and I feel really guilty doing it. */
244                        const char *sn = aim_odc_getsn(conn);
245                        aim_rxcallback_t userfunc;
246                        while (count - wrote > 1024) {
247                                wrote = wrote + aim_send(conn->fd, bs->data + bs->offset + wrote, 1024);
248                                if ((userfunc=aim_callhandler(conn->sessv, conn, 
249                                                                AIM_CB_FAM_SPECIAL, 
250                                                                AIM_CB_SPECIAL_IMAGETRANSFER)))
251                                  userfunc(conn->sessv, NULL, sn, 
252                                           count-wrote>1024 ? ((double)wrote / count) : 1);
253                        }
254                }
255                if (count - wrote) {
256                        wrote = wrote + aim_send(conn->fd, bs->data + bs->offset + wrote, count - wrote);
257                }
258
259        }
260
261        if (((aim_session_t *)conn->sessv)->debug >= 2) {
262                int i;
263                aim_session_t *sess = (aim_session_t *)conn->sessv;
264
265                faimdprintf(sess, 2, "\nOutgoing data: (%d bytes)", wrote);
266                for (i = 0; i < wrote; i++) {
267                        if (!(i % 8)) 
268                                faimdprintf(sess, 2, "\n\t");
269                        faimdprintf(sess, 2, "0x%02x ", *(bs->data + bs->offset + i));
270                }
271                faimdprintf(sess, 2, "\n");
272        }
273
274        bs->offset += wrote;
275
276        return wrote;
277}
278
279static int sendframe_flap(aim_session_t *sess, aim_frame_t *fr)
280{
281        aim_bstream_t obs;
282        fu8_t *obs_raw;
283        int payloadlen, err = 0, obslen;
284
285        payloadlen = aim_bstream_curpos(&fr->data);
286
287        if (!(obs_raw = malloc(6 + payloadlen)))
288                return -ENOMEM;
289
290        aim_bstream_init(&obs, obs_raw, 6 + payloadlen);
291
292        /* FLAP header */
293        aimbs_put8(&obs, 0x2a);
294        aimbs_put8(&obs, fr->hdr.flap.type);
295        aimbs_put16(&obs, fr->hdr.flap.seqnum);
296        aimbs_put16(&obs, payloadlen);
297
298        /* payload */
299        aim_bstream_rewind(&fr->data);
300        aimbs_putbs(&obs, &fr->data, payloadlen);
301
302        obslen = aim_bstream_curpos(&obs);
303        aim_bstream_rewind(&obs);
304        if (aim_bstream_send(&obs, fr->conn, obslen) != obslen)
305                err = -errno;
306       
307        free(obs_raw); /* XXX aim_bstream_free */
308
309        fr->handled = 1;
310        fr->conn->lastactivity = time(NULL);
311
312        return err;
313}
314
315static int sendframe_rendezvous(aim_session_t *sess, aim_frame_t *fr)
316{
317        aim_bstream_t bs;
318        fu8_t *bs_raw;
319        int err = 0;
320        int totlen = 8 + aim_bstream_curpos(&fr->data);
321
322        if (!(bs_raw = malloc(totlen)))
323                return -1;
324
325        aim_bstream_init(&bs, bs_raw, totlen);
326
327        aimbs_putraw(&bs, fr->hdr.rend.magic, 4);
328        aimbs_put16(&bs, 8 + fr->hdr.rend.hdrlen);
329        aimbs_put16(&bs, fr->hdr.rend.type);
330
331        /* payload */
332        aim_bstream_rewind(&fr->data);
333        aimbs_putbs(&bs, &fr->data, totlen - 8);
334
335        aim_bstream_rewind(&bs);
336
337        if (aim_bstream_send(&bs, fr->conn, totlen) != totlen)
338                err = -errno;
339
340        free(bs_raw); /* XXX aim_bstream_free */
341
342        fr->handled = 1;
343        fr->conn->lastactivity = time(NULL);
344
345        return err;
346}
347
348faim_internal int aim_tx_sendframe(aim_session_t *sess, aim_frame_t *fr)
349{
350        if (fr->hdrtype == AIM_FRAMETYPE_FLAP)
351                return sendframe_flap(sess, fr);
352        else if (fr->hdrtype == AIM_FRAMETYPE_OFT)
353                return sendframe_rendezvous(sess, fr);
354        return -1;
355}
356
357faim_export int aim_tx_flushqueue(aim_session_t *sess)
358{
359        aim_frame_t *cur;
360
361        for (cur = sess->queue_outgoing; cur; cur = cur->next) {
362
363                if (cur->handled)
364                        continue; /* already been sent */
365
366                if (cur->conn && (cur->conn->status & AIM_CONN_STATUS_INPROGRESS))
367                        continue;
368
369                /*
370                 * And now for the meager attempt to force transmit
371                 * latency and avoid missed messages.
372                 */
373                if ((cur->conn->lastactivity + cur->conn->forcedlatency) >= time(NULL)) {
374                        /*
375                         * XXX should be a break! we dont want to block the
376                         * upper layers
377                         *
378                         * XXX or better, just do this right.
379                         *
380                         */
381                        sleep((cur->conn->lastactivity + cur->conn->forcedlatency) - time(NULL));
382                }
383
384                /* XXX this should call the custom "queuing" function!! */
385                aim_tx_sendframe(sess, cur);
386        }
387
388        /* purge sent commands from queue */
389        aim_tx_purgequeue(sess);
390
391        return 0;
392}
393
394/*
395 *  aim_tx_purgequeue()
396 * 
397 *  This is responsable for removing sent commands from the transmit
398 *  queue. This is not a required operation, but it of course helps
399 *  reduce memory footprint at run time! 
400 *
401 */
402faim_export void aim_tx_purgequeue(aim_session_t *sess)
403{
404        aim_frame_t *cur, **prev;
405
406        for (prev = &sess->queue_outgoing; (cur = *prev); ) {
407
408                if (cur->handled) {
409                        *prev = cur->next;
410
411                        aim_frame_destroy(cur);
412
413                } else
414                        prev = &cur->next;
415        }
416
417        return;
418}
419
420/**
421 * aim_tx_cleanqueue - get rid of packets waiting for tx on a dying conn
422 * @sess: session
423 * @conn: connection that's dying
424 *
425 * for now this simply marks all packets as sent and lets them
426 * disappear without warning.
427 *
428 */
429faim_internal void aim_tx_cleanqueue(aim_session_t *sess, aim_conn_t *conn)
430{
431        aim_frame_t *cur;
432
433        for (cur = sess->queue_outgoing; cur; cur = cur->next) {
434                if (cur->conn == conn)
435                        cur->handled = 1;
436        }
437
438        return;
439}
Note: See TracBrowser for help on using the repository browser.