aboutsummaryrefslogtreecommitdiff
path: root/sys/dev/ntb/if_ntb/if_ntb.c
diff options
context:
space:
mode:
authorConrad Meyer <cem@FreeBSD.org>2015-10-20 19:20:42 +0000
committerConrad Meyer <cem@FreeBSD.org>2015-10-20 19:20:42 +0000
commit4994fe1295653f4fcb2dcb31e5ca7796f3c2b7ef (patch)
treefba215b8b949557d2b4986168726855722c721d8 /sys/dev/ntb/if_ntb/if_ntb.c
parent8a26cf17c641bdcb71d98b3e8b646e34b6830aad (diff)
downloadsrc-4994fe1295653f4fcb2dcb31e5ca7796f3c2b7ef.tar.gz
src-4994fe1295653f4fcb2dcb31e5ca7796f3c2b7ef.zip
NTB: MFV da2e5ae5: Fix ntb_transport out-of-order RX update
It was possible for a synchronous update of the RX index in the error case to get ahead of the asynchronous RX index update in the normal case. Change the RX processing to preserve an RX completion order. There were two error cases. First, if a buffer is not present to receive data, there would be no queue entry to preserve the RX completion order. Instead of dropping the RX frame, leave the RX frame in the ring. Schedule RX processing when RX entries are enqueued, in case there are RX frames waiting in the ring to be received. Second, if a buffer is too small to receive data, drop the frame in the ring, mark the RX entry as done, and indicate the error in the RX entry length. Check for a negative length in the receive callback in ntb_netdev, and count occurrences as rx_length_errors. Authored by: Allen Hubbe Obtained from: Linux (Dual BSD/GPL driver) Sponsored by: EMC / Isilon Storage Division
Notes
Notes: svn path=/head/; revision=289651
Diffstat (limited to 'sys/dev/ntb/if_ntb/if_ntb.c')
-rw-r--r--sys/dev/ntb/if_ntb/if_ntb.c118
1 files changed, 74 insertions, 44 deletions
diff --git a/sys/dev/ntb/if_ntb/if_ntb.c b/sys/dev/ntb/if_ntb/if_ntb.c
index 40806ea96443..d2bfb59f0a0c 100644
--- a/sys/dev/ntb/if_ntb/if_ntb.c
+++ b/sys/dev/ntb/if_ntb/if_ntb.c
@@ -149,10 +149,11 @@ struct ntb_transport_qp {
void (*rx_handler)(struct ntb_transport_qp *qp, void *qp_data,
void *data, int len);
+ struct ntb_queue_list rx_post_q;
struct ntb_queue_list rx_pend_q;
struct ntb_queue_list rx_free_q;
- struct mtx ntb_rx_pend_q_lock;
- struct mtx ntb_rx_free_q_lock;
+ /* ntb_rx_q_lock: synchronize access to rx_XXXX_q */
+ struct mtx ntb_rx_q_lock;
struct task rx_completion_task;
struct task rxc_db_work;
void *rx_buff;
@@ -285,10 +286,11 @@ static void ntb_memcpy_tx(struct ntb_transport_qp *qp,
struct ntb_queue_entry *entry, void *offset);
static void ntb_qp_full(void *arg);
static void ntb_transport_rxc_db(void *arg, int pending);
-static void ntb_rx_pendq_full(void *arg);
static int ntb_process_rxc(struct ntb_transport_qp *qp);
-static void ntb_rx_copy_task(struct ntb_transport_qp *qp,
+static void ntb_memcpy_rx(struct ntb_transport_qp *qp,
struct ntb_queue_entry *entry, void *offset);
+static inline void ntb_rx_copy_callback(struct ntb_transport_qp *qp,
+ void *data);
static void ntb_complete_rxc(void *arg, int pending);
static void ntb_transport_doorbell_callback(void *data, uint32_t vector);
static void ntb_transport_event_callback(void *data);
@@ -308,6 +310,8 @@ static void ntb_list_add(struct mtx *lock, struct ntb_queue_entry *entry,
struct ntb_queue_list *list);
static struct ntb_queue_entry *ntb_list_rm(struct mtx *lock,
struct ntb_queue_list *list);
+static struct ntb_queue_entry *ntb_list_mv(struct mtx *lock,
+ struct ntb_queue_list *from, struct ntb_queue_list *to);
static void create_random_local_eui48(u_char *eaddr);
static unsigned int ntb_transport_max_size(struct ntb_transport_qp *qp);
@@ -681,12 +685,12 @@ ntb_transport_init_queue(struct ntb_transport_ctx *nt, unsigned int qp_num)
callout_init(&qp->queue_full, 1);
callout_init(&qp->rx_full, 1);
- mtx_init(&qp->ntb_rx_pend_q_lock, "ntb rx pend q", NULL, MTX_SPIN);
- mtx_init(&qp->ntb_rx_free_q_lock, "ntb rx free q", NULL, MTX_SPIN);
+ mtx_init(&qp->ntb_rx_q_lock, "ntb rx q", NULL, MTX_SPIN);
mtx_init(&qp->ntb_tx_free_q_lock, "ntb tx free q", NULL, MTX_SPIN);
TASK_INIT(&qp->rx_completion_task, 0, ntb_complete_rxc, qp);
TASK_INIT(&qp->rxc_db_work, 0, ntb_transport_rxc_db, qp);
+ STAILQ_INIT(&qp->rx_post_q);
STAILQ_INIT(&qp->rx_pend_q);
STAILQ_INIT(&qp->rx_free_q);
STAILQ_INIT(&qp->tx_free_q);
@@ -711,10 +715,13 @@ ntb_transport_free_queue(struct ntb_transport_qp *qp)
qp->tx_handler = NULL;
qp->event_handler = NULL;
- while ((entry = ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q)))
+ while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_free_q)))
free(entry, M_NTB_IF);
- while ((entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q)))
+ while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_pend_q)))
+ free(entry, M_NTB_IF);
+
+ while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_post_q)))
free(entry, M_NTB_IF);
while ((entry = ntb_list_rm(&qp->ntb_tx_free_q_lock, &qp->tx_free_q)))
@@ -769,7 +776,7 @@ ntb_transport_create_queue(void *data, struct ntb_softc *ntb,
entry->cb_data = nt->ifp;
entry->buf = NULL;
entry->len = transport_mtu;
- ntb_list_add(&qp->ntb_rx_pend_q_lock, entry, &qp->rx_pend_q);
+ ntb_list_add(&qp->ntb_rx_q_lock, entry, &qp->rx_pend_q);
}
for (i = 0; i < NTB_QP_DEF_NUM_ENTRIES; i++) {
@@ -951,14 +958,6 @@ ntb_qp_full(void *arg)
/* Transport Rx */
static void
-ntb_rx_pendq_full(void *arg)
-{
-
- CTR0(KTR_NTB, "RX: ntb_rx_pendq_full callout");
- ntb_transport_rxc_db(arg, 0);
-}
-
-static void
ntb_transport_rxc_db(void *arg, int pending __unused)
{
struct ntb_transport_qp *qp = arg;
@@ -1030,7 +1029,7 @@ ntb_process_rxc(struct ntb_transport_qp *qp)
return (EIO);
}
- entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q);
+ entry = ntb_list_mv(&qp->ntb_rx_q_lock, &qp->rx_pend_q, &qp->rx_post_q);
if (entry == NULL) {
qp->rx_err_no_buf++;
CTR0(KTR_NTB, "RX: No entries in rx_pend_q");
@@ -1050,7 +1049,6 @@ ntb_process_rxc(struct ntb_transport_qp *qp)
entry->len = -EIO;
entry->flags |= IF_NTB_DESC_DONE_FLAG;
- ntb_list_add(&qp->ntb_rx_free_q_lock, entry, &qp->rx_free_q);
taskqueue_enqueue(taskqueue_swi, &qp->rx_completion_task);
} else {
qp->rx_bytes += hdr->len;
@@ -1060,7 +1058,7 @@ ntb_process_rxc(struct ntb_transport_qp *qp)
entry->len = hdr->len;
- ntb_rx_copy_task(qp, entry, offset);
+ ntb_memcpy_rx(qp, entry, offset);
}
qp->rx_index++;
@@ -1069,7 +1067,7 @@ ntb_process_rxc(struct ntb_transport_qp *qp)
}
static void
-ntb_rx_copy_task(struct ntb_transport_qp *qp, struct ntb_queue_entry *entry,
+ntb_memcpy_rx(struct ntb_transport_qp *qp, struct ntb_queue_entry *entry,
void *offset)
{
struct ifnet *ifp = entry->cb_data;
@@ -1084,15 +1082,18 @@ ntb_rx_copy_task(struct ntb_transport_qp *qp, struct ntb_queue_entry *entry,
/* Ensure that the data is globally visible before clearing the flag */
wmb();
- entry->x_hdr->flags = 0;
- /* TODO: replace with bus_space_write */
- qp->rx_info->entry = qp->rx_index;
- CTR2(KTR_NTB,
- "RX: copied entry %p to mbuf %p. Adding entry to rx_free_q", entry,
- m);
- ntb_list_add(&qp->ntb_rx_free_q_lock, entry, &qp->rx_free_q);
+ CTR2(KTR_NTB, "RX: copied entry %p to mbuf %p.", entry, m);
+ ntb_rx_copy_callback(qp, entry);
+}
+static inline void
+ntb_rx_copy_callback(struct ntb_transport_qp *qp, void *data)
+{
+ struct ntb_queue_entry *entry;
+
+ entry = data;
+ entry->flags |= IF_NTB_DESC_DONE_FLAG;
taskqueue_enqueue(taskqueue_swi, &qp->rx_completion_task);
}
@@ -1100,30 +1101,39 @@ static void
ntb_complete_rxc(void *arg, int pending)
{
struct ntb_transport_qp *qp = arg;
- struct mbuf *m;
struct ntb_queue_entry *entry;
+ struct mbuf *m;
+ unsigned len;
CTR0(KTR_NTB, "RX: rx_completion_task");
- while ((entry = ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q))) {
+ mtx_lock_spin(&qp->ntb_rx_q_lock);
+
+ while (!STAILQ_EMPTY(&qp->rx_post_q)) {
+ entry = STAILQ_FIRST(&qp->rx_post_q);
+ if ((entry->flags & IF_NTB_DESC_DONE_FLAG) == 0)
+ break;
+
+ entry->x_hdr->flags = 0;
+ /* XXX bus_space_write */
+ qp->rx_info->entry = entry->index;
+
+ len = entry->len;
m = entry->buf;
+
+ STAILQ_REMOVE_HEAD(&qp->rx_post_q, entry);
+ STAILQ_INSERT_TAIL(&qp->rx_free_q, entry, entry);
+
+ mtx_unlock_spin(&qp->ntb_rx_q_lock);
+
CTR2(KTR_NTB, "RX: completing entry %p, mbuf %p", entry, m);
- if (qp->rx_handler && qp->client_ready)
- qp->rx_handler(qp, qp->cb_data, m, entry->len);
+ if (qp->rx_handler != NULL && qp->client_ready)
+ qp->rx_handler(qp, qp->cb_data, m, len);
- entry->buf = NULL;
- entry->len = qp->transport->bufsize;
-
- CTR1(KTR_NTB,"RX: entry %p removed from rx_free_q "
- "and added to rx_pend_q", entry);
- ntb_list_add(&qp->ntb_rx_pend_q_lock, entry, &qp->rx_pend_q);
- if (qp->rx_err_no_buf > qp->last_rx_no_buf) {
- qp->last_rx_no_buf = qp->rx_err_no_buf;
- CTR0(KTR_NTB, "RX: could spawn rx task");
- callout_reset(&qp->rx_full, hz / 1000, ntb_rx_pendq_full,
- qp);
- }
+ mtx_lock_spin(&qp->ntb_rx_q_lock);
}
+
+ mtx_unlock_spin(&qp->ntb_rx_q_lock);
}
static void
@@ -1573,6 +1583,26 @@ out:
return (entry);
}
+static struct ntb_queue_entry *
+ntb_list_mv(struct mtx *lock, struct ntb_queue_list *from,
+ struct ntb_queue_list *to)
+{
+ struct ntb_queue_entry *entry;
+
+ mtx_lock_spin(lock);
+ if (STAILQ_EMPTY(from)) {
+ entry = NULL;
+ goto out;
+ }
+ entry = STAILQ_FIRST(from);
+ STAILQ_REMOVE_HEAD(from, entry);
+ STAILQ_INSERT_TAIL(to, entry, entry);
+
+out:
+ mtx_unlock_spin(lock);
+ return (entry);
+}
+
/* Helper functions */
/* TODO: This too should really be part of the kernel */
#define EUI48_MULTICAST 1 << 0