diff options
Diffstat (limited to 'module/os/freebsd/spl/spl_taskq.c')
-rw-r--r-- | module/os/freebsd/spl/spl_taskq.c | 191 |
1 files changed, 137 insertions, 54 deletions
diff --git a/module/os/freebsd/spl/spl_taskq.c b/module/os/freebsd/spl/spl_taskq.c index ba22c77b69c3..67c0a4c94134 100644 --- a/module/os/freebsd/spl/spl_taskq.c +++ b/module/os/freebsd/spl/spl_taskq.c @@ -26,12 +26,7 @@ * SUCH DAMAGE. */ -#include <sys/cdefs.h> -__FBSDID("$FreeBSD$"); - #include <sys/param.h> -#include <sys/ck.h> -#include <sys/epoch.h> #include <sys/kernel.h> #include <sys/kmem.h> #include <sys/lock.h> @@ -66,11 +61,9 @@ taskq_t *dynamic_taskq = NULL; proc_t *system_proc; -extern int uma_align_cache; - static MALLOC_DEFINE(M_TASKQ, "taskq", "taskq structures"); -static CK_LIST_HEAD(tqenthashhead, taskq_ent) *tqenthashtbl; +static LIST_HEAD(tqenthashhead, taskq_ent) *tqenthashtbl; static unsigned long tqenthash; static unsigned long tqenthashlock; static struct sx *tqenthashtbl_lock; @@ -80,8 +73,8 @@ static taskqid_t tqidnext; #define TQIDHASH(tqid) (&tqenthashtbl[(tqid) & tqenthash]) #define TQIDHASHLOCK(tqid) (&tqenthashtbl_lock[((tqid) & tqenthashlock)]) +#define NORMAL_TASK 0 #define TIMEOUT_TASK 1 -#define NORMAL_TASK 2 static void system_taskq_init(void *arg) @@ -121,7 +114,7 @@ system_taskq_fini(void *arg) for (i = 0; i < tqenthashlock + 1; i++) sx_destroy(&tqenthashtbl_lock[i]); for (i = 0; i < tqenthash + 1; i++) - VERIFY(CK_LIST_EMPTY(&tqenthashtbl[i])); + VERIFY(LIST_EMPTY(&tqenthashtbl[i])); free(tqenthashtbl_lock, M_TASKQ); free(tqenthashtbl, M_TASKQ); } @@ -162,27 +155,27 @@ taskq_lookup(taskqid_t tqid) { taskq_ent_t *ent = NULL; - sx_xlock(TQIDHASHLOCK(tqid)); - CK_LIST_FOREACH(ent, TQIDHASH(tqid), tqent_hash) { + if (tqid == 0) + return (NULL); + sx_slock(TQIDHASHLOCK(tqid)); + LIST_FOREACH(ent, TQIDHASH(tqid), tqent_hash) { if (ent->tqent_id == tqid) break; } if (ent != NULL) refcount_acquire(&ent->tqent_rc); - sx_xunlock(TQIDHASHLOCK(tqid)); + sx_sunlock(TQIDHASHLOCK(tqid)); return (ent); } static taskqid_t taskq_insert(taskq_ent_t *ent) { - taskqid_t tqid; + taskqid_t tqid = __taskq_genid(); - tqid = __taskq_genid(); ent->tqent_id = tqid; - ent->tqent_registered = B_TRUE; sx_xlock(TQIDHASHLOCK(tqid)); - CK_LIST_INSERT_HEAD(TQIDHASH(tqid), ent, tqent_hash); + LIST_INSERT_HEAD(TQIDHASH(tqid), ent, tqent_hash); sx_xunlock(TQIDHASHLOCK(tqid)); return (tqid); } @@ -192,13 +185,14 @@ taskq_remove(taskq_ent_t *ent) { taskqid_t tqid = ent->tqent_id; - if (!ent->tqent_registered) + if (tqid == 0) return; - sx_xlock(TQIDHASHLOCK(tqid)); - CK_LIST_REMOVE(ent, tqent_hash); + if (ent->tqent_id != 0) { + LIST_REMOVE(ent, tqent_hash); + ent->tqent_id = 0; + } sx_xunlock(TQIDHASHLOCK(tqid)); - ent->tqent_registered = B_FALSE; } static void @@ -223,6 +217,7 @@ taskq_create_impl(const char *name, int nthreads, pri_t pri, nthreads = MAX((mp_ncpus * nthreads) / 100, 1); tq = kmem_alloc(sizeof (*tq), KM_SLEEP); + tq->tq_nthreads = nthreads; tq->tq_queue = taskqueue_create(name, M_WAITOK, taskqueue_thread_enqueue, &tq->tq_queue); taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_INIT, @@ -257,6 +252,87 @@ taskq_destroy(taskq_t *tq) kmem_free(tq, sizeof (*tq)); } +static void taskq_sync_assign(void *arg); + +typedef struct taskq_sync_arg { + kthread_t *tqa_thread; + kcondvar_t tqa_cv; + kmutex_t tqa_lock; + int tqa_ready; +} taskq_sync_arg_t; + +static void +taskq_sync_assign(void *arg) +{ + taskq_sync_arg_t *tqa = arg; + + mutex_enter(&tqa->tqa_lock); + tqa->tqa_thread = curthread; + tqa->tqa_ready = 1; + cv_signal(&tqa->tqa_cv); + while (tqa->tqa_ready == 1) + cv_wait(&tqa->tqa_cv, &tqa->tqa_lock); + mutex_exit(&tqa->tqa_lock); +} + +/* + * Create a taskq with a specified number of pool threads. Allocate + * and return an array of nthreads kthread_t pointers, one for each + * thread in the pool. The array is not ordered and must be freed + * by the caller. + */ +taskq_t * +taskq_create_synced(const char *name, int nthreads, pri_t pri, + int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp) +{ + taskq_t *tq; + taskq_sync_arg_t *tqs = kmem_zalloc(sizeof (*tqs) * nthreads, KM_SLEEP); + kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads, + KM_SLEEP); + + flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH); + + tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX, + flags | TASKQ_PREPOPULATE); + VERIFY(tq != NULL); + VERIFY(tq->tq_nthreads == nthreads); + + /* spawn all syncthreads */ + for (int i = 0; i < nthreads; i++) { + cv_init(&tqs[i].tqa_cv, NULL, CV_DEFAULT, NULL); + mutex_init(&tqs[i].tqa_lock, NULL, MUTEX_DEFAULT, NULL); + (void) taskq_dispatch(tq, taskq_sync_assign, + &tqs[i], TQ_FRONT); + } + + /* wait on all syncthreads to start */ + for (int i = 0; i < nthreads; i++) { + mutex_enter(&tqs[i].tqa_lock); + while (tqs[i].tqa_ready == 0) + cv_wait(&tqs[i].tqa_cv, &tqs[i].tqa_lock); + mutex_exit(&tqs[i].tqa_lock); + } + + /* let all syncthreads resume, finish */ + for (int i = 0; i < nthreads; i++) { + mutex_enter(&tqs[i].tqa_lock); + tqs[i].tqa_ready = 2; + cv_broadcast(&tqs[i].tqa_cv); + mutex_exit(&tqs[i].tqa_lock); + } + taskq_wait(tq); + + for (int i = 0; i < nthreads; i++) { + kthreads[i] = tqs[i].tqa_thread; + mutex_destroy(&tqs[i].tqa_lock); + cv_destroy(&tqs[i].tqa_cv); + } + kmem_free(tqs, sizeof (*tqs) * nthreads); + + *ktpp = kthreads; + return (tq); +} + int taskq_member(taskq_t *tq, kthread_t *thread) { @@ -285,21 +361,22 @@ taskq_cancel_id(taskq_t *tq, taskqid_t tid) int rc; taskq_ent_t *ent; - if (tid == 0) - return (0); - if ((ent = taskq_lookup(tid)) == NULL) return (0); - ent->tqent_cancelled = B_TRUE; - if (ent->tqent_type == TIMEOUT_TASK) { + if (ent->tqent_type == NORMAL_TASK) { + rc = taskqueue_cancel(tq->tq_queue, &ent->tqent_task, &pend); + if (rc == EBUSY) + taskqueue_drain(tq->tq_queue, &ent->tqent_task); + } else { rc = taskqueue_cancel_timeout(tq->tq_queue, &ent->tqent_timeout_task, &pend); - } else - rc = taskqueue_cancel(tq->tq_queue, &ent->tqent_task, &pend); - if (rc == EBUSY) { - taskqueue_drain(tq->tq_queue, &ent->tqent_task); - } else if (pend) { + if (rc == EBUSY) { + taskqueue_drain_timeout(tq->tq_queue, + &ent->tqent_timeout_task); + } + } + if (pend) { /* * Tasks normally free themselves when run, but here the task * was cancelled so it did not free itself. @@ -312,12 +389,13 @@ taskq_cancel_id(taskq_t *tq, taskqid_t tid) } static void -taskq_run(void *arg, int pending __unused) +taskq_run(void *arg, int pending) { taskq_ent_t *task = arg; - if (!task->tqent_cancelled) - task->tqent_func(task->tqent_arg); + if (pending == 0) + return; + task->tqent_func(task->tqent_arg); taskq_free(task); } @@ -345,7 +423,6 @@ taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, task->tqent_func = func; task->tqent_arg = arg; task->tqent_type = TIMEOUT_TASK; - task->tqent_cancelled = B_FALSE; refcount_init(&task->tqent_rc, 1); tqid = taskq_insert(task); TIMEOUT_TASK_INIT(tq->tq_queue, &task->tqent_timeout_task, 0, @@ -379,7 +456,6 @@ taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) refcount_init(&task->tqent_rc, 1); task->tqent_func = func; task->tqent_arg = arg; - task->tqent_cancelled = B_FALSE; task->tqent_type = NORMAL_TASK; tqid = taskq_insert(task); TASK_INIT(&task->tqent_task, prio, taskq_run, task); @@ -388,10 +464,12 @@ taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) } static void -taskq_run_ent(void *arg, int pending __unused) +taskq_run_ent(void *arg, int pending) { taskq_ent_t *task = arg; + if (pending == 0) + return; task->tqent_func(task->tqent_arg); } @@ -399,24 +477,34 @@ void taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint32_t flags, taskq_ent_t *task) { - int prio; - /* * If TQ_FRONT is given, we want higher priority for this task, so it * can go at the front of the queue. */ - prio = !!(flags & TQ_FRONT); - task->tqent_cancelled = B_FALSE; - task->tqent_registered = B_FALSE; - task->tqent_id = 0; + task->tqent_task.ta_priority = !!(flags & TQ_FRONT); task->tqent_func = func; task->tqent_arg = arg; - - TASK_INIT(&task->tqent_task, prio, taskq_run_ent, task); taskqueue_enqueue(tq->tq_queue, &task->tqent_task); } void +taskq_init_ent(taskq_ent_t *task) +{ + TASK_INIT(&task->tqent_task, 0, taskq_run_ent, task); + task->tqent_func = NULL; + task->tqent_arg = NULL; + task->tqent_id = 0; + task->tqent_type = NORMAL_TASK; + task->tqent_rc = 0; +} + +int +taskq_empty_ent(taskq_ent_t *task) +{ + return (task->tqent_task.ta_pending == 0); +} + +void taskq_wait(taskq_t *tq) { taskqueue_quiesce(tq->tq_queue); @@ -427,12 +515,13 @@ taskq_wait_id(taskq_t *tq, taskqid_t tid) { taskq_ent_t *ent; - if (tid == 0) - return; if ((ent = taskq_lookup(tid)) == NULL) return; - taskqueue_drain(tq->tq_queue, &ent->tqent_task); + if (ent->tqent_type == NORMAL_TASK) + taskqueue_drain(tq->tq_queue, &ent->tqent_task); + else + taskqueue_drain_timeout(tq->tq_queue, &ent->tqent_timeout_task); taskq_free(ent); } @@ -441,9 +530,3 @@ taskq_wait_outstanding(taskq_t *tq, taskqid_t id __unused) { taskqueue_drain_all(tq->tq_queue); } - -int -taskq_empty_ent(taskq_ent_t *t) -{ - return (t->tqent_task.ta_pending == 0); -} |