author		John Baldwin <jhb@FreeBSD.org>	2022-02-08 00:20:40 +0000
committer	John Baldwin <jhb@FreeBSD.org>	2022-02-08 00:20:40 +0000
commit		511b83b167b02608a1f857fa2e0cc5fb3218e09d (patch)
tree		ddb441e354e8ee68ec001f046f956cea28e0f658
parent		fd8f61d6e970fa443d393d330ae70c54c9a523a4 (diff)
cxgbei: Replace worker thread pools with per-connection kthreads.
Having a single pool of worker threads adds extra complexity and overhead.
The software backend also uses per-connection kthreads.

Sponsored by:	Chelsio Communications
-rw-r--r--	sys/dev/cxgbe/cxgbei/cxgbei.c		237
-rw-r--r--	sys/dev/cxgbe/cxgbei/cxgbei.h		 22
-rw-r--r--	sys/dev/cxgbe/cxgbei/icl_cxgbei.c	320
3 files changed, 178 insertions(+), 401 deletions(-)
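
A minimal sketch of the per-connection kthread pattern this commit adopts, separate
from the driver code itself. All names here (struct conn, conn_worker, conn_enqueue,
conn_start) are hypothetical and only illustrate the idea: the producer queues work,
sets a flag and wakeup()s the connection's dedicated thread; the thread mtx_sleep()s
on that flag while its queue is empty and exits when tear-down is requested, much like
the rx_active/tx_active handshakes in the diff below.

/* Hypothetical example, not part of the commit. */
#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/queue.h>
#include <sys/systm.h>

struct work_item {
	STAILQ_ENTRY(work_item)	link;
};

struct conn {
	struct mtx		lock;
	bool			active;		/* worker has been woken */
	bool			disconnecting;	/* tear-down requested */
	STAILQ_HEAD(, work_item) queue;
	struct thread		*td;
};

static void
conn_worker(void *arg)
{
	struct conn *c = arg;
	struct work_item *wi;

	mtx_lock(&c->lock);
	while (!c->disconnecting) {
		while (STAILQ_EMPTY(&c->queue)) {
			/* Go idle until the producer wakes us up. */
			c->active = false;
			mtx_sleep(&c->active, &c->lock, 0, "-", 0);
			if (c->disconnecting)
				goto out;
		}
		wi = STAILQ_FIRST(&c->queue);
		STAILQ_REMOVE_HEAD(&c->queue, link);
		mtx_unlock(&c->lock);
		/* Process wi without holding the connection lock. */
		mtx_lock(&c->lock);
	}
out:
	mtx_unlock(&c->lock);
	kthread_exit();
}

/* Producer: queue work and wake the per-connection thread if it is idle. */
static void
conn_enqueue(struct conn *c, struct work_item *wi)
{
	mtx_lock(&c->lock);
	STAILQ_INSERT_TAIL(&c->queue, wi, link);
	if (!c->active) {
		c->active = true;
		wakeup(&c->active);
	}
	mtx_unlock(&c->lock);
}

/* One dedicated kthread is created per connection. */
static int
conn_start(struct conn *c)
{
	mtx_init(&c->lock, "conn lock", NULL, MTX_DEF);
	STAILQ_INIT(&c->queue);
	return (kthread_add(conn_worker, c, NULL, &c->td, 0, 0, "conn worker"));
}
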
diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.c b/sys/dev/cxgbe/cxgbei/cxgbei.c
index c06e39005197..979feace81dd 100644
--- a/sys/dev/cxgbe/cxgbei/cxgbei.c
+++ b/sys/dev/cxgbe/cxgbei/cxgbei.c
@@ -94,11 +94,6 @@ __FBSDID("$FreeBSD$");
#include "tom/t4_tom.h"
#include "cxgbei.h"
-static int worker_thread_count;
-static struct cxgbei_worker_thread *cwt_rx_threads, *cwt_tx_threads;
-
-static void cwt_queue_for_rx(struct icl_cxgbei_conn *icc);
-
static void
read_pdu_limits(struct adapter *sc, uint32_t *max_tx_data_len,
uint32_t *max_rx_data_len, struct ppod_region *pr)
@@ -424,7 +419,7 @@ parse_pdu(struct socket *so, struct toepcb *toep, struct icl_cxgbei_conn *icc,
return (ip);
}
-static void
+void
parse_pdus(struct icl_cxgbei_conn *icc, struct sockbuf *sb)
{
struct icl_conn *ic = &icc->ic;
@@ -588,7 +583,7 @@ do_rx_iscsi_ddp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
if (!icc->rx_active) {
icc->rx_active = true;
- cwt_queue_for_rx(icc);
+ wakeup(&icc->rx_active);
}
SOCKBUF_UNLOCK(sb);
INP_WUNLOCK(inp);
@@ -831,7 +826,7 @@ do_rx_iscsi_cmp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
if (!icc->rx_active) {
icc->rx_active = true;
- cwt_queue_for_rx(icc);
+ wakeup(&icc->rx_active);
}
SOCKBUF_UNLOCK(sb);
INP_WUNLOCK(inp);
@@ -928,222 +923,6 @@ static struct uld_info cxgbei_uld_info = {
.deactivate = cxgbei_deactivate,
};
-static void
-cwt_rx_main(void *arg)
-{
- struct cxgbei_worker_thread *cwt = arg;
- struct icl_cxgbei_conn *icc = NULL;
- struct icl_conn *ic;
- struct icl_pdu *ip;
- struct sockbuf *sb;
- STAILQ_HEAD(, icl_pdu) rx_pdus = STAILQ_HEAD_INITIALIZER(rx_pdus);
-
- MPASS(cwt != NULL);
-
- mtx_lock(&cwt->cwt_lock);
- MPASS(cwt->cwt_state == 0);
- cwt->cwt_state = CWT_RUNNING;
- cv_signal(&cwt->cwt_cv);
-
- while (__predict_true(cwt->cwt_state != CWT_STOP)) {
- cwt->cwt_state = CWT_RUNNING;
- while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
- TAILQ_REMOVE(&cwt->icc_head, icc, rx_link);
- mtx_unlock(&cwt->cwt_lock);
-
- ic = &icc->ic;
- sb = &ic->ic_socket->so_rcv;
-
- SOCKBUF_LOCK(sb);
- if (__predict_false(sbused(sb)) != 0) {
- /*
- * PDUs were received before the tid
- * transitioned to ULP mode. Convert
- * them to icl_cxgbei_pdus and insert
- * them into the head of rcvd_pdus.
- */
- parse_pdus(icc, sb);
- }
- MPASS(icc->rx_active);
- if (__predict_true(!(sb->sb_state & SBS_CANTRCVMORE))) {
- MPASS(STAILQ_EMPTY(&rx_pdus));
- STAILQ_SWAP(&icc->rcvd_pdus, &rx_pdus, icl_pdu);
- SOCKBUF_UNLOCK(sb);
-
- /* Hand over PDUs to ICL. */
- while ((ip = STAILQ_FIRST(&rx_pdus)) != NULL) {
- STAILQ_REMOVE_HEAD(&rx_pdus, ip_next);
- ic->ic_receive(ip);
- }
-
- SOCKBUF_LOCK(sb);
- MPASS(STAILQ_EMPTY(&rx_pdus));
- }
- MPASS(icc->rx_active);
- if (STAILQ_EMPTY(&icc->rcvd_pdus) ||
- __predict_false(sb->sb_state & SBS_CANTRCVMORE)) {
- icc->rx_active = false;
- SOCKBUF_UNLOCK(sb);
-
- mtx_lock(&cwt->cwt_lock);
- } else {
- SOCKBUF_UNLOCK(sb);
-
- /*
- * More PDUs were received while we were busy
- * handing over the previous batch to ICL.
- * Re-add this connection to the end of the
- * queue.
- */
- mtx_lock(&cwt->cwt_lock);
- TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
- rx_link);
- }
- }
-
- /* Inner loop doesn't check for CWT_STOP, do that first. */
- if (__predict_false(cwt->cwt_state == CWT_STOP))
- break;
- cwt->cwt_state = CWT_SLEEPING;
- cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
- }
-
- MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
- mtx_unlock(&cwt->cwt_lock);
- kthread_exit();
-}
-
-static void
-cwt_queue_for_rx(struct icl_cxgbei_conn *icc)
-{
- struct cxgbei_worker_thread *cwt = &cwt_rx_threads[icc->cwt];
-
- mtx_lock(&cwt->cwt_lock);
- TAILQ_INSERT_TAIL(&cwt->icc_head, icc, rx_link);
- if (cwt->cwt_state == CWT_SLEEPING) {
- cwt->cwt_state = CWT_RUNNING;
- cv_signal(&cwt->cwt_cv);
- }
- mtx_unlock(&cwt->cwt_lock);
-}
-
-void
-cwt_queue_for_tx(struct icl_cxgbei_conn *icc)
-{
- struct cxgbei_worker_thread *cwt = &cwt_tx_threads[icc->cwt];
-
- mtx_lock(&cwt->cwt_lock);
- TAILQ_INSERT_TAIL(&cwt->icc_head, icc, tx_link);
- if (cwt->cwt_state == CWT_SLEEPING) {
- cwt->cwt_state = CWT_RUNNING;
- cv_signal(&cwt->cwt_cv);
- }
- mtx_unlock(&cwt->cwt_lock);
-}
-
-static int
-start_worker_threads(void)
-{
- struct proc *cxgbei_proc;
- int i, rc;
- struct cxgbei_worker_thread *cwt;
-
- worker_thread_count = min(mp_ncpus, 32);
- cwt_rx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
- M_WAITOK | M_ZERO);
- cwt_tx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
- M_WAITOK | M_ZERO);
-
- for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
- i++, cwt++) {
- mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
- cv_init(&cwt->cwt_cv, "cwt cv");
- TAILQ_INIT(&cwt->icc_head);
- }
-
- for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
- i++, cwt++) {
- mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
- cv_init(&cwt->cwt_cv, "cwt cv");
- TAILQ_INIT(&cwt->icc_head);
- }
-
- cxgbei_proc = NULL;
- for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
- i++, cwt++) {
- rc = kproc_kthread_add(cwt_rx_main, cwt, &cxgbei_proc,
- &cwt->cwt_td, 0, 0, "cxgbei", "rx %d", i);
- if (rc != 0) {
- printf("cxgbei: failed to start rx thread #%d/%d (%d)\n",
- i + 1, worker_thread_count, rc);
- return (rc);
- }
- }
-
- for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
- i++, cwt++) {
- rc = kproc_kthread_add(cwt_tx_main, cwt, &cxgbei_proc,
- &cwt->cwt_td, 0, 0, "cxgbei", "tx %d", i);
- if (rc != 0) {
- printf("cxgbei: failed to start tx thread #%d/%d (%d)\n",
- i + 1, worker_thread_count, rc);
- return (rc);
- }
- }
-
- return (0);
-}
-
-static void
-stop_worker_threads1(struct cxgbei_worker_thread *threads)
-{
- struct cxgbei_worker_thread *cwt;
- int i;
-
- for (i = 0, cwt = &threads[0]; i < worker_thread_count; i++, cwt++) {
- mtx_lock(&cwt->cwt_lock);
- if (cwt->cwt_td != NULL) {
- MPASS(cwt->cwt_state == CWT_RUNNING ||
- cwt->cwt_state == CWT_SLEEPING);
- cwt->cwt_state = CWT_STOP;
- cv_signal(&cwt->cwt_cv);
- mtx_sleep(cwt->cwt_td, &cwt->cwt_lock, 0, "cwtstop", 0);
- }
- mtx_unlock(&cwt->cwt_lock);
- mtx_destroy(&cwt->cwt_lock);
- cv_destroy(&cwt->cwt_cv);
- }
- free(threads, M_CXGBE);
-}
-
-static void
-stop_worker_threads(void)
-{
-
- MPASS(worker_thread_count >= 0);
- stop_worker_threads1(cwt_rx_threads);
- stop_worker_threads1(cwt_tx_threads);
-}
-
-/* Select a worker thread for a connection. */
-u_int
-cxgbei_select_worker_thread(struct icl_cxgbei_conn *icc)
-{
- struct adapter *sc = icc->sc;
- struct toepcb *toep = icc->toep;
- u_int i, n;
-
- n = worker_thread_count / sc->sge.nofldrxq;
- if (n > 0)
- i = toep->vi->pi->port_id * n + arc4random() % n;
- else
- i = arc4random() % worker_thread_count;
-
- CTR3(KTR_CXGBE, "%s: tid %u, cwt %u", __func__, toep->tid, i);
-
- return (i);
-}
-
static int
cxgbei_mod_load(void)
{
@@ -1154,15 +933,9 @@ cxgbei_mod_load(void)
t4_register_cpl_handler(CPL_RX_ISCSI_DDP, do_rx_iscsi_ddp);
t4_register_cpl_handler(CPL_RX_ISCSI_CMP, do_rx_iscsi_cmp);
- rc = start_worker_threads();
- if (rc != 0)
- return (rc);
-
rc = t4_register_uld(&cxgbei_uld_info);
- if (rc != 0) {
- stop_worker_threads();
+ if (rc != 0)
return (rc);
- }
t4_iterate(cxgbei_activate_all, NULL);
@@ -1178,8 +951,6 @@ cxgbei_mod_unload(void)
if (t4_unregister_uld(&cxgbei_uld_info) == EBUSY)
return (EBUSY);
- stop_worker_threads();
-
t4_register_cpl_handler(CPL_ISCSI_HDR, NULL);
t4_register_cpl_handler(CPL_ISCSI_DATA, NULL);
t4_register_cpl_handler(CPL_RX_ISCSI_DDP, NULL);
diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.h b/sys/dev/cxgbe/cxgbei/cxgbei.h
index b078f3110d62..d0e423ce5b2f 100644
--- a/sys/dev/cxgbe/cxgbei/cxgbei.h
+++ b/sys/dev/cxgbe/cxgbei/cxgbei.h
@@ -32,21 +32,6 @@
#include <dev/iscsi/icl.h>
-enum {
- CWT_SLEEPING = 1,
- CWT_RUNNING = 2,
- CWT_STOP = 3,
-};
-
-struct cxgbei_worker_thread {
- struct mtx cwt_lock;
- struct cv cwt_cv;
- volatile int cwt_state;
- struct thread *cwt_td;
-
- TAILQ_HEAD(, icl_cxgbei_conn) icc_head;
-} __aligned(CACHE_LINE_SIZE);
-
#define CXGBEI_CONN_SIGNATURE 0x56788765
struct cxgbei_cmp {
@@ -67,12 +52,12 @@ struct icl_cxgbei_conn {
int ulp_submode;
struct adapter *sc;
struct toepcb *toep;
- u_int cwt;
/* Receive related. */
bool rx_active; /* protected by so_rcv lock */
+ bool rx_exiting; /* protected by so_rcv lock */
STAILQ_HEAD(, icl_pdu) rcvd_pdus; /* protected by so_rcv lock */
- TAILQ_ENTRY(icl_cxgbei_conn) rx_link; /* protected by cwt lock */
+ struct thread *rx_thread;
struct cxgbei_cmp_head *cmp_table; /* protected by cmp_lock */
struct mtx cmp_lock;
@@ -81,7 +66,7 @@ struct icl_cxgbei_conn {
/* Transmit related. */
bool tx_active; /* protected by ic lock */
STAILQ_HEAD(, icl_pdu) sent_pdus; /* protected by ic lock */
- TAILQ_ENTRY(icl_cxgbei_conn) tx_link; /* protected by cwt lock */
+ struct thread *tx_thread;
};
static inline struct icl_cxgbei_conn *
@@ -136,6 +121,7 @@ struct cxgbei_data {
/* cxgbei.c */
u_int cxgbei_select_worker_thread(struct icl_cxgbei_conn *);
void cwt_queue_for_tx(struct icl_cxgbei_conn *);
+void parse_pdus(struct icl_cxgbei_conn *, struct sockbuf *);
/* icl_cxgbei.c */
void cwt_tx_main(void *);
diff --git a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
index 516ab931a49c..296d4f2d270a 100644
--- a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
+++ b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
@@ -422,124 +422,133 @@ finalize_pdu(struct icl_cxgbei_conn *icc, struct icl_cxgbei_pdu *icp)
}
static void
-cwt_push_pdus(struct icl_cxgbei_conn *icc, struct socket *so, struct mbufq *mq)
+icl_cxgbei_tx_main(void *arg)
{
struct epoch_tracker et;
+ struct icl_cxgbei_conn *icc = arg;
struct icl_conn *ic = &icc->ic;
struct toepcb *toep = icc->toep;
- struct inpcb *inp;
-
- /*
- * Do not get inp from toep->inp as the toepcb might have
- * detached already.
- */
- inp = sotoinpcb(so);
- CURVNET_SET(toep->vnet);
- NET_EPOCH_ENTER(et);
- INP_WLOCK(inp);
-
- ICL_CONN_UNLOCK(ic);
- if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) ||
- __predict_false((toep->flags & TPF_ATTACHED) == 0)) {
- mbufq_drain(mq);
- } else {
- mbufq_concat(&toep->ulp_pduq, mq);
- t4_push_pdus(icc->sc, toep, 0);
- }
- INP_WUNLOCK(inp);
- NET_EPOCH_EXIT(et);
- CURVNET_RESTORE();
-
- ICL_CONN_LOCK(ic);
-}
-
-void
-cwt_tx_main(void *arg)
-{
- struct cxgbei_worker_thread *cwt = arg;
- struct icl_cxgbei_conn *icc;
- struct icl_conn *ic;
+ struct socket *so = ic->ic_socket;
+ struct inpcb *inp = sotoinpcb(so);
struct icl_pdu *ip;
- struct socket *so;
struct mbuf *m;
struct mbufq mq;
STAILQ_HEAD(, icl_pdu) tx_pdus = STAILQ_HEAD_INITIALIZER(tx_pdus);
- MPASS(cwt != NULL);
+ mbufq_init(&mq, INT_MAX);
- mtx_lock(&cwt->cwt_lock);
- MPASS(cwt->cwt_state == 0);
- cwt->cwt_state = CWT_RUNNING;
- cv_signal(&cwt->cwt_cv);
+ ICL_CONN_LOCK(ic);
+ while (__predict_true(!ic->ic_disconnecting)) {
+ while (STAILQ_EMPTY(&icc->sent_pdus)) {
+ icc->tx_active = false;
+ mtx_sleep(&icc->tx_active, ic->ic_lock, 0, "-", 0);
+ if (__predict_false(ic->ic_disconnecting))
+ goto out;
+ MPASS(icc->tx_active);
+ }
- mbufq_init(&mq, INT_MAX);
- while (__predict_true(cwt->cwt_state != CWT_STOP)) {
- cwt->cwt_state = CWT_RUNNING;
- while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
- TAILQ_REMOVE(&cwt->icc_head, icc, tx_link);
- mtx_unlock(&cwt->cwt_lock);
+ STAILQ_SWAP(&icc->sent_pdus, &tx_pdus, icl_pdu);
+ ICL_CONN_UNLOCK(ic);
- ic = &icc->ic;
+ while ((ip = STAILQ_FIRST(&tx_pdus)) != NULL) {
+ STAILQ_REMOVE_HEAD(&tx_pdus, ip_next);
- ICL_CONN_LOCK(ic);
- MPASS(icc->tx_active);
- STAILQ_SWAP(&icc->sent_pdus, &tx_pdus, icl_pdu);
- ICL_CONN_UNLOCK(ic);
+ m = finalize_pdu(icc, ip_to_icp(ip));
+ M_ASSERTPKTHDR(m);
+ MPASS((m->m_pkthdr.len & 3) == 0);
- while ((ip = STAILQ_FIRST(&tx_pdus)) != NULL) {
- STAILQ_REMOVE_HEAD(&tx_pdus, ip_next);
+ mbufq_enqueue(&mq, m);
+ }
- m = finalize_pdu(icc, ip_to_icp(ip));
- M_ASSERTPKTHDR(m);
- MPASS((m->m_pkthdr.len & 3) == 0);
+ ICL_CONN_LOCK(ic);
+ if (__predict_false(ic->ic_disconnecting) ||
+ __predict_false(ic->ic_socket == NULL)) {
+ mbufq_drain(&mq);
+ break;
+ }
- mbufq_enqueue(&mq, m);
- }
+ CURVNET_SET(toep->vnet);
+ NET_EPOCH_ENTER(et);
+ INP_WLOCK(inp);
- ICL_CONN_LOCK(ic);
- so = ic->ic_socket;
- if (__predict_false(ic->ic_disconnecting) ||
- __predict_false(so == NULL)) {
- mbufq_drain(&mq);
- icc->tx_active = false;
- ICL_CONN_UNLOCK(ic);
+ ICL_CONN_UNLOCK(ic);
+ if (__predict_false(inp->inp_flags & (INP_DROPPED |
+ INP_TIMEWAIT)) ||
+ __predict_false((toep->flags & TPF_ATTACHED) == 0)) {
+ mbufq_drain(&mq);
+ } else {
+ mbufq_concat(&toep->ulp_pduq, &mq);
+ t4_push_pdus(icc->sc, toep, 0);
+ }
+ INP_WUNLOCK(inp);
+ NET_EPOCH_EXIT(et);
+ CURVNET_RESTORE();
- mtx_lock(&cwt->cwt_lock);
- continue;
- }
+ ICL_CONN_LOCK(ic);
+ }
+out:
+ ICL_CONN_UNLOCK(ic);
- cwt_push_pdus(icc, so, &mq);
+ kthread_exit();
+}
- MPASS(icc->tx_active);
- if (STAILQ_EMPTY(&icc->sent_pdus)) {
- icc->tx_active = false;
- ICL_CONN_UNLOCK(ic);
-
- mtx_lock(&cwt->cwt_lock);
- } else {
- ICL_CONN_UNLOCK(ic);
-
- /*
- * More PDUs were queued while we were
- * busy sending the previous batch.
- * Re-add this connection to the end
- * of the queue.
- */
- mtx_lock(&cwt->cwt_lock);
- TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
- tx_link);
- }
+static void
+icl_cxgbei_rx_main(void *arg)
+{
+ struct icl_cxgbei_conn *icc = arg;
+ struct icl_conn *ic = &icc->ic;
+ struct icl_pdu *ip;
+ struct sockbuf *sb;
+ STAILQ_HEAD(, icl_pdu) rx_pdus = STAILQ_HEAD_INITIALIZER(rx_pdus);
+ bool cantrcvmore;
+
+ sb = &ic->ic_socket->so_rcv;
+ SOCKBUF_LOCK(sb);
+ while (__predict_true(!ic->ic_disconnecting)) {
+ while (STAILQ_EMPTY(&icc->rcvd_pdus)) {
+ icc->rx_active = false;
+ mtx_sleep(&icc->rx_active, SOCKBUF_MTX(sb), 0, "-", 0);
+ if (__predict_false(ic->ic_disconnecting))
+ goto out;
+ MPASS(icc->rx_active);
}
- /* Inner loop doesn't check for CWT_STOP, do that first. */
- if (__predict_false(cwt->cwt_state == CWT_STOP))
- break;
- cwt->cwt_state = CWT_SLEEPING;
- cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
+ if (__predict_false(sbused(sb)) != 0) {
+ /*
+ * PDUs were received before the tid
+ * transitioned to ULP mode. Convert
+ * them to icl_cxgbei_pdus and insert
+ * them into the head of rcvd_pdus.
+ */
+ parse_pdus(icc, sb);
+ }
+ cantrcvmore = (sb->sb_state & SBS_CANTRCVMORE) != 0;
+ MPASS(STAILQ_EMPTY(&rx_pdus));
+ STAILQ_SWAP(&icc->rcvd_pdus, &rx_pdus, icl_pdu);
+ SOCKBUF_UNLOCK(sb);
+
+ /* Hand over PDUs to ICL. */
+ while ((ip = STAILQ_FIRST(&rx_pdus)) != NULL) {
+ STAILQ_REMOVE_HEAD(&rx_pdus, ip_next);
+ if (cantrcvmore)
+ icl_cxgbei_pdu_done(ip, ENOTCONN);
+ else
+ ic->ic_receive(ip);
+ }
+
+ SOCKBUF_LOCK(sb);
}
+out:
+ /*
+ * Since ic_disconnecting is set before the SOCKBUF_MTX is
+ * locked in icl_cxgbei_conn_close, the loop above can exit
+ * before icl_cxgbei_conn_close can lock SOCKBUF_MTX and block
+ * waiting for the thread exit.
+ */
+ while (!icc->rx_exiting)
+ mtx_sleep(&icc->rx_active, SOCKBUF_MTX(sb), 0, "-", 0);
+ SOCKBUF_UNLOCK(sb);
- MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
- mtx_unlock(&cwt->cwt_lock);
kthread_exit();
}
@@ -678,7 +687,7 @@ icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip,
STAILQ_INSERT_TAIL(&icc->sent_pdus, ip, ip_next);
if (!icc->tx_active) {
icc->tx_active = true;
- cwt_queue_for_tx(icc);
+ wakeup(&icc->tx_active);
}
}
@@ -740,10 +749,8 @@ icl_cxgbei_setsockopt(struct icl_conn *ic, struct socket *so, int sspace,
rs = max(recvspace, rspace);
error = soreserve(so, ss, rs);
- if (error != 0) {
- icl_cxgbei_conn_close(ic);
+ if (error != 0)
return (error);
- }
SOCKBUF_LOCK(&so->so_snd);
so->so_snd.sb_flags |= SB_AUTOSIZE;
SOCKBUF_UNLOCK(&so->so_snd);
@@ -761,10 +768,8 @@ icl_cxgbei_setsockopt(struct icl_conn *ic, struct socket *so, int sspace,
opt.sopt_val = &one;
opt.sopt_valsize = sizeof(one);
error = sosetopt(so, &opt);
- if (error != 0) {
- icl_cxgbei_conn_close(ic);
+ if (error != 0)
return (error);
- }
return (0);
}
@@ -934,8 +939,10 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd)
fa.sc = NULL;
fa.so = so;
t4_iterate(find_offload_adapter, &fa);
- if (fa.sc == NULL)
- return (EINVAL);
+ if (fa.sc == NULL) {
+ error = EINVAL;
+ goto out;
+ }
icc->sc = fa.sc;
max_rx_pdu_len = ISCSI_BHS_SIZE + ic->ic_max_recv_data_segment_length;
@@ -954,7 +961,8 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd)
tp = intotcpcb(inp);
if (inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) {
INP_WUNLOCK(inp);
- return (EBUSY);
+ error = ENOTCONN;
+ goto out;
}
/*
@@ -968,11 +976,11 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd)
if (ulp_mode(toep) != ULP_MODE_NONE) {
INP_WUNLOCK(inp);
- return (EINVAL);
+ error = EINVAL;
+ goto out;
}
icc->toep = toep;
- icc->cwt = cxgbei_select_worker_thread(icc);
icc->ulp_submode = 0;
if (ic->ic_header_crc32c)
@@ -996,7 +1004,21 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd)
set_ulp_mode_iscsi(icc->sc, toep, icc->ulp_submode);
INP_WUNLOCK(inp);
- return (icl_cxgbei_setsockopt(ic, so, max_tx_pdu_len, max_rx_pdu_len));
+ error = kthread_add(icl_cxgbei_tx_main, icc, NULL, &icc->tx_thread, 0,
+ 0, "%stx (cxgbei)", ic->ic_name);
+ if (error != 0)
+ goto out;
+
+ error = kthread_add(icl_cxgbei_rx_main, icc, NULL, &icc->rx_thread, 0,
+ 0, "%srx (cxgbei)", ic->ic_name);
+ if (error != 0)
+ goto out;
+
+ error = icl_cxgbei_setsockopt(ic, so, max_tx_pdu_len, max_rx_pdu_len);
+out:
+ if (error != 0)
+ icl_cxgbei_conn_close(ic);
+ return (error);
}
void
@@ -1027,61 +1049,59 @@ icl_cxgbei_conn_close(struct icl_conn *ic)
("destroying session with %d outstanding PDUs",
ic->ic_outstanding_pdus));
#endif
- ICL_CONN_UNLOCK(ic);
CTR3(KTR_CXGBE, "%s: tid %d, icc %p", __func__, toep ? toep->tid : -1,
icc);
+
+ /*
+ * Wait for the transmit thread to stop processing
+ * this connection.
+ */
+ if (icc->tx_thread != NULL) {
+ wakeup(&icc->tx_active);
+ mtx_sleep(icc->tx_thread, ic->ic_lock, 0, "conclo", 0);
+ }
+
+ /* Discard PDUs queued for TX. */
+ while (!STAILQ_EMPTY(&icc->sent_pdus)) {
+ ip = STAILQ_FIRST(&icc->sent_pdus);
+ STAILQ_REMOVE_HEAD(&icc->sent_pdus, ip_next);
+ icl_cxgbei_pdu_done(ip, ENOTCONN);
+ }
+ ICL_CONN_UNLOCK(ic);
+
inp = sotoinpcb(so);
sb = &so->so_rcv;
+
+ /*
+ * Wait for the receive thread to stop processing this
+ * connection.
+ */
+ SOCKBUF_LOCK(sb);
+ if (icc->rx_thread != NULL) {
+ icc->rx_exiting = true;
+ wakeup(&icc->rx_active);
+ mtx_sleep(icc->rx_thread, SOCKBUF_MTX(sb), 0, "conclo", 0);
+ }
+
+ /*
+ * Discard received PDUs not passed to the iSCSI layer.
+ */
+ while (!STAILQ_EMPTY(&icc->rcvd_pdus)) {
+ ip = STAILQ_FIRST(&icc->rcvd_pdus);
+ STAILQ_REMOVE_HEAD(&icc->rcvd_pdus, ip_next);
+ icl_cxgbei_pdu_done(ip, ENOTCONN);
+ }
+ SOCKBUF_UNLOCK(sb);
+
INP_WLOCK(inp);
if (toep != NULL) { /* NULL if connection was never offloaded. */
toep->ulpcb = NULL;
- /*
- * Wait for the cwt threads to stop processing this
- * connection for transmit.
- */
- while (icc->tx_active)
- rw_sleep(inp, &inp->inp_lock, 0, "conclo", 1);
-
- /* Discard PDUs queued for TX. */
- while (!STAILQ_EMPTY(&icc->sent_pdus)) {
- ip = STAILQ_FIRST(&icc->sent_pdus);
- STAILQ_REMOVE_HEAD(&icc->sent_pdus, ip_next);
- icl_cxgbei_pdu_done(ip, ENOTCONN);
- }
+ /* Discard mbufs queued for TX. */
mbufq_drain(&toep->ulp_pduq);
/*
- * Wait for the cwt threads to stop processing this
- * connection for receive.
- */
- SOCKBUF_LOCK(sb);
- if (icc->rx_active) {
- volatile bool *p = &icc->rx_active;
-
- SOCKBUF_UNLOCK(sb);
- INP_WUNLOCK(inp);
-
- while (*p)
- pause("conclo", 1);
-
- INP_WLOCK(inp);
- SOCKBUF_LOCK(sb);
- }
-
- /*
- * Discard received PDUs not passed to the iSCSI
- * layer.
- */
- while (!STAILQ_EMPTY(&icc->rcvd_pdus)) {
- ip = STAILQ_FIRST(&icc->rcvd_pdus);
- STAILQ_REMOVE_HEAD(&icc->rcvd_pdus, ip_next);
- icl_cxgbei_pdu_done(ip, ENOTCONN);
- }
- SOCKBUF_UNLOCK(sb);
-
- /*
* Grab a reference to use when waiting for the final
* CPL to be received. If toep->inp is NULL, then
* final_cpl_received() has already been called (e.g.