Mercurial > libavcodec.hg
comparison pthread.c @ 2023:50e92cec1b84 libavcodec
* reimplementation using mutexes and condition variables.
author | romansh |
---|---|
date | Tue, 18 May 2004 01:53:43 +0000 |
parents | 00a6bfc81010 |
children | e1b69326ae36 |
comparison
equal
deleted
inserted
replaced
2022:62f5652f03c4 | 2023:50e92cec1b84 |
---|---|
1 /* | 1 /* |
2 * Copyright (c) 2004 Michael Niedermayer <michaelni@gmx.at> | 2 * Copyright (c) 2004 Roman Shaposhnik. |
3 * | |
4 * Many thanks to Steven M. Schultz for providing clever ideas and | |
5 * to Michael Niedermayer <michaelni@gmx.at> for writing initial | |
6 * implementation. | |
3 * | 7 * |
4 * This library is free software; you can redistribute it and/or | 8 * This library is free software; you can redistribute it and/or |
5 * modify it under the terms of the GNU Lesser General Public | 9 * modify it under the terms of the GNU Lesser General Public |
6 * License as published by the Free Software Foundation; either | 10 * License as published by the Free Software Foundation; either |
7 * version 2 of the License, or (at your option) any later version. | 11 * version 2 of the License, or (at your option) any later version. |
14 * You should have received a copy of the GNU Lesser General Public | 18 * You should have received a copy of the GNU Lesser General Public |
15 * License along with this library; if not, write to the Free Software | 19 * License along with this library; if not, write to the Free Software |
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA | 20 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
17 * | 21 * |
18 */ | 22 */ |
19 #include <semaphore.h> | |
20 #include <pthread.h> | 23 #include <pthread.h> |
21 | |
22 //#define DEBUG | |
23 | 24 |
24 #include "avcodec.h" | 25 #include "avcodec.h" |
25 #include "common.h" | 26 #include "common.h" |
26 | 27 |
27 typedef struct JobContext{ | 28 typedef int (action_t)(AVCodecContext *c, void *arg); |
28 sem_t available_sem; | |
29 int assigned; | |
30 int (*func)(AVCodecContext *c, void *arg); | |
31 void *arg; | |
32 int ret; | |
33 }JobContext; | |
34 | 29 |
35 typedef struct WorkerContext{ | 30 typedef struct ThreadContext { |
36 AVCodecContext *avctx; | 31 pthread_t *workers; |
37 pthread_t thread; | 32 action_t *func; |
38 int start_index; | 33 void **args; |
39 sem_t work_sem; | 34 int *rets; |
40 sem_t done_sem; | 35 int rets_count; |
41 }WorkerContext; | 36 int job_count; |
37 | |
38 pthread_cond_t last_job_cond; | |
39 pthread_cond_t current_job_cond; | |
40 pthread_mutex_t current_job_lock; | |
41 int current_job; | |
42 int done; | |
43 } ThreadContext; | |
42 | 44 |
43 typedef struct ThreadContext{ | 45 static void* worker(void *v) |
44 WorkerContext *worker; | 46 { |
45 JobContext *job; | 47 AVCodecContext *avctx = v; |
46 int job_count; | 48 ThreadContext *c = avctx->thread_opaque; |
47 int allocated_job_count; | 49 int our_job = c->job_count; |
48 }ThreadContext; | 50 int thread_count = avctx->thread_count; |
51 int self_id; | |
49 | 52 |
50 static void * thread_func(void *v){ | 53 pthread_mutex_lock(&c->current_job_lock); |
51 WorkerContext *w= v; | 54 self_id = c->current_job++; |
52 ThreadContext *c= w->avctx->thread_opaque; | 55 for (;;){ |
53 int i; | 56 while (our_job >= c->job_count) { |
54 | 57 if (c->current_job == thread_count + c->job_count) |
55 for(;;){ | 58 pthread_cond_signal(&c->last_job_cond); |
56 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X enter wait\n", (int)v); | |
57 sem_wait(&w->work_sem); | |
58 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X after wait\n", (int)v); | |
59 if(c->job_count == 0) | |
60 break; | |
61 | |
62 for(i=0; i<c->job_count; i++){ | |
63 int index= (i + w->start_index) % c->job_count; | |
64 JobContext *j= &c->job[index]; | |
65 | |
66 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X first check of %d\n", (int)v, index); | |
67 if(j->assigned) continue; //unsynced check, if != 0 it is already given to another worker, it never becomes available before the next execute() call so this should be safe | |
68 | 59 |
69 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X second check of %d\n", (int)v, index); | 60 pthread_cond_wait(&c->current_job_cond, &c->current_job_lock); |
70 if(sem_trywait(&j->available_sem) == 0){ | 61 our_job = self_id; |
71 j->assigned=1; | 62 |
72 j->ret= j->func(w->avctx, j->arg); | 63 if (c->done) { |
73 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X done %d\n", (int)v, index); | 64 pthread_mutex_unlock(&c->current_job_lock); |
74 } | 65 return NULL; |
75 } | 66 } |
76 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X complete\n", (int)v); | 67 } |
77 sem_post(&w->done_sem); | 68 pthread_mutex_unlock(&c->current_job_lock); |
69 | |
70 c->rets[our_job%c->rets_count] = c->func(avctx, c->args[our_job]); | |
71 | |
72 pthread_mutex_lock(&c->current_job_lock); | |
73 our_job = c->current_job++; | |
78 } | 74 } |
79 | |
80 return NULL; | |
81 } | 75 } |
82 | 76 |
83 /** | 77 static always_inline void avcodec_thread_park_workers(ThreadContext *c, int thread_count) |
84 * free what has been allocated by avcodec_thread_init(). | 78 { |
85 * must be called after decoding has finished, especially dont call while avcodec_thread_execute() is running | 79 pthread_cond_wait(&c->last_job_cond, &c->current_job_lock); |
86 */ | 80 pthread_mutex_unlock(&c->current_job_lock); |
87 void avcodec_thread_free(AVCodecContext *s){ | |
88 ThreadContext *c= s->thread_opaque; | |
89 int i, val; | |
90 | |
91 for(i=0; i<c->allocated_job_count; i++){ | |
92 sem_getvalue(&c->job[i].available_sem, &val); assert(val == 0); | |
93 sem_destroy(&c->job[i].available_sem); | |
94 } | |
95 | |
96 c->job_count= 0; | |
97 for(i=0; i<s->thread_count; i++){ | |
98 sem_getvalue(&c->worker[i].work_sem, &val); assert(val == 0); | |
99 sem_getvalue(&c->worker[i].done_sem, &val); assert(val == 0); | |
100 | |
101 sem_post(&c->worker[i].work_sem); | |
102 pthread_join(c->worker[i].thread, NULL); | |
103 sem_destroy(&c->worker[i].work_sem); | |
104 sem_destroy(&c->worker[i].done_sem); | |
105 } | |
106 | |
107 av_freep(&c->job); | |
108 av_freep(&c->worker); | |
109 av_freep(&s->thread_opaque); | |
110 } | 81 } |
111 | 82 |
112 int avcodec_thread_execute(AVCodecContext *s, int (*func)(AVCodecContext *c2, void *arg2),void **arg, int *ret, int job_count){ | 83 void avcodec_thread_free(AVCodecContext *avctx) |
113 ThreadContext *c= s->thread_opaque; | 84 { |
114 int i, val; | 85 ThreadContext *c = avctx->thread_opaque; |
86 int i; | |
115 | 87 |
116 assert(s == c->avctx); | 88 pthread_mutex_lock(&c->current_job_lock); |
117 if(job_count > c->allocated_job_count){ | 89 c->done = 1; |
118 c->job= av_realloc(c->job, job_count*sizeof(JobContext)); | 90 pthread_cond_signal(&c->current_job_cond); |
91 pthread_mutex_unlock(&c->current_job_lock); | |
119 | 92 |
120 for(i=c->allocated_job_count; i<job_count; i++){ | 93 for (i=0; i<avctx->thread_count; i++) |
121 memset(&c->job[i], 0, sizeof(JobContext)); | 94 pthread_join(c->workers[i], NULL); |
122 c->allocated_job_count++; | |
123 | 95 |
124 if(sem_init(&c->job[i].available_sem, 0, 0)) | 96 pthread_mutex_destroy(&c->current_job_lock); |
125 return -1; | 97 pthread_cond_destroy(&c->current_job_cond); |
126 } | 98 pthread_cond_destroy(&c->last_job_cond); |
99 av_free(c->workers); | |
100 av_freep(c); | |
101 } | |
102 | |
103 int avcodec_thread_execute(AVCodecContext *avctx, action_t* func, void **arg, int *ret, int job_count) | |
104 { | |
105 ThreadContext *c= avctx->thread_opaque; | |
106 int dummy_ret; | |
107 | |
108 if (job_count <= 0) | |
109 return 0; | |
110 | |
111 pthread_mutex_lock(&c->current_job_lock); | |
112 | |
113 c->current_job = avctx->thread_count; | |
114 c->job_count = job_count; | |
115 c->args = arg; | |
116 c->func = func; | |
117 if (ret) { | |
118 c->rets = ret; | |
119 c->rets_count = job_count; | |
120 } else { | |
121 c->rets = &dummy_ret; | |
122 c->rets_count = 1; | |
127 } | 123 } |
128 c->job_count= job_count; | 124 pthread_cond_broadcast(&c->current_job_cond); |
125 | |
126 avcodec_thread_park_workers(c, avctx->thread_count); | |
129 | 127 |
130 /* note, we can be certain that this is not called with the same AVCodecContext by different threads at the same time */ | |
131 | |
132 for(i=0; i<job_count; i++){ | |
133 sem_getvalue(&c->job[i].available_sem, &val); assert(val == 0); | |
134 | |
135 c->job[i].arg= arg[i]; | |
136 c->job[i].func= func; | |
137 c->job[i].ret= 12345; | |
138 c->job[i].assigned= 0; | |
139 sem_post(&c->job[i].available_sem); | |
140 } | |
141 | |
142 for(i=0; i<s->thread_count && i<job_count; i++){ | |
143 sem_getvalue(&c->worker[i].work_sem, &val); assert(val == 0); | |
144 sem_getvalue(&c->worker[i].done_sem, &val); assert(val == 0); | |
145 | |
146 c->worker[i].start_index= (i + job_count/2)/job_count; | |
147 //av_log(s, AV_LOG_DEBUG, "start worker %d\n", i); | |
148 sem_post(&c->worker[i].work_sem); | |
149 } | |
150 | |
151 for(i=0; i<s->thread_count && i<job_count; i++){ | |
152 //av_log(s, AV_LOG_DEBUG, "wait for worker %d\n", i); | |
153 sem_wait(&c->worker[i].done_sem); | |
154 | |
155 sem_getvalue(&c->worker[i].work_sem, &val); assert(val == 0); | |
156 sem_getvalue(&c->worker[i].done_sem, &val); assert(val == 0); | |
157 } | |
158 | |
159 for(i=0; i<job_count; i++){ | |
160 sem_getvalue(&c->job[i].available_sem, &val); assert(val == 0); | |
161 | |
162 c->job[i].func= NULL; | |
163 if(ret) ret[i]= c->job[i].ret; | |
164 } | |
165 | |
166 return 0; | 128 return 0; |
167 } | 129 } |
168 | 130 |
169 int avcodec_thread_init(AVCodecContext *s, int thread_count){ | 131 int avcodec_thread_init(AVCodecContext *avctx, int thread_count) |
132 { | |
170 int i; | 133 int i; |
171 ThreadContext *c; | 134 ThreadContext *c; |
172 WorkerContext *worker; | |
173 | 135 |
174 s->thread_count= thread_count; | 136 c = av_mallocz(sizeof(ThreadContext)); |
137 if (!c) | |
138 return -1; | |
139 | |
140 c->workers = av_mallocz(sizeof(pthread_t)*thread_count); | |
141 if (!c->workers) { | |
142 av_free(c); | |
143 return -1; | |
144 } | |
175 | 145 |
176 assert(!s->thread_opaque); | 146 avctx->thread_opaque = c; |
177 c= av_mallocz(sizeof(ThreadContext)); | 147 avctx->thread_count = thread_count; |
178 worker= av_mallocz(sizeof(WorkerContext)*thread_count); | 148 c->current_job = 0; |
179 s->thread_opaque= c; | 149 c->job_count = 0; |
180 c->worker= worker; | 150 c->done = 0; |
181 | 151 pthread_cond_init(&c->current_job_cond, NULL); |
182 for(i=0; i<thread_count; i++){ | 152 pthread_cond_init(&c->last_job_cond, NULL); |
183 //printf("init semaphors %d\n", i); fflush(stdout); | 153 pthread_mutex_init(&c->current_job_lock, NULL); |
184 worker[i].avctx= s; | 154 pthread_mutex_lock(&c->current_job_lock); |
185 if(sem_init(&worker[i].work_sem, 0, 0)) | 155 for (i=0; i<thread_count; i++) { |
186 goto fail; | 156 if(pthread_create(&c->workers[i], NULL, worker, avctx)) { |
187 if(sem_init(&worker[i].done_sem, 0, 0)) | 157 avctx->thread_count = i; |
188 goto fail; | 158 pthread_mutex_unlock(&c->current_job_lock); |
189 //printf("create thread %d\n", i); fflush(stdout); | 159 avcodec_thread_free(avctx); |
190 if(pthread_create(&worker[i].thread, NULL, thread_func, &worker[i])) | 160 return -1; |
191 goto fail; | 161 } |
192 } | 162 } |
193 //printf("init done\n"); fflush(stdout); | |
194 | 163 |
195 s->execute= avcodec_thread_execute; | 164 avcodec_thread_park_workers(c, thread_count); |
196 | 165 |
166 avctx->execute = avcodec_thread_execute; | |
197 return 0; | 167 return 0; |
198 fail: | |
199 avcodec_thread_free(s); | |
200 return -1; | |
201 } | 168 } |