view recpt1/recpt1.c @ 8:6da603afd363

added udp capability
author Yoshiki Yazawa <yaz@honeyplanet.jp>
date Mon, 23 Feb 2009 03:06:17 +0900
parents 407af34cfbd9
children 4615eaf04415
line wrap: on
line source

#include <stdio.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <getopt.h>

#include <netdb.h>
#include <arpa/inet.h>
#include <netinet/in.h>

#include <sys/ioctl.h>
#include "pt1_ioctl.h"

#include "recpt1.h"
#include "decoder.h"

/* globals */
int f_exit = FALSE;

typedef struct sock_data {
    int sfd;    /* socket fd */
    struct sockaddr_in addr;
} sock_data;

typedef struct thread_data {
    QUEUE_T *queue;
    decoder *decoder;
    int wfd;    /* output file fd */
    sock_data *sock_data;
} thread_data;

/* lookup frequency conversion table*/
ISDB_T_FREQ_CONV_TABLE *
searchrecoff(char *channel)
{
    int lp;

    for(lp = 0; isdb_t_conv_table[lp].parm_freq != NULL; lp++){
        /* return entry number in the table when strings match and
         * lengths are same. */
        if((memcmp(isdb_t_conv_table[lp].parm_freq, channel,
                   strlen(channel)) == 0) &&
           (strlen(channel) == strlen(isdb_t_conv_table[lp].parm_freq))){
            return &isdb_t_conv_table[lp];
        }
    }
    return NULL;
}

QUEUE_T *
create_queue(size_t size)
{
    QUEUE_T *p_queue;
    int memsize = sizeof(QUEUE_T) + size * sizeof(BUFSZ);

    p_queue = (QUEUE_T*)calloc(memsize, sizeof(char));

    if(p_queue != NULL) {
        p_queue->size = size;
        p_queue->no_full = size;
        p_queue->no_empty = 0;
        pthread_mutex_init(&p_queue->mutex, NULL);
        pthread_cond_init(&p_queue->cond_full, NULL);
        pthread_cond_init(&p_queue->cond_empty, NULL);
    }

    return p_queue;
}

void
destroy_queue(QUEUE_T *p_queue)
{
    if(!p_queue)
        return;

    pthread_mutex_destroy(&p_queue->mutex);
    pthread_cond_destroy(&p_queue->cond_full);
    pthread_cond_destroy(&p_queue->cond_empty);
    free(p_queue);
}

/* enqueue data. this function will block if queue is full. */
void
enqueue(QUEUE_T *p_queue, BUFSZ *data)
{
    pthread_mutex_lock(&p_queue->mutex);
    /* entered critical section */

    /* wait until queue is not full */
    while(!p_queue->no_full) {
        pthread_cond_wait(&p_queue->cond_full, &p_queue->mutex);
        fprintf(stderr, "Full\n");
    }

    p_queue->buffer[p_queue->in] = data;

    p_queue->in++;
    p_queue->in %= p_queue->size;

    p_queue->no_full--;
    p_queue->no_empty++;

    /* leaving critical section */
    pthread_mutex_unlock(&p_queue->mutex);
    pthread_cond_signal(&p_queue->cond_empty);
}

/* dequeue data. this function will block if queue is empty. */
BUFSZ *
dequeue(QUEUE_T *p_queue)
{
    BUFSZ *buffer;

    pthread_mutex_lock(&p_queue->mutex);
    /* entered the critical section*/

    /* wait until queue is filled */
    while (!p_queue->no_empty) {
        pthread_cond_wait(&p_queue->cond_empty, &p_queue->mutex);
    }

    /* take buffer address */
    buffer = p_queue->buffer[p_queue->out];

    /* move location marker to next position */
    p_queue->out++;
    p_queue->out %= p_queue->size;

    /* update flags */
    p_queue->no_full++;
    p_queue->no_empty--;

    /* leaving the critical section */
    pthread_mutex_unlock(&p_queue->mutex);
    pthread_cond_signal(&p_queue->cond_full);

    return buffer;
}

/* this function will be reader thread */
void *
write_func(void *p)
{
    thread_data *data = (thread_data *)p;
    QUEUE_T *p_queue = data->queue;
    decoder *dec = data->decoder;
    int wfd = data->wfd;
    int use_b25 = dec ? 1 : 0;
    int use_udp = data->sock_data ? 1 : 0;
    int sfd = 0;
    struct sockaddr *addr = NULL;
    BUFSZ *buf;
    ARIB_STD_B25_BUFFER sbuf, dbuf;
    int code;

    if(use_udp) {
        sfd = data->sock_data->sfd;
        addr = (struct sockaddr *)&data->sock_data->addr;
    }

    while(1) {
        buf = dequeue(p_queue);
        /* no entry in the queue */
        if(buf == NULL){
            close(wfd);
            break;
        }

        sbuf.data = buf->buffer;
        sbuf.size = buf->size;

        if(use_b25) {
            /* write data to output file*/
            code = b25_decode(dec, &sbuf, &dbuf);
            if(code < 0) {
                fprintf(stderr, "b25_decode failed\n");
                close(wfd);
                break;
            }
            write(wfd, dbuf.data, dbuf.size);

            if(use_udp && sfd) {
                sendto(sfd, dbuf.data, dbuf.size, 0,
                       addr, sizeof(struct sockaddr_in));
            }
            free(buf);
        } else {
            write(wfd, sbuf.data, sbuf.size);

            if(use_udp && sfd) {
                sendto(sfd, sbuf.data, sbuf.size, 0,
                       addr, sizeof(struct sockaddr_in));
            }
            free(buf);
        }

        /* normal exit */
        if((f_exit) && (!p_queue->no_empty)) {
            if(use_b25) {
                code = b25_finish(dec, &sbuf, &dbuf);
                if(code < 0) {
                    fprintf(stderr, "b25_finish failed\n");
                    close(wfd);
                    close(sfd);
                    break;
                }
                write(wfd, dbuf.data, dbuf.size);

                if(use_udp && sfd) {
                    sendto(sfd, dbuf.data, dbuf.size, 0,
                           addr, sizeof(struct sockaddr_in));
                }
            }
            close(wfd);
            close(sfd);
            break;
        }
    }

    return NULL;
}

void
show_usage(char *cmd)
{
    fprintf(stderr, "Usage: %s [--b25 [--round N] [--strip] [--EMM]] [--udp hostname [--port port]] channel recsec destfile\n", cmd);
}

int
main(int argc, char **argv)
{
    int tfd, wfd;
    int lp;
    int recsec;
    time_t start_time, cur_time;
    FREQUENCY freq;
    ISDB_T_FREQ_CONV_TABLE *ptr;
    pthread_t dequeue_threads;
    QUEUE_T *p_queue = create_queue(MAX_QUEUE);
    BUFSZ   *bufptr;
    decoder *dec = NULL;
    thread_data tdata;
    decoder_options dopt = {
        4,  /* round */
        0,  /* strip */
        0   /* emm */
    };

    int result;
    int option_index;
    struct option long_options[] = {
        { "b25",       0, NULL, 'b'},
        { "B25",       0, NULL, 'b'},
        { "round",     1, NULL, 'r'},
        { "strip",     0, NULL, 's'},
        { "emm",       0, NULL, 'm'},
        { "EMM",       0, NULL, 'm'},
        { "udp",       1, NULL, 'u'},
        { "port",      1, NULL, 'p'}
    };

    int use_b25 = 0;
    int use_udp = 0;
    char *host_to = NULL;
    int port_to = 1234;
    sock_data *sdata = NULL;

    while((result = getopt_long(argc, argv, "br:sm" "u:p:", long_options, &option_index)) != -1) {
        switch(result) {
        case 'b':
            use_b25 = 1;
            fprintf(stderr, "using B25...\n");
            break;
        case 's':
            dopt.strip = 1;
            fprintf(stderr, "enable B25 strip\n");
            break;
        case 'm':
            dopt.emm = 1;
            fprintf(stderr, "enable B25 emm processing\n");
            break;
        case 'r':
            dopt.round = atoi(optarg);
            fprintf(stderr, "set round %d\n", dopt.round);
            break;
        case ':':
            fprintf(stderr, "%c needs value\n", result);
            break;
        case '?':
            show_usage(argv[0]);
            break;
        case 'u':
            use_udp = 1;
            host_to = optarg;
            fprintf(stderr, "UDP destination address: %s\n", host_to);
            break;
        case 'p':
            port_to = atoi(optarg);
            fprintf(stderr, "UDP port: %d\n", port_to);
            break;
        }
    }

    if(argc - optind < 3) {
        show_usage(argv[0]);
        printf("channel =\n");
        printf("151ch:BS朝日\n");
        printf("161ch:BS-i\n");
        printf("191ch:WOWOW\n");
        printf("171ch:BSジャパン\n");
        printf("200ch:スターチャンネル\n");
        printf("211ch:BS11デジタル\n");
        printf("222ch:TwellV\n");
        printf("141ch:BS日テレ\n");
        printf("181ch:BSフジ\n");
        printf("101ch:NHK衛星第1放送(BS1)\n");
        printf("102ch:NHK衛星第2放送(BS2)\n");
        printf("103ch:NHKハイビジョン(BShi)\n");
        printf("CS2-CS24:CSチャンネル\n");
        return 1;
    }
    ptr = searchrecoff(argv[optind]);
    if(ptr == NULL){
        printf("Channel Select Error(%s)\n", argv[optind]);
        return 1;
    }

    freq.frequencyno = ptr->set_freq;
    freq.slot = ptr->add_freq;

    if(ptr->type == CHTYPE_SATELLITE) {
        for(lp = 0; lp < 2; lp++) {
            tfd = open(bsdev[lp], O_RDONLY);
            if(tfd >= 0) {
                break;
            }
        }
        if(tfd < 0) {
            fprintf(stderr, "Device Open Error\n");
            return 1;
        }
    } else {
        for(lp = 0; lp < 2; lp++) {
            tfd = open(isdb_t_dev[lp], O_RDONLY);
            if(tfd >= 0) {
                break;
            }
        }
        if(tfd < 0) {
            fprintf(stderr, "Device Open Error\n");
            return 1;
        }
    }
    recsec = atoi(argv[optind + 1]);

    /* initialize decoder */
    if(use_b25) {
        dec = b25_startup(&dopt);
        if(!dec) {
            fprintf(stderr, "cannot start b25 decoder\n");
            fprintf(stderr, "fall back to encrypted recording\n");
            use_b25 = 0;
        }
    }

    /* initialize udp connection */
    if(use_udp) {
      sdata = calloc(1, sizeof(sock_data));
      struct in_addr ia;
      ia.s_addr = inet_addr(host_to);
      if(ia.s_addr == INADDR_NONE) {
            struct hostent *hoste = gethostbyname(host_to);
            if(!hoste) {
                perror("failed to get host by name");
                return 1;
            }
            ia.s_addr = *(in_addr_t*) (hoste->h_addr_list[0]);
        }
        sdata->addr.sin_family = AF_INET;
        sdata->addr.sin_port = htons (port_to);
        sdata->addr.sin_addr.s_addr = ia.s_addr;
        if((sdata->sfd = socket(PF_INET, SOCK_DGRAM, 0)) < 0) {
            perror("socket");
            return 1;
        }
    }

    /* open output file */
    wfd = open(argv[optind + 2], (O_RDWR | O_CREAT | O_TRUNC), 0666);
    if(wfd < 0) {
        fprintf(stderr, "Output File Open Error(%s)\n", argv[optind + 2]);
        return 1;
    }

    if(ioctl(tfd, SET_CHANNEL, &freq) < 0) {
        fprintf(stderr, "Tuner Select Error\n");
        return 1;
    }

    /* make reader thread */
    tdata.queue = p_queue;
    tdata.decoder = dec;
    tdata.wfd = wfd;
    tdata.sock_data = sdata;
    pthread_create(&dequeue_threads, NULL, write_func, &tdata);

    /* start recording */
    if(ioctl(tfd, START_REC, 0) < 0) {
        fprintf(stderr, "Tuner Start Error\n");
        return 1;
    }

    time(&start_time);

    /* read from tuner */
    while(1) {
        time(&cur_time);
        bufptr = malloc(sizeof(BUFSZ));
        bufptr->size = read(tfd, bufptr->buffer, MAX_READ_SIZE);
        if(bufptr->size <= 0) {
            if((cur_time - start_time) >= recsec) {
                f_exit = TRUE;
                enqueue(p_queue, NULL);
                break;
            } else {
                continue;
            }
        }
        enqueue(p_queue, bufptr);

        /* stop recording */
        if((cur_time - start_time) >= recsec) {
            ioctl(tfd, STOP_REC, 0);
            /* read remaining data */
            while(1) {
                bufptr = malloc(sizeof(BUFSZ));
                bufptr->size = read(tfd, bufptr->buffer, MAX_READ_SIZE);
                if(bufptr->size <= 0) {
                    f_exit = TRUE;
                    enqueue(p_queue, NULL);
                    break;
                }
                enqueue(p_queue, bufptr);
            }
            break;
        }
    }
    /* close tuner */
    close(tfd);

    /* wait reader thread */
    pthread_join(dequeue_threads, NULL);
    destroy_queue(p_queue);

    /* close socket */
    if(use_udp) {
        close(sdata->sfd);
        free(sdata);
    }

    /* release decoder */
    if(use_b25) {
        b25_shutdown(dec);
    }

    return 0;
}