about summary refs log tree commit diff
diff options
context:
space:
mode:
author John Baldwin <jhb@FreeBSD.org> 2022-02-08 00:20:06 +0000
committer John Baldwin <jhb@FreeBSD.org> 2022-02-08 00:20:06 +0000
commit fd8f61d6e970fa443d393d330ae70c54c9a523a4 (patch)
tree a9d75d7e756b4c69f4593476761b8236106043dc
parent e85af89fa7613a4bb506ca6ab8ecafbfbfde782d (diff)
download src-fd8f61d6e970fa443d393d330ae70c54c9a523a4.tar.gz
src-fd8f61d6e970fa443d393d330ae70c54c9a523a4.zip
cxgbei: Dispatch sent PDUs to the NIC asynchronously.
Previously the driver was called to send PDUs to the NIC synchronously from the icl_conn_pdu_queue_cb callback. However, this performed a fair bit of work while holding the icl connection lock. Instead, change the callback to add sent PDUs to a STAILQ and defer dispatching of PDUs to the NIC to a helper thread similar to the scheme used in the TCP iSCSI backend.

- Replace rx_flags int and the sole RXF_ACTIVE flag with a simple rx_active bool.
- Add a pool of transmit worker threads for cxgbei.
- Fix worker thread exit to depend on the wakeup in kthread_exit() to fix a race with module unload.

Reported by: mav
Sponsored by: Chelsio Communications
-rw-r--r-- sys/dev/cxgbe/cxgbei/cxgbei.c 187
-rw-r--r-- sys/dev/cxgbe/cxgbei/cxgbei.h 21
-rw-r--r-- sys/dev/cxgbe/cxgbei/icl_cxgbei.c 172
3 files changed, 260 insertions, 120 deletions
diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.c b/sys/dev/cxgbe/cxgbei/cxgbei.c
index 4a8df99b3d48..c06e39005197 100644
--- a/sys/dev/cxgbe/cxgbei/cxgbei.c
+++ b/sys/dev/cxgbe/cxgbei/cxgbei.c
@@ -95,8 +95,9 @@ __FBSDID("$FreeBSD$");
#include "cxgbei.h"
static int worker_thread_count;
-static struct cxgbei_worker_thread_softc *cwt_softc;
-static struct proc *cxgbei_proc;
+static struct cxgbei_worker_thread *cwt_rx_threads, *cwt_tx_threads;
+
+static void cwt_queue_for_rx(struct icl_cxgbei_conn *icc);
static void
read_pdu_limits(struct adapter *sc, uint32_t *max_tx_data_len,
@@ -585,17 +586,9 @@ do_rx_iscsi_ddp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
icl_cxgbei_new_pdu_set_conn(ip, ic);
STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
- if ((icc->rx_flags & RXF_ACTIVE) == 0) {
- struct cxgbei_worker_thread_softc *cwt = &cwt_softc[icc->cwt];
-
- mtx_lock(&cwt->cwt_lock);
- icc->rx_flags |= RXF_ACTIVE;
- TAILQ_INSERT_TAIL(&cwt->rx_head, icc, rx_link);
- if (cwt->cwt_state == CWT_SLEEPING) {
- cwt->cwt_state = CWT_RUNNING;
- cv_signal(&cwt->cwt_cv);
- }
- mtx_unlock(&cwt->cwt_lock);
+ if (!icc->rx_active) {
+ icc->rx_active = true;
+ cwt_queue_for_rx(icc);
}
SOCKBUF_UNLOCK(sb);
INP_WUNLOCK(inp);
@@ -836,17 +829,9 @@ do_rx_iscsi_cmp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
/* Enqueue the PDU to the received pdus queue. */
STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
- if ((icc->rx_flags & RXF_ACTIVE) == 0) {
- struct cxgbei_worker_thread_softc *cwt = &cwt_softc[icc->cwt];
-
- mtx_lock(&cwt->cwt_lock);
- icc->rx_flags |= RXF_ACTIVE;
- TAILQ_INSERT_TAIL(&cwt->rx_head, icc, rx_link);
- if (cwt->cwt_state == CWT_SLEEPING) {
- cwt->cwt_state = CWT_RUNNING;
- cv_signal(&cwt->cwt_cv);
- }
- mtx_unlock(&cwt->cwt_lock);
+ if (!icc->rx_active) {
+ icc->rx_active = true;
+ cwt_queue_for_rx(icc);
}
SOCKBUF_UNLOCK(sb);
INP_WUNLOCK(inp);
@@ -944,9 +929,9 @@ static struct uld_info cxgbei_uld_info = {
};
static void
-cwt_main(void *arg)
+cwt_rx_main(void *arg)
{
- struct cxgbei_worker_thread_softc *cwt = arg;
+ struct cxgbei_worker_thread *cwt = arg;
struct icl_cxgbei_conn *icc = NULL;
struct icl_conn *ic;
struct icl_pdu *ip;
@@ -962,8 +947,8 @@ cwt_main(void *arg)
while (__predict_true(cwt->cwt_state != CWT_STOP)) {
cwt->cwt_state = CWT_RUNNING;
- while ((icc = TAILQ_FIRST(&cwt->rx_head)) != NULL) {
- TAILQ_REMOVE(&cwt->rx_head, icc, rx_link);
+ while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
+ TAILQ_REMOVE(&cwt->icc_head, icc, rx_link);
mtx_unlock(&cwt->cwt_lock);
ic = &icc->ic;
@@ -979,7 +964,7 @@ cwt_main(void *arg)
*/
parse_pdus(icc, sb);
}
- MPASS(icc->rx_flags & RXF_ACTIVE);
+ MPASS(icc->rx_active);
if (__predict_true(!(sb->sb_state & SBS_CANTRCVMORE))) {
MPASS(STAILQ_EMPTY(&rx_pdus));
STAILQ_SWAP(&icc->rcvd_pdus, &rx_pdus, icl_pdu);
@@ -994,11 +979,16 @@ cwt_main(void *arg)
SOCKBUF_LOCK(sb);
MPASS(STAILQ_EMPTY(&rx_pdus));
}
- MPASS(icc->rx_flags & RXF_ACTIVE);
+ MPASS(icc->rx_active);
if (STAILQ_EMPTY(&icc->rcvd_pdus) ||
__predict_false(sb->sb_state & SBS_CANTRCVMORE)) {
- icc->rx_flags &= ~RXF_ACTIVE;
+ icc->rx_active = false;
+ SOCKBUF_UNLOCK(sb);
+
+ mtx_lock(&cwt->cwt_lock);
} else {
+ SOCKBUF_UNLOCK(sb);
+
/*
* More PDUs were received while we were busy
* handing over the previous batch to ICL.
@@ -1006,13 +996,9 @@ cwt_main(void *arg)
* queue.
*/
mtx_lock(&cwt->cwt_lock);
- TAILQ_INSERT_TAIL(&cwt->rx_head, icc,
+ TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
rx_link);
- mtx_unlock(&cwt->cwt_lock);
}
- SOCKBUF_UNLOCK(sb);
-
- mtx_lock(&cwt->cwt_lock);
}
/* Inner loop doesn't check for CWT_STOP, do that first. */
@@ -1022,84 +1008,121 @@ cwt_main(void *arg)
cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
}
- MPASS(TAILQ_FIRST(&cwt->rx_head) == NULL);
- mtx_assert(&cwt->cwt_lock, MA_OWNED);
- cwt->cwt_state = CWT_STOPPED;
- cv_signal(&cwt->cwt_cv);
+ MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
mtx_unlock(&cwt->cwt_lock);
kthread_exit();
}
+static void
+cwt_queue_for_rx(struct icl_cxgbei_conn *icc)
+{
+ struct cxgbei_worker_thread *cwt = &cwt_rx_threads[icc->cwt];
+
+ mtx_lock(&cwt->cwt_lock);
+ TAILQ_INSERT_TAIL(&cwt->icc_head, icc, rx_link);
+ if (cwt->cwt_state == CWT_SLEEPING) {
+ cwt->cwt_state = CWT_RUNNING;
+ cv_signal(&cwt->cwt_cv);
+ }
+ mtx_unlock(&cwt->cwt_lock);
+}
+
+void
+cwt_queue_for_tx(struct icl_cxgbei_conn *icc)
+{
+ struct cxgbei_worker_thread *cwt = &cwt_tx_threads[icc->cwt];
+
+ mtx_lock(&cwt->cwt_lock);
+ TAILQ_INSERT_TAIL(&cwt->icc_head, icc, tx_link);
+ if (cwt->cwt_state == CWT_SLEEPING) {
+ cwt->cwt_state = CWT_RUNNING;
+ cv_signal(&cwt->cwt_cv);
+ }
+ mtx_unlock(&cwt->cwt_lock);
+}
+
static int
start_worker_threads(void)
{
+ struct proc *cxgbei_proc;
int i, rc;
- struct cxgbei_worker_thread_softc *cwt;
+ struct cxgbei_worker_thread *cwt;
worker_thread_count = min(mp_ncpus, 32);
- cwt_softc = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
+ cwt_rx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
+ M_WAITOK | M_ZERO);
+ cwt_tx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
M_WAITOK | M_ZERO);
- MPASS(cxgbei_proc == NULL);
- for (i = 0, cwt = &cwt_softc[0]; i < worker_thread_count; i++, cwt++) {
+ for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
+ i++, cwt++) {
+ mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
+ cv_init(&cwt->cwt_cv, "cwt cv");
+ TAILQ_INIT(&cwt->icc_head);
+ }
+
+ for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
+ i++, cwt++) {
mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
cv_init(&cwt->cwt_cv, "cwt cv");
- TAILQ_INIT(&cwt->rx_head);
- rc = kproc_kthread_add(cwt_main, cwt, &cxgbei_proc, NULL, 0, 0,
- "cxgbei", "%d", i);
+ TAILQ_INIT(&cwt->icc_head);
+ }
+
+ cxgbei_proc = NULL;
+ for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
+ i++, cwt++) {
+ rc = kproc_kthread_add(cwt_rx_main, cwt, &cxgbei_proc,
+ &cwt->cwt_td, 0, 0, "cxgbei", "rx %d", i);
if (rc != 0) {
- printf("cxgbei: failed to start thread #%d/%d (%d)\n",
+ printf("cxgbei: failed to start rx thread #%d/%d (%d)\n",
i + 1, worker_thread_count, rc);
- mtx_destroy(&cwt->cwt_lock);
- cv_destroy(&cwt->cwt_cv);
- bzero(cwt, sizeof(*cwt));
- if (i == 0) {
- free(cwt_softc, M_CXGBE);
- worker_thread_count = 0;
-
- return (rc);
- }
-
- /* Not fatal, carry on with fewer threads. */
- worker_thread_count = i;
- rc = 0;
- break;
+ return (rc);
}
+ }
- /* Wait for thread to start before moving on to the next one. */
- mtx_lock(&cwt->cwt_lock);
- while (cwt->cwt_state == 0)
- cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
- mtx_unlock(&cwt->cwt_lock);
+ for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
+ i++, cwt++) {
+ rc = kproc_kthread_add(cwt_tx_main, cwt, &cxgbei_proc,
+ &cwt->cwt_td, 0, 0, "cxgbei", "tx %d", i);
+ if (rc != 0) {
+ printf("cxgbei: failed to start tx thread #%d/%d (%d)\n",
+ i + 1, worker_thread_count, rc);
+ return (rc);
+ }
}
- MPASS(cwt_softc != NULL);
- MPASS(worker_thread_count > 0);
return (0);
}
static void
-stop_worker_threads(void)
+stop_worker_threads1(struct cxgbei_worker_thread *threads)
{
+ struct cxgbei_worker_thread *cwt;
int i;
- struct cxgbei_worker_thread_softc *cwt = &cwt_softc[0];
- MPASS(worker_thread_count >= 0);
-
- for (i = 0, cwt = &cwt_softc[0]; i < worker_thread_count; i++, cwt++) {
+ for (i = 0, cwt = &threads[0]; i < worker_thread_count; i++, cwt++) {
mtx_lock(&cwt->cwt_lock);
- MPASS(cwt->cwt_state == CWT_RUNNING ||
- cwt->cwt_state == CWT_SLEEPING);
- cwt->cwt_state = CWT_STOP;
- cv_signal(&cwt->cwt_cv);
- do {
- cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
- } while (cwt->cwt_state != CWT_STOPPED);
+ if (cwt->cwt_td != NULL) {
+ MPASS(cwt->cwt_state == CWT_RUNNING ||
+ cwt->cwt_state == CWT_SLEEPING);
+ cwt->cwt_state = CWT_STOP;
+ cv_signal(&cwt->cwt_cv);
+ mtx_sleep(cwt->cwt_td, &cwt->cwt_lock, 0, "cwtstop", 0);
+ }
mtx_unlock(&cwt->cwt_lock);
mtx_destroy(&cwt->cwt_lock);
cv_destroy(&cwt->cwt_cv);
}
- free(cwt_softc, M_CXGBE);
+ free(threads, M_CXGBE);
+}
+
+static void
+stop_worker_threads(void)
+{
+
+ MPASS(worker_thread_count >= 0);
+ stop_worker_threads1(cwt_rx_threads);
+ stop_worker_threads1(cwt_tx_threads);
}
/* Select a worker thread for a connection. */
diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.h b/sys/dev/cxgbe/cxgbei/cxgbei.h
index 58a5dac6d63b..b078f3110d62 100644
--- a/sys/dev/cxgbe/cxgbei/cxgbei.h
+++ b/sys/dev/cxgbe/cxgbei/cxgbei.h
@@ -36,23 +36,19 @@ enum {
CWT_SLEEPING = 1,
CWT_RUNNING = 2,
CWT_STOP = 3,
- CWT_STOPPED = 4,
};
-struct cxgbei_worker_thread_softc {
+struct cxgbei_worker_thread {
struct mtx cwt_lock;
struct cv cwt_cv;
volatile int cwt_state;
+ struct thread *cwt_td;
- TAILQ_HEAD(, icl_cxgbei_conn) rx_head;
+ TAILQ_HEAD(, icl_cxgbei_conn) icc_head;
} __aligned(CACHE_LINE_SIZE);
#define CXGBEI_CONN_SIGNATURE 0x56788765
-enum {
- RXF_ACTIVE = 1 << 0, /* In the worker thread's queue */
-};
-
struct cxgbei_cmp {
LIST_ENTRY(cxgbei_cmp) link;
@@ -71,16 +67,21 @@ struct icl_cxgbei_conn {
int ulp_submode;
struct adapter *sc;
struct toepcb *toep;
+ u_int cwt;
/* Receive related. */
- u_int rx_flags; /* protected by so_rcv lock */
- u_int cwt;
+ bool rx_active; /* protected by so_rcv lock */
STAILQ_HEAD(, icl_pdu) rcvd_pdus; /* protected by so_rcv lock */
TAILQ_ENTRY(icl_cxgbei_conn) rx_link; /* protected by cwt lock */
struct cxgbei_cmp_head *cmp_table; /* protected by cmp_lock */
struct mtx cmp_lock;
unsigned long cmp_hash_mask;
+
+ /* Transmit related. */
+ bool tx_active; /* protected by ic lock */
+ STAILQ_HEAD(, icl_pdu) sent_pdus; /* protected by ic lock */
+ TAILQ_ENTRY(icl_cxgbei_conn) tx_link; /* protected by cwt lock */
};
static inline struct icl_cxgbei_conn *
@@ -134,8 +135,10 @@ struct cxgbei_data {
/* cxgbei.c */
u_int cxgbei_select_worker_thread(struct icl_cxgbei_conn *);
+void cwt_queue_for_tx(struct icl_cxgbei_conn *);
/* icl_cxgbei.c */
+void cwt_tx_main(void *);
int icl_cxgbei_mod_load(void);
int icl_cxgbei_mod_unload(void);
struct icl_pdu *icl_cxgbei_new_pdu(int);
diff --git a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
index f66a959f6311..516ab931a49c 100644
--- a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
+++ b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
@@ -421,6 +421,128 @@ finalize_pdu(struct icl_cxgbei_conn *icc, struct icl_cxgbei_pdu *icp)
return (m);
}
+static void
+cwt_push_pdus(struct icl_cxgbei_conn *icc, struct socket *so, struct mbufq *mq)
+{
+ struct epoch_tracker et;
+ struct icl_conn *ic = &icc->ic;
+ struct toepcb *toep = icc->toep;
+ struct inpcb *inp;
+
+ /*
+ * Do not get inp from toep->inp as the toepcb might have
+ * detached already.
+ */
+ inp = sotoinpcb(so);
+ CURVNET_SET(toep->vnet);
+ NET_EPOCH_ENTER(et);
+ INP_WLOCK(inp);
+
+ ICL_CONN_UNLOCK(ic);
+ if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) ||
+ __predict_false((toep->flags & TPF_ATTACHED) == 0)) {
+ mbufq_drain(mq);
+ } else {
+ mbufq_concat(&toep->ulp_pduq, mq);
+ t4_push_pdus(icc->sc, toep, 0);
+ }
+ INP_WUNLOCK(inp);
+ NET_EPOCH_EXIT(et);
+ CURVNET_RESTORE();
+
+ ICL_CONN_LOCK(ic);
+}
+
+void
+cwt_tx_main(void *arg)
+{
+ struct cxgbei_worker_thread *cwt = arg;
+ struct icl_cxgbei_conn *icc;
+ struct icl_conn *ic;
+ struct icl_pdu *ip;
+ struct socket *so;
+ struct mbuf *m;
+ struct mbufq mq;
+ STAILQ_HEAD(, icl_pdu) tx_pdus = STAILQ_HEAD_INITIALIZER(tx_pdus);
+
+ MPASS(cwt != NULL);
+
+ mtx_lock(&cwt->cwt_lock);
+ MPASS(cwt->cwt_state == 0);
+ cwt->cwt_state = CWT_RUNNING;
+ cv_signal(&cwt->cwt_cv);
+
+ mbufq_init(&mq, INT_MAX);
+ while (__predict_true(cwt->cwt_state != CWT_STOP)) {
+ cwt->cwt_state = CWT_RUNNING;
+ while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
+ TAILQ_REMOVE(&cwt->icc_head, icc, tx_link);
+ mtx_unlock(&cwt->cwt_lock);
+
+ ic = &icc->ic;
+
+ ICL_CONN_LOCK(ic);
+ MPASS(icc->tx_active);
+ STAILQ_SWAP(&icc->sent_pdus, &tx_pdus, icl_pdu);
+ ICL_CONN_UNLOCK(ic);
+
+ while ((ip = STAILQ_FIRST(&tx_pdus)) != NULL) {
+ STAILQ_REMOVE_HEAD(&tx_pdus, ip_next);
+
+ m = finalize_pdu(icc, ip_to_icp(ip));
+ M_ASSERTPKTHDR(m);
+ MPASS((m->m_pkthdr.len & 3) == 0);
+
+ mbufq_enqueue(&mq, m);
+ }
+
+ ICL_CONN_LOCK(ic);
+ so = ic->ic_socket;
+ if (__predict_false(ic->ic_disconnecting) ||
+ __predict_false(so == NULL)) {
+ mbufq_drain(&mq);
+ icc->tx_active = false;
+ ICL_CONN_UNLOCK(ic);
+
+ mtx_lock(&cwt->cwt_lock);
+ continue;
+ }
+
+ cwt_push_pdus(icc, so, &mq);
+
+ MPASS(icc->tx_active);
+ if (STAILQ_EMPTY(&icc->sent_pdus)) {
+ icc->tx_active = false;
+ ICL_CONN_UNLOCK(ic);
+
+ mtx_lock(&cwt->cwt_lock);
+ } else {
+ ICL_CONN_UNLOCK(ic);
+
+ /*
+ * More PDUs were queued while we were
+ * busy sending the previous batch.
+ * Re-add this connection to the end
+ * of the queue.
+ */
+ mtx_lock(&cwt->cwt_lock);
+ TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
+ tx_link);
+ }
+ }
+
+ /* Inner loop doesn't check for CWT_STOP, do that first. */
+ if (__predict_false(cwt->cwt_state == CWT_STOP))
+ break;
+ cwt->cwt_state = CWT_SLEEPING;
+ cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
+ }
+
+ MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
+ mtx_unlock(&cwt->cwt_lock);
+ kthread_exit();
+}
+
int
icl_cxgbei_conn_pdu_append_data(struct icl_conn *ic, struct icl_pdu *ip,
const void *addr, size_t len, int flags)
@@ -534,13 +656,9 @@ void
icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip,
icl_pdu_cb cb)
{
- struct epoch_tracker et;
struct icl_cxgbei_conn *icc = ic_to_icc(ic);
struct icl_cxgbei_pdu *icp = ip_to_icp(ip);
struct socket *so = ic->ic_socket;
- struct toepcb *toep = icc->toep;
- struct inpcb *inp;
- struct mbuf *m;
MPASS(ic == ip->ip_conn);
MPASS(ip->ip_bhs_mbuf != NULL);
@@ -557,28 +675,11 @@ icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip,
return;
}
- m = finalize_pdu(icc, icp);
- M_ASSERTPKTHDR(m);
- MPASS((m->m_pkthdr.len & 3) == 0);
-
- /*
- * Do not get inp from toep->inp as the toepcb might have detached
- * already.
- */
- inp = sotoinpcb(so);
- CURVNET_SET(toep->vnet);
- NET_EPOCH_ENTER(et);
- INP_WLOCK(inp);
- if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) ||
- __predict_false((toep->flags & TPF_ATTACHED) == 0))
- m_freem(m);
- else {
- mbufq_enqueue(&toep->ulp_pduq, m);
- t4_push_pdus(icc->sc, toep, 0);
+ STAILQ_INSERT_TAIL(&icc->sent_pdus, ip, ip_next);
+ if (!icc->tx_active) {
+ icc->tx_active = true;
+ cwt_queue_for_tx(icc);
}
- INP_WUNLOCK(inp);
- NET_EPOCH_EXIT(et);
- CURVNET_RESTORE();
}
static struct icl_conn *
@@ -593,6 +694,7 @@ icl_cxgbei_new_conn(const char *name, struct mtx *lock)
M_WAITOK | M_ZERO);
icc->icc_signature = CXGBEI_CONN_SIGNATURE;
STAILQ_INIT(&icc->rcvd_pdus);
+ STAILQ_INIT(&icc->sent_pdus);
icc->cmp_table = hashinit(64, M_CXGBEI, &icc->cmp_hash_mask);
mtx_init(&icc->cmp_lock, "cxgbei_cmp", NULL, MTX_DEF);
@@ -935,21 +1037,33 @@ icl_cxgbei_conn_close(struct icl_conn *ic)
if (toep != NULL) { /* NULL if connection was never offloaded. */
toep->ulpcb = NULL;
+ /*
+ * Wait for the cwt threads to stop processing this
+ * connection for transmit.
+ */
+ while (icc->tx_active)
+ rw_sleep(inp, &inp->inp_lock, 0, "conclo", 1);
+
/* Discard PDUs queued for TX. */
+ while (!STAILQ_EMPTY(&icc->sent_pdus)) {
+ ip = STAILQ_FIRST(&icc->sent_pdus);
+ STAILQ_REMOVE_HEAD(&icc->sent_pdus, ip_next);
+ icl_cxgbei_pdu_done(ip, ENOTCONN);
+ }
mbufq_drain(&toep->ulp_pduq);
/*
* Wait for the cwt threads to stop processing this
- * connection.
+ * connection for receive.
*/
SOCKBUF_LOCK(sb);
- if (icc->rx_flags & RXF_ACTIVE) {
- volatile u_int *p = &icc->rx_flags;
+ if (icc->rx_active) {
+ volatile bool *p = &icc->rx_active;
SOCKBUF_UNLOCK(sb);
INP_WUNLOCK(inp);
- while (*p & RXF_ACTIVE)
+ while (*p)
pause("conclo", 1);
INP_WLOCK(inp);