comparison pthread.c @ 1857:00a6bfc81010 libavcodec

count > thread_count for execute()
author michael
date Wed, 03 Mar 2004 19:29:00 +0000
parents 7366bb5c363f
children 50e92cec1b84
comparison
equal deleted inserted replaced
1856:ed6eb3e304cc 1857:00a6bfc81010
22 //#define DEBUG 22 //#define DEBUG
23 23
#include "avcodec.h"
#include "common.h"
#include <errno.h>
26 26
27 27 typedef struct JobContext{
28 typedef struct ThreadContext{ 28 sem_t available_sem;
29 AVCodecContext *avctx; 29 int assigned;
30 pthread_t thread;
31 sem_t work_sem;
32 sem_t done_sem;
33 int (*func)(AVCodecContext *c, void *arg); 30 int (*func)(AVCodecContext *c, void *arg);
34 void *arg; 31 void *arg;
35 int ret; 32 int ret;
33 }JobContext;
34
35 typedef struct WorkerContext{
36 AVCodecContext *avctx;
37 pthread_t thread;
38 int start_index;
39 sem_t work_sem;
40 sem_t done_sem;
41 }WorkerContext;
42
43 typedef struct ThreadContext{
44 WorkerContext *worker;
45 JobContext *job;
46 int job_count;
47 int allocated_job_count;
36 }ThreadContext; 48 }ThreadContext;
37 49
38 static void * thread_func(void *v){ 50 static void * thread_func(void *v){
39 ThreadContext *c= v; 51 WorkerContext *w= v;
52 ThreadContext *c= w->avctx->thread_opaque;
53 int i;
40 54
41 for(;;){ 55 for(;;){
42 //printf("thread_func %X enter wait\n", (int)v); fflush(stdout); 56 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X enter wait\n", (int)v);
43 sem_wait(&c->work_sem); 57 sem_wait(&w->work_sem);
44 //printf("thread_func %X after wait (func=%X)\n", (int)v, (int)c->func); fflush(stdout); 58 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X after wait\n", (int)v);
45 if(c->func) 59 if(c->job_count == 0)
46 c->ret= c->func(c->avctx, c->arg); 60 break;
47 else 61
48 return NULL; 62 for(i=0; i<c->job_count; i++){
49 //printf("thread_func %X signal complete\n", (int)v); fflush(stdout); 63 int index= (i + w->start_index) % c->job_count;
50 sem_post(&c->done_sem); 64 JobContext *j= &c->job[index];
65
66 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X first check of %d\n", (int)v, index);
67 if(j->assigned) continue; //unsynced check, if != 0 it is already given to another worker, it never becomes available before the next execute() call so this should be safe
68
69 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X second check of %d\n", (int)v, index);
70 if(sem_trywait(&j->available_sem) == 0){
71 j->assigned=1;
72 j->ret= j->func(w->avctx, j->arg);
73 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X done %d\n", (int)v, index);
74 }
75 }
76 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X complete\n", (int)v);
77 sem_post(&w->done_sem);
51 } 78 }
52 79
53 return NULL; 80 return NULL;
54 } 81 }
55 82
57 * free what has been allocated by avcodec_thread_init(). 84 * free what has been allocated by avcodec_thread_init().
58 * must be called after decoding has finished, especially dont call while avcodec_thread_execute() is running 85 * must be called after decoding has finished, especially dont call while avcodec_thread_execute() is running
59 */ 86 */
60 void avcodec_thread_free(AVCodecContext *s){ 87 void avcodec_thread_free(AVCodecContext *s){
61 ThreadContext *c= s->thread_opaque; 88 ThreadContext *c= s->thread_opaque;
62 int i; 89 int i, val;
63 90
91 for(i=0; i<c->allocated_job_count; i++){
92 sem_getvalue(&c->job[i].available_sem, &val); assert(val == 0);
93 sem_destroy(&c->job[i].available_sem);
94 }
95
96 c->job_count= 0;
64 for(i=0; i<s->thread_count; i++){ 97 for(i=0; i<s->thread_count; i++){
65 int val; 98 sem_getvalue(&c->worker[i].work_sem, &val); assert(val == 0);
66 99 sem_getvalue(&c->worker[i].done_sem, &val); assert(val == 0);
67 sem_getvalue(&c[i].work_sem, &val); assert(val == 0); 100
68 sem_getvalue(&c[i].done_sem, &val); assert(val == 0); 101 sem_post(&c->worker[i].work_sem);
69 102 pthread_join(c->worker[i].thread, NULL);
70 c[i].func= NULL; 103 sem_destroy(&c->worker[i].work_sem);
71 sem_post(&c[i].work_sem); 104 sem_destroy(&c->worker[i].done_sem);
72 pthread_join(c[i].thread, NULL); 105 }
73 sem_destroy(&c[i].work_sem); 106
74 sem_destroy(&c[i].done_sem); 107 av_freep(&c->job);
75 } 108 av_freep(&c->worker);
76
77 av_freep(&s->thread_opaque); 109 av_freep(&s->thread_opaque);
78 } 110 }
79 111
80 int avcodec_thread_execute(AVCodecContext *s, int (*func)(AVCodecContext *c2, void *arg2),void **arg, int *ret, int count){ 112 int avcodec_thread_execute(AVCodecContext *s, int (*func)(AVCodecContext *c2, void *arg2),void **arg, int *ret, int job_count){
81 ThreadContext *c= s->thread_opaque; 113 ThreadContext *c= s->thread_opaque;
82 int i, val; 114 int i, val;
83 115
84 assert(s == c->avctx); 116 assert(s == c->avctx);
85 assert(count <= s->thread_count); 117 if(job_count > c->allocated_job_count){
118 c->job= av_realloc(c->job, job_count*sizeof(JobContext));
119
120 for(i=c->allocated_job_count; i<job_count; i++){
121 memset(&c->job[i], 0, sizeof(JobContext));
122 c->allocated_job_count++;
123
124 if(sem_init(&c->job[i].available_sem, 0, 0))
125 return -1;
126 }
127 }
128 c->job_count= job_count;
86 129
87 /* note, we can be certain that this is not called with the same AVCodecContext by different threads at the same time */ 130 /* note, we can be certain that this is not called with the same AVCodecContext by different threads at the same time */
88 131
89 for(i=0; i<count; i++){ 132 for(i=0; i<job_count; i++){
90 sem_getvalue(&c[i].work_sem, &val); assert(val == 0); 133 sem_getvalue(&c->job[i].available_sem, &val); assert(val == 0);
91 sem_getvalue(&c[i].done_sem, &val); assert(val == 0); 134
92 135 c->job[i].arg= arg[i];
93 c[i].arg= arg[i]; 136 c->job[i].func= func;
94 c[i].func= func; 137 c->job[i].ret= 12345;
95 c[i].ret= 12345; 138 c->job[i].assigned= 0;
96 sem_post(&c[i].work_sem); 139 sem_post(&c->job[i].available_sem);
97 } 140 }
98 for(i=0; i<count; i++){ 141
99 sem_wait(&c[i].done_sem); 142 for(i=0; i<s->thread_count && i<job_count; i++){
100 143 sem_getvalue(&c->worker[i].work_sem, &val); assert(val == 0);
101 sem_getvalue(&c[i].work_sem, &val); assert(val == 0); 144 sem_getvalue(&c->worker[i].done_sem, &val); assert(val == 0);
102 sem_getvalue(&c[i].done_sem, &val); assert(val == 0); 145
103 146 c->worker[i].start_index= (i + job_count/2)/job_count;
104 c[i].func= NULL; 147 //av_log(s, AV_LOG_DEBUG, "start worker %d\n", i);
105 if(ret) ret[i]= c[i].ret; 148 sem_post(&c->worker[i].work_sem);
106 } 149 }
150
151 for(i=0; i<s->thread_count && i<job_count; i++){
152 //av_log(s, AV_LOG_DEBUG, "wait for worker %d\n", i);
153 sem_wait(&c->worker[i].done_sem);
154
155 sem_getvalue(&c->worker[i].work_sem, &val); assert(val == 0);
156 sem_getvalue(&c->worker[i].done_sem, &val); assert(val == 0);
157 }
158
159 for(i=0; i<job_count; i++){
160 sem_getvalue(&c->job[i].available_sem, &val); assert(val == 0);
161
162 c->job[i].func= NULL;
163 if(ret) ret[i]= c->job[i].ret;
164 }
165
107 return 0; 166 return 0;
108 } 167 }
109 168
110 int avcodec_thread_init(AVCodecContext *s, int thread_count){ 169 int avcodec_thread_init(AVCodecContext *s, int thread_count){
111 int i; 170 int i;
112 ThreadContext *c; 171 ThreadContext *c;
172 WorkerContext *worker;
113 173
114 s->thread_count= thread_count; 174 s->thread_count= thread_count;
115 175
116 assert(!s->thread_opaque); 176 assert(!s->thread_opaque);
117 c= av_mallocz(sizeof(ThreadContext)*thread_count); 177 c= av_mallocz(sizeof(ThreadContext));
178 worker= av_mallocz(sizeof(WorkerContext)*thread_count);
118 s->thread_opaque= c; 179 s->thread_opaque= c;
119 180 c->worker= worker;
181
120 for(i=0; i<thread_count; i++){ 182 for(i=0; i<thread_count; i++){
121 //printf("init semaphors %d\n", i); fflush(stdout); 183 //printf("init semaphors %d\n", i); fflush(stdout);
122 c[i].avctx= s; 184 worker[i].avctx= s;
123 if(sem_init(&c[i].work_sem, 0, 0)) 185 if(sem_init(&worker[i].work_sem, 0, 0))
124 goto fail; 186 goto fail;
125 if(sem_init(&c[i].done_sem, 0, 0)) 187 if(sem_init(&worker[i].done_sem, 0, 0))
126 goto fail; 188 goto fail;
127 //printf("create thread %d\n", i); fflush(stdout); 189 //printf("create thread %d\n", i); fflush(stdout);
128 if(pthread_create(&c[i].thread, NULL, thread_func, &c[i])) 190 if(pthread_create(&worker[i].thread, NULL, thread_func, &worker[i]))
129 goto fail; 191 goto fail;
130 } 192 }
131 //printf("init done\n"); fflush(stdout); 193 //printf("init done\n"); fflush(stdout);
132 194
133 s->execute= avcodec_thread_execute; 195 s->execute= avcodec_thread_execute;