diff recpt1/recpt1.c @ 18:84ff6ef710ea

- support stdout for output - changed representation for indefinite recording to '-' - handles EPIPE error
author Yoshiki Yazawa <yaz@honeyplanet.jp>
date Wed, 25 Feb 2009 17:22:31 +0900
parents 52c7c7c64ba6
children b63f5c100e5a
line wrap: on
line diff
--- a/recpt1/recpt1.c	Wed Feb 25 03:07:50 2009 +0900
+++ b/recpt1/recpt1.c	Wed Feb 25 17:22:31 2009 +0900
@@ -10,6 +10,8 @@
 #include <unistd.h>
 #include <getopt.h>
 #include <signal.h>
+#include <errno.h>
+#include <sys/time.h>
 
 #include <netdb.h>
 #include <arpa/inet.h>
@@ -29,12 +31,18 @@
     struct sockaddr_in addr;
 } sock_data;
 
-typedef struct thread_data {
+typedef struct reader_thread_data {
     QUEUE_T *queue;
     decoder *decoder;
     int wfd;    /* output file fd */
     sock_data *sock_data;
-} thread_data;
+    pthread_t signal_thread;
+} reader_thread_data;
+
+typedef struct signal_thread_data {
+    QUEUE_T *queue;
+    int tfd;    /* tuner fd */
+} signal_thread_data;
 
 /* lookup frequency conversion table*/
 ISDB_T_FREQ_CONV_TABLE *
@@ -64,11 +72,11 @@
 
     if(p_queue != NULL) {
         p_queue->size = size;
-        p_queue->no_full = size;
-        p_queue->no_empty = 0;
+        p_queue->num_avail = size;
+        p_queue->num_used = 0;
         pthread_mutex_init(&p_queue->mutex, NULL);
-        pthread_cond_init(&p_queue->cond_full, NULL);
-        pthread_cond_init(&p_queue->cond_empty, NULL);
+        pthread_cond_init(&p_queue->cond_avail, NULL);
+        pthread_cond_init(&p_queue->cond_used, NULL);
     }
 
     return p_queue;
@@ -81,8 +89,8 @@
         return;
 
     pthread_mutex_destroy(&p_queue->mutex);
-    pthread_cond_destroy(&p_queue->cond_full);
-    pthread_cond_destroy(&p_queue->cond_empty);
+    pthread_cond_destroy(&p_queue->cond_avail);
+    pthread_cond_destroy(&p_queue->cond_used);
     free(p_queue);
 }
 
@@ -90,65 +98,85 @@
 void
 enqueue(QUEUE_T *p_queue, BUFSZ *data)
 {
+    struct timeval now;
+    struct timespec spec;
+
+    gettimeofday(&now, NULL);
+    spec.tv_sec = now.tv_sec + 1;
+    spec.tv_nsec = now.tv_usec * 1000;
+
     pthread_mutex_lock(&p_queue->mutex);
     /* entered critical section */
 
-    /* wait until queue is not full */
-    while(!p_queue->no_full) {
-        pthread_cond_wait(&p_queue->cond_full, &p_queue->mutex);
-        fprintf(stderr, "Full\n");
+    /* wait while queue is full */
+    while(p_queue->num_avail == 0) {
+        pthread_cond_timedwait(&p_queue->cond_avail,
+                               &p_queue->mutex, &spec);
+        if(f_exit)
+            return;
     }
 
     p_queue->buffer[p_queue->in] = data;
 
+    /* move position marker for input to next position */
     p_queue->in++;
     p_queue->in %= p_queue->size;
 
-    p_queue->no_full--;
-    p_queue->no_empty++;
+    /* update counters */
+    p_queue->num_avail--;
+    p_queue->num_used++;
 
     /* leaving critical section */
     pthread_mutex_unlock(&p_queue->mutex);
-    pthread_cond_signal(&p_queue->cond_empty);
+    pthread_cond_signal(&p_queue->cond_used);
 }
 
 /* dequeue data. this function will block if queue is empty. */
 BUFSZ *
 dequeue(QUEUE_T *p_queue)
 {
+    struct timeval now;
+    struct timespec spec;
     BUFSZ *buffer;
 
+    gettimeofday(&now, NULL);
+    spec.tv_sec = now.tv_sec + 1;
+    spec.tv_nsec = now.tv_usec * 1000;
+
     pthread_mutex_lock(&p_queue->mutex);
     /* entered the critical section*/
 
-    /* wait until queue is filled */
-    while (!p_queue->no_empty) {
-        pthread_cond_wait(&p_queue->cond_empty, &p_queue->mutex);
+    /* wait while queue is empty */
+    while(p_queue->num_used == 0) {
+        pthread_cond_timedwait(&p_queue->cond_avail,
+                               &p_queue->mutex, &spec);
+        if(f_exit)
+            return NULL;
     }
 
     /* take buffer address */
     buffer = p_queue->buffer[p_queue->out];
 
-    /* move location marker to next position */
+    /* move position marker for output to next position */
     p_queue->out++;
     p_queue->out %= p_queue->size;
 
-    /* update flags */
-    p_queue->no_full++;
-    p_queue->no_empty--;
+    /* update counters */
+    p_queue->num_avail++;
+    p_queue->num_used--;
 
     /* leaving the critical section */
     pthread_mutex_unlock(&p_queue->mutex);
-    pthread_cond_signal(&p_queue->cond_full);
+    pthread_cond_signal(&p_queue->cond_avail);
 
     return buffer;
 }
 
 /* this function will be reader thread */
 void *
-write_func(void *p)
+reader_func(void *p)
 {
-    thread_data *data = (thread_data *)p;
+    reader_thread_data *data = (reader_thread_data *)p;
     QUEUE_T *p_queue = data->queue;
     decoder *dec = data->decoder;
     int wfd = data->wfd;
@@ -156,6 +184,7 @@
     int use_udp = data->sock_data ? TRUE : FALSE;
     int fileless = FALSE;
     int sfd = -1;
+    pthread_t signal_thread = data->signal_thread;
     struct sockaddr *addr = NULL;
     BUFSZ *buf;
     ARIB_STD_B25_BUFFER sbuf, dbuf;
@@ -205,12 +234,11 @@
             }
 
             /* normal exit */
-            if((f_exit) && (!p_queue->no_empty)) {
+            if((f_exit) && (!p_queue->num_used)) {
                 if(use_b25) {
                     code = b25_finish(dec, &sbuf, &dbuf);
                     if(code < 0) {
                         fprintf(stderr, "b25_finish failed\n");
-                        close(sfd);
                         break;
                     }
 
@@ -219,16 +247,15 @@
                                addr, sizeof(struct sockaddr_in));
                     }
                 }
-                close(sfd);
                 break;
             }
-        } /* fileless */
+        } /* end of fileless */
         else {
 
+            ssize_t wc;
             buf = dequeue(p_queue);
             /* no entry in the queue */
             if(buf == NULL) {
-                close(wfd);
                 break;
             }
 
@@ -240,10 +267,12 @@
                 code = b25_decode(dec, &sbuf, &dbuf);
                 if(code < 0) {
                     fprintf(stderr, "b25_decode failed\n");
-                    close(wfd);
                     break;
                 }
-                write(wfd, dbuf.data, dbuf.size);
+                wc = write(wfd, dbuf.data, dbuf.size);
+                if(wc <= 0 && errno == EPIPE) {
+                    pthread_kill(signal_thread, SIGPIPE);
+                }
 
                 if(use_udp && sfd != -1) {
                     sendto(sfd, dbuf.data, dbuf.size, 0,
@@ -252,7 +281,10 @@
                 free(buf);
             }
             else {
-                write(wfd, sbuf.data, sbuf.size);
+                wc = write(wfd, sbuf.data, sbuf.size);
+                if(wc <= 0 && errno == EPIPE) {
+                    pthread_kill(signal_thread, SIGPIPE);
+                }
 
                 if(use_udp && sfd != -1) {
                     sendto(sfd, sbuf.data, sbuf.size, 0,
@@ -262,24 +294,23 @@
             }
 
             /* normal exit */
-            if((f_exit) && (!p_queue->no_empty)) {
+            if((f_exit) && (!p_queue->num_used)) {
                 if(use_b25) {
                     code = b25_finish(dec, &sbuf, &dbuf);
                     if(code < 0) {
                         fprintf(stderr, "b25_finish failed\n");
-                        close(wfd);
-                        close(sfd);
                         break;
                     }
-                    write(wfd, dbuf.data, dbuf.size);
+                    wc = write(wfd, dbuf.data, dbuf.size);
+                    if(wc <= 0 && errno == EPIPE) {
+                        pthread_kill(signal_thread, SIGPIPE);
+                    }
 
                     if(use_udp && sfd != -1) {
                         sendto(sfd, dbuf.data, dbuf.size, 0,
                                addr, sizeof(struct sockaddr_in));
                     }
                 }
-                close(wfd);
-                close(sfd);
                 break;
             }
         }
@@ -294,6 +325,10 @@
     fprintf(stderr, "\n");
     fprintf(stderr, "Usage: \n%s [--b25 [--round N] [--strip] [--EMM]] [--udp [--addr hostname --port portnumber]] channel recsec destfile\n", cmd);
     fprintf(stderr, "\n");
+    fprintf(stderr, "Remarks:\n");
+    fprintf(stderr, "if recsec   is '-', records indefinitely.\n");
+    fprintf(stderr, "if destfile is '-', stdout is used for output.\n");
+    fprintf(stderr, "\n");
 }
 
 void
@@ -401,10 +436,10 @@
 }
 
 void
-cleanup(int tfd)
+cleanup(signal_thread_data *sdata)
 {
     /* stop recording */
-    ioctl(tfd, STOP_REC, 0);
+    ioctl(sdata->tfd, STOP_REC, 0);
 
     /* restore LNB state */
 #if 0
@@ -414,8 +449,11 @@
         }
     }
 #endif
-    /* xxx really need mutex? */
+    /* xxx need mutex? */
     f_exit = TRUE;
+
+    pthread_cond_signal(&sdata->queue->cond_avail);
+    pthread_cond_signal(&sdata->queue->cond_used);
 }
 
 /* will be signal handler thread */
@@ -424,22 +462,28 @@
 {
     sigset_t waitset;
     int sig;
-    int tfd = *(int *)data;
+    signal_thread_data *sdata;
+    sdata = (signal_thread_data *)data;
 
     sigemptyset(&waitset);
+    sigaddset(&waitset, SIGPIPE);
     sigaddset(&waitset, SIGINT);
     sigaddset(&waitset, SIGTERM);
 
     sigwait(&waitset, &sig);
 
     switch(sig) {
+    case SIGPIPE:
+        fprintf(stderr, "\nSIGPIPE received. cleaning up...\n");
+        cleanup(sdata);
+        break;
     case SIGINT:
         fprintf(stderr, "\nSIGINT received. cleaning up...\n");
-        cleanup(tfd);
+        cleanup(sdata);
         break;
     case SIGTERM:
         fprintf(stderr, "\nSIGTERM received. cleaning up...\n");
-        cleanup(tfd);
+        cleanup(sdata);
         break;
     }
 
@@ -447,21 +491,19 @@
 }
 
 void
-init_signal_handlers(pthread_t *signal_thread, int tfd)
+init_signal_handlers(pthread_t *signal_thread, signal_thread_data *sdata)
 {
     sigset_t blockset;
-    static int tunerfd;
-
-    tunerfd = tfd;
 
     sigemptyset(&blockset);
+    sigaddset(&blockset, SIGPIPE);
     sigaddset(&blockset, SIGINT);
     sigaddset(&blockset, SIGTERM);
 
     if(pthread_sigmask(SIG_BLOCK, &blockset, NULL))
         fprintf(stderr, "pthread_sigmask() failed.\n");
 
-    pthread_create(signal_thread, NULL, process_signals, &tunerfd);
+    pthread_create(signal_thread, NULL, process_signals, sdata);
 }
 
 int
@@ -479,7 +521,8 @@
     QUEUE_T *p_queue = create_queue(MAX_QUEUE);
     BUFSZ   *bufptr;
     decoder *dec = NULL;
-    thread_data tdata;
+    static reader_thread_data tdata;
+    static signal_thread_data sdata;
     decoder_options dopt = {
         4,  /* round */
         0,  /* strip */
@@ -499,15 +542,16 @@
         { "addr",      1, NULL, 'a'},
         { "port",      1, NULL, 'p'},
         { "help",      0, NULL, 'h'},
-        {0, 0, 0, 0} /* terminate */
+        {0, 0, NULL, 0} /* terminate */
     };
 
     int use_b25 = FALSE;
     int use_udp = FALSE;
     int fileless = FALSE;
+    int use_stdout = FALSE;
     char *host_to = NULL;
     int port_to = 1234;
-    sock_data *sdata = NULL;
+    sock_data *sockdata = NULL;
 
     while((result = getopt_long(argc, argv, "br:smua:p:h",
                                 long_options, &option_index)) != -1) {
@@ -601,9 +645,30 @@
     }
 
     /* get recsec */
-    recsec = atoi(argv[optind + 1]);
-    if(recsec <= 0)
+    char *recsecstr = argv[optind + 1];
+	if(!strcmp("-", recsecstr)) {
         indefinite = TRUE;
+        recsec = -1;
+	}
+    else {
+        recsec = atoi(recsecstr);
+    }
+
+    /* open output file */
+    char *destfile = argv[optind + 2];
+	if(!strcmp("-", destfile)) {
+        use_stdout = TRUE;
+        wfd = 1; /* stdout */
+	}
+    else {
+        if(!fileless) {
+            wfd = open(argv[optind + 2], (O_RDWR | O_CREAT | O_TRUNC), 0666);
+            if(wfd < 0) {
+                fprintf(stderr, "Could not open output file(%s)\n", argv[optind + 2]);
+                return 1;
+            }
+        }
+    }
 
     /* initialize decoder */
     if(use_b25) {
@@ -617,34 +682,27 @@
 
     /* initialize udp connection */
     if(use_udp) {
-      sdata = calloc(1, sizeof(sock_data));
+      sockdata = calloc(1, sizeof(sock_data));
       struct in_addr ia;
       ia.s_addr = inet_addr(host_to);
       if(ia.s_addr == INADDR_NONE) {
             struct hostent *hoste = gethostbyname(host_to);
             if(!hoste) {
-                perror("failed to get host by name");
+                perror("gethostbyname");
                 return 1;
             }
             ia.s_addr = *(in_addr_t*) (hoste->h_addr_list[0]);
         }
-        sdata->addr.sin_family = AF_INET;
-        sdata->addr.sin_port = htons (port_to);
-        sdata->addr.sin_addr.s_addr = ia.s_addr;
-        if((sdata->sfd = socket(PF_INET, SOCK_DGRAM, 0)) < 0) {
+        sockdata->addr.sin_family = AF_INET;
+        sockdata->addr.sin_port = htons (port_to);
+        sockdata->addr.sin_addr.s_addr = ia.s_addr;
+        if((sockdata->sfd = socket(PF_INET, SOCK_DGRAM, 0)) < 0) {
             perror("socket");
             return 1;
         }
     }
 
-    /* open output file */
-    if(!fileless) {
-        wfd = open(argv[optind + 2], (O_RDWR | O_CREAT | O_TRUNC), 0666);
-        if(wfd < 0) {
-            fprintf(stderr, "Could not open output file(%s)\n", argv[optind + 2]);
-            return 1;
-        }
-    }
+    /* setup tuner */
     if(ptr->type == CHTYPE_SATELLITE) {
         if(ioctl(tfd, LNB_ENABLE, 0) < 0) {
             return 0 ;
@@ -659,14 +717,17 @@
     calc_cn(tfd, ptr->type);
 
     /* init signal handler thread */
-    init_signal_handlers(&signal_thread, tfd);
+    sdata.queue = p_queue;
+    sdata.tfd = tfd;
+    init_signal_handlers(&signal_thread, &sdata);
 
     /* make reader thread */
     tdata.queue = p_queue;
     tdata.decoder = dec;
     tdata.wfd = wfd;
-    tdata.sock_data = sdata;
-    pthread_create(&dequeue_thread, NULL, write_func, &tdata);
+    tdata.sock_data = sockdata;
+    tdata.signal_thread = signal_thread;
+    pthread_create(&dequeue_thread, NULL, reader_func, &tdata);
 
     /* start recording */
     if(ioctl(tfd, START_REC, 0) < 0) {
@@ -723,25 +784,27 @@
     }
     close(tfd);
 
-    /* wait reader thread */
+    /* wait for threads */
     pthread_join(dequeue_thread, NULL);
-//    fprintf(stderr, "dequeue_thread joined\n");
+    pthread_join(signal_thread, NULL);
 
-    pthread_join(signal_thread, NULL);
-//    fprintf(stderr, "signal_thread joined\n");
-
-    /* relase queue */
+    /* release queue */
     destroy_queue(p_queue);
 
+    /* close output file */
+    if(!use_stdout)
+        close(wfd);
+
     /* free socket data */
-    free(sdata);
+    if(use_udp) {
+        close(sockdata->sfd);
+        free(sockdata);
+    }
 
     /* release decoder */
     if(use_b25) {
         b25_shutdown(dec);
     }
 
-//    fprintf(stderr, "leaving main\n");
-
     return 0;
 }