view src/neon/neon.c @ 1729:eaa8a5747628

- Add network timeouts - Add useragent
author Ralf Ertzinger <ralf@skytale.net>
date Tue, 18 Sep 2007 11:35:15 +0200
parents 2e33cfa6a872
children 50d151b259bb
line wrap: on
line source

#include <audacious/vfs.h>
#include <audacious/plugin.h>

#include <ne_socket.h>
#include <ne_utils.h>
#include <ne_redirect.h>
#include <ne_request.h>

#include "debug.h"
#include "neon.h"
#include "rb.h"

#define NBUFSIZ (128u*1024u)
#define NETBLKSIZ (4096u)

DECLARE_PLUGIN(neon, init, fini)

VFSConstructor neon_http_const = {
    "http://",
    neon_vfs_fopen_impl,
    neon_vfs_fclose_impl,
    neon_vfs_fread_impl,
    neon_vfs_fwrite_impl,
    neon_vfs_getc_impl,
    neon_vfs_ungetc_impl,
    neon_vfs_fseek_impl,
    neon_vfs_rewind_impl,
    neon_vfs_ftell_impl,
    neon_vfs_feof_impl,
    neon_vfs_truncate_impl,
    neon_vfs_fsize_impl,
    neon_vfs_metadata_impl
};

/*
 * ========
 */

static struct neon_handle* handle_init(void) {

    struct neon_handle* h;

    _ENTER;

    if (NULL == (h = malloc(sizeof(struct neon_handle)))) {
        _ERROR("Could not allocate memory for handle");
        _LEAVE NULL;
    }

    if (0 != init_rb(&(h->rb), NBUFSIZ)) {
        _ERROR("Could not initialize buffer");
        free(h);
        _LEAVE NULL;
    }

    h->url = NULL;
    h->purl = &purl;
    memset(h->purl, 0, sizeof(ne_uri));
    h->session = NULL;
    h->request = NULL;
    h->redircount = 0;
    h->pos = 0;
    h->content_length = -1;
    h->can_ranges = FALSE;
    h->reader = NULL;
    h->reader_status.mutex = g_mutex_new();
    h->reader_status.cond = g_cond_new();
    h->reader_status.reading = FALSE;
    h->reader_status.status = NEON_READER_INIT;
    h->eof = FALSE;

    _LEAVE h;
}

/*
 * -----
 */

static void handle_free(struct neon_handle* h) {

    _ENTER;

    ne_uri_free(h->purl);
    destroy_rb(&h->rb);
    free(h);

    _LEAVE;
}

/*
 * ----
 */

static void init(void) {

    int ret;

    _ENTER;

    if (0 != (ret = ne_sock_init())) {
        _ERROR("Could not initialize neon library: %d\n", ret);
        _LEAVE;
    }

    vfs_register_transport(&neon_http_const);

    if (0 != ne_has_support(NE_FEATURE_SSL)) {
        _DEBUG("neon compiled with thread-safe SSL, enabling https:// transport");
    }

    _LEAVE;
}

/*
 * -----
 */

static void fini(void) {

    _ENTER;

    ne_sock_exit();

    _LEAVE;
}

/*
 * -----
 */

static void kill_reader(struct neon_handle* h) {

    _ENTER;

    _DEBUG("Signaling reader thread to terminate");
    g_mutex_lock(h->reader_status.mutex);
    h->reader_status.reading = FALSE;
    g_cond_signal(h->reader_status.cond);
    g_mutex_unlock(h->reader_status.mutex);

    _DEBUG("Waiting for reader thread to die...");
    g_thread_join(h->reader);
    _DEBUG("Reader thread has died");
    h->reader = NULL;
}

/*
 * -----
 */

static void handle_headers(struct neon_handle* h) {

    const gchar* name;
    const gchar* value;
    void* cursor = NULL;
    long len;
    gchar* endptr;

    _ENTER;

    _DEBUG("Header responses:");
    while(NULL != (cursor = ne_response_header_iterate(h->request, cursor, &name, &value))) {
        _DEBUG("HEADER: %s: %s", name, value);
        if (0 == g_ascii_strncasecmp("accept-ranges", name, 13)) {
            /*
             * The server advertises range capability. we need "bytes"
             */
            if (NULL != g_strrstr(value, "bytes")) {
                _DEBUG("server can_ranges");
                h->can_ranges = TRUE;
            }
        }

        if (0 == g_ascii_strncasecmp("content-length", name, 14)) {
            /*
             * The server sent us the content length. Parse and store.
             */
            len = strtol(value, &endptr, 10);
            if ((*value != '\0') && (*endptr == '\0')) {
                /*
                 * Valid data.
                 */
                _DEBUG("Content length as advertised by server: %d", len);
                h->content_length = len;
            }
        }
    }

    _LEAVE;
}

/*
 * -----
 */

static int open_request(struct neon_handle* handle, unsigned long startbyte) {

    int ret;

    _ENTER;

    handle->request = ne_request_create(handle->session, "GET", handle->purl->path);
    ne_print_request_header(handle->request, "Range", "bytes=%ld-", startbyte);

    /*
     * Try to connect to the server.
     */
    _DEBUG("Connecting...");
    ret = ne_begin_request(handle->request);

    switch (ret) {
        case NE_OK:
            /* URL opened OK */
            _DEBUG("URL opened OK");
            handle->content_start = startbyte;
            handle->pos = startbyte;
            handle_headers(handle);
            _LEAVE 0;
            break;

        case NE_REDIRECT:
            /* We hit a redirect. Handle it. */
            _DEBUG("Redirect encountered");
            handle->redircount += 1;
            handle->purl = ne_redirect_location(handle->session);
            ne_request_destroy(handle->request);
            if (NULL == handle->purl) {
                _ERROR("Could not parse redirect response");
                _LEAVE -1;
            }
            _LEAVE 1;
            break;

        default:
            /* Something went wrong. */
            _ERROR("Could not open URL: %d", ret);
            if (1 == ret) {
                _ERROR("neon error string: %s", ne_get_error(handle->session));
            }
            ne_request_destroy(handle->request);
            _LEAVE -1;
            break;
    }
}

/*
 * -----
 */

static int open_handle(struct neon_handle* handle, unsigned long startbyte) {

    int ret;

    _ENTER;

    handle->redircount = 0;

    _DEBUG("Parsing URL");
    if (0 != ne_uri_parse(handle->url, handle->purl)) {
        _ERROR("Could not parse URL '%s'", handle->url);
        _LEAVE -1;
    }

    if (0 == handle->purl->port) {
        handle->purl->port = 80;
    }

    while (handle->redircount < 10) {

        _DEBUG("Creating session");
        handle->session = ne_session_create(handle->purl->scheme, handle->purl->host, handle->purl->port);
        ne_set_session_flag(handle->session, NE_SESSFLAG_ICYPROTO, 1);
        ne_set_session_flag(handle->session, NE_SESSFLAG_PERSIST, 0);
        ne_set_connect_timeout(handle->session, 10);
        ne_set_read_timeout(handle->session, 10);
        ne_set_useragent(handle->session, "Audacious/1.4.0");
        ne_redirect_register(handle->session);

        _DEBUG("Creating request");
        ret = open_request(handle, startbyte);

        if (0 == ret) {
            _LEAVE 0;
        } else if (-1 == ret) {
            ne_session_destroy(handle->session);
            _LEAVE -1;
        }
    }

    /*
     * If we get here, our redirect count exceeded
     */

    _ERROR("Redirect count exceeded for URL");

    _LEAVE 1;
}

/*
 * -----
 */

static int fill_buffer(struct neon_handle* h) {

    ssize_t bsize;
    char buffer[NETBLKSIZ];
    ssize_t to_read;

    _ENTER;

    bsize = free_rb(&h->rb);
    to_read = MIN(bsize, NETBLKSIZ);

    _DEBUG("%d bytes free in buffer, trying to read %d bytes max", bsize, to_read);

    _DEBUG("Reading from the network....");
    if (0 >= (bsize = ne_read_response_block(h->request, buffer, to_read))) {
        if (0 == bsize) {
            _DEBUG("End of file encountered");
            _LEAVE 1;
        } else {
            _ERROR("Error while reading from the network");
            _LEAVE -1;
        }
    }
    _DEBUG("Read %d bytes from the network", bsize);

    if (0 != write_rb(&(h->rb), buffer, bsize)) {
        _ERROR("Error putting data into buffer");
        _LEAVE -1;
    }

    _LEAVE 0;
}

/*
 * -----
 */

static int fill_buffer_limit(struct neon_handle* h, unsigned int maxfree) {

    ssize_t bfree;
    int ret;

    _ENTER;

    bfree = free_rb(&h->rb);
    _DEBUG("Filling buffer up to max %d bytes free, %d bytes free now", maxfree, bfree);

    while (bfree > maxfree) {
        ret = fill_buffer(h);
        if (-1 == ret) {
            _ERROR("Error while filling buffer");
            _LEAVE ret;
        } else if (1 == ret) {
            /*
             * EOF while filling the buffer. Return what we have.
             */
            _LEAVE 0;
        }

        bfree = free_rb(&h->rb);
    }

    _LEAVE 0;
}

/*
 * -----
 */

static gpointer reader_thread(void* data) {

    struct neon_handle* h = (struct neon_handle*)data;
    int ret;

    _ENTER;

    g_mutex_lock(h->reader_status.mutex);

    while(h->reader_status.reading) {
        g_mutex_unlock(h->reader_status.mutex);

        /*
         * Hit the network only if we have more than NETBLKSIZ of free buffer
         */
        if (NETBLKSIZ < free_rb(&h->rb)) {

            _DEBUG("Filling buffer...");
            ret = fill_buffer(h);

            g_mutex_lock(h->reader_status.mutex);
            if (-1 == ret) {
                /*
                 * Error encountered while reading from the network.
                 * Set the error flag and terminate the
                 * reader thread.
                 */
                _DEBUG("Error while reading from the network. Terminating reader thread");
                h->reader_status.status = NEON_READER_ERROR;
                g_mutex_unlock(h->reader_status.mutex);
                _LEAVE NULL;
            } else if (1 == ret) {
                /*
                 * EOF encountered while reading from the
                 * network. Set the EOF status and exit.
                 */
                _DEBUG("EOF encountered while reading from the network. Terminating reader thread");
                h->reader_status.status = NEON_READER_EOF;
                g_mutex_unlock(h->reader_status.mutex);
                _LEAVE NULL;
            }

            /*
             * So we actually got some data out of the stream.
             */
            _DEBUG("Network read succeeded");
        } else {
            /*
             * Not enough free space in the buffer.
             * Sleep until the main thread wakes us up.
             */
            g_mutex_lock(h->reader_status.mutex);
            if (h->reader_status.reading) {
                _DEBUG("Reader thread going to sleep");
                g_cond_wait(h->reader_status.cond, h->reader_status.mutex);
                _DEBUG("Reader thread woke up");
            } else {
                /*
                 * Main thread has ordered termination of this thread.
                 * Leave the loop.
                 */
                break;
            }
        }
    }

    _DEBUG("Reader thread terminating gracefully");
    h->reader_status.status = NEON_READER_TERM;
    g_mutex_unlock(h->reader_status.mutex);

    _LEAVE NULL;
}

/*
 * -----
 */

VFSFile* neon_vfs_fopen_impl(const gchar* path, const gchar* mode) {

    VFSFile* file;
    struct neon_handle* handle;

    _ENTER;

    _DEBUG("Trying to open '%s' with neon", path);

    if (NULL == (file = malloc(sizeof(VFSFile)))) {
        _ERROR("Could not allocate memory for filehandle");
        _LEAVE NULL;
    }

    if (NULL == (handle = handle_init())) {
        _ERROR("Could not allocate memory for neon handle");
        free(file);
        _LEAVE NULL;
    }

    if (NULL == (handle->url = strdup(path))) {
        _ERROR("Could not copy URL string");
        handle_free(handle);
        free(file);
        _LEAVE NULL;
    }

    if (0 != open_handle(handle, 0)) {
        _ERROR("Could not open URL");
        handle_free(handle);
        free(file);
        _LEAVE NULL;
    }

    file->handle = handle;
    file->base = &neon_http_const;

    _LEAVE file;
}

/*
 * ----
 */

gint neon_vfs_fclose_impl(VFSFile* file) {

    struct neon_handle* h = (struct neon_handle *)file->handle;

    _ENTER;

    if (NULL != h->reader) {
        kill_reader(h);
    }

    _DEBUG("Destroying request");
    if (NULL != h->request) {
        ne_request_destroy(h->request);
    }

    _DEBUG("Destroying session");
    ne_session_destroy(h->session);

    handle_free(h);

    _LEAVE 0;
}

/*
 * -----
 */

size_t neon_vfs_fread_impl(gpointer ptr_, size_t size, size_t nmemb, VFSFile* file) {

    struct neon_handle* h = (struct neon_handle*)file->handle;
    int belem;
    int ret;

    _ENTER;

    if (NULL == h->request) {
        _ERROR("No request to read from, seek gone wrong?");
        _LEAVE 0;
    }

    _DEBUG("Requesting %d elements of %d bytes size each (%d bytes total), to be stored at %p",
            nmemb, size, (nmemb*size), ptr_);

    /*
     * Look how much data is in the buffer
     */
    belem = used_rb(&h->rb) / size;

    if ((NULL != h->reader) && (0 == belem)) {
        /*
         * There is a reader thread, but the buffer is empty.
         * If we are running normally we will have to rebuffer.
         * Kill the reader thread and restart.
         */
        g_mutex_lock(h->reader_status.mutex);
        if (NEON_READER_RUN == h->reader_status.status) {
            g_mutex_unlock(h->reader_status.mutex);
            _ERROR("Buffer underrun, trying rebuffering");
            kill_reader(h);
        } else {
            g_mutex_unlock(h->reader_status.mutex);
        }
    }

    if (NULL == h->reader) {
        /*
         * There is no reader thread yet. Read the first bytes from
         * the network ourselves, and then fire up the reader thread
         * to keep the buffer filled up.
         */
        _DEBUG("Doing initial buffer fill");
        ret = fill_buffer_limit(h, NBUFSIZ/2);

        if (-1 == ret) {
            _ERROR("Error while reading from the network");
            _LEAVE 0;
        } else if (1 == ret) {
            _ERROR("EOF during initial read");
            _LEAVE 0;
        }

        /*
         * We have some data in the buffer now.
         * Start the reader thread.
         */
        h->reader_status.reading = TRUE;
        if (NULL == (h->reader = g_thread_create(reader_thread, h, TRUE, NULL))) {
            h->reader_status.reading = FALSE;
            _ERROR("Error creating reader thread!");
            _LEAVE 0;
        }
        g_mutex_lock(h->reader_status.mutex);
        h->reader_status.status = NEON_READER_RUN;
        g_mutex_unlock(h->reader_status.mutex);
    } else {
        /*
         * There already is a reader thread. Look if it is in good
         * shape.
         */
        g_mutex_lock(h->reader_status.mutex);
        _DEBUG("Reader thread status: %d", h->reader_status.status);
        switch (h->reader_status.status) {
            case NEON_READER_INIT:
            case NEON_READER_RUN:
                /*
                 * All is well, nothing to be done.
                 */
                break;
            case NEON_READER_EOF:
                /*
                 * If there still is data in the buffer, carry on.
                 * If not, terminate the reader thread and return 0.
                 */
                if (0 == used_rb(&h->rb)) {
                    _DEBUG("Reached end of stream");
                    g_mutex_unlock(h->reader_status.mutex);
                    kill_reader(h);
                    h->eof = TRUE;
                    _LEAVE 0;
                }
                break;
            case NEON_READER_ERROR:
                /* Terminate the reader and return 0 */
                g_mutex_unlock(h->reader_status.mutex);
                kill_reader(h);
                _LEAVE 0;
                break;
            case NEON_READER_TERM:
                /*
                 * The reader thread terminated gracefully, most
                 * likely on our own request.
                 * We should not get here.
                 */
                _ERROR("Reader thread terminated and fread() called. How did we get here?");
                g_mutex_unlock(h->reader_status.mutex);
                kill_reader(h);
                _LEAVE 0;
        }
        g_mutex_unlock(h->reader_status.mutex);
    }

    /*
     * Deliver data from the buffer
     */
    belem = used_rb(&h->rb) / size;

    if (0 == belem) {
        /*
         * The buffer is empty, we can deliver no data!
         */
        _ERROR("Buffer still underrun, fatal.");
        _LEAVE 0;
    }

    _DEBUG("%d elements of data in the buffer", belem);
    read_rb(&h->rb, ptr_, MIN(belem, nmemb)*size);

    /*
     * Signal the network thread to continue reading
     */
    _DEBUG("Waking up reader thread");
    g_mutex_lock(h->reader_status.mutex);
    g_cond_signal(h->reader_status.cond);
    g_mutex_unlock(h->reader_status.mutex);

    h->pos += (MIN(belem, nmemb)*size);

    _DEBUG("Returning %d elements", MIN(belem, nmemb));

    _LEAVE MIN(belem, nmemb);
}


/*
 * -----
 */

size_t neon_vfs_fwrite_impl(gconstpointer ptr, size_t size, size_t nmemb, VFSFile* file) {

    _ENTER;

    _ERROR("NOT IMPLEMENTED");

    _LEAVE 0;
}

/*
 * -----
 */

gint neon_vfs_getc_impl(VFSFile* file) {

    gchar c;

    _ENTER;

    if (1 != neon_vfs_fread_impl(&c, 1, 1, file)) {
        _ERROR("Could not getc()!");
        _LEAVE -1;
    }

    _LEAVE c;
}

/*
 * -----
 */

gint neon_vfs_ungetc_impl(gint c, VFSFile* stream) {

    _ENTER;

    _ERROR("NOT IMPLEMENTED");

    _LEAVE 0;
}

/*
 * -----
 */

void neon_vfs_rewind_impl(VFSFile* file) {

    _ENTER;

    (void)neon_vfs_fseek_impl(file, 0L, SEEK_SET);

    _LEAVE;
}

/*
 * -----
 */

glong neon_vfs_ftell_impl(VFSFile* file) {

    struct neon_handle* h = (struct neon_handle *)file->handle;

    _ENTER;

    _DEBUG("Current file position: %d", h->pos);

    _LEAVE h->pos;
}

/*
 * -----
 */

gboolean neon_vfs_feof_impl(VFSFile* file) {

    struct neon_handle* h = (struct neon_handle*)file->handle;

    _ENTER;

    _LEAVE h->eof;
}

/*
 * -----
 */

gint neon_vfs_truncate_impl(VFSFile* file, glong size) {

    _ENTER;

    _ERROR("NOT IMPLEMENTED");

    _LEAVE 0;
}

/*
 * -----
 */

gint neon_vfs_fseek_impl(VFSFile* file, glong offset, gint whence) {

    struct neon_handle* h = (struct neon_handle*)file->handle;
    long newpos;
    long content_length;

    _ENTER;

    _DEBUG("Seek requested: offset %ld, whence %d", offset, whence);
    /*
     * Two things must be satisfied for us to be able to seek:
     * - the server must advertise a content-length
     * - the server must advertise accept-ranges: bytes
     */
    if ((-1 == h->content_length) || !h->can_ranges) {
        _DEBUG("Can not seek due to server restrictions");
        _LEAVE -1;
    }

    content_length = h->content_length + h->content_start;

    switch (whence) {
        case SEEK_SET:
            newpos = offset;
            break;
        case SEEK_CUR:
            newpos = h->pos + offset;
            break;
        case SEEK_END:
            newpos = content_length + offset;
            break;
        default:
            _ERROR("Invalid whence specified");
            _LEAVE -1;
    }

    _DEBUG("Position to seek to: %ld, current: %ld", newpos, h->pos);
    if (0 > newpos) {
        _ERROR("Can not seek before start of stream");
        _LEAVE -1;
    }

    if (newpos > content_length) {
        _ERROR("Can not seek beyond end of stream");
        _LEAVE -1;
    }

    if (newpos == h->pos) {
        _LEAVE 0;
    }

    /*
     * To seek to the new position we have to
     * - stop the current reader thread, if there is one
     * - destroy the current request
     * - dump all data currently in the ringbuffer
     * - create a new request starting at newpos
     */
    if (NULL != h->reader) {
        /*
         * There may be a thread still running.
         */
        kill_reader(h);
    }

    ne_request_destroy(h->request);
    ne_session_destroy(h->session);
    reset_rb(&h->rb);

    if (0 != open_handle(h, newpos)) {
        /*
         * Something went wrong while creating the new request.
         * There is not much we can do now, we'll set the request
         * to NULL, so that fread() will error out on the next
         * read request
         */
        _ERROR("Error while creating new request!");
        h->request = NULL;
        _LEAVE -1;
    }

    /*
     * Things seem to have worked. The next read request will start
     * the reader thread again.
     */

    _LEAVE 0;
}

/*
 * -----
 */

gchar *neon_vfs_metadata_impl(VFSFile* file, const gchar * field) {

    _ENTER;

    _ERROR("NOT IMPLEMENTED");

    _LEAVE NULL;
}

/*
 * -----
 */

off_t neon_vfs_fsize_impl(VFSFile* file) {

    struct neon_handle* h = (struct neon_handle*)file->handle;

    _ENTER;

    if (-1 == h->content_length) {
        _DEBUG("Unknown content length");
        _LEAVE 0;
    }

    _LEAVE (h->content_start + h->content_length);
}