# HG changeset patch # User Daniel Atallah # Date 1181327063 0 # Node ID 6e4e2d234c3a88431397320af97c841b9c0ebe05 # Parent 2af1f8ccd396aa364c01226540783f2673e9f38b Update Bonjour to do nonblocking I/O correctly. This also includes a number of error handling bugfixes (and various other improvements). This doesn't handle the scenario where a partial message is read - I need to figure out how libxml2 handles such a scenario to fix it correctly. There are also also a few quirks that I noticed and didn't get around to fixing: -We don't wait for a "" from the peer before closing the socket. -We don't make sure that the peer has sent us the stream start message before starting. diff -r 2af1f8ccd396 -r 6e4e2d234c3a libpurple/protocols/bonjour/jabber.c --- a/libpurple/protocols/bonjour/jabber.c Fri Jun 08 15:37:48 2007 +0000 +++ b/libpurple/protocols/bonjour/jabber.c Fri Jun 08 18:24:23 2007 +0000 @@ -49,40 +49,6 @@ #define DOCTYPE "\n" \ "" -static gint -_connect_to_buddy(PurpleBuddy *gb) -{ - gint socket_fd; - struct sockaddr_in buddy_address; - BonjourBuddy *bb = gb->proto_data; - - purple_debug_info("bonjour", "Connecting to buddy %s at %s:%d.\n", - purple_buddy_get_name(gb), bb->ip ? bb->ip : "(null)", bb->port_p2pj); - - /* Create a socket and make it non-blocking */ - socket_fd = socket(PF_INET, SOCK_STREAM, 0); - if (socket_fd < 0) { - purple_debug_warning("bonjour", "Error opening socket: %s\n", strerror(errno)); - return -1; - } - - buddy_address.sin_family = PF_INET; - buddy_address.sin_port = htons(bb->port_p2pj); - inet_aton(bb->ip, &(buddy_address.sin_addr)); - memset(&(buddy_address.sin_zero), '\0', 8); - - /* TODO: make this nonblocking before connecting */ - if (connect(socket_fd, (struct sockaddr*)&buddy_address, sizeof(struct sockaddr)) == 0) - fcntl(socket_fd, F_SETFL, O_NONBLOCK); - else { - purple_debug_warning("bonjour", "Error connecting to buddy %s at %s:%d error: %s\n", purple_buddy_get_name(gb), bb->ip ? bb->ip : "(null)", bb->port_p2pj, strerror(errno)); - close(socket_fd); - socket_fd = -1; - } - - return socket_fd; -} - #if 0 /* this isn't used anywhere... */ static const char * _font_size_purple_to_ichat(int size) @@ -113,11 +79,14 @@ BonjourJabberConversation *bconv = g_new0(BonjourJabberConversation, 1); bconv->socket = -1; - bconv->watcher_id = -1; + bconv->tx_buf = purple_circ_buffer_new(512); + bconv->tx_handler = -1; + bconv->rx_handler = -1; return bconv; } + static const char * _font_size_ichat_to_purple(int size) { @@ -139,7 +108,7 @@ } static void -_jabber_parse_and_write_message_to_ui(xmlnode *message_node, PurpleConnection *connection, PurpleBuddy *gb) +_jabber_parse_and_write_message_to_ui(xmlnode *message_node, PurpleConnection *connection, PurpleBuddy *pb) { xmlnode *body_node, *html_node, *events_node; char *body, *html_body = NULL; @@ -188,9 +157,7 @@ if (events_node != NULL) { if (xmlnode_get_child(events_node, "composing") != NULL) - { composing_event = TRUE; - } if (xmlnode_get_child(events_node, "id") != NULL) { /* The user is just typing */ @@ -218,7 +185,7 @@ /* TODO: Should we do something with "composing_event" here? */ /* Send the message to the UI */ - serv_got_im(connection, gb->name, body, 0, time(NULL)); + serv_got_im(connection, pb->name, body, 0, time(NULL)); g_free(body); g_free(html_body); @@ -281,24 +248,96 @@ return total_message_length; } -static gint -_send_data(gint socket, char *message) +static void +_send_data_write_cb(gpointer data, gint source, PurpleInputCondition cond) { - gint message_len = strlen(message); - gint partial_sent = 0; - gchar *partial_message = message; + PurpleBuddy *pb = data; + BonjourBuddy *bb = pb->proto_data; + BonjourJabberConversation *bconv = bb->conversation; + int ret, writelen; + + /* TODO: Make sure that the stream has been established before sending */ + + writelen = purple_circ_buffer_get_max_read(bconv->tx_buf); + + if (writelen == 0) { + purple_input_remove(bconv->tx_handler); + bconv->tx_handler = -1; + return; + } - while ((partial_sent = send(socket, partial_message, message_len, 0)) < message_len) - { - if (partial_sent != -1) { - partial_message += partial_sent; - message_len -= partial_sent; - } else { - return -1; - } + ret = send(bconv->socket, bconv->tx_buf->outptr, writelen, 0); + + if (ret < 0 && errno == EAGAIN) + return; + else if (ret <= 0) { + PurpleConversation *conv; + const char *error = strerror(errno); + + purple_debug_error("bonjour", "Error sending message to buddy %s error: %s\n", + purple_buddy_get_name(pb), error ? error : "(null)"); + + conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, bb->name, pb->account); + if (conv != NULL) + purple_conversation_write(conv, NULL, + _("Unable to send message."), + PURPLE_MESSAGE_SYSTEM, time(NULL)); + + bonjour_jabber_close_conversation(bb->conversation); + bb->conversation = NULL; + return; } - return strlen(message); + purple_circ_buffer_mark_read(bconv->tx_buf, ret); +} + +static gint +_send_data(PurpleBuddy *pb, char *message) +{ + gint ret; + int len = strlen(message); + BonjourBuddy *bb = pb->proto_data; + BonjourJabberConversation *bconv = bb->conversation; + + /* If we're not ready to actually send, append it to the buffer */ + if (bconv->tx_handler != -1 + || bconv->connect_data != NULL + || !bconv->stream_started + || purple_circ_buffer_get_max_read(bconv->tx_buf) > 0) { + ret = -1; + errno = EAGAIN; + } else { + ret = send(bconv->socket, message, len, 0); + } + + if (ret == -1 && errno == EAGAIN) + ret = 0; + else if (ret <= 0) { + PurpleConversation *conv; + const char *error = strerror(errno); + + purple_debug_error("bonjour", "Error sending message to buddy %s error: %s\n", + purple_buddy_get_name(pb), error ? error : "(null)"); + + conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, bb->name, pb->account); + if (conv != NULL) + purple_conversation_write(conv, NULL, + _("Unable to send message."), + PURPLE_MESSAGE_SYSTEM, time(NULL)); + + bonjour_jabber_close_conversation(bb->conversation); + bb->conversation = NULL; + return -1; + } + + if (ret < len) { + if (bconv->tx_handler == -1) + bconv->tx_handler = purple_input_add(bconv->socket, PURPLE_INPUT_WRITE, + _send_data_write_cb, pb); + purple_circ_buffer_append(bconv->tx_buf, message + ret, len - ret); + } + + return ret; } static void @@ -308,7 +347,6 @@ gint message_length; PurpleBuddy *pb = data; PurpleAccount *account = pb->account; - PurpleConversation *conversation; BonjourBuddy *bb = pb->proto_data; gboolean closed_conversation = FALSE; xmlnode *message_node; @@ -316,7 +354,13 @@ /* Read the data from the socket */ if ((message_length = _read_data(socket, &message)) == -1) { /* There have been an error reading from the socket */ - /* TODO: Shouldn't we handle the error if it isn't EAGAIN? */ + if (errno != EAGAIN) { + bonjour_jabber_close_conversation(bb->conversation); + bb->conversation = NULL; + + /* I guess we really don't need to notify the user. + * If they try to send another message it'll reconnect */ + } return; } else if (message_length == 0) { /* The other end has closed the socket */ closed_conversation = TRUE; @@ -332,49 +376,25 @@ /* Parse the message into an XMLnode for analysis */ message_node = xmlnode_from_str(message, strlen(message)); - /* Check if the start of the stream has been received, if not check that the current */ - /* data is the start of the stream */ - if (!(bb->conversation->stream_started)) - { - /* Check if this is the start of the stream */ - if ((message_node != NULL) && - g_ascii_strcasecmp(xmlnode_get_attrib(message_node, "xmlns"), "jabber:client") && - (xmlnode_get_attrib(message_node,"xmlns:stream") != NULL)) - { - bb->conversation->stream_started = TRUE; - } - else - { - char *stream_start = g_strdup_printf(DOCTYPE, purple_account_get_username(pb->account), - purple_buddy_get_name(pb)); - - /* TODO: This needs to be nonblocking! */ - if (send(bb->conversation->socket, stream_start, strlen(stream_start), 0) == -1) - purple_debug_error("bonjour", "Unable to start a conversation with %s\n", bb->name); - else - bb->conversation->stream_started = TRUE; - - g_free(stream_start); - } - } - /* * Check that this is not the end of the conversation. This is * using a magic string, but xmlnode won't play nice when just * parsing an end tag */ if (closed_conversation || purple_str_has_prefix(message, STREAM_END)) { - char *closed_conv_message; + PurpleConversation *conv; /* Close the socket, clear the watcher and free memory */ bonjour_jabber_close_conversation(bb->conversation); bb->conversation = NULL; /* Inform the user that the conversation has been closed */ - conversation = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, pb->name, account); - closed_conv_message = g_strdup_printf(_("%s has closed the conversation."), pb->name); - purple_conversation_write(conversation, NULL, closed_conv_message, PURPLE_MESSAGE_SYSTEM, time(NULL)); - g_free(closed_conv_message); + conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, pb->name, account); + if (conv != NULL) { + char *tmp = g_strdup_printf(_("%s has closed the conversation."), pb->name); + purple_conversation_write(conv, NULL, tmp, PURPLE_MESSAGE_SYSTEM, time(NULL)); + g_free(tmp); + } } else if (message_node != NULL) { /* Parse the message to get the data and send to the ui */ _jabber_parse_and_write_message_to_ui(message_node, account->gc, pb); @@ -387,6 +407,71 @@ xmlnode_free(message_node); } +struct _stream_start_data { + char *msg; + PurpleInputFunction tx_handler_cb; +}; + +static void +_start_stream(gpointer data, gint source, PurpleInputCondition condition) +{ + PurpleBuddy *pb = data; + BonjourBuddy *bb = pb->proto_data; + struct _stream_start_data *ss = bb->conversation->stream_data; + int len, ret; + + len = strlen(ss->msg); + + /* Start Stream */ + ret = send(source, ss->msg, len, 0); + + if (ret == -1 && errno == EAGAIN) + return; + else if (ret <= 0) { + const char *err = strerror(errno); + PurpleConversation *conv; + + purple_debug_error("bonjour", "Error starting stream with buddy %s at %s:%d error: %s\n", + purple_buddy_get_name(pb), bb->ip ? bb->ip : "(null)", bb->port_p2pj, err ? err : "(null)"); + + conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, bb->name, pb->account); + if (conv != NULL) + purple_conversation_write(conv, NULL, + _("Unable to send the message, the conversation couldn't be started."), + PURPLE_MESSAGE_SYSTEM, time(NULL)); + + bonjour_jabber_close_conversation(bb->conversation); + bb->conversation = NULL; + + return; + } + + /* This is EXTREMELY unlikely to happen */ + if (ret < len) { + char *tmp = g_strdup(ss->msg + ret); + g_free(ss->msg); + ss->msg = tmp; + return; + } + + /* Stream started; process the send buffer if there is one*/ + purple_input_remove(bb->conversation->tx_handler); + bb->conversation->tx_handler= -1; + + bb->conversation->stream_started = TRUE; + + g_free(ss->msg); + g_free(ss); + bb->conversation->stream_data = NULL; + + if (ss->tx_handler_cb) { + bb->conversation->tx_handler = purple_input_add(source, PURPLE_INPUT_WRITE, + ss->tx_handler_cb, pb); + /* We can probably write the data now. */ + (ss->tx_handler_cb)(pb, source, PURPLE_INPUT_WRITE); + } +} + static void _server_socket_handler(gpointer data, int server_socket, PurpleInputCondition condition) { @@ -427,21 +512,48 @@ /* Check if the conversation has been previously started */ if (bb->conversation == NULL) { + int ret, len; + char *stream_start = g_strdup_printf(DOCTYPE, purple_account_get_username(pb->account), + purple_buddy_get_name(pb)); + + len = strlen(stream_start); + + /* Start the stream */ + ret = send(client_socket, stream_start, len, 0); + + if (ret == -1 && errno == EAGAIN) + ret = 0; + else if (ret <= 0) { + const char *err = strerror(errno); + + purple_debug_error("bonjour", "Error starting stream with buddy %s at %s:%d error: %s\n", + purple_buddy_get_name(pb), bb->ip ? bb->ip : "(null)", bb->port_p2pj, err ? err : "(null)"); + + close(client_socket); + g_free(stream_start); + + return; + } + bb->conversation = bonjour_jabber_conv_new(); bb->conversation->socket = client_socket; + bb->conversation->rx_handler = purple_input_add(client_socket, + PURPLE_INPUT_READ, _client_socket_handler, pb); - if (bb->conversation->stream_started == FALSE) { - char *stream_start = g_strdup_printf(DOCTYPE, purple_account_get_username(pb->account), - purple_buddy_get_name(pb)); - /* Start the stream */ - send(bb->conversation->socket, stream_start, strlen(stream_start), 0); + /* This is unlikely to happen */ + if (ret < len) { + struct _stream_start_data *ss = g_new(struct _stream_start_data, 1); + ss->msg = g_strdup(stream_start + ret); + ss->tx_handler_cb = NULL; /* We have nothing to write yet */ + bb->conversation->stream_data = ss; + /* Finish sending the stream start */ + bb->conversation->tx_handler = purple_input_add(client_socket, + PURPLE_INPUT_WRITE, _start_stream, pb); + } else { bb->conversation->stream_started = TRUE; - g_free(stream_start); } - /* Open a watcher for the client socket */ - bb->conversation->watcher_id = purple_input_add(client_socket, PURPLE_INPUT_READ, - _client_socket_handler, pb); + g_free(stream_start); } else { close(client_socket); } @@ -519,6 +631,89 @@ return data->port; } +static void +_connected_to_buddy(gpointer data, gint source, const gchar *error) +{ + PurpleBuddy *pb = data; + BonjourBuddy *bb = pb->proto_data; + int len, ret; + char *stream_start = g_strdup_printf(DOCTYPE, purple_account_get_username(pb->account), purple_buddy_get_name(pb)); + + bb->conversation->connect_data = NULL; + + if (source < 0) { + PurpleConversation *conv; + + purple_debug_error("bonjour", "Error connecting to buddy %s at %s:%d error: %s\n", + purple_buddy_get_name(pb), bb->ip ? bb->ip : "(null)", bb->port_p2pj, error ? error : "(null)"); + + conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, bb->name, pb->account); + if (conv != NULL) + purple_conversation_write(conv, NULL, + _("Unable to send the message, the conversation couldn't be started."), + PURPLE_MESSAGE_SYSTEM, time(NULL)); + + bonjour_jabber_close_conversation(bb->conversation); + bb->conversation = NULL; + return; + } + + len = strlen(stream_start); + + /* Start the stream and send queued messages */ + ret = send(source, stream_start, len, 0); + + if (ret == -1 && errno == EAGAIN) + ret = 0; + else if (ret <= 0) { + const char *err = strerror(errno); + PurpleConversation *conv; + + purple_debug_error("bonjour", "Error starting stream with buddy %s at %s:%d error: %s\n", + purple_buddy_get_name(pb), bb->ip ? bb->ip : "(null)", bb->port_p2pj, err ? err : "(null)"); + + conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, bb->name, pb->account); + if (conv != NULL) + purple_conversation_write(conv, NULL, + _("Unable to send the message, the conversation couldn't be started."), + PURPLE_MESSAGE_SYSTEM, time(NULL)); + + close(source); + bonjour_jabber_close_conversation(bb->conversation); + bb->conversation = NULL; + + g_free(stream_start); + + return; + } + + bb->conversation->socket = source; + bb->conversation->rx_handler = purple_input_add(source, + PURPLE_INPUT_READ, _client_socket_handler, pb); + + /* This is unlikely to happen */ + if (ret < len) { + struct _stream_start_data *ss = g_new(struct _stream_start_data, 1); + ss->msg = g_strdup(stream_start + ret); + ss->tx_handler_cb = _send_data_write_cb; + bb->conversation->stream_data = ss; + /* Finish sending the stream start */ + bb->conversation->tx_handler = purple_input_add(source, + PURPLE_INPUT_WRITE, _start_stream, pb); + } + /* Process the send buffer */ + else { + bb->conversation->stream_started = TRUE; + /* Watch for when we can write the buffered messages */ + bb->conversation->tx_handler = purple_input_add(source, PURPLE_INPUT_WRITE, + _send_data_write_cb, pb); + /* We can probably write the data now. */ + _send_data_write_cb(pb, source, PURPLE_INPUT_WRITE); + } + + g_free(stream_start); +} + int bonjour_jabber_send_message(BonjourJabber *data, const gchar *to, const gchar *body) { @@ -540,14 +735,31 @@ /* Check if there is a previously open conversation */ if (bb->conversation == NULL) { - int socket = _connect_to_buddy(pb); - if (socket < 0) + PurpleProxyConnectData *connect_data; + + /* Make sure that the account always has a proxy of "none". + * This is kind of dirty, but proxy_connect_none() isn't exposed. */ + static PurpleProxyInfo *tmp_none_proxy_info = NULL; + if (!tmp_none_proxy_info) { + tmp_none_proxy_info = purple_proxy_info_new(); + purple_proxy_info_set_type(tmp_none_proxy_info, PURPLE_PROXY_NONE); + } + purple_account_set_proxy_info(data->account, tmp_none_proxy_info); + + connect_data = + purple_proxy_connect(data->account->gc, data->account, bb->ip, + bb->port_p2pj, _connected_to_buddy, pb); + + if (connect_data == NULL) { + purple_debug_error("bonjour", "Unable to connect to buddy (%s).\n", to); return -10001; + } bb->conversation = bonjour_jabber_conv_new(); - bb->conversation->socket = socket; - bb->conversation->watcher_id = purple_input_add(bb->conversation->socket, - PURPLE_INPUT_READ, _client_socket_handler, pb); + bb->conversation->connect_data = connect_data; + /* We don't want _send_data() to register the tx_handler; + * that neeeds to wait until we're actually connected. */ + bb->conversation->tx_handler = 0; } message_node = xmlnode_new("message"); @@ -574,47 +786,14 @@ xmlnode_set_namespace(node, "jabber:x:event"); xmlnode_insert_child(node, xmlnode_new("composing")); - message = xmlnode_to_str(message_node, NULL); xmlnode_free(message_node); - /* Check if the stream for the conversation has been started */ - if (bb->conversation->stream_started == FALSE) - { - char *stream_start = g_strdup_printf(DOCTYPE, purple_account_get_username(pb->account), - purple_buddy_get_name(pb)); - /* Start the stream */ - if (send(bb->conversation->socket, stream_start, strlen(stream_start), 0) == -1) - { - PurpleConversation *conv; + ret = _send_data(pb, message) >= 0; - purple_debug_error("bonjour", "Unable to start a conversation\n"); - purple_debug_warning("bonjour", "send error: %s\n", strerror(errno)); - conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, bb->name, data->account); - purple_conversation_write(conv, NULL, - _("Unable to send the message, the conversation couldn't be started."), - PURPLE_MESSAGE_SYSTEM, time(NULL)); - - bonjour_jabber_close_conversation(bb->conversation); - bb->conversation = NULL; - - g_free(message); - g_free(stream_start); - return 0; - } - - g_free(stream_start); - bb->conversation->stream_started = TRUE; - } - - /* Send the message */ - ret = (_send_data(bb->conversation->socket, message) == -1); g_free(message); - if (ret == -1) - return -10000; - - return 1; + return ret; } void @@ -630,9 +809,20 @@ /* TODO: We're really supposed to wait for "" before closing the socket */ close(bconv->socket); } - purple_input_remove(bconv->watcher_id); + if (bconv->rx_handler != -1) + purple_input_remove(bconv->rx_handler); + if (bconv->tx_handler > 0) + purple_input_remove(bconv->tx_handler); /* Free all the data related to the conversation */ + purple_circ_buffer_destroy(bconv->tx_buf); + if (bconv->connect_data != NULL) + purple_proxy_connect_cancel(bconv->connect_data); + if (bconv->stream_data != NULL) { + struct _stream_start_data *ss = bconv->stream_data; + g_free(ss->msg); + g_free(ss); + } g_free(bconv); } } @@ -657,6 +847,7 @@ bonjour_jabber_close_conversation(bb->conversation); bb->conversation = NULL; } + g_slist_free(buddies); } } diff -r 2af1f8ccd396 -r 6e4e2d234c3a libpurple/protocols/bonjour/jabber.h --- a/libpurple/protocols/bonjour/jabber.h Fri Jun 08 15:37:48 2007 +0000 +++ b/libpurple/protocols/bonjour/jabber.h Fri Jun 08 18:24:23 2007 +0000 @@ -27,6 +27,7 @@ #define _BONJOUR_JABBER_H_ #include "account.h" +#include "circbuffer.h" typedef struct _BonjourJabber { @@ -39,8 +40,12 @@ typedef struct _BonjourJabberConversation { gint socket; - gint watcher_id; + guint rx_handler; + guint tx_handler; + PurpleCircBuffer *tx_buf; gboolean stream_started; + PurpleProxyConnectData *connect_data; + gpointer stream_data; } BonjourJabberConversation; /**