Diffstat (limited to 'sys/contrib/openzfs/module/zfs/bqueue.c')
-rw-r--r--	sys/contrib/openzfs/module/zfs/bqueue.c	122
1 file changed, 71 insertions(+), 51 deletions(-)
diff --git a/sys/contrib/openzfs/module/zfs/bqueue.c b/sys/contrib/openzfs/module/zfs/bqueue.c
index 22539efc4e23..a7fa516975de 100644
--- a/sys/contrib/openzfs/module/zfs/bqueue.c
+++ b/sys/contrib/openzfs/module/zfs/bqueue.c
@@ -27,34 +27,46 @@ obj2node(bqueue_t *q, void *data)
/*
* Initialize a blocking queue The maximum capacity of the queue is set to
- * size. Types that are stored in a bqueue must contain a bqueue_node_t,
- * and node_offset must be its offset from the start of the struct.
- * fill_fraction is a performance tuning value; when the queue is full, any
- * threads attempting to enqueue records will block. They will block until
- * they're signaled, which will occur when the queue is at least 1/fill_fraction
+ * size. Types that are stored in a bqueue must contain a bqueue_node_t, and
+ * node_offset must be its offset from the start of the struct. fill_fraction
+ * is a performance tuning value; when the queue is full, any threads
+ * attempting to enqueue records will block. They will block until they're
+ * signaled, which will occur when the queue is at least 1/fill_fraction
* empty. Similar behavior occurs on dequeue; if the queue is empty, threads
- * block. They will be signalled when the queue has 1/fill_fraction full, or
- * when bqueue_flush is called. As a result, you must call bqueue_flush when
- * you enqueue your final record on a thread, in case the dequeueing threads are
- * currently blocked and that enqueue does not cause them to be awoken.
- * Alternatively, this behavior can be disabled (causing signaling to happen
- * immediately) by setting fill_fraction to any value larger than size.
- * Return 0 on success, or -1 on failure.
+ * block. They will be signalled when the queue has 1/fill_fraction full.
+ * As a result, you must call bqueue_enqueue_flush() when you enqueue your
+ * final record on a thread, in case the dequeuing threads are currently
+ * blocked and that enqueue does not cause them to be woken. Alternatively,
+ * this behavior can be disabled (causing signaling to happen immediately) by
+ * setting fill_fraction to any value larger than size. Return 0 on success,
+ * or -1 on failure.
+ *
+ * Note: The caller must ensure that for a given bqueue_t, there's only a
+ * single call to bqueue_enqueue() running at a time (e.g. by calling only
+ * from a single thread, or with locking around the call). Similarly, the
+ * caller must ensure that there's only a single call to bqueue_dequeue()
+ * running at a time. However, the one call to bqueue_enqueue() may be
+ * invoked concurrently with the one call to bqueue_dequeue().
*/
int
-bqueue_init(bqueue_t *q, uint64_t fill_fraction, uint64_t size,
- size_t node_offset)
+bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
{
if (fill_fraction == 0) {
return (-1);
}
list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
node_offset + offsetof(bqueue_node_t, bqn_node));
+ list_create(&q->bq_dequeuing_list, node_offset + sizeof (bqueue_node_t),
+ node_offset + offsetof(bqueue_node_t, bqn_node));
+ list_create(&q->bq_enqueuing_list, node_offset + sizeof (bqueue_node_t),
+ node_offset + offsetof(bqueue_node_t, bqn_node));
cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
q->bq_node_offset = node_offset;
q->bq_size = 0;
+ q->bq_dequeuing_size = 0;
+ q->bq_enqueuing_size = 0;
q->bq_maxsize = size;
q->bq_fill_fraction = fill_fraction;
return (0);
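
A minimal usage sketch of the interface documented above, assuming the post-patch prototypes and a hypothetical record type (the header paths, struct, field names, and tunables below are illustrative, not part of this change):

#include <sys/bqueue.h>		/* assumed location of the bqueue interface */
#include <sys/sysmacros.h>	/* assumed source of offsetof() in kernel context */

/* Hypothetical record type; anything stored in a bqueue must embed a bqueue_node_t. */
typedef struct my_record {
	bqueue_node_t	mr_node;	/* embedded node, located via node_offset */
	size_t		mr_len;		/* size charged against the queue (> 0) */
	boolean_t	mr_last;	/* marks the producer's final record */
	char		*mr_buf;	/* payload */
} my_record_t;

static bqueue_t my_queue;

static int
my_queue_setup(void)
{
	/*
	 * Allow up to 16 MB of records to be outstanding; with
	 * fill_fraction = 4, blocked enqueuers and dequeuers are signaled
	 * at 1/4 boundaries.  node_offset tells bqueue where the embedded
	 * bqueue_node_t lives inside each record.
	 */
	return (bqueue_init(&my_queue, 4, 16 * 1024 * 1024,
	    offsetof(my_record_t, mr_node)));
}
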
@@ -70,31 +82,40 @@ bqueue_destroy(bqueue_t *q)
{
mutex_enter(&q->bq_lock);
ASSERT0(q->bq_size);
+ ASSERT0(q->bq_dequeuing_size);
+ ASSERT0(q->bq_enqueuing_size);
cv_destroy(&q->bq_add_cv);
cv_destroy(&q->bq_pop_cv);
list_destroy(&q->bq_list);
+ list_destroy(&q->bq_dequeuing_list);
+ list_destroy(&q->bq_enqueuing_list);
mutex_exit(&q->bq_lock);
mutex_destroy(&q->bq_lock);
}
static void
-bqueue_enqueue_impl(bqueue_t *q, void *data, uint64_t item_size,
- boolean_t flush)
+bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)
{
ASSERT3U(item_size, >, 0);
ASSERT3U(item_size, <=, q->bq_maxsize);
- mutex_enter(&q->bq_lock);
+
obj2node(q, data)->bqn_size = item_size;
- while (q->bq_size + item_size > q->bq_maxsize) {
- cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
- }
- q->bq_size += item_size;
- list_insert_tail(&q->bq_list, data);
- if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction)
- cv_signal(&q->bq_pop_cv);
- if (flush)
+ q->bq_enqueuing_size += item_size;
+ list_insert_tail(&q->bq_enqueuing_list, data);
+
+ if (flush ||
+ q->bq_enqueuing_size >= q->bq_maxsize / q->bq_fill_fraction) {
+ /* Append the enqueuing list to the shared list. */
+ mutex_enter(&q->bq_lock);
+ while (q->bq_size > q->bq_maxsize) {
+ cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
+ }
+ q->bq_size += q->bq_enqueuing_size;
+ list_move_tail(&q->bq_list, &q->bq_enqueuing_list);
+ q->bq_enqueuing_size = 0;
cv_broadcast(&q->bq_pop_cv);
- mutex_exit(&q->bq_lock);
+ mutex_exit(&q->bq_lock);
+ }
}
/*
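
bqueue_enqueue_impl() above resolves a caller's record pointer to its embedded node via obj2node(), using the node_offset recorded at bqueue_init() time. A sketch of what that helper does (the real one lives earlier in bqueue.c, outside this hunk), assuming bq_node_offset stores the offset passed to bqueue_init():

/*
 * Illustrative sketch of obj2node(): step from the start of the caller's
 * record to the bqueue_node_t embedded bq_node_offset bytes into it.
 */
static bqueue_node_t *
obj2node(bqueue_t *q, void *data)
{
	return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
}
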
@@ -103,7 +124,7 @@ bqueue_enqueue_impl(bqueue_t *q, void *data, uint64_t item_size,
* > 0.
*/
void
-bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size)
+bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
{
bqueue_enqueue_impl(q, data, item_size, B_FALSE);
}
@@ -112,12 +133,12 @@ bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size)
* Enqueue an entry, and then flush the queue. This forces the popping threads
* to wake up, even if we're below the fill fraction. We have this in a single
* function, rather than having a separate call, because it prevents race
- * conditions between the enqueuing thread and the dequeueing thread, where the
- * enqueueing thread will wake up the dequeueing thread, that thread will
+ * conditions between the enqueuing thread and the dequeuing thread, where the
+ * enqueueing thread will wake up the dequeuing thread, that thread will
* destroy the condvar before the enqueuing thread is done.
*/
void
-bqueue_enqueue_flush(bqueue_t *q, void *data, uint64_t item_size)
+bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
{
bqueue_enqueue_impl(q, data, item_size, B_TRUE);
}
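
A producer-side sketch, continuing the hypothetical my_record_t and my_queue from the earlier sketch: ordinary records go through bqueue_enqueue(), and the final record goes through bqueue_enqueue_flush() so a blocked consumer is woken even when the staged batch is still below maxsize / fill_fraction. my_next_record() is a made-up source of records:

/*
 * Illustrative producer loop (not part of this change).  Per the contract
 * above, only one thread at a time may call bqueue_enqueue() or
 * bqueue_enqueue_flush() on a given queue.
 */
static void
my_producer(void)
{
	for (;;) {
		my_record_t *rec = my_next_record();	/* hypothetical */

		if (rec->mr_last) {
			/* Final record: force the staged batch out. */
			bqueue_enqueue_flush(&my_queue, rec, rec->mr_len);
			break;
		}
		/*
		 * Common case: only appends to the thread-local enqueuing
		 * list; bq_lock is taken once the batch reaches
		 * maxsize / fill_fraction.
		 */
		bqueue_enqueue(&my_queue, rec, rec->mr_len);
	}
}
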
@@ -129,27 +150,26 @@ bqueue_enqueue_flush(bqueue_t *q, void *data, uint64_t item_size)
void *
bqueue_dequeue(bqueue_t *q)
{
- void *ret = NULL;
- uint64_t item_size;
- mutex_enter(&q->bq_lock);
- while (q->bq_size == 0) {
- cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
+ void *ret = list_remove_head(&q->bq_dequeuing_list);
+ if (ret == NULL) {
+ /*
+ * Dequeuing list is empty. Wait for there to be something on
+ * the shared list, then move the entire shared list to the
+ * dequeuing list.
+ */
+ mutex_enter(&q->bq_lock);
+ while (q->bq_size == 0) {
+ cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
+ }
+ ASSERT0(q->bq_dequeuing_size);
+ ASSERT(list_is_empty(&q->bq_dequeuing_list));
+ list_move_tail(&q->bq_dequeuing_list, &q->bq_list);
+ q->bq_dequeuing_size = q->bq_size;
+ q->bq_size = 0;
+ cv_broadcast(&q->bq_add_cv);
+ mutex_exit(&q->bq_lock);
+ ret = list_remove_head(&q->bq_dequeuing_list);
}
- ret = list_remove_head(&q->bq_list);
- ASSERT3P(ret, !=, NULL);
- item_size = obj2node(q, ret)->bqn_size;
- q->bq_size -= item_size;
- if (q->bq_size <= q->bq_maxsize - (q->bq_maxsize / q->bq_fill_fraction))
- cv_signal(&q->bq_add_cv);
- mutex_exit(&q->bq_lock);
+ q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size;
return (ret);
}
-
-/*
- * Returns true if the space used is 0.
- */
-boolean_t
-bqueue_empty(bqueue_t *q)
-{
- return (q->bq_size == 0);
-}
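
A matching consumer-side sketch under the same hypothetical types. With bqueue_empty() removed, a dequeuing thread typically recognizes the end of the stream from the record itself (here the made-up mr_last flag) before tearing the queue down; my_process_record() is likewise hypothetical:

/*
 * Illustrative consumer loop (not part of this change).  Only one thread at
 * a time may call bqueue_dequeue() on a given queue, though it may run
 * concurrently with the single enqueuing thread.
 */
static void
my_consumer(void)
{
	for (;;) {
		/* Blocks until the producer hands over a batch. */
		my_record_t *rec = bqueue_dequeue(&my_queue);
		boolean_t last = rec->mr_last;

		my_process_record(rec);		/* hypothetical; consumes rec */
		if (last)
			break;
	}
	/* Destroy only once the queue is empty and both threads are done. */
	bqueue_destroy(&my_queue);
}
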