view libpurple/protocols/jabber/bosh.c @ 28133:3eef8392a54b

jabber: Add a BOSH send timer (queue up stanzas), fixes connecting to Prosody. This send timer shouldn't strictly be necessary, but Prosody currently closes the TCP stream after every response (MattJ tells me there's a pipelining bug that they couldn't fix, so they just disabled it), but in my tests, the initial purple_ssl_read doesn't catch the EOF/disconnection (the read returns EAGAIN), so the prpl, while processing the response from the server, thinks that the BOSH connection is still open, and tries to send to it. *Then*, the write input watcher triggers and read returns 0 (EOF). The request that was sent is then lost. Anyway, I've termed Prosody a fuzzer for BOSH. :-)
author Paul Aurich <paul@darkrain42.org>
date Sun, 02 Aug 2009 03:00:00 +0000
parents 1ff7de15d856
children 20f13609ca7a
line wrap: on
line source

/*
 * purple - Jabber Protocol Plugin
 *
 * Copyright (C) 2008, Tobias Markmann <tmarkmann@googlemail.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111-1301  USA
 *
 */
#include "internal.h"
#include "circbuffer.h"
#include "core.h"
#include "cipher.h"
#include "debug.h"
#include "prpl.h"
#include "util.h"
#include "xmlnode.h"

#include "bosh.h"

/* Number of slots in PurpleBOSHConnection.connections, i.e. the maximum
 * number of HTTP connections a single BOSH session will use. */
#define MAX_HTTP_CONNECTIONS      2
/* Number of disconnects (without any data received in between) after which
 * the account is put into an error state instead of reconnecting. */
#define MAX_FAILED_CONNECTIONS    3
/* Delay, in seconds, between queueing ordinary data and flushing it. */
#define BUFFER_SEND_IN_SECS       1

typedef struct _PurpleHTTPConnection PurpleHTTPConnection;

/* Callback types stored in PurpleBOSHConnection (connect_cb / receive_cb). */
typedef void (*PurpleBOSHConnectionConnectFunction)(PurpleBOSHConnection *conn);
typedef void (*PurpleBOSHConnectionReceiveFunction)(PurpleBOSHConnection *conn, xmlnode *node);

/* User-Agent header value; built by jabber_bosh_init(), released by
 * jabber_bosh_uninit(). */
static char *bosh_useragent = NULL;

/* How jabber_bosh_connection_send() should treat a packet. */
typedef enum {
	PACKET_NORMAL,    /* queue the data; it is sent when the send timer fires */
	PACKET_TERMINATE, /* send immediately with type='terminate' */
	PACKET_FLUSH,     /* send immediately, draining the pending buffer */
} PurpleBOSHPacketType;

/*
 * State of one BOSH session, which is carried over one or more HTTP
 * connections (see struct _PurpleHTTPConnection).
 */
struct _PurpleBOSHConnection {
	/* Jabber stream this session belongs to */
	JabberStream *js;
	/* HTTP connections; slot 0 is created in jabber_bosh_connection_init(),
	 * other slots are filled on demand and may be NULL */
	PurpleHTTPConnection *connections[MAX_HTTP_CONNECTIONS];

	/* Data queued by jabber_bosh_connection_send() until the next flush */
	PurpleCircBuffer *pending;
	/* Not referenced by the code visible in this file */
	PurpleBOSHConnectionConnectFunction connect_cb;
	/* Handler for each parsed response body; changes as the session moves
	 * from boot to auth to normal traffic */
	PurpleBOSHConnectionReceiveFunction receive_cb;

	/* Session ID copied from the boot response. */
	char *sid;
	/* Request ID, incremented for every request.
	 * NOTE(review): the original "must be big enough to hold 2^53 - 1"
	 * remark presumably applies to this field rather than sid. */
	guint64 rid;

	/* decoded URL */
	char *host;
	char *path;
	guint16 port;

	/* TRUE until the first disconnect; then multiple connections are used */
	gboolean pipelining;
	/* TRUE when the URL contained "https://" */
	gboolean ssl;
	/* TRUE when the next request sent must carry xmpp:restart='true' */
	gboolean needs_restart;

	enum {
		BOSH_CONN_OFFLINE,
		BOSH_CONN_BOOTING,
		BOSH_CONN_ONLINE
	} state;
	/* Disconnects since data was last received */
	guint8 failed_connections;

	/* 'inactivity' value from the boot response in seconds; 0 = no timer */
	int max_inactivity;
	/* Not referenced by the code visible in this file */
	int wait;

	/* 'requests' value from the boot response */
	int max_requests;
	/* Outstanding HTTP requests summed over all connections */
	int requests;

	/* Timeout handles; 0 when the timer is not running */
	guint inactivity_timer;
	guint send_timer;
};

/*
 * One HTTP (plain or SSL) connection used to carry requests for a BOSH
 * session.
 */
struct _PurpleHTTPConnection {
	/* Owning BOSH session */
	PurpleBOSHConnection *bosh;
	/* SSL connection, or NULL when no SSL connection is in use */
	PurpleSslConnection *psc;

	/* Request bytes that could not be written immediately */
	PurpleCircBuffer *write_buf;
	/* Response bytes read so far; NULL between responses */
	GString *read_buf;

	/* Offset into read_buf up to which data has been parsed */
	gsize handled_len;
	/* Value of the Content-Length header of the current response */
	gsize body_len;

	/* Plain socket descriptor; -1 when there is none */
	int fd;
	/* Input watcher handles; 0 when not installed */
	guint readh;
	guint writeh;

	enum {
		HTTP_CONN_OFFLINE,
		HTTP_CONN_CONNECTING,
		HTTP_CONN_CONNECTED
	} state;
	int requests; /* number of outstanding HTTP requests */

	/* TRUE once the header terminator of the current response was seen */
	gboolean headers_done;

};

/* Forward declarations for functions that are used before they are defined. */
static void http_connection_connect(PurpleHTTPConnection *conn);
static void http_connection_send_request(PurpleHTTPConnection *conn,
                                         const GString *req);
static gboolean send_timer_cb(gpointer data);

/*
 * Build the global User-Agent string from the UI's "name" and "version"
 * entries, falling back to a plain libpurple identifier when the UI does
 * not report a name.
 */
void jabber_bosh_init(void)
{
	GHashTable *info = purple_core_get_ui_info();
	const char *name = NULL;
	const char *version = NULL;

	if (info != NULL) {
		name = g_hash_table_lookup(info, "name");
		version = g_hash_table_lookup(info, "version");
	}

	if (name == NULL) {
		bosh_useragent = g_strdup("libpurple " VERSION);
		return;
	}

	/* The version, when known, is separated from the name by one space */
	bosh_useragent = g_strdup_printf("%s%s%s (libpurple " VERSION ")",
	                                 name,
	                                 version ? " " : "",
	                                 version ? version : "");
}

/* Release the User-Agent string built by jabber_bosh_init(). */
void jabber_bosh_uninit(void)
{
	char *agent = bosh_useragent;

	bosh_useragent = NULL;
	g_free(agent);
}

/*
 * Allocate an offline HTTP connection for the given BOSH session. The
 * connection has no socket yet (fd == -1) and an empty write buffer.
 */
static PurpleHTTPConnection*
jabber_bosh_http_connection_init(PurpleBOSHConnection *bosh)
{
	PurpleHTTPConnection *http = g_new0(PurpleHTTPConnection, 1);

	http->write_buf = purple_circ_buffer_new(0 /* default grow size */);
	http->state = HTTP_CONN_OFFLINE;
	http->fd = -1;
	http->bosh = bosh;

	return http;
}

/*
 * Tear down an HTTP connection: free its buffers, remove its input
 * watchers, close the SSL connection and/or socket, cancel any connection
 * attempt registered with this handle, and free the structure.
 */
static void
jabber_bosh_http_connection_destroy(PurpleHTTPConnection *conn)
{
	/* Buffers */
	if (conn->read_buf != NULL)
		g_string_free(conn->read_buf, TRUE);
	if (conn->write_buf != NULL)
		purple_circ_buffer_destroy(conn->write_buf);

	/* Event-loop watchers */
	if (conn->readh != 0)
		purple_input_remove(conn->readh);
	if (conn->writeh != 0)
		purple_input_remove(conn->writeh);

	/* Transport */
	if (conn->psc != NULL)
		purple_ssl_close(conn->psc);
	if (conn->fd >= 0)
		close(conn->fd);

	purple_proxy_connect_cancel_with_handle(conn);

	g_free(conn);
}

/*
 * Create a BOSH session object for the given stream and connection-manager
 * URL.
 *
 * Returns NULL when the URL cannot be parsed. Any username/password in the
 * URL is ignored. The session starts offline with pipelining enabled, a
 * random initial request ID, and one (unconnected) HTTP connection in
 * slot 0.
 */
PurpleBOSHConnection*
jabber_bosh_connection_init(JabberStream *js, const char *url)
{
	PurpleBOSHConnection *bosh;
	char *url_host, *url_path, *url_user, *url_passwd;
	int url_port;
	gboolean has_user, has_passwd;

	if (!purple_url_parse(url, &url_host, &url_port, &url_path,
	                      &url_user, &url_passwd)) {
		purple_debug_info("jabber", "Unable to parse given URL.\n");
		return NULL;
	}

	bosh = g_new0(PurpleBOSHConnection, 1);
	bosh->js = js;
	bosh->state = BOSH_CONN_OFFLINE;
	bosh->pipelining = TRUE;
	bosh->needs_restart = FALSE;

	/* Take ownership of the host; the path gets a leading slash */
	bosh->host = url_host;
	bosh->port = url_port;
	bosh->path = g_strdup_printf("/%s", url_path);
	g_free(url_path);

	has_user = (url_user != NULL && url_user[0] != '\0');
	has_passwd = (url_passwd != NULL && url_passwd[0] != '\0');
	if (has_user || has_passwd) {
		purple_debug_info("jabber", "Ignoring unexpected username and password "
		                            "in BOSH URL.\n");
	}

	g_free(url_user);
	g_free(url_passwd);

	/*
	 * Random 64-bit integer masked off by 2^52 - 1.
	 *
	 * This should produce a random integer in the range [0, 2^52). It's
	 * unlikely we'll send enough packets in one session to overflow the rid.
	 */
	bosh->rid = ((guint64)g_random_int() << 32) | g_random_int();
	bosh->rid &= 0xFFFFFFFFFFFFFLL;

	bosh->pending = purple_circ_buffer_new(0 /* default grow size */);

	bosh->ssl = (purple_strcasestr(url, "https://") != NULL) ? TRUE : FALSE;

	bosh->connections[0] = jabber_bosh_http_connection_init(bosh);

	return bosh;
}

/*
 * Free a BOSH session and everything it owns: the decoded URL parts, the
 * session ID, both timers, the pending-data buffer and every HTTP
 * connection that was created.
 */
void
jabber_bosh_connection_destroy(PurpleBOSHConnection *conn)
{
	int i;

	g_free(conn->host);
	g_free(conn->path);
	/* sid is g_strdup()ed from the boot response; it was previously leaked */
	g_free(conn->sid);

	if (conn->send_timer)
		purple_timeout_remove(conn->send_timer);
	if (conn->inactivity_timer)
		purple_timeout_remove(conn->inactivity_timer);

	purple_circ_buffer_destroy(conn->pending);

	for (i = 0; i < MAX_HTTP_CONNECTIONS; ++i) {
		if (conn->connections[i])
			jabber_bosh_http_connection_destroy(conn->connections[i]);
	}

	g_free(conn);
}

/* Report whether this BOSH session was configured to use SSL. */
gboolean jabber_bosh_connection_is_ssl(PurpleBOSHConnection *conn)
{
	return conn->ssl;
}

/*
 * Pick an HTTP connection that can carry a request right now.
 *
 * With pipelining, only slot 0 is considered and it is returned when it is
 * connected. Without pipelining, a connected connection with no
 * outstanding request is returned; failing that, if nothing is currently
 * connecting, a new connection is created in the first empty slot and
 * started. Returns NULL whenever the caller has to wait.
 */
static PurpleHTTPConnection *
find_available_http_connection(PurpleBOSHConnection *conn)
{
	PurpleHTTPConnection *candidate;
	int idx;

	if (purple_debug_is_verbose()) {
		for (idx = 0; idx < MAX_HTTP_CONNECTIONS; ++idx) {
			candidate = conn->connections[idx];
			if (candidate != NULL)
				purple_debug_misc("jabber", "BOSH %p->connections[%d] = %p, state = %d"
				                  ", requests = %d\n", conn, idx, candidate,
				                  candidate->state, candidate->requests);
			else
				purple_debug_misc("jabber", "BOSH %p->connections[%d] = (nil)\n",
				                  conn, idx);
		}
	}

	/* Pipelining: everything goes over the first connection */
	if (conn->pipelining) {
		candidate = conn->connections[0];
		if (candidate->state == HTTP_CONN_CONNECTED)
			return candidate;
		return NULL;
	}

	/* Pass 1: an idle, connected connection */
	for (idx = 0; idx < MAX_HTTP_CONNECTIONS; ++idx) {
		candidate = conn->connections[idx];
		if (candidate == NULL)
			continue;
		if (candidate->state == HTTP_CONN_CONNECTED && candidate->requests == 0)
			return candidate;
	}

	/* Pass 2: if a connection is being set up, the caller should queue */
	for (idx = 0; idx < MAX_HTTP_CONNECTIONS; ++idx) {
		candidate = conn->connections[idx];
		if (candidate != NULL && candidate->state == HTTP_CONN_CONNECTING)
			return NULL;
	}

	/* Pass 3: start a new connection in the first free slot */
	for (idx = 0; idx < MAX_HTTP_CONNECTIONS; ++idx) {
		if (conn->connections[idx] != NULL)
			continue;

		purple_debug_info("jabber", "bosh: Creating and connecting new httpconn\n");
		conn->connections[idx] = jabber_bosh_http_connection_init(conn);

		http_connection_connect(conn->connections[idx]);
		return NULL;
	}

	purple_debug_warning("jabber", "Could not find a HTTP connection!\n");

	/* None available. */
	return NULL;
}

/*
 * Send (or queue) data on the BOSH session.
 *
 * conn: the BOSH session.
 * type: PACKET_NORMAL queues `data` in conn->pending and arms the send
 *       timer; PACKET_FLUSH and PACKET_TERMINATE build a <body/> request
 *       right away containing everything pending plus `data`.
 * data: XML to send, or NULL for none.
 *
 * If no HTTP connection is ready for an immediate send, the request is
 * dropped with a warning. When conn->needs_restart is set, the request is
 * an empty xmpp:restart body and pending data stays queued.
 */
static void
jabber_bosh_connection_send(PurpleBOSHConnection *conn,
                            const PurpleBOSHPacketType type, const char *data)
{
	PurpleHTTPConnection *chosen;
	GString *packet = NULL;

	if (type != PACKET_FLUSH && type != PACKET_TERMINATE) {
		/*
		 * Unless this is a flush (or session terminate, which needs to be
		 * sent immediately), queue up the data and start a timer to flush
		 * the buffer.
		 */
		if (data)
			purple_circ_buffer_append(conn->pending, data, strlen(data));

		if (purple_debug_is_verbose())
			purple_debug_misc("jabber", "bosh: %p has %" G_GSIZE_FORMAT " bytes in "
			                  "the buffer.\n", conn, conn->pending->bufused);
		if (conn->send_timer == 0)
			conn->send_timer = purple_timeout_add_seconds(BUFFER_SEND_IN_SECS,
					send_timer_cb, conn);
		return;
	}

	chosen = find_available_http_connection(conn);

	if (!chosen) {
		/*
		 * For non-ordinary traffic, we can't 'buffer' it, so use the
		 * first connection.
		 */
		chosen = conn->connections[0];

		if (chosen->state != HTTP_CONN_CONNECTED) {
			purple_debug_warning("jabber", "Unable to find a ready BOSH "
					"connection. Ignoring send of type 0x%02x.\n", type);
			return;
		}
	}

	/* We're flushing the send buffer, so remove the send timer */
	if (conn->send_timer != 0) {
		purple_timeout_remove(conn->send_timer);
		conn->send_timer = 0;
	}

	packet = g_string_new(NULL);

	g_string_printf(packet, "<body "
	                "rid='%" G_GUINT64_FORMAT "' "
	                "sid='%s' "
	                "to='%s' "
	                "xml:lang='en' "
	                "xmlns='http://jabber.org/protocol/httpbind' "
	                "xmlns:xmpp='urn:xmpp:xbosh'",
	                ++conn->rid,
	                conn->sid,
	                conn->js->user->domain);

	if (conn->needs_restart) {
		packet = g_string_append(packet, " xmpp:restart='true'/>");
		/* TODO: Do we need to wait for a response? */
		conn->needs_restart = FALSE;
	} else {
		gsize read_amt;
		if (type == PACKET_TERMINATE)
			packet = g_string_append(packet, " type='terminate'");

		packet = g_string_append_c(packet, '>');

		/* Drain the circular buffer; it may take more than one pass
		 * because a read is limited to the contiguous region. */
		while ((read_amt = purple_circ_buffer_get_max_read(conn->pending)) > 0) {
			packet = g_string_append_len(packet, conn->pending->outptr, read_amt);
			purple_circ_buffer_mark_read(conn->pending, read_amt);
		}

		if (data)
			packet = g_string_append(packet, data);
		packet = g_string_append(packet, "</body>");
	}

	http_connection_send_request(chosen, packet);

	/* http_connection_send_request() copies the body into its own buffer,
	 * so the packet must be released here (it was previously leaked). */
	g_string_free(packet, TRUE);
}

/* Ask the connection manager to end the session by sending a
 * type='terminate' request immediately. */
void jabber_bosh_connection_close(PurpleBOSHConnection *conn)
{
	jabber_bosh_connection_send(conn, PACKET_TERMINATE, NULL);
}

/*
 * Request a stream restart: mark the session so the next request that is
 * actually sent carries xmpp:restart='true', and schedule a send.
 */
static void jabber_bosh_connection_stream_restart(PurpleBOSHConnection *conn)
{
	conn->needs_restart = TRUE;
	/* PACKET_NORMAL with no data only arms the send timer */
	jabber_bosh_connection_send(conn, PACKET_NORMAL, NULL);
}

/*
 * Check whether a response body terminates the session.
 *
 * Returns TRUE (after marking the session offline and reporting a
 * connection error) when the body has type='terminate'; FALSE otherwise.
 */
static gboolean jabber_bosh_connection_error_check(PurpleBOSHConnection *conn, xmlnode *node) {
	const char *body_type = xmlnode_get_attrib(node, "type");

	if (body_type == NULL || strcmp(body_type, "terminate") != 0)
		return FALSE;

	conn->state = BOSH_CONN_OFFLINE;
	purple_connection_error_reason(conn->js->gc,
		PURPLE_CONNECTION_ERROR_OTHER_ERROR,
		_("The BOSH connection manager terminated your session."));
	return TRUE;
}

/*
 * Send-timer callback: clear the timer handle and flush whatever is
 * pending. Always returns FALSE so the timeout is not rescheduled.
 */
static gboolean
send_timer_cb(gpointer data)
{
	PurpleBOSHConnection *conn = data;

	conn->send_timer = 0;
	jabber_bosh_connection_send(conn, PACKET_FLUSH, NULL);

	return FALSE;
}

/*
 * Inactivity-timer callback: force a flush now so that a request reaches
 * the server. Any pending send timer is cancelled first. Always returns
 * FALSE so the timeout is not rescheduled.
 */
static gboolean
bosh_inactivity_cb(gpointer data)
{
	PurpleBOSHConnection *conn = data;
	guint pending_send = conn->send_timer;

	conn->inactivity_timer = 0;

	if (pending_send != 0)
		purple_timeout_remove(pending_send);

	/* send_timer_cb() resets conn->send_timer to 0 and flushes */
	send_timer_cb(conn);

	return FALSE;
}

/*
 * Cancel the running inactivity timer, if any, and start a fresh one when
 * the session has a non-zero max_inactivity.
 */
static void
restart_inactivity_timer(PurpleBOSHConnection *conn)
{
	guint old_timer = conn->inactivity_timer;

	if (old_timer != 0) {
		purple_timeout_remove(old_timer);
		conn->inactivity_timer = 0;
	}

	if (conn->max_inactivity == 0)
		return;

	conn->inactivity_timer =
		purple_timeout_add_seconds(conn->max_inactivity - 5 /* rounding */,
		                           bosh_inactivity_cb, conn);
}

/*
 * Receive handler for an established session: pass every element child of
 * the response body to jabber_process_packet(), unless the body terminates
 * the session.
 */
static void jabber_bosh_connection_received(PurpleBOSHConnection *conn, xmlnode *node) {
	JabberStream *js = conn->js;
	xmlnode *cur;
	xmlnode *following;

	g_return_if_fail(node != NULL);
	if (jabber_bosh_connection_error_check(conn, node))
		return;

	for (cur = node->child; cur != NULL; cur = following) {
		/* jabber_process_packet might free the node, so remember the
		 * successor before handing it over */
		following = cur->next;

		if (cur->type == XMLNODE_TYPE_TAG)
			jabber_process_packet(js, &cur);
	}
}

/*
 * Receive handler used while authenticating. Only the first element child
 * of the body is processed. On <success/>, a stream restart is requested
 * and the handler for normal traffic is installed; anything else is
 * processed with the stream in the AUTHENTICATING state.
 */
static void auth_response_cb(PurpleBOSHConnection *conn, xmlnode *node) {
	xmlnode *first_tag;
	JabberStream *js;

	g_return_if_fail(node != NULL);
	if (jabber_bosh_connection_error_check(conn, node))
		return;

	/* Skip over non-element children */
	for (first_tag = node->child; first_tag != NULL; first_tag = first_tag->next) {
		if (first_tag->type == XMLNODE_TYPE_TAG)
			break;
	}

	if (first_tag == NULL) {
		purple_debug_warning("jabber", "Received unexepcted empty BOSH packet.\n");
		return;
	}

	/* We're only expecting one XML node here, so only process the first one */
	js = conn->js;
	if (strcmp(first_tag->name, "success") == 0) {
		jabber_bosh_connection_stream_restart(conn);
		jabber_process_packet(js, &first_tag);
		conn->receive_cb = jabber_bosh_connection_received;
	} else {
		js->state = JABBER_STREAM_AUTHENTICATING;
		jabber_process_packet(js, &first_tag);
	}
}

/*
 * Receive handler for the session-creation response.
 *
 * Reads the sid, ver, inactivity and requests attributes of the body.
 * A missing sid or an unsupported protocol version (anything other than
 * 1.x with x >= 6) reports a connection error and stops. Otherwise the
 * session is marked online, the auth handler is installed and the
 * <features/> child is handed to the stream-features parser.
 */
static void boot_response_cb(PurpleBOSHConnection *conn, xmlnode *node) {
	const char *sid, *version;
	const char *inactivity, *requests;
	xmlnode *packet;

	g_return_if_fail(node != NULL);
	if (jabber_bosh_connection_error_check(conn, node))
		return;

	sid = xmlnode_get_attrib(node, "sid");
	version = xmlnode_get_attrib(node, "ver");

	inactivity = xmlnode_get_attrib(node, "inactivity");
	requests = xmlnode_get_attrib(node, "requests");

	if (sid) {
		/* Keep our own copy; it is used in every later request */
		conn->sid = g_strdup(sid);
	} else {
		purple_connection_error_reason(conn->js->gc,
		        PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
		        _("No session ID given"));
		return;
	}

	if (version) {
		const char *dot = strstr(version, ".");
		int major, minor = 0;

		purple_debug_info("jabber", "BOSH connection manager version %s\n", version);

		/* "major.minor": atoi stops at the dot, minor is parsed after it */
		major = atoi(version);
		if (dot)
			minor = atoi(dot + 1);

		if (major != 1 || minor < 6) {
			purple_connection_error_reason(conn->js->gc,
			        PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
			        _("Unsupported version of BOSH protocol"));
			return;
		}
	} else {
		purple_debug_info("jabber", "Missing version in BOSH initiation\n");
	}

	if (inactivity) {
		conn->max_inactivity = atoi(inactivity);
		/* The timer runs 5 seconds short of this value, so anything at or
		 * below 5 cannot be honoured and disables the timer */
		if (conn->max_inactivity <= 5) {
			purple_debug_warning("jabber", "Ignoring bogusly small inactivity: %s\n",
			                     inactivity);
			conn->max_inactivity = 0;
		} else {
			/* TODO: Integrate this with jabber.c keepalive checks... */
			/* TODO: Can this check fail? It shouldn't */
			if (conn->inactivity_timer == 0) {
				purple_debug_misc("jabber", "Starting BOSH inactivity timer "
						"for %d secs (compensating for rounding)\n",
						conn->max_inactivity - 5);
				restart_inactivity_timer(conn);
			}
		}
	}

	if (requests)
		conn->max_requests = atoi(requests);

	/* FIXME: Depending on receiving features might break with some hosts */
	packet = xmlnode_get_child(node, "features");
	conn->state = BOSH_CONN_ONLINE;
	conn->receive_cb = auth_response_cb;
	jabber_stream_features_parse(conn->js, packet);
}

/*
 * Send the session-creation request on the first HTTP connection and
 * install boot_response_cb to handle the answer.
 */
static void jabber_bosh_connection_boot(PurpleBOSHConnection *conn) {
	GString *body = g_string_new(NULL);
	guint64 rid = ++conn->rid;

	/* TODO: The 'wait' value should be adjusted/adjustable automatically
	 * according to realtime network behavior */
	g_string_printf(body, "<body content='text/xml; charset=utf-8' "
	                "secure='true' "
	                "to='%s' "
	                "xml:lang='en' "
	                "xmpp:version='1.0' "
	                "ver='1.6' "
	                "xmlns:xmpp='urn:xmpp:bosh' "
	                "rid='%" G_GUINT64_FORMAT "' "
	                "wait='60' "
	                "hold='1' "
	                "xmlns='http://jabber.org/protocol/httpbind'/>",
	                conn->js->user->domain,
	                rid);

	purple_debug_misc("jabber", "SendBOSH Boot %s(%" G_GSIZE_FORMAT "): %s\n",
	                  conn->ssl ? "(ssl)" : "", body->len, body->str);

	conn->receive_cb = boot_response_cb;
	http_connection_send_request(conn->connections[0], body);

	g_string_free(body, TRUE);
}

/*
 * Handle one complete HTTP response body: reset the failure counter, parse
 * the body as XML and pass it to the session's current receive handler.
 * Unparseable bodies are logged and dropped.
 */
static void
http_received_cb(const char *data, int len, PurpleBOSHConnection *conn)
{
	xmlnode *body;

	/* We've got some data, so reset the number of failed connections */
	conn->failed_connections = 0;

	if (conn->receive_cb == NULL)
		g_return_if_reached();

	body = xmlnode_from_str(data, len);

	purple_debug_info("jabber", "RecvBOSH %s(%d): %s\n",
	                  conn->ssl ? "(ssl)" : "", len, data);

	if (body == NULL) {
		purple_debug_warning("jabber", "BOSH: Received invalid XML\n");
		return;
	}

	conn->receive_cb(conn, body);
	xmlnode_free(body);
}

/* Queue raw XML for sending; it goes out with the next flush of the
 * session's pending buffer. */
void jabber_bosh_connection_send_raw(PurpleBOSHConnection *conn,
                                     const char *data)
{
	jabber_bosh_connection_send(conn, PACKET_NORMAL, data);
}

/*
 * Common tail of the plain and SSL connect callbacks: mark the HTTP
 * connection as connected, reset its per-response parsing state, and then
 * either request a stream restart, resume an existing BOSH session, or
 * boot a new one.
 */
static void
connection_common_established_cb(PurpleHTTPConnection *conn)
{
	/* Indicate we're ready and reset some variables */
	conn->state = HTTP_CONN_CONNECTED;
	if (conn->requests != 0)
		purple_debug_error("jabber", "bosh: httpconn %p has %d requests, != 0\n",
		                   conn, conn->requests);

	conn->requests = 0;
	/* Discard any partial response left over from a previous socket */
	if (conn->read_buf) {
		g_string_free(conn->read_buf, TRUE);
		conn->read_buf = NULL;
	}
	conn->headers_done = FALSE;
	conn->handled_len = conn->body_len = 0;

	if (conn->bosh->needs_restart)
		jabber_bosh_connection_stream_restart(conn->bosh);
	else if (conn->bosh->state == BOSH_CONN_ONLINE) {
		purple_debug_info("jabber", "BOSH session already exists. Trying to reuse it.\n");
		/* Send when nothing is outstanding on the session, or when data
		 * is waiting in the pending buffer */
		if (conn->bosh->requests == 0 || conn->bosh->pending->bufused > 0) {
			/* Send the pending data */
			jabber_bosh_connection_send(conn->bosh, PACKET_FLUSH, NULL);
		}
	} else
		jabber_bosh_connection_boot(conn->bosh);
}

/*
 * Handle the loss of an HTTP connection.
 *
 * Closes the transport, removes the input watchers, writes off requests
 * that will never be answered, disables pipelining for the session, and
 * then either reconnects or, after MAX_FAILED_CONNECTIONS failures,
 * reports a connection error.
 */
static void http_connection_disconnected(PurpleHTTPConnection *conn)
{
	/*
	 * Well, then. Fine! I never liked you anyway, server! I was cheating on you
	 * with AIM!
	 */
	conn->state = HTTP_CONN_OFFLINE;
	if (conn->psc) {
		purple_ssl_close(conn->psc);
		conn->psc = NULL;
	} else if (conn->fd >= 0) {
		close(conn->fd);
		conn->fd = -1;
	}

	if (conn->readh) {
		purple_input_remove(conn->readh);
		conn->readh = 0;
	}

	if (conn->writeh) {
		purple_input_remove(conn->writeh);
		conn->writeh = 0;
	}

	/*
	 * If nothing has been received for the outstanding requests, they are
	 * lost; take them out of the session-wide count. read_buf is NULL
	 * between responses, which is treated the same as an empty buffer
	 * rather than being dereferenced.
	 */
	if (conn->requests > 0 &&
			(conn->read_buf == NULL || conn->read_buf->len == 0)) {
		purple_debug_error("jabber", "bosh: Adjusting BOSHconn requests (%d) to %d\n",
		                   conn->bosh->requests, conn->bosh->requests - conn->requests);
		conn->bosh->requests -= conn->requests;
		conn->requests = 0;
	}
	if (conn->bosh->pipelining)
		/* Hmmmm, fall back to multiple connections */
		conn->bosh->pipelining = FALSE;

	if (++conn->bosh->failed_connections == MAX_FAILED_CONNECTIONS) {
		purple_connection_error_reason(conn->bosh->js->gc,
				PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
				_("Unable to establish a connection with the server"));
	} else {
		/* No! Please! Take me back. It was me, not you! I was weak! */
		http_connection_connect(conn);
	}
}

/*
 * Start connecting an offline BOSH session: move it to the BOOTING state
 * and connect its first HTTP connection. Does nothing (beyond the GLib
 * precondition warning) when the session is not offline.
 */
void jabber_bosh_connection_connect(PurpleBOSHConnection *bosh) {
	PurpleHTTPConnection *first;

	g_return_if_fail(bosh->state == BOSH_CONN_OFFLINE);

	first = bosh->connections[0];
	bosh->state = BOSH_CONN_BOOTING;

	http_connection_connect(first);
}

/*
 * Incrementally parse the HTTP response accumulated in conn->read_buf.
 *
 * While the headers are incomplete, this looks for Content-Length and the
 * blank line ending the headers, remembering how far it got in
 * conn->handled_len. Once the headers are done and at least body_len bytes
 * of body are buffered, the body is handed to http_received_cb(), a
 * follow-up request is queued if appropriate, and the parsing state is
 * reset for the next response.
 */
static void
jabber_bosh_http_connection_process(PurpleHTTPConnection *conn)
{
	const char *cursor;

	/* Resume where the previous call stopped */
	cursor = conn->read_buf->str + conn->handled_len;

	if (!conn->headers_done) {
		const char *content_length = purple_strcasestr(cursor, "\r\nContent-Length");
		const char *end_of_headers = strstr(cursor, "\r\n\r\n");

		/* Make sure Content-Length is in headers, not body */
		if (content_length && (!end_of_headers || content_length < end_of_headers)) {
			const char *sep;
			const char *eol;
			int len;

			if ((sep = strstr(content_length, ": ")) == NULL ||
					(eol = strstr(sep, "\r\n")) == NULL)
				/*
				 * The packet ends in the middle of the Content-Length line.
				 * We'll try again later when we have more.
				 */
				return;

			/* The value starts right after the ": " separator */
			len = atoi(sep + 2);
			if (len == 0)
				purple_debug_warning("jabber", "Found mangled Content-Length header.\n");

			conn->body_len = len;
		}

		if (end_of_headers) {
			conn->headers_done = TRUE;
			/* +4 skips the "\r\n\r\n" terminator itself */
			conn->handled_len = end_of_headers - conn->read_buf->str + 4;
			cursor = end_of_headers + 4;
		} else {
			conn->handled_len = conn->read_buf->len;
			return;
		}
	}

	/* Have we handled everything in the buffer? */
	if (conn->handled_len >= conn->read_buf->len)
		return;

	/* Have we read all that the Content-Length promised us? */
	if (conn->read_buf->len - conn->handled_len < conn->body_len)
		return;

	/* One response is complete: one fewer request is outstanding */
	--conn->requests;
	--conn->bosh->requests;

	http_received_cb(conn->read_buf->str + conn->handled_len, conn->body_len,
	                 conn->bosh);

	/* Queue another request when none is outstanding on the session or
	 * when data is waiting to be sent */
	if (conn->bosh->state == BOSH_CONN_ONLINE &&
			(conn->bosh->requests == 0 || conn->bosh->pending->bufused > 0)) {
		purple_debug_misc("jabber", "BOSH: Sending an empty request\n");
		jabber_bosh_connection_send(conn->bosh, PACKET_NORMAL, NULL);
	}

	/*
	 * NOTE(review): the whole buffer is released here, so any bytes that
	 * follow this response's body in read_buf (e.g. the start of a second,
	 * pipelined response) are discarded — confirm whether that can occur.
	 */
	g_string_free(conn->read_buf, TRUE);
	conn->read_buf = NULL;
	conn->headers_done = FALSE;
	conn->handled_len = conn->body_len = 0;
}

/*
 * Common code for reading, called from http_connection_read_cb_ssl and
 * http_connection_read_cb.
 */
/*
 * Read everything currently available from the connection (SSL or plain)
 * into conn->read_buf. If the peer closed the connection or a read error
 * other than EAGAIN occurred, the disconnect is handled first; whatever
 * was received is then passed on for processing.
 */
static void
http_connection_read(PurpleHTTPConnection *conn)
{
	char chunk[1025];
	int nread;
	gboolean peer_closed;
	gboolean read_failed;

	if (conn->read_buf == NULL)
		conn->read_buf = g_string_new(NULL);

	/* Drain the socket until it would block, hits EOF, or fails */
	for (;;) {
		if (conn->psc)
			nread = purple_ssl_read(conn->psc, chunk, sizeof(chunk));
		else
			nread = read(conn->fd, chunk, sizeof(chunk));

		if (nread <= 0)
			break;

		g_string_append_len(conn->read_buf, chunk, nread);
	}

	peer_closed = (nread == 0);
	read_failed = (nread < 0 && errno != EAGAIN);

	if (peer_closed || read_failed) {
		if (read_failed)
			purple_debug_info("jabber", "bosh read=%d, errno=%d, error=%s\n",
			                  nread, errno, g_strerror(errno));
		else
			purple_debug_info("jabber", "bosh server closed the connection\n");

		/*
		 * If the socket is closed, the processing really needs to know about
		 * it. Handle that now.
		 */
		http_connection_disconnected(conn);

		/* Process what we do have */
	}

	if (conn->read_buf->len > 0)
		jabber_bosh_http_connection_process(conn);
}

/* Input callback for plain sockets; `data` is the PurpleHTTPConnection.
 * The fd and condition arguments are not used. */
static void
http_connection_read_cb(gpointer data, gint fd, PurpleInputCondition condition)
{
	PurpleHTTPConnection *conn = data;

	http_connection_read(conn);
}

/* Input callback for SSL connections; `data` is the PurpleHTTPConnection.
 * The psc and cond arguments are not used. */
static void
http_connection_read_cb_ssl(gpointer data, PurpleSslConnection *psc,
                            PurpleInputCondition cond)
{
	PurpleHTTPConnection *conn = data;

	http_connection_read(conn);
}

/*
 * Called when the SSL connection is up: register the SSL read callback,
 * then run the setup shared with plain connections.
 */
static void
ssl_connection_established_cb(gpointer data, PurpleSslConnection *psc,
                              PurpleInputCondition cond)
{
	PurpleHTTPConnection *http = data;

	purple_ssl_input_add(psc, http_connection_read_cb_ssl, http);

	connection_common_established_cb(http);
}

/*
 * Called when the SSL connection fails: forget the (already released) SSL
 * handle and report the error on the account's connection.
 */
static void
ssl_connection_error_cb(PurpleSslConnection *gsc, PurpleSslErrorType error,
                        gpointer data)
{
	PurpleHTTPConnection *http = data;

	/* sslconn frees the connection on error, so just drop our pointer */
	http->psc = NULL;

	purple_connection_ssl_error(http->bosh->js->gc, error);
}

/*
 * Called when a plain connection attempt finishes. On success (source >= 0)
 * the socket is stored, a read watcher is installed and the shared setup
 * runs; on failure a connection error including `error` is reported.
 */
static void
connection_established_cb(gpointer data, gint source, const gchar *error)
{
	PurpleHTTPConnection *http = data;
	PurpleConnection *gc = http->bosh->js->gc;
	gchar *msg;

	if (source >= 0) {
		http->fd = source;
		http->readh = purple_input_add(http->fd, PURPLE_INPUT_READ,
		        http_connection_read_cb, http);
		connection_common_established_cb(http);
		return;
	}

	msg = g_strdup_printf(_("Unable to establish a connection with the server: %s"),
	        error);
	purple_connection_error_reason(gc, PURPLE_CONNECTION_ERROR_NETWORK_ERROR, msg);
	g_free(msg);
}

/*
 * Begin connecting an HTTP connection to the BOSH host, over SSL when the
 * session uses it and through purple_proxy_connect() otherwise. Failure to
 * even start the attempt is reported as a connection error.
 */
static void http_connection_connect(PurpleHTTPConnection *conn)
{
	PurpleBOSHConnection *bosh = conn->bosh;
	PurpleConnection *gc = bosh->js->gc;
	PurpleAccount *account = purple_connection_get_account(gc);

	conn->state = HTTP_CONN_CONNECTING;

	/* Plain connection */
	if (!bosh->ssl) {
		if (purple_proxy_connect(conn, account, bosh->host, bosh->port,
		                         connection_established_cb, conn) == NULL) {
			purple_connection_error_reason(gc,
			    PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
			    _("Unable to connect"));
		}
		return;
	}

	/* SSL connection */
	if (!purple_ssl_is_supported()) {
		purple_connection_error_reason(gc,
		    PURPLE_CONNECTION_ERROR_NO_SSL_SUPPORT,
		    _("SSL support unavailable"));
		return;
	}

	conn->psc = purple_ssl_connect(account, bosh->host, bosh->port,
	                               ssl_connection_established_cb,
	                               ssl_connection_error_cb,
	                               conn);
	if (conn->psc == NULL) {
		purple_connection_error_reason(gc,
			PURPLE_CONNECTION_ERROR_NO_SSL_SUPPORT,
			_("Unable to establish SSL connection"));
	}
}

/*
 * Write up to `len` bytes of `data` to the connection, over SSL when an SSL
 * connection exists and to the plain socket otherwise. Returns the result
 * of the underlying write call.
 */
static int
http_connection_do_send(PurpleHTTPConnection *conn, const char *data, int len)
{
	if (conn->psc != NULL)
		return purple_ssl_write(conn->psc, data, len);

	return write(conn->fd, data, len);
}

/*
 * Write-ready callback: push the next contiguous chunk of the write buffer
 * to the connection. The watcher removes itself once the buffer is empty.
 * A write failure other than EAGAIN is reported as a connection error.
 */
static void
http_connection_send_cb(gpointer data, gint source, PurpleInputCondition cond)
{
	PurpleHTTPConnection *http = data;
	int pending = purple_circ_buffer_get_max_read(http->write_buf);
	int sent;
	gchar *msg;

	/* Nothing left to write: stop watching for writability */
	if (pending == 0) {
		purple_input_remove(http->writeh);
		http->writeh = 0;
		return;
	}

	sent = http_connection_do_send(http, http->write_buf->outptr, pending);

	if (sent > 0) {
		purple_circ_buffer_mark_read(http->write_buf, sent);
		return;
	}

	/* Would block: try again on the next callback */
	if (sent < 0 && errno == EAGAIN)
		return;

	/*
	 * TODO: Handle this better. Probably requires a PurpleBOSHConnection
	 * buffer that stores what is "being sent" until the
	 * PurpleHTTPConnection reports it is fully sent.
	 */
	msg = g_strdup_printf(_("Lost connection with server: %s"),
			g_strerror(errno));
	purple_connection_error_reason(http->bosh->js->gc,
			PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
			msg);
	g_free(msg);
}

/*
 * Wrap a BOSH body in an HTTP POST and send it on the given connection.
 *
 * conn: the HTTP connection to use.
 * req:  the request body; it is copied, so the caller keeps ownership.
 *
 * The inactivity timer is restarted and the outstanding-request counters
 * of the connection and the session are incremented. Whatever cannot be
 * written immediately (including everything, when an earlier write is
 * still pending or the socket would block) is appended to the connection's
 * write buffer and sent from http_connection_send_cb(). A hard write error
 * is reported as a connection error.
 */
static void
http_connection_send_request(PurpleHTTPConnection *conn, const GString *req)
{
	char *data;
	int ret;
	size_t len;
	size_t written;

	/* Sending something to the server, restart the inactivity timer */
	restart_inactivity_timer(conn->bosh);

	/*
	 * NOTE(review): the media type is sent as Content-Encoding here;
	 * HTTP normally carries it in Content-Type. Left unchanged — confirm
	 * against the servers this is used with before altering the header.
	 */
	data = g_strdup_printf("POST %s HTTP/1.1\r\n"
	                       "Host: %s\r\n"
	                       "User-Agent: %s\r\n"
	                       "Content-Encoding: text/xml; charset=utf-8\r\n"
	                       "Content-Length: %" G_GSIZE_FORMAT "\r\n\r\n"
	                       "%s",
	                       conn->bosh->path, conn->bosh->host, bosh_useragent,
	                       req->len, req->str);

	len = strlen(data);

	++conn->requests;
	++conn->bosh->requests;

	if (purple_debug_is_unsafe() && purple_debug_is_verbose())
		/* Will contain passwords for SASL PLAIN and is verbose */
		purple_debug_misc("jabber", "BOSH: Sending %s\n", data);

	if (conn->writeh == 0)
		ret = http_connection_do_send(conn, data, len);
	else {
		/* An earlier request is still being written; queue behind it */
		ret = -1;
		errno = EAGAIN;
	}

	if (ret < 0 && errno != EAGAIN) {
		/*
		 * TODO: Handle this better. Probably requires a PurpleBOSHConnection
		 * buffer that stores what is "being sent" until the
		 * PurpleHTTPConnection reports it is fully sent.
		 */
		gchar *tmp = g_strdup_printf(_("Lost connection with server: %s"),
				g_strerror(errno));
		purple_connection_error_reason(conn->bosh->js->gc,
				PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
				tmp);
		g_free(tmp);
		g_free(data);
		return;
	}

	/*
	 * Normalise to an unsigned byte count before comparing with len.
	 * Comparing the signed ret directly against the size_t len converts
	 * -1 to a huge unsigned value, which made the EAGAIN case skip the
	 * buffering below and silently drop the request.
	 */
	written = (ret > 0) ? (size_t)ret : 0;

	if (written < len) {
		if (conn->writeh == 0)
			conn->writeh = purple_input_add(conn->psc ? conn->psc->fd : conn->fd,
					PURPLE_INPUT_WRITE, http_connection_send_cb, conn);
		purple_circ_buffer_append(conn->write_buf, data + written, len - written);
	}

	/* The unsent tail was copied into write_buf; the formatted request
	 * itself is no longer needed (it was previously leaked). */
	g_free(data);
}