# HG changeset patch # User Naoya OYAMA # Date 1287341799 -32400 # Node ID c4e0a57773635318626896984043ca3b88d1379b # Parent 0db6ccf0fe313dd94d876d1f580a3e5d34ce1e71 add DLNA arg. (--dlna) modify waits when stream_queue is empry. diff -r 0db6ccf0fe31 -r c4e0a5777363 src/recpt1.c --- a/src/recpt1.c Thu Oct 14 00:07:05 2010 +0900 +++ b/src/recpt1.c Mon Oct 18 03:56:39 2010 +0900 @@ -222,15 +222,12 @@ { struct timeval now; struct timespec spec; - //fprintf (stderr, "enqueue() start.\n"); gettimeofday(&now, NULL); spec.tv_sec = now.tv_sec + 1; spec.tv_nsec = now.tv_usec * 1000; - //fprintf (stderr, "enqueue() mutex lock try. num_used[%d] num_avail[%d]\n", p_queue->num_used, p_queue->num_avail); pthread_mutex_lock(&p_queue->mutex); - //fprintf (stderr, "enqueue() mutex lock success. num_used[%d] num_avail[%d]\n", p_queue->num_used, p_queue->num_avail); /* entered critical section */ /* wait while queue is full */ @@ -254,7 +251,6 @@ p_queue->num_used++; /* leaving critical section */ - //fprintf (stderr, "enqueue() mutex unlock. num_used[%d]\n", p_queue->num_used); pthread_mutex_unlock(&p_queue->mutex); pthread_cond_signal(&p_queue->cond_used); } @@ -271,19 +267,15 @@ struct timeval now; struct timespec spec; int i; - //fprintf (stderr, "stream_enqueue() start.\n"); gettimeofday(&now, NULL); spec.tv_sec = now.tv_sec + 1; spec.tv_nsec = now.tv_usec * 1000; - //fprintf (stderr, "stream_enqueue() mutex lock try. num_used[%d]\n", p_queue->num_used); pthread_mutex_lock(&p_queue->mutex); - //fprintf (stderr, "stream_enqueue() mutex lock success. num_used[%d]\n", p_queue->num_used); /* entered critical section */ if (p_queue->num_avail == 0) { - //fprintf (stderr, "stream_enqueue() num_used reach max[%d].\n", p_queue->num_used); /* stream queue は一杯になったら消去してしまう */ for ( i=0; i < p_queue->size; i++ ) { if ( p_queue->buffer[i] != NULL ) { @@ -310,7 +302,6 @@ p_queue->num_used++; /* leaving critical section */ - //fprintf (stderr, "stream_enqueue() mutex unlock.\n"); pthread_mutex_unlock(&p_queue->mutex); pthread_cond_signal(&p_queue->cond_used); } @@ -323,60 +314,11 @@ struct timespec spec; BUFSZ *buffer; - //fprintf (stderr, "dequeue() start.\n"); gettimeofday(&now, NULL); spec.tv_sec = now.tv_sec + 1; spec.tv_nsec = now.tv_usec * 1000; - //fprintf (stderr, "dequeue() mutex lock try. num_used[%d]\n", p_queue->num_used); pthread_mutex_lock(&p_queue->mutex); - //fprintf (stderr, "dequeue() mutex lock success. num_used[%d]\n", p_queue->num_used); - /* entered the critical section*/ - - /* wait while queue is empty */ - while(p_queue->num_used == 0) { - pthread_cond_timedwait(&p_queue->cond_used, - &p_queue->mutex, &spec); - if(f_exit) { - pthread_mutex_unlock(&p_queue->mutex); - return NULL; - } - } - - /* take buffer address */ - buffer = p_queue->buffer[p_queue->out]; - - /* move position marker for output to next position */ - p_queue->out++; - p_queue->out %= p_queue->size; - - /* update counters */ - p_queue->num_avail++; - p_queue->num_used--; - - /* leaving the critical section */ - //fprintf (stderr, "dequeue() mutex unlock.\n"); - pthread_mutex_unlock(&p_queue->mutex); - pthread_cond_signal(&p_queue->cond_avail); - - return buffer; -} - -ARIB_STD_B25_BUFFER * -stream_dequeue(STREAM_QUEUE_T *p_queue) -{ - struct timeval now; - struct timespec spec; - ARIB_STD_B25_BUFFER *buffer; - - //fprintf (stderr, "stream_dequeue() start.\n"); - gettimeofday(&now, NULL); - spec.tv_sec = now.tv_sec + 1; - spec.tv_nsec = now.tv_usec * 1000; - - //fprintf (stderr, "stream_dequeue() mutex lock try. num_used[%d]\n", p_queue->num_used); - pthread_mutex_lock(&p_queue->mutex); - //fprintf (stderr, "stream_dequeue() mutex lock success. num_used[%d]\n", p_queue->num_used); /* entered the critical section*/ /* wait while queue is empty */ @@ -402,10 +344,56 @@ /* leaving the critical section */ pthread_mutex_unlock(&p_queue->mutex); - //fprintf (stderr, "stream_dequeue() mutex unlock.\n"); pthread_cond_signal(&p_queue->cond_avail); - //fprintf (stderr, "dequeue() finish.\n"); + + return buffer; +} + +#define QUQUE_LOW_MIN (100) +#define QUEUE_LOW_USLEEP (800*1000) +ARIB_STD_B25_BUFFER * +stream_dequeue(STREAM_QUEUE_T *p_queue) +{ + struct timeval now; + struct timespec spec; + ARIB_STD_B25_BUFFER *buffer; + + gettimeofday(&now, NULL); + spec.tv_sec = now.tv_sec + 1; + spec.tv_nsec = now.tv_usec * 1000; + + pthread_mutex_lock(&p_queue->mutex); + /* entered the critical section*/ + /* wait while queue is empty */ + while(p_queue->num_used == 0) { + pthread_cond_timedwait(&p_queue->cond_used, + &p_queue->mutex, &spec); + if(f_exit) { + pthread_mutex_unlock(&p_queue->mutex); + return NULL; + } + } + + /* take buffer address */ + buffer = p_queue->buffer[p_queue->out]; + + /* move position marker for output to next position */ + p_queue->out++; + p_queue->out %= p_queue->size; + + /* update counters */ + p_queue->num_avail++; + p_queue->num_used--; + + /* leaving the critical section */ + pthread_mutex_unlock(&p_queue->mutex); + pthread_cond_signal(&p_queue->cond_avail); + + if ( p_queue->num_used < QUQUE_LOW_MIN ) { + /* force sleep 800msec */ + usleep(QUEUE_LOW_USLEEP); + } return buffer; } @@ -422,7 +410,7 @@ boolean use_udp = data->sock_data ? TRUE : FALSE; boolean fileless = FALSE; boolean use_splitter = splitter ? TRUE : FALSE; - boolean use_streaming = TRUE; // 本当は引数にすること + boolean use_dlna = data->streamer ? TRUE : FALSE; int sfd = -1; pthread_t signal_thread = data->signal_thread; struct sockaddr_in *addr = NULL; @@ -446,12 +434,9 @@ } while(1) { -// fprintf (stderr, "reader_func() while loop\n"); ssize_t wc = 0; int file_err = 0; - //fprintf (stderr, "reader_func() dequeue() start.\n"); qbuf = dequeue(p_queue); - //fprintf (stderr, "reader_func() dequeue() finish.\n"); /* no entry in the queue */ if(qbuf == NULL) { break; @@ -526,18 +511,18 @@ * 2.2.3.1 enqueue() は tdata->stream_queue->mutex を lock/unlock して書き込み時の同時更新を防止している */ //fprintf (stderr, "reader_func() buf.size[%d]\n", buf.size); - if ( use_streaming && buf.size > 0 ) { + if ( use_dlna && buf.size > 0 ) { do { eqbuf = malloc(sizeof(ARIB_STD_B25_BUFFER)); if ( eqbuf == NULL ) { fprintf (stderr, "Cannot malloc eqbuf memory. streaming abort.\n"); - use_streaming = FALSE; + use_dlna = FALSE; break; } eqbuf->data = malloc(buf.size); if ( eqbuf->data == NULL ) { fprintf (stderr, "Cannot malloc eqbuf memory. streaming abort.\n"); - use_streaming = FALSE; + use_dlna = FALSE; break; } eqbuf->size = buf.size; @@ -683,16 +668,13 @@ qbuf = stream_dequeue(p_queue); /* no entry in the queue */ if(qbuf == NULL) { - //fprintf (stderr, "stream_func(): dequeue() return NULL pointer. streaming abort.\n"); continue; } data->streamer->total_byte += qbuf->size; // クリティカルセクション長いのなんとかしたいなぁ… // ToDo: memcpy とかクリティカルセクションの外に出す // 3.2 tdata->streamer->mutex を lock - //fprintf (stderr, "stream_func(): mutex lock try.\n"); pthread_mutex_lock(&data->streamer->mutex); - //fprintf (stderr, "stream_func(): mutex lock success.\n"); // 3.3 以下を tdata->streamer->stream_nr の数だけループ for ( i=0; i < data->streamer->stream_nr; i++ ) { // 3.3.1 tdata->streamer->stream_session[N]->is_valid が有効か確認 @@ -724,7 +706,6 @@ pthread_mutex_unlock(&data->streamer->mutex); free(qbuf->data); free(qbuf); - //fprintf (stderr, "stream_func(): mutex unlock.\n"); } return NULL; } @@ -733,9 +714,9 @@ show_usage(char *cmd) { #ifdef HAVE_LIBARIB25 - fprintf(stderr, "Usage: \n%s [--b25 [--round N] [--strip] [--EMM]] [--udp [--addr hostname --port portnumber]] [--device devicefile] [--lnb voltage] [--sid SID1,SID2] [--es filename_suffix] [--start_time YYYYMMDDHHMISS] channel rectime destfile\n", cmd); + fprintf(stderr, "Usage: \n%s [--b25 [--round N] [--strip] [--EMM]] [--udp [--addr hostname --port portnumber]] [--device devicefile] [--lnb voltage] [--sid SID1,SID2] [--es filename_suffix] [--start_time YYYYMMDDHHMISS] [--dlna] channel rectime destfile\n", cmd); #else - fprintf(stderr, "Usage: \n%s [--strip] [--EMM]] [--udp [--addr hostname --port portnumber]] [--device devicefile] [--lnb voltage] [--sid SID1,SID2] [--es filename_suffix] [--start_time YYYYMMDDHHMISS] channel rectime destfile\n", cmd); + fprintf(stderr, "Usage: \n%s [--strip] [--EMM]] [--udp [--addr hostname --port portnumber]] [--device devicefile] [--lnb voltage] [--sid SID1,SID2] [--es filename_suffix] [--start_time YYYYMMDDHHMISS] [--dlna] channel rectime destfile\n", cmd); #endif fprintf(stderr, "\n"); fprintf(stderr, "Remarks:\n"); @@ -761,6 +742,7 @@ fprintf(stderr, "--sid SID1,SID2,...: Specify SID number in CSV format (101,102,...)\n"); fprintf(stderr, " --es filename: Specify ES out filename prefix\n"); fprintf(stderr, " --start_time YYYYMMDDHHMISS: Specify record start datetime\n"); + fprintf(stderr, "--dlna: Turn on DLNA DMS(Digital Media Server)\n"); fprintf(stderr, "--help: Show this help\n"); fprintf(stderr, "--version: Show version\n"); fprintf(stderr, "--list: Show channel list\n"); @@ -876,7 +858,7 @@ void cleanup(thread_data *tdata) { - int use_dlna = TRUE; + boolean use_dlna = tdata->streamer ? TRUE : FALSE; /* stop recording */ ioctl(tdata->tfd, STOP_REC, 0); @@ -972,7 +954,7 @@ /* case 1: specified tuner device */ if(device) { tdata->tfd = open(device, O_RDONLY); - tdata->device_id = get_device_id_by_name(device); + tdata->device_id = get_device_id_by_name(device); if(tdata->tfd < 0) { fprintf(stderr, "Cannot open tuner device: %s\n", device); return 1; @@ -1163,6 +1145,8 @@ { "es", 1, NULL, 'e'}, { "ES", 1, NULL, 'e'}, { "start_time", 1, NULL, 'y'}, + { "dlna", 0, NULL, 'c'}, + { "DLNA", 0, NULL, 'c'}, {0, 0, NULL, 0} /* terminate */ }; @@ -1171,6 +1155,8 @@ boolean fileless = FALSE; boolean use_stdout = FALSE; boolean use_splitter = FALSE; + boolean use_esout = FALSE; + boolean use_dlna = FALSE; char *host_to = NULL; int port_to = 1234; sock_data *sockdata = NULL; @@ -1181,7 +1167,7 @@ char *es_name_prefix = NULL; char *start_time = NULL; - while((result = getopt_long(argc, argv, "br:smn:ua:p:d:hvli:", + while((result = getopt_long(argc, argv, "br:smn:ua:p:d:hvli:y:c", long_options, &option_index)) != -1) { switch(result) { case 'b': @@ -1258,17 +1244,34 @@ sid_list = optarg; break; case 'e': + use_esout = TRUE; es_name_prefix = optarg; break; case 'y': start_time = optarg; break; + case 'c': + use_dlna = TRUE; + fprintf(stderr, "enable DLNA DMS(Digital Media Server)\n"); + break; } } if(argc - optind < 3) { - if(argc - optind == 2 && use_udp) { - fprintf(stderr, "Fileless UDP broadcasting\n"); + if(argc - optind == 2 && + (use_udp|use_dlna|use_esout) ) { + if ( use_udp ) { + fprintf(stderr, "Fileless UDP broadcasting\n"); + } + else if ( use_dlna ) { + fprintf(stderr, "Fileless DLNA broadcasting\n"); + } + else if ( use_esout ) { + fprintf(stderr, "Fileless ES out\n"); + } + else { + fprintf(stderr, "Fileless...not found...\n"); + } fileless = TRUE; tdata.wfd = -1; } @@ -1332,7 +1335,6 @@ } } - boolean use_dlna = TRUE; /* initialize DLNA */ if(use_dlna) { do { @@ -1345,8 +1347,14 @@ tdata.streamer->stream_nr = 0; tdata.streamer->stream_session[0] = NULL; pthread_mutex_init(&tdata.streamer->mutex, NULL); + + pthread_create(&stream_thread, NULL, stream_func, &tdata); + pthread_create(&dlna_thread, NULL, dlna_startup, NULL); } while(0); } + else { + tdata.streamer = NULL; + } /* initialize udp connection */ if(use_udp) { @@ -1389,8 +1397,6 @@ /* spawn reader thread */ tdata.signal_thread = signal_thread; pthread_create(&reader_thread, NULL, reader_func, &tdata); - pthread_create(&stream_thread, NULL, stream_func, &tdata); - pthread_create(&dlna_thread, NULL, dlna_startup, NULL); /* spawn ipc thread */ key_t key; @@ -1415,7 +1421,6 @@ while(1) { if(f_exit) break; - //fprintf (stderr, "main() while loop start.\n"); time(&cur_time); bufptr = malloc(sizeof(BUFSZ)); @@ -1423,25 +1428,18 @@ f_exit = TRUE; break; } - //fprintf (stderr, "main() loop#1.\n"); bufptr->size = read(tdata.tfd, bufptr->buffer, MAX_READ_SIZE); - //fprintf (stderr, "main() loop#2.\n"); if(bufptr->size <= 0) { if((cur_time - tdata.start_time) >= tdata.recsec && !tdata.indefinite) { - //fprintf (stderr, "main() loop#3.\n"); f_exit = TRUE; enqueue(p_queue, NULL); break; } else { - //fprintf (stderr, "main() loop#4.\n"); continue; } } - //fprintf (stderr, "main() loop#5.\n"); - //fprintf (stderr, "PT1 enqueue start.\n"); enqueue(p_queue, bufptr); - //fprintf (stderr, "PT1 enqueue finish.\n"); /* stop recording */ time(&cur_time); @@ -1477,8 +1475,9 @@ pthread_join(stream_thread, NULL); pthread_join(signal_thread, NULL); pthread_join(ipc_thread, NULL); -// pthread_join(dlna_startup, NULL); - pthread_join(dlna_thread, NULL); + if ( use_dlna ) { + pthread_join(dlna_thread, NULL); + } /* close tuner */ if(close_tuner(&tdata) != 0)