Diffstat (limited to 'module/os/freebsd/spl/spl_taskq.c')
-rw-r--r--	module/os/freebsd/spl/spl_taskq.c	191
1 file changed, 137 insertions, 54 deletions
diff --git a/module/os/freebsd/spl/spl_taskq.c b/module/os/freebsd/spl/spl_taskq.c
index ba22c77b69c3..67c0a4c94134 100644
--- a/module/os/freebsd/spl/spl_taskq.c
+++ b/module/os/freebsd/spl/spl_taskq.c
@@ -26,12 +26,7 @@
* SUCH DAMAGE.
*/
-#include <sys/cdefs.h>
-__FBSDID("$FreeBSD$");
-
#include <sys/param.h>
-#include <sys/ck.h>
-#include <sys/epoch.h>
#include <sys/kernel.h>
#include <sys/kmem.h>
#include <sys/lock.h>
@@ -66,11 +61,9 @@ taskq_t *dynamic_taskq = NULL;
proc_t *system_proc;
-extern int uma_align_cache;
-
static MALLOC_DEFINE(M_TASKQ, "taskq", "taskq structures");
-static CK_LIST_HEAD(tqenthashhead, taskq_ent) *tqenthashtbl;
+static LIST_HEAD(tqenthashhead, taskq_ent) *tqenthashtbl;
static unsigned long tqenthash;
static unsigned long tqenthashlock;
static struct sx *tqenthashtbl_lock;
@@ -80,8 +73,8 @@ static taskqid_t tqidnext;
#define TQIDHASH(tqid) (&tqenthashtbl[(tqid) & tqenthash])
#define TQIDHASHLOCK(tqid) (&tqenthashtbl_lock[((tqid) & tqenthashlock)])
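+/* NORMAL_TASK is zero so a zero-initialized entry is a normal task. */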
+#define NORMAL_TASK 0
#define TIMEOUT_TASK 1
-#define NORMAL_TASK 2
static void
system_taskq_init(void *arg)
@@ -121,7 +114,7 @@ system_taskq_fini(void *arg)
for (i = 0; i < tqenthashlock + 1; i++)
sx_destroy(&tqenthashtbl_lock[i]);
for (i = 0; i < tqenthash + 1; i++)
- VERIFY(CK_LIST_EMPTY(&tqenthashtbl[i]));
+ VERIFY(LIST_EMPTY(&tqenthashtbl[i]));
free(tqenthashtbl_lock, M_TASKQ);
free(tqenthashtbl, M_TASKQ);
}
@@ -162,27 +155,27 @@ taskq_lookup(taskqid_t tqid)
{
taskq_ent_t *ent = NULL;
- sx_xlock(TQIDHASHLOCK(tqid));
- CK_LIST_FOREACH(ent, TQIDHASH(tqid), tqent_hash) {
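+	/*
+	 * ID 0 is reserved: entries that were never inserted, or that
+	 * have already been removed, carry it.  Lookups only read the
+	 * hash chain (the refcount is taken atomically), so a shared
+	 * lock is sufficient.
+	 */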
+ if (tqid == 0)
+ return (NULL);
+ sx_slock(TQIDHASHLOCK(tqid));
+ LIST_FOREACH(ent, TQIDHASH(tqid), tqent_hash) {
if (ent->tqent_id == tqid)
break;
}
if (ent != NULL)
refcount_acquire(&ent->tqent_rc);
- sx_xunlock(TQIDHASHLOCK(tqid));
+ sx_sunlock(TQIDHASHLOCK(tqid));
return (ent);
}
static taskqid_t
taskq_insert(taskq_ent_t *ent)
{
- taskqid_t tqid;
+ taskqid_t tqid = __taskq_genid();
- tqid = __taskq_genid();
ent->tqent_id = tqid;
- ent->tqent_registered = B_TRUE;
sx_xlock(TQIDHASHLOCK(tqid));
- CK_LIST_INSERT_HEAD(TQIDHASH(tqid), ent, tqent_hash);
+ LIST_INSERT_HEAD(TQIDHASH(tqid), ent, tqent_hash);
sx_xunlock(TQIDHASHLOCK(tqid));
return (tqid);
}
@@ -192,13 +185,14 @@ taskq_remove(taskq_ent_t *ent)
{
taskqid_t tqid = ent->tqent_id;
- if (!ent->tqent_registered)
+ if (tqid == 0)
return;
-
sx_xlock(TQIDHASHLOCK(tqid));
- CK_LIST_REMOVE(ent, tqent_hash);
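+	/* Re-check under the lock; a concurrent remove may have won. */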
+ if (ent->tqent_id != 0) {
+ LIST_REMOVE(ent, tqent_hash);
+ ent->tqent_id = 0;
+ }
sx_xunlock(TQIDHASHLOCK(tqid));
- ent->tqent_registered = B_FALSE;
}
static void
@@ -223,6 +217,7 @@ taskq_create_impl(const char *name, int nthreads, pri_t pri,
nthreads = MAX((mp_ncpus * nthreads) / 100, 1);
tq = kmem_alloc(sizeof (*tq), KM_SLEEP);
+ tq->tq_nthreads = nthreads;
tq->tq_queue = taskqueue_create(name, M_WAITOK,
taskqueue_thread_enqueue, &tq->tq_queue);
taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_INIT,
@@ -257,6 +252,87 @@ taskq_destroy(taskq_t *tq)
kmem_free(tq, sizeof (*tq));
}
+static void taskq_sync_assign(void *arg);
+
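+/* Handshake state shared between taskq_create_synced() and a pool thread. */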
+typedef struct taskq_sync_arg {
+ kthread_t *tqa_thread;
+ kcondvar_t tqa_cv;
+ kmutex_t tqa_lock;
+ int tqa_ready;
+} taskq_sync_arg_t;
+
+static void
+taskq_sync_assign(void *arg)
+{
+ taskq_sync_arg_t *tqa = arg;
+
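+	/*
+	 * Publish this pool thread to the creator, then park until
+	 * taskq_create_synced() has captured every thread (tqa_ready
+	 * advances to 2).  Parking keeps this thread occupied so each
+	 * queued sync task lands on a distinct thread.
+	 */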
+ mutex_enter(&tqa->tqa_lock);
+ tqa->tqa_thread = curthread;
+ tqa->tqa_ready = 1;
+ cv_signal(&tqa->tqa_cv);
+ while (tqa->tqa_ready == 1)
+ cv_wait(&tqa->tqa_cv, &tqa->tqa_lock);
+ mutex_exit(&tqa->tqa_lock);
+}
+
+/*
+ * Create a taskq with a specified number of pool threads. Allocate
+ * and return an array of nthreads kthread_t pointers, one for each
+ * thread in the pool. The array is not ordered and must be freed
+ * by the caller.
+ */
+taskq_t *
+taskq_create_synced(const char *name, int nthreads, pri_t pri,
+ int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
+{
+ taskq_t *tq;
+ taskq_sync_arg_t *tqs = kmem_zalloc(sizeof (*tqs) * nthreads, KM_SLEEP);
+ kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
+ KM_SLEEP);
+
+ flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);
+
+ tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
+ flags | TASKQ_PREPOPULATE);
+ VERIFY(tq != NULL);
+ VERIFY(tq->tq_nthreads == nthreads);
+
+ /* spawn all syncthreads */
+ for (int i = 0; i < nthreads; i++) {
+ cv_init(&tqs[i].tqa_cv, NULL, CV_DEFAULT, NULL);
+ mutex_init(&tqs[i].tqa_lock, NULL, MUTEX_DEFAULT, NULL);
+ (void) taskq_dispatch(tq, taskq_sync_assign,
+ &tqs[i], TQ_FRONT);
+ }
+
+ /* wait on all syncthreads to start */
+ for (int i = 0; i < nthreads; i++) {
+ mutex_enter(&tqs[i].tqa_lock);
+ while (tqs[i].tqa_ready == 0)
+ cv_wait(&tqs[i].tqa_cv, &tqs[i].tqa_lock);
+ mutex_exit(&tqs[i].tqa_lock);
+ }
+
+ /* let all syncthreads resume, finish */
+ for (int i = 0; i < nthreads; i++) {
+ mutex_enter(&tqs[i].tqa_lock);
+ tqs[i].tqa_ready = 2;
+ cv_broadcast(&tqs[i].tqa_cv);
+ mutex_exit(&tqs[i].tqa_lock);
+ }
+ taskq_wait(tq);
+
+ for (int i = 0; i < nthreads; i++) {
+ kthreads[i] = tqs[i].tqa_thread;
+ mutex_destroy(&tqs[i].tqa_lock);
+ cv_destroy(&tqs[i].tqa_cv);
+ }
+ kmem_free(tqs, sizeof (*tqs) * nthreads);
+
+ *ktpp = kthreads;
+ return (tq);
+}
+
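+/*
+ * Hypothetical usage sketch for taskq_create_synced(); the names below
+ * are illustrative only:
+ *
+ *	kthread_t **threads;
+ *	taskq_t *tq = taskq_create_synced("pool", 4, minclsyspri,
+ *	    4, INT_MAX, 0, &threads);
+ *	... use threads[0..3], e.g. to bind or re-prioritize them ...
+ *	kmem_free(threads, sizeof (kthread_t *) * 4);
+ *	taskq_destroy(tq);
+ */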
int
taskq_member(taskq_t *tq, kthread_t *thread)
{
@@ -285,21 +361,22 @@ taskq_cancel_id(taskq_t *tq, taskqid_t tid)
int rc;
taskq_ent_t *ent;
- if (tid == 0)
- return (0);
-
if ((ent = taskq_lookup(tid)) == NULL)
return (0);
- ent->tqent_cancelled = B_TRUE;
- if (ent->tqent_type == TIMEOUT_TASK) {
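+	/*
+	 * EBUSY means the task is already running, so wait for it to
+	 * finish.  Timeout tasks need the timeout-aware cancel/drain
+	 * variants so any pending callout is covered as well.
+	 */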
+ if (ent->tqent_type == NORMAL_TASK) {
+ rc = taskqueue_cancel(tq->tq_queue, &ent->tqent_task, &pend);
+ if (rc == EBUSY)
+ taskqueue_drain(tq->tq_queue, &ent->tqent_task);
+ } else {
rc = taskqueue_cancel_timeout(tq->tq_queue,
&ent->tqent_timeout_task, &pend);
- } else
- rc = taskqueue_cancel(tq->tq_queue, &ent->tqent_task, &pend);
- if (rc == EBUSY) {
- taskqueue_drain(tq->tq_queue, &ent->tqent_task);
- } else if (pend) {
+ if (rc == EBUSY) {
+ taskqueue_drain_timeout(tq->tq_queue,
+ &ent->tqent_timeout_task);
+ }
+ }
+ if (pend) {
/*
* Tasks normally free themselves when run, but here the task
* was cancelled so it did not free itself.
@@ -312,12 +389,13 @@ taskq_cancel_id(taskq_t *tq, taskqid_t tid)
}
static void
-taskq_run(void *arg, int pending __unused)
+taskq_run(void *arg, int pending)
{
taskq_ent_t *task = arg;
- if (!task->tqent_cancelled)
- task->tqent_func(task->tqent_arg);
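+	/*
+	 * A pending count of zero means the task was cancelled; in that
+	 * case taskq_cancel_id() drops the reference, so skip both the
+	 * callback and the free here.
+	 */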
+ if (pending == 0)
+ return;
+ task->tqent_func(task->tqent_arg);
taskq_free(task);
}
@@ -345,7 +423,6 @@ taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
task->tqent_func = func;
task->tqent_arg = arg;
task->tqent_type = TIMEOUT_TASK;
- task->tqent_cancelled = B_FALSE;
refcount_init(&task->tqent_rc, 1);
tqid = taskq_insert(task);
TIMEOUT_TASK_INIT(tq->tq_queue, &task->tqent_timeout_task, 0,
@@ -379,7 +456,6 @@ taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
refcount_init(&task->tqent_rc, 1);
task->tqent_func = func;
task->tqent_arg = arg;
- task->tqent_cancelled = B_FALSE;
task->tqent_type = NORMAL_TASK;
tqid = taskq_insert(task);
TASK_INIT(&task->tqent_task, prio, taskq_run, task);
@@ -388,10 +464,12 @@ taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
}
static void
-taskq_run_ent(void *arg, int pending __unused)
+taskq_run_ent(void *arg, int pending)
{
taskq_ent_t *task = arg;
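+	/* As in taskq_run(), pending == 0 marks a cancelled task. */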
+ if (pending == 0)
+ return;
task->tqent_func(task->tqent_arg);
}
@@ -399,24 +477,34 @@ void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint32_t flags,
taskq_ent_t *task)
{
- int prio;
-
/*
* If TQ_FRONT is given, we want higher priority for this task, so it
* can go at the front of the queue.
*/
- prio = !!(flags & TQ_FRONT);
- task->tqent_cancelled = B_FALSE;
- task->tqent_registered = B_FALSE;
- task->tqent_id = 0;
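+	/* taskq_init_ent() already ran TASK_INIT(); only priority changes. */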
+ task->tqent_task.ta_priority = !!(flags & TQ_FRONT);
task->tqent_func = func;
task->tqent_arg = arg;
-
- TASK_INIT(&task->tqent_task, prio, taskq_run_ent, task);
taskqueue_enqueue(tq->tq_queue, &task->tqent_task);
}
void
+taskq_init_ent(taskq_ent_t *task)
+{
+ TASK_INIT(&task->tqent_task, 0, taskq_run_ent, task);
+ task->tqent_func = NULL;
+ task->tqent_arg = NULL;
+ task->tqent_id = 0;
+ task->tqent_type = NORMAL_TASK;
+ task->tqent_rc = 0;
+}
+
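+/* True when the entry is not currently queued on the taskqueue. */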
+int
+taskq_empty_ent(taskq_ent_t *task)
+{
+ return (task->tqent_task.ta_pending == 0);
+}
+
+void
taskq_wait(taskq_t *tq)
{
taskqueue_quiesce(tq->tq_queue);
@@ -427,12 +515,13 @@ taskq_wait_id(taskq_t *tq, taskqid_t tid)
{
taskq_ent_t *ent;
- if (tid == 0)
- return;
if ((ent = taskq_lookup(tid)) == NULL)
return;
- taskqueue_drain(tq->tq_queue, &ent->tqent_task);
+ if (ent->tqent_type == NORMAL_TASK)
+ taskqueue_drain(tq->tq_queue, &ent->tqent_task);
+ else
+ taskqueue_drain_timeout(tq->tq_queue, &ent->tqent_timeout_task);
taskq_free(ent);
}
@@ -441,9 +530,3 @@ taskq_wait_outstanding(taskq_t *tq, taskqid_t id __unused)
{
taskqueue_drain_all(tq->tq_queue);
}
-
-int
-taskq_empty_ent(taskq_ent_t *t)
-{
- return (t->tqent_task.ta_pending == 0);
-}