/* Origin: sys/sys/buf_ring.h (blob 48c7101aad97c6164a069531080a4db3a9b047cb) */
/*-
 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
 *
 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $FreeBSD$
 *
 */

#ifndef	_SYS_BUF_RING_H_
#define	_SYS_BUF_RING_H_

#include <machine/cpu.h>

#ifdef DEBUG_BUFRING
#include <sys/lock.h>
#include <sys/mutex.h>
#endif

/*
 * Ring buffer of void pointers with separate producer and consumer
 * index pairs.  Indices are wrapped with the *_mask fields, and the
 * ring is treated as empty when br_cons_head == br_prod_tail.
 */
struct buf_ring {
	/* Next slot a producer will claim; advanced by compare-and-set. */
	volatile uint32_t	br_prod_head;
	/* Slots before this index have been published to consumers. */
	volatile uint32_t	br_prod_tail;	
	/* Added as a bias in buf_ring_count(); presumably the slot count. */
	int              	br_prod_size;
	/* ANDed with producer indices to wrap them around the ring. */
	int              	br_prod_mask;
	/* Incremented each time buf_ring_enqueue() fails with ENOBUFS. */
	uint64_t		br_drops;
	/*
	 * Next slot a consumer will take.  Aligned to CACHE_LINE_SIZE,
	 * presumably so consumer state does not share a cache line with
	 * the producer fields above.
	 */
	volatile uint32_t	br_cons_head __aligned(CACHE_LINE_SIZE);
	/* Slots before this index have been fully consumed. */
	volatile uint32_t	br_cons_tail;
	/* Not referenced by the inline functions in this header. */
	int		 	br_cons_size;
	/* ANDed with consumer indices to wrap them around the ring. */
	int              	br_cons_mask;
#ifdef DEBUG_BUFRING
	/* Mutex checked with mtx_owned() by the debug assertions. */
	struct mtx		*br_lock;
#endif	
	/*
	 * The slots themselves.  Declared with zero length, so the storage
	 * must be provided past the end of the structure by the allocator.
	 */
	void			*br_ring[0] __aligned(CACHE_LINE_SIZE);
};

/*
 * Multi-producer safe, lock-free ring buffer enqueue.
 *
 * Claims the slot at br_prod_head with a compare-and-set, stores buf in
 * it, then waits for producers that claimed earlier slots to publish
 * before advancing br_prod_tail past this slot.  The whole operation runs
 * between critical_enter() and critical_exit().
 *
 * Returns 0 on success, or ENOBUFS (after incrementing br_drops) when the
 * ring is full.
 */
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
	uint32_t prod_head, prod_next, cons_tail;
#ifdef DEBUG_BUFRING
	int i;

	/*
	 * Debug-only scan of the slots from br_cons_head up to br_prod_head;
	 * panics if buf is already present in the ring.
	 *
	 * Note: It is possible to encounter an mbuf that was removed
	 * via drbr_peek(), and then re-added via drbr_putback() and
	 * trigger a spurious panic.
	 */
	for (i = br->br_cons_head; i != br->br_prod_head;
	     i = ((i + 1) & br->br_cons_mask))
		if(br->br_ring[i] == buf)
			panic("buf=%p already enqueue at %d prod=%d cons=%d",
			    buf, i, br->br_prod_tail, br->br_cons_tail);
#endif	
	critical_enter();
	do {
		prod_head = br->br_prod_head;
		prod_next = (prod_head + 1) & br->br_prod_mask;
		cons_tail = br->br_cons_tail;

		if (prod_next == cons_tail) {
			/*
			 * The ring looks full.  Re-read both indices after
			 * a read barrier and only report failure if neither
			 * of them has moved in the meantime.
			 */
			rmb();
			if (prod_head == br->br_prod_head &&
			    cons_tail == br->br_cons_tail) {
				br->br_drops++;
				critical_exit();
				return (ENOBUFS);
			}
			/*
			 * An index moved.  Note that `continue' inside a
			 * do-while jumps to the loop condition, so the
			 * compare-and-set below is still attempted with the
			 * prod_head/prod_next values read above.
			 */
			continue;
		}
	} while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
	/* A claimed slot is expected to have been cleared by its consumer. */
	if (br->br_ring[prod_head] != NULL)
		panic("dangling value in enqueue");
#endif	
	br->br_ring[prod_head] = buf;

	/*
	 * If there are other enqueues in progress that preceded us, we
	 * need to wait for them to complete: br_prod_tail is advanced
	 * strictly in slot order.  The release store then publishes the
	 * slot written above together with the new tail.
	 */
	while (br->br_prod_tail != prod_head)
		cpu_spinwait();
	atomic_store_rel_int(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
}

/*
 * Multi-consumer safe dequeue.
 *
 * Reserves the slot at br_cons_head with a compare-and-set, reads the
 * buffer out of it, and then publishes the new br_cons_tail once every
 * consumer that reserved an earlier slot has published its own.  Runs
 * between critical_enter() and critical_exit().
 *
 * Returns the dequeued buffer, or NULL if the ring is empty.
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t head, next;
	void *item;

	critical_enter();
	for (;;) {
		head = br->br_cons_head;
		next = (head + 1) & br->br_cons_mask;

		/* Nothing published beyond our head: the ring is empty. */
		if (head == br->br_prod_tail) {
			critical_exit();
			return (NULL);
		}

		/* Try to reserve the slot; retry if another consumer won. */
		if (atomic_cmpset_acq_int(&br->br_cons_head, head, next))
			break;
	}

	item = br->br_ring[head];
#ifdef DEBUG_BUFRING
	br->br_ring[head] = NULL;
#endif

	/*
	 * Consumers that reserved earlier slots must publish first, so
	 * spin until br_cons_tail has caught up with our slot.
	 */
	while (br->br_cons_tail != head)
		cpu_spinwait();

	atomic_store_rel_int(&br->br_cons_tail, next);
	critical_exit();

	return (item);
}

/*
 * Single-consumer dequeue.
 *
 * Use where dequeue is protected by a lock, e.g. a network driver's
 * tx queue lock.  Unlike buf_ring_dequeue_mc(), no compare-and-set or
 * critical section is used; the consumer indices are updated with plain
 * stores.
 *
 * Returns the buffer at the head of the ring, or NULL if the ring is
 * empty.
 *
 * With DEBUG_BUFRING the vacated slot is cleared, and the function panics
 * if a lock was registered in br_lock but is not owned by the caller, or
 * if br_cons_tail did not match br_cons_head on entry.
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
#ifdef PREFETCH_DEFINED
	uint32_t cons_next_next;
#endif
	uint32_t prod_tail;
	void *buf;

	/*
	 * This is a workaround to allow using buf_ring on ARM and ARM64.
	 * ARM64TODO: Fix buf_ring in a generic way.
	 * REMARKS: It is suspected that br_cons_head does not require
	 *   load_acq operation, but this change was extensively tested
	 *   and confirmed it's working. To be reviewed once again in
	 *   FreeBSD-12.
	 *
	 * The acquire loads prevent the following interleaving:
	 *
	 * Core 0: buf_ring_enqueue()       Core 1: buf_ring_dequeue_sc()
	 * ------------------------------   ------------------------------
	 *                                  cons_head = br->br_cons_head;
	 * atomic_cmpset_acq_32(
	 *     &br->br_prod_head, ...);
	 *                                  buf = br->br_ring[cons_head]; <1>
	 * br->br_ring[prod_head] = buf;
	 * atomic_store_rel_32(
	 *     &br->br_prod_tail, ...);
	 *                                  prod_tail = br->br_prod_tail;
	 *                                  if (cons_head == prod_tail)
	 *                                          return (NULL);
	 *                                  <condition is false and the
	 *                                   code uses the stale buf>
	 *
	 * <1> The load from br->br_ring[cons_head] on core 1 can be
	 *     reordered (performed speculatively) by the CPU.
	 */
#if defined(__arm__) || defined(__aarch64__)
	cons_head = atomic_load_acq_32(&br->br_cons_head);
#else
	cons_head = br->br_cons_head;
#endif
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);

	cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
	cons_next_next = (cons_head + 2) & br->br_cons_mask;
#endif

	/* Nothing has been published past our head: the ring is empty. */
	if (cons_head == prod_tail)
		return (NULL);

#ifdef PREFETCH_DEFINED
	/* Prefetch up to two following entries if they are already queued. */
	if (cons_next != prod_tail) {
		prefetch(br->br_ring[cons_next]);
		if (cons_next_next != prod_tail)
			prefetch(br->br_ring[cons_next_next]);
	}
#endif
	br->br_cons_head = cons_next;
	buf = br->br_ring[cons_head];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
	/*
	 * br_lock may be NULL, as buf_ring_peek() already allows; only
	 * check ownership when a lock is actually registered instead of
	 * handing a NULL pointer to mtx_owned().
	 */
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
	if (br->br_cons_tail != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    br->br_cons_tail, cons_head);
#endif
	br->br_cons_tail = cons_next;
	return (buf);
}

/*
 * Single-consumer advance: drop the entry at the head of the ring after
 * it has been examined with a peek.  Does nothing if the ring is empty.
 *
 * Use where the consumer side is protected by a lock, e.g. a network
 * driver's tx queue lock.
 */
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
	uint32_t head, published, next;

	head = br->br_cons_head;
	published = br->br_prod_tail;

	/* Empty ring: there is no entry to step over. */
	if (head == published)
		return;

	next = (head + 1) & br->br_cons_mask;
	br->br_cons_head = next;
#ifdef DEBUG_BUFRING
	br->br_ring[head] = NULL;
#endif
	br->br_cons_tail = next;
}

/*
 * Single-consumer put-back: overwrite the entry at the head of the ring
 * with `new'.
 *
 * The caller should *not* have used any dequeue to pull the entry out of
 * the ring, but should have used the peek() function instead, so the slot
 * is still occupied.  This is normally used where the transmit queue of a
 * driver is full and an mbuf must be returned.  Most likely what is in
 * the ring is exactly what is being put back, but the lower transmit
 * function may have done a pullup or similar and changed it.  As an
 * optimization the store is always performed (jhb says the store is
 * probably cheaper than a compare); a multi-queue version would need the
 * compare and an atomic.
 */
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
	uint32_t head;

	/* Putting back into an empty ring means nothing was peeked. */
	KASSERT(br->br_cons_head != br->br_prod_tail,
	    ("Buf-Ring has none in putback"));

	head = br->br_cons_head;
	br->br_ring[head] = new;
}

/*
 * Return a pointer to the first entry in the ring without modifying the
 * ring, or NULL if the ring is empty.  Race-prone if not protected by a
 * lock.
 *
 * With DEBUG_BUFRING the function panics if a lock was registered in
 * br_lock but is not owned by the caller.
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{

#ifdef DEBUG_BUFRING
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	/*
	 * We control cons, and tail is at worst a lagging indicator, so
	 * the worst case for the emptiness test itself is returning NULL
	 * immediately after a buffer has been enqueued.
	 */
	if (br->br_cons_head == br->br_prod_tail)
		return (NULL);

#if defined(__arm__) || defined(__aarch64__)
	/*
	 * Same hazard as in buf_ring_peek_clear_sc(): without a barrier
	 * the load of br->br_ring[br->br_cons_head] may be performed
	 * before the check above, i.e. before the enqueue has stored the
	 * buffer.  If the enqueue then updates the slot and br_prod_tail
	 * in the meantime, the check passes and the previously fetched
	 * (stale) value would be returned.
	 */
	atomic_thread_fence_acq();
#endif

	return (br->br_ring[br->br_cons_head]);
}

/*
 * Single-consumer peek: return the entry at the head of the ring without
 * advancing the consumer indices, or NULL if the ring is empty.
 *
 * With DEBUG_BUFRING the slot is additionally set to NULL after it has
 * been read, and the function panics if a lock was registered in br_lock
 * but is not owned by the caller.
 */
static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
#ifdef DEBUG_BUFRING
	void *ret;

	/*
	 * br_lock may be NULL, as buf_ring_peek() already allows; only
	 * check ownership when a lock is actually registered instead of
	 * handing a NULL pointer to mtx_owned().
	 */
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif

	if (br->br_cons_head == br->br_prod_tail)
		return (NULL);

#if defined(__arm__) || defined(__aarch64__)
	/*
	 * The barrier is required on ARM and ARM64 to ensure that
	 * br->br_ring[br->br_cons_head] is not fetched before the above
	 * condition is checked.
	 * Without the barrier it is possible that the slot is fetched
	 * before the enqueue has put the mbuf into br; if the enqueue then
	 * updates the array and br_prod_tail in the meantime, the check
	 * above passes and we would return the previously fetched (and
	 * invalid) buffer.
	 */
	atomic_thread_fence_acq();
#endif

#ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
	ret = br->br_ring[br->br_cons_head];
	br->br_ring[br->br_cons_head] = NULL;
	return (ret);
#else
	return (br->br_ring[br->br_cons_head]);
#endif
}

/*
 * Return non-zero if the slot after br_prod_head is br_cons_tail, i.e.
 * a producer could not claim another slot right now.
 */
static __inline int
buf_ring_full(struct buf_ring *br)
{
	uint32_t next;

	next = (br->br_prod_head + 1) & br->br_prod_mask;
	return (next == br->br_cons_tail);
}

/*
 * Return non-zero if the consumer head has caught up with the last
 * published producer index, i.e. there is nothing to dequeue.
 */
static __inline int
buf_ring_empty(struct buf_ring *br)
{
	uint32_t head, published;

	head = br->br_cons_head;
	published = br->br_prod_tail;
	return (head == published);
}

/*
 * Return the distance from br_cons_tail to br_prod_tail, wrapped to the
 * ring with br_prod_mask.  br_prod_size is added before the subtraction
 * as a bias.
 */
static __inline int
buf_ring_count(struct buf_ring *br)
{
	uint32_t published, consumed;

	published = br->br_prod_tail;
	consumed = br->br_cons_tail;
	return ((br->br_prod_size + published - consumed) & br->br_prod_mask);
}

/*
 * Allocation and release of a ring.  Both functions are only declared
 * here; their definitions are outside this header.
 *
 * NOTE(review): the parameter meanings below are inferred from the names
 * and from how the inline functions above use the ring -- confirm against
 * the implementation:
 *   count - presumably the number of slots; the mask-based index wrapping
 *           suggests it has to be a power of two.
 *   type  - presumably the malloc type the ring is allocated from and
 *           released to.
 *   flags - presumably allocation flags passed through to the allocator.
 *   mtx   - presumably stored in br_lock for the DEBUG_BUFRING checks.
 */
struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);

#endif