aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author     Alan Somers <asomers@FreeBSD.org>  2019-04-17 23:32:38 +0000
committer  Alan Somers <asomers@FreeBSD.org>  2019-04-17 23:32:38 +0000
commit     723c7768299a077a93f19b26e8c27b4641751e34 (patch)
tree       80dcd987fc01a7125c4b4598cd80a78dab4bc398
parent     f067b60946900c5f95e1514bb20dbc0eddd23337 (diff)
download   src-723c7768299a077a93f19b26e8c27b4641751e34.tar.gz
           src-723c7768299a077a93f19b26e8c27b4641751e34.zip
fusefs: WIP making FUSE operations interruptible
The fuse protocol includes a FUSE_INTERRUPT operation that the client can send to the server to indicate that it wants to abort an in-progress operation. It's required to interrupt any syscall that is blocking on a fuse operation.

This commit adds basic FUSE_INTERRUPT support. If a process receives any signal while it's blocking on a FUSE operation, it will send a FUSE_INTERRUPT and wait for the original operation to complete. But there is still much to do:

* The current code will leak memory if the server ignores FUSE_INTERRUPT, which many do. It will also leak memory if the server completes the original operation before it receives the FUSE_INTERRUPT.
* An interrupted read(2) will incorrectly appear to be successful.
* fusefs should return immediately for fatal signals.
* Operations that haven't been sent to the server yet should be aborted without sending FUSE_INTERRUPT.
* Test coverage should be better.
* It would be great if write operations could be made restartable. That would require delaying uiomove until the last possible moment, which would be sometime during fuse_device_read.

PR:		236530
Sponsored by:	The FreeBSD Foundation
Notes
Notes: svn path=/projects/fuse2/; revision=346339
-rw-r--r--  sys/fs/fuse/fuse_device.c          13
-rw-r--r--  sys/fs/fuse/fuse_io.c              19
-rw-r--r--  sys/fs/fuse/fuse_ipc.c            148
-rw-r--r--  sys/fs/fuse/fuse_ipc.h             13
-rw-r--r--  tests/sys/fs/fusefs/interrupt.cc   56
-rw-r--r--  tests/sys/fs/fusefs/mockfs.cc       3
6 files changed, 199 insertions, 53 deletions
diff --git a/sys/fs/fuse/fuse_device.c b/sys/fs/fuse/fuse_device.c
index 23271d4c2c1a..a9aa0d7e64c9 100644
--- a/sys/fs/fuse/fuse_device.c
+++ b/sys/fs/fuse/fuse_device.c
@@ -346,8 +346,8 @@ fuse_ohead_audit(struct fuse_out_header *ohead, struct uio *uio)
return (0);
}
-SDT_PROBE_DEFINE1(fuse, , device, fuse_device_write_bumped_into_callback,
- "uint64_t");
+SDT_PROBE_DEFINE1(fuse, , device, fuse_device_write_missing_ticket, "uint64_t");
+SDT_PROBE_DEFINE1(fuse, , device, fuse_device_write_found, "struct fuse_ticket*");
/*
* fuse_device_write first reads the header sent by the daemon.
* If that's OK, looks up ticket/callback node by the unique id seen in header.
@@ -393,10 +393,9 @@ fuse_device_write(struct cdev *dev, struct uio *uio, int ioflag)
fuse_lck_mtx_lock(data->aw_mtx);
TAILQ_FOREACH_SAFE(tick, &data->aw_head, tk_aw_link,
x_tick) {
- SDT_PROBE1(fuse, , device,
- fuse_device_write_bumped_into_callback,
- tick->tk_unique);
if (tick->tk_unique == ohead.unique) {
+ SDT_PROBE1(fuse, , device, fuse_device_write_found,
+ tick);
found = 1;
fuse_aw_remove(tick);
break;
@@ -432,8 +431,8 @@ fuse_device_write(struct cdev *dev, struct uio *uio, int ioflag)
fuse_ticket_drop(tick);
} else {
/* no callback at all! */
- SDT_PROBE2(fuse, , device, trace, 1,
- "erhm, no handler for this response");
+ SDT_PROBE1(fuse, , device, fuse_device_write_missing_ticket,
+ ohead.unique);
err = EINVAL;
}
diff --git a/sys/fs/fuse/fuse_io.c b/sys/fs/fuse/fuse_io.c
index df6d8d71e53d..258e1221c8cd 100644
--- a/sys/fs/fuse/fuse_io.c
+++ b/sys/fs/fuse/fuse_io.c
@@ -379,8 +379,25 @@ fuse_write_directbackend(struct vnode *vp, struct uio *uio,
break;
retry:
- if ((err = fdisp_wait_answ(&fdi)))
+ err = fdisp_wait_answ(&fdi);
+ if (err == ERESTART || err == EINTR || err == EWOULDBLOCK) {
+ /*
+ * Rewind the uio so dofilewrite will know it's
+ * incomplete
+ */
+ uio->uio_resid += fwi->size;
+ uio->uio_offset -= fwi->size;
+ /*
+ * Change ERESTART into EINTR because we can't rewind
+ * uio->uio_iov. Basically, once uiomove(9) has been
+ * called, it's impossible to restart a syscall.
+ */
+ if (err == ERESTART)
+ err = EINTR;
break;
+ } else if (err) {
+ break;
+ }
fwo = ((struct fuse_write_out *)fdi.answ);
diff --git a/sys/fs/fuse/fuse_ipc.c b/sys/fs/fuse/fuse_ipc.c
index 4dde1132df9d..a19d38f99dea 100644
--- a/sys/fs/fuse/fuse_ipc.c
+++ b/sys/fs/fuse/fuse_ipc.c
@@ -92,7 +92,10 @@ SDT_PROVIDER_DECLARE(fuse);
*/
SDT_PROBE_DEFINE2(fuse, , ipc, trace, "int", "char*");
+static void fdisp_make_pid(struct fuse_dispatcher *fdip, enum fuse_opcode op,
+ struct fuse_data *data, uint64_t nid, pid_t pid, struct ucred *cred);
static void fiov_clear(struct fuse_iov *fiov);
+static void fuse_interrupt_send(struct fuse_ticket *otick);
static struct fuse_ticket *fticket_alloc(struct fuse_data *data);
static void fticket_refresh(struct fuse_ticket *ftick);
static void fticket_destroy(struct fuse_ticket *ftick);
@@ -126,25 +129,76 @@ SYSCTL_INT(_vfs_fusefs, OID_AUTO, iov_credit, CTLFLAG_RW,
MALLOC_DEFINE(M_FUSEMSG, "fuse_msgbuf", "fuse message buffer");
static uma_zone_t ticket_zone;
-static void
-fuse_block_sigs(sigset_t *oldset)
-{
- sigset_t newset;
-
- SIGFILLSET(newset);
- SIGDELSET(newset, SIGKILL);
- if (kern_sigprocmask(curthread, SIG_BLOCK, &newset, oldset, 0))
- panic("%s: Invalid operation for kern_sigprocmask()",
- __func__);
+/*
+ * TODO: figure out how to time out INTERRUPT requests, because the daemon may
+ * legally never respond
+ *
+ * TODO: remove an INTERRUPT request if the daemon responds to the original
+ */
+static int
+fuse_interrupt_callback(struct fuse_ticket *tick, struct uio *uio)
+{
+ if (tick->tk_aw_ohead.error == EAGAIN) {
+ /*
+ * There are two reasons we might get this:
+ * 1) the daemon received the INTERRUPT request before the
+ * original, or
+ * 2) the daemon received the INTERRUPT request after it
+ * completed the original request.
+ * In the first case we should re-send the INTERRUPT. In the
+ * second, we should ignore it.
+ */
+ struct fuse_interrupt_in *fii;
+ struct fuse_data *data;
+ struct fuse_ticket *otick, *x_tick;
+ bool found = false;
+
+ data = tick->tk_data;
+ fii = (struct fuse_interrupt_in*)((char*)tick->tk_ms_fiov.base +
+ sizeof(struct fuse_in_header));
+ fuse_lck_mtx_lock(data->aw_mtx);
+ TAILQ_FOREACH_SAFE(otick, &data->aw_head, tk_aw_link, x_tick) {
+ if (otick->tk_unique == fii->unique) {
+ found = true;
+ break;
+ }
+ }
+ fuse_lck_mtx_unlock(data->aw_mtx);
+ if (found) {
+ /* Resend */
+ fuse_interrupt_send(otick);
+ } else {
+ /* Original is already complete; nothing to do */
+ }
+ return 0;
+ } else {
+ /* Illegal FUSE_INTERRUPT response */
+ return EINVAL;
+ }
}
-static void
-fuse_restore_sigs(sigset_t *oldset)
+void
+fuse_interrupt_send(struct fuse_ticket *otick)
{
+ struct fuse_dispatcher fdi;
+ struct fuse_interrupt_in *fii;
+ struct fuse_in_header *ftick_hdr;
+ struct fuse_data *data = otick->tk_data;
+ struct ucred reused_creds;
- if (kern_sigprocmask(curthread, SIG_SETMASK, oldset, NULL, 0))
- panic("%s: Invalid operation for kern_sigprocmask()",
- __func__);
+ ftick_hdr = fticket_in_header(otick);
+ reused_creds.cr_uid = ftick_hdr->uid;
+ reused_creds.cr_rgid = ftick_hdr->gid;
+ fdisp_init(&fdi, sizeof(*fii));
+ fdisp_make_pid(&fdi, FUSE_INTERRUPT, data, ftick_hdr->nodeid,
+ ftick_hdr->pid, &reused_creds);
+
+ fii = fdi.indata;
+ fii->unique = otick->tk_unique;
+ fuse_insert_callback(fdi.tick, fuse_interrupt_callback);
+
+ fuse_insert_message(fdi.tick);
+ fdisp_destroy(&fdi);
}
void
@@ -329,12 +383,16 @@ fticket_reset(struct fuse_ticket *ftick)
static int
fticket_wait_answer(struct fuse_ticket *ftick)
{
- sigset_t tset;
+ struct thread *td = curthread;
+ sigset_t blockedset, oldset;
int err = 0;
struct fuse_data *data;
fuse_lck_mtx_lock(ftick->tk_aw_mtx);
+ SIGEMPTYSET(blockedset);
+ kern_sigprocmask(td, SIG_BLOCK, &blockedset, &oldset, 0);
+retry:
if (fticket_answered(ftick)) {
goto out;
}
@@ -345,11 +403,12 @@ fticket_wait_answer(struct fuse_ticket *ftick)
fticket_set_answered(ftick);
goto out;
}
- fuse_block_sigs(&tset);
err = msleep(ftick, &ftick->tk_aw_mtx, PCATCH, "fu_ans",
data->daemon_timeout * hz);
- fuse_restore_sigs(&tset);
- if (err == EAGAIN) { /* same as EWOULDBLOCK */
+ kern_sigprocmask(td, SIG_SETMASK, &oldset, NULL, 0);
+ if (err == EWOULDBLOCK) {
+ SDT_PROBE2(fuse, , ipc, trace, 3,
+ "fticket_wait_answer: EWOULDBLOCK");
#ifdef XXXIP /* die conditionally */
if (!fdata_get_dead(data)) {
fdata_set_dead(data);
@@ -357,6 +416,45 @@ fticket_wait_answer(struct fuse_ticket *ftick)
#endif
err = ETIMEDOUT;
fticket_set_answered(ftick);
+ } else if ((err == EINTR || err == ERESTART)) {
+ /*
+ * Whether we get EINTR or ERESTART depends on whether
+ * SA_RESTART was set by sigaction(2).
+ *
+ * Try to interrupt the operation and wait for an EINTR response
+ * to the original operation. If the file system does not
+ * support FUSE_INTERRUPT, then we'll just wait for it to
+ * complete like normal. If it does support FUSE_INTERRUPT,
+ * then it will either respond EINTR to the original operation,
+ * or EAGAIN to the interrupt.
+ */
+ int sig;
+
+ SDT_PROBE2(fuse, , ipc, trace, 4,
+ "fticket_wait_answer: interrupt");
+ fuse_lck_mtx_unlock(ftick->tk_aw_mtx);
+ fuse_interrupt_send(ftick);
+ fuse_lck_mtx_lock(ftick->tk_aw_mtx);
+
+ /* TODO: return, rather than retry, for fatal signals */
+
+ /*
+ * Block the just-delivered signal while we wait for an
+ * interrupt response
+ */
+ PROC_LOCK(td->td_proc);
+ mtx_lock(&td->td_proc->p_sigacts->ps_mtx);
+ sig = cursig(td);
+ mtx_unlock(&td->td_proc->p_sigacts->ps_mtx);
+ PROC_UNLOCK(td->td_proc);
+ SIGADDSET(blockedset, sig);
+ kern_sigprocmask(curthread, SIG_BLOCK, &blockedset, NULL, 0);
+ goto retry;
+ } else if (err) {
+ SDT_PROBE2(fuse, , ipc, trace, 6,
+ "fticket_wait_answer: other error");
+ } else {
+ SDT_PROBE2(fuse, , ipc, trace, 7, "fticket_wait_answer: OK");
}
out:
if (!(err || fticket_answered(ftick))) {
@@ -762,10 +860,8 @@ fdisp_refresh_pid(struct fuse_dispatcher *fdip, enum fuse_opcode op,
/* Initialize a dispatcher from a pid and node id */
static void
fdisp_make_pid(struct fuse_dispatcher *fdip, enum fuse_opcode op,
- struct mount *mp, uint64_t nid, pid_t pid, struct ucred *cred)
+ struct fuse_data *data, uint64_t nid, pid_t pid, struct ucred *cred)
{
- struct fuse_data *data = fuse_get_mpdata(mp);
-
if (fdip->tick) {
fticket_refresh(fdip->tick);
} else {
@@ -783,17 +879,21 @@ void
fdisp_make(struct fuse_dispatcher *fdip, enum fuse_opcode op, struct mount *mp,
uint64_t nid, struct thread *td, struct ucred *cred)
{
+ struct fuse_data *data = fuse_get_mpdata(mp);
RECTIFY_TDCR(td, cred);
- return fdisp_make_pid(fdip, op, mp, nid, td->td_proc->p_pid, cred);
+ return fdisp_make_pid(fdip, op, data, nid, td->td_proc->p_pid, cred);
}
void
fdisp_make_vp(struct fuse_dispatcher *fdip, enum fuse_opcode op,
struct vnode *vp, struct thread *td, struct ucred *cred)
{
+ struct mount *mp = vnode_mount(vp);
+ struct fuse_data *data = fuse_get_mpdata(mp);
+
RECTIFY_TDCR(td, cred);
- return fdisp_make_pid(fdip, op, vnode_mount(vp), VTOI(vp),
+ return fdisp_make_pid(fdip, op, data, VTOI(vp),
td->td_proc->p_pid, cred);
}
diff --git a/sys/fs/fuse/fuse_ipc.h b/sys/fs/fuse/fuse_ipc.h
index 863234acf549..3a2f9fb0c1e1 100644
--- a/sys/fs/fuse/fuse_ipc.h
+++ b/sys/fs/fuse/fuse_ipc.h
@@ -147,10 +147,16 @@ fticket_set_answered(struct fuse_ticket *ftick)
ftick->tk_flag |= FT_ANSW;
}
+static inline struct fuse_in_header*
+fticket_in_header(struct fuse_ticket *ftick)
+{
+ return (struct fuse_in_header *)(ftick->tk_ms_fiov.base);
+}
+
static inline enum fuse_opcode
fticket_opcode(struct fuse_ticket *ftick)
{
- return (((struct fuse_in_header *)(ftick->tk_ms_fiov.base))->opcode);
+ return fticket_in_header(ftick)->opcode;
}
int fticket_pull(struct fuse_ticket *ftick, struct uio *uio);
@@ -174,6 +180,11 @@ struct fuse_data {
struct mtx aw_mtx;
TAILQ_HEAD(, fuse_ticket) aw_head;
+ /*
+ * Holds the next value of the FUSE operation unique value.
+ * Also, serves as a wakeup channel to prevent any operations from
+ * being created before INIT completes.
+ */
u_long ticketer;
struct sx rename_lock;
diff --git a/tests/sys/fs/fusefs/interrupt.cc b/tests/sys/fs/fusefs/interrupt.cc
index 86135162587c..30ce446af69c 100644
--- a/tests/sys/fs/fusefs/interrupt.cc
+++ b/tests/sys/fs/fusefs/interrupt.cc
@@ -41,8 +41,10 @@ using namespace testing;
/* Don't do anything; all we care about is that the syscall gets interrupted */
void sigusr2_handler(int __unused sig) {
- if (verbosity > 1)
- printf("Signaled!\n");
+ if (verbosity > 1) {
+ printf("Signaled! thread %p\n", pthread_self());
+ }
+
}
void* killer(void* target) {
@@ -52,8 +54,8 @@ void* killer(void* target) {
*/
usleep(250'000);
if (verbosity > 1)
- printf("Signalling!\n");
- pthread_kill(*(pthread_t*)target, SIGUSR2);
+ printf("Signalling! thread %p\n", target);
+ pthread_kill((pthread_t)target, SIGUSR2);
return(NULL);
}
@@ -94,9 +96,14 @@ void setup_interruptor(pthread_t self)
}
void TearDown() {
+ struct sigaction sa;
+
if (m_child != NULL) {
pthread_join(m_child, NULL);
}
+ bzero(&sa, sizeof(sa));
+ sa.sa_handler = SIG_DFL;
+ sigaction(SIGUSR2, &sa, NULL);
FuseTest::TearDown();
}
@@ -107,7 +114,7 @@ void TearDown() {
* complete should generate an EAGAIN response.
*/
/* https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=236530 */
-TEST_F(Interrupt, DISABLED_already_complete)
+TEST_F(Interrupt, already_complete)
{
const char FULLPATH[] = "mountpoint/some_file.txt";
const char RELPATH[] = "some_file.txt";
@@ -122,7 +129,6 @@ TEST_F(Interrupt, DISABLED_already_complete)
expect_lookup(RELPATH, ino);
expect_open(ino, 0, 1);
- expect_getattr(ino, 0);
expect_write(ino, &write_unique);
EXPECT_CALL(*m_mock, process(
ResultOf([&](auto in) {
@@ -150,7 +156,7 @@ TEST_F(Interrupt, DISABLED_already_complete)
ASSERT_LE(0, fd) << strerror(errno);
setup_interruptor(self);
- ASSERT_EQ(bufsize, write(fd, CONTENTS, bufsize)) << strerror(errno);
+ EXPECT_EQ(bufsize, write(fd, CONTENTS, bufsize)) << strerror(errno);
/* Deliberately leak fd. close(2) will be tested in release.cc */
}
@@ -160,7 +166,7 @@ TEST_F(Interrupt, DISABLED_already_complete)
* complete the original operation whenever it damn well pleases.
*/
/* https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=236530 */
-TEST_F(Interrupt, DISABLED_ignore)
+TEST_F(Interrupt, ignore)
{
const char FULLPATH[] = "mountpoint/some_file.txt";
const char RELPATH[] = "some_file.txt";
@@ -175,10 +181,9 @@ TEST_F(Interrupt, DISABLED_ignore)
expect_lookup(RELPATH, ino);
expect_open(ino, 0, 1);
- expect_getattr(ino, 0);
expect_write(ino, &write_unique);
EXPECT_CALL(*m_mock, process(
- ResultOf([=](auto in) {
+ ResultOf([&](auto in) {
return (in->header.opcode == FUSE_INTERRUPT &&
in->body.interrupt.unique == write_unique);
}, Eq(true)),
@@ -208,7 +213,7 @@ TEST_F(Interrupt, DISABLED_ignore)
* return EINTR to userspace
*/
/* https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=236530 */
-TEST_F(Interrupt, DISABLED_in_progress)
+TEST_F(Interrupt, in_progress)
{
const char FULLPATH[] = "mountpoint/some_file.txt";
const char RELPATH[] = "some_file.txt";
@@ -223,10 +228,9 @@ TEST_F(Interrupt, DISABLED_in_progress)
expect_lookup(RELPATH, ino);
expect_open(ino, 0, 1);
- expect_getattr(ino, 0);
expect_write(ino, &write_unique);
EXPECT_CALL(*m_mock, process(
- ResultOf([=](auto in) {
+ ResultOf([&](auto in) {
return (in->header.opcode == FUSE_INTERRUPT &&
in->body.interrupt.unique == write_unique);
}, Eq(true)),
@@ -260,11 +264,12 @@ TEST_F(Interrupt, DISABLED_in_progress)
* successfully interrupts the original
*/
/* https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=236530 */
-TEST_F(Interrupt, DISABLED_too_soon)
+TEST_F(Interrupt, too_soon)
{
const char FULLPATH[] = "mountpoint/some_file.txt";
const char RELPATH[] = "some_file.txt";
const char *CONTENTS = "abcdefgh";
+ Sequence seq;
uint64_t ino = 42;
int fd;
ssize_t bufsize = strlen(CONTENTS);
@@ -275,25 +280,25 @@ TEST_F(Interrupt, DISABLED_too_soon)
expect_lookup(RELPATH, ino);
expect_open(ino, 0, 1);
- expect_getattr(ino, 0);
expect_write(ino, &write_unique);
EXPECT_CALL(*m_mock, process(
- ResultOf([=](auto in) {
+ ResultOf([&](auto in) {
return (in->header.opcode == FUSE_INTERRUPT &&
in->body.interrupt.unique == write_unique);
}, Eq(true)),
_)
- ).WillOnce(Invoke(ReturnErrno(EAGAIN)))
- .RetiresOnSaturation();
+ ).InSequence(seq)
+ .WillOnce(Invoke(ReturnErrno(EAGAIN)));
EXPECT_CALL(*m_mock, process(
- ResultOf([=](auto in) {
+ ResultOf([&](auto in) {
return (in->header.opcode == FUSE_INTERRUPT &&
in->body.interrupt.unique == write_unique);
}, Eq(true)),
_)
- ).WillOnce(Invoke([&](auto in __unused, auto &out __unused) {
+ ).InSequence(seq)
+ .WillOnce(Invoke([&](auto in __unused, auto &out __unused) {
auto out0 = new mockfs_buf_out;
out0->header.error = -EINTR;
out0->header.unique = write_unique;
@@ -310,3 +315,14 @@ TEST_F(Interrupt, DISABLED_too_soon)
/* Deliberately leak fd. close(2) will be tested in release.cc */
}
+
+
+// TODO: add a test that uses siginterrupt and an interruptible signal
+// TODO: add a test that verifies a process can be cleanly killed even if a
+// FUSE_WRITE command never returns.
+// TODO: write in-progress tests for read and other operations
+// TODO: add a test where write returns EWOULDBLOCK
+// TODO: test that if a fatal signal is received, fticket_wait_answer will
+// return without waiting for a response to the interrupted operation.
+// TODO: test that operations that haven't been received by the server can be
+// interrupted without generating a FUSE_INTERRUPT.
diff --git a/tests/sys/fs/fusefs/mockfs.cc b/tests/sys/fs/fusefs/mockfs.cc
index ecac3e1d5c78..c5c6e30366f2 100644
--- a/tests/sys/fs/fusefs/mockfs.cc
+++ b/tests/sys/fs/fusefs/mockfs.cc
@@ -184,6 +184,9 @@ void debug_fuseop(const mockfs_buf_in *in)
case FUSE_FSYNCDIR:
printf(" flags=%#x", in->body.fsyncdir.fsync_flags);
break;
+ case FUSE_INTERRUPT:
+ printf(" unique=%lu", in->body.interrupt.unique);
+ break;
case FUSE_LOOKUP:
printf(" %s", in->body.lookup);
break;