source: libfaim/txqueue.c @ c9e72d1

barnowl_perlaimdebianowlrelease-1.10release-1.4release-1.5release-1.6release-1.7release-1.8release-1.9
Last change on this file since c9e72d1 was e374dee, checked in by James M. Kretchmar <kretch@mit.edu>, 20 years ago
*** empty log message ***
  • Property mode set to 100644
File size: 10.3 KB
RevLine 
[5e53c4a]1/*
[862371b]2 * txqueue.c
[5e53c4a]3 *
4 * Herein lies all the mangement routines for the transmit (Tx) queue.
5 *
6 */
7
8#define FAIM_INTERNAL
9#include <aim.h>
10
11#ifndef _WIN32
12#include <sys/socket.h>
[862371b]13#else
14#include "win32dep.h"
[5e53c4a]15#endif
16
17/*
18 * Allocate a new tx frame.
19 *
20 * This is more for looks than anything else.
21 *
22 * Right now, that is.  If/when we implement a pool of transmit
23 * frames, this will become the request-an-unused-frame part.
24 *
25 * framing = AIM_FRAMETYPE_OFT/FLAP
26 * chan = channel for FLAP, hdrtype for OFT
27 *
28 */
[862371b]29faim_internal aim_frame_t *aim_tx_new(aim_session_t *sess, aim_conn_t *conn, fu8_t framing, fu16_t chan, int datalen)
[5e53c4a]30{
[e374dee]31  aim_frame_t *fr;
32 
33  if (!conn) {
34    faimdprintf(sess, 0, "aim_tx_new: ERROR: no connection specified\n");
35    return NULL;
36  }
37 
38  /* For sanity... */
39  if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) || (conn->type == AIM_CONN_TYPE_LISTENER)) {
40    if (framing != AIM_FRAMETYPE_OFT) {
41      faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for rendezvous connection\n");
42      return NULL;
43    }
44  } else {
45    if (framing != AIM_FRAMETYPE_FLAP) {
46      faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for FLAP connection\n");
47      return NULL;
48    }
49  }
50 
51  if (!(fr = (aim_frame_t *)malloc(sizeof(aim_frame_t)))) return NULL;
52  memset(fr, 0, sizeof(aim_frame_t));
53 
54  fr->conn = conn; 
55  fr->hdrtype = framing;
56  if (fr->hdrtype == AIM_FRAMETYPE_FLAP) {
57    fr->hdr.flap.type = chan;
58  } else if (fr->hdrtype == AIM_FRAMETYPE_OFT) {
59    fr->hdr.rend.type = chan;
60  } else {
61    faimdprintf(sess, 0, "tx_new: unknown framing\n");
62  }
63 
64  if (datalen > 0) {
65    fu8_t *data;
66    if (!(data = (unsigned char *)malloc(datalen))) {
67      aim_frame_destroy(fr);
68      return NULL;
69    }
70    aim_bstream_init(&fr->data, data, datalen);
71  }
72 
73  return fr;
[5e53c4a]74}
75
76/*
77 * aim_tx_enqeue__queuebased()
78 *
79 * The overall purpose here is to enqueue the passed in command struct
80 * into the outgoing (tx) queue.  Basically...
81 *   1) Make a scope-irrelevent copy of the struct
82 *   3) Mark as not-sent-yet
83 *   4) Enqueue the struct into the list
84 *   6) Return
85 *
86 * Note that this is only used when doing queue-based transmitting;
87 * that is, when sess->tx_enqueue is set to &aim_tx_enqueue__queuebased.
88 *
89 */
90static int aim_tx_enqueue__queuebased(aim_session_t *sess, aim_frame_t *fr)
91{
[e374dee]92 
93  if (!fr->conn) {
94    faimdprintf(sess, 1, "aim_tx_enqueue: WARNING: enqueueing packet with no connecetion\n");
95    fr->conn = aim_getconn_type(sess, AIM_CONN_TYPE_BOS);
96  }
97 
98  if (fr->hdrtype == AIM_FRAMETYPE_FLAP) {
99    /* assign seqnum -- XXX should really not assign until hardxmit */
100    fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);
101  }
102 
103  fr->handled = 0; /* not sent yet */
104 
105  /* see overhead note in aim_rxqueue counterpart */
106  if (!sess->queue_outgoing)
107    sess->queue_outgoing = fr;
108  else {
109    aim_frame_t *cur;
110   
111    for (cur = sess->queue_outgoing; cur->next; cur = cur->next)
112      ;
113    cur->next = fr;
114  }
115 
116  return 0;
[5e53c4a]117}
118
119/*
120 * aim_tx_enqueue__immediate()
121 *
122 * Parallel to aim_tx_enqueue__queuebased, however, this bypasses
123 * the whole queue mess when you want immediate writes to happen.
124 *
125 * Basically the same as its __queuebased couterpart, however
126 * instead of doing a list append, it just calls aim_tx_sendframe()
127 * right here.
128 *
129 */
130static int aim_tx_enqueue__immediate(aim_session_t *sess, aim_frame_t *fr)
131{
132
[e374dee]133  if (!fr->conn) {
134    faimdprintf(sess, 1, "aim_tx_enqueue: ERROR: packet has no connection\n");
135    aim_frame_destroy(fr);
136    return 0;
137  }
138 
139  if (fr->hdrtype == AIM_FRAMETYPE_FLAP) fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);
140  fr->handled = 0; /* not sent yet */
141  aim_tx_sendframe(sess, fr);
142  aim_frame_destroy(fr);
143 
144  return 0;
[5e53c4a]145}
146
147faim_export int aim_tx_setenqueue(aim_session_t *sess, int what, int (*func)(aim_session_t *, aim_frame_t *))
148{
[e374dee]149  if (what == AIM_TX_QUEUED) {
150    sess->tx_enqueue = &aim_tx_enqueue__queuebased;
151  } else if (what == AIM_TX_IMMEDIATE) {
152    sess->tx_enqueue = &aim_tx_enqueue__immediate;
153  } else if (what == AIM_TX_USER) {
154    if (!func) return -EINVAL;
155    sess->tx_enqueue = func;
156  } else {
157    return -EINVAL; /* unknown action */
158  }
159 
160  return 0;
[5e53c4a]161}
162
163faim_internal int aim_tx_enqueue(aim_session_t *sess, aim_frame_t *fr)
164{
[e374dee]165 
166  /*
167   * If we want to send a connection thats inprogress, we have to force
168   * them to use the queue based version. Otherwise, use whatever they
169   * want.
170   */
171  if (fr && fr->conn && 
172      (fr->conn->status & AIM_CONN_STATUS_INPROGRESS)) {
173    return aim_tx_enqueue__queuebased(sess, fr);
174  }
175 
176  return (*sess->tx_enqueue)(sess, fr);
[5e53c4a]177}
178
179/*
180 *  aim_get_next_txseqnum()
181 *
182 *   This increments the tx command count, and returns the seqnum
183 *   that should be stamped on the next FLAP packet sent.  This is
184 *   normally called during the final step of packet preparation
185 *   before enqueuement (in aim_tx_enqueue()).
186 *
187 */
188faim_internal flap_seqnum_t aim_get_next_txseqnum(aim_conn_t *conn)
189{
[e374dee]190  flap_seqnum_t ret;
191  ret = ++conn->seqnum;
192  return ret;
[5e53c4a]193}
194
195static int aim_send(int fd, const void *buf, size_t count)
196{
[e374dee]197  int left, cur;
198 
199  for (cur = 0, left = count; left; ) {
200    int ret;
201   
202    ret = send(fd, ((unsigned char *)buf)+cur, left, 0);
203    if (ret == -1) {
204      return -1;
205    } else if (ret == 0) {
206      return cur;
207    }
208   
209    cur += ret;
210    left -= ret;
211  }
212 
213  return cur;
[5e53c4a]214}
215
216static int aim_bstream_send(aim_bstream_t *bs, aim_conn_t *conn, size_t count)
217{
[e374dee]218  int wrote = 0;
219  if (!bs || !conn || (count < 0))
220    return -EINVAL;
221 
222  if (count > aim_bstream_empty(bs))
223    count = aim_bstream_empty(bs); /* truncate to remaining space */
224 
225  if (count) {
226    if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) && 
227        (conn->subtype == AIM_CONN_SUBTYPE_OFT_DIRECTIM)) {
228      /* I strongly suspect that this is a horrible thing to do
229       * and I feel really guilty doing it. */
230      const char *sn = aim_odc_getsn(conn);
231      aim_rxcallback_t userfunc;
232      while (count - wrote > 1024) {
233        wrote = wrote + aim_send(conn->fd, bs->data + bs->offset + wrote, 1024);
234        if ((userfunc=aim_callhandler(conn->sessv, conn, 
235                                      AIM_CB_FAM_SPECIAL, 
236                                      AIM_CB_SPECIAL_IMAGETRANSFER)))
237          userfunc(conn->sessv, NULL, sn, 
238                   count-wrote>1024 ? ((double)wrote / count) : 1);
239      }
240    }
241    if (count - wrote) {
242      wrote = wrote + aim_send(conn->fd, bs->data + bs->offset + wrote, count - wrote);
243    }
244   
245  }
246 
247  if (((aim_session_t *)conn->sessv)->debug >= 2) {
248    int i;
249    aim_session_t *sess = (aim_session_t *)conn->sessv;
250   
251    faimdprintf(sess, 2, "\nOutgoing data: (%d bytes)", wrote);
252    for (i = 0; i < wrote; i++) {
253      if (!(i % 8)) 
254        faimdprintf(sess, 2, "\n\t");
255      faimdprintf(sess, 2, "0x%02x ", *(bs->data + bs->offset + i));
256    }
257    faimdprintf(sess, 2, "\n");
258  }
259 
260  bs->offset += wrote;
261 
262  return wrote;
[5e53c4a]263}
264
265static int sendframe_flap(aim_session_t *sess, aim_frame_t *fr)
266{
[e374dee]267  aim_bstream_t obs;
268  fu8_t *obs_raw;
269  int payloadlen, err = 0, obslen;
270 
271  payloadlen = aim_bstream_curpos(&fr->data);
272 
273  if (!(obs_raw = malloc(6 + payloadlen)))
274    return -ENOMEM;
275 
276  aim_bstream_init(&obs, obs_raw, 6 + payloadlen);
277 
278  /* FLAP header */
279  aimbs_put8(&obs, 0x2a);
280  aimbs_put8(&obs, fr->hdr.flap.type);
281  aimbs_put16(&obs, fr->hdr.flap.seqnum);
282  aimbs_put16(&obs, payloadlen);
283 
284  /* payload */
285  aim_bstream_rewind(&fr->data);
286  aimbs_putbs(&obs, &fr->data, payloadlen);
287 
288  obslen = aim_bstream_curpos(&obs);
289  aim_bstream_rewind(&obs);
290  if (aim_bstream_send(&obs, fr->conn, obslen) != obslen)
291    err = -errno;
292 
293  free(obs_raw); /* XXX aim_bstream_free */
294 
295  fr->handled = 1;
296  fr->conn->lastactivity = time(NULL);
297 
298  return err;
[5e53c4a]299}
300
[862371b]301static int sendframe_rendezvous(aim_session_t *sess, aim_frame_t *fr)
[5e53c4a]302{
[862371b]303        aim_bstream_t bs;
304        fu8_t *bs_raw;
[5e53c4a]305        int err = 0;
[862371b]306        int totlen = 8 + aim_bstream_curpos(&fr->data);
[5e53c4a]307
[862371b]308        if (!(bs_raw = malloc(totlen)))
[5e53c4a]309                return -1;
310
[862371b]311        aim_bstream_init(&bs, bs_raw, totlen);
[5e53c4a]312
[862371b]313        aimbs_putraw(&bs, fr->hdr.rend.magic, 4);
314        aimbs_put16(&bs, 8 + fr->hdr.rend.hdrlen);
315        aimbs_put16(&bs, fr->hdr.rend.type);
[5e53c4a]316
[862371b]317        /* payload */
318        aim_bstream_rewind(&fr->data);
319        aimbs_putbs(&bs, &fr->data, totlen - 8);
[5e53c4a]320
[862371b]321        aim_bstream_rewind(&bs);
[5e53c4a]322
[862371b]323        if (aim_bstream_send(&bs, fr->conn, totlen) != totlen)
324                err = -errno;
[5e53c4a]325
[862371b]326        free(bs_raw); /* XXX aim_bstream_free */
[5e53c4a]327
328        fr->handled = 1;
329        fr->conn->lastactivity = time(NULL);
330
331        return err;
332}
333
334faim_internal int aim_tx_sendframe(aim_session_t *sess, aim_frame_t *fr)
335{
336        if (fr->hdrtype == AIM_FRAMETYPE_FLAP)
337                return sendframe_flap(sess, fr);
338        else if (fr->hdrtype == AIM_FRAMETYPE_OFT)
[862371b]339                return sendframe_rendezvous(sess, fr);
[5e53c4a]340        return -1;
341}
342
343faim_export int aim_tx_flushqueue(aim_session_t *sess)
344{
345        aim_frame_t *cur;
346
347        for (cur = sess->queue_outgoing; cur; cur = cur->next) {
348
349                if (cur->handled)
350                        continue; /* already been sent */
351
352                if (cur->conn && (cur->conn->status & AIM_CONN_STATUS_INPROGRESS))
353                        continue;
354
355                /*
356                 * And now for the meager attempt to force transmit
357                 * latency and avoid missed messages.
358                 */
359                if ((cur->conn->lastactivity + cur->conn->forcedlatency) >= time(NULL)) {
360                        /*
361                         * XXX should be a break! we dont want to block the
362                         * upper layers
363                         *
364                         * XXX or better, just do this right.
365                         *
366                         */
367                        sleep((cur->conn->lastactivity + cur->conn->forcedlatency) - time(NULL));
368                }
369
370                /* XXX this should call the custom "queuing" function!! */
371                aim_tx_sendframe(sess, cur);
372        }
373
374        /* purge sent commands from queue */
375        aim_tx_purgequeue(sess);
376
377        return 0;
378}
379
380/*
381 *  aim_tx_purgequeue()
382 * 
383 *  This is responsable for removing sent commands from the transmit
384 *  queue. This is not a required operation, but it of course helps
385 *  reduce memory footprint at run time! 
386 *
387 */
388faim_export void aim_tx_purgequeue(aim_session_t *sess)
389{
390        aim_frame_t *cur, **prev;
391
392        for (prev = &sess->queue_outgoing; (cur = *prev); ) {
393
394                if (cur->handled) {
395                        *prev = cur->next;
396
397                        aim_frame_destroy(cur);
398
399                } else
400                        prev = &cur->next;
401        }
402
403        return;
404}
405
406/**
407 * aim_tx_cleanqueue - get rid of packets waiting for tx on a dying conn
408 * @sess: session
409 * @conn: connection that's dying
410 *
411 * for now this simply marks all packets as sent and lets them
412 * disappear without warning.
413 *
414 */
[862371b]415faim_internal void aim_tx_cleanqueue(aim_session_t *sess, aim_conn_t *conn)
[5e53c4a]416{
417        aim_frame_t *cur;
418
419        for (cur = sess->queue_outgoing; cur; cur = cur->next) {
420                if (cur->conn == conn)
421                        cur->handled = 1;
422        }
423
424        return;
425}
Note: See TracBrowser for help on using the repository browser.