changeset 134:c4e0a5777363

add DLNA arg. (--dlna) modify waits when stream_queue is empry.
author Naoya OYAMA <naoya.oyama@gmail.com>
date Mon, 18 Oct 2010 03:56:39 +0900
parents 0db6ccf0fe31
children 1d343a4eb657
files src/recpt1.c
diffstat 1 files changed, 90 insertions(+), 91 deletions(-) [+]
line wrap: on
line diff
--- a/src/recpt1.c	Thu Oct 14 00:07:05 2010 +0900
+++ b/src/recpt1.c	Mon Oct 18 03:56:39 2010 +0900
@@ -222,15 +222,12 @@
 {
     struct timeval now;
     struct timespec spec;
-    //fprintf (stderr, "enqueue() start.\n");
 
     gettimeofday(&now, NULL);
     spec.tv_sec = now.tv_sec + 1;
     spec.tv_nsec = now.tv_usec * 1000;
 
-    //fprintf (stderr, "enqueue() mutex lock try. num_used[%d] num_avail[%d]\n", p_queue->num_used, p_queue->num_avail);
     pthread_mutex_lock(&p_queue->mutex);
-    //fprintf (stderr, "enqueue() mutex lock success. num_used[%d] num_avail[%d]\n", p_queue->num_used, p_queue->num_avail);
     /* entered critical section */
 
     /* wait while queue is full */
@@ -254,7 +251,6 @@
     p_queue->num_used++;
 
     /* leaving critical section */
-    //fprintf (stderr, "enqueue() mutex unlock. num_used[%d]\n", p_queue->num_used);
     pthread_mutex_unlock(&p_queue->mutex);
     pthread_cond_signal(&p_queue->cond_used);
 }
@@ -271,19 +267,15 @@
     struct timeval now;
     struct timespec spec;
     int i;
-    //fprintf (stderr, "stream_enqueue() start.\n");
 
     gettimeofday(&now, NULL);
     spec.tv_sec = now.tv_sec + 1;
     spec.tv_nsec = now.tv_usec * 1000;
 
-    //fprintf (stderr, "stream_enqueue() mutex lock try. num_used[%d]\n", p_queue->num_used);
     pthread_mutex_lock(&p_queue->mutex);
-    //fprintf (stderr, "stream_enqueue() mutex lock success. num_used[%d]\n", p_queue->num_used);
     /* entered critical section */
 
     if (p_queue->num_avail == 0) {
-        //fprintf (stderr, "stream_enqueue() num_used reach max[%d].\n", p_queue->num_used);
         /* stream queue $B$O0lGU$K$J$C$?$i>C5n$7$F$7$^$&(B */
         for ( i=0; i < p_queue->size; i++ ) {
             if ( p_queue->buffer[i] != NULL ) {
@@ -310,7 +302,6 @@
     p_queue->num_used++;
 
     /* leaving critical section */
-    //fprintf (stderr, "stream_enqueue() mutex unlock.\n");
     pthread_mutex_unlock(&p_queue->mutex);
     pthread_cond_signal(&p_queue->cond_used);
 }
@@ -323,60 +314,11 @@
     struct timespec spec;
     BUFSZ *buffer;
 
-    //fprintf (stderr, "dequeue() start.\n");
     gettimeofday(&now, NULL);
     spec.tv_sec = now.tv_sec + 1;
     spec.tv_nsec = now.tv_usec * 1000;
 
-    //fprintf (stderr, "dequeue() mutex lock try. num_used[%d]\n", p_queue->num_used);
     pthread_mutex_lock(&p_queue->mutex);
-    //fprintf (stderr, "dequeue() mutex lock success. num_used[%d]\n", p_queue->num_used);
-    /* entered the critical section*/
-
-    /* wait while queue is empty */
-    while(p_queue->num_used == 0) {
-        pthread_cond_timedwait(&p_queue->cond_used,
-                               &p_queue->mutex, &spec);
-        if(f_exit) {
-            pthread_mutex_unlock(&p_queue->mutex);
-            return NULL;
-        }
-    }
-
-    /* take buffer address */
-    buffer = p_queue->buffer[p_queue->out];
-
-    /* move position marker for output to next position */
-    p_queue->out++;
-    p_queue->out %= p_queue->size;
-
-    /* update counters */
-    p_queue->num_avail++;
-    p_queue->num_used--;
-
-    /* leaving the critical section */
-    //fprintf (stderr, "dequeue() mutex unlock.\n");
-    pthread_mutex_unlock(&p_queue->mutex);
-    pthread_cond_signal(&p_queue->cond_avail);
-
-    return buffer;
-}
-
-ARIB_STD_B25_BUFFER *
-stream_dequeue(STREAM_QUEUE_T *p_queue)
-{
-    struct timeval now;
-    struct timespec spec;
-    ARIB_STD_B25_BUFFER *buffer;
-
-    //fprintf (stderr, "stream_dequeue() start.\n");
-    gettimeofday(&now, NULL);
-    spec.tv_sec = now.tv_sec + 1;
-    spec.tv_nsec = now.tv_usec * 1000;
-
-    //fprintf (stderr, "stream_dequeue() mutex lock try. num_used[%d]\n", p_queue->num_used);
-    pthread_mutex_lock(&p_queue->mutex);
-    //fprintf (stderr, "stream_dequeue() mutex lock success. num_used[%d]\n", p_queue->num_used);
     /* entered the critical section*/
 
     /* wait while queue is empty */
@@ -402,10 +344,56 @@
 
     /* leaving the critical section */
     pthread_mutex_unlock(&p_queue->mutex);
-    //fprintf (stderr, "stream_dequeue() mutex unlock.\n");
     pthread_cond_signal(&p_queue->cond_avail);
-    //fprintf (stderr, "dequeue() finish.\n");
+
+    return buffer;
+}
+
+#define QUQUE_LOW_MIN (100)
+#define QUEUE_LOW_USLEEP (800*1000)
+ARIB_STD_B25_BUFFER *
+stream_dequeue(STREAM_QUEUE_T *p_queue)
+{
+    struct timeval now;
+    struct timespec spec;
+    ARIB_STD_B25_BUFFER *buffer;
+
+    gettimeofday(&now, NULL);
+    spec.tv_sec = now.tv_sec + 1;
+    spec.tv_nsec = now.tv_usec * 1000;
+
+    pthread_mutex_lock(&p_queue->mutex);
+    /* entered the critical section*/
 
+    /* wait while queue is empty */
+    while(p_queue->num_used == 0) {
+        pthread_cond_timedwait(&p_queue->cond_used,
+                               &p_queue->mutex, &spec);
+        if(f_exit) {
+            pthread_mutex_unlock(&p_queue->mutex);
+            return NULL;
+        }
+    }
+
+    /* take buffer address */
+    buffer = p_queue->buffer[p_queue->out];
+
+    /* move position marker for output to next position */
+    p_queue->out++;
+    p_queue->out %= p_queue->size;
+
+    /* update counters */
+    p_queue->num_avail++;
+    p_queue->num_used--;
+
+    /* leaving the critical section */
+    pthread_mutex_unlock(&p_queue->mutex);
+    pthread_cond_signal(&p_queue->cond_avail);
+
+    if ( p_queue->num_used < QUQUE_LOW_MIN ) {
+        /* force sleep 800msec */
+        usleep(QUEUE_LOW_USLEEP);
+    }
     return buffer;
 }
 
@@ -422,7 +410,7 @@
     boolean use_udp = data->sock_data ? TRUE : FALSE;
     boolean fileless = FALSE;
     boolean use_splitter = splitter ? TRUE : FALSE;
-    boolean use_streaming = TRUE; // $BK\Ev$O0z?t$K$9$k$3$H(B
+    boolean use_dlna = data->streamer ? TRUE : FALSE;
     int sfd = -1;
     pthread_t signal_thread = data->signal_thread;
     struct sockaddr_in *addr = NULL;
@@ -446,12 +434,9 @@
     }
 
     while(1) {
-//        fprintf (stderr, "reader_func() while loop\n");
         ssize_t wc = 0;
         int file_err = 0;
-        //fprintf (stderr, "reader_func() dequeue() start.\n");
         qbuf = dequeue(p_queue);
-        //fprintf (stderr, "reader_func() dequeue() finish.\n");
         /* no entry in the queue */
         if(qbuf == NULL) {
             break;
@@ -526,18 +511,18 @@
          *        2.2.3.1 enqueue() $B$O(B tdata->stream_queue->mutex $B$r(B lock/unlock $B$7$F=q$-9~$_;~$NF1;~99?7$rKI;_$7$F$$$k(B
          */
         //fprintf (stderr, "reader_func() buf.size[%d]\n", buf.size);
-        if ( use_streaming && buf.size > 0 ) {
+        if ( use_dlna && buf.size > 0 ) {
             do {
                 eqbuf = malloc(sizeof(ARIB_STD_B25_BUFFER));
                 if ( eqbuf == NULL ) {
                     fprintf (stderr, "Cannot malloc eqbuf memory. streaming abort.\n");
-                    use_streaming = FALSE;
+                    use_dlna = FALSE;
                     break;
                 }
                 eqbuf->data = malloc(buf.size);
                 if ( eqbuf->data == NULL ) {
                     fprintf (stderr, "Cannot malloc eqbuf memory. streaming abort.\n");
-                    use_streaming = FALSE;
+                    use_dlna = FALSE;
                     break;
                 }
                 eqbuf->size = buf.size;
@@ -683,16 +668,13 @@
         qbuf = stream_dequeue(p_queue);
         /* no entry in the queue */
         if(qbuf == NULL) {
-            //fprintf (stderr, "stream_func(): dequeue() return NULL pointer. streaming abort.\n");
             continue;
         }
         data->streamer->total_byte += qbuf->size;
         // $B%/%j%F%#%+%k%;%/%7%g%sD9$$$N$J$s$H$+$7$?$$$J$!!D(B
         // ToDo: memcpy $B$H$+%/%j%F%#%+%k%;%/%7%g%s$N30$K=P$9(B
         // 3.2 tdata->streamer->mutex $B$r(B lock
-        //fprintf (stderr, "stream_func(): mutex lock try.\n");
         pthread_mutex_lock(&data->streamer->mutex);
-        //fprintf (stderr, "stream_func(): mutex lock success.\n");
         // 3.3 $B0J2<$r(B tdata->streamer->stream_nr $B$N?t$@$1%k!<%W(B
         for ( i=0; i < data->streamer->stream_nr; i++ ) {
             // 3.3.1 tdata->streamer->stream_session[N]->is_valid $B$,M-8z$+3NG'(B
@@ -724,7 +706,6 @@
         pthread_mutex_unlock(&data->streamer->mutex);
         free(qbuf->data);
         free(qbuf);
-        //fprintf (stderr, "stream_func(): mutex unlock.\n");
     }
     return NULL;
 }
@@ -733,9 +714,9 @@
 show_usage(char *cmd)
 {
 #ifdef HAVE_LIBARIB25
-    fprintf(stderr, "Usage: \n%s [--b25 [--round N] [--strip] [--EMM]] [--udp [--addr hostname --port portnumber]] [--device devicefile] [--lnb voltage] [--sid SID1,SID2] [--es filename_suffix] [--start_time YYYYMMDDHHMISS] channel rectime destfile\n", cmd);
+    fprintf(stderr, "Usage: \n%s [--b25 [--round N] [--strip] [--EMM]] [--udp [--addr hostname --port portnumber]] [--device devicefile] [--lnb voltage] [--sid SID1,SID2] [--es filename_suffix] [--start_time YYYYMMDDHHMISS] [--dlna] channel rectime destfile\n", cmd);
 #else
-    fprintf(stderr, "Usage: \n%s [--strip] [--EMM]] [--udp [--addr hostname --port portnumber]] [--device devicefile] [--lnb voltage] [--sid SID1,SID2] [--es filename_suffix] [--start_time YYYYMMDDHHMISS] channel rectime destfile\n", cmd);
+    fprintf(stderr, "Usage: \n%s [--strip] [--EMM]] [--udp [--addr hostname --port portnumber]] [--device devicefile] [--lnb voltage] [--sid SID1,SID2] [--es filename_suffix] [--start_time YYYYMMDDHHMISS] [--dlna] channel rectime destfile\n", cmd);
 #endif
     fprintf(stderr, "\n");
     fprintf(stderr, "Remarks:\n");
@@ -761,6 +742,7 @@
     fprintf(stderr, "--sid SID1,SID2,...: Specify SID number in CSV format (101,102,...)\n");
     fprintf(stderr, "  --es filename:     Specify ES out filename prefix\n");
     fprintf(stderr, "  --start_time YYYYMMDDHHMISS: Specify record start datetime\n");
+    fprintf(stderr, "--dlna:              Turn on DLNA DMS(Digital Media Server)\n");
     fprintf(stderr, "--help:              Show this help\n");
     fprintf(stderr, "--version:           Show version\n");
     fprintf(stderr, "--list:              Show channel list\n");
@@ -876,7 +858,7 @@
 void
 cleanup(thread_data *tdata)
 {
-    int use_dlna = TRUE;
+    boolean use_dlna = tdata->streamer ? TRUE : FALSE;
     /* stop recording */
     ioctl(tdata->tfd, STOP_REC, 0);
 
@@ -972,7 +954,7 @@
     /* case 1: specified tuner device */
     if(device) {
         tdata->tfd = open(device, O_RDONLY);
-        tdata->device_id = get_device_id_by_name(device);
+	tdata->device_id = get_device_id_by_name(device);
         if(tdata->tfd < 0) {
             fprintf(stderr, "Cannot open tuner device: %s\n", device);
             return 1;
@@ -1163,6 +1145,8 @@
         { "es",       1, NULL, 'e'},
         { "ES",       1, NULL, 'e'},
         { "start_time",       1, NULL, 'y'},
+        { "dlna",       0, NULL, 'c'},
+        { "DLNA",       0, NULL, 'c'},
         {0, 0, NULL, 0} /* terminate */
     };
 
@@ -1171,6 +1155,8 @@
     boolean fileless = FALSE;
     boolean use_stdout = FALSE;
     boolean use_splitter = FALSE;
+    boolean use_esout = FALSE;
+    boolean use_dlna = FALSE;
     char *host_to = NULL;
     int port_to = 1234;
     sock_data *sockdata = NULL;
@@ -1181,7 +1167,7 @@
     char *es_name_prefix = NULL;
     char *start_time = NULL;
 
-    while((result = getopt_long(argc, argv, "br:smn:ua:p:d:hvli:",
+    while((result = getopt_long(argc, argv, "br:smn:ua:p:d:hvli:y:c",
                                 long_options, &option_index)) != -1) {
         switch(result) {
         case 'b':
@@ -1258,17 +1244,34 @@
             sid_list = optarg;
             break;
         case 'e':
+            use_esout = TRUE;
             es_name_prefix = optarg;
             break;
         case 'y':
             start_time = optarg;
             break;
+        case 'c':
+            use_dlna = TRUE;
+            fprintf(stderr, "enable DLNA DMS(Digital Media Server)\n");
+            break;
         }
     }
 
     if(argc - optind < 3) {
-        if(argc - optind == 2 && use_udp) {
-            fprintf(stderr, "Fileless UDP broadcasting\n");
+        if(argc - optind == 2 &&
+              (use_udp|use_dlna|use_esout) ) {
+            if ( use_udp ) {
+                fprintf(stderr, "Fileless UDP broadcasting\n");
+            }
+            else if ( use_dlna ) {
+                fprintf(stderr, "Fileless DLNA broadcasting\n");
+            }
+            else if ( use_esout ) {
+                fprintf(stderr, "Fileless ES out\n");
+            }
+            else {
+                fprintf(stderr, "Fileless...not found...\n");
+            }
             fileless = TRUE;
             tdata.wfd = -1;
         }
@@ -1332,7 +1335,6 @@
         }
     }
 
-    boolean use_dlna = TRUE;
     /* initialize DLNA */
     if(use_dlna) {
         do {
@@ -1345,8 +1347,14 @@
             tdata.streamer->stream_nr = 0;
             tdata.streamer->stream_session[0] = NULL;
             pthread_mutex_init(&tdata.streamer->mutex, NULL);
+
+            pthread_create(&stream_thread, NULL, stream_func, &tdata);
+            pthread_create(&dlna_thread, NULL, dlna_startup, NULL);
         } while(0);
     }
+    else {
+        tdata.streamer = NULL;
+    }
 
     /* initialize udp connection */
     if(use_udp) {
@@ -1389,8 +1397,6 @@
     /* spawn reader thread */
     tdata.signal_thread = signal_thread;
     pthread_create(&reader_thread, NULL, reader_func, &tdata);
-    pthread_create(&stream_thread, NULL, stream_func, &tdata);
-    pthread_create(&dlna_thread, NULL, dlna_startup, NULL);
 
     /* spawn ipc thread */
     key_t key;
@@ -1415,7 +1421,6 @@
     while(1) {
         if(f_exit)
             break;
-        //fprintf (stderr, "main() while loop start.\n");
 
         time(&cur_time);
         bufptr = malloc(sizeof(BUFSZ));
@@ -1423,25 +1428,18 @@
             f_exit = TRUE;
             break;
         }
-        //fprintf (stderr, "main() loop#1.\n");
         bufptr->size = read(tdata.tfd, bufptr->buffer, MAX_READ_SIZE);
-        //fprintf (stderr, "main() loop#2.\n");
         if(bufptr->size <= 0) {
             if((cur_time - tdata.start_time) >= tdata.recsec && !tdata.indefinite) {
-                //fprintf (stderr, "main() loop#3.\n");
                 f_exit = TRUE;
                 enqueue(p_queue, NULL);
                 break;
             }
             else {
-                //fprintf (stderr, "main() loop#4.\n");
                 continue;
             }
         }
-        //fprintf (stderr, "main() loop#5.\n");
-        //fprintf (stderr, "PT1 enqueue start.\n");
         enqueue(p_queue, bufptr);
-        //fprintf (stderr, "PT1 enqueue finish.\n");
 
         /* stop recording */
         time(&cur_time);
@@ -1477,8 +1475,9 @@
     pthread_join(stream_thread, NULL);
     pthread_join(signal_thread, NULL);
     pthread_join(ipc_thread, NULL);
-//    pthread_join(dlna_startup, NULL);
-    pthread_join(dlna_thread, NULL);
+    if ( use_dlna ) {
+        pthread_join(dlna_thread, NULL);
+    }
 
     /* close tuner */
     if(close_tuner(&tdata) != 0)