1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
#include "thpool.h"

#include <malloc.h>
#include <pthread.h>
#include <stdlib.h>
/*
 * Worker thread entry point.
 *
 * param is the struct thpool this worker belongs to (thpool_create passes
 * the pool as the thread argument). The worker repeatedly removes the task
 * at the front of the pool's queue, runs it, and frees the task node.
 *
 * The worker stops as soon as it observes a non-zero p->state. In this file
 * state is only ever 0 (running, set by thpool_create) or 1 (shutting down,
 * set by thpool_destroy). Tasks still queued at shutdown are not run here.
 *
 * Always returns NULL.
 */
static void *_worker_proc(void *param)
{
    struct thpool *p = param;

    for (;;) {
        struct thpool_task *t;

        pthread_mutex_lock(&p->sched_m);

        /* Re-check the predicate in a loop: condition waits may wake
         * spuriously, and another worker may have taken the task. */
        while (p->t_front == NULL && p->state == 0) {
            pthread_cond_wait(&p->sched_c, &p->sched_m);
        }

        /* State is inspected while holding the mutex so the read is
         * ordered against writers that update it under the same lock. */
        if (p->state != 0) {
            pthread_mutex_unlock(&p->sched_m);
            break;
        }

        /* state == 0 here, so the wait loop ended because a task exists. */
        t = p->t_front;
        p->t_front = t->next;
        if (p->t_front == NULL) {
            /* Queue drained: do not leave t_rear pointing at a node that
             * is about to be freed. */
            p->t_rear = NULL;
        }
        pthread_mutex_unlock(&p->sched_m);

        /* Run the task outside the lock so other workers can proceed. */
        t->taskfn(t->param);

        /* The node was malloc'ed when the task was queued; once dequeued
         * the worker is its only owner and must release it. */
        free(t);
    }

    return NULL;
}
/*
 * Create a thread pool with `workers` worker threads.
 *
 * workers: number of threads to start; must be at least 1.
 *
 * Returns the new pool, or NULL if workers < 1, memory could not be
 * allocated, the mutex/condition variable could not be initialised, or any
 * worker thread failed to start (already-started workers are stopped and
 * joined via thpool_destroy before returning).
 *
 * The returned pool must be released with thpool_destroy().
 */
struct thpool *thpool_create(int workers)
{
    struct thpool *p;
    pthread_t tid;
    size_t extra;
    int i;

    if (workers < 1) {
        return NULL;
    }

    /* The allocation reserves room for (workers - 1) thread handles beyond
     * the struct itself.
     * NOTE(review): presumably struct thpool ends with a one-element
     * `w` array -- confirm against thpool.h. */
    extra = (size_t)(workers - 1);
    if (extra > ((size_t)-1 - sizeof(struct thpool)) / sizeof(pthread_t)) {
        return NULL; /* size computation would overflow */
    }

    p = malloc(sizeof(struct thpool) + sizeof(pthread_t) * extra);
    if (p == NULL) {
        return NULL;
    }

    p->state = 0;
    p->w_nums = 0;
    p->t_front = NULL;
    p->t_rear = NULL;

    if (pthread_mutex_init(&p->sched_m, NULL) != 0) {
        free(p);
        return NULL;
    }
    if (pthread_cond_init(&p->sched_c, NULL) != 0) {
        pthread_mutex_destroy(&p->sched_m);
        free(p);
        return NULL;
    }

    for (i = 0; i < workers; i++) {
        if (pthread_create(&tid, NULL, _worker_proc, p) != 0) {
            break;
        }
        /* Only record handles of threads that actually started, so that
         * w_nums always counts joinable threads. */
        p->w[i] = tid;
        p->w_nums++;
    }

    if (p->w_nums != workers) {
        /* Partial start: thpool_destroy stops and joins the workers that
         * did start, then frees the pool. */
        thpool_destroy(p);
        return NULL;
    }

    return p;
}
/*
 * Shut down and free a thread pool.
 *
 * p: pool to destroy; NULL is accepted and ignored.
 *
 * Marks the pool as shutting down (state = 1), detaches every task still
 * waiting in the queue and frees those nodes WITHOUT running them, wakes all
 * workers, joins each of the p->w_nums worker threads, then releases the
 * condition variable, the mutex and the pool itself. A task that a worker is
 * already executing is allowed to finish because the worker is joined.
 *
 * p must not be used after this call returns.
 */
void thpool_destroy(struct thpool *p)
{
    struct thpool_task *pending;
    struct thpool_task *next;
    int i;

    if (p == NULL) {
        return;
    }

    pthread_mutex_lock(&p->sched_m);
    /* Set the shutdown flag under the mutex so a worker checking it before
     * waiting cannot miss the change. */
    p->state = 1;
    pending = p->t_front;
    p->t_front = NULL;
    p->t_rear = NULL;
    pthread_mutex_unlock(&p->sched_m);

    /* Wake every worker blocked on the condition variable. */
    pthread_cond_broadcast(&p->sched_c);

    /* The detached list is private to this thread now; free it unlocked. */
    while (pending != NULL) {
        next = pending->next;
        free(pending);
        pending = next;
    }

    for (i = 0; i < p->w_nums; i++) {
        pthread_join(p->w[i], NULL);
    }

    /* All workers have exited, so nothing can be using these any more. */
    pthread_cond_destroy(&p->sched_c);
    pthread_mutex_destroy(&p->sched_m);
    free(p);
}
/*
 * Queue a task for execution by one of the pool's workers.
 *
 * p:      pool to submit to.
 * taskfn: function to run; it is called as taskfn(param) on a worker thread.
 * param:  opaque argument handed to taskfn; this function never dereferences
 *         it.
 *
 * Tasks are appended at the rear of the queue and workers take them from the
 * front, so they are started in submission order.
 *
 * The function returns void and cannot report failure. The task is silently
 * dropped (taskfn is never called) when p or taskfn is NULL, when the task
 * node cannot be allocated, or when the pool is already shutting down
 * (p->state != 0).
 */
void thpool_start(struct thpool *p, thpool_taskfn taskfn, void *param)
{
    struct thpool_task *t;

    if (p == NULL || taskfn == NULL) {
        return;
    }

    t = malloc(sizeof *t);
    if (t == NULL) {
        return;
    }
    t->taskfn = taskfn;
    t->param = param;
    t->next = NULL;

    pthread_mutex_lock(&p->sched_m);

    if (p->state != 0) {
        /* Workers no longer dequeue once shutdown has begun; queuing the
         * node now would only leak it. */
        pthread_mutex_unlock(&p->sched_m);
        free(t);
        return;
    }

    if (p->t_front == NULL) {
        /* Empty queue: the new node is both front and rear. */
        p->t_front = t;
        p->t_rear = t;
    } else {
        p->t_rear->next = t;
        p->t_rear = t;
    }

    pthread_mutex_unlock(&p->sched_m);

    /* One new task, so waking a single waiting worker is enough. */
    pthread_cond_signal(&p->sched_c);
}
|