Mercurial > pidgin.yaz
diff src/protocols/oscar/txqueue.c @ 2246:933346315b9b
[gaim-migrate @ 2256]
heh.
committer: Tailor Script <tailor@pidgin.im>
author | Eric Warmenhoven <eric@warmenhoven.org> |
---|---|
date | Sun, 09 Sep 2001 10:07:14 +0000 |
parents | 424a40f12a6c |
children | d82efea341ef |
line wrap: on
line diff
--- a/src/protocols/oscar/txqueue.c Sun Sep 09 06:33:54 2001 +0000 +++ b/src/protocols/oscar/txqueue.c Sun Sep 09 10:07:14 2001 +0000 @@ -20,60 +20,65 @@ * Right now, that is. If/when we implement a pool of transmit * frames, this will become the request-an-unused-frame part. * - * framing = AIM_FRAMETYPE_OFT/OSCAR - * chan = channel for OSCAR, hdrtype for OFT + * framing = AIM_FRAMETYPE_OFT/FLAP + * chan = channel for FLAP, hdrtype for OFT * */ -faim_internal struct command_tx_struct *aim_tx_new(struct aim_session_t *sess, - struct aim_conn_t *conn, - unsigned char framing, - int chan, - int datalen) +faim_internal aim_frame_t *aim_tx_new(aim_session_t *sess, aim_conn_t *conn, fu8_t framing, fu8_t chan, int datalen) { - struct command_tx_struct *newtx; + aim_frame_t *fr; + + if (!conn) { + faimdprintf(sess, 0, "aim_tx_new: ERROR: no connection specified\n"); + return NULL; + } - if (!conn) { - faimdprintf(sess, 0, "aim_tx_new: ERROR: no connection specified\n"); - return NULL; - } + /* For sanity... */ + if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) || + (conn->type == AIM_CONN_TYPE_RENDEZVOUS_OUT)) { + if (framing != AIM_FRAMETYPE_OFT) { + faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for rendezvous connection\n"); + return NULL; + } + } else { + if (framing != AIM_FRAMETYPE_FLAP) { + faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for FLAP connection\n"); + return NULL; + } + } + + if (!(fr = (aim_frame_t *)malloc(sizeof(aim_frame_t)))) + return NULL; + memset(fr, 0, sizeof(aim_frame_t)); - /* For sanity... */ - if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) || (conn->type == AIM_CONN_TYPE_RENDEZVOUS_OUT)) { - if (framing != AIM_FRAMETYPE_OFT) { - faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for rendezvous connection\n"); - return NULL; - } - } else { - if (framing != AIM_FRAMETYPE_OSCAR) { - faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for FLAP connection\n"); - return NULL; - } - } + fr->conn = conn; + + fr->hdrtype = framing; + + if (fr->hdrtype == AIM_FRAMETYPE_FLAP) { - newtx = (struct command_tx_struct *)malloc(sizeof(struct command_tx_struct)); - if (!newtx) - return NULL; - memset(newtx, 0, sizeof(struct command_tx_struct)); + fr->hdr.flap.type = chan; + + } else if (fr->hdrtype == AIM_FRAMETYPE_OFT) { + + fr->hdr.oft.type = chan; + fr->hdr.oft.hdr2len = 0; /* this will get setup by caller */ - newtx->conn = conn; + } else + faimdprintf(sess, 0, "tx_new: unknown framing\n"); - if(datalen) { - newtx->data = (unsigned char *)malloc(datalen); - newtx->commandlen = datalen; - } else - newtx->data = NULL; + if (datalen > 0) { + fu8_t *data; - newtx->hdrtype = framing; - if (newtx->hdrtype == AIM_FRAMETYPE_OSCAR) { - newtx->hdr.oscar.type = chan; - } else if (newtx->hdrtype == AIM_FRAMETYPE_OFT) { - newtx->hdr.oft.type = chan; - newtx->hdr.oft.hdr2len = 0; /* this will get setup by caller */ - } else { - faimdprintf(sess, 0, "tx_new: unknown framing\n"); - } + if (!(data = (unsigned char *)malloc(datalen))) { + aim_frame_destroy(fr); + return NULL; + } - return newtx; + aim_bstream_init(&fr->data, data, datalen); + } + + return fr; } /* @@ -82,48 +87,41 @@ * The overall purpose here is to enqueue the passed in command struct * into the outgoing (tx) queue. Basically... * 1) Make a scope-irrelevent copy of the struct - * 2) Lock the struct * 3) Mark as not-sent-yet * 4) Enqueue the struct into the list - * 5) Unlock the struct once it's linked in * 6) Return * * Note that this is only used when doing queue-based transmitting; * that is, when sess->tx_enqueue is set to &aim_tx_enqueue__queuebased. * */ -static int aim_tx_enqueue__queuebased(struct aim_session_t *sess, struct command_tx_struct *newpacket) +static int aim_tx_enqueue__queuebased(aim_session_t *sess, aim_frame_t *fr) { - struct command_tx_struct *cur; + + if (!fr->conn) { + faimdprintf(sess, 1, "aim_tx_enqueue: WARNING: enqueueing packet with no connecetion\n"); + fr->conn = aim_getconn_type(sess, AIM_CONN_TYPE_BOS); + } + + if (fr->hdrtype == AIM_FRAMETYPE_FLAP) { + /* assign seqnum -- XXX should really not assign until hardxmit */ + fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn); + } - if (newpacket->conn == NULL) { - faimdprintf(sess, 1, "aim_tx_enqueue: WARNING: enqueueing packet with no connecetion\n"); - newpacket->conn = aim_getconn_type(sess, AIM_CONN_TYPE_BOS); - } - - if (newpacket->hdrtype == AIM_FRAMETYPE_OSCAR) { - /* assign seqnum */ - newpacket->hdr.oscar.seqnum = aim_get_next_txseqnum(newpacket->conn); - } - /* set some more fields */ - newpacket->lock = 1; /* lock */ - newpacket->sent = 0; /* not sent yet */ - newpacket->next = NULL; /* always last */ + fr->handled = 0; /* not sent yet */ - /* see overhead note in aim_rxqueue counterpart */ - if (sess->queue_outgoing == NULL) { - sess->queue_outgoing = newpacket; - } else { - for (cur = sess->queue_outgoing; - cur->next; - cur = cur->next) - ; - cur->next = newpacket; - } + /* see overhead note in aim_rxqueue counterpart */ + if (!sess->queue_outgoing) + sess->queue_outgoing = fr; + else { + aim_frame_t *cur; - newpacket->lock = 0; /* unlock so it can be sent */ + for (cur = sess->queue_outgoing; cur->next; cur = cur->next) + ; + cur->next = fr; + } - return 0; + return 0; } /* @@ -137,63 +135,58 @@ * right here. * */ -static int aim_tx_enqueue__immediate(struct aim_session_t *sess, struct command_tx_struct *newpacket) +static int aim_tx_enqueue__immediate(aim_session_t *sess, aim_frame_t *fr) { - if (newpacket->conn == NULL) { - faimdprintf(sess, 1, "aim_tx_enqueue: ERROR: packet has no connection\n"); - if (newpacket->data) - free(newpacket->data); - free(newpacket); - return -1; - } + + if (!fr->conn) { + faimdprintf(sess, 1, "aim_tx_enqueue: ERROR: packet has no connection\n"); + aim_frame_destroy(fr); + return 0; + } - if (newpacket->hdrtype == AIM_FRAMETYPE_OSCAR) - newpacket->hdr.oscar.seqnum = aim_get_next_txseqnum(newpacket->conn); + if (fr->hdrtype == AIM_FRAMETYPE_FLAP) + fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn); - newpacket->lock = 1; /* lock */ - newpacket->sent = 0; /* not sent yet */ + fr->handled = 0; /* not sent yet */ - aim_tx_sendframe(sess, newpacket); + aim_tx_sendframe(sess, fr); - if (newpacket->data) - free(newpacket->data); - free(newpacket); + aim_frame_destroy(fr); - return 0; + return 0; } -faim_export int aim_tx_setenqueue(struct aim_session_t *sess, - int what, - int (*func)(struct aim_session_t *, struct command_tx_struct *)) +faim_export int aim_tx_setenqueue(aim_session_t *sess, int what, int (*func)(aim_session_t *, aim_frame_t *)) { - if (!sess) - return -1; + + if (what == AIM_TX_QUEUED) + sess->tx_enqueue = &aim_tx_enqueue__queuebased; + else if (what == AIM_TX_IMMEDIATE) + sess->tx_enqueue = &aim_tx_enqueue__immediate; + else if (what == AIM_TX_USER) { + if (!func) + return -EINVAL; + sess->tx_enqueue = func; + } else + return -EINVAL; /* unknown action */ - if (what == AIM_TX_QUEUED) - sess->tx_enqueue = &aim_tx_enqueue__queuebased; - else if (what == AIM_TX_IMMEDIATE) - sess->tx_enqueue = &aim_tx_enqueue__immediate; - else if (what == AIM_TX_USER) { - if (!func) - return -1; - sess->tx_enqueue = func; - } else - return -1; /* unknown action */ - - return 0; + return 0; } -faim_internal int aim_tx_enqueue(struct aim_session_t *sess, struct command_tx_struct *command) +faim_internal int aim_tx_enqueue(aim_session_t *sess, aim_frame_t *fr) { - /* - * If we want to send a connection thats inprogress, we have to force - * them to use the queue based version. Otherwise, use whatever they - * want. - */ - if (command && command->conn && (command->conn->status & AIM_CONN_STATUS_INPROGRESS)) { - return aim_tx_enqueue__queuebased(sess, command); - } - return (*sess->tx_enqueue)(sess, command); + + /* + * If we want to send a connection thats inprogress, we have to force + * them to use the queue based version. Otherwise, use whatever they + * want. + */ + if (fr && fr->conn && + (fr->conn->status & AIM_CONN_STATUS_INPROGRESS)) { + return aim_tx_enqueue__queuebased(sess, fr); + } + + return (*sess->tx_enqueue)(sess, fr); } /* @@ -205,184 +198,194 @@ * before enqueuement (in aim_tx_enqueue()). * */ -faim_internal unsigned int aim_get_next_txseqnum(struct aim_conn_t *conn) +faim_internal flap_seqnum_t aim_get_next_txseqnum(aim_conn_t *conn) +{ + flap_seqnum_t ret; + + faim_mutex_lock(&conn->seqnum_lock); + ret = ++conn->seqnum; + faim_mutex_unlock(&conn->seqnum_lock); + + return ret; +} + +static int aim_send(int fd, const void *buf, size_t count) { - u_int ret; - - faim_mutex_lock(&conn->seqnum_lock); - ret = ++conn->seqnum; - faim_mutex_unlock(&conn->seqnum_lock); - return ret; + int left, cur; + + for (cur = 0, left = count; left; ) { + int ret; + + ret = send(fd, ((unsigned char *)buf)+cur, left, 0); + if (ret == -1) + return -1; + else if (ret == 0) + return cur; + + cur += ret; + left -= ret; + } + + return cur; +} + +static int aim_bstream_send(aim_bstream_t *bs, aim_conn_t *conn, size_t count) +{ + int wrote = 0; + + if (!bs || !conn || (count < 0)) + return -EINVAL; + + if (count > aim_bstream_empty(bs)) + count = aim_bstream_empty(bs); /* truncate to remaining space */ + + if (count) + wrote = aim_send(conn->fd, bs->data + bs->offset, count); + + if (((aim_session_t *)conn->sessv)->debug >= 2) { + int i; + aim_session_t *sess = (aim_session_t *)conn->sessv; + + faimdprintf(sess, 2, "\nOutgoing data: (%d bytes)", wrote); + for (i = 0; i < wrote; i++) { + if (!(i % 8)) + faimdprintf(sess, 2, "\n\t"); + faimdprintf(sess, 2, "0x%02x ", *(bs->data + bs->offset + i)); + } + faimdprintf(sess, 2, "\n"); + } + + + bs->offset += wrote; + + return wrote; } -/* - * aim_tx_flushqueue() - * - * This the function is responsable for putting the queued commands - * onto the wire. This function is critical to the operation of - * the queue and therefore is the most prone to brokenness. It - * seems to be working quite well at this point. - * - * Procedure: - * 1) Traverse the list, only operate on commands that are unlocked - * and haven't been sent yet. - * 2) Lock the struct - * 3) Allocate a temporary buffer to store the finished, fully - * processed packet in. - * 4) Build the packet from the command_tx_struct data. - * 5) Write the packet to the socket. - * 6) If success, mark the packet sent, if fail report failure, do NOT - * mark the packet sent (so it will not get purged and therefore - * be attempted again on next call). - * 7) Unlock the struct. - * 8) Free the temp buffer - * 9) Step to next struct in list and go back to 1. - * - */ -faim_internal int aim_tx_sendframe(struct aim_session_t *sess, struct command_tx_struct *cur) +static int sendframe_flap(aim_session_t *sess, aim_frame_t *fr) { - int buflen = 0; - unsigned char *curPacket; - - if (!cur) - return -1; /* fatal */ + aim_bstream_t obs; + fu8_t *obs_raw; + int payloadlen, err = 0, obslen; - cur->lock = 1; /* lock the struct */ - - if (cur->hdrtype == AIM_FRAMETYPE_OSCAR) - buflen = cur->commandlen + 6; - else if (cur->hdrtype == AIM_FRAMETYPE_OFT) - buflen = cur->hdr.oft.hdr2len + 8; - else { - cur->lock = 0; - return -1; - } + payloadlen = aim_bstream_curpos(&fr->data); - /* allocate full-packet buffer */ - if (!(curPacket = (unsigned char *) malloc(buflen))) { - cur->lock = 0; - return -1; - } - - if (cur->hdrtype == AIM_FRAMETYPE_OSCAR) { - /* command byte */ - curPacket[0] = 0x2a; - - /* type/family byte */ - curPacket[1] = cur->hdr.oscar.type; - - /* bytes 3+4: word: FLAP sequence number */ - aimutil_put16(curPacket+2, cur->hdr.oscar.seqnum); + if (!(obs_raw = malloc(6 + payloadlen))) + return -ENOMEM; - /* bytes 5+6: word: SNAC len */ - aimutil_put16(curPacket+4, cur->commandlen); - - /* bytes 7 and on: raw: SNAC data */ /* XXX: ye gods! get rid of this! */ - memcpy(&(curPacket[6]), cur->data, cur->commandlen); + aim_bstream_init(&obs, obs_raw, 6 + payloadlen); - } else if (cur->hdrtype == AIM_FRAMETYPE_OFT) { - int z = 0; - - z += aimutil_put8(curPacket+z, cur->hdr.oft.magic[0]); - z += aimutil_put8(curPacket+z, cur->hdr.oft.magic[1]); - z += aimutil_put8(curPacket+z, cur->hdr.oft.magic[2]); - z += aimutil_put8(curPacket+z, cur->hdr.oft.magic[3]); - - z += aimutil_put16(curPacket+z, cur->hdr.oft.hdr2len + 8); - z += aimutil_put16(curPacket+z, cur->hdr.oft.type); - - memcpy(curPacket+z, cur->hdr.oft.hdr2, cur->hdr.oft.hdr2len); - } + /* FLAP header */ + aimbs_put8(&obs, 0x2a); + aimbs_put8(&obs, fr->hdr.flap.type); + aimbs_put16(&obs, fr->hdr.flap.seqnum); + aimbs_put16(&obs, payloadlen); - /* - * For OSCAR, a full image of the raw packet data now in curPacket. - * For OFT, an image of just the bloated header is in curPacket, - * since OFT allows us to do the data in a different write (yay!). - */ - faim_mutex_lock(&cur->conn->active); - if (send(cur->conn->fd, curPacket, buflen, 0) != buflen) { - faim_mutex_unlock(&cur->conn->active); - cur->sent = 1; - aim_conn_close(cur->conn); - return 0; /* bail out */ - } - - if ((cur->hdrtype == AIM_FRAMETYPE_OFT) && cur->commandlen) { - int curposi; - for(curposi = 0; curposi < cur->commandlen; curposi++) - faimdprintf(sess, 0, "%02x ", cur->data[curposi]); + /* payload */ + aim_bstream_rewind(&fr->data); + aimbs_putbs(&obs, &fr->data, payloadlen); - if (send(cur->conn->fd, cur->data, cur->commandlen, 0) != (int)cur->commandlen) { - /* - * Theres nothing we can do about this since we've already sent the - * header! The connection is unstable. - */ - faim_mutex_unlock(&cur->conn->active); - cur->sent = 1; - aim_conn_close(cur->conn); - return 0; /* bail out */ - } - - } - - cur->sent = 1; /* mark the struct as sent */ - cur->conn->lastactivity = time(NULL); - - faim_mutex_unlock(&cur->conn->active); + obslen = aim_bstream_curpos(&obs); + aim_bstream_rewind(&obs); - if (sess->debug >= 2) { - int i; + if (aim_bstream_send(&obs, fr->conn, obslen) != obslen) + err = -errno; + + free(obs_raw); /* XXX aim_bstream_free */ - faimdprintf(sess, 2, "\nOutgoing packet: (only valid for OSCAR)"); - for (i = 0; i < buflen; i++) { - if (!(i % 8)) - faimdprintf(sess, 2, "\n\t"); - faimdprintf(sess, 2, "0x%02x ", curPacket[i]); - } - faimdprintf(sess, 2, "\n"); - } + fr->handled = 1; + fr->conn->lastactivity = time(NULL); - cur->lock = 0; /* unlock the struct */ - - free(curPacket); /* free up full-packet buffer */ - - return 1; /* success */ + return err; } -faim_export int aim_tx_flushqueue(struct aim_session_t *sess) +static int sendframe_oft(aim_session_t *sess, aim_frame_t *fr) { - struct command_tx_struct *cur; - - if (sess->queue_outgoing == NULL) - return 0; + aim_bstream_t hbs; + fu8_t *hbs_raw; + int hbslen; + int err = 0; + + hbslen = 8 + fr->hdr.oft.hdr2len; + + if (!(hbs_raw = malloc(hbslen))) + return -1; + + aim_bstream_init(&hbs, hbs_raw, hbslen); + + aimbs_putraw(&hbs, fr->hdr.oft.magic, 4); + aimbs_put16(&hbs, fr->hdr.oft.hdr2len + 8); + aimbs_put16(&hbs, fr->hdr.oft.type); + aimbs_putraw(&hbs, fr->hdr.oft.hdr2, fr->hdr.oft.hdr2len); - faimdprintf(sess, 2, "beginning txflush...\n"); - for (cur = sess->queue_outgoing; cur; cur = cur->next) { - /* only process if its unlocked and unsent */ - if (!cur->lock && !cur->sent) { + aim_bstream_rewind(&hbs); + + if (aim_bstream_send(&hbs, fr->conn, hbslen) != hbslen) { + + err = -errno; + + } else if (aim_bstream_curpos(&fr->data)) { + int len; - if (cur->conn && (cur->conn->status & AIM_CONN_STATUS_INPROGRESS)) - continue; + len = aim_bstream_curpos(&fr->data); + aim_bstream_rewind(&fr->data); + + if (aim_bstream_send(&fr->data, fr->conn, len) != len) + err = -errno; + } + + free(hbs_raw); /* XXX aim_bstream_free */ + + fr->handled = 1; + fr->conn->lastactivity = time(NULL); + + + return err; +} - /* - * And now for the meager attempt to force transmit - * latency and avoid missed messages. - */ - if ((cur->conn->lastactivity + cur->conn->forcedlatency) >= time(NULL)) { - /* FIXME FIXME -- should be a break! we dont want to block the upper layers */ - sleep((cur->conn->lastactivity + cur->conn->forcedlatency) - time(NULL)); - } +faim_internal int aim_tx_sendframe(aim_session_t *sess, aim_frame_t *fr) +{ + if (fr->hdrtype == AIM_FRAMETYPE_FLAP) + return sendframe_flap(sess, fr); + else if (fr->hdrtype == AIM_FRAMETYPE_OFT) + return sendframe_oft(sess, fr); + return -1; +} + +faim_export int aim_tx_flushqueue(aim_session_t *sess) +{ + aim_frame_t *cur; + + for (cur = sess->queue_outgoing; cur; cur = cur->next) { + + if (cur->handled) + continue; /* already been sent */ + + if (cur->conn && (cur->conn->status & AIM_CONN_STATUS_INPROGRESS)) + continue; - /* XXX XXX XXX this should call the custom "queuing" function!! */ - if (aim_tx_sendframe(sess, cur) == -1) - break; - } - } + /* + * And now for the meager attempt to force transmit + * latency and avoid missed messages. + */ + if ((cur->conn->lastactivity + cur->conn->forcedlatency) >= time(NULL)) { + /* + * XXX should be a break! we dont want to block the + * upper layers + * + * XXX or better, just do this right. + * + */ + sleep((cur->conn->lastactivity + cur->conn->forcedlatency) - time(NULL)); + } - /* purge sent commands from queue */ - aim_tx_purgequeue(sess); + /* XXX this should call the custom "queuing" function!! */ + aim_tx_sendframe(sess, cur); + } - return 0; + /* purge sent commands from queue */ + aim_tx_purgequeue(sess); + + return 0; } /* @@ -393,47 +396,22 @@ * reduce memory footprint at run time! * */ -faim_export void aim_tx_purgequeue(struct aim_session_t *sess) +faim_export void aim_tx_purgequeue(aim_session_t *sess) { - struct command_tx_struct *cur = NULL; - struct command_tx_struct *tmp; + aim_frame_t *cur, **prev; - if (sess->queue_outgoing == NULL) - return; - - if (sess->queue_outgoing->next == NULL) { - if (!sess->queue_outgoing->lock && sess->queue_outgoing->sent) { - tmp = sess->queue_outgoing; - sess->queue_outgoing = NULL; - if (tmp->hdrtype == AIM_FRAMETYPE_OFT) - free(tmp->hdr.oft.hdr2); - free(tmp->data); - free(tmp); - } - return; - } + for (prev = &sess->queue_outgoing; (cur = *prev); ) { - for(cur = sess->queue_outgoing; cur->next != NULL; ) { - if (!cur->next->lock && cur->next->sent) { - tmp = cur->next; - cur->next = tmp->next; - if (tmp->hdrtype == AIM_FRAMETYPE_OFT) - free(tmp->hdr.oft.hdr2); - free(tmp->data); - free(tmp); - } - cur = cur->next; + if (cur->handled) { + *prev = cur->next; + + aim_frame_destroy(cur); - /* - * Be careful here. Because of the way we just - * manipulated the pointer, cur may be NULL and - * the for() will segfault doing the check unless - * we find this case first. - */ - if (cur == NULL) - break; - } - return; + } else + prev = &cur->next; + } + + return; } /** @@ -444,22 +422,17 @@ * for now this simply marks all packets as sent and lets them * disappear without warning. * - * doesn't respect command_tx_struct locks. */ - -faim_export int aim_tx_cleanqueue(struct aim_session_t *sess, struct aim_conn_t *conn) +faim_export void aim_tx_cleanqueue(aim_session_t *sess, aim_conn_t *conn) { - struct command_tx_struct *cur = NULL; - - if(!sess || !conn) - return -1; + aim_frame_t *cur; - /* we don't respect locks here */ - for(cur = sess->queue_outgoing; cur; cur = cur->next) - if(cur->conn == conn) - cur->sent = 1; - - return 0; + for (cur = sess->queue_outgoing; cur; cur = cur->next) { + if (cur->conn == conn) + cur->handled = 1; + } + + return; } - - + +