aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGleb Smirnoff <glebius@FreeBSD.org>2025-05-05 19:56:04 +0000
committerGleb Smirnoff <glebius@FreeBSD.org>2025-05-05 19:56:04 +0000
commitd15792780760ef94647af9b377b5f0a80e1826bc (patch)
tree95daa2844852142b59762b889b79161ac75756a3
parentfbd7087b0be2f327f806a85b92789a719138df8c (diff)
unix: new implementation of unix/stream & unix/seqpacket
[this is an updated version of d80a97def9a1, that had been reverted] Provide protocol specific pr_sosend and pr_soreceive for PF_UNIX SOCK_STREAM sockets and implement SOCK_SEQPACKET sockets as an extension of SOCK_STREAM. The change meets three goals: get rid of unix(4) specific stuff in the generic socket code, provide a faster and robust unix/stream sockets and bring unix/seqpacket much closer to specification. Highlights follow: - The send buffer now is truly bypassed. Previously it was always empty, but the send(2) still needed to acquire its lock and do a variety of tricks to be woken up in the right time while sleeping on it. Now the only two things we care about in the send buffer is the I/O sx(9) lock that serializes operations and value of so_snd.sb_hiwat, which we can read without obtaining a lock. The sleep of a send(2) happens on the mutex of the receive buffer of the peer. A bulk send/recv of data with large socket buffers will make both syscalls just bounce between owning the receive buffer lock and copyin(9)/copyout(9), no other locks would be involved. Since event notification mechanisms, such as select(2), poll(2) and kevent(2) use state of the send buffer to monitor writability, the new implementation provides protocol specific pr_sopoll and pr_kqfilter. The sendfile(2) over unix/stream is preserved, providing protocol specific pr_send and pr_sendfile_wait methods. - The implementation uses new mchain structure to manipulate mbuf chains. Note that this required converting to mchain two functions that are shared with unix/dgram: unp_internalize() and unp_addsockcred() as well as adding a new shared one uipc_process_kernel_mbuf(). This induces some non- functional changes in the unix/dgram code as well. There is a space for improvement here, as right now it is a mix of mchain and manually managed mbuf chains. 
- unix/seqpacket previously marked as PR_ADDR & PR_ATOMIC and thus treated as a datagram socket by the generic socket code, now becomes a true stream socket with record markers. - Note on aio(4). First problem with socket aio(4) is that it uses socket buffer locks for queueing and piggybacking on this locking it calls soreadable() and sowriteable() directly. Ideally it should use pr_sopoll() method. Second problem is that unlike a syscall, aio(4) wants a consistent uio structure upon return. This is incompatible with our speculative read optimization, so in case of aio(4) write we need to restore consistency of uio. At this point we workaround those problems on the side of unix(4), but ideally those workarounds should be socket aio(4) problem (not a first class citizen) rather than problem of unix(4), definitely a primary facility.
-rw-r--r--sys/kern/uipc_usrreq.c1700
-rw-r--r--sys/sys/sockbuf.h12
2 files changed, 1332 insertions, 380 deletions
diff --git a/sys/kern/uipc_usrreq.c b/sys/kern/uipc_usrreq.c
index 79e4da5c8698..06b3317dc775 100644
--- a/sys/kern/uipc_usrreq.c
+++ b/sys/kern/uipc_usrreq.c
@@ -5,7 +5,7 @@
* The Regents of the University of California. All Rights Reserved.
* Copyright (c) 2004-2009 Robert N. M. Watson All Rights Reserved.
* Copyright (c) 2018 Matthew Macy
- * Copyright (c) 2022 Gleb Smirnoff <glebius@FreeBSD.org>
+ * Copyright (c) 2022-2025 Gleb Smirnoff <glebius@FreeBSD.org>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@@ -73,6 +73,7 @@
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
+#include <sys/poll.h>
#include <sys/proc.h>
#include <sys/protosw.h>
#include <sys/queue.h>
@@ -142,11 +143,14 @@ static struct timeout_task unp_gc_task;
static struct task unp_defer_task;
/*
- * Both send and receive buffers are allocated PIPSIZ bytes of buffering for
- * stream sockets, although the total for sender and receiver is actually
- * only PIPSIZ.
+ * SOCK_STREAM and SOCK_SEQPACKET unix(4) sockets fully bypass the send buffer,
+ * however the notion of send buffer still makes sense with them. Its size is
+ * the amount of space that a send(2) syscall may copyin(9) before checking
+ * with the receive buffer of a peer. Although not linked anywhere yet,
+ * pointed to by a stack variable, effectively it is a buffer that needs to be
+ * sized.
*
- * Datagram sockets really use the sendspace as the maximum datagram size,
+ * SOCK_DGRAM sockets really use the sendspace as the maximum datagram size,
* and don't really want to reserve the sendspace. Their recvspace should be
* large enough for at least one max-size datagram plus address.
*/
@@ -157,7 +161,7 @@ static u_long unpst_sendspace = PIPSIZ;
static u_long unpst_recvspace = PIPSIZ;
static u_long unpdg_maxdgram = 8*1024; /* support 8KB syslog msgs */
static u_long unpdg_recvspace = 16*1024;
-static u_long unpsp_sendspace = PIPSIZ; /* really max datagram size */
+static u_long unpsp_sendspace = PIPSIZ;
static u_long unpsp_recvspace = PIPSIZ;
static SYSCTL_NODE(_net, PF_LOCAL, local, CTLFLAG_RW | CTLFLAG_MPSAFE, 0,
@@ -292,7 +296,7 @@ static int unp_connect(struct socket *, struct sockaddr *,
struct thread *);
static int unp_connectat(int, struct socket *, struct sockaddr *,
struct thread *, bool);
-static void unp_connect2(struct socket *so, struct socket *so2);
+static void unp_connect2(struct socket *, struct socket *, bool);
static void unp_disconnect(struct unpcb *unp, struct unpcb *unp2);
static void unp_dispose(struct socket *so);
static void unp_shutdown(struct unpcb *);
@@ -301,15 +305,18 @@ static void unp_gc(__unused void *, int);
static void unp_scan(struct mbuf *, void (*)(struct filedescent **, int));
static void unp_discard(struct file *);
static void unp_freerights(struct filedescent **, int);
-static int unp_internalize(struct mbuf **, struct thread *,
- struct mbuf **, u_int *, u_int *);
+static int unp_internalize(struct mbuf *, struct mchain *,
+ struct thread *);
static void unp_internalize_fp(struct file *);
static int unp_externalize(struct mbuf *, struct mbuf **, int);
static int unp_externalize_fp(struct file *);
-static struct mbuf *unp_addsockcred(struct thread *, struct mbuf *,
- int, struct mbuf **, u_int *, u_int *);
+static void unp_addsockcred(struct thread *, struct mchain *, int);
static void unp_process_defers(void * __unused, int);
+static void uipc_wrknl_lock(void *);
+static void uipc_wrknl_unlock(void *);
+static void uipc_wrknl_assert_lock(void *, int);
+
static void
unp_pcb_hold(struct unpcb *unp)
{
@@ -417,6 +424,41 @@ unp_pcb_lock_peer(struct unpcb *unp)
return (unp2);
}
+/*
+ * Try to lock peer of our socket for purposes of sending data to it.
+ */
+static int
+uipc_lock_peer(struct socket *so, struct unpcb **unp2)
+{
+ struct unpcb *unp;
+ int error;
+
+ unp = sotounpcb(so);
+ UNP_PCB_LOCK(unp);
+ *unp2 = unp_pcb_lock_peer(unp);
+ if (__predict_false(so->so_error != 0)) {
+ error = so->so_error;
+ so->so_error = 0;
+ UNP_PCB_UNLOCK(unp);
+ if (*unp2 != NULL)
+ UNP_PCB_UNLOCK(*unp2);
+ return (error);
+ }
+ if (__predict_false(*unp2 == NULL)) {
+ /*
+ * Different error code for a previously connected socket and
+ * a never connected one. The SS_ISDISCONNECTED is set in the
+ * unp_soisdisconnected() and is synchronized by the pcb lock.
+ */
+ error = so->so_state & SS_ISDISCONNECTED ? EPIPE : ENOTCONN;
+ UNP_PCB_UNLOCK(unp);
+ return (error);
+ }
+ UNP_PCB_UNLOCK(unp);
+
+ return (0);
+}
+
static void
uipc_abort(struct socket *so)
{
@@ -446,11 +488,6 @@ uipc_attach(struct socket *so, int proto, struct thread *td)
KASSERT(so->so_pcb == NULL, ("uipc_attach: so_pcb != NULL"));
switch (so->so_type) {
- case SOCK_STREAM:
- sendspace = unpst_sendspace;
- recvspace = unpst_recvspace;
- break;
-
case SOCK_DGRAM:
STAILQ_INIT(&so->so_rcv.uxdg_mb);
STAILQ_INIT(&so->so_snd.uxdg_mb);
@@ -463,11 +500,27 @@ uipc_attach(struct socket *so, int proto, struct thread *td)
sendspace = recvspace = unpdg_recvspace;
break;
+ case SOCK_STREAM:
+ sendspace = unpst_sendspace;
+ recvspace = unpst_recvspace;
+ goto common;
+
case SOCK_SEQPACKET:
sendspace = unpsp_sendspace;
recvspace = unpsp_recvspace;
+common:
+ /*
+ * XXXGL: we need to initialize the mutex with MTX_DUPOK.
+ * Ideally, protocols that have PR_SOCKBUF should be
+ * responsible for mutex initialization officially, and then
+ * this ugliness with mtx_destroy(); mtx_init(); would go away.
+ */
+ mtx_destroy(&so->so_rcv_mtx);
+ mtx_init(&so->so_rcv_mtx, "so_rcv", NULL, MTX_DEF | MTX_DUPOK);
+ knlist_init(&so->so_wrsel.si_note, so, uipc_wrknl_lock,
+ uipc_wrknl_unlock, uipc_wrknl_assert_lock);
+ STAILQ_INIT(&so->so_rcv.uxst_mbq);
break;
-
default:
panic("uipc_attach");
}
@@ -737,7 +790,7 @@ uipc_connect2(struct socket *so1, struct socket *so2)
unp2 = so2->so_pcb;
KASSERT(unp2 != NULL, ("uipc_connect2: unp2 == NULL"));
unp_pcb_lock_pair(unp, unp2);
- unp_connect2(so1, so2);
+ unp_connect2(so1, so2, false);
unp_pcb_unlock_pair(unp, unp2);
return (0);
@@ -820,6 +873,11 @@ uipc_detach(struct socket *so)
taskqueue_enqueue_timeout(taskqueue_thread, &unp_gc_task, -1);
switch (so->so_type) {
+ case SOCK_STREAM:
+ case SOCK_SEQPACKET:
+ MPASS(SOLISTENING(so) || (STAILQ_EMPTY(&so->so_rcv.uxst_mbq) &&
+ so->so_rcv.uxst_peer == NULL));
+ break;
case SOCK_DGRAM:
/*
* Everything should have been unlinked/freed by unp_dispose()
@@ -875,6 +933,12 @@ uipc_listen(struct socket *so, int backlog, struct thread *td)
error = solisten_proto_check(so);
if (error == 0) {
cru2xt(td, &unp->unp_peercred);
+ if (!SOLISTENING(so)) {
+ (void)chgsbsize(so->so_cred->cr_uidinfo,
+ &so->so_snd.sb_hiwat, 0, RLIM_INFINITY);
+ (void)chgsbsize(so->so_cred->cr_uidinfo,
+ &so->so_rcv.sb_hiwat, 0, RLIM_INFINITY);
+ }
solisten_proto(so, backlog);
}
SOCK_UNLOCK(so);
@@ -908,187 +972,880 @@ uipc_peeraddr(struct socket *so, struct sockaddr *ret)
return (0);
}
-static int
-uipc_rcvd(struct socket *so, int flags)
+/*
+ * pr_sosend() called with mbuf instead of uio is a kernel thread. NFS,
+ * netgraph(4) and other subsystems can call into socket code. The
+ * function will condition the mbuf so that it can be safely put onto socket
+ * buffer and calculate its char count and mbuf count.
+ *
+ * Note: we don't support receiving control data from a kernel thread. Our
+ * pr_sosend methods have MPASS() to check that. This may change.
+ */
+static void
+uipc_reset_kernel_mbuf(struct mbuf *m, struct mchain *mc)
{
- struct unpcb *unp, *unp2;
- struct socket *so2;
- u_int mbcnt, sbcc;
- unp = sotounpcb(so);
- KASSERT(unp != NULL, ("%s: unp == NULL", __func__));
- KASSERT(so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET,
- ("%s: socktype %d", __func__, so->so_type));
+ M_ASSERTPKTHDR(m);
- /*
- * Adjust backpressure on sender and wakeup any waiting to write.
- *
- * The unp lock is acquired to maintain the validity of the unp_conn
- * pointer; no lock on unp2 is required as unp2->unp_socket will be
- * static as long as we don't permit unp2 to disconnect from unp,
- * which is prevented by the lock on unp. We cache values from
- * so_rcv to avoid holding the so_rcv lock over the entire
- * transaction on the remote so_snd.
- */
- SOCKBUF_LOCK(&so->so_rcv);
- mbcnt = so->so_rcv.sb_mbcnt;
- sbcc = sbavail(&so->so_rcv);
- SOCKBUF_UNLOCK(&so->so_rcv);
- /*
- * There is a benign race condition at this point. If we're planning to
- * clear SB_STOP, but uipc_send is called on the connected socket at
- * this instant, it might add data to the sockbuf and set SB_STOP. Then
- * we would erroneously clear SB_STOP below, even though the sockbuf is
- * full. The race is benign because the only ill effect is to allow the
- * sockbuf to exceed its size limit, and the size limits are not
- * strictly guaranteed anyway.
- */
- UNP_PCB_LOCK(unp);
- unp2 = unp->unp_conn;
- if (unp2 == NULL) {
- UNP_PCB_UNLOCK(unp);
- return (0);
+ m_clrprotoflags(m);
+ m_tag_delete_chain(m, NULL);
+ m->m_pkthdr.rcvif = NULL;
+ m->m_pkthdr.flowid = 0;
+ m->m_pkthdr.csum_flags = 0;
+ m->m_pkthdr.fibnum = 0;
+ m->m_pkthdr.rsstype = 0;
+
+ mc_init_m(mc, m);
+ MPASS(m->m_pkthdr.len == mc->mc_len);
+}
+
+#ifdef SOCKBUF_DEBUG
+static inline void
+uipc_stream_sbcheck(struct sockbuf *sb)
+{
+ struct mbuf *d;
+ u_int dacc, dccc, dctl, dmbcnt;
+ bool notready = false;
+
+ dacc = dccc = dctl = dmbcnt = 0;
+ STAILQ_FOREACH(d, &sb->uxst_mbq, m_stailq) {
+ if (d == sb->uxst_fnrdy)
+ notready = true;
+ if (notready)
+ MPASS(d->m_flags & M_NOTREADY);
+ if (d->m_type == MT_CONTROL)
+ dctl += d->m_len;
+ else if (d->m_type == MT_DATA) {
+ dccc += d->m_len;
+ if (!notready)
+ dacc += d->m_len;
+ } else
+ MPASS(0);
+ dmbcnt += MSIZE;
+ if (d->m_flags & M_EXT)
+ dmbcnt += d->m_ext.ext_size;
+ if (d->m_stailq.stqe_next == NULL)
+ MPASS(sb->uxst_mbq.stqh_last == &d->m_stailq.stqe_next);
}
- so2 = unp2->unp_socket;
- SOCKBUF_LOCK(&so2->so_snd);
- if (sbcc < so2->so_snd.sb_hiwat && mbcnt < so2->so_snd.sb_mbmax)
- so2->so_snd.sb_flags &= ~SB_STOP;
- sowwakeup_locked(so2);
- UNP_PCB_UNLOCK(unp);
- return (0);
+ MPASS(sb->uxst_fnrdy == NULL || notready);
+ MPASS(dacc == sb->sb_acc);
+ MPASS(dccc == sb->sb_ccc);
+ MPASS(dctl == sb->sb_ctl);
+ MPASS(dmbcnt == sb->sb_mbcnt);
+ (void)STAILQ_EMPTY(&sb->uxst_mbq);
+}
+#define UIPC_STREAM_SBCHECK(sb) uipc_stream_sbcheck(sb)
+#else
+#define UIPC_STREAM_SBCHECK(sb) do {} while (0)
+#endif
+
+/*
+ * uipc_stream_sbspace() returns how much a writer can send, limited by char
+ * count or mbuf memory use, whatever ends first.
+ *
+ * An obvious and legitimate reason for a socket having more data than allowed,
+ * is lowering the limit with setsockopt(SO_RCVBUF) on already full buffer.
+ * Also, sb_mbcnt may overcommit sb_mbmax in case if previous write observed
+ * 'space < mbspace', but mchain allocated to hold 'space' bytes of data ended
+ * up with 'mc_mlen > mbspace'. A typical scenario would be a full buffer with
+ * writer trying to push in a large write, and a slow reader, that reads just
+ * a few bytes at a time. In that case writer will keep creating new mbufs
+ * with mc_split(). These mbufs will carry little chars, but will all point at
+ * the same cluster, thus each adding cluster size to sb_mbcnt. This means we
+ * will count same cluster many times potentially underutilizing socket buffer.
+ * We aren't optimizing towards ineffective readers. Classic socket buffer had
+ * the same "feature".
+ */
+static inline u_int
+uipc_stream_sbspace(struct sockbuf *sb)
+{
+ u_int space, mbspace;
+
+ if (__predict_true(sb->sb_hiwat >= sb->sb_ccc + sb->sb_ctl))
+ space = sb->sb_hiwat - sb->sb_ccc - sb->sb_ctl;
+ else
+ return (0);
+ if (__predict_true(sb->sb_mbmax >= sb->sb_mbcnt))
+ mbspace = sb->sb_mbmax - sb->sb_mbcnt;
+ else
+ return (0);
+
+ return (min(space, mbspace));
}
static int
-uipc_send(struct socket *so, int flags, struct mbuf *m, struct sockaddr *nam,
- struct mbuf *control, struct thread *td)
+uipc_sosend_stream_or_seqpacket(struct socket *so, struct sockaddr *addr,
+ struct uio *uio0, struct mbuf *m, struct mbuf *c, int flags,
+ struct thread *td)
{
- struct unpcb *unp, *unp2;
+ struct unpcb *unp2;
struct socket *so2;
- u_int mbcnt, sbcc;
+ struct sockbuf *sb;
+ struct uio *uio;
+ struct mchain mc, cmc;
+ size_t resid, sent;
+ bool nonblock, eor, aio;
int error;
- unp = sotounpcb(so);
- KASSERT(unp != NULL, ("%s: unp == NULL", __func__));
- KASSERT(so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET,
- ("%s: socktype %d", __func__, so->so_type));
+ MPASS((uio0 != NULL && m == NULL) || (m != NULL && uio0 == NULL));
+ MPASS(m == NULL || c == NULL);
- error = 0;
- if (flags & PRUS_OOB) {
- error = EOPNOTSUPP;
- goto release;
- }
- if (control != NULL &&
- (error = unp_internalize(&control, td, NULL, NULL, NULL)))
- goto release;
+ if (__predict_false(flags & MSG_OOB))
+ return (EOPNOTSUPP);
- unp2 = NULL;
- if ((so->so_state & SS_ISCONNECTED) == 0) {
- if (nam != NULL) {
- if ((error = unp_connect(so, nam, td)) != 0)
- goto out;
- } else {
- error = ENOTCONN;
+ nonblock = (so->so_state & SS_NBIO) ||
+ (flags & (MSG_DONTWAIT | MSG_NBIO));
+ eor = flags & MSG_EOR;
+
+ mc = MCHAIN_INITIALIZER(&mc);
+ cmc = MCHAIN_INITIALIZER(&cmc);
+ sent = 0;
+ aio = false;
+
+ if (m == NULL) {
+ if (c != NULL && (error = unp_internalize(c, &cmc, td)))
goto out;
+ /*
+ * This function may read more data from the uio than it would
+ * then place on socket. That would leave uio inconsistent
+ * upon return. Normally uio is allocated on the stack of the
+ * syscall thread and we don't care about leaving it consistent.
+ * However, aio(9) will allocate a uio as part of job and will
+ * use it to track progress. We detect aio(9) checking the
+ * SB_AIO_RUNNING flag. It is safe to check it without lock
+ * cause it is set and cleared in the same taskqueue thread.
+ *
+ * This check can also produce a false positive: there is
+ * aio(9) job and also there is a syscall we are serving now.
+ * No sane software does that, it would lead to a mess in
+ * the socket buffer, as aio(9) doesn't grab the I/O sx(9).
+ * But syzkaller can create this mess. For such false positive
+ * our goal is just not to panic or leak memory.
+ */
+ if (__predict_false(so->so_snd.sb_flags & SB_AIO_RUNNING)) {
+ uio = cloneuio(uio0);
+ aio = true;
+ } else {
+ uio = uio0;
+ resid = uio->uio_resid;
}
- }
+ /*
+ * Optimization for a case when our send fits into the receive
+ * buffer - do the copyin before taking any locks, sized to our
+ * send buffer. Later copyins will also take into account
+ * space in the peer's receive buffer.
+ */
+ error = mc_uiotomc(&mc, uio, so->so_snd.sb_hiwat, 0, M_WAITOK,
+ eor ? M_EOR : 0);
+ if (__predict_false(error))
+ goto out2;
+ } else
+ uipc_reset_kernel_mbuf(m, &mc);
+
+ error = SOCK_IO_SEND_LOCK(so, SBLOCKWAIT(flags));
+ if (error)
+ goto out2;
+
+ if (__predict_false((error = uipc_lock_peer(so, &unp2)) != 0))
+ goto out3;
- UNP_PCB_LOCK(unp);
- if ((unp2 = unp_pcb_lock_peer(unp)) == NULL) {
- UNP_PCB_UNLOCK(unp);
- error = ENOTCONN;
- goto out;
- } else if (so->so_snd.sb_state & SBS_CANTSENDMORE) {
- unp_pcb_unlock_pair(unp, unp2);
- error = EPIPE;
- goto out;
- }
- UNP_PCB_UNLOCK(unp);
- if ((so2 = unp2->unp_socket) == NULL) {
- UNP_PCB_UNLOCK(unp2);
- error = ENOTCONN;
- goto out;
- }
- SOCKBUF_LOCK(&so2->so_rcv);
if (unp2->unp_flags & UNP_WANTCRED_MASK) {
/*
* Credentials are passed only once on SOCK_STREAM and
* SOCK_SEQPACKET (LOCAL_CREDS => WANTCRED_ONESHOT), or
* forever (LOCAL_CREDS_PERSISTENT => WANTCRED_ALWAYS).
*/
- control = unp_addsockcred(td, control, unp2->unp_flags, NULL,
- NULL, NULL);
+ unp_addsockcred(td, &cmc, unp2->unp_flags);
unp2->unp_flags &= ~UNP_WANTCRED_ONESHOT;
}
/*
- * Send to paired receive port and wake up readers. Don't
- * check for space available in the receive buffer if we're
- * attaching ancillary data; Unix domain sockets only check
- * for space in the sending sockbuf, and that check is
- * performed one level up the stack. At that level we cannot
- * precisely account for the amount of buffer space used
- * (e.g., because control messages are not yet internalized).
+ * Cycle through the data to send and available space in the peer's
+ * receive buffer. Put a reference on the peer socket, so that it
+ * doesn't get freed while we sbwait(). If peer goes away, we will
+ * observe the SBS_CANTRCVMORE and our sorele() will finalize peer's
+ * socket destruction.
*/
- switch (so->so_type) {
- case SOCK_STREAM:
- if (control != NULL) {
- sbappendcontrol_locked(&so2->so_rcv,
- m->m_len > 0 ? m : NULL, control, flags);
- control = NULL;
- } else
- sbappend_locked(&so2->so_rcv, m, flags);
- break;
+ so2 = unp2->unp_socket;
+ soref(so2);
+ UNP_PCB_UNLOCK(unp2);
+ sb = &so2->so_rcv;
+ while (mc.mc_len + cmc.mc_len > 0) {
+ struct mchain mcnext = MCHAIN_INITIALIZER(&mcnext);
+ u_int space;
- case SOCK_SEQPACKET:
- if (sbappendaddr_nospacecheck_locked(&so2->so_rcv,
- &sun_noname, m, control))
- control = NULL;
- break;
+ SOCK_RECVBUF_LOCK(so2);
+restart:
+ UIPC_STREAM_SBCHECK(sb);
+ if (__predict_false(cmc.mc_len > sb->sb_hiwat)) {
+ SOCK_RECVBUF_UNLOCK(so2);
+ error = EMSGSIZE;
+ goto out4;
+ }
+ if (__predict_false(sb->sb_state & SBS_CANTRCVMORE)) {
+ SOCK_RECVBUF_UNLOCK(so2);
+ error = EPIPE;
+ goto out4;
+ }
+ /*
+ * Wait on the peer socket receive buffer until we have enough
+ * space to put at least control. The data is a stream and can
+ * be put partially, but control is really a datagram.
+ */
+ space = uipc_stream_sbspace(sb);
+ if (space < sb->sb_lowat || space < cmc.mc_len) {
+ if (nonblock) {
+ if (aio)
+ sb->uxst_flags |= UXST_PEER_AIO;
+ SOCK_RECVBUF_UNLOCK(so2);
+ if (aio) {
+ SOCK_SENDBUF_LOCK(so);
+ so->so_snd.sb_ccc =
+ so->so_snd.sb_hiwat - space;
+ SOCK_SENDBUF_UNLOCK(so);
+ }
+ error = EWOULDBLOCK;
+ goto out4;
+ }
+ if ((error = sbwait(so2, SO_RCV)) != 0) {
+ SOCK_RECVBUF_UNLOCK(so2);
+ goto out4;
+ } else
+ goto restart;
+ }
+ MPASS(space >= cmc.mc_len);
+ space -= cmc.mc_len;
+ if (space == 0) {
+ /* There is space only to send control. */
+ MPASS(!STAILQ_EMPTY(&cmc.mc_q));
+ mcnext = mc;
+ mc = MCHAIN_INITIALIZER(&mc);
+ } else if (space < mc.mc_len) {
+ /* Not enough space. */
+ if (__predict_false(mc_split(&mc, &mcnext, space,
+ M_NOWAIT) == ENOMEM)) {
+ /*
+ * If allocation failed use M_WAITOK and merge
+ * the chain back. Next time mc_split() will
+ * easily split at the same place. Only if we
+ * race with setsockopt(SO_RCVBUF) shrinking
+ * sb_hiwat can this happen more than once.
+ */
+ SOCK_RECVBUF_UNLOCK(so2);
+ (void)mc_split(&mc, &mcnext, space, M_WAITOK);
+ mc_concat(&mc, &mcnext);
+ SOCK_RECVBUF_LOCK(so2);
+ goto restart;
+ }
+ MPASS(mc.mc_len == space);
+ }
+ if (!STAILQ_EMPTY(&cmc.mc_q)) {
+ STAILQ_CONCAT(&sb->uxst_mbq, &cmc.mc_q);
+ sb->sb_ctl += cmc.mc_len;
+ sb->sb_mbcnt += cmc.mc_mlen;
+ cmc.mc_len = 0;
+ }
+ sent += mc.mc_len;
+ sb->sb_acc += mc.mc_len;
+ sb->sb_ccc += mc.mc_len;
+ sb->sb_mbcnt += mc.mc_mlen;
+ STAILQ_CONCAT(&sb->uxst_mbq, &mc.mc_q);
+ UIPC_STREAM_SBCHECK(sb);
+ space = uipc_stream_sbspace(sb);
+ sorwakeup_locked(so2);
+ if (!STAILQ_EMPTY(&mcnext.mc_q)) {
+ /*
+ * Such assignment is unsafe in general, but it is
+ * safe with !STAILQ_EMPTY(&mcnext.mc_q). In C++ we
+ * could overload = for STAILQs :)
+ */
+ mc = mcnext;
+ } else if (uio != NULL && uio->uio_resid > 0) {
+ /*
+ * Copyin sum of peer's receive buffer space and our
+ * sb_hiwat, which is our virtual send buffer size.
+ * See comment above unpst_sendspace declaration.
+ * We are reading sb_hiwat locklessly, cause a) we
+ * don't care about an application that does send(2)
+ * and setsockopt(2) racing internally, and for an
+ * application that does this in sequence we will see
+ * the correct value cause sbsetopt() uses buffer lock
+ * and we also have already acquired it at least once.
+ */
+ error = mc_uiotomc(&mc, uio, space +
+ atomic_load_int(&so->so_snd.sb_hiwat), 0, M_WAITOK,
+ eor ? M_EOR : 0);
+ if (__predict_false(error))
+ goto out4;
+ } else
+ mc = MCHAIN_INITIALIZER(&mc);
}
- mbcnt = so2->so_rcv.sb_mbcnt;
- sbcc = sbavail(&so2->so_rcv);
- if (sbcc)
- sorwakeup_locked(so2);
- else
- SOCKBUF_UNLOCK(&so2->so_rcv);
+ MPASS(STAILQ_EMPTY(&mc.mc_q));
+
+ td->td_ru.ru_msgsnd++;
+out4:
+ sorele(so2);
+out3:
+ SOCK_IO_SEND_UNLOCK(so);
+out2:
+ if (aio) {
+ freeuio(uio);
+ uioadvance(uio0, sent);
+ } else if (uio != NULL)
+ uio->uio_resid = resid - sent;
+ if (!mc_empty(&cmc))
+ unp_scan(mc_first(&cmc), unp_freerights);
+out:
+ mc_freem(&mc);
+ mc_freem(&cmc);
+
+ return (error);
+}
+
+static int
+uipc_soreceive_stream_or_seqpacket(struct socket *so, struct sockaddr **psa,
+ struct uio *uio, struct mbuf **mp0, struct mbuf **controlp, int *flagsp)
+{
+ struct sockbuf *sb = &so->so_rcv;
+ struct mbuf *control, *m, *first, *last, *next;
+ u_int ctl, space, datalen, mbcnt, lastlen;
+ int error, flags;
+ bool nonblock, waitall, peek;
+
+ MPASS(mp0 == NULL);
+
+ if (psa != NULL)
+ *psa = NULL;
+ if (controlp != NULL)
+ *controlp = NULL;
+
+ flags = flagsp != NULL ? *flagsp : 0;
+ nonblock = (so->so_state & SS_NBIO) ||
+ (flags & (MSG_DONTWAIT | MSG_NBIO));
+ peek = flags & MSG_PEEK;
+ waitall = (flags & MSG_WAITALL) && !peek;
/*
- * The PCB lock on unp2 protects the SB_STOP flag. Without it,
- * it would be possible for uipc_rcvd to be called at this
- * point, drain the receiving sockbuf, clear SB_STOP, and then
- * we would set SB_STOP below. That could lead to an empty
- * sockbuf having SB_STOP set
+ * This check may fail only on a socket that never went through
+ * connect(2). We can check this locklessly, cause: a) for a new born
+ * socket we don't care about applications that may race internally
+ * between connect(2) and recv(2), and b) for a dying socket if we
+ * miss an update by unp_soisdisconnected(), we would still get the check
+ * correct. For dying socket we would observe SBS_CANTRCVMORE later.
*/
- SOCKBUF_LOCK(&so->so_snd);
- if (sbcc >= so->so_snd.sb_hiwat || mbcnt >= so->so_snd.sb_mbmax)
- so->so_snd.sb_flags |= SB_STOP;
- SOCKBUF_UNLOCK(&so->so_snd);
- UNP_PCB_UNLOCK(unp2);
- m = NULL;
-out:
+ if (__predict_false((atomic_load_short(&so->so_state) &
+ (SS_ISCONNECTED|SS_ISDISCONNECTED)) == 0))
+ return (ENOTCONN);
+
+ error = SOCK_IO_RECV_LOCK(so, SBLOCKWAIT(flags));
+ if (__predict_false(error))
+ return (error);
+
+restart:
+ SOCK_RECVBUF_LOCK(so);
+ UIPC_STREAM_SBCHECK(sb);
+ while (sb->sb_acc < sb->sb_lowat &&
+ (sb->sb_ctl == 0 || controlp == NULL)) {
+ if (so->so_error) {
+ error = so->so_error;
+ if (!peek)
+ so->so_error = 0;
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_IO_RECV_UNLOCK(so);
+ return (error);
+ }
+ if (sb->sb_state & SBS_CANTRCVMORE) {
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_IO_RECV_UNLOCK(so);
+ return (0);
+ }
+ if (nonblock) {
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_IO_RECV_UNLOCK(so);
+ return (EWOULDBLOCK);
+ }
+ error = sbwait(so, SO_RCV);
+ if (error) {
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_IO_RECV_UNLOCK(so);
+ return (error);
+ }
+ }
+
+ MPASS(STAILQ_FIRST(&sb->uxst_mbq));
+ MPASS(sb->sb_acc > 0 || sb->sb_ctl > 0);
+
+ mbcnt = 0;
+ ctl = 0;
+ first = STAILQ_FIRST(&sb->uxst_mbq);
+ if (first->m_type == MT_CONTROL) {
+ control = first;
+ STAILQ_FOREACH_FROM(first, &sb->uxst_mbq, m_stailq) {
+ if (first->m_type != MT_CONTROL)
+ break;
+ ctl += first->m_len;
+ mbcnt += MSIZE;
+ if (first->m_flags & M_EXT)
+ mbcnt += first->m_ext.ext_size;
+ }
+ } else
+ control = NULL;
+
/*
- * PRUS_EOF is equivalent to pr_send followed by pr_shutdown.
+ * Find split point for the next copyout. On exit from the loop:
+ * last == NULL - socket to be flushed
+ * last != NULL
+ * lastlen > last->m_len - uio to be filled, last to be adjusted
+ * lastlen == 0 - MT_CONTROL or M_EOR encountered
*/
- if (flags & PRUS_EOF) {
+ space = uio->uio_resid;
+ datalen = 0;
+ for (m = first, last = NULL; m != NULL; m = STAILQ_NEXT(m, m_stailq)) {
+ if (m->m_type != MT_DATA) {
+ last = m;
+ lastlen = 0;
+ break;
+ }
+ if (space >= m->m_len) {
+ space -= m->m_len;
+ datalen += m->m_len;
+ mbcnt += MSIZE;
+ if (m->m_flags & M_EXT)
+ mbcnt += m->m_ext.ext_size;
+ if (m->m_flags & M_EOR) {
+ last = STAILQ_NEXT(m, m_stailq);
+ lastlen = 0;
+ flags |= MSG_EOR;
+ break;
+ }
+ } else {
+ datalen += space;
+ last = m;
+ lastlen = space;
+ break;
+ }
+ }
+
+ UIPC_STREAM_SBCHECK(sb);
+ if (!peek) {
+ if (last == NULL)
+ STAILQ_INIT(&sb->uxst_mbq);
+ else {
+ STAILQ_FIRST(&sb->uxst_mbq) = last;
+ MPASS(last->m_len > lastlen);
+ last->m_len -= lastlen;
+ last->m_data += lastlen;
+ }
+ MPASS(sb->sb_acc >= datalen);
+ sb->sb_acc -= datalen;
+ sb->sb_ccc -= datalen;
+ MPASS(sb->sb_ctl >= ctl);
+ sb->sb_ctl -= ctl;
+ MPASS(sb->sb_mbcnt >= mbcnt);
+ sb->sb_mbcnt -= mbcnt;
+ UIPC_STREAM_SBCHECK(sb);
+ /*
+ * In a blocking mode peer is sleeping on our receive buffer,
+ * and we need just wakeup(9) on it. But to wake up various
+ * event engines, we need to reach over to peer's selinfo.
+ * This can be safely done as the socket buffer receive lock
+ * is protecting us from the peer going away.
+ */
+ if (__predict_true(sb->uxst_peer != NULL)) {
+ struct selinfo *sel = &sb->uxst_peer->so_wrsel;
+ struct unpcb *unp2;
+ bool aio;
+
+ if ((aio = sb->uxst_flags & UXST_PEER_AIO))
+ sb->uxst_flags &= ~UXST_PEER_AIO;
+ if (sb->uxst_flags & UXST_PEER_SEL) {
+ selwakeuppri(sel, PSOCK);
+ /*
+ * XXXGL: sowakeup() does SEL_WAITING() without
+ * locks.
+ */
+ if (!SEL_WAITING(sel))
+ sb->uxst_flags &= ~UXST_PEER_SEL;
+ }
+ if (sb->sb_flags & SB_WAIT) {
+ sb->sb_flags &= ~SB_WAIT;
+ wakeup(&sb->sb_acc);
+ }
+ KNOTE_LOCKED(&sel->si_note, 0);
+ SOCK_RECVBUF_UNLOCK(so);
+ /*
+ * XXXGL: need to go through uipc_lock_peer() after
+ * the receive buffer lock dropped, it was protecting
+ * us from unp_soisdisconnected(). The aio workarounds
+ * should be refactored to the aio(4) side.
+ */
+ if (aio && uipc_lock_peer(so, &unp2) == 0) {
+ struct socket *so2 = unp2->unp_socket;
+
+ SOCK_SENDBUF_LOCK(so2);
+ so2->so_snd.sb_ccc -= datalen;
+ sowakeup_aio(so2, SO_SND);
+ SOCK_SENDBUF_UNLOCK(so2);
+ UNP_PCB_UNLOCK(unp2);
+ }
+ } else
+ SOCK_RECVBUF_UNLOCK(so);
+ } else
+ SOCK_RECVBUF_UNLOCK(so);
+
+ while (control != NULL && control->m_type == MT_CONTROL) {
+ if (!peek) {
+ struct mbuf *c;
+
+ /*
+ * unp_externalize() failure must abort entire read(2).
+ * Such failure should also free the problematic
+ * control, but link back the remaining data to the head
+ * of the buffer, so that socket is not left in a state
+ * where it can't progress forward with reading.
+ * Probability of such a failure is really low, so it
+ * is fine that we need to perform pretty complex
+ * operation here to reconstruct the buffer.
+ * XXXGL: unp_externalize() used to be
+ * dom_externalize() KBI and it frees whole chain, so
+ * we need to feed it with mbufs one by one.
+ */
+ c = control;
+ control = STAILQ_NEXT(c, m_stailq);
+ STAILQ_NEXT(c, m_stailq) = NULL;
+ error = unp_externalize(c, controlp, flags);
+ if (__predict_false(error && control != NULL)) {
+ struct mchain cmc;
+
+ mc_init_m(&cmc, control);
+
+ SOCK_RECVBUF_LOCK(so);
+ MPASS(!(sb->sb_state & SBS_CANTRCVMORE));
+
+ if (__predict_false(cmc.mc_len + sb->sb_ccc +
+ sb->sb_ctl > sb->sb_hiwat)) {
+ /*
+ * Too bad, while unp_externalize() was
+ * failing, the other side had filled
+ * the buffer and we can't prepend data
+ * back. Losing data!
+ */
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_IO_RECV_UNLOCK(so);
+ unp_scan(mc_first(&cmc),
+ unp_freerights);
+ mc_freem(&cmc);
+ return (error);
+ }
+
+ UIPC_STREAM_SBCHECK(sb);
+ /* XXXGL: STAILQ_PREPEND */
+ STAILQ_CONCAT(&cmc.mc_q, &sb->uxst_mbq);
+ STAILQ_SWAP(&cmc.mc_q, &sb->uxst_mbq, mbuf);
+
+ sb->sb_ctl = sb->sb_acc = sb->sb_ccc =
+ sb->sb_mbcnt = 0;
+ STAILQ_FOREACH(m, &sb->uxst_mbq, m_stailq) {
+ if (m->m_type == MT_DATA) {
+ sb->sb_acc += m->m_len;
+ sb->sb_ccc += m->m_len;
+ } else {
+ sb->sb_ctl += m->m_len;
+ }
+ sb->sb_mbcnt += MSIZE;
+ if (m->m_flags & M_EXT)
+ sb->sb_mbcnt +=
+ m->m_ext.ext_size;
+ }
+ UIPC_STREAM_SBCHECK(sb);
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_IO_RECV_UNLOCK(so);
+ return (error);
+ }
+ if (controlp != NULL) {
+ while (*controlp != NULL)
+ controlp = &(*controlp)->m_next;
+ }
+ } else {
+ /*
+ * XXXGL
+ *
+ * In MSG_PEEK case control is not externalized. This
+ * means we are leaking some kernel pointers to the
+ * userland. They are useless to a law-abiding
+ * application, but may be useful to a malware. This
+ * is what the historical implementation in the
+ * soreceive_generic() did. To be improved?
+ */
+ if (controlp != NULL) {
+ *controlp = m_copym(control, 0, control->m_len,
+ M_WAITOK);
+ controlp = &(*controlp)->m_next;
+ }
+ control = STAILQ_NEXT(control, m_stailq);
+ }
+ }
+
+ for (m = first; m != last; m = next) {
+ next = STAILQ_NEXT(m, m_stailq);
+ error = uiomove(mtod(m, char *), m->m_len, uio);
+ if (__predict_false(error)) {
+ SOCK_IO_RECV_UNLOCK(so);
+ if (!peek)
+ for (; m != last; m = next) {
+ next = STAILQ_NEXT(m, m_stailq);
+ m_free(m);
+ }
+ return (error);
+ }
+ if (!peek)
+ m_free(m);
+ }
+ if (last != NULL && lastlen > 0) {
+ if (!peek) {
+ MPASS(!(m->m_flags & M_PKTHDR));
+ MPASS(last->m_data - M_START(last) >= lastlen);
+ error = uiomove(mtod(last, char *) - lastlen,
+ lastlen, uio);
+ } else
+ error = uiomove(mtod(last, char *), lastlen, uio);
+ if (__predict_false(error)) {
+ SOCK_IO_RECV_UNLOCK(so);
+ return (error);
+ }
+ }
+ if (waitall && !(flags & MSG_EOR) && uio->uio_resid > 0)
+ goto restart;
+ SOCK_IO_RECV_UNLOCK(so);
+
+ if (flagsp != NULL)
+ *flagsp |= flags;
+
+ uio->uio_td->td_ru.ru_msgrcv++;
+
+ return (0);
+}
+
+static int
+uipc_sopoll_stream_or_seqpacket(struct socket *so, int events,
+ struct thread *td)
+{
+ struct unpcb *unp = sotounpcb(so);
+ int revents;
+
+ UNP_PCB_LOCK(unp);
+ if (SOLISTENING(so)) {
+ /* The above check is safe, since conversion to listening uses
+ * both protocol and socket lock.
+ */
+ SOCK_LOCK(so);
+ if (!(events & (POLLIN | POLLRDNORM)))
+ revents = 0;
+ else if (!TAILQ_EMPTY(&so->sol_comp))
+ revents = events & (POLLIN | POLLRDNORM);
+ else if (so->so_error)
+ revents = (events & (POLLIN | POLLRDNORM)) | POLLHUP;
+ else {
+ selrecord(td, &so->so_rdsel);
+ revents = 0;
+ }
+ SOCK_UNLOCK(so);
+ } else {
+ if (so->so_state & SS_ISDISCONNECTED)
+ revents = POLLHUP;
+ else
+ revents = 0;
+ if (events & (POLLIN | POLLRDNORM | POLLRDHUP)) {
+ SOCK_RECVBUF_LOCK(so);
+ if (sbavail(&so->so_rcv) >= so->so_rcv.sb_lowat ||
+ so->so_error || so->so_rerror)
+ revents |= events & (POLLIN | POLLRDNORM);
+ if (so->so_rcv.sb_state & SBS_CANTRCVMORE)
+ revents |= events & POLLRDHUP;
+ if (!(revents & (POLLIN | POLLRDNORM | POLLRDHUP))) {
+ selrecord(td, &so->so_rdsel);
+ so->so_rcv.sb_flags |= SB_SEL;
+ }
+ SOCK_RECVBUF_UNLOCK(so);
+ }
+ if (events & (POLLOUT | POLLWRNORM)) {
+ struct socket *so2 = so->so_rcv.uxst_peer;
+
+ if (so2 != NULL) {
+ struct sockbuf *sb = &so2->so_rcv;
+
+ SOCK_RECVBUF_LOCK(so2);
+ if (uipc_stream_sbspace(sb) >= sb->sb_lowat)
+ revents |= events &
+ (POLLOUT | POLLWRNORM);
+ if (sb->sb_state & SBS_CANTRCVMORE)
+ revents |= POLLHUP;
+ if (!(revents & (POLLOUT | POLLWRNORM)))
+ so2->so_rcv.uxst_flags |= UXST_PEER_SEL;
+ SOCK_RECVBUF_UNLOCK(so2);
+ }
+ if (!(revents & (POLLOUT | POLLWRNORM)))
+ selrecord(td, &so->so_wrsel);
+ }
+ }
+ UNP_PCB_UNLOCK(unp);
+ return (revents);
+}
+
+static void
+uipc_wrknl_lock(void *arg)
+{
+ struct socket *so = arg;
+ struct unpcb *unp = sotounpcb(so);
+
+retry:
+ if (SOLISTENING(so)) {
+ SOLISTEN_LOCK(so);
+ } else {
UNP_PCB_LOCK(unp);
- socantsendmore(so);
- unp_shutdown(unp);
+ if (__predict_false(SOLISTENING(so))) {
+ UNP_PCB_UNLOCK(unp);
+ goto retry;
+ }
+ if (so->so_rcv.uxst_peer != NULL)
+ SOCK_RECVBUF_LOCK(so->so_rcv.uxst_peer);
+ }
+}
+
+static void
+uipc_wrknl_unlock(void *arg)
+{
+ struct socket *so = arg;
+ struct unpcb *unp = sotounpcb(so);
+
+ if (SOLISTENING(so))
+ SOLISTEN_UNLOCK(so);
+ else {
+ if (so->so_rcv.uxst_peer != NULL)
+ SOCK_RECVBUF_UNLOCK(so->so_rcv.uxst_peer);
UNP_PCB_UNLOCK(unp);
}
- if (control != NULL && error != 0)
- unp_scan(control, unp_freerights);
+}
-release:
- if (control != NULL)
- m_freem(control);
- /*
- * In case of PRUS_NOTREADY, uipc_ready() is responsible
- * for freeing memory.
- */
- if (m != NULL && (flags & PRUS_NOTREADY) == 0)
- m_freem(m);
- return (error);
+static void
+uipc_wrknl_assert_lock(void *arg, int what)
+{
+ struct socket *so = arg;
+
+ if (SOLISTENING(so)) {
+ if (what == LA_LOCKED)
+ SOLISTEN_LOCK_ASSERT(so);
+ else
+ SOLISTEN_UNLOCK_ASSERT(so);
+ } else {
+ /*
+ * The pr_soreceive method will put a note without owning the
+ * unp lock, so we can't assert it here. But we can safely
+ * dereference uxst_peer pointer, since receive buffer lock
+ * is assumed to be held here.
+ */
+ if (what == LA_LOCKED && so->so_rcv.uxst_peer != NULL)
+ SOCK_RECVBUF_LOCK_ASSERT(so->so_rcv.uxst_peer);
+ }
+}
+
+static void
+uipc_filt_sowdetach(struct knote *kn)
+{
+ struct socket *so = kn->kn_fp->f_data;
+
+ uipc_wrknl_lock(so);
+ knlist_remove(&so->so_wrsel.si_note, kn, 1);
+ uipc_wrknl_unlock(so);
+}
+
+static int
+uipc_filt_sowrite(struct knote *kn, long hint)
+{
+ struct socket *so = kn->kn_fp->f_data, *so2;
+ struct unpcb *unp = sotounpcb(so), *unp2 = unp->unp_conn;
+
+ if (SOLISTENING(so) || unp2 == NULL)
+ return (0);
+
+ so2 = unp2->unp_socket;
+ SOCK_RECVBUF_LOCK_ASSERT(so2);
+ kn->kn_data = uipc_stream_sbspace(&so2->so_rcv);
+
+ if (so2->so_rcv.sb_state & SBS_CANTRCVMORE) {
+ kn->kn_flags |= EV_EOF;
+ kn->kn_fflags = so->so_error;
+ return (1);
+ } else if (kn->kn_sfflags & NOTE_LOWAT)
+ return (kn->kn_data >= kn->kn_sdata);
+ else
+ return (kn->kn_data >= so2->so_rcv.sb_lowat);
+}
+
+static int
+uipc_filt_soempty(struct knote *kn, long hint)
+{
+ struct socket *so = kn->kn_fp->f_data, *so2;
+ struct unpcb *unp = sotounpcb(so), *unp2 = unp->unp_conn;
+
+ if (SOLISTENING(so) || unp2 == NULL)
+ return (1);
+
+ so2 = unp2->unp_socket;
+ SOCK_RECVBUF_LOCK_ASSERT(so2);
+ kn->kn_data = uipc_stream_sbspace(&so2->so_rcv);
+
+ return (kn->kn_data == 0 ? 1 : 0);
+}
+
+static const struct filterops uipc_write_filtops = {
+ .f_isfd = 1,
+ .f_detach = uipc_filt_sowdetach,
+ .f_event = uipc_filt_sowrite,
+};
+static const struct filterops uipc_empty_filtops = {
+ .f_isfd = 1,
+ .f_detach = uipc_filt_sowdetach,
+ .f_event = uipc_filt_soempty,
+};
+
+static int
+uipc_kqfilter_stream_or_seqpacket(struct socket *so, struct knote *kn)
+{
+ struct unpcb *unp = sotounpcb(so);
+ struct knlist *knl;
+
+ switch (kn->kn_filter) {
+ case EVFILT_READ:
+ return (sokqfilter_generic(so, kn));
+ case EVFILT_WRITE:
+ kn->kn_fop = &uipc_write_filtops;
+ break;
+ case EVFILT_EMPTY:
+ kn->kn_fop = &uipc_empty_filtops;
+ break;
+ default:
+ return (EINVAL);
+ }
+
+ knl = &so->so_wrsel.si_note;
+ UNP_PCB_LOCK(unp);
+ if (SOLISTENING(so)) {
+ SOLISTEN_LOCK(so);
+ knlist_add(knl, kn, 1);
+ SOLISTEN_UNLOCK(so);
+ } else {
+ struct socket *so2 = so->so_rcv.uxst_peer;
+
+ if (so2 != NULL)
+ SOCK_RECVBUF_LOCK(so2);
+ knlist_add(knl, kn, 1);
+ if (so2 != NULL)
+ SOCK_RECVBUF_UNLOCK(so2);
+ }
+ UNP_PCB_UNLOCK(unp);
+ return (0);
}
/* PF_UNIX/SOCK_DGRAM version of sbspace() */
@@ -1134,7 +1891,8 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
const struct sockaddr *from;
struct socket *so2;
struct sockbuf *sb;
- struct mbuf *f, *clast;
+ struct mchain cmc = MCHAIN_INITIALIZER(&cmc);
+ struct mbuf *f;
u_int cc, ctl, mbcnt;
u_int dcc __diagused, dctl __diagused, dmbcnt __diagused;
int error;
@@ -1143,7 +1901,6 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
error = 0;
f = NULL;
- ctl = 0;
if (__predict_false(flags & MSG_OOB)) {
error = EOPNOTSUPP;
@@ -1162,16 +1919,14 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
f = m_gethdr(M_WAITOK, MT_SONAME);
cc = m->m_pkthdr.len;
mbcnt = MSIZE + m->m_pkthdr.memlen;
- if (c != NULL &&
- (error = unp_internalize(&c, td, &clast, &ctl, &mbcnt)))
+ if (c != NULL && (error = unp_internalize(c, &cmc, td)))
goto out;
} else {
- /* pr_sosend() with mbuf usually is a kernel thread. */
-
- M_ASSERTPKTHDR(m);
- if (__predict_false(c != NULL))
- panic("%s: control from a kernel thread", __func__);
+ struct mchain mc;
+ uipc_reset_kernel_mbuf(m, &mc);
+ cc = mc.mc_len;
+ mbcnt = mc.mc_mlen;
if (__predict_false(m->m_pkthdr.len > unpdg_maxdgram)) {
error = EMSGSIZE;
goto out;
@@ -1180,22 +1935,6 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
error = ENOBUFS;
goto out;
}
- /* Condition the foreign mbuf to our standards. */
- m_clrprotoflags(m);
- m_tag_delete_chain(m, NULL);
- m->m_pkthdr.rcvif = NULL;
- m->m_pkthdr.flowid = 0;
- m->m_pkthdr.csum_flags = 0;
- m->m_pkthdr.fibnum = 0;
- m->m_pkthdr.rsstype = 0;
-
- cc = m->m_pkthdr.len;
- mbcnt = MSIZE;
- for (struct mbuf *mb = m; mb != NULL; mb = mb->m_next) {
- mbcnt += MSIZE;
- if (mb->m_flags & M_EXT)
- mbcnt += mb->m_ext.ext_size;
- }
}
unp = sotounpcb(so);
@@ -1247,8 +1986,7 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
}
if (unp2->unp_flags & UNP_WANTCRED_MASK)
- c = unp_addsockcred(td, c, unp2->unp_flags, &clast, &ctl,
- &mbcnt);
+ unp_addsockcred(td, &cmc, unp2->unp_flags);
if (unp->unp_addr != NULL)
from = (struct sockaddr *)unp->unp_addr;
else
@@ -1256,25 +1994,21 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
f->m_len = from->sa_len;
MPASS(from->sa_len <= MLEN);
bcopy(from, mtod(f, void *), from->sa_len);
- ctl += f->m_len;
/*
* Concatenate mbufs: from -> control -> data.
* Save overall cc and mbcnt in "from" mbuf.
*/
- if (c != NULL) {
-#ifdef INVARIANTS
- struct mbuf *mc;
-
- for (mc = c; mc->m_next != NULL; mc = mc->m_next);
- MPASS(mc == clast);
-#endif
- f->m_next = c;
- clast->m_next = m;
- c = NULL;
+ if (!STAILQ_EMPTY(&cmc.mc_q)) {
+ f->m_next = mc_first(&cmc);
+ mc_last(&cmc)->m_next = m;
+ /* XXXGL: This is dirty as well as rollback after ENOBUFS. */
+ STAILQ_INIT(&cmc.mc_q);
} else
f->m_next = m;
m = NULL;
+ ctl = f->m_len + cmc.mc_len;
+ mbcnt += cmc.mc_mlen;
#ifdef INVARIANTS
dcc = dctl = dmbcnt = 0;
for (struct mbuf *mb = f; mb != NULL; mb = mb->m_next) {
@@ -1340,7 +2074,7 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
soroverflow_locked(so2);
error = ENOBUFS;
if (f->m_next->m_type == MT_CONTROL) {
- c = f->m_next;
+ STAILQ_FIRST(&cmc.mc_q) = f->m_next;
f->m_next = NULL;
}
}
@@ -1355,13 +2089,12 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
out3:
SOCK_IO_SEND_UNLOCK(so);
out2:
- if (c)
- unp_scan(c, unp_freerights);
+ if (!mc_empty(&cmc))
+ unp_scan(mc_first(&cmc), unp_freerights);
out:
if (f)
m_freem(f);
- if (c)
- m_freem(c);
+ mc_freem(&cmc);
if (m)
m_freem(m);
@@ -1602,10 +2335,161 @@ uipc_soreceive_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio,
return (0);
}
+static int
+uipc_sendfile_wait(struct socket *so, off_t need, int *space)
+{
+ struct unpcb *unp2;
+ struct socket *so2;
+ struct sockbuf *sb;
+ bool nonblock, sockref;
+ int error;
+
+ MPASS(so->so_type == SOCK_STREAM);
+ MPASS(need > 0);
+ MPASS(space != NULL);
+
+ nonblock = so->so_state & SS_NBIO;
+ sockref = false; /* true once we hold a reference on so2 */
+ if (__predict_false((so->so_state & SS_ISCONNECTED) == 0))
+ return (ENOTCONN);
+
+ if (__predict_false((error = uipc_lock_peer(so, &unp2)) != 0))
+ return (error);
+
+ so2 = unp2->unp_socket;
+ sb = &so2->so_rcv;
+ SOCK_RECVBUF_LOCK(so2);
+ UNP_PCB_UNLOCK(unp2);
+ while ((*space = uipc_stream_sbspace(sb)) < need &&
+ (*space < so->so_snd.sb_hiwat / 2)) {
+ UIPC_STREAM_SBCHECK(sb);
+ if (nonblock) {
+ SOCK_RECVBUF_UNLOCK(so2);
+ return (EAGAIN);
+ }
+ if (!sockref) {
+ soref(so2);
+ sockref = true;
+ }
+ error = sbwait(so2, SO_RCV);
+ if (error == 0 &&
+ __predict_false(sb->sb_state & SBS_CANTRCVMORE))
+ error = EPIPE;
+ if (error) {
+ SOCK_RECVBUF_UNLOCK(so2);
+ sorele(so2);
+ return (error);
+ }
+ }
+ UIPC_STREAM_SBCHECK(sb);
+ SOCK_RECVBUF_UNLOCK(so2);
+ if (sockref)
+ sorele(so2);
+ return (0);
+}
+
+/*
+ * Although this is a pr_send method, for unix(4) it is called only via
+ * sendfile(2) path. This means we can be sure that mbufs are clear of
+ * any extra flags and don't require any conditioning.
+ */
+static int
+uipc_sendfile(struct socket *so, int flags, struct mbuf *m,
+ struct sockaddr *from, struct mbuf *control, struct thread *td)
+{
+ struct mchain mc;
+ struct unpcb *unp2;
+ struct socket *so2;
+ struct sockbuf *sb;
+ bool notready, wakeup;
+ int error;
+
+ MPASS(so->so_type == SOCK_STREAM);
+ MPASS(from == NULL && control == NULL);
+ KASSERT(!(m->m_flags & M_EXTPG),
+ ("unix(4): TLS sendfile(2) not supported"));
+
+ notready = flags & PRUS_NOTREADY;
+
+ if (__predict_false((so->so_state & SS_ISCONNECTED) == 0)) {
+ error = ENOTCONN;
+ goto out;
+ }
+
+ if (__predict_false((error = uipc_lock_peer(so, &unp2)) != 0))
+ goto out;
+
+ mc_init_m(&mc, m);
+
+ so2 = unp2->unp_socket;
+ sb = &so2->so_rcv;
+ SOCK_RECVBUF_LOCK(so2);
+ UNP_PCB_UNLOCK(unp2);
+ UIPC_STREAM_SBCHECK(sb);
+ sb->sb_ccc += mc.mc_len;
+ sb->sb_mbcnt += mc.mc_mlen;
+ if (sb->uxst_fnrdy == NULL) {
+ if (notready) {
+ sb->uxst_fnrdy = STAILQ_FIRST(&mc.mc_q);
+ wakeup = false;
+ } else {
+ sb->sb_acc += mc.mc_len;
+ wakeup = true;
+ }
+ } else
+ wakeup = false;
+ STAILQ_CONCAT(&sb->uxst_mbq, &mc.mc_q);
+ UIPC_STREAM_SBCHECK(sb);
+ if (wakeup)
+ sorwakeup_locked(so2);
+ else
+ SOCK_RECVBUF_UNLOCK(so2);
+
+ return (0);
+out:
+ /*
+ * In case of not ready data, uipc_ready() is responsible
+ * for freeing memory.
+ */
+ if (m != NULL && !notready)
+ m_freem(m);
+
+ return (error);
+}
+
+static int
+uipc_sbready(struct sockbuf *sb, struct mbuf *m, int count)
+{
+ u_int blocker;
+
+ /* assert locked */
+
+ blocker = (sb->uxst_fnrdy == m) ? M_BLOCKED : 0;
+ STAILQ_FOREACH_FROM(m, &sb->uxst_mbq, m_stailq) {
+ if (count > 0) {
+ MPASS(m->m_flags & M_NOTREADY);
+ m->m_flags &= ~(M_NOTREADY | blocker);
+ if (blocker)
+ sb->sb_acc += m->m_len;
+ count--;
+ } else if (blocker && !(m->m_flags & M_NOTREADY)) {
+ MPASS(m->m_flags & M_BLOCKED);
+ m->m_flags &= ~M_BLOCKED;
+ sb->sb_acc += m->m_len;
+ } else
+ break;
+ }
+ if (blocker) {
+ sb->uxst_fnrdy = m;
+ return (0);
+ } else
+ return (EINPROGRESS);
+}
+
static bool
uipc_ready_scan(struct socket *so, struct mbuf *m, int count, int *errorp)
{
- struct mbuf *mb, *n;
+ struct mbuf *mb;
struct sockbuf *sb;
SOCK_LOCK(so);
@@ -1615,22 +2499,16 @@ uipc_ready_scan(struct socket *so, struct mbuf *m, int count, int *errorp)
}
mb = NULL;
sb = &so->so_rcv;
- SOCKBUF_LOCK(sb);
- if (sb->sb_fnrdy != NULL) {
- for (mb = sb->sb_mb, n = mb->m_nextpkt; mb != NULL;) {
+ SOCK_RECVBUF_LOCK(so);
+ if (sb->uxst_fnrdy != NULL) {
+ STAILQ_FOREACH(mb, &sb->uxst_mbq, m_stailq) {
if (mb == m) {
- *errorp = sbready(sb, m, count);
+ *errorp = uipc_sbready(sb, m, count);
break;
}
- mb = mb->m_next;
- if (mb == NULL) {
- mb = n;
- if (mb != NULL)
- n = mb->m_nextpkt;
- }
}
}
- SOCKBUF_UNLOCK(sb);
+ SOCK_RECVBUF_UNLOCK(so);
SOCK_UNLOCK(so);
return (mb != NULL);
}
@@ -1639,45 +2517,44 @@ static int
uipc_ready(struct socket *so, struct mbuf *m, int count)
{
struct unpcb *unp, *unp2;
- struct socket *so2;
- int error, i;
+ int error;
- unp = sotounpcb(so);
+ MPASS(so->so_type == SOCK_STREAM);
- KASSERT(so->so_type == SOCK_STREAM,
- ("%s: unexpected socket type for %p", __func__, so));
+ if (__predict_true(uipc_lock_peer(so, &unp2) == 0)) {
+ struct socket *so2;
+ struct sockbuf *sb;
- UNP_PCB_LOCK(unp);
- if ((unp2 = unp_pcb_lock_peer(unp)) != NULL) {
- UNP_PCB_UNLOCK(unp);
so2 = unp2->unp_socket;
- SOCKBUF_LOCK(&so2->so_rcv);
- if ((error = sbready(&so2->so_rcv, m, count)) == 0)
+ sb = &so2->so_rcv;
+ SOCK_RECVBUF_LOCK(so2);
+ UNP_PCB_UNLOCK(unp2);
+ UIPC_STREAM_SBCHECK(sb);
+ error = uipc_sbready(sb, m, count);
+ UIPC_STREAM_SBCHECK(sb);
+ if (error == 0)
sorwakeup_locked(so2);
else
- SOCKBUF_UNLOCK(&so2->so_rcv);
- UNP_PCB_UNLOCK(unp2);
- return (error);
- }
- UNP_PCB_UNLOCK(unp);
-
- /*
- * The receiving socket has been disconnected, but may still be valid.
- * In this case, the now-ready mbufs are still present in its socket
- * buffer, so perform an exhaustive search before giving up and freeing
- * the mbufs.
- */
- UNP_LINK_RLOCK();
- LIST_FOREACH(unp, &unp_shead, unp_link) {
- if (uipc_ready_scan(unp->unp_socket, m, count, &error))
- break;
- }
- UNP_LINK_RUNLOCK();
+ SOCK_RECVBUF_UNLOCK(so2);
+ } else {
+ /*
+ * The receiving socket has been disconnected, but may still
+ * be valid. In this case, the not-ready mbufs are still
+ * present in its socket buffer, so perform an exhaustive
+ * search before giving up and freeing the mbufs.
+ */
+ UNP_LINK_RLOCK();
+ LIST_FOREACH(unp, &unp_shead, unp_link) {
+ if (uipc_ready_scan(unp->unp_socket, m, count, &error))
+ break;
+ }
+ UNP_LINK_RUNLOCK();
- if (unp == NULL) {
- for (i = 0; i < count; i++)
- m = m_free(m);
- error = ECONNRESET;
+ if (unp == NULL) {
+ for (int i = 0; i < count; i++)
+ m = m_free(m);
+ return (ECONNRESET);
+ }
}
return (error);
}
@@ -1992,13 +2869,17 @@ unp_connectat(int fd, struct socket *so, struct sockaddr *nam,
}
if (connreq) {
if (SOLISTENING(so2))
- so2 = sonewconn(so2, 0);
+ so2 = solisten_clone(so2);
else
so2 = NULL;
if (so2 == NULL) {
error = ECONNREFUSED;
goto bad2;
}
+ if ((error = uipc_attach(so2, 0, NULL)) != 0) {
+ sodealloc(so2);
+ goto bad2;
+ }
unp3 = sotounpcb(so2);
unp_pcb_lock_pair(unp2, unp3);
if (unp2->unp_addr != NULL) {
@@ -2027,7 +2908,9 @@ unp_connectat(int fd, struct socket *so, struct sockaddr *nam,
KASSERT(unp2 != NULL && so2 != NULL && unp2->unp_socket == so2 &&
sotounpcb(so2) == unp2,
("%s: unp2 %p so2 %p", __func__, unp2, so2));
- unp_connect2(so, so2);
+ unp_connect2(so, so2, connreq);
+ if (connreq)
+ (void)solisten_enqueue(so2, SS_ISCONNECTED);
KASSERT((unp->unp_flags & UNP_CONNECTING) != 0,
("%s: unp %p has UNP_CONNECTING clear", __func__, unp));
unp->unp_flags &= ~UNP_CONNECTING;
@@ -2077,8 +2960,66 @@ unp_copy_peercred(struct thread *td, struct unpcb *client_unp,
client_unp->unp_flags |= (listen_unp->unp_flags & UNP_WANTCRED_MASK);
}
+/*
+ * unix/stream & unix/seqpacket version of soisconnected().
+ *
+ * The crucial thing we are doing here is setting up the uxst_peer linkage,
+ * holding unp and receive buffer locks of both sockets. The disconnect
+ * procedure does the same. This gives us a safe way to access the peer in
+ * the send(2) and recv(2) paths during the socket lifetime.
+ *
+ * The less important thing is event notification of the fact that a socket is
+ * now connected. It is unusual for a software to put a socket into event
+ * mechanism before connect(2), but is supposed to be supported. Note that
+ * there can not be any sleeping I/O on the socket, yet, only presence in the
+ * select/poll/kevent.
+ *
+ * This function can be called via two call paths:
+ * 1) socketpair(2) - in this case socket has not been yet reported to userland
+ * and just can't have any event notifications mechanisms set up. The
+ * 'wakeup' boolean is always false.
+ * 2) connect(2) of existing socket to a recent clone of a listener:
+ * 2.1) Socket that connect(2)s will have 'wakeup' true. An application
+ * could have already put it into an event mechanism, so it shall be
+ * reported as readable and as writable.
+ * 2.2) Socket that was just cloned with solisten_clone(). Same as 1).
+ */
static void
-unp_connect2(struct socket *so, struct socket *so2)
+unp_soisconnected(struct socket *so, bool wakeup)
+{
+ struct socket *so2 = sotounpcb(so)->unp_conn->unp_socket;
+ struct sockbuf *sb;
+
+ SOCK_LOCK_ASSERT(so);
+ UNP_PCB_LOCK_ASSERT(sotounpcb(so));
+ UNP_PCB_LOCK_ASSERT(sotounpcb(so2));
+ SOCK_RECVBUF_LOCK_ASSERT(so);
+ SOCK_RECVBUF_LOCK_ASSERT(so2);
+
+ MPASS(so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET);
+ MPASS((so->so_state & (SS_ISCONNECTED | SS_ISCONNECTING |
+ SS_ISDISCONNECTING)) == 0);
+ MPASS(so->so_qstate == SQ_NONE);
+
+ so->so_state &= ~SS_ISDISCONNECTED;
+ so->so_state |= SS_ISCONNECTED;
+
+ sb = &so2->so_rcv;
+ sb->uxst_peer = so;
+
+ if (wakeup) {
+ KNOTE_LOCKED(&sb->sb_sel->si_note, 0);
+ sb = &so->so_rcv;
+ selwakeuppri(sb->sb_sel, PSOCK);
+ SOCK_SENDBUF_LOCK_ASSERT(so);
+ sb = &so->so_snd;
+ selwakeuppri(sb->sb_sel, PSOCK);
+ SOCK_SENDBUF_UNLOCK(so);
+ }
+}
+
+static void
+unp_connect2(struct socket *so, struct socket *so2, bool wakeup)
{
struct unpcb *unp;
struct unpcb *unp2;
@@ -2110,8 +3051,18 @@ unp_connect2(struct socket *so, struct socket *so2)
KASSERT(unp2->unp_conn == NULL,
("%s: socket %p is already connected", __func__, unp2));
unp2->unp_conn = unp;
- soisconnected(so);
- soisconnected(so2);
+ SOCK_LOCK(so);
+ SOCK_LOCK(so2);
+ if (wakeup) /* Avoid LOR with receive buffer lock. */
+ SOCK_SENDBUF_LOCK(so);
+ SOCK_RECVBUF_LOCK(so);
+ SOCK_RECVBUF_LOCK(so2);
+ unp_soisconnected(so, wakeup); /* Will unlock send buffer. */
+ unp_soisconnected(so2, false);
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_RECVBUF_UNLOCK(so2);
+ SOCK_UNLOCK(so);
+ SOCK_UNLOCK(so2);
break;
default:
@@ -2120,6 +3071,23 @@ unp_connect2(struct socket *so, struct socket *so2)
}
static void
+unp_soisdisconnected(struct socket *so)
+{
+ SOCK_LOCK_ASSERT(so);
+ SOCK_RECVBUF_LOCK_ASSERT(so);
+ MPASS(so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET);
+ MPASS(!SOLISTENING(so));
+ MPASS((so->so_state & (SS_ISCONNECTING | SS_ISDISCONNECTING |
+ SS_ISDISCONNECTED)) == 0);
+ MPASS(so->so_state & SS_ISCONNECTED);
+
+ so->so_state |= SS_ISDISCONNECTED;
+ so->so_state &= ~SS_ISCONNECTED;
+ so->so_rcv.uxst_peer = NULL;
+ socantrcvmore_locked(so);
+}
+
+static void
unp_disconnect(struct unpcb *unp, struct unpcb *unp2)
{
struct socket *so, *so2;
@@ -2191,12 +3159,16 @@ unp_disconnect(struct unpcb *unp, struct unpcb *unp2)
case SOCK_STREAM:
case SOCK_SEQPACKET:
- if (so)
- soisdisconnected(so);
+ SOCK_LOCK(so);
+ SOCK_LOCK(so2);
+ SOCK_RECVBUF_LOCK(so);
+ SOCK_RECVBUF_LOCK(so2);
+ unp_soisdisconnected(so);
MPASS(unp2->unp_conn == unp);
unp2->unp_conn = NULL;
- if (so2)
- soisdisconnected(so2);
+ unp_soisdisconnected(so2);
+ SOCK_UNLOCK(so);
+ SOCK_UNLOCK(so2);
break;
}
@@ -2401,13 +3373,13 @@ unp_drop(struct unpcb *unp)
/*
* Regardless of whether the socket's peer dropped the connection
* with this socket by aborting or disconnecting, POSIX requires
- * that ECONNRESET is returned.
+ * that ECONNRESET is returned on next connected send(2) in case of
+ * a SOCK_DGRAM socket and EPIPE for SOCK_STREAM.
*/
-
UNP_PCB_LOCK(unp);
- so = unp->unp_socket;
- if (so)
- so->so_error = ECONNRESET;
+ if ((so = unp->unp_socket) != NULL)
+ so->so_error =
+ so->so_proto->pr_type == SOCK_DGRAM ? ECONNRESET : EPIPE;
if ((unp2 = unp_pcb_lock_peer(unp)) != NULL) {
/* Last reference dropped in unp_disconnect(). */
unp_pcb_rele_notlast(unp);
@@ -2608,15 +3580,14 @@ unp_internalize_cleanup_rights(struct mbuf *control)
}
static int
-unp_internalize(struct mbuf **controlp, struct thread *td,
- struct mbuf **clast, u_int *space, u_int *mbcnt)
+unp_internalize(struct mbuf *control, struct mchain *mc, struct thread *td)
{
- struct mbuf *control, **initial_controlp;
struct proc *p;
struct filedesc *fdesc;
struct bintime *bt;
struct cmsghdr *cm;
struct cmsgcred *cmcred;
+ struct mbuf *m;
struct filedescent *fde, **fdep, *fdev;
struct file *fp;
struct timeval *tv;
@@ -2626,15 +3597,13 @@ unp_internalize(struct mbuf **controlp, struct thread *td,
int i, j, error, *fdp, oldfds;
u_int newlen;
- MPASS((*controlp)->m_next == NULL); /* COMPAT_OLDSOCK may violate */
+ MPASS(control->m_next == NULL); /* COMPAT_OLDSOCK may violate */
UNP_LINK_UNLOCK_ASSERT();
p = td->td_proc;
fdesc = p->p_fd;
error = 0;
- control = *controlp;
- *controlp = NULL;
- initial_controlp = controlp;
+ *mc = MCHAIN_INITIALIZER(mc);
for (clen = control->m_len, cm = mtod(control, struct cmsghdr *),
data = CMSG_DATA(cm);
@@ -2648,10 +3617,10 @@ unp_internalize(struct mbuf **controlp, struct thread *td,
datalen = (char *)cm + cm->cmsg_len - (char *)data;
switch (cm->cmsg_type) {
case SCM_CREDS:
- *controlp = sbcreatecontrol(NULL, sizeof(*cmcred),
- SCM_CREDS, SOL_SOCKET, M_WAITOK);
+ m = sbcreatecontrol(NULL, sizeof(*cmcred), SCM_CREDS,
+ SOL_SOCKET, M_WAITOK);
cmcred = (struct cmsgcred *)
- CMSG_DATA(mtod(*controlp, struct cmsghdr *));
+ CMSG_DATA(mtod(m, struct cmsghdr *));
cmcred->cmcred_pid = p->p_pid;
cmcred->cmcred_uid = td->td_ucred->cr_ruid;
cmcred->cmcred_gid = td->td_ucred->cr_rgid;
@@ -2704,8 +3673,8 @@ unp_internalize(struct mbuf **controlp, struct thread *td,
* Now replace the integer FDs with pointers to the
* file structure and capability rights.
*/
- *controlp = sbcreatecontrol(NULL, newlen,
- SCM_RIGHTS, SOL_SOCKET, M_WAITOK);
+ m = sbcreatecontrol(NULL, newlen, SCM_RIGHTS,
+ SOL_SOCKET, M_WAITOK);
fdp = data;
for (i = 0; i < oldfds; i++, fdp++) {
if (!fhold(fdesc->fd_ofiles[*fdp].fde_file)) {
@@ -2721,7 +3690,7 @@ unp_internalize(struct mbuf **controlp, struct thread *td,
}
fdp = data;
fdep = (struct filedescent **)
- CMSG_DATA(mtod(*controlp, struct cmsghdr *));
+ CMSG_DATA(mtod(m, struct cmsghdr *));
fdev = malloc(sizeof(*fdev) * oldfds, M_FILECAPS,
M_WAITOK);
for (i = 0; i < oldfds; i++, fdev++, fdp++) {
@@ -2736,34 +3705,34 @@ unp_internalize(struct mbuf **controlp, struct thread *td,
break;
case SCM_TIMESTAMP:
- *controlp = sbcreatecontrol(NULL, sizeof(*tv),
- SCM_TIMESTAMP, SOL_SOCKET, M_WAITOK);
+ m = sbcreatecontrol(NULL, sizeof(*tv), SCM_TIMESTAMP,
+ SOL_SOCKET, M_WAITOK);
tv = (struct timeval *)
- CMSG_DATA(mtod(*controlp, struct cmsghdr *));
+ CMSG_DATA(mtod(m, struct cmsghdr *));
microtime(tv);
break;
case SCM_BINTIME:
- *controlp = sbcreatecontrol(NULL, sizeof(*bt),
- SCM_BINTIME, SOL_SOCKET, M_WAITOK);
+ m = sbcreatecontrol(NULL, sizeof(*bt), SCM_BINTIME,
+ SOL_SOCKET, M_WAITOK);
bt = (struct bintime *)
- CMSG_DATA(mtod(*controlp, struct cmsghdr *));
+ CMSG_DATA(mtod(m, struct cmsghdr *));
bintime(bt);
break;
case SCM_REALTIME:
- *controlp = sbcreatecontrol(NULL, sizeof(*ts),
- SCM_REALTIME, SOL_SOCKET, M_WAITOK);
+ m = sbcreatecontrol(NULL, sizeof(*ts), SCM_REALTIME,
+ SOL_SOCKET, M_WAITOK);
ts = (struct timespec *)
- CMSG_DATA(mtod(*controlp, struct cmsghdr *));
+ CMSG_DATA(mtod(m, struct cmsghdr *));
nanotime(ts);
break;
case SCM_MONOTONIC:
- *controlp = sbcreatecontrol(NULL, sizeof(*ts),
- SCM_MONOTONIC, SOL_SOCKET, M_WAITOK);
+ m = sbcreatecontrol(NULL, sizeof(*ts), SCM_MONOTONIC,
+ SOL_SOCKET, M_WAITOK);
ts = (struct timespec *)
- CMSG_DATA(mtod(*controlp, struct cmsghdr *));
+ CMSG_DATA(mtod(m, struct cmsghdr *));
nanouptime(ts);
break;
@@ -2772,28 +3741,20 @@ unp_internalize(struct mbuf **controlp, struct thread *td,
goto out;
}
- if (space != NULL) {
- *space += (*controlp)->m_len;
- *mbcnt += MSIZE;
- if ((*controlp)->m_flags & M_EXT)
- *mbcnt += (*controlp)->m_ext.ext_size;
- *clast = *controlp;
- }
- controlp = &(*controlp)->m_next;
+ mc_append(mc, m);
}
if (clen > 0)
error = EINVAL;
out:
- if (error != 0 && initial_controlp != NULL)
- unp_internalize_cleanup_rights(*initial_controlp);
+ if (error != 0)
+ unp_internalize_cleanup_rights(mc_first(mc));
m_freem(control);
return (error);
}
-static struct mbuf *
-unp_addsockcred(struct thread *td, struct mbuf *control, int mode,
- struct mbuf **clast, u_int *space, u_int *mbcnt)
+static void
+unp_addsockcred(struct thread *td, struct mchain *mc, int mode)
{
struct mbuf *m, *n, *n_prev;
const struct cmsghdr *cm;
@@ -2809,9 +3770,10 @@ unp_addsockcred(struct thread *td, struct mbuf *control, int mode,
cmsgtype = SCM_CREDS;
}
+ /* XXXGL: uipc_sosend_*() need to be improved so that we can M_WAITOK */
m = sbcreatecontrol(NULL, ctrlsz, cmsgtype, SOL_SOCKET, M_NOWAIT);
if (m == NULL)
- return (control);
+ return;
MPASS((m->m_flags & M_EXT) == 0 && m->m_next == NULL);
if (mode & UNP_WANTCRED_ALWAYS) {
@@ -2845,50 +3807,18 @@ unp_addsockcred(struct thread *td, struct mbuf *control, int mode,
* created SCM_CREDS control message (struct sockcred) has another
* format.
*/
- if (control != NULL && cmsgtype == SCM_CREDS)
- for (n = control, n_prev = NULL; n != NULL;) {
+ if (!STAILQ_EMPTY(&mc->mc_q) && cmsgtype == SCM_CREDS)
+ STAILQ_FOREACH_SAFE(n, &mc->mc_q, m_stailq, n_prev) {
cm = mtod(n, struct cmsghdr *);
if (cm->cmsg_level == SOL_SOCKET &&
cm->cmsg_type == SCM_CREDS) {
- if (n_prev == NULL)
- control = n->m_next;
- else
- n_prev->m_next = n->m_next;
- if (space != NULL) {
- MPASS(*space >= n->m_len);
- *space -= n->m_len;
- MPASS(*mbcnt >= MSIZE);
- *mbcnt -= MSIZE;
- if (n->m_flags & M_EXT) {
- MPASS(*mbcnt >=
- n->m_ext.ext_size);
- *mbcnt -= n->m_ext.ext_size;
- }
- MPASS(clast);
- if (*clast == n) {
- MPASS(n->m_next == NULL);
- if (n_prev == NULL)
- *clast = m;
- else
- *clast = n_prev;
- }
- }
- n = m_free(n);
- } else {
- n_prev = n;
- n = n->m_next;
+ mc_remove(mc, n);
+ m_free(n);
}
}
/* Prepend it to the head. */
- m->m_next = control;
- if (space != NULL) {
- *space += m->m_len;
- *mbcnt += MSIZE;
- if (control == NULL)
- *clast = m;
- }
- return (m);
+ mc_prepend(mc, m);
}
static struct unpcb *
@@ -3056,7 +3986,7 @@ unp_scan_socket(struct socket *so, void (*op)(struct filedescent **, int))
break;
case SOCK_STREAM:
case SOCK_SEQPACKET:
- unp_scan(so->so_rcv.sb_mb, op);
+ unp_scan(STAILQ_FIRST(&so->so_rcv.uxst_mbq), op);
break;
}
SOCK_RECVBUF_UNLOCK(so);
@@ -3266,39 +4196,52 @@ unp_dispose(struct socket *so)
}
m = STAILQ_FIRST(&sb->uxdg_mb);
STAILQ_INIT(&sb->uxdg_mb);
- /* XXX: our shortened sbrelease() */
- (void)chgsbsize(so->so_cred->cr_uidinfo, &sb->sb_hiwat, 0,
- RLIM_INFINITY);
- /*
- * XXXGL Mark sb with SBS_CANTRCVMORE. This is needed to
- * prevent uipc_sosend_dgram() or unp_disconnect() adding more
- * data to the socket.
- * We came here either through shutdown(2) or from the final
- * sofree(). The sofree() case is simple as it guarantees
- * that no more sends will happen, however we can race with
- * unp_disconnect() from our peer. The shutdown(2) case is
- * more exotic. It would call into unp_dispose() only if
- * socket is SS_ISCONNECTED. This is possible if we did
- * connect(2) on this socket and we also had it bound with
- * bind(2) and receive connections from other sockets.
- * Because uipc_shutdown() violates POSIX (see comment
- * there) we will end up here shutting down our receive side.
- * Of course this will have affect not only on the peer we
- * connect(2)ed to, but also on all of the peers who had
- * connect(2)ed to us. Their sends would end up with ENOBUFS.
- */
- sb->sb_state |= SBS_CANTRCVMORE;
break;
case SOCK_STREAM:
case SOCK_SEQPACKET:
sb = &so->so_rcv;
- m = sbcut_locked(sb, sb->sb_ccc);
- KASSERT(sb->sb_ccc == 0 && sb->sb_mb == 0 && sb->sb_mbcnt == 0,
- ("%s: ccc %u mb %p mbcnt %u", __func__,
- sb->sb_ccc, (void *)sb->sb_mb, sb->sb_mbcnt));
- sbrelease_locked(so, SO_RCV);
+ m = STAILQ_FIRST(&sb->uxst_mbq);
+ STAILQ_INIT(&sb->uxst_mbq);
+ sb->sb_acc = sb->sb_ccc = sb->sb_ctl = sb->sb_mbcnt = 0;
+ /*
+ * Trim M_NOTREADY buffers from the free list. They are
+ * referenced by the I/O thread.
+ */
+ if (sb->uxst_fnrdy != NULL) {
+ struct mbuf *n, *prev;
+
+ while (m != NULL && m->m_flags & M_NOTREADY)
+ m = m->m_next;
+ for (prev = n = m; n != NULL; n = n->m_next) {
+ if (n->m_flags & M_NOTREADY) {
+ n = n->m_next;
+ prev->m_next = n;
+ } else
+ prev = n;
+ }
+ sb->uxst_fnrdy = NULL;
+ }
break;
}
+ /*
+ * Mark sb with SBS_CANTRCVMORE. This is needed to prevent
+ * uipc_sosend_*() or unp_disconnect() adding more data to the socket.
+ * We came here either through shutdown(2) or from the final sofree().
+ * The sofree() case is simple as it guarantees that no more sends will
+ * happen, however we can race with unp_disconnect() from our peer.
+ * The shutdown(2) case is more exotic. It would call into
+ * unp_dispose() only if socket is SS_ISCONNECTED. This is possible if
+ * we did connect(2) on this socket and we also had it bound with
+ * bind(2) and receive connections from other sockets. Because
+ * uipc_shutdown() violates POSIX (see comment there) this applies to
+ * SOCK_DGRAM as well. For SOCK_DGRAM this SBS_CANTRCVMORE will have
+ * affect not only on the peer we connect(2)ed to, but also on all of
+ * the peers who had connect(2)ed to us. Their sends would end up
+ * with ENOBUFS.
+ */
+ sb->sb_state |= SBS_CANTRCVMORE;
+ (void)chgsbsize(so->so_cred->cr_uidinfo, &sb->sb_hiwat, 0,
+ RLIM_INFINITY);
SOCK_RECVBUF_UNLOCK(so);
SOCK_IO_RECV_UNLOCK(so);
@@ -3357,7 +4300,7 @@ unp_scan(struct mbuf *m0, void (*op)(struct filedescent **, int))
*/
static struct protosw streamproto = {
.pr_type = SOCK_STREAM,
- .pr_flags = PR_CONNREQUIRED | PR_WANTRCVD | PR_CAPATTACH,
+ .pr_flags = PR_CONNREQUIRED | PR_CAPATTACH | PR_SOCKBUF,
.pr_ctloutput = &uipc_ctloutput,
.pr_abort = uipc_abort,
.pr_accept = uipc_peeraddr,
@@ -3371,14 +4314,16 @@ static struct protosw streamproto = {
.pr_disconnect = uipc_disconnect,
.pr_listen = uipc_listen,
.pr_peeraddr = uipc_peeraddr,
- .pr_rcvd = uipc_rcvd,
- .pr_send = uipc_send,
- .pr_sendfile_wait = sendfile_wait_generic,
+ .pr_send = uipc_sendfile,
+ .pr_sendfile_wait = uipc_sendfile_wait,
.pr_ready = uipc_ready,
.pr_sense = uipc_sense,
.pr_shutdown = uipc_shutdown,
.pr_sockaddr = uipc_sockaddr,
- .pr_soreceive = soreceive_generic,
+ .pr_sosend = uipc_sosend_stream_or_seqpacket,
+ .pr_soreceive = uipc_soreceive_stream_or_seqpacket,
+ .pr_sopoll = uipc_sopoll_stream_or_seqpacket,
+ .pr_kqfilter = uipc_kqfilter_stream_or_seqpacket,
.pr_close = uipc_close,
.pr_chmod = uipc_chmod,
};
@@ -3409,13 +4354,7 @@ static struct protosw dgramproto = {
static struct protosw seqpacketproto = {
.pr_type = SOCK_SEQPACKET,
- /*
- * XXXRW: For now, PR_ADDR because soreceive will bump into them
- * due to our use of sbappendaddr. A new sbappend variants is needed
- * that supports both atomic record writes and control data.
- */
- .pr_flags = PR_ADDR | PR_ATOMIC | PR_CONNREQUIRED |
- PR_WANTRCVD | PR_CAPATTACH,
+ .pr_flags = PR_CONNREQUIRED | PR_CAPATTACH | PR_SOCKBUF,
.pr_ctloutput = &uipc_ctloutput,
.pr_abort = uipc_abort,
.pr_accept = uipc_peeraddr,
@@ -3429,12 +4368,13 @@ static struct protosw seqpacketproto = {
.pr_disconnect = uipc_disconnect,
.pr_listen = uipc_listen,
.pr_peeraddr = uipc_peeraddr,
- .pr_rcvd = uipc_rcvd,
- .pr_send = uipc_send,
.pr_sense = uipc_sense,
.pr_shutdown = uipc_shutdown,
.pr_sockaddr = uipc_sockaddr,
- .pr_soreceive = soreceive_generic, /* XXX: or...? */
+ .pr_sosend = uipc_sosend_stream_or_seqpacket,
+ .pr_soreceive = uipc_soreceive_stream_or_seqpacket,
+ .pr_sopoll = uipc_sopoll_stream_or_seqpacket,
+ .pr_kqfilter = uipc_kqfilter_stream_or_seqpacket,
.pr_close = uipc_close,
.pr_chmod = uipc_chmod,
};
diff --git a/sys/sys/sockbuf.h b/sys/sys/sockbuf.h
index 73dd7afa371f..dbf5c1ff956d 100644
--- a/sys/sys/sockbuf.h
+++ b/sys/sys/sockbuf.h
@@ -133,6 +133,18 @@ struct sockbuf {
struct ktls_session *sb_tls_info;
};
/*
+ * PF_UNIX/SOCK_STREAM and PF_UNIX/SOCK_SEQPACKET
+ * A simple stream buffer with not ready data pointer.
+ */
+ struct {
+ STAILQ_HEAD(, mbuf) uxst_mbq;
+ struct mbuf *uxst_fnrdy;
+ struct socket *uxst_peer;
+ u_int uxst_flags;
+#define UXST_PEER_AIO 0x1
+#define UXST_PEER_SEL 0x2
+ };
+ /*
* PF_UNIX/SOCK_DGRAM
*
* Local protocol, thus we should buffer on the receive side