source: libfaim/txqueue.c @ f36222f

barnowl_perlaimdebianowlrelease-1.10release-1.4release-1.5release-1.6release-1.7release-1.8release-1.9
Last change on this file since f36222f was 1e34e40, checked in by Erik Nygren <nygren@mit.edu>, 21 years ago
Handle the case in aim_bstream_send where aim_send returns -1, although there is likely an underlying problem here that would lead to this case.
  • Property mode set to 100644
File size: 10.5 KB
Line 
1/*
2 * txqueue.c
3 *
4 * Herein lies all the mangement routines for the transmit (Tx) queue.
5 *
6 */
7
8#define FAIM_INTERNAL
9#include <aim.h>
10
11#ifndef _WIN32
12#include <sys/socket.h>
13#else
14#include "win32dep.h"
15#endif
16
17/*
18 * Allocate a new tx frame.
19 *
20 * This is more for looks than anything else.
21 *
22 * Right now, that is.  If/when we implement a pool of transmit
23 * frames, this will become the request-an-unused-frame part.
24 *
25 * framing = AIM_FRAMETYPE_OFT/FLAP
26 * chan = channel for FLAP, hdrtype for OFT
27 *
28 */
29faim_internal aim_frame_t *aim_tx_new(aim_session_t *sess, aim_conn_t *conn, fu8_t framing, fu16_t chan, int datalen)
30{
31  aim_frame_t *fr;
32 
33  if (!conn) {
34    faimdprintf(sess, 0, "aim_tx_new: ERROR: no connection specified\n");
35    return NULL;
36  }
37 
38  /* For sanity... */
39  if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) || (conn->type == AIM_CONN_TYPE_LISTENER)) {
40    if (framing != AIM_FRAMETYPE_OFT) {
41      faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for rendezvous connection\n");
42      return NULL;
43    }
44  } else {
45    if (framing != AIM_FRAMETYPE_FLAP) {
46      faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for FLAP connection\n");
47      return NULL;
48    }
49  }
50 
51  if (!(fr = (aim_frame_t *)malloc(sizeof(aim_frame_t)))) return NULL;
52  memset(fr, 0, sizeof(aim_frame_t));
53 
54  fr->conn = conn; 
55  fr->hdrtype = framing;
56  if (fr->hdrtype == AIM_FRAMETYPE_FLAP) {
57    fr->hdr.flap.type = chan;
58  } else if (fr->hdrtype == AIM_FRAMETYPE_OFT) {
59    fr->hdr.rend.type = chan;
60  } else {
61    faimdprintf(sess, 0, "tx_new: unknown framing\n");
62  }
63 
64  if (datalen > 0) {
65    fu8_t *data;
66    if (!(data = (unsigned char *)malloc(datalen))) {
67      aim_frame_destroy(fr);
68      return NULL;
69    }
70    aim_bstream_init(&fr->data, data, datalen);
71  }
72 
73  return fr;
74}
75
76/*
77 * aim_tx_enqeue__queuebased()
78 *
79 * The overall purpose here is to enqueue the passed in command struct
80 * into the outgoing (tx) queue.  Basically...
81 *   1) Make a scope-irrelevent copy of the struct
82 *   3) Mark as not-sent-yet
83 *   4) Enqueue the struct into the list
84 *   6) Return
85 *
86 * Note that this is only used when doing queue-based transmitting;
87 * that is, when sess->tx_enqueue is set to &aim_tx_enqueue__queuebased.
88 *
89 */
90static int aim_tx_enqueue__queuebased(aim_session_t *sess, aim_frame_t *fr)
91{
92 
93  if (!fr->conn) {
94    faimdprintf(sess, 1, "aim_tx_enqueue: WARNING: enqueueing packet with no connecetion\n");
95    fr->conn = aim_getconn_type(sess, AIM_CONN_TYPE_BOS);
96  }
97 
98  if (fr->hdrtype == AIM_FRAMETYPE_FLAP) {
99    /* assign seqnum -- XXX should really not assign until hardxmit */
100    fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);
101  }
102 
103  fr->handled = 0; /* not sent yet */
104 
105  /* see overhead note in aim_rxqueue counterpart */
106  if (!sess->queue_outgoing)
107    sess->queue_outgoing = fr;
108  else {
109    aim_frame_t *cur;
110   
111    for (cur = sess->queue_outgoing; cur->next; cur = cur->next)
112      ;
113    cur->next = fr;
114  }
115 
116  return 0;
117}
118
119/*
120 * aim_tx_enqueue__immediate()
121 *
122 * Parallel to aim_tx_enqueue__queuebased, however, this bypasses
123 * the whole queue mess when you want immediate writes to happen.
124 *
125 * Basically the same as its __queuebased couterpart, however
126 * instead of doing a list append, it just calls aim_tx_sendframe()
127 * right here.
128 *
129 */
130static int aim_tx_enqueue__immediate(aim_session_t *sess, aim_frame_t *fr)
131{
132
133  if (!fr->conn) {
134    faimdprintf(sess, 1, "aim_tx_enqueue: ERROR: packet has no connection\n");
135    aim_frame_destroy(fr);
136    return 0;
137  }
138 
139  if (fr->hdrtype == AIM_FRAMETYPE_FLAP) fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);
140  fr->handled = 0; /* not sent yet */
141  aim_tx_sendframe(sess, fr);
142  aim_frame_destroy(fr);
143 
144  return 0;
145}
146
147faim_export int aim_tx_setenqueue(aim_session_t *sess, int what, int (*func)(aim_session_t *, aim_frame_t *))
148{
149  if (what == AIM_TX_QUEUED) {
150    sess->tx_enqueue = &aim_tx_enqueue__queuebased;
151  } else if (what == AIM_TX_IMMEDIATE) {
152    sess->tx_enqueue = &aim_tx_enqueue__immediate;
153  } else if (what == AIM_TX_USER) {
154    if (!func) return -EINVAL;
155    sess->tx_enqueue = func;
156  } else {
157    return -EINVAL; /* unknown action */
158  }
159 
160  return 0;
161}
162
163faim_internal int aim_tx_enqueue(aim_session_t *sess, aim_frame_t *fr)
164{
165 
166  /*
167   * If we want to send a connection thats inprogress, we have to force
168   * them to use the queue based version. Otherwise, use whatever they
169   * want.
170   */
171  if (fr && fr->conn && 
172      (fr->conn->status & AIM_CONN_STATUS_INPROGRESS)) {
173    return aim_tx_enqueue__queuebased(sess, fr);
174  }
175 
176  return (*sess->tx_enqueue)(sess, fr);
177}
178
179/*
180 *  aim_get_next_txseqnum()
181 *
182 *   This increments the tx command count, and returns the seqnum
183 *   that should be stamped on the next FLAP packet sent.  This is
184 *   normally called during the final step of packet preparation
185 *   before enqueuement (in aim_tx_enqueue()).
186 *
187 */
188faim_internal flap_seqnum_t aim_get_next_txseqnum(aim_conn_t *conn)
189{
190  flap_seqnum_t ret;
191  ret = ++conn->seqnum;
192  return ret;
193}
194
195static int aim_send(int fd, const void *buf, size_t count)
196{
197  int left, cur;
198 
199  for (cur = 0, left = count; left; ) {
200    int ret;
201   
202    ret = send(fd, ((unsigned char *)buf)+cur, left, 0);
203    if (ret == -1) {
204      return -1;
205    } else if (ret == 0) {
206      return cur;
207    }
208   
209    cur += ret;
210    left -= ret;
211  }
212 
213  return cur;
214}
215
216static int aim_bstream_send(aim_bstream_t *bs, aim_conn_t *conn, size_t count)
217{
218  int wrote = 0;
219  int rv = 0;
220  if (!bs || !conn || (count < 0))
221    return -EINVAL;
222 
223  if (count > aim_bstream_empty(bs))
224    count = aim_bstream_empty(bs); /* truncate to remaining space */
225 
226  if (count) {
227    if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) && 
228        (conn->subtype == AIM_CONN_SUBTYPE_OFT_DIRECTIM)) {
229      /* I strongly suspect that this is a horrible thing to do
230       * and I feel really guilty doing it. */
231      const char *sn = aim_odc_getsn(conn);
232      aim_rxcallback_t userfunc;
233      while (count - wrote > 1024) {
234        rv = aim_send(conn->fd, bs->data + bs->offset + wrote, 1024);
235        if (rv < 0) {
236          fprintf(stderr, "aim_bstream_send: aim_send failed...\n");
237          return -EINVAL;
238        }
239        wrote = wrote + rv;
240         
241        if ((userfunc=aim_callhandler(conn->sessv, conn, 
242                                      AIM_CB_FAM_SPECIAL, 
243                                      AIM_CB_SPECIAL_IMAGETRANSFER)))
244          userfunc(conn->sessv, NULL, sn, 
245                   count-wrote>1024 ? ((double)wrote / count) : 1);
246      }
247    }
248    if (count - wrote) {
249      rv = aim_send(conn->fd, bs->data + bs->offset + wrote, count - wrote);
250      if (rv < 0) {
251        fprintf(stderr, "aim_bstream_send: aim_send failed...\n");
252        return -EINVAL;
253      }
254      wrote = wrote + rv;
255    }
256   
257  }
258 
259  if (((aim_session_t *)conn->sessv)->debug >= 2) {
260    int i;
261    aim_session_t *sess = (aim_session_t *)conn->sessv;
262   
263    faimdprintf(sess, 2, "\nOutgoing data: (%d bytes)", wrote);
264    for (i = 0; i < wrote; i++) {
265      if (!(i % 8)) 
266        faimdprintf(sess, 2, "\n\t");
267      faimdprintf(sess, 2, "0x%02x ", *(bs->data + bs->offset + i));
268    }
269    faimdprintf(sess, 2, "\n");
270  }
271 
272  bs->offset += wrote;
273 
274  return wrote;
275}
276
277static int sendframe_flap(aim_session_t *sess, aim_frame_t *fr)
278{
279  aim_bstream_t obs;
280  fu8_t *obs_raw;
281  int payloadlen, err = 0, obslen;
282 
283  payloadlen = aim_bstream_curpos(&fr->data);
284 
285  if (!(obs_raw = malloc(6 + payloadlen)))
286    return -ENOMEM;
287 
288  aim_bstream_init(&obs, obs_raw, 6 + payloadlen);
289 
290  /* FLAP header */
291  aimbs_put8(&obs, 0x2a);
292  aimbs_put8(&obs, fr->hdr.flap.type);
293  aimbs_put16(&obs, fr->hdr.flap.seqnum);
294  aimbs_put16(&obs, payloadlen);
295 
296  /* payload */
297  aim_bstream_rewind(&fr->data);
298  aimbs_putbs(&obs, &fr->data, payloadlen);
299 
300  obslen = aim_bstream_curpos(&obs);
301  aim_bstream_rewind(&obs);
302  if (aim_bstream_send(&obs, fr->conn, obslen) != obslen)
303    err = -errno;
304 
305  free(obs_raw); /* XXX aim_bstream_free */
306 
307  fr->handled = 1;
308  fr->conn->lastactivity = time(NULL);
309 
310  return err;
311}
312
313static int sendframe_rendezvous(aim_session_t *sess, aim_frame_t *fr)
314{
315        aim_bstream_t bs;
316        fu8_t *bs_raw;
317        int err = 0;
318        int totlen = 8 + aim_bstream_curpos(&fr->data);
319
320        if (!(bs_raw = malloc(totlen)))
321                return -1;
322
323        aim_bstream_init(&bs, bs_raw, totlen);
324
325        aimbs_putraw(&bs, fr->hdr.rend.magic, 4);
326        aimbs_put16(&bs, 8 + fr->hdr.rend.hdrlen);
327        aimbs_put16(&bs, fr->hdr.rend.type);
328
329        /* payload */
330        aim_bstream_rewind(&fr->data);
331        aimbs_putbs(&bs, &fr->data, totlen - 8);
332
333        aim_bstream_rewind(&bs);
334
335        if (aim_bstream_send(&bs, fr->conn, totlen) != totlen)
336                err = -errno;
337
338        free(bs_raw); /* XXX aim_bstream_free */
339
340        fr->handled = 1;
341        fr->conn->lastactivity = time(NULL);
342
343        return err;
344}
345
346faim_internal int aim_tx_sendframe(aim_session_t *sess, aim_frame_t *fr)
347{
348        if (fr->hdrtype == AIM_FRAMETYPE_FLAP)
349                return sendframe_flap(sess, fr);
350        else if (fr->hdrtype == AIM_FRAMETYPE_OFT)
351                return sendframe_rendezvous(sess, fr);
352        return -1;
353}
354
355faim_export int aim_tx_flushqueue(aim_session_t *sess)
356{
357        aim_frame_t *cur;
358
359        for (cur = sess->queue_outgoing; cur; cur = cur->next) {
360
361                if (cur->handled)
362                        continue; /* already been sent */
363
364                if (cur->conn && (cur->conn->status & AIM_CONN_STATUS_INPROGRESS))
365                        continue;
366
367                /*
368                 * And now for the meager attempt to force transmit
369                 * latency and avoid missed messages.
370                 */
371                if ((cur->conn->lastactivity + cur->conn->forcedlatency) >= time(NULL)) {
372                        /*
373                         * XXX should be a break! we dont want to block the
374                         * upper layers
375                         *
376                         * XXX or better, just do this right.
377                         *
378                         */
379                        sleep((cur->conn->lastactivity + cur->conn->forcedlatency) - time(NULL));
380                }
381
382                /* XXX this should call the custom "queuing" function!! */
383                aim_tx_sendframe(sess, cur);
384        }
385
386        /* purge sent commands from queue */
387        aim_tx_purgequeue(sess);
388
389        return 0;
390}
391
392/*
393 *  aim_tx_purgequeue()
394 * 
395 *  This is responsable for removing sent commands from the transmit
396 *  queue. This is not a required operation, but it of course helps
397 *  reduce memory footprint at run time! 
398 *
399 */
400faim_export void aim_tx_purgequeue(aim_session_t *sess)
401{
402        aim_frame_t *cur, **prev;
403
404        for (prev = &sess->queue_outgoing; (cur = *prev); ) {
405
406                if (cur->handled) {
407                        *prev = cur->next;
408
409                        aim_frame_destroy(cur);
410
411                } else
412                        prev = &cur->next;
413        }
414
415        return;
416}
417
418/**
419 * aim_tx_cleanqueue - get rid of packets waiting for tx on a dying conn
420 * @sess: session
421 * @conn: connection that's dying
422 *
423 * for now this simply marks all packets as sent and lets them
424 * disappear without warning.
425 *
426 */
427faim_internal void aim_tx_cleanqueue(aim_session_t *sess, aim_conn_t *conn)
428{
429        aim_frame_t *cur;
430
431        for (cur = sess->queue_outgoing; cur; cur = cur->next) {
432                if (cur->conn == conn)
433                        cur->handled = 1;
434        }
435
436        return;
437}
Note: See TracBrowser for help on using the repository browser.