/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <ogg/ogg.h>
#include <errno.h>

#ifndef _WIN32
#include <unistd.h>
#include <sys/time.h>
#include <sys/socket.h>
#else
#include <winsock2.h>
#include <windows.h>
#endif

#include "thread/thread.h"
#include "avl/avl.h"
#include "httpp/httpp.h"
#include "net/sock.h"

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "cfgfile.h"
#include "util.h"
#ifdef USE_YP
#include "geturl.h"
#endif
#include "source.h"
#include "format.h"
#include "auth.h"

#undef CATMODULE
#define CATMODULE "source"

#define MAX_FALLBACK_DEPTH 10

/* Taken by source_move_clients() around the whole move, before either of
 * the two per-source locks is acquired. */
mutex_t move_clients_mutex;

/* forward declarations of file-local helpers defined further down */
/* orders two clients by their connection id */
static int _compare_clients(void *compare_arg, void *a, void *b);
/* drops one listener, updating the global client count and stats */
static int _free_client(void *key);
/* splits an "ice-audio-info" header value into name=value settings */
static int _parse_audio_info(source_t *source, char *s);
/* forks and executes an on-connect / on-disconnect command */
static void source_run_script (char *command, char *mountpoint);


/* Allocate a new source with the stated mountpoint and insert it into the
 * global source tree.
 *
 * Returns the new source, or NULL when mount is NULL, when a source with
 * that mountpoint is already in the tree, or when memory cannot be
 * allocated.  The source tree write lock is taken and released internally.
 */
source_t *source_reserve (const char *mount)
{
    source_t *src = NULL;

    if (mount == NULL)
        return NULL;

    avl_tree_wlock (global.source_tree);
    do
    {
        /* refuse to reserve a mountpoint that is already present */
        if (source_find_mount_raw (mount) != NULL)
            break;

        src = calloc (1, sizeof(source_t));
        if (src == NULL)
            break;

        /* make duplicates for strings or similar */
        src->mount = strdup (mount);
        if (src->mount == NULL)
        {
            /* a source without a name must never reach the tree, lookups
               compare against the mount name with strcmp */
            free (src);
            src = NULL;
            break;
        }
        src->max_listeners = -1;

        /* both listener lists start out empty, so each tail refers to
           its own list head */
        src->active_clients_tail = &src->active_clients;
        src->pending_clients_tail = &src->pending_clients;

        thread_mutex_create (&src->lock);

        avl_insert (global.source_tree, src);

    } while (0);

    avl_tree_unlock (global.source_tree);
    return src;
}


/* Find a mount with this raw name - ignoring fallbacks. You should have the
 * global source tree locked to call this.
 */
source_t *source_find_mount_raw(const char *mount)
{
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
}


/* Resolve a mount name to a usable source.  If the named source exists but
 * is neither running nor on-demand, the configured fallback mount for that
 * name is tried instead, and so on up to MAX_FALLBACK_DEPTH hops.
 * Returns NULL when the chain ends without a usable source.  The caller
 * must hold the global source tree lock.
 */
source_t *source_find_mount (const char *mount)
{
    source_t *found = NULL;
    ice_config_t *config = config_get_config();
    int depth;

    for (depth = 0; mount != NULL; depth++)
    {
        mount_proxy *entry;

        /* fallbacks can be configured in a loop, so give up eventually */
        if (depth > MAX_FALLBACK_DEPTH)
        {
            found = NULL;
            break;
        }

        found = source_find_mount_raw (mount);
        if (found == NULL)
            break; /* the chain points at a mount that does not exist */

        if (found->running || found->on_demand)
            break; /* usable as it is */

        /* this source cannot be used right now, so look the mount up in
           the configured mount list to find out where it falls back to */
        found = NULL;
        for (entry = config->mounts; entry != NULL; entry = entry->next)
        {
            if (strcmp (entry->mountname, mount) == 0)
                break;
        }
        mount = (entry != NULL) ? entry->fallback_mount : NULL;
    }

    config_release_config();
    return found;
}


/* Ordering function for two source_t objects: compares their mount names
 * with strcmp and returns its result.  The first argument is not used.
 */
int source_compare_sources(void *arg, void *a, void *b)
{
    const source_t *lhs = a;
    const source_t *rhs = b;

    (void)arg;
    return strcmp (lhs->mount, rhs->mount);
}


/* Return a source to its unconnected state: destroys the source client,
 * closes the dump file, drops every active and pending listener, empties
 * the stream data queue, frees the format plugin and the per-mount strings,
 * and resets the counters.  The source itself and its mount name are kept.
 */
void source_clear_source (source_t *source)
{
    client_t *listener;
    refbuf_t *buf;
#ifdef USE_YP
    int i;
#endif

    DEBUG1 ("clearing source \"%s\"", source->mount);
    client_destroy(source->client);
    source->client = NULL;
    source->parser = NULL;
    source->con = NULL;

    if (source->dumpfile != NULL)
    {
        INFO1 ("Closing dumpfile for %s", source->mount);
        fclose (source->dumpfile);
        source->dumpfile = NULL;
    }

    /* drop whoever is still listening */
    while ((listener = source->active_clients) != NULL)
    {
        source->active_clients = listener->next;
        _free_client (listener);
    }
    source->active_clients_tail = &source->active_clients;

    /* and whoever was still waiting to be added */
    while ((listener = source->pending_clients) != NULL)
    {
        source->pending_clients = listener->next;
        _free_client (listener);
    }
    source->pending_clients_tail = &source->pending_clients;

    /* empty the stream queue, nothing should be left over */
    while ((buf = source->stream_data) != NULL)
    {
        refbuf_t *following = buf->next;

        source->stream_data = following;
        /* associated data shared with the next buffer is detached from
           this one before it is released */
        if (following != NULL && buf->associated == following->associated)
            buf->associated = NULL;
        refbuf_release (buf);
    }
    source->stream_data_tail = NULL;
    source->associated = NULL;

    if (source->format != NULL && source->format->free_plugin != NULL)
        source->format->free_plugin (source->format);
    source->format = NULL;

#ifdef USE_YP
    for (i = 0; i < source->num_yp_directories; i++)
    {
        yp_destroy_ypdata(source->ypdata[i]);
        source->ypdata[i] = NULL;
    }
    source->num_yp_directories = 0;

    util_dict_free (source->audio_info);
    source->audio_info = NULL;
#endif

    /* counters and limits go back to their initial values */
    source->starting_point = NULL;
    source->starting_count = 0;
    source->queue_size = 0;
    source->queue_size_limit = 0;
    source->listeners = 0;
    source->no_mount = 0;
    source->max_listeners = -1;
    source->yp_public = 0;

    /* strings copied from the mount settings */
    free (source->fallback_mount);
    source->fallback_mount = NULL;

    free (source->dumpfilename);
    source->dumpfilename = NULL;

    free (source->on_connect);
    source->on_connect = NULL;

    free (source->on_disconnect);
    source->on_disconnect = NULL;

    source->on_demand_req = 0;
}


/* Take the source out of the global source tree (under the tree write
 * lock), then destroy its mutex and free the mount name and the source
 * structure.  A warning is logged if listeners are still attached.
 */
void source_free_source (source_t *source)
{
    DEBUG1 ("freeing source \"%s\"", source->mount);

    avl_tree_wlock (global.source_tree);
    avl_delete (global.source_tree, source, NULL);
    avl_tree_unlock (global.source_tree);

    /* neither listener list is expected to have entries by now */
    if (source->active_clients != NULL)
    {
        WARN1("active listeners on mountpoint %s", source->mount);
    }
    if (source->pending_clients != NULL)
    {
        WARN1("pending listeners on mountpoint %s", source->mount);
    }

    thread_mutex_destroy (&source->lock);

    free (source->mount);
    free (source);
}


/* Search the active listener list of source for the client whose
 * connection id equals id.  Returns that client, or NULL if none matches.
 * Pending listeners are not searched.
 */
client_t *source_find_client(source_t *source, int id)
{
    connection_t probe_con;
    client_t probe;
    client_t *cur;

    /* build a stand-in client carrying only the id, so the regular
       comparison helper can be used */
    probe.con = &probe_con;
    probe_con.id = id;

    for (cur = source->active_clients; cur != NULL; cur = cur->next)
    {
        if (_compare_clients (NULL, cur, &probe) == 0)
            return cur;
    }
    return NULL;
}
    

/* Move clients from source to dest provided dest is running
 * and that the stream format is the same.
 * The only lock that should be held when this is called is the
 * source tree lock.
 *
 * Every active and pending listener of source is appended to the pending
 * list of dest, dest->new_listeners is increased by the number of
 * listeners moved and the listener count of source is reset to 0.
 * Nothing is moved when source and dest are the same object, when dest is
 * not running, when source has no format, or when the format types differ.
 */
void source_move_clients (source_t *source, source_t *dest)
{
    /* a mount that falls back to itself would otherwise lock the same
       mutex twice below and never return */
    if (source == dest)
    {
        WARN1 ("source and destination are both %s, not moving clients", source->mount);
        return;
    }

    /* we don't want the two write locks to deadlock in here */
    thread_mutex_lock (&move_clients_mutex);
    thread_mutex_lock (&dest->lock);

    /* if the destination is not running then we can't move clients */
    if (dest->running == 0)
    {
        WARN1 ("destination mount %s not running, unable to move clients ", dest->mount);
        thread_mutex_unlock (&dest->lock);
        thread_mutex_unlock (&move_clients_mutex);
        return;
    }

    do
    {
        thread_mutex_lock (&source->lock);

        if (source->format == NULL)
        {
            INFO1 ("source mount %s is not available", source->mount);
            break;
        }
        if (source->format->type != dest->format->type)
        {
            WARN2 ("stream %s and %s are of different types, ignored", source->mount, dest->mount);
            break;
        }

        INFO2 ("passing %d listeners to \"%s\"", source->listeners,
                dest->mount);

        /* active listeners are repositioned onto the newest buffer of
           dest before being queued as pending there */
        while (source->active_clients)
        {
            client_t *client = source->active_clients;
            source->active_clients = client->next;
            client->refbuf = dest->stream_data_tail;
            client->pos = 0;
            *dest->pending_clients_tail = client;
            dest->pending_clients_tail = &client->next;
        }
        source->active_clients_tail = &source->active_clients;

        /* pending listeners are appended to the pending list of dest with
           their buffer position left untouched */
        while (source->pending_clients)
        {
            client_t *client = source->pending_clients;
            source->pending_clients = client->next;
            *dest->pending_clients_tail = client;
            dest->pending_clients_tail = &client->next;
        }
        source->pending_clients_tail = &source->pending_clients;

        dest->new_listeners += source->listeners;
        source->listeners = 0;
        stats_event (source->mount, "listeners", "0");

    } while (0);

    thread_mutex_unlock (&source->lock);
    thread_mutex_unlock (&dest->lock);
    thread_mutex_unlock (&move_clients_mutex);
}


/* Wait for data on the source connection and fetch the next buffer from
 * the format handler.
 *
 * Returns a buffer to queue, or NULL when nothing is available yet (the
 * wait expired or hit a recoverable error) or when the source has to stop;
 * in the latter case source->running has been cleared.  Buffers that have
 * no data and no associated data are discarded and the wait is repeated.
 */
static refbuf_t *get_next_buffer (source_t *source)
{
    /* do not wait at all when the last pass asked for a short delay */
    const int wait = source->short_delay ? 0 : 250;

    while (global.running == ICE_RUNNING && source->running)
    {
        refbuf_t *buf;
        int ready = util_timed_wait_for_fd (source->con->sock, wait);

        if (ready < 0)
        {
            if (sock_recoverable (sock_error()))
                return NULL;

            WARN0 ("Error while waiting on socket, Disconnecting source");
            source->running = 0;
            return NULL;
        }

        if (ready == 0)
        {
            time_t now = time(NULL);

            if (source->last_read == 0)
            {
                /* first idle wait since the last read, start the clock */
                source->last_read = now;
            }
            else if (source->last_read + (time_t)source->timeout < now)
            {
                WARN0 ("Disconnecting source due to socket timeout");
                source->running = 0;
                continue;   /* the loop condition now ends the wait */
            }
            return NULL;
        }

        /* the socket is readable, so the idle clock is reset */
        source->last_read = 0;
        buf = source->format->get_buffer (source);
        if (buf == NULL)
            continue;

        if (buf->associated == NULL)
        {
            if (buf->len == 0)
            {
                /* any empty buffers get ignored, go back to waiting */
                refbuf_release (buf);
                continue;
            }
            /* plain data inherits the current associated data */
            buf->associated = source->associated;
        }

        /* a buffer carrying different associated data makes it current */
        if (buf->associated != source->associated)
            source->associated = buf->associated;

        return buf;
    }

    return NULL;
}


/* Per-listener send routine.  Any predata still queued for the client goes
 * out first, then up to four buffers (or roughly 12000 bytes) of stream
 * data are written through the format handler.  deletion_expected says
 * that the oldest queued buffer is about to be removed; a client that
 * still refers to it after writing is flagged as errored so the caller
 * drops it.
 */
static void send_to_listener (source_t *source, client_t *client, int deletion_expected)
{
    int sent;
    int passes_left = 4;      /* upper bound on writes in one call */
    int written_so_far = 0;

    if (client->predata != NULL)
    {
        char *start = client->predata + client->predata_offset;
        unsigned remaining = client->predata_len - client->predata_offset;

        sent = client_send_bytes (client, start, remaining);
        if (sent > 0 && sent < remaining)
        {
            /* partial write, the rest goes out on a later call */
            client->predata_offset += sent;
            return;
        }
        free (client->predata);
        client->predata = NULL;
        client->predata_offset = 0;
        client->predata_len = 0;
        client->predata_size = 0;
    }

    /* a client without a position begins at the source's starting point */
    if (client->refbuf == NULL)
        client->refbuf = source->starting_point;

    for (;;)
    {
        /* stop as soon as the connection is marked as failed */
        if (client->con->error)
            break;

        /* enough for one client in one go; ask for a short wait so the
           remaining data is not held back for long */
        if (written_so_far > 12000 || passes_left == 0)
        {
            source->short_delay = 1;
            break;
        }
        passes_left--;

        sent = source->format->write_buf_to_client (source->format, client);
        if (sent <= 0)
            break;  /* nothing more can be written for now */

        written_so_far += sent;
    }

    /* the buffer at the head of the queue may be about to go, a client
       still sitting on it has fallen too far behind */
    if (deletion_expected && client->refbuf == source->stream_data)
    {
        DEBUG0("Client has fallen too far behind, removing");
        client->con->error = 1;
    }
}


/* Prepare a source for streaming.
 *
 * When built with USE_YP the directory data is created and filled in from
 * the source client's headers.  After that the listen URL stat is
 * published, the optional dump file is opened for appending, a read lock
 * is taken on source->shutdown_rwlock, the initial statistics are sent,
 * the source socket is made non-blocking, the source is marked as running
 * and the on-connect script is run.  If fallback override is enabled the
 * listeners on the fallback mount are moved over to this source.
 *
 * source->lock is taken last and is still held when this returns.
 */
static void source_init (source_t *source)
{
    ice_config_t *config = config_get_config();
    char *listenurl;
    int listen_url_size;
#ifdef USE_YP
    char *s;
    int    i;
    char *ai;
#endif

    /* "http://" host ":" port mount plus terminator, 6 for max size of port */
    listen_url_size = strlen("http://") + strlen(config->hostname) +
        strlen(":") + 6 + strlen(source->mount) + 1;

#ifdef USE_YP
    for (i=0;i<config->num_yp_directories;i++) {
        if (config->yp_url[i]) {
            source->ypdata[source->num_yp_directories] = yp_create_ypdata();
            source->ypdata[source->num_yp_directories]->yp_url = 
                strdup (config->yp_url[i]);
            source->ypdata[source->num_yp_directories]->yp_url_timeout = 
                config->yp_url_timeout[i];
            source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
            source->num_yp_directories++;
        }
    }
    
    source->audio_info = util_dict_new();
    /* ice-* is icecast, icy-* is shoutcast */
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
    }
    if ((s = httpp_getvar(source->parser, "ice-name"))) {
        add_yp_info(source, "server_name", s, YP_SERVER_NAME);
    }
    if ((s = httpp_getvar(source->parser, "icy-name"))) {
        add_yp_info(source, "server_name", s, YP_SERVER_NAME);
    }
    if ((s = httpp_getvar(source->parser, "icy-url"))) {
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
    }
    if ((s = httpp_getvar(source->parser, "ice-genre"))) {
        add_yp_info(source, "genre", s, YP_SERVER_GENRE);
    }
    if ((s = httpp_getvar(source->parser, "icy-genre"))) {
        add_yp_info(source, "genre", s, YP_SERVER_GENRE);
    }
    if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
        add_yp_info(source, "bitrate", s, YP_BITRATE);
    }
    if ((s = httpp_getvar(source->parser, "icy-br"))) {
        add_yp_info(source, "bitrate", s, YP_BITRATE);
    }
    if ((s = httpp_getvar(source->parser, "ice-description"))) {
        add_yp_info(source, "server_description", s, YP_SERVER_DESC);
    }
    if ((s = httpp_getvar(source->parser, "ice-public"))) {
        stats_event(source->mount, "public", s);
        source->yp_public = atoi(s);
    }
    if ((s = httpp_getvar(source->parser, "icy-pub"))) {
        stats_event(source->mount, "public", s);
        source->yp_public = atoi(s);
    }
    if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
        /* the raw header is published first, parsing it comes after */
        stats_event(source->mount, "audio_info", s);
        if (_parse_audio_info(source, s)) {
            ai = util_dict_urlencode(source->audio_info, '&');
            add_yp_info(source, "audio_info", 
                    ai,
                    YP_AUDIO_INFO);
            free(ai);
        }
    }
    for (i=0;i<source->num_yp_directories;i++) {
        add_yp_info(source, "server_type", 
                     source->format->format_description,
                     YP_SERVER_TYPE);
        free(source->ypdata[i]->listen_url);
        source->ypdata[i]->listen_url = malloc (listen_url_size);
        /* bounded write, and only when the allocation succeeded */
        if (source->ypdata[i]->listen_url) {
            snprintf (source->ypdata[i]->listen_url, listen_url_size,
                    "http://%s:%d%s",
                    config->hostname, config->port, source->mount);
        }
    }

    if(source->yp_public) {
        for (i=0;i<source->num_yp_directories;i++) {
            /* Don't permit touch intervals of less than 30 seconds */
            if (source->ypdata[i]->yp_touch_interval <= 30) {
                source->ypdata[i]->yp_touch_interval = 30;
            }
            source->ypdata[i]->yp_last_touch = 0;
        }
    }
#endif

    listenurl = malloc (listen_url_size);
    if (listenurl) {
        snprintf (listenurl, listen_url_size, "http://%s:%d%s",
                config->hostname, config->port, source->mount);
    }
    config_release_config();

    if (listenurl) {
        stats_event (source->mount, "listenurl", listenurl);
        free(listenurl);
    }
    else
    {
        WARN0("Unable to allocate memory for listen URL");
    }

    if (source->dumpfilename != NULL)
    {
        source->dumpfile = fopen (source->dumpfilename, "ab");
        if (source->dumpfile == NULL)
        {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    source->dumpfilename, strerror(errno));
        }
    }

    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock (source->shutdown_rwlock);

    /* start off the statistics */
    source->listeners = 0;
    stats_event_inc (NULL, "sources");
    stats_event_inc (NULL, "source_total_connections");
    stats_event (source->mount, "listeners", "0");
    stats_event (source->mount, "type", source->format->format_description);

    sock_set_blocking (source->con->sock, SOCK_NONBLOCK);

    DEBUG0("Source creation complete");
    source->running = 1;

    if (source->on_connect)
        source_run_script (source->on_connect, source->mount);

    /*
    ** Now, if we have a fallback source and override is on, we want
    ** to steal its clients, because it means we've come back online
    ** after a failure and they should be gotten back from the waiting
    ** loop or jingle track or whatever the fallback is used for
    */

    if (source->fallback_override && source->fallback_mount)
    {
        source_t *fallback_source;

        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);

        if (fallback_source)
            source_move_clients (fallback_source, source);

        avl_tree_unlock(global.source_tree);
    }
    /* source_main expects to start with the source lock held */
    thread_mutex_lock (&source->lock);
}


/* Wind a source down after its main loop has ended.  Called with
 * source->lock held; the lock is released here, as is the read lock on
 * source->shutdown_rwlock.  Listeners are offered to the fallback mount, if
 * one resolves, before the source is cleared.
 */
static void source_shutdown (source_t *source)
{
    INFO1("Source \"%s\" exiting", source->mount);
    source->running = 0;

#ifdef USE_YP
    if (source->yp_public) {
        yp_remove(source);
    }
#endif

    if (source->on_disconnect != NULL)
        source_run_script (source->on_disconnect, source->mount);

    if (source->fallback_mount != NULL)
    {
        source_t *target;

        avl_tree_rlock(global.source_tree);
        target = source_find_mount (source->fallback_mount);
        if (target)
        {
            /* the move takes both source locks itself, so ours has to be
               let go of for the duration to avoid a deadlock */
            thread_mutex_unlock (&source->lock);
            source_move_clients (source, target);
            thread_mutex_lock (&source->lock);
        }
        avl_tree_unlock (global.source_tree);
    }

    /* this source's stats go away */
    stats_event_dec (NULL, "sources");
    stats_event (source->mount, "listeners", NULL);

    /* the source stays in the tree, it may be a relay and therefore
       still reserved; only its state is cleared */
    source_clear_source (source);

    thread_mutex_unlock (&source->lock);

    global_lock();
    global.sources--;
    global_unlock();

    /* let go of the shutdown lock so the main thread can carry on
       cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
}


/* Main loop of a source thread.
 *
 * After source_init() the loop repeats until the server or the source
 * stops running.  Each pass waits for the next buffer with the source lock
 * released, then, with the lock held: appends the buffer to the stream
 * queue, promotes pending listeners to the active list, sends to every
 * active listener, drops listeners whose connection is flagged as errored,
 * updates the listener count and, if the queue grew past its limit,
 * releases the oldest buffer.  source_shutdown() is called on exit.
 */
void source_main(source_t *source)
{
    long bytes;   /* only ever assigned 0 below, not read in this function */
    client_t *client;
    refbuf_t *refbuf;
    int listeners = 0;

    /* returns with source->lock held */
    source_init (source);

    bytes = 0;
    while (global.running == ICE_RUNNING && source->running)
    {
        int remove_from_q;      /* set when the oldest buffer must go this pass */
        client_t **client_p;    /* link that points at the client being visited */

        /* the lock is not held while waiting for data */
        thread_mutex_unlock (&source->lock);

        refbuf = get_next_buffer (source);

        /* take the lock and process each listener */
        thread_mutex_lock (&source->lock);

        remove_from_q = 0;
        source->short_delay = 0;

        if (refbuf)
        {
            /* append buffer to the in-flight data queue; the first buffer
               also becomes the starting point for new listeners */
            if (source->stream_data == NULL)
            {
                source->stream_data = refbuf;
                source->starting_point = refbuf;
                /* offsets the addition of refbuf->len made further down */
                source->starting_count = 0 - refbuf->len;
            }
            if (source->stream_data_tail)
                source->stream_data_tail->next = refbuf;
            source->stream_data_tail = refbuf;

            /* lets see if we have too much data in the queue, but do not
               remove it until later */
            source->queue_size += refbuf->len;
            if (source->queue_size  > source->queue_size_limit)
                remove_from_q = 1;

            /* move the starting point for new listeners */
            source->starting_count += refbuf->len;
            if (source->starting_count > source->starting_max)
            {
                source->starting_count -= refbuf->len;
                source->starting_point = source->starting_point->next;
            }

            /* save stream to file */
            if (source->dumpfile && source->format->write_buf_to_file)
                source->format->write_buf_to_file (source, refbuf);
        }
        
        /* work on a local copy, the stored count is updated further down */
        listeners = source->listeners;

        /* add pending clients */
        if (source->pending_clients)
        {
            unsigned count = 0;

            while (source->pending_clients)
            {
                /* NOTE: this declaration shadows the function-level client */
                client_t *client = source->pending_clients;
                source->pending_clients = client->next;

                /* clients without format data have not had their headers
                   prepared yet */
                if (client->format_data == NULL)
                {
                    DEBUG0("processing pending client headers");
                    format_prepare_headers (source, client);
                    if (source->format->create_client_data &&
                            source->format->create_client_data (source, client) < 0)
                    {
                        DEBUG0("pending client failed");
                        _free_client(client);
                        continue;
                    }
                }

                /* link the client onto the end of the active list */
                *source->active_clients_tail = client;
                source->active_clients_tail = &client->next;
                count++;
            }
            source->pending_clients_tail = &source->pending_clients;
            DEBUG1("Adding %d client(s)", count);
            stats_event_add (NULL, "clients", count);
            stats_event_add (source->mount, "connections", count);

            listeners += count;
            source->new_listeners = 0;
        }
        /* send to each active listener, unlinking the ones that failed */
        client = source->active_clients;
        client_p = &source->active_clients;
        while (client)
        {
            send_to_listener (source, client, remove_from_q);

            if (client->con->error)
            {
                client_t *to_go = client;

                /* bypass this client; if it was last, the tail moves back
                   to the link that referred to it */
                *client_p = client->next;
                if (client->next == NULL)
                    source->active_clients_tail = client_p;

                client = client->next;
                _free_client (to_go);
                listeners--;
                DEBUG0("Client removed");
            }
            else
            {
                client_p = &client->next;
                client = client->next;
            }
        }

        /* modify the listener count if need be */
        if (source->listeners != listeners)
        {
            DEBUG1("listener count now %d", listeners);
            stats_event_args (source->mount, "listeners", "%d", listeners);
            source->listeners = listeners;
            /* an on-demand source stops once its last listener has gone */
            if (listeners == 0 && source->on_demand)
                source->running = 0;
        }

        /* release the oldest buffer if the queue went over its limit */
        if (remove_from_q)
        {
            refbuf_t *to_go = source->stream_data;
            /* associated data is shared so don't release it if the next refbuf refers to it */
            if (to_go->next)
            {
                source->stream_data = to_go->next;
                if (to_go->associated == source->stream_data->associated)
                    to_go->associated = NULL;
                source->queue_size -= to_go->len;
                /* keep the starting point inside the queue */
                if (source->starting_point == to_go)
                {
                    source->starting_point = to_go->next;
                    source->starting_count -= to_go->len;
                }
                refbuf_release (to_go);
            }
            else
                WARN0("possible queue length error");
        }
    }
    source->running = 0;

    /* entered with source->lock still held; source_shutdown releases it */
    source_shutdown (source);
}


static int _compare_clients(void *compare_arg, void *a, void *b)
{
    client_t *clienta = (client_t *)a;
    client_t *clientb = (client_t *)b;

    connection_t *cona = clienta->con;
    connection_t *conb = clientb->con;

    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;

    return 0;
}

/* Does no work: the key is ignored and the result is always 1. */
int source_remove_client(void *key)
{
    (void)key;
    return 1;
}

/* Drop one listener: the global client count and the "clients" stat are
 * both decremented, then the client is either destroyed or, if no response
 * code has been recorded for it yet, handed to client_send_404.
 * Always returns 1.
 */
static int _free_client(void *key)
{
    client_t *victim = key;

    global_lock();
    global.clients--;
    global_unlock();
    stats_event_dec(NULL, "clients");

    if (victim->respcode != 0)
    {
        client_destroy (victim);
    }
    else
    {
        /* nothing has been sent to this client so far, answer with a 404;
           NOTE(review): presumably client_send_404 also disposes of the
           client - confirm */
        client_send_404 (victim, "Mount unavailable");
    }

    return 1;
}

/* Split an audio info string of the form "name=value;name=value;..." and
 * record each setting in source->audio_info and as a stat on the mount.
 *
 * s is modified in place: every ';' separator is overwritten with a
 * terminator.  Tokens without '=' and settings with an empty value are
 * skipped, as are settings whose value cannot be unescaped or for which
 * memory cannot be allocated.  Always returns 1.
 *
 * The string is walked with strchr instead of strtok, because strtok keeps
 * hidden static state and this can run on several source threads at once.
 */
static int _parse_audio_info(source_t *source, char *s)
{
    char *token = s;

    while (token != NULL && *token != '\0') {
        char *next = strchr(token, ';');
        char *pvar;

        /* terminate this token and remember where the next one begins */
        if (next != NULL)
            *next++ = '\0';

        pvar = strchr(token, '=');
        if (pvar != NULL) {
            size_t name_len = (size_t)(pvar - token);
            char *variable = malloc(name_len + 1);

            if (variable != NULL) {
                memcpy(variable, token, name_len);
                variable[name_len] = '\0';
                pvar++;     /* step over the '=' onto the value */
                if (*pvar != '\0') {
                    char *value = util_url_unescape(pvar);

                    if (value != NULL) {
                        util_dict_set(source->audio_info, variable, value);
                        stats_event(source->mount, variable, value);
                        free(value);
                    }
                }
                free(variable);
            }
        }
        token = next;
    }
    return 1;
}


/* Copy per-mount settings from mountinfo into source.  The listener limit,
 * fallback override and no_mount flags are always copied; every other
 * setting is copied only when it is set (non-NULL or non-zero) in
 * mountinfo.  Strings are duplicated into the source.
 */
void source_apply_mount (source_t *source, mount_proxy *mountinfo)
{
    DEBUG1("Applying mount information for \"%s\"", source->mount);

    /* unconditional settings */
    source->max_listeners = mountinfo->max_listeners;
    source->fallback_override = mountinfo->fallback_override;
    source->no_mount = mountinfo->no_mount;

    if (mountinfo->fallback_mount != NULL)
    {
        source->fallback_mount = strdup (mountinfo->fallback_mount);
        DEBUG1 ("fallback %s", mountinfo->fallback_mount);
    }

    if (mountinfo->auth_type != NULL)
    {
        source->authenticator = auth_get_authenticator(
                mountinfo->auth_type, mountinfo->auth_options);
    }

    if (mountinfo->dumpfile != NULL)
    {
        DEBUG1("Dumping stream to %s", mountinfo->dumpfile);
        source->dumpfilename = strdup (mountinfo->dumpfile);
    }

    /* numeric settings, left alone when the mount entry has them at 0 */
    if (mountinfo->queue_size_limit != 0)
    {
        source->queue_size_limit = mountinfo->queue_size_limit;
        DEBUG1 ("queue size to %u", source->queue_size_limit);
    }

    if (mountinfo->source_timeout != 0)
    {
        source->timeout = mountinfo->source_timeout;
        DEBUG1 ("source timeout to %u", source->timeout);
    }

    if (mountinfo->burst_size != 0)
    {
        source->burst_size = mountinfo->burst_size;
        DEBUG1 ("burst size to %u", source->burst_size);
    }

    /* commands run when the source starts and stops */
    if (mountinfo->on_connect != NULL)
    {
        source->on_connect = strdup(mountinfo->on_connect);
        DEBUG1 ("connect script \"%s\"", source->on_connect);
    }

    if (mountinfo->on_disconnect != NULL)
    {
        source->on_disconnect = strdup(mountinfo->on_disconnect);
        DEBUG1 ("disconnect script \"%s\"", source->on_disconnect);
    }
}


/* Thread entry point for a source client.  arg is the source_t to serve.
 *
 * Sends the "200 OK" response to the source client and, if all of it was
 * written, runs source_main() until the source finishes.  If the write
 * fails or is short, the global source count is decremented and the main
 * loop is not entered.  Either way the source is removed from the tree and
 * freed before returning.  Always returns NULL.
 */
void *source_client_thread (void *arg)
{
    source_t *source = arg;
    const char ok_msg[] = "HTTP/1.0 200 OK\r\n\r\n";
    /* kept as an int so that the comparison with the write result below is
       a signed one; compared against a size_t, a negative error return
       would be converted to a huge value and pass for success */
    const int ok_len = (int)(sizeof (ok_msg) - 1);
    int bytes;

    source->client->respcode = 200;
    bytes = sock_write_bytes (source->client->con->sock, ok_msg, ok_len);
    if (bytes < ok_len)
    {
        global_lock();
        global.sources--;
        global_unlock();
        WARN0 ("Error writing 200 OK message to source client");
    }
    else
    {
        source->client->con->sent_bytes += bytes;

        stats_event_inc(NULL, "source_client_connections");
        source_main (source);
    }
    source_free_source (source);
    return NULL;
}


/* Run an external command in a forked child process, passing the
 * mountpoint as its only argument.  The parent does not wait for the
 * child.  A failure to fork or to execute the command is logged.
 */
static void source_run_script (char *command, char *mountpoint)
{
    switch (fork ())
    {
    case -1:
        ERROR2 ("Unable to fork %s (%s)", command, strerror (errno));
        break;
    case 0:  /* child */
        DEBUG1 ("Starting command %s", command);
        /* the terminating null pointer of a variadic call needs the cast */
        execl (command, command, mountpoint, (char *)NULL);
        ERROR2 ("Unable to run command %s (%s)", command, strerror (errno));
        /* execl only returns on failure; the child must end here, or it
           would return into the caller and carry on as a second copy of
           the server */
        _exit (1);
    default: /* parent */
        break;
    }
}

