Diffstat (limited to 'module/zfs/dmu_recv.c')
-rw-r--r--  module/zfs/dmu_recv.c | 557
1 file changed, 482 insertions(+), 75 deletions(-)
diff --git a/module/zfs/dmu_recv.c b/module/zfs/dmu_recv.c
index 5ac862519166..680aed4513bc 100644
--- a/module/zfs/dmu_recv.c
+++ b/module/zfs/dmu_recv.c
@@ -6,7 +6,7 @@
* You may not use this file except in compliance with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
- * or http://www.opensolaris.org/os/licensing.
+ * or https://opensource.org/licenses/CDDL-1.0.
* See the License for the specific language governing permissions
* and limitations under the License.
*
@@ -27,8 +27,12 @@
* Copyright (c) 2018, loli10K <ezomori.nozomu@gmail.com>. All rights reserved.
* Copyright (c) 2019, Klara Inc.
* Copyright (c) 2019, Allan Jude
+ * Copyright (c) 2019 Datto Inc.
+ * Copyright (c) 2022 Axcient.
*/
+#include <sys/arc.h>
+#include <sys/spa_impl.h>
#include <sys/dmu.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_send.h>
@@ -64,13 +68,20 @@
#endif
#include <sys/zfs_file.h>
-static int zfs_recv_queue_length = SPA_MAXBLOCKSIZE;
-static int zfs_recv_queue_ff = 20;
-static int zfs_recv_write_batch_size = 1024 * 1024;
+static uint_t zfs_recv_queue_length = SPA_MAXBLOCKSIZE;
+static uint_t zfs_recv_queue_ff = 20;
+static uint_t zfs_recv_write_batch_size = 1024 * 1024;
+static int zfs_recv_best_effort_corrective = 0;
static const void *const dmu_recv_tag = "dmu_recv_tag";
const char *const recv_clone_name = "%recv";
+typedef enum {
+ ORNS_NO,
+ ORNS_YES,
+ ORNS_MAYBE
+} or_need_sync_t;
+
static int receive_read_payload_and_next_header(dmu_recv_cookie_t *ra, int len,
void *buf);
@@ -102,6 +113,8 @@ struct receive_writer_arg {
boolean_t done;
int err;
+ const char *tofs;
+ boolean_t heal;
boolean_t resumable;
boolean_t raw; /* DMU_BACKUP_FEATURE_RAW set */
boolean_t spill; /* DRR_FLAG_SPILL_BLOCK set */
@@ -121,6 +134,10 @@ struct receive_writer_arg {
uint8_t or_iv[ZIO_DATA_IV_LEN];
uint8_t or_mac[ZIO_DATA_MAC_LEN];
boolean_t or_byteorder;
+ zio_t *heal_pio;
+
+ /* Keep track of DRR_FREEOBJECTS right after DRR_OBJECT_RANGE */
+ or_need_sync_t or_need_sync;
};
typedef struct dmu_recv_begin_arg {
@@ -343,9 +360,10 @@ static int
recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
uint64_t fromguid, uint64_t featureflags)
{
- uint64_t val;
+ uint64_t obj;
uint64_t children;
int error;
+ dsl_dataset_t *snap;
dsl_pool_t *dp = ds->ds_dir->dd_pool;
boolean_t encrypted = ds->ds_dir->dd_crypto_obj != 0;
boolean_t raw = (featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
@@ -354,7 +372,7 @@ recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
/* Temporary clone name must not exist. */
error = zap_lookup(dp->dp_meta_objset,
dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
- 8, 1, &val);
+ 8, 1, &obj);
if (error != ENOENT)
return (error == 0 ? SET_ERROR(EBUSY) : error);
@@ -362,12 +380,16 @@ recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
if (dsl_dataset_has_resume_receive_state(ds))
return (SET_ERROR(EBUSY));
- /* New snapshot name must not exist. */
+ /* New snapshot name must not exist if we're not healing it. */
error = zap_lookup(dp->dp_meta_objset,
dsl_dataset_phys(ds)->ds_snapnames_zapobj,
- drba->drba_cookie->drc_tosnap, 8, 1, &val);
- if (error != ENOENT)
+ drba->drba_cookie->drc_tosnap, 8, 1, &obj);
+ if (drba->drba_cookie->drc_heal) {
+ if (error != 0)
+ return (error);
+ } else if (error != ENOENT) {
return (error == 0 ? SET_ERROR(EEXIST) : error);
+ }
/* Must not have children if receiving a ZVOL. */
error = zap_count(dp->dp_meta_objset,
@@ -392,8 +414,40 @@ recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
if (error != 0)
return (error);
- if (fromguid != 0) {
- dsl_dataset_t *snap;
+ if (drba->drba_cookie->drc_heal) {
+ /* Encryption is incompatible with embedded data. */
+ if (encrypted && embed)
+ return (SET_ERROR(EINVAL));
+
+ /* Healing is not supported when in 'force' mode. */
+ if (drba->drba_cookie->drc_force)
+ return (SET_ERROR(EINVAL));
+
+ /* Must have keys loaded if doing encrypted non-raw recv. */
+ if (encrypted && !raw) {
+ if (spa_keystore_lookup_key(dp->dp_spa, ds->ds_object,
+ NULL, NULL) != 0)
+ return (SET_ERROR(EACCES));
+ }
+
+ error = dsl_dataset_hold_obj(dp, obj, FTAG, &snap);
+ if (error != 0)
+ return (error);
+
+ /*
+ * When not doing best-effort corrective recv, healing can only be
+ * done if the send stream is for the same snapshot as the one we
+ * are trying to heal.
+ */
+ if (zfs_recv_best_effort_corrective == 0 &&
+ drba->drba_cookie->drc_drrb->drr_toguid !=
+ dsl_dataset_phys(snap)->ds_guid) {
+ dsl_dataset_rele(snap, FTAG);
+ return (SET_ERROR(ENOTSUP));
+ }
+ dsl_dataset_rele(snap, FTAG);
+ } else if (fromguid != 0) {
+ /* Sanity check the incremental recv */
uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;
/* Can't perform a raw receive on top of a non-raw receive */
@@ -459,7 +513,7 @@ recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
dsl_dataset_rele(snap, FTAG);
} else {
- /* if full, then must be forced */
- /* If full and not healing, then it must be forced. */
if (!drba->drba_cookie->drc_force)
return (SET_ERROR(EEXIST));
@@ -602,7 +656,7 @@ dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
* so add the DS_HOLD_FLAG_DECRYPT flag only if we are dealing
* with a dataset we may encrypt.
*/
- if (drba->drba_dcp != NULL &&
+ if (drba->drba_dcp == NULL ||
drba->drba_dcp->cp_crypt != ZIO_CRYPT_OFF) {
dsflags |= DS_HOLD_FLAG_DECRYPT;
}
@@ -626,6 +680,10 @@ dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
char buf[ZFS_MAX_DATASET_NAME_LEN];
objset_t *os;
+ /* healing recv must be done "into" an existing snapshot */
+ if (drba->drba_cookie->drc_heal == B_TRUE)
+ return (SET_ERROR(ENOTSUP));
+
/*
* If it's a non-clone incremental, we are missing the
* target fs, so fail the recv.
@@ -807,7 +865,7 @@ dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
if (error == 0) {
- /* create temporary clone */
+ /* Create temporary clone unless we're doing corrective recv */
dsl_dataset_t *snap = NULL;
if (drba->drba_cookie->drc_fromsnapobj != 0) {
@@ -815,8 +873,15 @@ dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
drba->drba_cookie->drc_fromsnapobj, FTAG, &snap));
ASSERT3P(dcp, ==, NULL);
}
- dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
- snap, crflags, drba->drba_cred, dcp, tx);
+ if (drc->drc_heal) {
+ /* When healing we want to use the provided snapshot */
+ VERIFY0(dsl_dataset_snap_lookup(ds, drc->drc_tosnap,
+ &dsobj));
+ } else {
+ dsobj = dsl_dataset_create_sync(ds->ds_dir,
+ recv_clone_name, snap, crflags, drba->drba_cred,
+ dcp, tx);
+ }
if (drba->drba_cookie->drc_fromsnapobj != 0)
dsl_dataset_rele(snap, FTAG);
dsl_dataset_rele_flags(ds, dsflags, FTAG);
@@ -933,7 +998,8 @@ dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
*/
rrw_enter(&newds->ds_bp_rwlock, RW_READER, FTAG);
if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds)) &&
- (featureflags & DMU_BACKUP_FEATURE_RAW) == 0) {
+ (featureflags & DMU_BACKUP_FEATURE_RAW) == 0 &&
+ !drc->drc_heal) {
(void) dmu_objset_create_impl(dp->dp_spa,
newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
}
@@ -989,13 +1055,24 @@ dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
dsflags |= DS_HOLD_FLAG_DECRYPT;
}
+ boolean_t recvexist = B_TRUE;
if (dsl_dataset_hold_flags(dp, recvname, dsflags, FTAG, &ds) != 0) {
/* %recv does not exist; continue in tofs */
+ recvexist = B_FALSE;
error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
if (error != 0)
return (error);
}
+ /*
+ * Resume of a full/newfs recv on an existing dataset should be done
+ * with the force flag.
+ */
+ if (recvexist && drrb->drr_fromguid == 0 && !drc->drc_force) {
+ dsl_dataset_rele_flags(ds, dsflags, FTAG);
+ return (SET_ERROR(ZFS_ERR_RESUME_EXISTS));
+ }
+
/* check that ds is marked inconsistent */
if (!DS_IS_INCONSISTENT(ds)) {
dsl_dataset_rele_flags(ds, dsflags, FTAG);
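In effect, resuming a full (drr_fromguid == 0) receive whose target name already exists now has to be forced (zfs receive -F is what sets drc_force); without the force flag the resume fails up front with ZFS_ERR_RESUME_EXISTS.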
@@ -1140,13 +1217,14 @@ dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
* succeeds; otherwise we will leak the holds on the datasets.
*/
int
-dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
- boolean_t force, boolean_t resumable, nvlist_t *localprops,
- nvlist_t *hidden_args, char *origin, dmu_recv_cookie_t *drc,
- zfs_file_t *fp, offset_t *voffp)
+dmu_recv_begin(const char *tofs, const char *tosnap,
+ dmu_replay_record_t *drr_begin, boolean_t force, boolean_t heal,
+ boolean_t resumable, nvlist_t *localprops, nvlist_t *hidden_args,
+ const char *origin, dmu_recv_cookie_t *drc, zfs_file_t *fp,
+ offset_t *voffp)
{
dmu_recv_begin_arg_t drba = { 0 };
- int err;
+ int err = 0;
memset(drc, 0, sizeof (dmu_recv_cookie_t));
drc->drc_drr_begin = drr_begin;
@@ -1154,6 +1232,7 @@ dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
drc->drc_tosnap = tosnap;
drc->drc_tofs = tofs;
drc->drc_force = force;
+ drc->drc_heal = heal;
drc->drc_resumable = resumable;
drc->drc_cred = CRED();
drc->drc_proc = curproc;
@@ -1177,20 +1256,36 @@ dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen;
- void *payload = NULL;
- if (payloadlen != 0)
- payload = kmem_alloc(payloadlen, KM_SLEEP);
- err = receive_read_payload_and_next_header(drc, payloadlen,
- payload);
- if (err != 0) {
- kmem_free(payload, payloadlen);
- return (err);
- }
+ /*
+ * Since OpenZFS 2.0.0, we have enforced a 64MB limit in userspace
+ * configurable via ZFS_SENDRECV_MAX_NVLIST. We enforce 256MB as a hard
+ * upper limit. Systems with less than 1GB of RAM will see a lower
+ * limit from `arc_all_memory() / 4`.
+ */
+ if (payloadlen > (MIN((1U << 28), arc_all_memory() / 4)))
+ return (E2BIG);
+
+
if (payloadlen != 0) {
+ void *payload = vmem_alloc(payloadlen, KM_SLEEP);
+ /*
+ * For compatibility with recursive send streams, we don't do
+ * this here if the stream could be part of a package. Instead,
+ * we'll do it in dmu_recv_stream. If we pull the next header
+ * too early, and it's the END record, we break the `recv_skip`
+ * logic.
+ */
+
+ err = receive_read_payload_and_next_header(drc, payloadlen,
+ payload);
+ if (err != 0) {
+ vmem_free(payload, payloadlen);
+ return (err);
+ }
err = nvlist_unpack(payload, payloadlen, &drc->drc_begin_nvl,
KM_SLEEP);
- kmem_free(payload, payloadlen);
+ vmem_free(payload, payloadlen);
if (err != 0) {
kmem_free(drc->drc_next_rrd,
sizeof (*drc->drc_next_rrd));
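To make the new cap concrete: a BEGIN payload is now rejected with E2BIG once it exceeds MIN(1 << 28, arc_all_memory() / 4), i.e. 256 MiB on any system where arc_all_memory() reports at least 1 GiB, and for example only 128 MiB on a 512 MiB system; the 64 MiB default enforced in userspace via ZFS_SENDRECV_MAX_NVLIST stays well below either bound.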
@@ -1243,6 +1338,186 @@ dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
return (err);
}
+/*
+ * Holds data needed for the corrective recv callback
+ */
+typedef struct cr_cb_data {
+ uint64_t size;
+ zbookmark_phys_t zb;
+ spa_t *spa;
+} cr_cb_data_t;
+
+static void
+corrective_read_done(zio_t *zio)
+{
+ cr_cb_data_t *data = zio->io_private;
+ /* Corruption corrected; update error log if needed */
+ if (zio->io_error == 0) {
+ spa_remove_error(data->spa, &data->zb,
+ BP_GET_LOGICAL_BIRTH(zio->io_bp));
+ }
+ kmem_free(data, sizeof (cr_cb_data_t));
+ abd_free(zio->io_abd);
+}
+
+/*
+ * zio_rewrite the data pointed to by bp with the data from the rrd's abd.
+ */
+static int
+do_corrective_recv(struct receive_writer_arg *rwa, struct drr_write *drrw,
+ struct receive_record_arg *rrd, blkptr_t *bp)
+{
+ int err;
+ zio_t *io;
+ zbookmark_phys_t zb;
+ dnode_t *dn;
+ abd_t *abd = rrd->abd;
+ zio_cksum_t bp_cksum = bp->blk_cksum;
+ zio_flag_t flags = ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_RETRY |
+ ZIO_FLAG_CANFAIL;
+
+ if (rwa->raw)
+ flags |= ZIO_FLAG_RAW;
+
+ err = dnode_hold(rwa->os, drrw->drr_object, FTAG, &dn);
+ if (err != 0)
+ return (err);
+ SET_BOOKMARK(&zb, dmu_objset_id(rwa->os), drrw->drr_object, 0,
+ dbuf_whichblock(dn, 0, drrw->drr_offset));
+ dnode_rele(dn, FTAG);
+
+ if (!rwa->raw && DRR_WRITE_COMPRESSED(drrw)) {
+ /* Decompress the stream data */
+ abd_t *dabd = abd_alloc_linear(
+ drrw->drr_logical_size, B_FALSE);
+ err = zio_decompress_data(drrw->drr_compressiontype,
+ abd, abd_to_buf(dabd), abd_get_size(abd),
+ abd_get_size(dabd), NULL);
+
+ if (err != 0) {
+ abd_free(dabd);
+ return (err);
+ }
+ /* Swap in the newly decompressed data into the abd */
+ abd_free(abd);
+ abd = dabd;
+ }
+
+ if (!rwa->raw && BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF) {
+ /* Recompress the data */
+ abd_t *cabd = abd_alloc_linear(BP_GET_PSIZE(bp),
+ B_FALSE);
+ void *buf = abd_to_buf(cabd);
+ uint64_t csize = zio_compress_data(BP_GET_COMPRESS(bp),
+ abd, &buf, abd_get_size(abd),
+ rwa->os->os_complevel);
+ abd_zero_off(cabd, csize, BP_GET_PSIZE(bp) - csize);
+ /* Swap in newly compressed data into the abd */
+ abd_free(abd);
+ abd = cabd;
+ flags |= ZIO_FLAG_RAW_COMPRESS;
+ }
+
+ /*
+ * The stream is not encrypted but the data on-disk is.
+ * We need to re-encrypt the buf using the same
+ * encryption type, salt, iv, and mac that was used to encrypt
+ * the block previously.
+ */
+ if (!rwa->raw && BP_USES_CRYPT(bp)) {
+ dsl_dataset_t *ds;
+ dsl_crypto_key_t *dck = NULL;
+ uint8_t salt[ZIO_DATA_SALT_LEN];
+ uint8_t iv[ZIO_DATA_IV_LEN];
+ uint8_t mac[ZIO_DATA_MAC_LEN];
+ boolean_t no_crypt = B_FALSE;
+ dsl_pool_t *dp = dmu_objset_pool(rwa->os);
+ abd_t *eabd = abd_alloc_linear(BP_GET_PSIZE(bp), B_FALSE);
+
+ zio_crypt_decode_params_bp(bp, salt, iv);
+ zio_crypt_decode_mac_bp(bp, mac);
+
+ dsl_pool_config_enter(dp, FTAG);
+ err = dsl_dataset_hold_flags(dp, rwa->tofs,
+ DS_HOLD_FLAG_DECRYPT, FTAG, &ds);
+ if (err != 0) {
+ dsl_pool_config_exit(dp, FTAG);
+ abd_free(eabd);
+ return (SET_ERROR(EACCES));
+ }
+
+ /* Look up the key from the spa's keystore */
+ err = spa_keystore_lookup_key(rwa->os->os_spa,
+ zb.zb_objset, FTAG, &dck);
+ if (err != 0) {
+ dsl_dataset_rele_flags(ds, DS_HOLD_FLAG_DECRYPT,
+ FTAG);
+ dsl_pool_config_exit(dp, FTAG);
+ abd_free(eabd);
+ return (SET_ERROR(EACCES));
+ }
+
+ err = zio_do_crypt_abd(B_TRUE, &dck->dck_key,
+ BP_GET_TYPE(bp), BP_SHOULD_BYTESWAP(bp), salt, iv,
+ mac, abd_get_size(abd), abd, eabd, &no_crypt);
+
+ spa_keystore_dsl_key_rele(rwa->os->os_spa, dck, FTAG);
+ dsl_dataset_rele_flags(ds, DS_HOLD_FLAG_DECRYPT, FTAG);
+ dsl_pool_config_exit(dp, FTAG);
+
+ ASSERT0(no_crypt);
+ if (err != 0) {
+ abd_free(eabd);
+ return (err);
+ }
+ /* Swap in the newly encrypted data into the abd */
+ abd_free(abd);
+ abd = eabd;
+
+ /*
+ * We want to prevent zio_rewrite() from trying to
+ * encrypt the data again
+ */
+ flags |= ZIO_FLAG_RAW_ENCRYPT;
+ }
+ rrd->abd = abd;
+
+ io = zio_rewrite(NULL, rwa->os->os_spa, BP_GET_LOGICAL_BIRTH(bp), bp,
+ abd, BP_GET_PSIZE(bp), NULL, NULL, ZIO_PRIORITY_SYNC_WRITE, flags,
+ &zb);
+
+ ASSERT(abd_get_size(abd) == BP_GET_LSIZE(bp) ||
+ abd_get_size(abd) == BP_GET_PSIZE(bp));
+
+ /* compute new bp checksum value and make sure it matches the old one */
+ zio_checksum_compute(io, BP_GET_CHECKSUM(bp), abd, abd_get_size(abd));
+ if (!ZIO_CHECKSUM_EQUAL(bp_cksum, io->io_bp->blk_cksum)) {
+ zio_destroy(io);
+ if (zfs_recv_best_effort_corrective != 0)
+ return (0);
+ return (SET_ERROR(ECKSUM));
+ }
+
+ /* Correct the corruption in place */
+ err = zio_wait(io);
+ if (err == 0) {
+ cr_cb_data_t *cb_data =
+ kmem_alloc(sizeof (cr_cb_data_t), KM_SLEEP);
+ cb_data->spa = rwa->os->os_spa;
+ cb_data->size = drrw->drr_logical_size;
+ cb_data->zb = zb;
+ /* Test if healing worked by re-reading the bp */
+ err = zio_wait(zio_read(rwa->heal_pio, rwa->os->os_spa, bp,
+ abd_alloc_for_io(drrw->drr_logical_size, B_FALSE),
+ drrw->drr_logical_size, corrective_read_done,
+ cb_data, ZIO_PRIORITY_ASYNC_READ, flags, NULL));
+ }
+ if (err != 0 && zfs_recv_best_effort_corrective != 0)
+ err = 0;
+
+ return (err);
+}
+
static int
receive_read(dmu_recv_cookie_t *drc, int len, void *buf)
{
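One detail of do_corrective_recv() worth spelling out is its error policy: with the zfs_recv_best_effort_corrective tunable set to a nonzero value, a checksum mismatch against the existing block pointer or a failed rewrite is reported back as success so the remaining records keep being processed. A minimal standalone sketch of that policy (the helper name corrective_errno is hypothetical, for illustration only):

static int
corrective_errno(int err)
{
	/* Best-effort healing downgrades per-block failures to success. */
	if (err != 0 && zfs_recv_best_effort_corrective != 0)
		return (0);
	return (err);
}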
@@ -1256,11 +1531,11 @@ receive_read(dmu_recv_cookie_t *drc, int len, void *buf)
(drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0);
while (done < len) {
- ssize_t resid;
+ ssize_t resid = len - done;
zfs_file_t *fp = drc->drc_fp;
int err = zfs_file_read(fp, (char *)buf + done,
len - done, &resid);
- if (resid == len - done) {
+ if (err == 0 && resid == len - done) {
/*
* Note: ECKSUM or ZFS_ERR_STREAM_TRUNCATED indicates
* that the receive was interrupted and can
@@ -1523,17 +1798,19 @@ receive_handle_existing_object(const struct receive_writer_arg *rwa,
}
/*
- * The dmu does not currently support decreasing nlevels
- * or changing the number of dnode slots on an object. For
- * non-raw sends, this does not matter and the new object
- * can just use the previous one's nlevels. For raw sends,
- * however, the structure of the received dnode (including
- * nlevels and dnode slots) must match that of the send
- * side. Therefore, instead of using dmu_object_reclaim(),
- * we must free the object completely and call
- * dmu_object_claim_dnsize() instead.
+ * The dmu does not currently support decreasing nlevels, changing the
+ * indirect block size once one is set, or changing the number of
+ * dnode slots on an object. For non-raw sends this does not matter
+ * and the new object can just use the previous one's parameters. For
+ * raw sends, however, the structure of the received dnode (including
+ * indirects and dnode slots) must match that of the send side.
+ * Therefore, instead of using dmu_object_reclaim(), we must free the
+ * object completely and call dmu_object_claim_dnsize() instead.
*/
- if ((rwa->raw && drro->drr_nlevels < doi->doi_indirection) ||
+ if ((rwa->raw && ((doi->doi_indirection > 1 &&
+ indblksz != doi->doi_metadata_block_size) ||
+ drro->drr_nlevels < doi->doi_indirection)) ||
dn_slots != doi->doi_dnodesize >> DNODE_SHIFT) {
err = dmu_free_long_object(rwa->os, drro->drr_object);
if (err != 0)
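As a concrete illustration of the new raw-send condition: an existing object whose indirect blocks are 16K cannot be grafted onto by a raw record whose dnode was built with 128K indirects (or with fewer nlevels than the object already has); in those cases the object is freed and re-claimed with dmu_object_claim_dnsize() so its on-disk structure matches the send side exactly.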
@@ -1641,6 +1918,8 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
if (err == 0) {
err = receive_handle_existing_object(rwa, drro, &doi, data,
&object_to_hold, &new_blksz);
+ if (err != 0)
+ return (err);
} else if (err == EEXIST) {
/*
* The object requested is currently an interior slot of a
@@ -1657,10 +1936,22 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
/* object was freed and we are about to allocate a new one */
object_to_hold = DMU_NEW_OBJECT;
} else {
+ /*
+ * If the only record in this range so far was DRR_FREEOBJECTS
+ * with at least one actually freed object, it's possible that
+ * the block will now be converted to a hole. We need to wait
+ * for the txg to sync to prevent races.
+ */
+ if (rwa->or_need_sync == ORNS_YES)
+ txg_wait_synced(dmu_objset_pool(rwa->os), 0);
+
/* object is free and we are about to allocate a new one */
object_to_hold = DMU_NEW_OBJECT;
}
+ /* Only relevant for the first object in the range */
+ rwa->or_need_sync = ORNS_NO;
+
/*
* If this is a multi-slot dnode there is a chance that this
* object will expand into a slot that is already used by
@@ -1822,6 +2113,16 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
dmu_buf_rele(db, FTAG);
dnode_rele(dn, FTAG);
}
+
+ /*
+ * If the receive fails, we want the resume stream to start with the
+ * same record that we last successfully received. There is no way to
+ * request resume from the object record, but we can benefit from the
+ * fact that sender always sends object record before anything else,
+ * after which it will "resend" data at offset 0 and resume normally.
+ */
+ save_resume_state(rwa, drro->drr_object, 0, tx);
+
dmu_tx_commit(tx);
return (0);
@@ -1854,6 +2155,9 @@ receive_freeobjects(struct receive_writer_arg *rwa,
if (err != 0)
return (err);
+
+ if (rwa->or_need_sync == ORNS_MAYBE)
+ rwa->or_need_sync = ORNS_YES;
}
if (next_err != ESRCH)
return (next_err);
@@ -1937,10 +2241,10 @@ flush_write_batch_impl(struct receive_writer_arg *rwa)
if (err == 0)
abd_free(abd);
} else {
- zio_prop_t zp;
+ zio_prop_t zp = {0};
dmu_write_policy(rwa->os, dn, 0, 0, &zp);
- enum zio_flag zio_flags = 0;
+ zio_flag_t zio_flags = 0;
if (rwa->raw) {
zp.zp_encrypt = B_TRUE;
@@ -2049,6 +2353,53 @@ receive_process_write_record(struct receive_writer_arg *rwa,
!DMU_OT_IS_VALID(drrw->drr_type))
return (SET_ERROR(EINVAL));
+ if (rwa->heal) {
+ blkptr_t *bp;
+ dmu_buf_t *dbp;
+ int flags = DB_RF_CANFAIL;
+
+ if (rwa->raw)
+ flags |= DB_RF_NO_DECRYPT;
+
+ if (rwa->byteswap) {
+ dmu_object_byteswap_t byteswap =
+ DMU_OT_BYTESWAP(drrw->drr_type);
+ dmu_ot_byteswap[byteswap].ob_func(abd_to_buf(rrd->abd),
+ DRR_WRITE_PAYLOAD_SIZE(drrw));
+ }
+
+ err = dmu_buf_hold_noread(rwa->os, drrw->drr_object,
+ drrw->drr_offset, FTAG, &dbp);
+ if (err != 0)
+ return (err);
+
+ /* Try to read the object to see if it needs healing */
+ err = dbuf_read((dmu_buf_impl_t *)dbp, NULL, flags);
+ /*
+ * We only try to heal when dbuf_read() returns ECKSUM.
+ * Other errors (even EIO) are returned to the caller:
+ * EIO indicates that the device is not present/accessible,
+ * so writing to it would likely fail.
+ * If the block is healthy, we don't want to overwrite it
+ * unnecessarily.
+ */
+ if (err != ECKSUM) {
+ dmu_buf_rele(dbp, FTAG);
+ return (err);
+ }
+ /* Make sure the on-disk block and recv record sizes match */
+ if (drrw->drr_logical_size != dbp->db_size) {
+ err = ENOTSUP;
+ dmu_buf_rele(dbp, FTAG);
+ return (err);
+ }
+ /* Get the block pointer for the corrupted block */
+ bp = dmu_buf_get_blkptr(dbp);
+ err = do_corrective_recv(rwa, drrw, rrd, bp);
+ dmu_buf_rele(dbp, FTAG);
+ return (err);
+ }
+
/*
* For resuming to work, records must be in increasing order
* by (object, offset).
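A practical consequence of the healing path above: blocks that still read back cleanly (dbuf_read() returning 0) are released and skipped rather than rewritten, so replaying a corrective stream over data that was never damaged, or has already been healed, leaves those blocks untouched; only an ECKSUM result, i.e. detected corruption, reaches do_corrective_recv().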
@@ -2189,7 +2540,7 @@ receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
* size of the provided arc_buf_t.
*/
if (db_spill->db_size != drrs->drr_length) {
- dmu_buf_will_fill(db_spill, tx);
+ dmu_buf_will_fill(db_spill, tx, B_FALSE);
VERIFY0(dbuf_spill_set_blksz(db_spill,
drrs->drr_length, tx));
}
@@ -2295,6 +2646,8 @@ receive_object_range(struct receive_writer_arg *rwa,
memcpy(rwa->or_mac, drror->drr_mac, ZIO_DATA_MAC_LEN);
rwa->or_byteorder = byteorder;
+ rwa->or_need_sync = ORNS_MAYBE;
+
return (0);
}
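Taken together, the DRR_OBJECT_RANGE, DRR_FREEOBJECTS and DRR_OBJECT hunks above implement a small state machine around rwa->or_need_sync. A self-contained sketch of the transitions (helper names are illustrative only; boolean_t as used elsewhere in this file):

typedef enum { ORNS_NO, ORNS_YES, ORNS_MAYBE } or_need_sync_t;

/* DRR_OBJECT_RANGE starts a new range; a sync may turn out to be needed. */
static or_need_sync_t
on_object_range(void)
{
	return (ORNS_MAYBE);
}

/* A DRR_FREEOBJECTS that actually freed something makes the sync required. */
static or_need_sync_t
on_freeobjects(or_need_sync_t s)
{
	return (s == ORNS_MAYBE ? ORNS_YES : s);
}

/*
 * The first DRR_OBJECT in the range resets the state; if that object's
 * slot was free (a brand-new allocation) while the state is ORNS_YES,
 * the caller must txg_wait_synced() first, as receive_object() does.
 */
static boolean_t
on_first_object(or_need_sync_t *s, boolean_t slot_was_free)
{
	boolean_t wait = (slot_was_free && *s == ORNS_YES);
	*s = ORNS_NO;
	return (wait);
}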
@@ -2341,7 +2694,8 @@ dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
rrw_exit(&ds->ds_bp_rwlock, FTAG);
dsl_dataset_name(ds, name);
dsl_dataset_disown(ds, dsflags, dmu_recv_tag);
- (void) dsl_destroy_head(name);
+ if (!drc->drc_heal)
+ (void) dsl_destroy_head(name);
}
}
@@ -2702,7 +3056,19 @@ receive_process_record(struct receive_writer_arg *rwa,
ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
rwa->bytes_read = rrd->bytes_read;
- if (rrd->header.drr_type != DRR_WRITE) {
+ /* We can only heal write records; other ones get ignored */
+ if (rwa->heal && rrd->header.drr_type != DRR_WRITE) {
+ if (rrd->abd != NULL) {
+ abd_free(rrd->abd);
+ rrd->abd = NULL;
+ } else if (rrd->payload != NULL) {
+ kmem_free(rrd->payload, rrd->payload_size);
+ rrd->payload = NULL;
+ }
+ return (0);
+ }
+
+ if (!rwa->heal && rrd->header.drr_type != DRR_WRITE) {
err = flush_write_batch(rwa);
if (err != 0) {
if (rrd->abd != NULL) {
@@ -2737,9 +3103,16 @@ receive_process_record(struct receive_writer_arg *rwa,
case DRR_WRITE:
{
err = receive_process_write_record(rwa, rrd);
- if (err != EAGAIN) {
+ if (rwa->heal) {
/*
- * On success, receive_process_write_record() returns
+ * If healing, always free the abd after processing.
+ */
+ abd_free(rrd->abd);
+ rrd->abd = NULL;
+ } else if (err != EAGAIN) {
+ /*
+ * On success, a non-healing
+ * receive_process_write_record() returns
* EAGAIN to indicate that we do not want to free
* the rrd or arc_buf.
*/
@@ -2830,8 +3203,9 @@ receive_writer_thread(void *arg)
* EAGAIN indicates that this record has been saved (on
* raw->write_batch), and will be used again, so we don't
* free it.
+ * When healing data we always need to free the record.
*/
- if (err != EAGAIN) {
+ if (err != EAGAIN || rwa->heal) {
if (rwa->err == 0)
rwa->err = err;
kmem_free(rrd, sizeof (*rrd));
@@ -2839,10 +3213,13 @@ receive_writer_thread(void *arg)
}
kmem_free(rrd, sizeof (*rrd));
- int err = flush_write_batch(rwa);
- if (rwa->err == 0)
- rwa->err = err;
-
+ if (rwa->heal) {
+ zio_wait(rwa->heal_pio);
+ } else {
+ int err = flush_write_batch(rwa);
+ if (rwa->err == 0)
+ rwa->err = err;
+ }
mutex_enter(&rwa->mutex);
rwa->done = B_TRUE;
cv_signal(&rwa->cv);
@@ -2926,17 +3303,19 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, offset_t *voffp)
if (err != 0)
goto out;
- /*
- * If this is a new dataset we set the key immediately.
- * Otherwise we don't want to change the key until we
- * are sure the rest of the receive succeeded so we stash
- * the keynvl away until then.
- */
- err = dsl_crypto_recv_raw(spa_name(drc->drc_os->os_spa),
- drc->drc_ds->ds_object, drc->drc_fromsnapobj,
- drc->drc_drrb->drr_type, keynvl, drc->drc_newfs);
- if (err != 0)
- goto out;
+ if (!drc->drc_heal) {
+ /*
+ * If this is a new dataset we set the key immediately.
+ * Otherwise we don't want to change the key until we
+ * are sure the rest of the receive succeeded so we
+ * stash the keynvl away until then.
+ */
+ err = dsl_crypto_recv_raw(spa_name(drc->drc_os->os_spa),
+ drc->drc_ds->ds_object, drc->drc_fromsnapobj,
+ drc->drc_drrb->drr_type, keynvl, drc->drc_newfs);
+ if (err != 0)
+ goto out;
+ }
/* see comment in dmu_recv_end_sync() */
drc->drc_ivset_guid = 0;
@@ -2954,6 +3333,17 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, offset_t *voffp)
}
/*
+ * For compatibility with recursive send streams, we do this here,
+ * rather than in dmu_recv_begin. If we pull the next header too
+ * early, and it's the END record, we break the `recv_skip` logic.
+ */
+ if (drc->drc_drr_begin->drr_payloadlen == 0) {
+ err = receive_read_payload_and_next_header(drc, 0, NULL);
+ if (err != 0)
+ goto out;
+ }
+
+ /*
* If we failed before this point we will clean up any new resume
* state that was created. Now that we've gotten past the initial
* checks we are ok to retain that resume state.
@@ -2967,11 +3357,17 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, offset_t *voffp)
mutex_init(&rwa->mutex, NULL, MUTEX_DEFAULT, NULL);
rwa->os = drc->drc_os;
rwa->byteswap = drc->drc_byteswap;
+ rwa->heal = drc->drc_heal;
+ rwa->tofs = drc->drc_tofs;
rwa->resumable = drc->drc_resumable;
rwa->raw = drc->drc_raw;
rwa->spill = drc->drc_spill;
rwa->full = (drc->drc_drr_begin->drr_u.drr_begin.drr_fromguid == 0);
rwa->os->os_raw_receive = drc->drc_raw;
+ if (drc->drc_heal) {
+ rwa->heal_pio = zio_root(drc->drc_os->os_spa, NULL, NULL,
+ ZIO_FLAG_GODFATHER);
+ }
list_create(&rwa->write_batch, sizeof (struct receive_record_arg),
offsetof(struct receive_record_arg, node.bqn_node));
@@ -3107,7 +3503,9 @@ dmu_recv_end_check(void *arg, dmu_tx_t *tx)
ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);
- if (!drc->drc_newfs) {
+ if (drc->drc_heal) {
+ error = 0;
+ } else if (!drc->drc_newfs) {
dsl_dataset_t *origin_head;
error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
@@ -3183,13 +3581,18 @@ dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
dmu_recv_cookie_t *drc = arg;
dsl_pool_t *dp = dmu_tx_pool(tx);
boolean_t encrypted = drc->drc_ds->ds_dir->dd_crypto_obj != 0;
- uint64_t newsnapobj;
+ uint64_t newsnapobj = 0;
spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
tx, "snap=%s", drc->drc_tosnap);
drc->drc_ds->ds_objset->os_raw_receive = B_FALSE;
- if (!drc->drc_newfs) {
+ if (drc->drc_heal) {
+ if (drc->drc_keynvl != NULL) {
+ nvlist_free(drc->drc_keynvl);
+ drc->drc_keynvl = NULL;
+ }
+ } else if (!drc->drc_newfs) {
dsl_dataset_t *origin_head;
VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
@@ -3303,7 +3706,7 @@ dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
* tunable is set, in which case we will leave the newly-generated
* value.
*/
- if (drc->drc_raw && drc->drc_ivset_guid != 0) {
+ if (!drc->drc_heal && drc->drc_raw && drc->drc_ivset_guid != 0) {
dmu_object_zapify(dp->dp_meta_objset, newsnapobj,
DMU_OT_DSL_DATASET, tx);
VERIFY0(zap_update(dp->dp_meta_objset, newsnapobj,
@@ -3370,7 +3773,7 @@ dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
if (error != 0) {
dmu_recv_cleanup_ds(drc);
nvlist_free(drc->drc_keynvl);
- } else {
+ } else if (!drc->drc_heal) {
if (drc->drc_newfs) {
zvol_create_minor(drc->drc_tofs);
}
@@ -3392,11 +3795,15 @@ dmu_objset_is_receiving(objset_t *os)
os->os_dsl_dataset->ds_owner == dmu_recv_tag);
}
-ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_length, INT, ZMOD_RW,
+ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_length, UINT, ZMOD_RW,
"Maximum receive queue length");
-ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_ff, INT, ZMOD_RW,
+ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_ff, UINT, ZMOD_RW,
"Receive queue fill fraction");
-ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, write_batch_size, INT, ZMOD_RW,
+ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, write_batch_size, UINT, ZMOD_RW,
"Maximum amount of writes to batch into one transaction");
+
+ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, best_effort_corrective, INT, ZMOD_RW,
+ "Ignore errors during corrective receive");
+/* END CSTYLED */
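For reference, on Linux builds the new tunable is exposed at runtime as /sys/module/zfs/parameters/zfs_recv_best_effort_corrective: the default 0 requires the corrective stream to match the target snapshot's guid and every rewritten block's checksum, while a nonzero value relaxes both checks, as implemented in recv_begin_check_existing_impl() and do_corrective_recv() above. The drc_heal / rwa->heal path itself is entered through the corrective-receive option added to zfs receive in the companion userspace change, which is not part of this diff.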