comparison pthread.c @ 2023:50e92cec1b84 libavcodec

* reimplementation using mutexes and condition variables.
author romansh
date Tue, 18 May 2004 01:53:43 +0000
parents 00a6bfc81010
children e1b69326ae36
comparison
equal deleted inserted replaced
2022:62f5652f03c4 2023:50e92cec1b84
1 /* 1 /*
2 * Copyright (c) 2004 Michael Niedermayer <michaelni@gmx.at> 2 * Copyright (c) 2004 Roman Shaposhnik.
3 *
4 * Many thanks to Steven M. Schultz for providing clever ideas and
5 * to Michael Niedermayer <michaelni@gmx.at> for writing initial
6 * implementation.
3 * 7 *
4 * This library is free software; you can redistribute it and/or 8 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public 9 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either 10 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version. 11 * version 2 of the License, or (at your option) any later version.
14 * You should have received a copy of the GNU Lesser General Public 18 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software 19 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 20 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 * 21 *
18 */ 22 */
19 #include <semaphore.h>
20 #include <pthread.h> 23 #include <pthread.h>
21
22 //#define DEBUG
23 24
24 #include "avcodec.h" 25 #include "avcodec.h"
25 #include "common.h" 26 #include "common.h"
26 27
27 typedef struct JobContext{ 28 typedef int (action_t)(AVCodecContext *c, void *arg);
28 sem_t available_sem;
29 int assigned;
30 int (*func)(AVCodecContext *c, void *arg);
31 void *arg;
32 int ret;
33 }JobContext;
34 29
35 typedef struct WorkerContext{ 30 typedef struct ThreadContext {
36 AVCodecContext *avctx; 31 pthread_t *workers;
37 pthread_t thread; 32 action_t *func;
38 int start_index; 33 void **args;
39 sem_t work_sem; 34 int *rets;
40 sem_t done_sem; 35 int rets_count;
41 }WorkerContext; 36 int job_count;
37
38 pthread_cond_t last_job_cond;
39 pthread_cond_t current_job_cond;
40 pthread_mutex_t current_job_lock;
41 int current_job;
42 int done;
43 } ThreadContext;
42 44
43 typedef struct ThreadContext{ 45 static void* worker(void *v)
44 WorkerContext *worker; 46 {
45 JobContext *job; 47 AVCodecContext *avctx = v;
46 int job_count; 48 ThreadContext *c = avctx->thread_opaque;
47 int allocated_job_count; 49 int our_job = c->job_count;
48 }ThreadContext; 50 int thread_count = avctx->thread_count;
51 int self_id;
49 52
50 static void * thread_func(void *v){ 53 pthread_mutex_lock(&c->current_job_lock);
51 WorkerContext *w= v; 54 self_id = c->current_job++;
52 ThreadContext *c= w->avctx->thread_opaque; 55 for (;;){
53 int i; 56 while (our_job >= c->job_count) {
54 57 if (c->current_job == thread_count + c->job_count)
55 for(;;){ 58 pthread_cond_signal(&c->last_job_cond);
56 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X enter wait\n", (int)v);
57 sem_wait(&w->work_sem);
58 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X after wait\n", (int)v);
59 if(c->job_count == 0)
60 break;
61
62 for(i=0; i<c->job_count; i++){
63 int index= (i + w->start_index) % c->job_count;
64 JobContext *j= &c->job[index];
65
66 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X first check of %d\n", (int)v, index);
67 if(j->assigned) continue; //unsynced check, if != 0 it is already given to another worker, it never becomes available before the next execute() call so this should be safe
68 59
69 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X second check of %d\n", (int)v, index); 60 pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
70 if(sem_trywait(&j->available_sem) == 0){ 61 our_job = self_id;
71 j->assigned=1; 62
72 j->ret= j->func(w->avctx, j->arg); 63 if (c->done) {
73 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X done %d\n", (int)v, index); 64 pthread_mutex_unlock(&c->current_job_lock);
74 } 65 return NULL;
75 } 66 }
76 //av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X complete\n", (int)v); 67 }
77 sem_post(&w->done_sem); 68 pthread_mutex_unlock(&c->current_job_lock);
69
70 c->rets[our_job%c->rets_count] = c->func(avctx, c->args[our_job]);
71
72 pthread_mutex_lock(&c->current_job_lock);
73 our_job = c->current_job++;
78 } 74 }
79
80 return NULL;
81 } 75 }
82 76
83 /** 77 static always_inline void avcodec_thread_park_workers(ThreadContext *c, int thread_count)
84 * free what has been allocated by avcodec_thread_init(). 78 {
85 * must be called after decoding has finished; in particular, do not call it while avcodec_thread_execute() is running 79 pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
86 */ 80 pthread_mutex_unlock(&c->current_job_lock);
87 void avcodec_thread_free(AVCodecContext *s){
88 ThreadContext *c= s->thread_opaque;
89 int i, val;
90
91 for(i=0; i<c->allocated_job_count; i++){
92 sem_getvalue(&c->job[i].available_sem, &val); assert(val == 0);
93 sem_destroy(&c->job[i].available_sem);
94 }
95
96 c->job_count= 0;
97 for(i=0; i<s->thread_count; i++){
98 sem_getvalue(&c->worker[i].work_sem, &val); assert(val == 0);
99 sem_getvalue(&c->worker[i].done_sem, &val); assert(val == 0);
100
101 sem_post(&c->worker[i].work_sem);
102 pthread_join(c->worker[i].thread, NULL);
103 sem_destroy(&c->worker[i].work_sem);
104 sem_destroy(&c->worker[i].done_sem);
105 }
106
107 av_freep(&c->job);
108 av_freep(&c->worker);
109 av_freep(&s->thread_opaque);
110 } 81 }
111 82
112 int avcodec_thread_execute(AVCodecContext *s, int (*func)(AVCodecContext *c2, void *arg2),void **arg, int *ret, int job_count){ 83 void avcodec_thread_free(AVCodecContext *avctx)
113 ThreadContext *c= s->thread_opaque; 84 {
114 int i, val; 85 ThreadContext *c = avctx->thread_opaque;
86 int i;
115 87
116 assert(s == c->avctx); 88 pthread_mutex_lock(&c->current_job_lock);
117 if(job_count > c->allocated_job_count){ 89 c->done = 1;
118 c->job= av_realloc(c->job, job_count*sizeof(JobContext)); 90 pthread_cond_signal(&c->current_job_cond);
91 pthread_mutex_unlock(&c->current_job_lock);
119 92
120 for(i=c->allocated_job_count; i<job_count; i++){ 93 for (i=0; i<avctx->thread_count; i++)
121 memset(&c->job[i], 0, sizeof(JobContext)); 94 pthread_join(c->workers[i], NULL);
122 c->allocated_job_count++;
123 95
124 if(sem_init(&c->job[i].available_sem, 0, 0)) 96 pthread_mutex_destroy(&c->current_job_lock);
125 return -1; 97 pthread_cond_destroy(&c->current_job_cond);
126 } 98 pthread_cond_destroy(&c->last_job_cond);
99 av_free(c->workers);
100 av_freep(c);
101 }
102
103 int avcodec_thread_execute(AVCodecContext *avctx, action_t* func, void **arg, int *ret, int job_count)
104 {
105 ThreadContext *c= avctx->thread_opaque;
106 int dummy_ret;
107
108 if (job_count <= 0)
109 return 0;
110
111 pthread_mutex_lock(&c->current_job_lock);
112
113 c->current_job = avctx->thread_count;
114 c->job_count = job_count;
115 c->args = arg;
116 c->func = func;
117 if (ret) {
118 c->rets = ret;
119 c->rets_count = job_count;
120 } else {
121 c->rets = &dummy_ret;
122 c->rets_count = 1;
127 } 123 }
128 c->job_count= job_count; 124 pthread_cond_broadcast(&c->current_job_cond);
125
126 avcodec_thread_park_workers(c, avctx->thread_count);
129 127
130 /* note, we can be certain that this is not called with the same AVCodecContext by different threads at the same time */
131
132 for(i=0; i<job_count; i++){
133 sem_getvalue(&c->job[i].available_sem, &val); assert(val == 0);
134
135 c->job[i].arg= arg[i];
136 c->job[i].func= func;
137 c->job[i].ret= 12345;
138 c->job[i].assigned= 0;
139 sem_post(&c->job[i].available_sem);
140 }
141
142 for(i=0; i<s->thread_count && i<job_count; i++){
143 sem_getvalue(&c->worker[i].work_sem, &val); assert(val == 0);
144 sem_getvalue(&c->worker[i].done_sem, &val); assert(val == 0);
145
146 c->worker[i].start_index= (i + job_count/2)/job_count;
147 //av_log(s, AV_LOG_DEBUG, "start worker %d\n", i);
148 sem_post(&c->worker[i].work_sem);
149 }
150
151 for(i=0; i<s->thread_count && i<job_count; i++){
152 //av_log(s, AV_LOG_DEBUG, "wait for worker %d\n", i);
153 sem_wait(&c->worker[i].done_sem);
154
155 sem_getvalue(&c->worker[i].work_sem, &val); assert(val == 0);
156 sem_getvalue(&c->worker[i].done_sem, &val); assert(val == 0);
157 }
158
159 for(i=0; i<job_count; i++){
160 sem_getvalue(&c->job[i].available_sem, &val); assert(val == 0);
161
162 c->job[i].func= NULL;
163 if(ret) ret[i]= c->job[i].ret;
164 }
165
166 return 0; 128 return 0;
167 } 129 }
168 130
169 int avcodec_thread_init(AVCodecContext *s, int thread_count){ 131 int avcodec_thread_init(AVCodecContext *avctx, int thread_count)
132 {
170 int i; 133 int i;
171 ThreadContext *c; 134 ThreadContext *c;
172 WorkerContext *worker;
173 135
174 s->thread_count= thread_count; 136 c = av_mallocz(sizeof(ThreadContext));
137 if (!c)
138 return -1;
139
140 c->workers = av_mallocz(sizeof(pthread_t)*thread_count);
141 if (!c->workers) {
142 av_free(c);
143 return -1;
144 }
175 145
176 assert(!s->thread_opaque); 146 avctx->thread_opaque = c;
177 c= av_mallocz(sizeof(ThreadContext)); 147 avctx->thread_count = thread_count;
178 worker= av_mallocz(sizeof(WorkerContext)*thread_count); 148 c->current_job = 0;
179 s->thread_opaque= c; 149 c->job_count = 0;
180 c->worker= worker; 150 c->done = 0;
181 151 pthread_cond_init(&c->current_job_cond, NULL);
182 for(i=0; i<thread_count; i++){ 152 pthread_cond_init(&c->last_job_cond, NULL);
183 //printf("init semaphors %d\n", i); fflush(stdout); 153 pthread_mutex_init(&c->current_job_lock, NULL);
184 worker[i].avctx= s; 154 pthread_mutex_lock(&c->current_job_lock);
185 if(sem_init(&worker[i].work_sem, 0, 0)) 155 for (i=0; i<thread_count; i++) {
186 goto fail; 156 if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
187 if(sem_init(&worker[i].done_sem, 0, 0)) 157 avctx->thread_count = i;
188 goto fail; 158 pthread_mutex_unlock(&c->current_job_lock);
189 //printf("create thread %d\n", i); fflush(stdout); 159 avcodec_thread_free(avctx);
190 if(pthread_create(&worker[i].thread, NULL, thread_func, &worker[i])) 160 return -1;
191 goto fail; 161 }
192 } 162 }
193 //printf("init done\n"); fflush(stdout);
194 163
195 s->execute= avcodec_thread_execute; 164 avcodec_thread_park_workers(c, thread_count);
196 165
166 avctx->execute = avcodec_thread_execute;
197 return 0; 167 return 0;
198 fail:
199 avcodec_thread_free(s);
200 return -1;
201 } 168 }