aboutsummaryrefslogtreecommitdiff
path: root/cddl/compat
diff options
context:
space:
mode:
Diffstat (limited to 'cddl/compat')
-rw-r--r--cddl/compat/opensolaris/include/thread_pool.h99
-rw-r--r--cddl/compat/opensolaris/misc/thread_pool.c430
-rw-r--r--cddl/compat/opensolaris/misc/thread_pool_impl.h99
3 files changed, 598 insertions, 30 deletions
diff --git a/cddl/compat/opensolaris/include/thread_pool.h b/cddl/compat/opensolaris/include/thread_pool.h
index 25ac55dedea7..3bd23a6dd2aa 100644
--- a/cddl/compat/opensolaris/include/thread_pool.h
+++ b/cddl/compat/opensolaris/include/thread_pool.h
@@ -1,39 +1,78 @@
-/*-
- * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
- * All rights reserved.
+/*
+ * CDDL HEADER START
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License (the "License").
+ * You may not use this file except in compliance with the License.
*
- * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
- * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
- * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- * SUCH DAMAGE.
+ * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
+ * or http://www.opensolaris.org/os/licensing.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
*
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information: Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ */
+
+/*
+ * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
+ * Use is subject to license terms.
+ */
+
+/*
* $FreeBSD$
*/
-#ifndef _OPENSOLARIS_THREAD_POOL_H_
-#define _OPENSOLARIS_THREAD_POOL_H_
+#ifndef _THREAD_POOL_H_
+#define _THREAD_POOL_H_
+
+#pragma ident "%Z%%M% %I% %E% SMI"
+
+#include <sys/types.h>
+#include <thread.h>
+#include <pthread.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct tpool tpool_t; /* opaque thread pool descriptor */
+
+#if defined(__STDC__)
+
+extern tpool_t *tpool_create(uint_t min_threads, uint_t max_threads,
+ uint_t linger, pthread_attr_t *attr);
+extern int tpool_dispatch(tpool_t *tpool,
+ void (*func)(void *), void *arg);
+extern void tpool_destroy(tpool_t *tpool);
+extern void tpool_abandon(tpool_t *tpool);
+extern void tpool_wait(tpool_t *tpool);
+extern void tpool_suspend(tpool_t *tpool);
+extern int tpool_suspended(tpool_t *tpool);
+extern void tpool_resume(tpool_t *tpool);
+extern int tpool_member(tpool_t *tpool);
+
+#else /* Non ANSI */
+
+extern tpool_t *tpool_create();
+extern int tpool_dispatch();
+extern void tpool_destroy();
+extern void tpool_abandon();
+extern void tpool_wait();
+extern void tpool_suspend();
+extern int tpool_suspended();
+extern void tpool_resume();
+extern int tpool_member();
-typedef int tpool_t;
+#endif /* __STDC__ */
-#define tpool_create(a, b, c, d) (0)
-#define tpool_dispatch(pool, func, arg) func(arg)
-#define tpool_wait(pool) do { } while (0)
-#define tpool_destroy(pool) do { } while (0)
+#ifdef __cplusplus
+}
+#endif
-#endif /* !_OPENSOLARIS_THREAD_POOL_H_ */
+#endif /* _THREAD_POOL_H_ */
diff --git a/cddl/compat/opensolaris/misc/thread_pool.c b/cddl/compat/opensolaris/misc/thread_pool.c
new file mode 100644
index 000000000000..a6a834fb2bbd
--- /dev/null
+++ b/cddl/compat/opensolaris/misc/thread_pool.c
@@ -0,0 +1,430 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License (the "License").
+ * You may not use this file except in compliance with the License.
+ *
+ * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
+ * or http://www.opensolaris.org/os/licensing.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information: Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ */
+
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
+ * Use is subject to license terms.
+ */
+
+#include <sys/cdefs.h>
+__FBSDID("$FreeBSD$");
+
+#pragma ident "%Z%%M% %I% %E% SMI"
+
+#include <stdlib.h>
+#include <signal.h>
+#include <errno.h>
+#include "thread_pool_impl.h"
+
+typedef void (*_Voidfp)(void*); /* pointer to extern "C" function */
+
+static void
+delete_pool(tpool_t *tpool)
+{
+ tpool_job_t *job;
+
+ /*
+ * There should be no pending jobs, but just in case...
+ */
+ for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
+ tpool->tp_head = job->tpj_next;
+ free(job);
+ }
+ (void) pthread_attr_destroy(&tpool->tp_attr);
+ free(tpool);
+}
+
+/*
+ * Worker thread is terminating.
+ */
+static void
+worker_cleanup(void *arg)
+{
+ tpool_t *tpool = arg;
+
+ if (--tpool->tp_current == 0 &&
+ (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
+ if (tpool->tp_flags & TP_ABANDON) {
+ pthread_mutex_unlock(&tpool->tp_mutex);
+ delete_pool(tpool);
+ return;
+ }
+ if (tpool->tp_flags & TP_DESTROY)
+ (void) pthread_cond_broadcast(&tpool->tp_busycv);
+ }
+ pthread_mutex_unlock(&tpool->tp_mutex);
+}
+
+static void
+notify_waiters(tpool_t *tpool)
+{
+ if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
+ tpool->tp_flags &= ~TP_WAIT;
+ (void) pthread_cond_broadcast(&tpool->tp_waitcv);
+ }
+}
+
+/*
+ * Called by a worker thread on return from a tpool_dispatch()d job.
+ */
+static void
+job_cleanup(void *arg)
+{
+ tpool_t *tpool = arg;
+ pthread_t my_tid = pthread_self();
+ tpool_active_t *activep;
+ tpool_active_t **activepp;
+
+ pthread_mutex_lock(&tpool->tp_mutex);
+ /* CSTYLED */
+ for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
+ activep = *activepp;
+ if (activep->tpa_tid == my_tid) {
+ *activepp = activep->tpa_next;
+ break;
+ }
+ }
+ if (tpool->tp_flags & TP_WAIT)
+ notify_waiters(tpool);
+}
+
+static void *
+tpool_worker(void *arg)
+{
+ tpool_t *tpool = (tpool_t *)arg;
+ int elapsed;
+ tpool_job_t *job;
+ void (*func)(void *);
+ tpool_active_t active;
+ sigset_t maskset;
+
+ pthread_mutex_lock(&tpool->tp_mutex);
+ pthread_cleanup_push(worker_cleanup, tpool);
+
+ /*
+ * This is the worker's main loop.
+ * It will only be left if a timeout or an error has occured.
+ */
+ active.tpa_tid = pthread_self();
+ for (;;) {
+ elapsed = 0;
+ tpool->tp_idle++;
+ if (tpool->tp_flags & TP_WAIT)
+ notify_waiters(tpool);
+ while ((tpool->tp_head == NULL ||
+ (tpool->tp_flags & TP_SUSPEND)) &&
+ !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
+ if (tpool->tp_current <= tpool->tp_minimum ||
+ tpool->tp_linger == 0) {
+ (void) pthread_cond_wait(&tpool->tp_workcv,
+ &tpool->tp_mutex);
+ } else {
+ struct timespec timeout;
+
+ clock_gettime(CLOCK_MONOTONIC, &timeout);
+ timeout.tv_sec += tpool->tp_linger;
+ if (pthread_cond_timedwait(&tpool->tp_workcv,
+ &tpool->tp_mutex, &timeout) != 0) {
+ elapsed = 1;
+ break;
+ }
+ }
+ }
+ tpool->tp_idle--;
+ if (tpool->tp_flags & TP_DESTROY)
+ break;
+ if (tpool->tp_flags & TP_ABANDON) {
+ /* can't abandon a suspended pool */
+ if (tpool->tp_flags & TP_SUSPEND) {
+ tpool->tp_flags &= ~TP_SUSPEND;
+ (void) pthread_cond_broadcast(&tpool->tp_workcv);
+ }
+ if (tpool->tp_head == NULL)
+ break;
+ }
+ if ((job = tpool->tp_head) != NULL &&
+ !(tpool->tp_flags & TP_SUSPEND)) {
+ elapsed = 0;
+ func = job->tpj_func;
+ arg = job->tpj_arg;
+ tpool->tp_head = job->tpj_next;
+ if (job == tpool->tp_tail)
+ tpool->tp_tail = NULL;
+ tpool->tp_njobs--;
+ active.tpa_next = tpool->tp_active;
+ tpool->tp_active = &active;
+ pthread_mutex_unlock(&tpool->tp_mutex);
+ pthread_cleanup_push(job_cleanup, tpool);
+ free(job);
+ /*
+ * Call the specified function.
+ */
+ func(arg);
+ /*
+ * We don't know what this thread has been doing,
+ * so we reset its signal mask and cancellation
+ * state back to the initial values.
+ */
+ sigfillset(&maskset);
+ (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
+ (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
+ NULL);
+ (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
+ NULL);
+ pthread_cleanup_pop(1);
+ }
+ if (elapsed && tpool->tp_current > tpool->tp_minimum) {
+ /*
+ * We timed out and there is no work to be done
+ * and the number of workers exceeds the minimum.
+ * Exit now to reduce the size of the pool.
+ */
+ break;
+ }
+ }
+ pthread_cleanup_pop(1);
+ return (arg);
+}
+
+/*
+ * Create a worker thread, with all signals blocked.
+ */
+static int
+create_worker(tpool_t *tpool)
+{
+ sigset_t maskset, oset;
+ pthread_t thread;
+ int error;
+
+ sigfillset(&maskset);
+ (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
+ error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
+ (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
+ return (error);
+}
+
+tpool_t *
+tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
+ pthread_attr_t *attr)
+{
+ tpool_t *tpool;
+ int error;
+
+ if (min_threads > max_threads || max_threads < 1) {
+ errno = EINVAL;
+ return (NULL);
+ }
+
+ tpool = malloc(sizeof (*tpool));
+ if (tpool == NULL) {
+ errno = ENOMEM;
+ return (NULL);
+ }
+ bzero(tpool, sizeof(*tpool));
+ (void) pthread_mutex_init(&tpool->tp_mutex, NULL);
+ (void) pthread_cond_init(&tpool->tp_busycv, NULL);
+ (void) pthread_cond_init(&tpool->tp_workcv, NULL);
+ (void) pthread_cond_init(&tpool->tp_waitcv, NULL);
+ tpool->tp_minimum = min_threads;
+ tpool->tp_maximum = max_threads;
+ tpool->tp_linger = linger;
+
+ /* make all pool threads be detached daemon threads */
+ (void) pthread_attr_init(&tpool->tp_attr);
+ (void) pthread_attr_setdetachstate(&tpool->tp_attr,
+ PTHREAD_CREATE_DETACHED);
+
+ return (tpool);
+}
+
+/*
+ * Dispatch a work request to the thread pool.
+ * If there are idle workers, awaken one.
+ * Else, if the maximum number of workers has
+ * not been reached, spawn a new worker thread.
+ * Else just return with the job added to the queue.
+ */
+int
+tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
+{
+ tpool_job_t *job;
+
+ if ((job = malloc(sizeof (*job))) == NULL)
+ return (-1);
+ bzero(job, sizeof(*job));
+ job->tpj_next = NULL;
+ job->tpj_func = func;
+ job->tpj_arg = arg;
+
+ pthread_mutex_lock(&tpool->tp_mutex);
+
+ if (tpool->tp_head == NULL)
+ tpool->tp_head = job;
+ else
+ tpool->tp_tail->tpj_next = job;
+ tpool->tp_tail = job;
+ tpool->tp_njobs++;
+
+ if (!(tpool->tp_flags & TP_SUSPEND)) {
+ if (tpool->tp_idle > 0)
+ (void) pthread_cond_signal(&tpool->tp_workcv);
+ else if (tpool->tp_current < tpool->tp_maximum &&
+ create_worker(tpool) == 0)
+ tpool->tp_current++;
+ }
+
+ pthread_mutex_unlock(&tpool->tp_mutex);
+ return (0);
+}
+
+/*
+ * Assumes: by the time tpool_destroy() is called no one will use this
+ * thread pool in any way and no one will try to dispatch entries to it.
+ * Calling tpool_destroy() from a job in the pool will cause deadlock.
+ */
+void
+tpool_destroy(tpool_t *tpool)
+{
+ tpool_active_t *activep;
+
+ pthread_mutex_lock(&tpool->tp_mutex);
+ pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
+
+ /* mark the pool as being destroyed; wakeup idle workers */
+ tpool->tp_flags |= TP_DESTROY;
+ tpool->tp_flags &= ~TP_SUSPEND;
+ (void) pthread_cond_broadcast(&tpool->tp_workcv);
+
+ /* cancel all active workers */
+ for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
+ (void) pthread_cancel(activep->tpa_tid);
+
+ /* wait for all active workers to finish */
+ while (tpool->tp_active != NULL) {
+ tpool->tp_flags |= TP_WAIT;
+ (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
+ }
+
+ /* the last worker to terminate will wake us up */
+ while (tpool->tp_current != 0)
+ (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
+
+ pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
+ delete_pool(tpool);
+}
+
+/*
+ * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
+ * The last worker to terminate will delete the pool.
+ */
+void
+tpool_abandon(tpool_t *tpool)
+{
+
+ pthread_mutex_lock(&tpool->tp_mutex);
+ if (tpool->tp_current == 0) {
+ /* no workers, just delete the pool */
+ pthread_mutex_unlock(&tpool->tp_mutex);
+ delete_pool(tpool);
+ } else {
+ /* wake up all workers, last one will delete the pool */
+ tpool->tp_flags |= TP_ABANDON;
+ tpool->tp_flags &= ~TP_SUSPEND;
+ (void) pthread_cond_broadcast(&tpool->tp_workcv);
+ pthread_mutex_unlock(&tpool->tp_mutex);
+ }
+}
+
+/*
+ * Wait for all jobs to complete.
+ * Calling tpool_wait() from a job in the pool will cause deadlock.
+ */
+void
+tpool_wait(tpool_t *tpool)
+{
+
+ pthread_mutex_lock(&tpool->tp_mutex);
+ pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
+ while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
+ tpool->tp_flags |= TP_WAIT;
+ (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
+ }
+ pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
+}
+
+void
+tpool_suspend(tpool_t *tpool)
+{
+
+ pthread_mutex_lock(&tpool->tp_mutex);
+ tpool->tp_flags |= TP_SUSPEND;
+ pthread_mutex_unlock(&tpool->tp_mutex);
+}
+
+int
+tpool_suspended(tpool_t *tpool)
+{
+ int suspended;
+
+ pthread_mutex_lock(&tpool->tp_mutex);
+ suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
+ pthread_mutex_unlock(&tpool->tp_mutex);
+
+ return (suspended);
+}
+
+void
+tpool_resume(tpool_t *tpool)
+{
+ int excess;
+
+ pthread_mutex_lock(&tpool->tp_mutex);
+ if (!(tpool->tp_flags & TP_SUSPEND)) {
+ pthread_mutex_unlock(&tpool->tp_mutex);
+ return;
+ }
+ tpool->tp_flags &= ~TP_SUSPEND;
+ (void) pthread_cond_broadcast(&tpool->tp_workcv);
+ excess = tpool->tp_njobs - tpool->tp_idle;
+ while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
+ if (create_worker(tpool) != 0)
+ break; /* pthread_create() failed */
+ tpool->tp_current++;
+ }
+ pthread_mutex_unlock(&tpool->tp_mutex);
+}
+
+int
+tpool_member(tpool_t *tpool)
+{
+ pthread_t my_tid = pthread_self();
+ tpool_active_t *activep;
+
+ pthread_mutex_lock(&tpool->tp_mutex);
+ for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
+ if (activep->tpa_tid == my_tid) {
+ pthread_mutex_unlock(&tpool->tp_mutex);
+ return (1);
+ }
+ }
+ pthread_mutex_unlock(&tpool->tp_mutex);
+ return (0);
+}
diff --git a/cddl/compat/opensolaris/misc/thread_pool_impl.h b/cddl/compat/opensolaris/misc/thread_pool_impl.h
new file mode 100644
index 000000000000..bc98ac8b1b37
--- /dev/null
+++ b/cddl/compat/opensolaris/misc/thread_pool_impl.h
@@ -0,0 +1,99 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License (the "License").
+ * You may not use this file except in compliance with the License.
+ *
+ * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
+ * or http://www.opensolaris.org/os/licensing.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information: Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ */
+
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
+ * Use is subject to license terms.
+ */
+
+/*
+ * $FreeBSD$
+ */
+
+#ifndef _THREAD_POOL_IMPL_H
+#define _THREAD_POOL_IMPL_H
+
+#pragma ident "%Z%%M% %I% %E% SMI"
+
+#include <thread_pool.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * Thread pool implementation definitions.
+ * See <thread_pool.h> for interface declarations.
+ */
+
+/*
+ * FIFO queued job
+ */
+typedef struct tpool_job tpool_job_t;
+struct tpool_job {
+ tpool_job_t *tpj_next; /* list of jobs */
+ void (*tpj_func)(void *); /* function to call */
+ void *tpj_arg; /* its argument */
+};
+
+/*
+ * List of active threads, linked through their stacks.
+ */
+typedef struct tpool_active tpool_active_t;
+struct tpool_active {
+ tpool_active_t *tpa_next; /* list of active threads */
+ pthread_t tpa_tid; /* active thread id */
+};
+
+/*
+ * The thread pool.
+ */
+struct tpool {
+ tpool_t *tp_forw; /* circular list of all thread pools */
+ tpool_t *tp_back;
+ mutex_t tp_mutex; /* protects the pool data */
+ cond_t tp_busycv; /* synchronization in tpool_dispatch */
+ cond_t tp_workcv; /* synchronization with workers */
+ cond_t tp_waitcv; /* synchronization in tpool_wait() */
+ tpool_active_t *tp_active; /* threads performing work */
+ tpool_job_t *tp_head; /* FIFO job queue */
+ tpool_job_t *tp_tail;
+ pthread_attr_t tp_attr; /* attributes of the workers */
+ int tp_flags; /* see below */
+ uint_t tp_linger; /* seconds before idle workers exit */
+ int tp_njobs; /* number of jobs in job queue */
+ int tp_minimum; /* minimum number of worker threads */
+ int tp_maximum; /* maximum number of worker threads */
+ int tp_current; /* current number of worker threads */
+ int tp_idle; /* number of idle workers */
+};
+
+/* tp_flags */
+#define TP_WAIT 0x01 /* waiting in tpool_wait() */
+#define TP_SUSPEND 0x02 /* pool is being suspended */
+#define TP_DESTROY 0x04 /* pool is being destroyed */
+#define TP_ABANDON 0x08 /* pool is abandoned (auto-destroy) */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _THREAD_POOL_IMPL_H */