author	Mark Johnston <markj@FreeBSD.org>	2023-03-01 20:21:30 +0000
committer	Mark Johnston <markj@FreeBSD.org>	2023-03-13 15:53:16 +0000
commit	762ad964ee346cffdbf3eaa6ff87fa5b32d30738 (patch)
tree	3ae3a83b5d264f88cdd818d3abe3b5fa52c77a61
parent	c3bd32f225ec093ba0f7cd7fc1a000b02aad5211 (diff)
epair: Simplify the transmit path and address lost wakeups
epairs currently shuttle all transmitted packets through a single global
taskqueue thread. To hand packets over to the taskqueue thread, each epair
maintains a pair of ring buffers and a lockless scheme for notifying the
thread of pending work. The implementation can lead to lost wakeups, causing
to-be-transmitted packets to end up stuck in the queue.

Rather than extending the existing scheme, simply replace it with a linked
list protected by a mutex, and use the mutex to synchronize wakeups of the
taskqueue thread. This appears to give equivalent or better throughput with
>= 16 producer threads and eliminates the lost wakeups.

Reviewed by:	kp
MFC after:	1 week
Sponsored by:	Klara, Inc.
Sponsored by:	Modirum MDPay
Differential Revision:	https://reviews.freebsd.org/D38843

(cherry picked from commit df7bbd8c354a907d2c2f85a6e18f356f76458f57)
 sys/net/if_epair.c | 162
 1 file changed, 79 insertions(+), 83 deletions(-)
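The lost wakeup referred to above is the classic hazard of flag-based handoff
schemes. The userspace sketch below is illustrative only; the helpers
(enqueue_item, dequeue_all, wake_consumer) are hypothetical stand-ins, not
code from if_epair.c. It shows how a producer that skips the wakeup because
the consumer "looks busy" can strand a packet:

/*
 * Hypothetical flag-based handoff illustrating a lost wakeup.  All
 * helpers are stand-ins, not functions from if_epair.c.
 */
#include <stdatomic.h>
#include <stdbool.h>

static atomic_bool running;	/* consumer believed to be active */

void enqueue_item(void);	/* lock-free enqueue (stand-in) */
bool dequeue_all(void);		/* drain; returns false when empty */
void wake_consumer(void);	/* schedule the consumer (stand-in) */

void
producer(void)
{
	enqueue_item();
	/* Skip the wakeup if the consumer already looks busy. */
	if (!atomic_exchange(&running, true))
		wake_consumer();
}

void
consumer_task(void)
{
	while (dequeue_all())
		;
	/*
	 * A producer can enqueue after the final dequeue_all() above,
	 * observe running == true, and skip wake_consumer(); the store
	 * below then erases its claim, and the queued item sits there
	 * until some later transmit happens to wake us again.
	 */
	atomic_store(&running, false);
}

The patch closes this window by making the producer's check-and-enqueue and
the consumer's final emptiness check mutually exclusive under q->mtx, as the
diff below shows.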
diff --git a/sys/net/if_epair.c b/sys/net/if_epair.c
index 05667633fa42..68b68c11af06 100644
--- a/sys/net/if_epair.c
+++ b/sys/net/if_epair.c
@@ -104,15 +104,16 @@ static unsigned int next_index = 0;
#define EPAIR_LOCK() mtx_lock(&epair_n_index_mtx)
#define EPAIR_UNLOCK() mtx_unlock(&epair_n_index_mtx)
-#define BIT_QUEUE_TASK 0
-#define BIT_MBUF_QUEUED 1
-
struct epair_softc;
struct epair_queue {
+ struct mtx mtx;
+ struct mbufq q;
int id;
- struct buf_ring *rxring[2];
- volatile int ridx; /* 0 || 1 */
- volatile long state; /* taskqueue coordination */
+ enum {
+ EPAIR_QUEUE_IDLE,
+ EPAIR_QUEUE_WAKING,
+ EPAIR_QUEUE_RUNNING,
+ } state;
struct task tx_task;
struct epair_softc *sc;
};
@@ -148,44 +149,49 @@ epair_clear_mbuf(struct mbuf *m)
}
static void
-epair_if_input(struct epair_softc *sc, struct epair_queue *q, int ridx)
+epair_tx_start_deferred(void *arg, int pending)
{
- struct ifnet *ifp;
- struct mbuf *m;
+ struct epair_queue *q = (struct epair_queue *)arg;
+ if_t ifp;
+ struct mbuf *m, *n;
+ bool resched;
+
+ ifp = q->sc->ifp;
- ifp = sc->ifp;
+ if_ref(ifp);
CURVNET_SET(ifp->if_vnet);
- while (! buf_ring_empty(q->rxring[ridx])) {
- m = buf_ring_dequeue_mc(q->rxring[ridx]);
- if (m == NULL)
- continue;
- MPASS((m->m_pkthdr.csum_flags & CSUM_SND_TAG) == 0);
- (*ifp->if_input)(ifp, m);
+ mtx_lock(&q->mtx);
+ m = mbufq_flush(&q->q);
+ q->state = EPAIR_QUEUE_RUNNING;
+ mtx_unlock(&q->mtx);
+ while (m != NULL) {
+ n = STAILQ_NEXT(m, m_stailqpkt);
+ m->m_nextpkt = NULL;
+ if_input(ifp, m);
+ m = n;
}
- CURVNET_RESTORE();
-}
-static void
-epair_tx_start_deferred(void *arg, int pending)
-{
- struct epair_queue *q = (struct epair_queue *)arg;
- struct epair_softc *sc = q->sc;
- int ridx, nidx;
-
- if_ref(sc->ifp);
- ridx = atomic_load_int(&q->ridx);
- do {
- nidx = (ridx == 0) ? 1 : 0;
- } while (!atomic_fcmpset_int(&q->ridx, &ridx, nidx));
- epair_if_input(sc, q, ridx);
-
- atomic_clear_long(&q->state, (1 << BIT_QUEUE_TASK));
- if (atomic_testandclear_long(&q->state, BIT_MBUF_QUEUED))
+ /*
+ * Avoid flushing the queue more than once per task. We can otherwise
+ * end up starving ourselves in a multi-epair routing configuration.
+ */
+ mtx_lock(&q->mtx);
+ if (mbufq_len(&q->q) > 0) {
+ resched = true;
+ q->state = EPAIR_QUEUE_WAKING;
+ } else {
+ resched = false;
+ q->state = EPAIR_QUEUE_IDLE;
+ }
+ mtx_unlock(&q->mtx);
+
+ if (resched)
taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task);
- if_rele(sc->ifp);
+ CURVNET_RESTORE();
+ if_rele(ifp);
}
static struct epair_queue *
@@ -239,9 +245,9 @@ epair_prepare_mbuf(struct mbuf *m, struct ifnet *src_ifp)
static void
epair_menq(struct mbuf *m, struct epair_softc *osc)
{
+ struct epair_queue *q;
struct ifnet *ifp, *oifp;
- int len, ret;
- int ridx;
+ int error, len;
bool mcast;
/*
@@ -257,32 +263,26 @@ epair_menq(struct mbuf *m, struct epair_softc *osc)
len = m->m_pkthdr.len;
mcast = (m->m_flags & (M_BCAST | M_MCAST)) != 0;
- struct epair_queue *q = epair_select_queue(osc, m);
+ q = epair_select_queue(osc, m);
- atomic_set_long(&q->state, (1 << BIT_MBUF_QUEUED));
- ridx = atomic_load_int(&q->ridx);
- ret = buf_ring_enqueue(q->rxring[ridx], m);
- if (ret != 0) {
- /* Ring is full. */
- if_inc_counter(ifp, IFCOUNTER_OQDROPS, 1);
- m_freem(m);
- return;
+ mtx_lock(&q->mtx);
+ if (q->state == EPAIR_QUEUE_IDLE) {
+ q->state = EPAIR_QUEUE_WAKING;
+ taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task);
}
+ error = mbufq_enqueue(&q->q, m);
+ mtx_unlock(&q->mtx);
- if_inc_counter(ifp, IFCOUNTER_OPACKETS, 1);
- /*
- * IFQ_HANDOFF_ADJ/ip_handoff() update statistics,
- * but as we bypass all this we have to duplicate
- * the logic another time.
- */
- if_inc_counter(ifp, IFCOUNTER_OBYTES, len);
- if (mcast)
- if_inc_counter(ifp, IFCOUNTER_OMCASTS, 1);
- /* Someone else received the packet. */
- if_inc_counter(oifp, IFCOUNTER_IPACKETS, 1);
-
- if (!atomic_testandset_long(&q->state, BIT_QUEUE_TASK))
- taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task);
+ if (error != 0) {
+ m_freem(m);
+ if_inc_counter(ifp, IFCOUNTER_OQDROPS, 1);
+ } else {
+ if_inc_counter(ifp, IFCOUNTER_OPACKETS, 1);
+ if_inc_counter(ifp, IFCOUNTER_OBYTES, len);
+ if (mcast)
+ if_inc_counter(ifp, IFCOUNTER_OMCASTS, 1);
+ if_inc_counter(oifp, IFCOUNTER_IPACKETS, 1);
+ }
}
static void
@@ -561,10 +561,9 @@ epair_clone_create(struct if_clone *ifc, char *name, size_t len, caddr_t params)
for (int i = 0; i < sca->num_queues; i++) {
struct epair_queue *q = &sca->queues[i];
q->id = i;
- q->rxring[0] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
- q->rxring[1] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
- q->ridx = 0;
- q->state = 0;
+ q->state = EPAIR_QUEUE_IDLE;
+ mtx_init(&q->mtx, "epaiq", NULL, MTX_DEF | MTX_NEW);
+ mbufq_init(&q->q, RXRSIZE);
q->sc = sca;
NET_TASK_INIT(&q->tx_task, 0, epair_tx_start_deferred, q);
}
@@ -584,10 +583,9 @@ epair_clone_create(struct if_clone *ifc, char *name, size_t len, caddr_t params)
for (int i = 0; i < scb->num_queues; i++) {
struct epair_queue *q = &scb->queues[i];
q->id = i;
- q->rxring[0] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
- q->rxring[1] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
- q->ridx = 0;
- q->state = 0;
+ q->state = EPAIR_QUEUE_IDLE;
+ mtx_init(&q->mtx, "epaiq", NULL, MTX_DEF | MTX_NEW);
+ mbufq_init(&q->q, RXRSIZE);
q->sc = scb;
NET_TASK_INIT(&q->tx_task, 0, epair_tx_start_deferred, q);
}
@@ -707,18 +705,18 @@ epair_clone_create(struct if_clone *ifc, char *name, size_t len, caddr_t params)
static void
epair_drain_rings(struct epair_softc *sc)
{
- int ridx;
- struct mbuf *m;
+ for (int i = 0; i < sc->num_queues; i++) {
+ struct epair_queue *q;
+ struct mbuf *m, *n;
- for (ridx = 0; ridx < 2; ridx++) {
- for (int i = 0; i < sc->num_queues; i++) {
- struct epair_queue *q = &sc->queues[i];
- do {
- m = buf_ring_dequeue_sc(q->rxring[ridx]);
- if (m == NULL)
- break;
- m_freem(m);
- } while (1);
+ q = &sc->queues[i];
+ mtx_lock(&q->mtx);
+ m = mbufq_flush(&q->q);
+ mtx_unlock(&q->mtx);
+
+ for (; m != NULL; m = n) {
+ n = m->m_nextpkt;
+ m_freem(m);
}
}
}
@@ -764,8 +762,7 @@ epair_clone_destroy(struct if_clone *ifc, struct ifnet *ifp)
ifmedia_removeall(&scb->media);
for (int i = 0; i < scb->num_queues; i++) {
struct epair_queue *q = &scb->queues[i];
- buf_ring_free(q->rxring[0], M_EPAIR);
- buf_ring_free(q->rxring[1], M_EPAIR);
+ mtx_destroy(&q->mtx);
}
free(scb->queues, M_EPAIR);
free(scb, M_EPAIR);
@@ -776,8 +773,7 @@ epair_clone_destroy(struct if_clone *ifc, struct ifnet *ifp)
ifmedia_removeall(&sca->media);
for (int i = 0; i < sca->num_queues; i++) {
struct epair_queue *q = &sca->queues[i];
- buf_ring_free(q->rxring[0], M_EPAIR);
- buf_ring_free(q->rxring[1], M_EPAIR);
+ mtx_destroy(&q->mtx);
}
free(sca->queues, M_EPAIR);
free(sca, M_EPAIR);
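For reference, the new transmit handoff reduces to the following userspace
sketch, with a pthread mutex and a plain singly linked list standing in for
mtx(9), mbufq, and taskqueue(9). The names here are illustrative; only the
protocol mirrors the diff above.

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

struct pkt { struct pkt *next; };

enum q_state { Q_IDLE, Q_WAKING, Q_RUNNING };

struct queue {
	pthread_mutex_t mtx;
	struct pkt *head, **tail;	/* tail starts as &head */
	enum q_state state;
};

void schedule_task(struct queue *q);	/* stand-in: taskqueue_enqueue() */
void deliver(struct pkt *p);		/* stand-in: if_input() */

/* Producer side, mirroring epair_menq(): wake only on IDLE -> WAKING. */
void
enqueue(struct queue *q, struct pkt *p)
{
	pthread_mutex_lock(&q->mtx);
	if (q->state == Q_IDLE) {
		q->state = Q_WAKING;
		schedule_task(q);
	}
	p->next = NULL;
	*q->tail = p;
	q->tail = &p->next;
	pthread_mutex_unlock(&q->mtx);
}

/* Task body, mirroring epair_tx_start_deferred(): flush once, deliver
 * outside the lock, then recheck and reschedule instead of looping. */
void
task_body(struct queue *q)
{
	struct pkt *p, *n;
	bool resched;

	pthread_mutex_lock(&q->mtx);
	p = q->head;
	q->head = NULL;
	q->tail = &q->head;
	q->state = Q_RUNNING;
	pthread_mutex_unlock(&q->mtx);

	for (; p != NULL; p = n) {
		n = p->next;
		deliver(p);
	}

	pthread_mutex_lock(&q->mtx);
	resched = (q->head != NULL);
	q->state = resched ? Q_WAKING : Q_IDLE;
	pthread_mutex_unlock(&q->mtx);
	if (resched)
		schedule_task(q);
}

All three states matter: WAKING marks a task that is scheduled but not yet
draining, so producers enqueue without re-waking; RUNNING lets the task
detect packets that arrived mid-drain and reschedule itself once rather than
looping, matching the starvation-avoidance comment in
epair_tx_start_deferred().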