aboutsummaryrefslogtreecommitdiff
path: root/sys/rpc/clnt_vc.c
diff options
context:
space:
mode:
authorDoug Rabson <dfr@FreeBSD.org>2008-06-26 10:21:54 +0000
committerDoug Rabson <dfr@FreeBSD.org>2008-06-26 10:21:54 +0000
commitc675522fc4323ee02bfabe08fb00086132c32708 (patch)
tree994a214037913bc4e44eaee5070c65aeadf53485 /sys/rpc/clnt_vc.c
parent91bc389e54b5909c6fe96dbc593066ec4a5d9e81 (diff)
downloadsrc-c675522fc4323ee02bfabe08fb00086132c32708.tar.gz
src-c675522fc4323ee02bfabe08fb00086132c32708.zip
Re-implement the client side of rpc.lockd in the kernel. This implementation
provides the correct semantics for flock(2) style locks which are used by the lockf(1) command line tool and the pidfile(3) library. It also implements recovery from server restarts and ensures that dirty cache blocks are written to the server before obtaining locks (allowing multiple clients to use file locking to safely share data). Sponsored by: Isilon Systems PR: 94256 MFC after: 2 weeks
Notes
Notes: svn path=/head/; revision=180025
Diffstat (limited to 'sys/rpc/clnt_vc.c')
-rw-r--r--sys/rpc/clnt_vc.c138
1 files changed, 96 insertions, 42 deletions
diff --git a/sys/rpc/clnt_vc.c b/sys/rpc/clnt_vc.c
index 5731e1e0070c..cb093528c47d 100644
--- a/sys/rpc/clnt_vc.c
+++ b/sys/rpc/clnt_vc.c
@@ -80,8 +80,8 @@ struct cmessage {
struct cmsgcred cmcred;
};
-static enum clnt_stat clnt_vc_call(CLIENT *, rpcproc_t, xdrproc_t, void *,
- xdrproc_t, void *, struct timeval);
+static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
+ rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval);
static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_vc_abort(CLIENT *);
@@ -100,7 +100,9 @@ static struct clnt_ops clnt_vc_ops = {
};
/*
- * A pending RPC request which awaits a reply.
+ * A pending RPC request which awaits a reply. Requests which have
+ * received their reply will have cr_xid set to zero and cr_mrep to
+ * the mbuf chain of the reply.
*/
struct ct_request {
TAILQ_ENTRY(ct_request) cr_link;
@@ -113,6 +115,8 @@ TAILQ_HEAD(ct_request_list, ct_request);
struct ct_data {
struct mtx ct_lock;
+ int ct_threads; /* number of threads in clnt_vc_call */
+ bool_t ct_closing; /* TRUE if we are destroying client */
struct socket *ct_socket; /* connection socket */
bool_t ct_closeit; /* close it on destroy */
struct timeval ct_wait; /* wait interval in milliseconds */
@@ -161,7 +165,7 @@ clnt_vc_create(
static uint32_t disrupt;
struct __rpc_sockinfo si;
XDR xdrs;
- int error;
+ int error, interrupted;
if (disrupt == 0)
disrupt = (uint32_t)(long)raddr;
@@ -170,10 +174,31 @@ clnt_vc_create(
ct = (struct ct_data *)mem_alloc(sizeof (*ct));
mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
+ ct->ct_threads = 0;
+ ct->ct_closing = FALSE;
if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
error = soconnect(so, raddr, curthread);
+ SOCK_LOCK(so);
+ interrupted = 0;
+ while ((so->so_state & SS_ISCONNECTING)
+ && so->so_error == 0) {
+ error = msleep(&so->so_timeo, SOCK_MTX(so),
+ PSOCK | PCATCH, "connec", 0);
+ if (error) {
+ if (error == EINTR || error == ERESTART)
+ interrupted = 1;
+ break;
+ }
+ }
+ if (error == 0) {
+ error = so->so_error;
+ so->so_error = 0;
+ }
+ SOCK_UNLOCK(so);
if (error) {
+ if (!interrupted)
+ so->so_state &= ~SS_ISCONNECTING;
rpc_createerr.cf_stat = RPC_SYSTEMERROR;
rpc_createerr.cf_error.re_errno = error;
goto err;
@@ -224,6 +249,7 @@ clnt_vc_create(
* Create a client handle which uses xdrrec for serialization
* and authnone for authentication.
*/
+ cl->cl_refs = 1;
cl->cl_ops = &clnt_vc_ops;
cl->cl_private = ct;
cl->cl_auth = authnone_create();
@@ -255,6 +281,7 @@ err:
static enum clnt_stat
clnt_vc_call(
CLIENT *cl,
+ struct rpc_callextra *ext,
rpcproc_t proc,
xdrproc_t xdr_args,
void *args_ptr,
@@ -263,6 +290,7 @@ clnt_vc_call(
struct timeval utimeout)
{
struct ct_data *ct = (struct ct_data *) cl->cl_private;
+ AUTH *auth;
XDR xdrs;
struct rpc_msg reply_msg;
bool_t ok;
@@ -270,13 +298,27 @@ clnt_vc_call(
struct timeval timeout;
uint32_t xid;
struct mbuf *mreq = NULL;
- struct ct_request cr;
+ struct ct_request *cr;
int error;
+ cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
+
mtx_lock(&ct->ct_lock);
- cr.cr_mrep = NULL;
- cr.cr_error = 0;
+ if (ct->ct_closing) {
+ mtx_unlock(&ct->ct_lock);
+ free(cr, M_RPC);
+ return (RPC_CANTSEND);
+ }
+ ct->ct_threads++;
+
+ if (ext)
+ auth = ext->rc_auth;
+ else
+ auth = cl->cl_auth;
+
+ cr->cr_mrep = NULL;
+ cr->cr_error = 0;
if (ct->ct_wait.tv_usec == -1) {
timeout = utimeout; /* use supplied timeout */
@@ -311,12 +353,12 @@ call_again:
ct->ct_error.re_status = RPC_SUCCESS;
if ((! XDR_PUTINT32(&xdrs, &proc)) ||
- (! AUTH_MARSHALL(cl->cl_auth, &xdrs)) ||
+ (! AUTH_MARSHALL(auth, &xdrs)) ||
(! (*xdr_args)(&xdrs, args_ptr))) {
if (ct->ct_error.re_status == RPC_SUCCESS)
ct->ct_error.re_status = RPC_CANTENCODEARGS;
- m_freem(mreq);
- return (ct->ct_error.re_status);
+ mtx_lock(&ct->ct_lock);
+ goto out;
}
m_fixhdr(mreq);
@@ -327,9 +369,9 @@ call_again:
*mtod(mreq, uint32_t *) =
htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
- cr.cr_xid = xid;
+ cr->cr_xid = xid;
mtx_lock(&ct->ct_lock);
- TAILQ_INSERT_TAIL(&ct->ct_pending, &cr, cr_link);
+ TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
mtx_unlock(&ct->ct_lock);
/*
@@ -343,10 +385,8 @@ call_again:
reply_msg.acpted_rply.ar_results.proc = xdr_results;
mtx_lock(&ct->ct_lock);
-
if (error) {
- TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
-
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
ct->ct_error.re_errno = error;
ct->ct_error.re_status = RPC_CANTSEND;
goto out;
@@ -357,12 +397,14 @@ call_again:
* lock. In both these cases, the request has been removed
* from ct->ct_pending.
*/
- if (cr.cr_error) {
- ct->ct_error.re_errno = cr.cr_error;
+ if (cr->cr_error) {
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
+ ct->ct_error.re_errno = cr->cr_error;
ct->ct_error.re_status = RPC_CANTRECV;
goto out;
}
- if (cr.cr_mrep) {
+ if (cr->cr_mrep) {
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
goto got_reply;
}
@@ -370,23 +412,22 @@ call_again:
* Hack to provide rpc-based message passing
*/
if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
ct->ct_error.re_status = RPC_TIMEDOUT;
goto out;
}
- error = msleep(&cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
+ error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
tvtohz(&timeout));
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
+
if (error) {
/*
* The sleep returned an error so our request is still
* on the list. Turn the error code into an
* appropriate client status.
*/
- if (cr.cr_xid)
- TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
ct->ct_error.re_errno = error;
switch (error) {
case EINTR:
@@ -405,8 +446,8 @@ call_again:
* upcall had a receive error, report that,
* otherwise we have a reply.
*/
- if (cr.cr_error) {
- ct->ct_error.re_errno = cr.cr_error;
+ if (cr->cr_error) {
+ ct->ct_error.re_errno = cr->cr_error;
ct->ct_error.re_status = RPC_CANTRECV;
goto out;
}
@@ -419,10 +460,10 @@ got_reply:
*/
mtx_unlock(&ct->ct_lock);
- xdrmbuf_create(&xdrs, cr.cr_mrep, XDR_DECODE);
+ xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
ok = xdr_replymsg(&xdrs, &reply_msg);
XDR_DESTROY(&xdrs);
- cr.cr_mrep = NULL;
+ cr->cr_mrep = NULL;
mtx_lock(&ct->ct_lock);
@@ -466,10 +507,17 @@ out:
if (mreq)
m_freem(mreq);
- if (cr.cr_mrep)
- m_freem(cr.cr_mrep);
+ if (cr->cr_mrep)
+ m_freem(cr->cr_mrep);
+ ct->ct_threads--;
+ if (ct->ct_closing)
+ wakeup(ct);
+
mtx_unlock(&ct->ct_lock);
+
+ free(cr, M_RPC);
+
return (ct->ct_error.re_status);
}
@@ -628,6 +676,7 @@ static void
clnt_vc_destroy(CLIENT *cl)
{
struct ct_data *ct = (struct ct_data *) cl->cl_private;
+ struct ct_request *cr;
struct socket *so = NULL;
mtx_lock(&ct->ct_lock);
@@ -639,8 +688,19 @@ clnt_vc_destroy(CLIENT *cl)
ct->ct_socket->so_rcv.sb_flags &= ~SB_UPCALL;
SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
- KASSERT(!TAILQ_FIRST(&ct->ct_pending),
- ("Destroying RPC client with pending RPC requests"));
+ /*
+ * Abort any pending requests and wait until everyone
+ * has finished with clnt_vc_call.
+ */
+ ct->ct_closing = TRUE;
+ TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
+ cr->cr_xid = 0;
+ cr->cr_error = ESHUTDOWN;
+ wakeup(cr);
+ }
+
+ while (ct->ct_threads)
+ msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
if (ct->ct_closeit) {
so = ct->ct_socket;
@@ -732,7 +792,6 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
cr->cr_error = error;
wakeup(cr);
}
- TAILQ_INIT(&ct->ct_pending);
mtx_unlock(&ct->ct_lock);
break;
}
@@ -795,19 +854,14 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
if (cr->cr_xid == xid) {
/*
* This one
- * matches. We snip it
- * out of the pending
- * list and leave the
- * reply mbuf in
+ * matches. We leave
+ * the reply mbuf in
* cr->cr_mrep. Set
* the XID to zero so
- * that clnt_vc_call
- * can know not to
- * repeat the
- * TAILQ_REMOVE.
+ * that we will ignore
+ * any duplicaed
+ * replies.
*/
- TAILQ_REMOVE(&ct->ct_pending,
- cr, cr_link);
cr->cr_xid = 0;
cr->cr_mrep = ct->ct_record;
cr->cr_error = 0;