# HG changeset patch
# User Taylor R Campbell
# Date 1599431734 0
#      Sun Sep 06 22:35:34 2020 +0000
# Branch trunk
# Node ID 023a9d854839f2f9f91cc2e06fdf8d26fad884d7
# Parent  a41b816fb00bfc9c3993c1837df9aac30ae5b9fa
# EXP-Topic riastradh-wg
workqueue: Switch from direct use of kthread(9) to threadpool(9).

Should save a lot of lwps for seldom-used workqueues in the system.

While here, lift unnecessary restrictions on workqueue_wait: allow
multiple concurrent waits at a time, and allow enqueueing work at the
same time (as long as it's not the work we're waiting for).

diff -r a41b816fb00b -r 023a9d854839 share/man/man9/workqueue.9
--- a/share/man/man9/workqueue.9	Sun Sep 06 22:38:38 2020 +0000
+++ b/share/man/man9/workqueue.9	Sun Sep 06 22:35:34 2020 +0000
@@ -133,11 +133,11 @@
 waits for a specified work on the workqueue
 .Fa wq
 to finish.
-The caller must ensure that no new work will be enqueued to the workqueue
-beforehand.
-Note that if the workqueue is
-.Dv WQ_PERCPU ,
-the caller can enqueue a new work to another queue other than the waiting queue.
+The caller must ensure that
+.Fa wk
+will not be enqueued to the workqueue again until after
+.Fn workqueue_wait
+returns.
 .Pp
 .\" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
 .Fn workqueue_destroy
diff -r a41b816fb00b -r 023a9d854839 sys/kern/subr_workqueue.c
--- a/sys/kern/subr_workqueue.c	Sun Sep 06 22:38:38 2020 +0000
+++ b/sys/kern/subr_workqueue.c	Sun Sep 06 22:35:34 2020 +0000
@@ -30,15 +30,17 @@
 __KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.38 2020/08/01 02:14:43 riastradh Exp $");
 
 #include 
+#include 
 #include 
-#include 
 #include 
-#include 
+#include 
+#include 
 #include 
+#include 
+#include 
+#include 
 #include 
-#include 
-#include 
-#include 
 
 typedef struct work_impl {
 	SIMPLEQ_ENTRY(work_impl) wk_entry;
@@ -48,11 +50,12 @@ SIMPLEQ_HEAD(workqhead, work_impl);
 
 struct workqueue_queue {
 	kmutex_t q_mutex;
-	kcondvar_t q_cv;
 	struct workqhead q_queue_pending;
 	struct workqhead q_queue_running;
+	struct threadpool_job q_job;
+	kcondvar_t q_cv;	/* workqueue_wait */
 	lwp_t *q_worker;
-	work_impl_t *q_waiter;
+	struct workqueue *q_wq;
 };
 
 struct workqueue {
@@ -62,45 +65,23 @@ struct workqueue {
 
 	char wq_name[MAXCOMLEN];
 	pri_t wq_prio;
-	void *wq_ptr;
+	int wq_ipl;
+	union {
+		struct threadpool_percpu *wq_tp_percpu;
+		struct threadpool *wq_tp_global;
+	};
+	union {
+		struct percpu *wq_percpu;	/* struct workqueue_queue * */
+		struct workqueue_queue *wq_global;
+	};
 };
 
-#define	WQ_SIZE		(roundup2(sizeof(struct workqueue), coherency_unit))
-#define	WQ_QUEUE_SIZE	(roundup2(sizeof(struct workqueue_queue), coherency_unit))
-
-#define	POISON	0xaabbccdd
-
-static size_t
-workqueue_size(int flags)
-{
-
-	return WQ_SIZE
-	    + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE
-	    + coherency_unit;
-}
-
-static struct workqueue_queue *
-workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci)
-{
-	u_int idx = 0;
-
-	if (wq->wq_flags & WQ_PERCPU) {
-		idx = ci ? cpu_index(ci) : cpu_index(curcpu());
-	}
-
-	return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE));
-}
-
 static void
 workqueue_runlist(struct workqueue *wq, struct workqhead *list)
 {
 	work_impl_t *wk;
 	work_impl_t *next;
 
-	/*
-	 * note that "list" is not a complete SIMPLEQ.
-	 */
-
 	for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
 		next = SIMPLEQ_NEXT(wk, wk_entry);
 		(*wq->wq_func)((void *)wk, wq->wq_arg);
@@ -108,43 +89,33 @@ workqueue_runlist(struct workqueue *wq,
 }
 
 static void
-workqueue_worker(void *cookie)
+workqueue_worker(struct threadpool_job *job)
 {
-	struct workqueue *wq = cookie;
-	struct workqueue_queue *q;
-	int s;
+	struct workqueue_queue *q = container_of(job, struct workqueue_queue,
+	    q_job);
+	struct workqueue *wq = q->q_wq;
+	int s, fpu = wq->wq_flags & WQ_FPU;
 
-	/* find the workqueue of this kthread */
-	q = workqueue_queue_lookup(wq, curlwp->l_cpu);
-
-	if (wq->wq_flags & WQ_FPU)
+	if (fpu)
 		s = kthread_fpu_enter();
-	for (;;) {
-		/*
-		 * we violate abstraction of SIMPLEQ.
-		 */
-
-		mutex_enter(&q->q_mutex);
-		while (SIMPLEQ_EMPTY(&q->q_queue_pending))
-			cv_wait(&q->q_cv, &q->q_mutex);
-		KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
-		q->q_queue_running.sqh_first =
-		    q->q_queue_pending.sqh_first; /* XXX */
-		SIMPLEQ_INIT(&q->q_queue_pending);
+	mutex_enter(&q->q_mutex);
+	KASSERT(q->q_worker == NULL);
+	q->q_worker = curlwp;
+	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
+	while (!SIMPLEQ_EMPTY(&q->q_queue_pending)) {
+		SIMPLEQ_CONCAT(&q->q_queue_running, &q->q_queue_pending);
 		mutex_exit(&q->q_mutex);
-
 		workqueue_runlist(wq, &q->q_queue_running);
-
 		mutex_enter(&q->q_mutex);
 		KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running));
 		SIMPLEQ_INIT(&q->q_queue_running);
-		if (__predict_false(q->q_waiter != NULL)) {
-			/* Wake up workqueue_wait */
-			cv_signal(&q->q_cv);
-		}
-		mutex_exit(&q->q_mutex);
+		/* Wake up workqueue_wait */
+		cv_broadcast(&q->q_cv);
 	}
-
-	if (wq->wq_flags & WQ_FPU)
+	KASSERT(q->q_worker == curlwp);
+	q->q_worker = NULL;
+	mutex_exit(&q->q_mutex);
+	if (fpu)
 		kthread_fpu_exit(s);
 }
@@ -158,83 +129,77 @@ workqueue_init(struct workqueue *wq, con
 	strncpy(wq->wq_name, name, sizeof(wq->wq_name));
 	wq->wq_prio = prio;
+	wq->wq_ipl = ipl;
 	wq->wq_func = callback_func;
 	wq->wq_arg = callback_arg;
 }
 
-static int
-workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
-    int ipl, struct cpu_info *ci)
+static void
+workqueue_initqueue(struct workqueue *wq, struct workqueue_queue **qp,
+    struct cpu_info *ci)
 {
-	int error, ktf;
+	struct workqueue_queue *q;
 
-	KASSERT(q->q_worker == NULL);
+	*qp = q = kmem_zalloc(sizeof(*q), KM_SLEEP);
 
-	mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
+	mutex_init(&q->q_mutex, MUTEX_DEFAULT, wq->wq_ipl);
 	cv_init(&q->q_cv, wq->wq_name);
 	SIMPLEQ_INIT(&q->q_queue_pending);
 	SIMPLEQ_INIT(&q->q_queue_running);
-	ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
-	if (wq->wq_prio < PRI_KERNEL)
-		ktf |= KTHREAD_TS;
-	if (ci) {
-		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
-		    wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index);
+	if (wq->wq_flags & WQ_PERCPU) {
+		threadpool_job_init(&q->q_job, workqueue_worker, &q->q_mutex,
+		    "%s/%d", wq->wq_name, cpu_index(ci));
 	} else {
-		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
-		    wq, &q->q_worker, "%s", wq->wq_name);
-	}
-	if (error != 0) {
-		mutex_destroy(&q->q_mutex);
-		cv_destroy(&q->q_cv);
-		KASSERT(q->q_worker == NULL);
+		threadpool_job_init(&q->q_job, workqueue_worker, &q->q_mutex,
+		    "%s", wq->wq_name);
 	}
-	return error;
-}
-
-struct workqueue_exitargs {
-	work_impl_t wqe_wk;
-	struct workqueue_queue *wqe_q;
-};
-
-static void
-workqueue_exit(struct work *wk, void *arg)
-{
-	struct workqueue_exitargs *wqe = (void *)wk;
-	struct workqueue_queue *q = wqe->wqe_q;
-
-	/*
-	 * only competition at this point is workqueue_finiqueue.
-	 */
-
-	KASSERT(q->q_worker == curlwp);
-	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
-	mutex_enter(&q->q_mutex);
-	q->q_worker = NULL;
-	cv_signal(&q->q_cv);
-	mutex_exit(&q->q_mutex);
-	kthread_exit(0);
+	q->q_wq = wq;
 }
 
 static void
-workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
+workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue **qp,
+    struct cpu_info *ci)
 {
-	struct workqueue_exitargs wqe;
+	struct workqueue_queue *q = *qp;
+	struct threadpool *tp;
 
-	KASSERT(wq->wq_func == workqueue_exit);
+	mutex_enter(&q->q_mutex);
+	if (wq->wq_flags & WQ_PERCPU) {
+		tp = threadpool_percpu_ref_remote(wq->wq_tp_percpu, ci);
+	} else {
+		KASSERT(ci == NULL);
+		tp = wq->wq_tp_global;
+	}
+	threadpool_cancel_job(tp, &q->q_job);
+	mutex_exit(&q->q_mutex);
 
-	wqe.wqe_q = q;
+	KASSERT(q->q_wq == wq);
+	threadpool_job_destroy(&q->q_job);
+	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
 	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
-	KASSERT(q->q_worker != NULL);
-	mutex_enter(&q->q_mutex);
-	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
-	cv_signal(&q->q_cv);
-	while (q->q_worker != NULL) {
-		cv_wait(&q->q_cv, &q->q_mutex);
-	}
-	mutex_exit(&q->q_mutex);
+	cv_destroy(&q->q_cv);
 	mutex_destroy(&q->q_mutex);
-	cv_destroy(&q->q_cv);
+
+	kmem_free(q, sizeof(*q));
+	*qp = NULL;
+}
+
+static void
+workqueue_init_cpu(void *vqp, void *vwq, struct cpu_info *ci)
+{
+	struct workqueue_queue **qp = vqp;
+	struct workqueue *wq = vwq;
+
+	workqueue_initqueue(wq, qp, ci);
+}
+
+static void
+workqueue_fini_cpu(void *vqp, void *vwq, struct cpu_info *ci)
+{
+	struct workqueue_queue **qp = vqp;
+	struct workqueue *wq = vwq;
+
+	workqueue_finiqueue(wq, qp, ci);
 }
 
 /* --- */
@@ -245,43 +210,32 @@ workqueue_create(struct workqueue **wqp,
     pri_t prio, int ipl, int flags)
 {
 	struct workqueue *wq;
-	struct workqueue_queue *q;
-	void *ptr;
-	int error = 0;
+	int error;
 
 	CTASSERT(sizeof(work_impl_t) <= sizeof(struct work));
 
-	ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP);
-	wq = (void *)roundup2((uintptr_t)ptr, coherency_unit);
-	wq->wq_ptr = ptr;
+	wq = kmem_zalloc(sizeof(*wq), KM_SLEEP);
 	wq->wq_flags = flags;
 
 	workqueue_init(wq, name, callback_func, callback_arg, prio, ipl);
 
 	if (flags & WQ_PERCPU) {
-		struct cpu_info *ci;
-		CPU_INFO_ITERATOR cii;
-
-		/* create the work-queue for each CPU */
-		for (CPU_INFO_FOREACH(cii, ci)) {
-			q = workqueue_queue_lookup(wq, ci);
-			error = workqueue_initqueue(wq, q, ipl, ci);
-			if (error) {
-				break;
-			}
-		}
+		error = threadpool_percpu_get(&wq->wq_tp_percpu, prio);
+		if (error)
+			goto fail;
+		wq->wq_percpu = percpu_create(sizeof(struct workqueue_queue *),
+		    workqueue_init_cpu, workqueue_fini_cpu, wq);
 	} else {
-		/* initialize a work-queue */
-		q = workqueue_queue_lookup(wq, NULL);
-		error = workqueue_initqueue(wq, q, ipl, NULL);
+		error = threadpool_get(&wq->wq_tp_global, prio);
+		if (error)
+			goto fail;
+		workqueue_initqueue(wq, &wq->wq_global, NULL);
 	}
 
-	if (error != 0) {
-		workqueue_destroy(wq);
-	} else {
-		*wqp = wq;
-	}
+	*wqp = wq;
+	return 0;
 
+fail:	kmem_free(wq, sizeof(*wq));
 	return error;
 }
 
@@ -306,13 +260,9 @@ workqueue_q_wait(struct workqueue_queue
 found:
 	if (wk != NULL) {
 		found = true;
-		KASSERT(q->q_waiter == NULL);
-		q->q_waiter = wk;
 		cv_wait(&q->q_cv, &q->q_mutex);
 		goto again;
 	}
-	if (q->q_waiter != NULL)
-		q->q_waiter = NULL;
 out:
 	mutex_exit(&q->q_mutex);
 
@@ -328,39 +278,45 @@ workqueue_q_wait(struct workqueue_queue
 void
 workqueue_wait(struct workqueue *wq, struct work *wk)
 {
-	struct workqueue_queue *q;
+	struct workqueue_queue *q, **qp;
 	bool found;
 
 	if (ISSET(wq->wq_flags, WQ_PERCPU)) {
 		struct cpu_info *ci;
 		CPU_INFO_ITERATOR cii;
+
 		for (CPU_INFO_FOREACH(cii, ci)) {
-			q = workqueue_queue_lookup(wq, ci);
+			/*
+			 * Bind to the CPU _and_ prevent percpu-swap
+			 * xcalls from completing.  This is safe as
+			 * long as we don't sleep.
+			 */
+			kpreempt_disable();
+			qp = percpu_getptr_remote(wq->wq_percpu, ci);
+			q = *qp;
+			kpreempt_enable();
 			found = workqueue_q_wait(q, (work_impl_t *)wk);
 			if (found)
 				break;
 		}
 	} else {
-		q = workqueue_queue_lookup(wq, NULL);
-		(void) workqueue_q_wait(q, (work_impl_t *)wk);
+		(void) workqueue_q_wait(wq->wq_global, (work_impl_t *)wk);
 	}
 }
 
 void
 workqueue_destroy(struct workqueue *wq)
 {
-	struct workqueue_queue *q;
-	struct cpu_info *ci;
-	CPU_INFO_ITERATOR cii;
 
-	wq->wq_func = workqueue_exit;
-	for (CPU_INFO_FOREACH(cii, ci)) {
-		q = workqueue_queue_lookup(wq, ci);
-		if (q->q_worker != NULL) {
-			workqueue_finiqueue(wq, q);
-		}
+	if (wq->wq_flags & WQ_PERCPU) {
+		percpu_free(wq->wq_percpu, sizeof(struct workqueue_queue *));
+		threadpool_percpu_put(wq->wq_tp_percpu, wq->wq_prio);
+	} else {
+		workqueue_finiqueue(wq, &wq->wq_global, NULL);
+		threadpool_put(wq->wq_tp_global, wq->wq_prio);
 	}
-	kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags));
+
+	kmem_free(wq, sizeof(*wq));
 }
 
 #ifdef DEBUG
@@ -379,18 +335,35 @@ workqueue_check_duplication(struct workq
 void
 workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
 {
-	struct workqueue_queue *q;
+	struct workqueue_queue *q, **qp;
+	struct threadpool *tp;
 	work_impl_t *wk = (void *)wk0;
 
 	KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL);
-	q = workqueue_queue_lookup(wq, ci);
+
+	if (wq->wq_flags & WQ_PERCPU) {
+		/*
+		 * Bind to the CPU _and_ block percpu-swap xcalls from
+		 * completing.  This is safe as long as we don't sleep.
+		 */
+		kpreempt_disable();
+		if (ci == NULL)
+			ci = curcpu();
+		qp = percpu_getptr_remote(wq->wq_percpu, ci);
+		q = *qp;
+		tp = threadpool_percpu_ref_remote(wq->wq_tp_percpu, ci);
+		kpreempt_enable();
+	} else {
+		KASSERT(ci == NULL);
+		q = wq->wq_global;
+		tp = wq->wq_tp_global;
	}
 
 	mutex_enter(&q->q_mutex);
-	KASSERT(q->q_waiter == NULL);
 #ifdef DEBUG
 	workqueue_check_duplication(q, wk);
 #endif
 	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
-	cv_signal(&q->q_cv);
+	threadpool_schedule_job(tp, &q->q_job);
 	mutex_exit(&q->q_mutex);
 }
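
Illustrative notes (not part of the patch; do not apply):

The man page hunk narrows the workqueue_wait(9) rule to the one constraint that still matters: the caller must not re-enqueue the specific work item it is waiting for until workqueue_wait returns, while other items may keep being enqueued and several threads may wait on distinct items concurrently. A minimal consumer sketch under that contract, with hypothetical mydev_* names and error handling trimmed, looks like this:

#include <sys/workqueue.h>

struct mydev_softc {				/* hypothetical driver state */
	struct workqueue	*sc_wq;
	struct work		 sc_work;
};

static void
mydev_work(struct work *wk, void *arg)
{
	struct mydev_softc *sc = arg;

	/* Runs in a worker thread now borrowed from a threadpool. */
	(void)sc;
}

static int
mydev_wq_init(struct mydev_softc *sc)
{

	/* One global (non-WQ_PERCPU) MP-safe queue. */
	return workqueue_create(&sc->sc_wq, "mydevwq", mydev_work, sc,
	    PRI_NONE, IPL_NONE, WQ_MPSAFE);
}

static void
mydev_kick(struct mydev_softc *sc)
{

	/* Allowed even while another thread waits on a different item. */
	workqueue_enqueue(sc->sc_wq, &sc->sc_work, NULL);
}

static void
mydev_wq_fini(struct mydev_softc *sc)
{

	/*
	 * Wait for sc_work if it is pending or running; nobody may
	 * re-enqueue sc_work itself until this returns.
	 */
	workqueue_wait(sc->sc_wq, &sc->sc_work);
	workqueue_destroy(sc->sc_wq);
}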
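The structural change in subr_workqueue.c is that each struct workqueue_queue now embeds a struct threadpool_job and borrows a thread from a shared threadpool(9) only while it has pending work, instead of parking a dedicated kthread(9) per queue (per CPU for WQ_PERCPU). Below is a minimal sketch of that job pattern independent of workqueue(9); the frob_* names are hypothetical, the container_of() recovery mirrors the new workqueue_worker(), and the threadpool_job_done() call follows the threadpool(9) documentation rather than anything shown in this diff:

#include <sys/param.h>
#include <sys/mutex.h>
#include <sys/threadpool.h>

struct frob_softc {				/* hypothetical consumer state */
	kmutex_t		 f_lock;	/* interlock passed to the job */
	struct threadpool	*f_pool;	/* shared pool, not a private lwp */
	struct threadpool_job	 f_job;
	pri_t			 f_pri;
};

static void
frob_job(struct threadpool_job *job)
{
	/* Recover the containing state, as the new workqueue_worker() does. */
	struct frob_softc *f = container_of(job, struct frob_softc, f_job);

	/* ... the deferred work itself ... */

	mutex_enter(&f->f_lock);
	threadpool_job_done(job);		/* per threadpool(9) */
	mutex_exit(&f->f_lock);
}

static int
frob_init(struct frob_softc *f, pri_t pri)
{
	int error;

	f->f_pri = pri;
	mutex_init(&f->f_lock, MUTEX_DEFAULT, IPL_NONE);
	error = threadpool_get(&f->f_pool, pri);	/* reference a shared pool */
	if (error) {
		mutex_destroy(&f->f_lock);
		return error;
	}
	threadpool_job_init(&f->f_job, frob_job, &f->f_lock, "frobjob");
	return 0;
}

static void
frob_schedule(struct frob_softc *f)
{

	/* Scheduling is done under the interlock; idempotent while queued. */
	mutex_enter(&f->f_lock);
	threadpool_schedule_job(f->f_pool, &f->f_job);
	mutex_exit(&f->f_lock);
}

static void
frob_fini(struct frob_softc *f)
{

	mutex_enter(&f->f_lock);
	threadpool_cancel_job(f->f_pool, &f->f_job);	/* waits if running */
	mutex_exit(&f->f_lock);
	threadpool_job_destroy(&f->f_job);
	threadpool_put(f->f_pool, f->f_pri);
	mutex_destroy(&f->f_lock);
}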
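Finally, the per-CPU queues move from a hand-rolled array addressed by cpu_index() to a percpu(9) object, created with percpu_create() and fetched with percpu_getptr_remote() under kpreempt_disable() exactly as the hunks above show. For readers who have not used percpu(9), the more common access pattern looks like the following self-contained sketch (hypothetical ev_* names):

#include <sys/param.h>
#include <sys/percpu.h>

static struct percpu *ev_counts;	/* one uint64_t per CPU (hypothetical) */

static void
ev_init(void)
{

	ev_counts = percpu_alloc(sizeof(uint64_t));
}

static void
ev_bump(void)
{
	uint64_t *c;

	/* percpu_getref() pins us to the current CPU until percpu_putref(). */
	c = percpu_getref(ev_counts);
	(*c)++;
	percpu_putref(ev_counts);
}

static void
ev_sum_cb(void *p, void *cookie, struct cpu_info *ci)
{
	const uint64_t *c = p;
	uint64_t *sum = cookie;

	*sum += *c;
}

static uint64_t
ev_total(void)
{
	uint64_t sum = 0;

	/* Visit every CPU's slot and accumulate. */
	percpu_foreach(ev_counts, ev_sum_cb, &sum);
	return sum;
}

static void
ev_fini(void)
{

	percpu_free(ev_counts, sizeof(uint64_t));
}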