changeset 1857:00a6bfc81010 libavcodec

count > thread_count for execute()
author michael
date Wed, 03 Mar 2004 19:29:00 +0000
parents ed6eb3e304cc
children ea2a4058441c
files avcodec.h pthread.c
diffstat 2 files changed, 114 insertions(+), 52 deletions(-) [+]
line wrap: on
line diff
--- a/avcodec.h	Wed Mar 03 17:53:55 2004 +0000
+++ b/avcodec.h	Wed Mar 03 19:29:00 2004 +0000
@@ -17,7 +17,7 @@
 
 #define FFMPEG_VERSION_INT     0x000408
 #define FFMPEG_VERSION         "0.4.8"
-#define LIBAVCODEC_BUILD       4706
+#define LIBAVCODEC_BUILD       4707
 
 #define LIBAVCODEC_VERSION_INT FFMPEG_VERSION_INT
 #define LIBAVCODEC_VERSION     FFMPEG_VERSION
@@ -1547,7 +1547,7 @@
      * the codec may call this to execute several independant things. it will return only after
      * finishing all tasks, the user may replace this with some multithreaded implementation, the
      * default implementation will execute the parts serially
-     * @param count the number of functions this will be identical to thread_count if possible
+     * @param count the number of things to execute, may be larger than thread_count
      * - encoding: set by lavc, user can override
      * - decoding: set by lavc, user can override
      */
--- a/pthread.c	Wed Mar 03 17:53:55 2004 +0000
+++ b/pthread.c	Wed Mar 03 19:29:00 2004 +0000
@@ -24,30 +24,57 @@
 #include "avcodec.h"
 #include "common.h"
 
-
-typedef struct ThreadContext{
-    AVCodecContext *avctx;
-    pthread_t thread;
-    sem_t work_sem;
-    sem_t done_sem;
+typedef struct JobContext{
+    sem_t available_sem;
+    int assigned;
     int (*func)(AVCodecContext *c, void *arg);
     void *arg;
     int ret;
+}JobContext;
+
+typedef struct WorkerContext{
+    AVCodecContext *avctx;
+    pthread_t thread;
+    int start_index;
+    sem_t work_sem;
+    sem_t done_sem;
+}WorkerContext;
+
+typedef struct ThreadContext{
+    WorkerContext *worker;
+    JobContext *job;
+    int job_count;
+    int allocated_job_count;
 }ThreadContext;
 
 static void * thread_func(void *v){
-    ThreadContext *c= v;
+    WorkerContext *w= v;
+    ThreadContext *c= w->avctx->thread_opaque;
+    int i;
 
     for(;;){
-//printf("thread_func %X enter wait\n", (int)v); fflush(stdout);
-        sem_wait(&c->work_sem);
-//printf("thread_func %X after wait (func=%X)\n", (int)v, (int)c->func); fflush(stdout);
-        if(c->func)
-            c->ret= c->func(c->avctx, c->arg);
-        else
-            return NULL;
-//printf("thread_func %X signal complete\n", (int)v); fflush(stdout);
-        sem_post(&c->done_sem);
+//av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X enter wait\n", (int)v);
+        sem_wait(&w->work_sem);
+//av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X after wait\n", (int)v);
+        if(c->job_count == 0)
+           break;
+        
+        for(i=0; i<c->job_count; i++){
+            int index= (i + w->start_index) % c->job_count;
+            JobContext *j= &c->job[index];
+        
+//av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X first check of %d\n", (int)v, index);
+            if(j->assigned) continue; //unsynchronized check: if != 0 the job has already been given to another worker, and it cannot become available again before the next execute() call, so this should be safe
+            
+//av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X second check of %d\n", (int)v, index);
+            if(sem_trywait(&j->available_sem) == 0){
+                j->assigned=1;
+                j->ret= j->func(w->avctx, j->arg);
+//av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X done %d\n", (int)v, index);
+            }
+        }
+//av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X complete\n", (int)v);
+        sem_post(&w->done_sem);
     }
     
     return NULL;
@@ -59,73 +86,108 @@
  */
 void avcodec_thread_free(AVCodecContext *s){
     ThreadContext *c= s->thread_opaque;
-    int i;
-
-    for(i=0; i<s->thread_count; i++){
-        int val;
-        
-        sem_getvalue(&c[i].work_sem, &val); assert(val == 0);
-        sem_getvalue(&c[i].done_sem, &val); assert(val == 0);
-
-        c[i].func= NULL;
-        sem_post(&c[i].work_sem);
-        pthread_join(c[i].thread, NULL);
-        sem_destroy(&c[i].work_sem);
-        sem_destroy(&c[i].done_sem);
+    int i, val;
+    
+    for(i=0; i<c->allocated_job_count; i++){
+        sem_getvalue(&c->job[i].available_sem, &val); assert(val == 0);
+        sem_destroy(&c->job[i].available_sem);
     }
 
+    c->job_count= 0;
+    for(i=0; i<s->thread_count; i++){
+        sem_getvalue(&c->worker[i].work_sem, &val); assert(val == 0);
+        sem_getvalue(&c->worker[i].done_sem, &val); assert(val == 0);
+
+        sem_post(&c->worker[i].work_sem);
+        pthread_join(c->worker[i].thread, NULL);
+        sem_destroy(&c->worker[i].work_sem);
+        sem_destroy(&c->worker[i].done_sem);
+    }
+
+    av_freep(&c->job);
+    av_freep(&c->worker);
     av_freep(&s->thread_opaque);
 }
 
-int avcodec_thread_execute(AVCodecContext *s, int (*func)(AVCodecContext *c2, void *arg2),void **arg, int *ret, int count){
+int avcodec_thread_execute(AVCodecContext *s, int (*func)(AVCodecContext *c2, void *arg2),void **arg, int *ret, int job_count){
     ThreadContext *c= s->thread_opaque;
     int i, val;
     
     assert(s == c->avctx);
-    assert(count <= s->thread_count);
+    if(job_count > c->allocated_job_count){
+        c->job= av_realloc(c->job, job_count*sizeof(JobContext));
+
+        for(i=c->allocated_job_count; i<job_count; i++){
+            memset(&c->job[i], 0, sizeof(JobContext));
+            c->allocated_job_count++;
+
+            if(sem_init(&c->job[i].available_sem, 0, 0))
+                return -1;
+        }
+    }
+    c->job_count= job_count;
     
     /* note, we can be certain that this is not called with the same AVCodecContext by different threads at the same time */
 
-    for(i=0; i<count; i++){
-        sem_getvalue(&c[i].work_sem, &val); assert(val == 0);
-        sem_getvalue(&c[i].done_sem, &val); assert(val == 0);
+    for(i=0; i<job_count; i++){
+        sem_getvalue(&c->job[i].available_sem, &val); assert(val == 0);
         
-        c[i].arg= arg[i];
-        c[i].func= func;
-        c[i].ret= 12345;
-        sem_post(&c[i].work_sem);
+        c->job[i].arg= arg[i];
+        c->job[i].func= func;
+        c->job[i].ret= 12345;
+        c->job[i].assigned= 0;
+        sem_post(&c->job[i].available_sem);
     }
-    for(i=0; i<count; i++){
-        sem_wait(&c[i].done_sem);
+
+    for(i=0; i<s->thread_count && i<job_count; i++){
+        sem_getvalue(&c->worker[i].work_sem, &val); assert(val == 0);
+        sem_getvalue(&c->worker[i].done_sem, &val); assert(val == 0);
+
+        c->worker[i].start_index= (i + job_count/2)/job_count;
+//av_log(s, AV_LOG_DEBUG, "start worker %d\n", i);
+        sem_post(&c->worker[i].work_sem);
+    }
 
-        sem_getvalue(&c[i].work_sem, &val); assert(val == 0);
-        sem_getvalue(&c[i].done_sem, &val); assert(val == 0);
+    for(i=0; i<s->thread_count && i<job_count; i++){
+//av_log(s, AV_LOG_DEBUG, "wait for worker %d\n", i);
+        sem_wait(&c->worker[i].done_sem);
+
+        sem_getvalue(&c->worker[i].work_sem, &val); assert(val == 0);
+        sem_getvalue(&c->worker[i].done_sem, &val); assert(val == 0);
+    }
+
+    for(i=0; i<job_count; i++){
+        sem_getvalue(&c->job[i].available_sem, &val); assert(val == 0);
         
-        c[i].func= NULL;
-        if(ret) ret[i]= c[i].ret;
+        c->job[i].func= NULL;
+        if(ret) ret[i]= c->job[i].ret;
     }
+
     return 0;
 }
 
 int avcodec_thread_init(AVCodecContext *s, int thread_count){
     int i;
     ThreadContext *c;
+    WorkerContext *worker;
 
     s->thread_count= thread_count;
 
     assert(!s->thread_opaque);
-    c= av_mallocz(sizeof(ThreadContext)*thread_count);
+    c= av_mallocz(sizeof(ThreadContext));
+    worker= av_mallocz(sizeof(WorkerContext)*thread_count);
     s->thread_opaque= c;
-    
+    c->worker= worker;
+        
     for(i=0; i<thread_count; i++){
 //printf("init semaphors %d\n", i); fflush(stdout);
-        c[i].avctx= s;
-        if(sem_init(&c[i].work_sem, 0, 0))
+        worker[i].avctx= s;
+        if(sem_init(&worker[i].work_sem, 0, 0))
             goto fail;
-        if(sem_init(&c[i].done_sem, 0, 0))
+        if(sem_init(&worker[i].done_sem, 0, 0))
             goto fail;
 //printf("create thread %d\n", i); fflush(stdout);
-        if(pthread_create(&c[i].thread, NULL, thread_func, &c[i]))
+        if(pthread_create(&worker[i].thread, NULL, thread_func, &worker[i]))
             goto fail;
     }
 //printf("init done\n"); fflush(stdout);