Mercurial > pt1
diff recpt1/recpt1.c @ 78:5a0126d8af17
landed ipc control functionality branch
author | Yoshiki Yazawa <yaz@honeyplanet.jp> |
---|---|
date | Tue, 01 Dec 2009 20:24:22 +0900 |
parents | b6607f6e2851 |
children | 6e3bb2c0c5b6 |
line wrap: on
line diff
--- a/recpt1/recpt1.c Tue Dec 01 18:33:01 2009 +0900 +++ b/recpt1/recpt1.c Tue Dec 01 20:24:22 2009 +0900 @@ -28,30 +28,122 @@ #include "version.h" #include "mkpath.h" +#include <sys/ipc.h> +#include <sys/msg.h> + /* maximum write length at once */ #define SIZE_CHANK 1316 -/* globals */ -int f_exit = FALSE; +/* ipc message size */ +#define MSGSZ 255 /* type definitions */ +typedef int boolean; + typedef struct sock_data { int sfd; /* socket fd */ struct sockaddr_in addr; } sock_data; -typedef struct reader_thread_data { +typedef struct thread_data { QUEUE_T *queue; decoder *decoder; + decoder_options *dopt; + int ch; + int tfd; /* tuner fd */ int wfd; /* output file fd */ + ISDB_T_FREQ_CONV_TABLE *table; sock_data *sock_data; pthread_t signal_thread; -} reader_thread_data; + int recsec; + boolean indefinite; + int msqid; + time_t start_time; +} thread_data; + +typedef struct msgbuf { + long mtype; + char mtext[MSGSZ]; +} message_buf; + +/* globals */ +boolean f_exit = FALSE; + +/* prototypes */ +int tune(char *channel, thread_data *tdata, char *device); +int close_tuner(thread_data *tdata); + + +/* ipc message receive */ +void * +mq_recv(void *t) +{ + thread_data *tdata = (thread_data *)t; + message_buf rbuf; + char channel[16]; + int ch = 0, recsec = 0, time_to_add = 0; + + while(1) { + if(msgrcv(tdata->msqid, &rbuf, MSGSZ, 1, 0) < 0) { + return NULL; + } + + sscanf(rbuf.mtext, "ch=%s t=%d e=%d", channel, &recsec, &time_to_add); + ch = atoi(channel); +// fprintf(stderr, "ch=%d time=%d extend=%d\n", ch, recsec, time_to_add); + + if(ch && tdata->ch != ch) { + /* stop stream */ + ioctl(tdata->tfd, STOP_REC, 0); + + /* flush remainder? */ -typedef struct signal_thread_data { - QUEUE_T *queue; - int tfd; /* tuner fd */ -} signal_thread_data; +#if 0 + /* re-initialize decoder */ + if(tdata->decoder) { +// b25_finish(tdata->decoder); + b25_shutdown(tdata->decoder); + tdata->decoder = b25_startup(tdata->dopt); + if(!tdata->decoder) { + fprintf(stderr, "Cannot start b25 decoder\n"); + fprintf(stderr, "Fall back to encrypted recording\n"); + } + } +#endif + /* tune to new channel */ + if(close_tuner(tdata) != 0) + return NULL; + tune(channel, tdata, NULL); + + /* restart recording */ + if(ioctl(tdata->tfd, START_REC, 0) < 0) { + fprintf(stderr, "Tuner cannot start recording\n"); + return NULL; + } + } + + if(time_to_add) { + tdata->recsec += time_to_add; + fprintf(stderr, "Extended %d sec\n", time_to_add); + } + + if(recsec) { + time_t cur_time; + time(&cur_time); + if(cur_time - tdata->start_time > recsec) { + f_exit = TRUE; + } + else { + tdata->recsec = recsec; + fprintf(stderr, "Total recording time = %d sec\n", recsec); + } + } + + if(f_exit) + return NULL; + } +} + /* lookup frequency conversion table*/ ISDB_T_FREQ_CONV_TABLE * @@ -189,13 +281,13 @@ void * reader_func(void *p) { - reader_thread_data *data = (reader_thread_data *)p; + thread_data *data = (thread_data *)p; QUEUE_T *p_queue = data->queue; decoder *dec = data->decoder; int wfd = data->wfd; - int use_b25 = dec ? TRUE : FALSE; - int use_udp = data->sock_data ? TRUE : FALSE; - int fileless = FALSE; + boolean use_b25 = dec ? TRUE : FALSE; + boolean use_udp = data->sock_data ? TRUE : FALSE; + boolean fileless = FALSE; int sfd = -1; pthread_t signal_thread = data->signal_thread; struct sockaddr_in *addr = NULL; @@ -306,6 +398,11 @@ } } + time_t cur_time; + time(&cur_time); + fprintf(stderr, "Recorded %d sec\n", + (int)(cur_time - data->start_time)); + return NULL; } @@ -341,6 +438,7 @@ fprintf(stderr, "--version: Show version\n"); fprintf(stderr, "--list: Show channel list\n"); } + void show_channels(void) { @@ -437,19 +535,19 @@ P = log10(5505024/(double)rc) * 10; CNR = (0.000024 * P * P * P * P) - (0.0016 * P * P * P) + (0.0398 * P * P) + (0.5491 * P)+3.0965; - fprintf(stderr, "Signal=%fdB\n", CNR); + fprintf(stderr, "Signal = %f dB\n", CNR); } else { CNR = getsignal_isdb_s(rc); - fprintf(stderr, "Signal=%fdB\n", CNR); + fprintf(stderr, "Signal = %f dB\n", CNR); } } void -cleanup(signal_thread_data *sdata) +cleanup(thread_data *tdata) { /* stop recording */ - ioctl(sdata->tfd, STOP_REC, 0); + ioctl(tdata->tfd, STOP_REC, 0); /* restore LNB state */ #if 0 @@ -462,8 +560,8 @@ /* xxx need mutex? */ f_exit = TRUE; - pthread_cond_signal(&sdata->queue->cond_avail); - pthread_cond_signal(&sdata->queue->cond_used); + pthread_cond_signal(&tdata->queue->cond_avail); + pthread_cond_signal(&tdata->queue->cond_used); } /* will be signal handler thread */ @@ -472,8 +570,7 @@ { sigset_t waitset; int sig; - signal_thread_data *sdata; - sdata = (signal_thread_data *)data; + thread_data *tdata = (thread_data *)data; sigemptyset(&waitset); sigaddset(&waitset, SIGPIPE); @@ -487,22 +584,22 @@ switch(sig) { case SIGPIPE: fprintf(stderr, "\nSIGPIPE received. cleaning up...\n"); - cleanup(sdata); + cleanup(tdata); break; case SIGINT: fprintf(stderr, "\nSIGINT received. cleaning up...\n"); - cleanup(sdata); + cleanup(tdata); break; case SIGTERM: fprintf(stderr, "\nSIGTERM received. cleaning up...\n"); - cleanup(sdata); + cleanup(tdata); break; case SIGUSR1: /* normal exit*/ - cleanup(sdata); + cleanup(tdata); break; case SIGUSR2: /* error */ fprintf(stderr, "Detected an error. cleaning up...\n"); - cleanup(sdata); + cleanup(tdata); break; } @@ -510,7 +607,7 @@ } void -init_signal_handlers(pthread_t *signal_thread, signal_thread_data *sdata) +init_signal_handlers(pthread_t *signal_thread, thread_data *tdata) { sigset_t blockset; @@ -524,31 +621,188 @@ if(pthread_sigmask(SIG_BLOCK, &blockset, NULL)) fprintf(stderr, "pthread_sigmask() failed.\n"); - pthread_create(signal_thread, NULL, process_signals, sdata); + pthread_create(signal_thread, NULL, process_signals, tdata); +} + +int +tune(char *channel, thread_data *tdata, char *device) +{ + char **tuner; + int num_devs; + int lp; + FREQUENCY freq; + + /* get channel */ + tdata->table = searchrecoff(channel); + if(tdata->table == NULL) { + fprintf(stderr, "Invalid Channel: %s\n", channel); + return 1; + } + + freq.frequencyno = tdata->table->set_freq; + freq.slot = tdata->table->add_freq; + + /* open tuner */ + /* case 1: specified tuner device */ + if(device) { + tdata->tfd = open(device, O_RDONLY); + if(tdata->tfd < 0) { + fprintf(stderr, "Cannot open tuner device: %s\n", device); + return 1; + } + + /* power on LNB */ + if(tdata->table->type == CHTYPE_SATELLITE) { + if(ioctl(tdata->tfd, LNB_ENABLE, 0) < 0) { + close(tdata->tfd); + fprintf(stderr, "Power on LNB failed: %s\n", device); + return 1; + } + } + + /* tune to specified channel */ + if(ioctl(tdata->tfd, SET_CHANNEL, &freq) < 0) { + close(tdata->tfd); + fprintf(stderr, "Cannot tune to the specified channel: %s\n", device); + return 1; + } + else { + tdata->ch = atoi(channel); + } + } + else { + /* case 2: loop around available devices */ + if(tdata->table->type == CHTYPE_SATELLITE) { + tuner = bsdev; + num_devs = NUM_BSDEV; + } + else { + tuner = isdb_t_dev; + num_devs = NUM_ISDB_T_DEV; + } + + for(lp = 0; lp < num_devs; lp++) { + tdata->tfd = open(tuner[lp], O_RDONLY); + if(tdata->tfd >= 0) { + /* power on LNB */ + if(tdata->table->type == CHTYPE_SATELLITE) { + if(ioctl(tdata->tfd, LNB_ENABLE, 0) < 0) { + close(tdata->tfd); + tdata->tfd = -1; + continue; + } + } + + /* tune to specified channel */ + if(ioctl(tdata->tfd, SET_CHANNEL, &freq) < 0) { + close(tdata->tfd); + tdata->tfd = -1; + continue; + } + + break; /* found suitable tuner */ + } + } + + /* all tuners cannot be used */ + if(tdata->tfd < 0) { + fprintf(stderr, "Cannot tune to the specified channel\n"); + return 1; + } + else { + tdata->ch = atoi(channel); + } + } + + /* show signal strength */ + calc_cn(tdata->tfd, tdata->table->type); + + return 0; /* success */ +} + +int +parse_time(char *rectimestr, thread_data *tdata) +{ + /* indefinite */ + if(!strcmp("-", rectimestr)) { + tdata->indefinite = TRUE; + tdata->recsec = -1; + } + /* colon */ + else if(strchr(rectimestr, ':')) { + int n1, n2, n3; + if(sscanf(rectimestr, "%d:%d:%d", &n1, &n2, &n3) == 3) + tdata->recsec = n1 * 3600 + n2 * 60 + n3; + else if(sscanf(rectimestr, "%d:%d", &n1, &n2) == 2) + tdata->recsec = n1 * 3600 + n2 * 60; + } + /* HMS */ + else { + char *tmpstr; + char *p1, *p2; + + tmpstr = strdup(rectimestr); + p1 = tmpstr; + while(*p1 && !isdigit(*p1)) + p1++; + + /* hour */ + if((p2 = strchr(p1, 'H')) || (p2 = strchr(p1, 'h'))) { + *p2 = '\0'; + tdata->recsec += atoi(p1) * 3600; + p1 = p2 + 1; + while(*p1 && !isdigit(*p1)) + p1++; + } + + /* minute */ + if((p2 = strchr(p1, 'M')) || (p2 = strchr(p1, 'm'))) { + *p2 = '\0'; + tdata->recsec += atoi(p1) * 60; + p1 = p2 + 1; + while(*p1 && !isdigit(*p1)) + p1++; + } + + /* second */ + tdata->recsec += atoi(p1); + + free(tmpstr); + } + + return 0; /* success */ +} + +int +close_tuner(thread_data *tdata) +{ + if(tdata->table->type == CHTYPE_SATELLITE) { + if(ioctl(tdata->tfd, LNB_DISABLE, 0) < 0) { + return 1; + } + } + close(tdata->tfd); + + return 0; } int main(int argc, char **argv) { - int tfd, wfd; - int lp; - int recsec = 0; - int indefinite = FALSE; - time_t start_time, cur_time; - FREQUENCY freq; - ISDB_T_FREQ_CONV_TABLE *ptr; + time_t cur_time; + pthread_t signal_thread; pthread_t reader_thread; - pthread_t signal_thread; + pthread_t ipc_thread; QUEUE_T *p_queue = create_queue(MAX_QUEUE); BUFSZ *bufptr; decoder *dec = NULL; - static reader_thread_data tdata; - static signal_thread_data sdata; + static thread_data tdata; decoder_options dopt = { 4, /* round */ 0, /* strip */ 0 /* emm */ }; + tdata.dopt = &dopt; int result; int option_index; @@ -571,16 +825,14 @@ {0, 0, NULL, 0} /* terminate */ }; - int use_b25 = FALSE; - int use_udp = FALSE; - int fileless = FALSE; - int use_stdout = FALSE; + boolean use_b25 = FALSE; + boolean use_udp = FALSE; + boolean fileless = FALSE; + boolean use_stdout = FALSE; char *host_to = NULL; int port_to = 1234; sock_data *sockdata = NULL; char *device = NULL; - char **tuner; - int num_devs; while((result = getopt_long(argc, argv, "br:smua:p:d:hvl", long_options, &option_index)) != -1) { @@ -646,7 +898,7 @@ if(argc - optind == 2 && use_udp) { fprintf(stderr, "Fileless UDP broadcasting\n"); fileless = TRUE; - wfd = -1; + tdata.wfd = -1; } else { fprintf(stderr, "Arguments are necessary!\n"); @@ -655,142 +907,21 @@ } } - /* get channel */ - ptr = searchrecoff(argv[optind]); - if(ptr == NULL) { - fprintf(stderr, "Invalid Channel: %s\n", argv[optind]); - return 1; - } - - freq.frequencyno = ptr->set_freq; - freq.slot = ptr->add_freq; - - /* open tuner */ - /* case 1: specified tuner device */ - if(device) { - tfd = open(device, O_RDONLY); - if(tfd < 0) { - fprintf(stderr, "Cannot open tuner device: %s\n", device); - return 1; - } - - /* power on LNB */ - if(ptr->type == CHTYPE_SATELLITE) { - if(ioctl(tfd, LNB_ENABLE, 0) < 0) { - close(tfd); - fprintf(stderr, "Power on LNB failed: %s\n", device); - return 1; - } - } - - /* tune to specified channel */ - if(ioctl(tfd, SET_CHANNEL, &freq) < 0) { - close(tfd); - fprintf(stderr, "Cannot tune to the specified channel: %s\n", device); - return 1; - } - } - else { - /* case 2: loop around available devices */ - if(ptr->type == CHTYPE_SATELLITE) { - tuner = bsdev; - num_devs = NUM_BSDEV; - } - else { - tuner = isdb_t_dev; - num_devs = NUM_ISDB_T_DEV; - } - - for(lp = 0; lp < num_devs; lp++) { - tfd = open(tuner[lp], O_RDONLY); - if(tfd >= 0) { - /* power on LNB */ - if(ptr->type == CHTYPE_SATELLITE) { - if(ioctl(tfd, LNB_ENABLE, 0) < 0) { - close(tfd); - tfd = -1; - continue; - } - } - - /* tune to specified channel */ - if(ioctl(tfd, SET_CHANNEL, &freq) < 0) { - close(tfd); - tfd = -1; - continue; - } + fprintf(stderr, "pid = %d\n", getpid()); - break; /* found suitable tuner */ - } - } - - /* all tuners cannot be used */ - if(tfd < 0) { - fprintf(stderr, "Cannot tune to the specified channel\n"); - return 1; - } - } - - /* show signal strength */ - calc_cn(tfd, ptr->type); - - /* get recsec */ - char *rectimestr = argv[optind + 1]; + /* tune */ + if(tune(argv[optind], &tdata, device) != 0) + return 1; - /* indefinite */ - if(!strcmp("-", rectimestr)) { - indefinite = TRUE; - recsec = -1; - } - /* colon */ - else if(strchr(rectimestr, ':')) { - int n1, n2, n3; - if(sscanf(rectimestr, "%d:%d:%d", &n1, &n2, &n3) == 3) - recsec = n1 * 3600 + n2 * 60 + n3; - else if(sscanf(rectimestr, "%d:%d", &n1, &n2) == 2) - recsec = n1 * 3600 + n2 * 60; - } - /* HMS */ - else { - char *tmpstr; - char *p1, *p2; - - tmpstr = strdup(rectimestr); - p1 = tmpstr; - while(*p1 && !isdigit(*p1)) - p1++; - - /* hour */ - if((p2 = strchr(p1, 'H')) || (p2 = strchr(p1, 'h'))) { - *p2 = '\0'; - recsec += atoi(p1) * 3600; - p1 = p2 + 1; - while(*p1 && !isdigit(*p1)) - p1++; - } - - /* minute */ - if((p2 = strchr(p1, 'M')) || (p2 = strchr(p1, 'm'))) { - *p2 = '\0'; - recsec += atoi(p1) * 60; - p1 = p2 + 1; - while(*p1 && !isdigit(*p1)) - p1++; - } - - /* second */ - recsec += atoi(p1); - - free(tmpstr); - } - - /* fprintf(stderr, "recsec = %d\n", recsec); */ + /* set recsec */ + if(parse_time(argv[optind + 1], &tdata) != 0) + return 1; /* open output file */ char *destfile = argv[optind + 2]; if(destfile && !strcmp("-", destfile)) { use_stdout = TRUE; - wfd = 1; /* stdout */ + tdata.wfd = 1; /* stdout */ } else { if(!fileless) { @@ -802,8 +933,8 @@ perror("mkpath"); free(path); - wfd = open(argv[optind + 2], (O_RDWR | O_CREAT | O_TRUNC), 0666); - if(wfd < 0) { + tdata.wfd = open(argv[optind + 2], (O_RDWR | O_CREAT | O_TRUNC), 0666); + if(tdata.wfd < 0) { fprintf(stderr, "Cannot open output file: %s\n", argv[optind + 2]); return 1; @@ -850,28 +981,36 @@ } } + /* prepare thread data */ + tdata.queue = p_queue; + tdata.decoder = dec; + tdata.sock_data = sockdata; + /* spawn signal handler thread */ - sdata.queue = p_queue; - sdata.tfd = tfd; - init_signal_handlers(&signal_thread, &sdata); + init_signal_handlers(&signal_thread, &tdata); /* spawn reader thread */ - tdata.queue = p_queue; - tdata.decoder = dec; - tdata.wfd = wfd; - tdata.sock_data = sockdata; tdata.signal_thread = signal_thread; pthread_create(&reader_thread, NULL, reader_func, &tdata); + /* spawn ipc thread */ + key_t key; + key = (key_t)getpid(); + + if ((tdata.msqid = msgget(key, IPC_CREAT | 0666)) < 0) { + perror("msgget"); + } + pthread_create(&ipc_thread, NULL, mq_recv, &tdata); + /* start recording */ - if(ioctl(tfd, START_REC, 0) < 0) { + if(ioctl(tdata.tfd, START_REC, 0) < 0) { fprintf(stderr, "Tuner cannot start recording\n"); return 1; } fprintf(stderr, "Recording...\n"); - time(&start_time); + time(&tdata.start_time); /* read from tuner */ while(1) { @@ -880,9 +1019,9 @@ time(&cur_time); bufptr = malloc(sizeof(BUFSZ)); - bufptr->size = read(tfd, bufptr->buffer, MAX_READ_SIZE); + bufptr->size = read(tdata.tfd, bufptr->buffer, MAX_READ_SIZE); if(bufptr->size <= 0) { - if((cur_time - start_time) >= recsec && !indefinite) { + if((cur_time - tdata.start_time) >= tdata.recsec && !tdata.indefinite) { f_exit = TRUE; enqueue(p_queue, NULL); break; @@ -894,12 +1033,13 @@ enqueue(p_queue, bufptr); /* stop recording */ - if((cur_time - start_time) >= recsec && !indefinite) { - ioctl(tfd, STOP_REC, 0); + time(&cur_time); + if((cur_time - tdata.start_time) >= tdata.recsec && !tdata.indefinite) { + ioctl(tdata.tfd, STOP_REC, 0); /* read remaining data */ while(1) { bufptr = malloc(sizeof(BUFSZ)); - bufptr->size = read(tfd, bufptr->buffer, MAX_READ_SIZE); + bufptr->size = read(tdata.tfd, bufptr->buffer, MAX_READ_SIZE); if(bufptr->size <= 0) { f_exit = TRUE; enqueue(p_queue, NULL); @@ -911,26 +1051,26 @@ } } + /* delete message queue*/ + msgctl(tdata.msqid, IPC_RMID, NULL); + pthread_kill(signal_thread, SIGUSR1); /* wait for threads */ pthread_join(reader_thread, NULL); pthread_join(signal_thread, NULL); + pthread_join(ipc_thread, NULL); /* close tuner */ - if(ptr->type == CHTYPE_SATELLITE) { - if(ioctl(tfd, LNB_DISABLE, 0) < 0) { - return 0 ; - } - } - close(tfd); + if(close_tuner(&tdata) != 0) + return 1; /* release queue */ destroy_queue(p_queue); /* close output file */ if(!use_stdout) - close(wfd); + close(tdata.wfd); /* free socket data */ if(use_udp) {