From 4f27864f12520ba2f03e83467a94bc5f13fb050f Mon Sep 17 00:00:00 2001
From: Nick Gasson
Date: Sat, 5 Nov 2022 14:37:52 +0000
Subject: [PATCH] Simplify how worker threads poll work queues

---
 src/thread.c | 192 +++++++++++++++++++++------------------------
 1 file changed, 77 insertions(+), 115 deletions(-)

diff --git a/src/thread.c b/src/thread.c
index 351882aa..fcdae1d5 100644
--- a/src/thread.c
+++ b/src/thread.c
@@ -37,7 +37,6 @@
 
 #define MAX_THREADS   64
 #define LOCK_SPINS    15
-#define MAX_ACTIVEQS  16
 #define MIN_TAKE      8
 #define PARKING_BAYS  64
 
@@ -136,20 +135,16 @@ typedef struct {
    abp_idx_t bot;
 } __attribute__((aligned(64))) threadq_t;
 
-typedef enum {
-   IDLE, START, DEAD,
-} workq_state_t;
+typedef enum { IDLE, START } workq_state_t;
 
 struct _workq {
    workq_t       *next;
    workq_state_t  state;
-   nvc_lock_t     lock;
    task_t        *entryq;
    unsigned       queuesz;
    unsigned       wptr;
    unsigned       rptr;
-   unsigned       comp;
-   int            activeidx;
+   unsigned       comp;   // TODO: stripe this across threads
    void          *context;
    bool           parallel;
 };
@@ -179,6 +174,14 @@ struct _nvc_thread {
    void *arg;
 };
 
+typedef struct {
+   nvc_lock_t  lock;
+   task_t     *tasks;
+   unsigned    wptr;
+   unsigned    rptr;
+   unsigned    max;
+} globalq_t;
+
 static parking_bay_t parking_bays[PARKING_BAYS] = {
    [0 ... PARKING_BAYS - 1] = {
       PTHREAD_MUTEX_INITIALIZER,
@@ -190,7 +193,7 @@ static nvc_thread_t *threads[MAX_THREADS];
 static unsigned max_workers = 0;
 static int running_threads = 0;
 static bool should_stop = false;
-static workq_t *activeqs[MAX_ACTIVEQS];
+static globalq_t globalq;
 
 #ifdef DEBUG
 static lock_stats_t lock_stats[MAX_THREADS];
@@ -403,7 +406,7 @@ static void thread_unpark(void *cookie, unpark_fn_t fn)
    }
 
    // Do not use pthread_cond_signal here as multiple threads parked in
-   // this bay here may be waiting on different cookies
+   // this bay may be waiting on different cookies
    PTHREAD_CHECK(pthread_cond_broadcast, &(bay->cond));
 }
 
@@ -568,15 +571,66 @@ static bool pop_top(threadq_t *tq, task_t *task)
    return atomic_cas(&tq->age.bits, old_age.bits, new_age.bits);
 }
 
+static void globalq_put(globalq_t *gq, const task_t *tasks, size_t count)
+{
+   SCOPED_LOCK(gq->lock);
+
+   if (gq->wptr == gq->rptr)
+      gq->wptr = gq->rptr = 0;
+
+   if (gq->wptr + count > gq->max) {
+      gq->max = next_power_of_2(gq->wptr + count);
+      gq->tasks = xrealloc_array(gq->tasks, gq->max, sizeof(task_t));
+   }
+
+   memcpy(gq->tasks + gq->wptr, tasks, count * sizeof(task_t));
+   gq->wptr += count;
+}
+
+static size_t globalq_take(globalq_t *gq, threadq_t *tq)
+{
+   SCOPED_LOCK(gq->lock);
+
+   if (gq->wptr == gq->rptr)
+      return 0;
+
+   const int remain = gq->wptr - gq->rptr;
+   const int share = gq->wptr / relaxed_load(&running_threads);
+   const int take = MIN(remain, MAX(MIN_TAKE, MIN(THREADQ_SIZE, share)));
+   const int from = gq->rptr;
+
+   gq->rptr += take;
+
+   push_bot(tq, gq->tasks + from, take);
+   return take;
+}
+
+static bool globalq_poll(globalq_t *gq, threadq_t *tq)
+{
+   int ntasks;
+   if ((ntasks = globalq_take(gq, tq))) {
+      task_t task;
+      int comp = 0;
+      for (; pop_bot(tq, &task); comp++) {
+         (*task.fn)(task.context, task.arg);
+         atomic_add(&(task.workq->comp), 1);
+      }
+
+      WORKQ_EVENT(comp, comp);
+      return true;
+   }
+   else
+      return false;
+}
+
 workq_t *workq_new(void *context)
 {
    if (my_thread->kind != MAIN_THREAD)
       fatal_trace("work queues can only be created by the main thread");
 
    workq_t *wq = xcalloc(sizeof(workq_t));
-   wq->state     = IDLE;
-   wq->activeidx = -1;
-   wq->context   = context;
+   wq->state   = IDLE;
+   wq->context = context;
 
    return wq;
 }
@@ -586,13 +640,10 @@ void workq_free(workq_t *wq)
 {
    if (my_thread->kind != MAIN_THREAD)
      fatal_trace("work queues can only be freed by the main thread");
 
-   {
-      SCOPED_LOCK(wq->lock);
-      assert(wq->state == IDLE);
-      wq->state = DEAD;
-   }
+   assert(wq->state == IDLE);
 
-   // TODO: free at safe point
+   free(wq->entryq);
+   free(wq);
 }
 
@@ -612,38 +663,10 @@ void workq_do(workq_t *wq, task_fn_t fn, void *arg)
 
 void workq_scan(workq_t *wq, scan_fn_t fn, void *arg)
 {
-   SCOPED_LOCK(wq->lock);
-
    for (int i = wq->rptr; i < wq->wptr; i++)
       (*fn)(wq->context, wq->entryq[i].arg, arg);
 }
 
-static size_t workq_take(workq_t *wq, threadq_t *tq)
-{
-   int from, take;
-   {
-      SCOPED_LOCK(wq->lock);
-
-      if (wq->state != START)
-         return 0;
-      else if (wq->wptr == wq->rptr) {
-         assert(wq->activeidx != -1);
-         atomic_cas(&(activeqs[wq->activeidx]), wq, NULL);
-         return 0;
-      }
-
-      const int remain = wq->wptr - wq->rptr;
-      const int share = wq->wptr / relaxed_load(&running_threads);
-      take = MIN(remain, MAX(MIN_TAKE, MIN(THREADQ_SIZE, share)));
-      from = wq->rptr;
-
-      wq->rptr += take;
-   }
-
-   push_bot(tq, wq->entryq + from, take);
-   return take;
-}
-
 static int estimate_depth(threadq_t *tq)
 {
    if (tq == NULL) return 0;
@@ -665,23 +688,6 @@ static threadq_t *get_thread_queue(int id)
    return &(t->queue);
 }
 
-static bool workq_poll(workq_t *wq, threadq_t *tq)
-{
-   int ntasks;
-   if ((ntasks = workq_take(wq, tq))) {
-      task_t task;
-      int comp = 0;
-      for (; pop_bot(tq, &task); comp++)
-         (*task.fn)(task.context, task.arg);
-
-      WORKQ_EVENT(comp, comp);
-      atomic_add(&(wq->comp), comp);
-      return true;
-   }
-   else
-      return false;
-}
-
 static bool steal_task(void)
 {
    int nthreads = relaxed_load(&running_threads), victim;
@@ -746,22 +752,8 @@ static void *worker_thread(void *arg)
    mspace_stack_limit(MSPACE_CURRENT_FRAME);
 
    do {
-      const int bias = rand();
-      bool did_work = false;
-      for (int i = 0; i < MAX_ACTIVEQS; i++) {
-         const int idx = (i + bias) % MAX_ACTIVEQS;
-         workq_t *wq = atomic_load(&(activeqs[idx]));
-         if (wq == NULL)
-            continue;
-
-         did_work |= workq_poll(wq, &(my_thread->queue));
-      }
-
-      if (!did_work)
-         did_work |= steal_task();
-
-      if (did_work)
-         my_thread->spins = 0;
+      if (globalq_poll(&globalq, &(my_thread->queue)) || steal_task())
+         my_thread->spins = 0;   // Did work
       else
         maybe_backoff();
 
@@ -798,24 +790,10 @@ void workq_start(workq_t *wq)
    if (wq->parallel) {
       create_workers(wq->wptr);
 
-      SCOPED_LOCK(wq->lock);
-
-      int idx = 0;
-      for (; idx < MAX_ACTIVEQS; idx++) {
-         if (relaxed_load(&activeqs[idx]) != NULL)
-            continue;
-         else if (atomic_cas(&activeqs[idx], NULL, wq))
-            break;
-      }
-
-      if (unlikely(idx >= MAX_ACTIVEQS))
-         fatal_trace("too many active work queues");
+      globalq_put(&globalq, wq->entryq, wq->wptr);
 
       assert(wq->state == IDLE);
       wq->state = START;
-
-      assert(wq->activeidx == -1);
-      wq->activeidx = idx;
    }
    else {
       assert(wq->state == IDLE);
@@ -831,29 +809,13 @@ static void workq_parallel_drain(workq_t *wq)
 {
-   while (workq_poll(wq, &(my_thread->queue)))
-      ;
-
-   for (;;) {
-      {
-         SCOPED_LOCK(wq->lock);
-         assert(wq->state == START);
-
-         if (atomic_load(&wq->comp) == wq->wptr) {
-            wq->state = IDLE;
-            wq->wptr = wq->rptr = wq->comp = 0;
-
-            assert(wq->activeidx != -1);
-            assert(relaxed_load(&activeqs[wq->activeidx]) != wq);
-
-            wq->activeidx = -1;
-            break;
-         }
-      }
-
-      if (!steal_task())
+   while (atomic_load(&wq->comp) < wq->wptr) {
+      if (!globalq_poll(&globalq, &(my_thread->queue)) && !steal_task())
         spin_wait();
   }
+
+   wq->state = IDLE;
+   wq->wptr = wq->rptr = wq->comp = 0;
 }
 
 void workq_drain(workq_t *wq)
-- 
2.39.2
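
Reviewer note (not part of the patch): the sketch below shows how a caller
drives the work-queue API whose scheduling path this patch reroutes through
the single global queue. The workq_* functions and the (context, arg) task
signature are taken from the hunks above; the header name "thread.h", the
sum_task body, NTASKS and the task return type are illustrative assumptions.

#include "thread.h"   // assumed project header declaring workq_t, workq_new, ...
#include <stdint.h>

#define NTASKS 64      // illustrative task count, not from the patch

typedef struct {
   int64_t partial[NTASKS];
} sum_ctx_t;

// Matches the call site in globalq_poll: (*task.fn)(task.context, task.arg)
static void sum_task(void *context, void *arg)
{
   sum_ctx_t *ctx = context;
   const uintptr_t slot = (uintptr_t)arg;
   ctx->partial[slot] = (int64_t)slot * slot;   // stand-in for real work
}

static int64_t parallel_sum(void)
{
   sum_ctx_t ctx = { { 0 } };

   workq_t *wq = workq_new(&ctx);            // context passed to every task
   for (uintptr_t i = 0; i < NTASKS; i++)
      workq_do(wq, sum_task, (void *)i);      // queue tasks; nothing runs yet

   workq_start(wq);   // with this patch: the entry queue is published to the
                      // global queue instead of claiming an activeqs[] slot
   workq_drain(wq);   // blocks until every task has run; the parallel path
                      // polls the global queue and steals until wq->comp
                      // catches up with wq->wptr

   int64_t sum = 0;
   for (int i = 0; i < NTASKS; i++)
      sum += ctx.partial[i];

   workq_free(wq);    // only legal once the queue is idle again
   return sum;
}

Whether the tasks actually execute on worker threads depends on wq->parallel
and the configured worker count, which this patch leaves unchanged.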