comparison recpt1/recpt1.c @ 18:84ff6ef710ea

- support stdout for output
- changed representation for indefinite recording to '-'
- handles EPIPE error
author Yoshiki Yazawa <yaz@honeyplanet.jp>
date Wed, 25 Feb 2009 17:22:31 +0900
parents 52c7c7c64ba6
children b63f5c100e5a
comparison
equal deleted inserted replaced
17:52c7c7c64ba6 18:84ff6ef710ea
8 #include <pthread.h> 8 #include <pthread.h>
9 #include <math.h> 9 #include <math.h>
10 #include <unistd.h> 10 #include <unistd.h>
11 #include <getopt.h> 11 #include <getopt.h>
12 #include <signal.h> 12 #include <signal.h>
13 #include <errno.h>
14 #include <sys/time.h>
13 15
14 #include <netdb.h> 16 #include <netdb.h>
15 #include <arpa/inet.h> 17 #include <arpa/inet.h>
16 #include <netinet/in.h> 18 #include <netinet/in.h>
17 19
27 typedef struct sock_data { 29 typedef struct sock_data {
28 int sfd; /* socket fd */ 30 int sfd; /* socket fd */
29 struct sockaddr_in addr; 31 struct sockaddr_in addr;
30 } sock_data; 32 } sock_data;
31 33
32 typedef struct thread_data { 34 typedef struct reader_thread_data {
33 QUEUE_T *queue; 35 QUEUE_T *queue;
34 decoder *decoder; 36 decoder *decoder;
35 int wfd; /* output file fd */ 37 int wfd; /* output file fd */
36 sock_data *sock_data; 38 sock_data *sock_data;
37 } thread_data; 39 pthread_t signal_thread;
40 } reader_thread_data;
41
42 typedef struct signal_thread_data {
43 QUEUE_T *queue;
44 int tfd; /* tuner fd */
45 } signal_thread_data;
38 46
39 /* lookup frequency conversion table*/ 47 /* lookup frequency conversion table*/
40 ISDB_T_FREQ_CONV_TABLE * 48 ISDB_T_FREQ_CONV_TABLE *
41 searchrecoff(char *channel) 49 searchrecoff(char *channel)
42 { 50 {
62 70
63 p_queue = (QUEUE_T*)calloc(memsize, sizeof(char)); 71 p_queue = (QUEUE_T*)calloc(memsize, sizeof(char));
64 72
65 if(p_queue != NULL) { 73 if(p_queue != NULL) {
66 p_queue->size = size; 74 p_queue->size = size;
67 p_queue->no_full = size; 75 p_queue->num_avail = size;
68 p_queue->no_empty = 0; 76 p_queue->num_used = 0;
69 pthread_mutex_init(&p_queue->mutex, NULL); 77 pthread_mutex_init(&p_queue->mutex, NULL);
70 pthread_cond_init(&p_queue->cond_full, NULL); 78 pthread_cond_init(&p_queue->cond_avail, NULL);
71 pthread_cond_init(&p_queue->cond_empty, NULL); 79 pthread_cond_init(&p_queue->cond_used, NULL);
72 } 80 }
73 81
74 return p_queue; 82 return p_queue;
75 } 83 }
76 84
79 { 87 {
80 if(!p_queue) 88 if(!p_queue)
81 return; 89 return;
82 90
83 pthread_mutex_destroy(&p_queue->mutex); 91 pthread_mutex_destroy(&p_queue->mutex);
84 pthread_cond_destroy(&p_queue->cond_full); 92 pthread_cond_destroy(&p_queue->cond_avail);
85 pthread_cond_destroy(&p_queue->cond_empty); 93 pthread_cond_destroy(&p_queue->cond_used);
86 free(p_queue); 94 free(p_queue);
87 } 95 }
88 96
89 /* enqueue data. this function will block if queue is full. */ 97 /* enqueue data. this function will block if queue is full. */
90 void 98 void
91 enqueue(QUEUE_T *p_queue, BUFSZ *data) 99 enqueue(QUEUE_T *p_queue, BUFSZ *data)
92 { 100 {
101 struct timeval now;
102 struct timespec spec;
103
104 gettimeofday(&now, NULL);
105 spec.tv_sec = now.tv_sec + 1;
106 spec.tv_nsec = now.tv_usec * 1000;
107
93 pthread_mutex_lock(&p_queue->mutex); 108 pthread_mutex_lock(&p_queue->mutex);
94 /* entered critical section */ 109 /* entered critical section */
95 110
96 /* wait until queue is not full */ 111 /* wait while queue is full */
97 while(!p_queue->no_full) { 112 while(p_queue->num_avail == 0) {
98 pthread_cond_wait(&p_queue->cond_full, &p_queue->mutex); 113 pthread_cond_timedwait(&p_queue->cond_avail,
99 fprintf(stderr, "Full\n"); 114 &p_queue->mutex, &spec);
115 if(f_exit)
116 return;
100 } 117 }
101 118
102 p_queue->buffer[p_queue->in] = data; 119 p_queue->buffer[p_queue->in] = data;
103 120
121 /* move position marker for input to next position */
104 p_queue->in++; 122 p_queue->in++;
105 p_queue->in %= p_queue->size; 123 p_queue->in %= p_queue->size;
106 124
107 p_queue->no_full--; 125 /* update counters */
108 p_queue->no_empty++; 126 p_queue->num_avail--;
127 p_queue->num_used++;
109 128
110 /* leaving critical section */ 129 /* leaving critical section */
111 pthread_mutex_unlock(&p_queue->mutex); 130 pthread_mutex_unlock(&p_queue->mutex);
112 pthread_cond_signal(&p_queue->cond_empty); 131 pthread_cond_signal(&p_queue->cond_used);
113 } 132 }
114 133
115 /* dequeue data. this function will block if queue is empty. */ 134 /* dequeue data. this function will block if queue is empty. */
116 BUFSZ * 135 BUFSZ *
117 dequeue(QUEUE_T *p_queue) 136 dequeue(QUEUE_T *p_queue)
118 { 137 {
138 struct timeval now;
139 struct timespec spec;
119 BUFSZ *buffer; 140 BUFSZ *buffer;
141
142 gettimeofday(&now, NULL);
143 spec.tv_sec = now.tv_sec + 1;
144 spec.tv_nsec = now.tv_usec * 1000;
120 145
121 pthread_mutex_lock(&p_queue->mutex); 146 pthread_mutex_lock(&p_queue->mutex);
122 /* entered the critical section*/ 147 /* entered the critical section*/
123 148
124 /* wait until queue is filled */ 149 /* wait while queue is empty */
125 while (!p_queue->no_empty) { 150 while(p_queue->num_used == 0) {
126 pthread_cond_wait(&p_queue->cond_empty, &p_queue->mutex); 151 pthread_cond_timedwait(&p_queue->cond_avail,
152 &p_queue->mutex, &spec);
153 if(f_exit)
154 return NULL;
127 } 155 }
128 156
129 /* take buffer address */ 157 /* take buffer address */
130 buffer = p_queue->buffer[p_queue->out]; 158 buffer = p_queue->buffer[p_queue->out];
131 159
132 /* move location marker to next position */ 160 /* move position marker for output to next position */
133 p_queue->out++; 161 p_queue->out++;
134 p_queue->out %= p_queue->size; 162 p_queue->out %= p_queue->size;
135 163
136 /* update flags */ 164 /* update counters */
137 p_queue->no_full++; 165 p_queue->num_avail++;
138 p_queue->no_empty--; 166 p_queue->num_used--;
139 167
140 /* leaving the critical section */ 168 /* leaving the critical section */
141 pthread_mutex_unlock(&p_queue->mutex); 169 pthread_mutex_unlock(&p_queue->mutex);
142 pthread_cond_signal(&p_queue->cond_full); 170 pthread_cond_signal(&p_queue->cond_avail);
143 171
144 return buffer; 172 return buffer;
145 } 173 }
146 174
147 /* this function will be reader thread */ 175 /* this function will be reader thread */
148 void * 176 void *
149 write_func(void *p) 177 reader_func(void *p)
150 { 178 {
151 thread_data *data = (thread_data *)p; 179 reader_thread_data *data = (reader_thread_data *)p;
152 QUEUE_T *p_queue = data->queue; 180 QUEUE_T *p_queue = data->queue;
153 decoder *dec = data->decoder; 181 decoder *dec = data->decoder;
154 int wfd = data->wfd; 182 int wfd = data->wfd;
155 int use_b25 = dec ? TRUE : FALSE; 183 int use_b25 = dec ? TRUE : FALSE;
156 int use_udp = data->sock_data ? TRUE : FALSE; 184 int use_udp = data->sock_data ? TRUE : FALSE;
157 int fileless = FALSE; 185 int fileless = FALSE;
158 int sfd = -1; 186 int sfd = -1;
187 pthread_t signal_thread = data->signal_thread;
159 struct sockaddr *addr = NULL; 188 struct sockaddr *addr = NULL;
160 BUFSZ *buf; 189 BUFSZ *buf;
161 ARIB_STD_B25_BUFFER sbuf, dbuf; 190 ARIB_STD_B25_BUFFER sbuf, dbuf;
162 int code; 191 int code;
163 192
203 } 232 }
204 free(buf); 233 free(buf);
205 } 234 }
206 235
207 /* normal exit */ 236 /* normal exit */
208 if((f_exit) && (!p_queue->no_empty)) { 237 if((f_exit) && (!p_queue->num_used)) {
209 if(use_b25) { 238 if(use_b25) {
210 code = b25_finish(dec, &sbuf, &dbuf); 239 code = b25_finish(dec, &sbuf, &dbuf);
211 if(code < 0) { 240 if(code < 0) {
212 fprintf(stderr, "b25_finish failed\n"); 241 fprintf(stderr, "b25_finish failed\n");
213 close(sfd);
214 break; 242 break;
215 } 243 }
216 244
217 if(use_udp && sfd != -1) { 245 if(use_udp && sfd != -1) {
218 sendto(sfd, dbuf.data, dbuf.size, 0, 246 sendto(sfd, dbuf.data, dbuf.size, 0,
219 addr, sizeof(struct sockaddr_in)); 247 addr, sizeof(struct sockaddr_in));
220 } 248 }
221 } 249 }
222 close(sfd);
223 break; 250 break;
224 } 251 }
225 } /* fileless */ 252 } /* end of fileless */
226 else { 253 else {
227 254
255 ssize_t wc;
228 buf = dequeue(p_queue); 256 buf = dequeue(p_queue);
229 /* no entry in the queue */ 257 /* no entry in the queue */
230 if(buf == NULL) { 258 if(buf == NULL) {
231 close(wfd);
232 break; 259 break;
233 } 260 }
234 261
235 sbuf.data = buf->buffer; 262 sbuf.data = buf->buffer;
236 sbuf.size = buf->size; 263 sbuf.size = buf->size;
238 if(use_b25) { 265 if(use_b25) {
239 /* write data to output file*/ 266 /* write data to output file*/
240 code = b25_decode(dec, &sbuf, &dbuf); 267 code = b25_decode(dec, &sbuf, &dbuf);
241 if(code < 0) { 268 if(code < 0) {
242 fprintf(stderr, "b25_decode failed\n"); 269 fprintf(stderr, "b25_decode failed\n");
243 close(wfd);
244 break; 270 break;
245 } 271 }
246 write(wfd, dbuf.data, dbuf.size); 272 wc = write(wfd, dbuf.data, dbuf.size);
273 if(wc <= 0 && errno == EPIPE) {
274 pthread_kill(signal_thread, SIGPIPE);
275 }
247 276
248 if(use_udp && sfd != -1) { 277 if(use_udp && sfd != -1) {
249 sendto(sfd, dbuf.data, dbuf.size, 0, 278 sendto(sfd, dbuf.data, dbuf.size, 0,
250 addr, sizeof(struct sockaddr_in)); 279 addr, sizeof(struct sockaddr_in));
251 } 280 }
252 free(buf); 281 free(buf);
253 } 282 }
254 else { 283 else {
255 write(wfd, sbuf.data, sbuf.size); 284 wc = write(wfd, sbuf.data, sbuf.size);
285 if(wc <= 0 && errno == EPIPE) {
286 pthread_kill(signal_thread, SIGPIPE);
287 }
256 288
257 if(use_udp && sfd != -1) { 289 if(use_udp && sfd != -1) {
258 sendto(sfd, sbuf.data, sbuf.size, 0, 290 sendto(sfd, sbuf.data, sbuf.size, 0,
259 addr, sizeof(struct sockaddr_in)); 291 addr, sizeof(struct sockaddr_in));
260 } 292 }
261 free(buf); 293 free(buf);
262 } 294 }
263 295
264 /* normal exit */ 296 /* normal exit */
265 if((f_exit) && (!p_queue->no_empty)) { 297 if((f_exit) && (!p_queue->num_used)) {
266 if(use_b25) { 298 if(use_b25) {
267 code = b25_finish(dec, &sbuf, &dbuf); 299 code = b25_finish(dec, &sbuf, &dbuf);
268 if(code < 0) { 300 if(code < 0) {
269 fprintf(stderr, "b25_finish failed\n"); 301 fprintf(stderr, "b25_finish failed\n");
270 close(wfd);
271 close(sfd);
272 break; 302 break;
273 } 303 }
274 write(wfd, dbuf.data, dbuf.size); 304 wc = write(wfd, dbuf.data, dbuf.size);
305 if(wc <= 0 && errno == EPIPE) {
306 pthread_kill(signal_thread, SIGPIPE);
307 }
275 308
276 if(use_udp && sfd != -1) { 309 if(use_udp && sfd != -1) {
277 sendto(sfd, dbuf.data, dbuf.size, 0, 310 sendto(sfd, dbuf.data, dbuf.size, 0,
278 addr, sizeof(struct sockaddr_in)); 311 addr, sizeof(struct sockaddr_in));
279 } 312 }
280 } 313 }
281 close(wfd);
282 close(sfd);
283 break; 314 break;
284 } 315 }
285 } 316 }
286 } 317 }
287 318
291 void 322 void
292 show_usage(char *cmd) 323 show_usage(char *cmd)
293 { 324 {
294 fprintf(stderr, "\n"); 325 fprintf(stderr, "\n");
295 fprintf(stderr, "Usage: \n%s [--b25 [--round N] [--strip] [--EMM]] [--udp [--addr hostname --port portnumber]] channel recsec destfile\n", cmd); 326 fprintf(stderr, "Usage: \n%s [--b25 [--round N] [--strip] [--EMM]] [--udp [--addr hostname --port portnumber]] channel recsec destfile\n", cmd);
327 fprintf(stderr, "\n");
328 fprintf(stderr, "Remarks:\n");
329 fprintf(stderr, "if recsec is '-', records indefinitely.\n");
330 fprintf(stderr, "if destfile is '-', stdout is used for output.\n");
296 fprintf(stderr, "\n"); 331 fprintf(stderr, "\n");
297 } 332 }
298 333
299 void 334 void
300 show_options(void) 335 show_options(void)
399 printf("Signal=%fdB\n", CNR); 434 printf("Signal=%fdB\n", CNR);
400 } 435 }
401 } 436 }
402 437
403 void 438 void
404 cleanup(int tfd) 439 cleanup(signal_thread_data *sdata)
405 { 440 {
406 /* stop recording */ 441 /* stop recording */
407 ioctl(tfd, STOP_REC, 0); 442 ioctl(sdata->tfd, STOP_REC, 0);
408 443
409 /* restore LNB state */ 444 /* restore LNB state */
410 #if 0 445 #if 0
411 if(ptr->type == CHTYPE_SATELLITE) { 446 if(ptr->type == CHTYPE_SATELLITE) {
412 if(ioctl(tfd, LNB_DISABLE, 0) < 0) { 447 if(ioctl(tfd, LNB_DISABLE, 0) < 0) {
413 return 0 ; 448 return 0 ;
414 } 449 }
415 } 450 }
416 #endif 451 #endif
417 /* xxx really need mutex? */ 452 /* xxx need mutex? */
418 f_exit = TRUE; 453 f_exit = TRUE;
454
455 pthread_cond_signal(&sdata->queue->cond_avail);
456 pthread_cond_signal(&sdata->queue->cond_used);
419 } 457 }
420 458
421 /* will be signal handler thread */ 459 /* will be signal handler thread */
422 void * 460 void *
423 process_signals(void *data) 461 process_signals(void *data)
424 { 462 {
425 sigset_t waitset; 463 sigset_t waitset;
426 int sig; 464 int sig;
427 int tfd = *(int *)data; 465 signal_thread_data *sdata;
466 sdata = (signal_thread_data *)data;
428 467
429 sigemptyset(&waitset); 468 sigemptyset(&waitset);
469 sigaddset(&waitset, SIGPIPE);
430 sigaddset(&waitset, SIGINT); 470 sigaddset(&waitset, SIGINT);
431 sigaddset(&waitset, SIGTERM); 471 sigaddset(&waitset, SIGTERM);
432 472
433 sigwait(&waitset, &sig); 473 sigwait(&waitset, &sig);
434 474
435 switch(sig) { 475 switch(sig) {
476 case SIGPIPE:
477 fprintf(stderr, "\nSIGPIPE received. cleaning up...\n");
478 cleanup(sdata);
479 break;
436 case SIGINT: 480 case SIGINT:
437 fprintf(stderr, "\nSIGINT received. cleaning up...\n"); 481 fprintf(stderr, "\nSIGINT received. cleaning up...\n");
438 cleanup(tfd); 482 cleanup(sdata);
439 break; 483 break;
440 case SIGTERM: 484 case SIGTERM:
441 fprintf(stderr, "\nSIGTERM received. cleaning up...\n"); 485 fprintf(stderr, "\nSIGTERM received. cleaning up...\n");
442 cleanup(tfd); 486 cleanup(sdata);
443 break; 487 break;
444 } 488 }
445 489
446 return NULL; /* dummy */ 490 return NULL; /* dummy */
447 } 491 }
448 492
449 void 493 void
450 init_signal_handlers(pthread_t *signal_thread, int tfd) 494 init_signal_handlers(pthread_t *signal_thread, signal_thread_data *sdata)
451 { 495 {
452 sigset_t blockset; 496 sigset_t blockset;
453 static int tunerfd;
454
455 tunerfd = tfd;
456 497
457 sigemptyset(&blockset); 498 sigemptyset(&blockset);
499 sigaddset(&blockset, SIGPIPE);
458 sigaddset(&blockset, SIGINT); 500 sigaddset(&blockset, SIGINT);
459 sigaddset(&blockset, SIGTERM); 501 sigaddset(&blockset, SIGTERM);
460 502
461 if(pthread_sigmask(SIG_BLOCK, &blockset, NULL)) 503 if(pthread_sigmask(SIG_BLOCK, &blockset, NULL))
462 fprintf(stderr, "pthread_sigmask() failed.\n"); 504 fprintf(stderr, "pthread_sigmask() failed.\n");
463 505
464 pthread_create(signal_thread, NULL, process_signals, &tunerfd); 506 pthread_create(signal_thread, NULL, process_signals, sdata);
465 } 507 }
466 508
467 int 509 int
468 main(int argc, char **argv) 510 main(int argc, char **argv)
469 { 511 {
477 pthread_t dequeue_thread; 519 pthread_t dequeue_thread;
478 pthread_t signal_thread; 520 pthread_t signal_thread;
479 QUEUE_T *p_queue = create_queue(MAX_QUEUE); 521 QUEUE_T *p_queue = create_queue(MAX_QUEUE);
480 BUFSZ *bufptr; 522 BUFSZ *bufptr;
481 decoder *dec = NULL; 523 decoder *dec = NULL;
482 thread_data tdata; 524 static reader_thread_data tdata;
525 static signal_thread_data sdata;
483 decoder_options dopt = { 526 decoder_options dopt = {
484 4, /* round */ 527 4, /* round */
485 0, /* strip */ 528 0, /* strip */
486 0 /* emm */ 529 0 /* emm */
487 }; 530 };
497 { "EMM", 0, NULL, 'm'}, 540 { "EMM", 0, NULL, 'm'},
498 { "udp", 0, NULL, 'u'}, 541 { "udp", 0, NULL, 'u'},
499 { "addr", 1, NULL, 'a'}, 542 { "addr", 1, NULL, 'a'},
500 { "port", 1, NULL, 'p'}, 543 { "port", 1, NULL, 'p'},
501 { "help", 0, NULL, 'h'}, 544 { "help", 0, NULL, 'h'},
502 {0, 0, 0, 0} /* terminate */ 545 {0, 0, NULL, 0} /* terminate */
503 }; 546 };
504 547
505 int use_b25 = FALSE; 548 int use_b25 = FALSE;
506 int use_udp = FALSE; 549 int use_udp = FALSE;
507 int fileless = FALSE; 550 int fileless = FALSE;
551 int use_stdout = FALSE;
508 char *host_to = NULL; 552 char *host_to = NULL;
509 int port_to = 1234; 553 int port_to = 1234;
510 sock_data *sdata = NULL; 554 sock_data *sockdata = NULL;
511 555
512 while((result = getopt_long(argc, argv, "br:smua:p:h", 556 while((result = getopt_long(argc, argv, "br:smua:p:h",
513 long_options, &option_index)) != -1) { 557 long_options, &option_index)) != -1) {
514 switch(result) { 558 switch(result) {
515 case 'b': 559 case 'b':
599 return 1; 643 return 1;
600 } 644 }
601 } 645 }
602 646
603 /* get recsec */ 647 /* get recsec */
604 recsec = atoi(argv[optind + 1]); 648 char *recsecstr = argv[optind + 1];
605 if(recsec <= 0) 649 if(!strcmp("-", recsecstr)) {
606 indefinite = TRUE; 650 indefinite = TRUE;
651 recsec = -1;
652 }
653 else {
654 recsec = atoi(recsecstr);
655 }
656
657 /* open output file */
658 char *destfile = argv[optind + 2];
659 if(!strcmp("-", destfile)) {
660 use_stdout = TRUE;
661 wfd = 1; /* stdout */
662 }
663 else {
664 if(!fileless) {
665 wfd = open(argv[optind + 2], (O_RDWR | O_CREAT | O_TRUNC), 0666);
666 if(wfd < 0) {
667 fprintf(stderr, "Could not open output file(%s)\n", argv[optind + 2]);
668 return 1;
669 }
670 }
671 }
607 672
608 /* initialize decoder */ 673 /* initialize decoder */
609 if(use_b25) { 674 if(use_b25) {
610 dec = b25_startup(&dopt); 675 dec = b25_startup(&dopt);
611 if(!dec) { 676 if(!dec) {
615 } 680 }
616 } 681 }
617 682
618 /* initialize udp connection */ 683 /* initialize udp connection */
619 if(use_udp) { 684 if(use_udp) {
620 sdata = calloc(1, sizeof(sock_data)); 685 sockdata = calloc(1, sizeof(sock_data));
621 struct in_addr ia; 686 struct in_addr ia;
622 ia.s_addr = inet_addr(host_to); 687 ia.s_addr = inet_addr(host_to);
623 if(ia.s_addr == INADDR_NONE) { 688 if(ia.s_addr == INADDR_NONE) {
624 struct hostent *hoste = gethostbyname(host_to); 689 struct hostent *hoste = gethostbyname(host_to);
625 if(!hoste) { 690 if(!hoste) {
626 perror("failed to get host by name"); 691 perror("gethostbyname");
627 return 1; 692 return 1;
628 } 693 }
629 ia.s_addr = *(in_addr_t*) (hoste->h_addr_list[0]); 694 ia.s_addr = *(in_addr_t*) (hoste->h_addr_list[0]);
630 } 695 }
631 sdata->addr.sin_family = AF_INET; 696 sockdata->addr.sin_family = AF_INET;
632 sdata->addr.sin_port = htons (port_to); 697 sockdata->addr.sin_port = htons (port_to);
633 sdata->addr.sin_addr.s_addr = ia.s_addr; 698 sockdata->addr.sin_addr.s_addr = ia.s_addr;
634 if((sdata->sfd = socket(PF_INET, SOCK_DGRAM, 0)) < 0) { 699 if((sockdata->sfd = socket(PF_INET, SOCK_DGRAM, 0)) < 0) {
635 perror("socket"); 700 perror("socket");
636 return 1; 701 return 1;
637 } 702 }
638 } 703 }
639 704
640 /* open output file */ 705 /* setup tuner */
641 if(!fileless) {
642 wfd = open(argv[optind + 2], (O_RDWR | O_CREAT | O_TRUNC), 0666);
643 if(wfd < 0) {
644 fprintf(stderr, "Could not open output file(%s)\n", argv[optind + 2]);
645 return 1;
646 }
647 }
648 if(ptr->type == CHTYPE_SATELLITE) { 706 if(ptr->type == CHTYPE_SATELLITE) {
649 if(ioctl(tfd, LNB_ENABLE, 0) < 0) { 707 if(ioctl(tfd, LNB_ENABLE, 0) < 0) {
650 return 0 ; 708 return 0 ;
651 } 709 }
652 } 710 }
657 return 1; 715 return 1;
658 } 716 }
659 calc_cn(tfd, ptr->type); 717 calc_cn(tfd, ptr->type);
660 718
661 /* init signal handler thread */ 719 /* init signal handler thread */
662 init_signal_handlers(&signal_thread, tfd); 720 sdata.queue = p_queue;
721 sdata.tfd = tfd;
722 init_signal_handlers(&signal_thread, &sdata);
663 723
664 /* make reader thread */ 724 /* make reader thread */
665 tdata.queue = p_queue; 725 tdata.queue = p_queue;
666 tdata.decoder = dec; 726 tdata.decoder = dec;
667 tdata.wfd = wfd; 727 tdata.wfd = wfd;
668 tdata.sock_data = sdata; 728 tdata.sock_data = sockdata;
669 pthread_create(&dequeue_thread, NULL, write_func, &tdata); 729 tdata.signal_thread = signal_thread;
730 pthread_create(&dequeue_thread, NULL, reader_func, &tdata);
670 731
671 /* start recording */ 732 /* start recording */
672 if(ioctl(tfd, START_REC, 0) < 0) { 733 if(ioctl(tfd, START_REC, 0) < 0) {
673 fprintf(stderr, "Tuner could not start recording\n"); 734 fprintf(stderr, "Tuner could not start recording\n");
674 return 1; 735 return 1;
721 return 0 ; 782 return 0 ;
722 } 783 }
723 } 784 }
724 close(tfd); 785 close(tfd);
725 786
726 /* wait reader thread */ 787 /* wait for threads */
727 pthread_join(dequeue_thread, NULL); 788 pthread_join(dequeue_thread, NULL);
728 // fprintf(stderr, "dequeue_thread joined\n");
729
730 pthread_join(signal_thread, NULL); 789 pthread_join(signal_thread, NULL);
731 // fprintf(stderr, "signal_thread joined\n"); 790
732 791 /* release queue */
733 /* relase queue */
734 destroy_queue(p_queue); 792 destroy_queue(p_queue);
735 793
794 /* close output file */
795 if(!use_stdout)
796 close(wfd);
797
736 /* free socket data */ 798 /* free socket data */
737 free(sdata); 799 if(use_udp) {
800 close(sockdata->sfd);
801 free(sockdata);
802 }
738 803
739 /* release decoder */ 804 /* release decoder */
740 if(use_b25) { 805 if(use_b25) {
741 b25_shutdown(dec); 806 b25_shutdown(dec);
742 } 807 }
743 808
744 // fprintf(stderr, "leaving main\n");
745
746 return 0; 809 return 0;
747 } 810 }