changeset 17558:6e4e2d234c3a

Update Bonjour to do nonblocking I/O correctly. This also includes a number of error handling bugfixes (and various other improvements). This doesn't handle the scenario where a partial message is read - I need to figure out how libxml2 handles such a scenario to fix it correctly. There are also also a few quirks that I noticed and didn't get around to fixing: -We don't wait for a "</stream:stream>" from the peer before closing the socket. -We don't make sure that the peer has sent us the stream start message before starting.
author Daniel Atallah <daniel.atallah@gmail.com>
date Fri, 08 Jun 2007 18:24:23 +0000
parents 2af1f8ccd396
children e60f5e8bb77e
files libpurple/protocols/bonjour/jabber.c libpurple/protocols/bonjour/jabber.h
diffstat 2 files changed, 333 insertions(+), 137 deletions(-) [+]
line wrap: on
line diff
--- a/libpurple/protocols/bonjour/jabber.c	Fri Jun 08 15:37:48 2007 +0000
+++ b/libpurple/protocols/bonjour/jabber.c	Fri Jun 08 18:24:23 2007 +0000
@@ -49,40 +49,6 @@
 #define DOCTYPE "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n" \
 		"<stream:stream xmlns=\"jabber:client\" xmlns:stream=\"http://etherx.jabber.org/streams\" from=\"%s\" to=\"%s\">"
 
-static gint
-_connect_to_buddy(PurpleBuddy *gb)
-{
-	gint socket_fd;
-	struct sockaddr_in buddy_address;
-	BonjourBuddy *bb = gb->proto_data;
-
-	purple_debug_info("bonjour", "Connecting to buddy %s at %s:%d.\n",
-		purple_buddy_get_name(gb), bb->ip ? bb->ip : "(null)", bb->port_p2pj);
-
-	/* Create a socket and make it non-blocking */
-	socket_fd = socket(PF_INET, SOCK_STREAM, 0);
-	if (socket_fd < 0) {
-		purple_debug_warning("bonjour", "Error opening socket: %s\n", strerror(errno));
-		return -1;
-	}
-
-	buddy_address.sin_family = PF_INET;
-	buddy_address.sin_port = htons(bb->port_p2pj);
-	inet_aton(bb->ip, &(buddy_address.sin_addr));
-	memset(&(buddy_address.sin_zero), '\0', 8);
-
-	/* TODO: make this nonblocking before connecting */
-	if (connect(socket_fd, (struct sockaddr*)&buddy_address, sizeof(struct sockaddr)) == 0)
-		fcntl(socket_fd, F_SETFL, O_NONBLOCK);
-	else {
-		purple_debug_warning("bonjour", "Error connecting to buddy %s at %s:%d error: %s\n", purple_buddy_get_name(gb), bb->ip ? bb->ip : "(null)", bb->port_p2pj, strerror(errno));
-		close(socket_fd);
-		socket_fd = -1;
-	}
-
-	return socket_fd;
-}
-
 #if 0 /* this isn't used anywhere... */
 static const char *
 _font_size_purple_to_ichat(int size)
@@ -113,11 +79,14 @@
 
 	BonjourJabberConversation *bconv = g_new0(BonjourJabberConversation, 1);
 	bconv->socket = -1;
-	bconv->watcher_id = -1;
+	bconv->tx_buf = purple_circ_buffer_new(512);
+	bconv->tx_handler = -1;
+	bconv->rx_handler = -1;
 
 	return bconv;
 }
 
+
 static const char *
 _font_size_ichat_to_purple(int size)
 {
@@ -139,7 +108,7 @@
 }
 
 static void
-_jabber_parse_and_write_message_to_ui(xmlnode *message_node, PurpleConnection *connection, PurpleBuddy *gb)
+_jabber_parse_and_write_message_to_ui(xmlnode *message_node, PurpleConnection *connection, PurpleBuddy *pb)
 {
 	xmlnode *body_node, *html_node, *events_node;
 	char *body, *html_body = NULL;
@@ -188,9 +157,7 @@
 	if (events_node != NULL)
 	{
 		if (xmlnode_get_child(events_node, "composing") != NULL)
-		{
 			composing_event = TRUE;
-		}
 		if (xmlnode_get_child(events_node, "id") != NULL)
 		{
 			/* The user is just typing */
@@ -218,7 +185,7 @@
 	/* TODO: Should we do something with "composing_event" here? */
 
 	/* Send the message to the UI */
-	serv_got_im(connection, gb->name, body, 0, time(NULL));
+	serv_got_im(connection, pb->name, body, 0, time(NULL));
 
 	g_free(body);
 	g_free(html_body);
@@ -281,24 +248,96 @@
 	return total_message_length;
 }
 
-static gint
-_send_data(gint socket, char *message)
+static void
+_send_data_write_cb(gpointer data, gint source, PurpleInputCondition cond)
 {
-	gint message_len = strlen(message);
-	gint partial_sent = 0;
-	gchar *partial_message = message;
+	PurpleBuddy *pb = data;
+	BonjourBuddy *bb = pb->proto_data;
+	BonjourJabberConversation *bconv = bb->conversation;
+	int ret, writelen;
+
+	/* TODO: Make sure that the stream has been established before sending */
+
+	writelen = purple_circ_buffer_get_max_read(bconv->tx_buf);
+
+	if (writelen == 0) {
+		purple_input_remove(bconv->tx_handler);
+		bconv->tx_handler = -1;
+		return;
+	}
 
-	while ((partial_sent = send(socket, partial_message, message_len, 0)) < message_len)
-	{
-		if (partial_sent != -1) {
-			partial_message += partial_sent;
-			message_len -= partial_sent;
-		} else {
-			return -1;
-		}
+	ret = send(bconv->socket, bconv->tx_buf->outptr, writelen, 0);
+
+	if (ret < 0 && errno == EAGAIN)
+		return;
+	else if (ret <= 0) {
+		PurpleConversation *conv;
+		const char *error = strerror(errno);
+
+		purple_debug_error("bonjour", "Error sending message to buddy %s error: %s\n",
+				   purple_buddy_get_name(pb), error ? error : "(null)");
+
+		conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, bb->name, pb->account);
+		if (conv != NULL)
+			purple_conversation_write(conv, NULL,
+				  _("Unable to send message."),
+				  PURPLE_MESSAGE_SYSTEM, time(NULL));
+
+		bonjour_jabber_close_conversation(bb->conversation);
+		bb->conversation = NULL;
+		return;
 	}
 
-	return strlen(message);
+	purple_circ_buffer_mark_read(bconv->tx_buf, ret);
+}
+
+static gint
+_send_data(PurpleBuddy *pb, char *message)
+{
+	gint ret;
+	int len = strlen(message);
+	BonjourBuddy *bb = pb->proto_data;
+	BonjourJabberConversation *bconv = bb->conversation;
+
+	/* If we're not ready to actually send, append it to the buffer */
+	if (bconv->tx_handler != -1
+			|| bconv->connect_data != NULL
+			|| !bconv->stream_started
+			|| purple_circ_buffer_get_max_read(bconv->tx_buf) > 0) {
+		ret = -1;
+		errno = EAGAIN;
+	} else {
+		ret = send(bconv->socket, message, len, 0);
+	}
+
+	if (ret == -1 && errno == EAGAIN)
+		ret = 0;
+	else if (ret <= 0) {
+		PurpleConversation *conv;
+		const char *error = strerror(errno);
+
+		purple_debug_error("bonjour", "Error sending message to buddy %s error: %s\n",
+				   purple_buddy_get_name(pb), error ? error : "(null)");
+
+		conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, bb->name, pb->account);
+		if (conv != NULL)
+			purple_conversation_write(conv, NULL,
+				  _("Unable to send message."),
+				  PURPLE_MESSAGE_SYSTEM, time(NULL));
+
+		bonjour_jabber_close_conversation(bb->conversation);
+		bb->conversation = NULL;
+		return -1;
+	}
+
+	if (ret < len) {
+		if (bconv->tx_handler == -1)
+			bconv->tx_handler = purple_input_add(bconv->socket, PURPLE_INPUT_WRITE,
+				_send_data_write_cb, pb);
+		purple_circ_buffer_append(bconv->tx_buf, message + ret, len - ret);
+	}
+
+	return ret;
 }
 
 static void
@@ -308,7 +347,6 @@
 	gint message_length;
 	PurpleBuddy *pb = data;
 	PurpleAccount *account = pb->account;
-	PurpleConversation *conversation;
 	BonjourBuddy *bb = pb->proto_data;
 	gboolean closed_conversation = FALSE;
 	xmlnode *message_node;
@@ -316,7 +354,13 @@
 	/* Read the data from the socket */
 	if ((message_length = _read_data(socket, &message)) == -1) {
 		/* There have been an error reading from the socket */
-		/* TODO: Shouldn't we handle the error if it isn't EAGAIN? */
+		if (errno != EAGAIN) {
+			bonjour_jabber_close_conversation(bb->conversation);
+			bb->conversation = NULL;
+
+			/* I guess we really don't need to notify the user.
+			 * If they try to send another message it'll reconnect */
+		}
 		return;
 	} else if (message_length == 0) { /* The other end has closed the socket */
 		closed_conversation = TRUE;
@@ -332,49 +376,25 @@
 	/* Parse the message into an XMLnode for analysis */
 	message_node = xmlnode_from_str(message, strlen(message));
 
-	/* Check if the start of the stream has been received, if not check that the current */
-	/* data is the start of the stream */
-	if (!(bb->conversation->stream_started))
-	{
-		/* Check if this is the start of the stream */
-		if ((message_node != NULL) &&
-			g_ascii_strcasecmp(xmlnode_get_attrib(message_node, "xmlns"), "jabber:client") &&
-			(xmlnode_get_attrib(message_node,"xmlns:stream") != NULL))
-		{
-			bb->conversation->stream_started = TRUE;
-		}
-		else
-		{
-			char *stream_start = g_strdup_printf(DOCTYPE, purple_account_get_username(pb->account),
-							     purple_buddy_get_name(pb));
-
-			/* TODO: This needs to be nonblocking! */
-			if (send(bb->conversation->socket, stream_start, strlen(stream_start), 0) == -1)
-				purple_debug_error("bonjour", "Unable to start a conversation with %s\n", bb->name);
-			else
-				bb->conversation->stream_started = TRUE;
-
-			g_free(stream_start);
-		}
-	}
-
 	/*
 	 * Check that this is not the end of the conversation.  This is
 	 * using a magic string, but xmlnode won't play nice when just
 	 * parsing an end tag
 	 */
 	if (closed_conversation || purple_str_has_prefix(message, STREAM_END)) {
-		char *closed_conv_message;
+		PurpleConversation *conv;
 
 		/* Close the socket, clear the watcher and free memory */
 		bonjour_jabber_close_conversation(bb->conversation);
 		bb->conversation = NULL;
 
 		/* Inform the user that the conversation has been closed */
-		conversation = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, pb->name, account);
-		closed_conv_message = g_strdup_printf(_("%s has closed the conversation."), pb->name);
-		purple_conversation_write(conversation, NULL, closed_conv_message, PURPLE_MESSAGE_SYSTEM, time(NULL));
-		g_free(closed_conv_message);
+		conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, pb->name, account);
+		if (conv != NULL) {
+			char *tmp = g_strdup_printf(_("%s has closed the conversation."), pb->name);
+			purple_conversation_write(conv, NULL, tmp, PURPLE_MESSAGE_SYSTEM, time(NULL));
+			g_free(tmp);
+		}
 	} else if (message_node != NULL) {
 		/* Parse the message to get the data and send to the ui */
 		_jabber_parse_and_write_message_to_ui(message_node, account->gc, pb);
@@ -387,6 +407,71 @@
 		xmlnode_free(message_node);
 }
 
+struct _stream_start_data {
+	char *msg;
+	PurpleInputFunction tx_handler_cb;
+};
+
+static void
+_start_stream(gpointer data, gint source, PurpleInputCondition condition)
+{
+	PurpleBuddy *pb = data;
+	BonjourBuddy *bb = pb->proto_data;
+	struct _stream_start_data *ss = bb->conversation->stream_data;
+	int len, ret;
+
+	len = strlen(ss->msg);
+
+	/* Start Stream */
+	ret = send(source, ss->msg, len, 0);
+
+	if (ret == -1 && errno == EAGAIN)
+		return;
+	else if (ret <= 0) {
+		const char *err = strerror(errno);
+		PurpleConversation *conv;
+
+		purple_debug_error("bonjour", "Error starting stream with buddy %s at %s:%d error: %s\n",
+				   purple_buddy_get_name(pb), bb->ip ? bb->ip : "(null)", bb->port_p2pj, err ? err : "(null)");
+
+		conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, bb->name, pb->account);
+		if (conv != NULL)
+			purple_conversation_write(conv, NULL,
+				  _("Unable to send the message, the conversation couldn't be started."),
+				  PURPLE_MESSAGE_SYSTEM, time(NULL));
+
+		bonjour_jabber_close_conversation(bb->conversation);
+		bb->conversation = NULL;
+
+		return;
+	}
+
+	/* This is EXTREMELY unlikely to happen */
+	if (ret < len) {
+		char *tmp = g_strdup(ss->msg + ret);
+		g_free(ss->msg);
+		ss->msg = tmp;
+		return;
+	}
+
+	/* Stream started; process the send buffer if there is one*/
+	purple_input_remove(bb->conversation->tx_handler);
+	bb->conversation->tx_handler= -1;
+
+	bb->conversation->stream_started = TRUE;
+
+	g_free(ss->msg);
+	g_free(ss);
+	bb->conversation->stream_data = NULL;
+
+	if (ss->tx_handler_cb) {
+		bb->conversation->tx_handler = purple_input_add(source, PURPLE_INPUT_WRITE,
+			ss->tx_handler_cb, pb);
+		/* We can probably write the data now. */
+		(ss->tx_handler_cb)(pb, source, PURPLE_INPUT_WRITE);
+	}
+}
+
 static void
 _server_socket_handler(gpointer data, int server_socket, PurpleInputCondition condition)
 {
@@ -427,21 +512,48 @@
 	/* Check if the conversation has been previously started */
 	if (bb->conversation == NULL)
 	{
+		int ret, len;
+		char *stream_start = g_strdup_printf(DOCTYPE, purple_account_get_username(pb->account),
+			purple_buddy_get_name(pb));
+
+		len = strlen(stream_start);
+
+		/* Start the stream */
+		ret = send(client_socket, stream_start, len, 0);
+
+		if (ret == -1 && errno == EAGAIN)
+			ret = 0;
+		else if (ret <= 0) {
+			const char *err = strerror(errno);
+
+			purple_debug_error("bonjour", "Error starting stream with buddy %s at %s:%d error: %s\n",
+					   purple_buddy_get_name(pb), bb->ip ? bb->ip : "(null)", bb->port_p2pj, err ? err : "(null)");
+
+			close(client_socket);
+			g_free(stream_start);
+
+			return;
+		}
+
 		bb->conversation = bonjour_jabber_conv_new();
 		bb->conversation->socket = client_socket;
+		bb->conversation->rx_handler = purple_input_add(client_socket,
+			PURPLE_INPUT_READ, _client_socket_handler, pb);
 
-		if (bb->conversation->stream_started == FALSE) {
-			char *stream_start = g_strdup_printf(DOCTYPE, purple_account_get_username(pb->account),
-							     purple_buddy_get_name(pb));
-			/* Start the stream */
-			send(bb->conversation->socket, stream_start, strlen(stream_start), 0);
+		/* This is unlikely to happen */
+		if (ret < len) {
+			struct _stream_start_data *ss = g_new(struct _stream_start_data, 1);
+			ss->msg = g_strdup(stream_start + ret);
+			ss->tx_handler_cb = NULL; /* We have nothing to write yet */
+			bb->conversation->stream_data = ss;
+			/* Finish sending the stream start */
+			bb->conversation->tx_handler = purple_input_add(client_socket,
+				PURPLE_INPUT_WRITE, _start_stream, pb);
+		} else {
 			bb->conversation->stream_started = TRUE;
-			g_free(stream_start);
 		}
 
-		/* Open a watcher for the client socket */
-		bb->conversation->watcher_id = purple_input_add(client_socket, PURPLE_INPUT_READ,
-								_client_socket_handler, pb);
+		g_free(stream_start);
 	} else {
 		close(client_socket);
 	}
@@ -519,6 +631,89 @@
 	return data->port;
 }
 
+static void
+_connected_to_buddy(gpointer data, gint source, const gchar *error)
+{
+	PurpleBuddy *pb = data;
+	BonjourBuddy *bb = pb->proto_data;
+	int len, ret;
+	char *stream_start = g_strdup_printf(DOCTYPE, purple_account_get_username(pb->account), purple_buddy_get_name(pb));
+
+	bb->conversation->connect_data = NULL;
+
+	if (source < 0) {
+		PurpleConversation *conv;
+
+		purple_debug_error("bonjour", "Error connecting to buddy %s at %s:%d error: %s\n",
+				   purple_buddy_get_name(pb), bb->ip ? bb->ip : "(null)", bb->port_p2pj, error ? error : "(null)");
+
+		conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, bb->name, pb->account);
+		if (conv != NULL)
+			purple_conversation_write(conv, NULL,
+				  _("Unable to send the message, the conversation couldn't be started."),
+				  PURPLE_MESSAGE_SYSTEM, time(NULL));
+
+		bonjour_jabber_close_conversation(bb->conversation);
+		bb->conversation = NULL;
+		return;
+	}
+
+	len = strlen(stream_start);
+
+	/* Start the stream and send queued messages */
+	ret = send(source, stream_start, len, 0);
+
+	if (ret == -1 && errno == EAGAIN)
+		ret = 0;
+	else if (ret <= 0) {
+		const char *err = strerror(errno);
+		PurpleConversation *conv;
+
+		purple_debug_error("bonjour", "Error starting stream with buddy %s at %s:%d error: %s\n",
+				   purple_buddy_get_name(pb), bb->ip ? bb->ip : "(null)", bb->port_p2pj, err ? err : "(null)");
+
+		conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, bb->name, pb->account);
+		if (conv != NULL)
+			purple_conversation_write(conv, NULL,
+				  _("Unable to send the message, the conversation couldn't be started."),
+				  PURPLE_MESSAGE_SYSTEM, time(NULL));
+
+		close(source);
+		bonjour_jabber_close_conversation(bb->conversation);
+		bb->conversation = NULL;
+
+		g_free(stream_start);
+
+		return;
+	}
+
+	bb->conversation->socket = source;
+	bb->conversation->rx_handler = purple_input_add(source,
+		PURPLE_INPUT_READ, _client_socket_handler, pb);
+
+	/* This is unlikely to happen */
+	if (ret < len) {
+		struct _stream_start_data *ss = g_new(struct _stream_start_data, 1);
+		ss->msg = g_strdup(stream_start + ret);
+		ss->tx_handler_cb = _send_data_write_cb;
+		bb->conversation->stream_data = ss;
+		/* Finish sending the stream start */
+		bb->conversation->tx_handler = purple_input_add(source,
+			PURPLE_INPUT_WRITE, _start_stream, pb);
+	}
+	/* Process the send buffer */
+	else {
+		bb->conversation->stream_started = TRUE;
+		/* Watch for when we can write the buffered messages */
+		bb->conversation->tx_handler = purple_input_add(source, PURPLE_INPUT_WRITE,
+			_send_data_write_cb, pb);
+		/* We can probably write the data now. */
+		_send_data_write_cb(pb, source, PURPLE_INPUT_WRITE);
+	}
+
+	g_free(stream_start);
+}
+
 int
 bonjour_jabber_send_message(BonjourJabber *data, const gchar *to, const gchar *body)
 {
@@ -540,14 +735,31 @@
 	/* Check if there is a previously open conversation */
 	if (bb->conversation == NULL)
 	{
-		int socket = _connect_to_buddy(pb);
-		if (socket < 0)
+		PurpleProxyConnectData *connect_data;
+
+		/* Make sure that the account always has a proxy of "none".
+		 * This is kind of dirty, but proxy_connect_none() isn't exposed. */
+		static PurpleProxyInfo *tmp_none_proxy_info = NULL;
+		if (!tmp_none_proxy_info) {
+			tmp_none_proxy_info = purple_proxy_info_new();
+			purple_proxy_info_set_type(tmp_none_proxy_info, PURPLE_PROXY_NONE);
+		}
+		purple_account_set_proxy_info(data->account, tmp_none_proxy_info);
+
+		connect_data =
+			purple_proxy_connect(data->account->gc, data->account, bb->ip,
+					     bb->port_p2pj, _connected_to_buddy, pb);
+
+		if (connect_data == NULL) {
+			purple_debug_error("bonjour", "Unable to connect to buddy (%s).\n", to);
 			return -10001;
+		}
 
 		bb->conversation = bonjour_jabber_conv_new();
-		bb->conversation->socket = socket;
-		bb->conversation->watcher_id = purple_input_add(bb->conversation->socket,
-				PURPLE_INPUT_READ, _client_socket_handler, pb);
+		bb->conversation->connect_data = connect_data;
+		/* We don't want _send_data() to register the tx_handler;
+		 * that neeeds to wait until we're actually connected. */
+		bb->conversation->tx_handler = 0;
 	}
 
 	message_node = xmlnode_new("message");
@@ -574,47 +786,14 @@
 	xmlnode_set_namespace(node, "jabber:x:event");
 	xmlnode_insert_child(node, xmlnode_new("composing"));
 
-
 	message = xmlnode_to_str(message_node, NULL);
 	xmlnode_free(message_node);
 
-	/* Check if the stream for the conversation has been started */
-	if (bb->conversation->stream_started == FALSE)
-	{
-		char *stream_start = g_strdup_printf(DOCTYPE, purple_account_get_username(pb->account),
-						     purple_buddy_get_name(pb));
-		/* Start the stream */
-		if (send(bb->conversation->socket, stream_start, strlen(stream_start), 0) == -1)
-		{
-			PurpleConversation *conv;
+	ret = _send_data(pb, message) >= 0;
 
-			purple_debug_error("bonjour", "Unable to start a conversation\n");
-			purple_debug_warning("bonjour", "send error: %s\n", strerror(errno));
-			conv = purple_find_conversation_with_account(PURPLE_CONV_TYPE_IM, bb->name, data->account);
-			purple_conversation_write(conv, NULL,
-						  _("Unable to send the message, the conversation couldn't be started."),
-						  PURPLE_MESSAGE_SYSTEM, time(NULL));
-
-			bonjour_jabber_close_conversation(bb->conversation);
-			bb->conversation = NULL;
-
-			g_free(message);
-			g_free(stream_start);
-			return 0;
-		}
-
-		g_free(stream_start);
-		bb->conversation->stream_started = TRUE;
-	}
-
-	/* Send the message */
-	ret = (_send_data(bb->conversation->socket, message) == -1);
 	g_free(message);
 
-	if (ret == -1)
-		return -10000;
-
-	return 1;
+	return ret;
 }
 
 void
@@ -630,9 +809,20 @@
 			/* TODO: We're really supposed to wait for "</stream:stream>" before closing the socket */
 			close(bconv->socket);
 		}
-		purple_input_remove(bconv->watcher_id);
+		if (bconv->rx_handler != -1)
+			purple_input_remove(bconv->rx_handler);
+		if (bconv->tx_handler > 0)
+			purple_input_remove(bconv->tx_handler);
 
 		/* Free all the data related to the conversation */
+		purple_circ_buffer_destroy(bconv->tx_buf);
+		if (bconv->connect_data != NULL)
+			purple_proxy_connect_cancel(bconv->connect_data);
+		if (bconv->stream_data != NULL) {
+			struct _stream_start_data *ss = bconv->stream_data;
+			g_free(ss->msg);
+			g_free(ss);
+		}
 		g_free(bconv);
 	}
 }
@@ -657,6 +847,7 @@
 			bonjour_jabber_close_conversation(bb->conversation);
 			bb->conversation = NULL;
 		}
+
 		g_slist_free(buddies);
 	}
 }
--- a/libpurple/protocols/bonjour/jabber.h	Fri Jun 08 15:37:48 2007 +0000
+++ b/libpurple/protocols/bonjour/jabber.h	Fri Jun 08 18:24:23 2007 +0000
@@ -27,6 +27,7 @@
 #define _BONJOUR_JABBER_H_
 
 #include "account.h"
+#include "circbuffer.h"
 
 typedef struct _BonjourJabber
 {
@@ -39,8 +40,12 @@
 typedef struct _BonjourJabberConversation
 {
 	gint socket;
-	gint watcher_id;
+	guint rx_handler;
+	guint tx_handler;
+	PurpleCircBuffer *tx_buf;
 	gboolean stream_started;
+	PurpleProxyConnectData *connect_data;
+	gpointer stream_data;
 } BonjourJabberConversation;
 
 /**