diff options
Diffstat (limited to 'module/zfs/dmu_recv.c')
-rw-r--r-- | module/zfs/dmu_recv.c | 557 |
1 files changed, 482 insertions, 75 deletions
diff --git a/module/zfs/dmu_recv.c b/module/zfs/dmu_recv.c index 5ac862519166..680aed4513bc 100644 --- a/module/zfs/dmu_recv.c +++ b/module/zfs/dmu_recv.c @@ -6,7 +6,7 @@ * You may not use this file except in compliance with the License. * * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE - * or http://www.opensolaris.org/os/licensing. + * or https://opensource.org/licenses/CDDL-1.0. * See the License for the specific language governing permissions * and limitations under the License. * @@ -27,8 +27,12 @@ * Copyright (c) 2018, loli10K <ezomori.nozomu@gmail.com>. All rights reserved. * Copyright (c) 2019, Klara Inc. * Copyright (c) 2019, Allan Jude + * Copyright (c) 2019 Datto Inc. + * Copyright (c) 2022 Axcient. */ +#include <sys/arc.h> +#include <sys/spa_impl.h> #include <sys/dmu.h> #include <sys/dmu_impl.h> #include <sys/dmu_send.h> @@ -64,13 +68,20 @@ #endif #include <sys/zfs_file.h> -static int zfs_recv_queue_length = SPA_MAXBLOCKSIZE; -static int zfs_recv_queue_ff = 20; -static int zfs_recv_write_batch_size = 1024 * 1024; +static uint_t zfs_recv_queue_length = SPA_MAXBLOCKSIZE; +static uint_t zfs_recv_queue_ff = 20; +static uint_t zfs_recv_write_batch_size = 1024 * 1024; +static int zfs_recv_best_effort_corrective = 0; static const void *const dmu_recv_tag = "dmu_recv_tag"; const char *const recv_clone_name = "%recv"; +typedef enum { + ORNS_NO, + ORNS_YES, + ORNS_MAYBE +} or_need_sync_t; + static int receive_read_payload_and_next_header(dmu_recv_cookie_t *ra, int len, void *buf); @@ -102,6 +113,8 @@ struct receive_writer_arg { boolean_t done; int err; + const char *tofs; + boolean_t heal; boolean_t resumable; boolean_t raw; /* DMU_BACKUP_FEATURE_RAW set */ boolean_t spill; /* DRR_FLAG_SPILL_BLOCK set */ @@ -121,6 +134,10 @@ struct receive_writer_arg { uint8_t or_iv[ZIO_DATA_IV_LEN]; uint8_t or_mac[ZIO_DATA_MAC_LEN]; boolean_t or_byteorder; + zio_t *heal_pio; + + /* Keep track of DRR_FREEOBJECTS right after DRR_OBJECT_RANGE */ + or_need_sync_t or_need_sync; }; typedef struct dmu_recv_begin_arg { @@ -343,9 +360,10 @@ static int recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds, uint64_t fromguid, uint64_t featureflags) { - uint64_t val; + uint64_t obj; uint64_t children; int error; + dsl_dataset_t *snap; dsl_pool_t *dp = ds->ds_dir->dd_pool; boolean_t encrypted = ds->ds_dir->dd_crypto_obj != 0; boolean_t raw = (featureflags & DMU_BACKUP_FEATURE_RAW) != 0; @@ -354,7 +372,7 @@ recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds, /* Temporary clone name must not exist. */ error = zap_lookup(dp->dp_meta_objset, dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name, - 8, 1, &val); + 8, 1, &obj); if (error != ENOENT) return (error == 0 ? SET_ERROR(EBUSY) : error); @@ -362,12 +380,16 @@ recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds, if (dsl_dataset_has_resume_receive_state(ds)) return (SET_ERROR(EBUSY)); - /* New snapshot name must not exist. */ + /* New snapshot name must not exist if we're not healing it. */ error = zap_lookup(dp->dp_meta_objset, dsl_dataset_phys(ds)->ds_snapnames_zapobj, - drba->drba_cookie->drc_tosnap, 8, 1, &val); - if (error != ENOENT) + drba->drba_cookie->drc_tosnap, 8, 1, &obj); + if (drba->drba_cookie->drc_heal) { + if (error != 0) + return (error); + } else if (error != ENOENT) { return (error == 0 ? SET_ERROR(EEXIST) : error); + } /* Must not have children if receiving a ZVOL. */ error = zap_count(dp->dp_meta_objset, @@ -392,8 +414,40 @@ recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds, if (error != 0) return (error); - if (fromguid != 0) { - dsl_dataset_t *snap; + if (drba->drba_cookie->drc_heal) { + /* Encryption is incompatible with embedded data. */ + if (encrypted && embed) + return (SET_ERROR(EINVAL)); + + /* Healing is not supported when in 'force' mode. */ + if (drba->drba_cookie->drc_force) + return (SET_ERROR(EINVAL)); + + /* Must have keys loaded if doing encrypted non-raw recv. */ + if (encrypted && !raw) { + if (spa_keystore_lookup_key(dp->dp_spa, ds->ds_object, + NULL, NULL) != 0) + return (SET_ERROR(EACCES)); + } + + error = dsl_dataset_hold_obj(dp, obj, FTAG, &snap); + if (error != 0) + return (error); + + /* + * When not doing best effort corrective recv healing can only + * be done if the send stream is for the same snapshot as the + * one we are trying to heal. + */ + if (zfs_recv_best_effort_corrective == 0 && + drba->drba_cookie->drc_drrb->drr_toguid != + dsl_dataset_phys(snap)->ds_guid) { + dsl_dataset_rele(snap, FTAG); + return (SET_ERROR(ENOTSUP)); + } + dsl_dataset_rele(snap, FTAG); + } else if (fromguid != 0) { + /* Sanity check the incremental recv */ uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj; /* Can't perform a raw receive on top of a non-raw receive */ @@ -459,7 +513,7 @@ recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds, dsl_dataset_rele(snap, FTAG); } else { - /* if full, then must be forced */ + /* If full and not healing then must be forced. */ if (!drba->drba_cookie->drc_force) return (SET_ERROR(EEXIST)); @@ -602,7 +656,7 @@ dmu_recv_begin_check(void *arg, dmu_tx_t *tx) * so add the DS_HOLD_FLAG_DECRYPT flag only if we are dealing * with a dataset we may encrypt. */ - if (drba->drba_dcp != NULL && + if (drba->drba_dcp == NULL || drba->drba_dcp->cp_crypt != ZIO_CRYPT_OFF) { dsflags |= DS_HOLD_FLAG_DECRYPT; } @@ -626,6 +680,10 @@ dmu_recv_begin_check(void *arg, dmu_tx_t *tx) char buf[ZFS_MAX_DATASET_NAME_LEN]; objset_t *os; + /* healing recv must be done "into" an existing snapshot */ + if (drba->drba_cookie->drc_heal == B_TRUE) + return (SET_ERROR(ENOTSUP)); + /* * If it's a non-clone incremental, we are missing the * target fs, so fail the recv. @@ -807,7 +865,7 @@ dmu_recv_begin_sync(void *arg, dmu_tx_t *tx) error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds); if (error == 0) { - /* create temporary clone */ + /* Create temporary clone unless we're doing corrective recv */ dsl_dataset_t *snap = NULL; if (drba->drba_cookie->drc_fromsnapobj != 0) { @@ -815,8 +873,15 @@ dmu_recv_begin_sync(void *arg, dmu_tx_t *tx) drba->drba_cookie->drc_fromsnapobj, FTAG, &snap)); ASSERT3P(dcp, ==, NULL); } - dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name, - snap, crflags, drba->drba_cred, dcp, tx); + if (drc->drc_heal) { + /* When healing we want to use the provided snapshot */ + VERIFY0(dsl_dataset_snap_lookup(ds, drc->drc_tosnap, + &dsobj)); + } else { + dsobj = dsl_dataset_create_sync(ds->ds_dir, + recv_clone_name, snap, crflags, drba->drba_cred, + dcp, tx); + } if (drba->drba_cookie->drc_fromsnapobj != 0) dsl_dataset_rele(snap, FTAG); dsl_dataset_rele_flags(ds, dsflags, FTAG); @@ -933,7 +998,8 @@ dmu_recv_begin_sync(void *arg, dmu_tx_t *tx) */ rrw_enter(&newds->ds_bp_rwlock, RW_READER, FTAG); if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds)) && - (featureflags & DMU_BACKUP_FEATURE_RAW) == 0) { + (featureflags & DMU_BACKUP_FEATURE_RAW) == 0 && + !drc->drc_heal) { (void) dmu_objset_create_impl(dp->dp_spa, newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx); } @@ -989,13 +1055,24 @@ dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx) dsflags |= DS_HOLD_FLAG_DECRYPT; } + boolean_t recvexist = B_TRUE; if (dsl_dataset_hold_flags(dp, recvname, dsflags, FTAG, &ds) != 0) { /* %recv does not exist; continue in tofs */ + recvexist = B_FALSE; error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds); if (error != 0) return (error); } + /* + * Resume of full/newfs recv on existing dataset should be done with + * force flag + */ + if (recvexist && drrb->drr_fromguid == 0 && !drc->drc_force) { + dsl_dataset_rele_flags(ds, dsflags, FTAG); + return (SET_ERROR(ZFS_ERR_RESUME_EXISTS)); + } + /* check that ds is marked inconsistent */ if (!DS_IS_INCONSISTENT(ds)) { dsl_dataset_rele_flags(ds, dsflags, FTAG); @@ -1140,13 +1217,14 @@ dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx) * succeeds; otherwise we will leak the holds on the datasets. */ int -dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin, - boolean_t force, boolean_t resumable, nvlist_t *localprops, - nvlist_t *hidden_args, char *origin, dmu_recv_cookie_t *drc, - zfs_file_t *fp, offset_t *voffp) +dmu_recv_begin(const char *tofs, const char *tosnap, + dmu_replay_record_t *drr_begin, boolean_t force, boolean_t heal, + boolean_t resumable, nvlist_t *localprops, nvlist_t *hidden_args, + const char *origin, dmu_recv_cookie_t *drc, zfs_file_t *fp, + offset_t *voffp) { dmu_recv_begin_arg_t drba = { 0 }; - int err; + int err = 0; memset(drc, 0, sizeof (dmu_recv_cookie_t)); drc->drc_drr_begin = drr_begin; @@ -1154,6 +1232,7 @@ dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin, drc->drc_tosnap = tosnap; drc->drc_tofs = tofs; drc->drc_force = force; + drc->drc_heal = heal; drc->drc_resumable = resumable; drc->drc_cred = CRED(); drc->drc_proc = curproc; @@ -1177,20 +1256,36 @@ dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin, DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo); uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen; - void *payload = NULL; - if (payloadlen != 0) - payload = kmem_alloc(payloadlen, KM_SLEEP); - err = receive_read_payload_and_next_header(drc, payloadlen, - payload); - if (err != 0) { - kmem_free(payload, payloadlen); - return (err); - } + /* + * Since OpenZFS 2.0.0, we have enforced a 64MB limit in userspace + * configurable via ZFS_SENDRECV_MAX_NVLIST. We enforce 256MB as a hard + * upper limit. Systems with less than 1GB of RAM will see a lower + * limit from `arc_all_memory() / 4`. + */ + if (payloadlen > (MIN((1U << 28), arc_all_memory() / 4))) + return (E2BIG); + + if (payloadlen != 0) { + void *payload = vmem_alloc(payloadlen, KM_SLEEP); + /* + * For compatibility with recursive send streams, we don't do + * this here if the stream could be part of a package. Instead, + * we'll do it in dmu_recv_stream. If we pull the next header + * too early, and it's the END record, we break the `recv_skip` + * logic. + */ + + err = receive_read_payload_and_next_header(drc, payloadlen, + payload); + if (err != 0) { + vmem_free(payload, payloadlen); + return (err); + } err = nvlist_unpack(payload, payloadlen, &drc->drc_begin_nvl, KM_SLEEP); - kmem_free(payload, payloadlen); + vmem_free(payload, payloadlen); if (err != 0) { kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd)); @@ -1243,6 +1338,186 @@ dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin, return (err); } +/* + * Holds data need for corrective recv callback + */ +typedef struct cr_cb_data { + uint64_t size; + zbookmark_phys_t zb; + spa_t *spa; +} cr_cb_data_t; + +static void +corrective_read_done(zio_t *zio) +{ + cr_cb_data_t *data = zio->io_private; + /* Corruption corrected; update error log if needed */ + if (zio->io_error == 0) { + spa_remove_error(data->spa, &data->zb, + BP_GET_LOGICAL_BIRTH(zio->io_bp)); + } + kmem_free(data, sizeof (cr_cb_data_t)); + abd_free(zio->io_abd); +} + +/* + * zio_rewrite the data pointed to by bp with the data from the rrd's abd. + */ +static int +do_corrective_recv(struct receive_writer_arg *rwa, struct drr_write *drrw, + struct receive_record_arg *rrd, blkptr_t *bp) +{ + int err; + zio_t *io; + zbookmark_phys_t zb; + dnode_t *dn; + abd_t *abd = rrd->abd; + zio_cksum_t bp_cksum = bp->blk_cksum; + zio_flag_t flags = ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_RETRY | + ZIO_FLAG_CANFAIL; + + if (rwa->raw) + flags |= ZIO_FLAG_RAW; + + err = dnode_hold(rwa->os, drrw->drr_object, FTAG, &dn); + if (err != 0) + return (err); + SET_BOOKMARK(&zb, dmu_objset_id(rwa->os), drrw->drr_object, 0, + dbuf_whichblock(dn, 0, drrw->drr_offset)); + dnode_rele(dn, FTAG); + + if (!rwa->raw && DRR_WRITE_COMPRESSED(drrw)) { + /* Decompress the stream data */ + abd_t *dabd = abd_alloc_linear( + drrw->drr_logical_size, B_FALSE); + err = zio_decompress_data(drrw->drr_compressiontype, + abd, abd_to_buf(dabd), abd_get_size(abd), + abd_get_size(dabd), NULL); + + if (err != 0) { + abd_free(dabd); + return (err); + } + /* Swap in the newly decompressed data into the abd */ + abd_free(abd); + abd = dabd; + } + + if (!rwa->raw && BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF) { + /* Recompress the data */ + abd_t *cabd = abd_alloc_linear(BP_GET_PSIZE(bp), + B_FALSE); + void *buf = abd_to_buf(cabd); + uint64_t csize = zio_compress_data(BP_GET_COMPRESS(bp), + abd, &buf, abd_get_size(abd), + rwa->os->os_complevel); + abd_zero_off(cabd, csize, BP_GET_PSIZE(bp) - csize); + /* Swap in newly compressed data into the abd */ + abd_free(abd); + abd = cabd; + flags |= ZIO_FLAG_RAW_COMPRESS; + } + + /* + * The stream is not encrypted but the data on-disk is. + * We need to re-encrypt the buf using the same + * encryption type, salt, iv, and mac that was used to encrypt + * the block previosly. + */ + if (!rwa->raw && BP_USES_CRYPT(bp)) { + dsl_dataset_t *ds; + dsl_crypto_key_t *dck = NULL; + uint8_t salt[ZIO_DATA_SALT_LEN]; + uint8_t iv[ZIO_DATA_IV_LEN]; + uint8_t mac[ZIO_DATA_MAC_LEN]; + boolean_t no_crypt = B_FALSE; + dsl_pool_t *dp = dmu_objset_pool(rwa->os); + abd_t *eabd = abd_alloc_linear(BP_GET_PSIZE(bp), B_FALSE); + + zio_crypt_decode_params_bp(bp, salt, iv); + zio_crypt_decode_mac_bp(bp, mac); + + dsl_pool_config_enter(dp, FTAG); + err = dsl_dataset_hold_flags(dp, rwa->tofs, + DS_HOLD_FLAG_DECRYPT, FTAG, &ds); + if (err != 0) { + dsl_pool_config_exit(dp, FTAG); + abd_free(eabd); + return (SET_ERROR(EACCES)); + } + + /* Look up the key from the spa's keystore */ + err = spa_keystore_lookup_key(rwa->os->os_spa, + zb.zb_objset, FTAG, &dck); + if (err != 0) { + dsl_dataset_rele_flags(ds, DS_HOLD_FLAG_DECRYPT, + FTAG); + dsl_pool_config_exit(dp, FTAG); + abd_free(eabd); + return (SET_ERROR(EACCES)); + } + + err = zio_do_crypt_abd(B_TRUE, &dck->dck_key, + BP_GET_TYPE(bp), BP_SHOULD_BYTESWAP(bp), salt, iv, + mac, abd_get_size(abd), abd, eabd, &no_crypt); + + spa_keystore_dsl_key_rele(rwa->os->os_spa, dck, FTAG); + dsl_dataset_rele_flags(ds, DS_HOLD_FLAG_DECRYPT, FTAG); + dsl_pool_config_exit(dp, FTAG); + + ASSERT0(no_crypt); + if (err != 0) { + abd_free(eabd); + return (err); + } + /* Swap in the newly encrypted data into the abd */ + abd_free(abd); + abd = eabd; + + /* + * We want to prevent zio_rewrite() from trying to + * encrypt the data again + */ + flags |= ZIO_FLAG_RAW_ENCRYPT; + } + rrd->abd = abd; + + io = zio_rewrite(NULL, rwa->os->os_spa, BP_GET_LOGICAL_BIRTH(bp), bp, + abd, BP_GET_PSIZE(bp), NULL, NULL, ZIO_PRIORITY_SYNC_WRITE, flags, + &zb); + + ASSERT(abd_get_size(abd) == BP_GET_LSIZE(bp) || + abd_get_size(abd) == BP_GET_PSIZE(bp)); + + /* compute new bp checksum value and make sure it matches the old one */ + zio_checksum_compute(io, BP_GET_CHECKSUM(bp), abd, abd_get_size(abd)); + if (!ZIO_CHECKSUM_EQUAL(bp_cksum, io->io_bp->blk_cksum)) { + zio_destroy(io); + if (zfs_recv_best_effort_corrective != 0) + return (0); + return (SET_ERROR(ECKSUM)); + } + + /* Correct the corruption in place */ + err = zio_wait(io); + if (err == 0) { + cr_cb_data_t *cb_data = + kmem_alloc(sizeof (cr_cb_data_t), KM_SLEEP); + cb_data->spa = rwa->os->os_spa; + cb_data->size = drrw->drr_logical_size; + cb_data->zb = zb; + /* Test if healing worked by re-reading the bp */ + err = zio_wait(zio_read(rwa->heal_pio, rwa->os->os_spa, bp, + abd_alloc_for_io(drrw->drr_logical_size, B_FALSE), + drrw->drr_logical_size, corrective_read_done, + cb_data, ZIO_PRIORITY_ASYNC_READ, flags, NULL)); + } + if (err != 0 && zfs_recv_best_effort_corrective != 0) + err = 0; + + return (err); +} + static int receive_read(dmu_recv_cookie_t *drc, int len, void *buf) { @@ -1256,11 +1531,11 @@ receive_read(dmu_recv_cookie_t *drc, int len, void *buf) (drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0); while (done < len) { - ssize_t resid; + ssize_t resid = len - done; zfs_file_t *fp = drc->drc_fp; int err = zfs_file_read(fp, (char *)buf + done, len - done, &resid); - if (resid == len - done) { + if (err == 0 && resid == len - done) { /* * Note: ECKSUM or ZFS_ERR_STREAM_TRUNCATED indicates * that the receive was interrupted and can @@ -1523,17 +1798,19 @@ receive_handle_existing_object(const struct receive_writer_arg *rwa, } /* - * The dmu does not currently support decreasing nlevels - * or changing the number of dnode slots on an object. For - * non-raw sends, this does not matter and the new object - * can just use the previous one's nlevels. For raw sends, - * however, the structure of the received dnode (including - * nlevels and dnode slots) must match that of the send - * side. Therefore, instead of using dmu_object_reclaim(), - * we must free the object completely and call - * dmu_object_claim_dnsize() instead. + * The dmu does not currently support decreasing nlevels or changing + * indirect block size if there is already one, same as changing the + * number of of dnode slots on an object. For non-raw sends this + * does not matter and the new object can just use the previous one's + * parameters. For raw sends, however, the structure of the received + * dnode (including indirects and dnode slots) must match that of the + * send side. Therefore, instead of using dmu_object_reclaim(), we + * must free the object completely and call dmu_object_claim_dnsize() + * instead. */ - if ((rwa->raw && drro->drr_nlevels < doi->doi_indirection) || + if ((rwa->raw && ((doi->doi_indirection > 1 && + indblksz != doi->doi_metadata_block_size) || + drro->drr_nlevels < doi->doi_indirection)) || dn_slots != doi->doi_dnodesize >> DNODE_SHIFT) { err = dmu_free_long_object(rwa->os, drro->drr_object); if (err != 0) @@ -1641,6 +1918,8 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro, if (err == 0) { err = receive_handle_existing_object(rwa, drro, &doi, data, &object_to_hold, &new_blksz); + if (err != 0) + return (err); } else if (err == EEXIST) { /* * The object requested is currently an interior slot of a @@ -1657,10 +1936,22 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro, /* object was freed and we are about to allocate a new one */ object_to_hold = DMU_NEW_OBJECT; } else { + /* + * If the only record in this range so far was DRR_FREEOBJECTS + * with at least one actually freed object, it's possible that + * the block will now be converted to a hole. We need to wait + * for the txg to sync to prevent races. + */ + if (rwa->or_need_sync == ORNS_YES) + txg_wait_synced(dmu_objset_pool(rwa->os), 0); + /* object is free and we are about to allocate a new one */ object_to_hold = DMU_NEW_OBJECT; } + /* Only relevant for the first object in the range */ + rwa->or_need_sync = ORNS_NO; + /* * If this is a multi-slot dnode there is a chance that this * object will expand into a slot that is already used by @@ -1822,6 +2113,16 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro, dmu_buf_rele(db, FTAG); dnode_rele(dn, FTAG); } + + /* + * If the receive fails, we want the resume stream to start with the + * same record that we last successfully received. There is no way to + * request resume from the object record, but we can benefit from the + * fact that sender always sends object record before anything else, + * after which it will "resend" data at offset 0 and resume normally. + */ + save_resume_state(rwa, drro->drr_object, 0, tx); + dmu_tx_commit(tx); return (0); @@ -1854,6 +2155,9 @@ receive_freeobjects(struct receive_writer_arg *rwa, if (err != 0) return (err); + + if (rwa->or_need_sync == ORNS_MAYBE) + rwa->or_need_sync = ORNS_YES; } if (next_err != ESRCH) return (next_err); @@ -1937,10 +2241,10 @@ flush_write_batch_impl(struct receive_writer_arg *rwa) if (err == 0) abd_free(abd); } else { - zio_prop_t zp; + zio_prop_t zp = {0}; dmu_write_policy(rwa->os, dn, 0, 0, &zp); - enum zio_flag zio_flags = 0; + zio_flag_t zio_flags = 0; if (rwa->raw) { zp.zp_encrypt = B_TRUE; @@ -2049,6 +2353,53 @@ receive_process_write_record(struct receive_writer_arg *rwa, !DMU_OT_IS_VALID(drrw->drr_type)) return (SET_ERROR(EINVAL)); + if (rwa->heal) { + blkptr_t *bp; + dmu_buf_t *dbp; + int flags = DB_RF_CANFAIL; + + if (rwa->raw) + flags |= DB_RF_NO_DECRYPT; + + if (rwa->byteswap) { + dmu_object_byteswap_t byteswap = + DMU_OT_BYTESWAP(drrw->drr_type); + dmu_ot_byteswap[byteswap].ob_func(abd_to_buf(rrd->abd), + DRR_WRITE_PAYLOAD_SIZE(drrw)); + } + + err = dmu_buf_hold_noread(rwa->os, drrw->drr_object, + drrw->drr_offset, FTAG, &dbp); + if (err != 0) + return (err); + + /* Try to read the object to see if it needs healing */ + err = dbuf_read((dmu_buf_impl_t *)dbp, NULL, flags); + /* + * We only try to heal when dbuf_read() returns a ECKSUMs. + * Other errors (even EIO) get returned to caller. + * EIO indicates that the device is not present/accessible, + * so writing to it will likely fail. + * If the block is healthy, we don't want to overwrite it + * unnecessarily. + */ + if (err != ECKSUM) { + dmu_buf_rele(dbp, FTAG); + return (err); + } + /* Make sure the on-disk block and recv record sizes match */ + if (drrw->drr_logical_size != dbp->db_size) { + err = ENOTSUP; + dmu_buf_rele(dbp, FTAG); + return (err); + } + /* Get the block pointer for the corrupted block */ + bp = dmu_buf_get_blkptr(dbp); + err = do_corrective_recv(rwa, drrw, rrd, bp); + dmu_buf_rele(dbp, FTAG); + return (err); + } + /* * For resuming to work, records must be in increasing order * by (object, offset). @@ -2189,7 +2540,7 @@ receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs, * size of the provided arc_buf_t. */ if (db_spill->db_size != drrs->drr_length) { - dmu_buf_will_fill(db_spill, tx); + dmu_buf_will_fill(db_spill, tx, B_FALSE); VERIFY0(dbuf_spill_set_blksz(db_spill, drrs->drr_length, tx)); } @@ -2295,6 +2646,8 @@ receive_object_range(struct receive_writer_arg *rwa, memcpy(rwa->or_mac, drror->drr_mac, ZIO_DATA_MAC_LEN); rwa->or_byteorder = byteorder; + rwa->or_need_sync = ORNS_MAYBE; + return (0); } @@ -2341,7 +2694,8 @@ dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc) rrw_exit(&ds->ds_bp_rwlock, FTAG); dsl_dataset_name(ds, name); dsl_dataset_disown(ds, dsflags, dmu_recv_tag); - (void) dsl_destroy_head(name); + if (!drc->drc_heal) + (void) dsl_destroy_head(name); } } @@ -2702,7 +3056,19 @@ receive_process_record(struct receive_writer_arg *rwa, ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read); rwa->bytes_read = rrd->bytes_read; - if (rrd->header.drr_type != DRR_WRITE) { + /* We can only heal write records; other ones get ignored */ + if (rwa->heal && rrd->header.drr_type != DRR_WRITE) { + if (rrd->abd != NULL) { + abd_free(rrd->abd); + rrd->abd = NULL; + } else if (rrd->payload != NULL) { + kmem_free(rrd->payload, rrd->payload_size); + rrd->payload = NULL; + } + return (0); + } + + if (!rwa->heal && rrd->header.drr_type != DRR_WRITE) { err = flush_write_batch(rwa); if (err != 0) { if (rrd->abd != NULL) { @@ -2737,9 +3103,16 @@ receive_process_record(struct receive_writer_arg *rwa, case DRR_WRITE: { err = receive_process_write_record(rwa, rrd); - if (err != EAGAIN) { + if (rwa->heal) { /* - * On success, receive_process_write_record() returns + * If healing - always free the abd after processing + */ + abd_free(rrd->abd); + rrd->abd = NULL; + } else if (err != EAGAIN) { + /* + * On success, a non-healing + * receive_process_write_record() returns * EAGAIN to indicate that we do not want to free * the rrd or arc_buf. */ @@ -2830,8 +3203,9 @@ receive_writer_thread(void *arg) * EAGAIN indicates that this record has been saved (on * raw->write_batch), and will be used again, so we don't * free it. + * When healing data we always need to free the record. */ - if (err != EAGAIN) { + if (err != EAGAIN || rwa->heal) { if (rwa->err == 0) rwa->err = err; kmem_free(rrd, sizeof (*rrd)); @@ -2839,10 +3213,13 @@ receive_writer_thread(void *arg) } kmem_free(rrd, sizeof (*rrd)); - int err = flush_write_batch(rwa); - if (rwa->err == 0) - rwa->err = err; - + if (rwa->heal) { + zio_wait(rwa->heal_pio); + } else { + int err = flush_write_batch(rwa); + if (rwa->err == 0) + rwa->err = err; + } mutex_enter(&rwa->mutex); rwa->done = B_TRUE; cv_signal(&rwa->cv); @@ -2926,17 +3303,19 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, offset_t *voffp) if (err != 0) goto out; - /* - * If this is a new dataset we set the key immediately. - * Otherwise we don't want to change the key until we - * are sure the rest of the receive succeeded so we stash - * the keynvl away until then. - */ - err = dsl_crypto_recv_raw(spa_name(drc->drc_os->os_spa), - drc->drc_ds->ds_object, drc->drc_fromsnapobj, - drc->drc_drrb->drr_type, keynvl, drc->drc_newfs); - if (err != 0) - goto out; + if (!drc->drc_heal) { + /* + * If this is a new dataset we set the key immediately. + * Otherwise we don't want to change the key until we + * are sure the rest of the receive succeeded so we + * stash the keynvl away until then. + */ + err = dsl_crypto_recv_raw(spa_name(drc->drc_os->os_spa), + drc->drc_ds->ds_object, drc->drc_fromsnapobj, + drc->drc_drrb->drr_type, keynvl, drc->drc_newfs); + if (err != 0) + goto out; + } /* see comment in dmu_recv_end_sync() */ drc->drc_ivset_guid = 0; @@ -2954,6 +3333,17 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, offset_t *voffp) } /* + * For compatibility with recursive send streams, we do this here, + * rather than in dmu_recv_begin. If we pull the next header too + * early, and it's the END record, we break the `recv_skip` logic. + */ + if (drc->drc_drr_begin->drr_payloadlen == 0) { + err = receive_read_payload_and_next_header(drc, 0, NULL); + if (err != 0) + goto out; + } + + /* * If we failed before this point we will clean up any new resume * state that was created. Now that we've gotten past the initial * checks we are ok to retain that resume state. @@ -2967,11 +3357,17 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, offset_t *voffp) mutex_init(&rwa->mutex, NULL, MUTEX_DEFAULT, NULL); rwa->os = drc->drc_os; rwa->byteswap = drc->drc_byteswap; + rwa->heal = drc->drc_heal; + rwa->tofs = drc->drc_tofs; rwa->resumable = drc->drc_resumable; rwa->raw = drc->drc_raw; rwa->spill = drc->drc_spill; rwa->full = (drc->drc_drr_begin->drr_u.drr_begin.drr_fromguid == 0); rwa->os->os_raw_receive = drc->drc_raw; + if (drc->drc_heal) { + rwa->heal_pio = zio_root(drc->drc_os->os_spa, NULL, NULL, + ZIO_FLAG_GODFATHER); + } list_create(&rwa->write_batch, sizeof (struct receive_record_arg), offsetof(struct receive_record_arg, node.bqn_node)); @@ -3107,7 +3503,9 @@ dmu_recv_end_check(void *arg, dmu_tx_t *tx) ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag); - if (!drc->drc_newfs) { + if (drc->drc_heal) { + error = 0; + } else if (!drc->drc_newfs) { dsl_dataset_t *origin_head; error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head); @@ -3183,13 +3581,18 @@ dmu_recv_end_sync(void *arg, dmu_tx_t *tx) dmu_recv_cookie_t *drc = arg; dsl_pool_t *dp = dmu_tx_pool(tx); boolean_t encrypted = drc->drc_ds->ds_dir->dd_crypto_obj != 0; - uint64_t newsnapobj; + uint64_t newsnapobj = 0; spa_history_log_internal_ds(drc->drc_ds, "finish receiving", tx, "snap=%s", drc->drc_tosnap); drc->drc_ds->ds_objset->os_raw_receive = B_FALSE; - if (!drc->drc_newfs) { + if (drc->drc_heal) { + if (drc->drc_keynvl != NULL) { + nvlist_free(drc->drc_keynvl); + drc->drc_keynvl = NULL; + } + } else if (!drc->drc_newfs) { dsl_dataset_t *origin_head; VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG, @@ -3303,7 +3706,7 @@ dmu_recv_end_sync(void *arg, dmu_tx_t *tx) * tunable is set, in which case we will leave the newly-generated * value. */ - if (drc->drc_raw && drc->drc_ivset_guid != 0) { + if (!drc->drc_heal && drc->drc_raw && drc->drc_ivset_guid != 0) { dmu_object_zapify(dp->dp_meta_objset, newsnapobj, DMU_OT_DSL_DATASET, tx); VERIFY0(zap_update(dp->dp_meta_objset, newsnapobj, @@ -3370,7 +3773,7 @@ dmu_recv_end(dmu_recv_cookie_t *drc, void *owner) if (error != 0) { dmu_recv_cleanup_ds(drc); nvlist_free(drc->drc_keynvl); - } else { + } else if (!drc->drc_heal) { if (drc->drc_newfs) { zvol_create_minor(drc->drc_tofs); } @@ -3392,11 +3795,15 @@ dmu_objset_is_receiving(objset_t *os) os->os_dsl_dataset->ds_owner == dmu_recv_tag); } -ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_length, INT, ZMOD_RW, +ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_length, UINT, ZMOD_RW, "Maximum receive queue length"); -ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_ff, INT, ZMOD_RW, +ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_ff, UINT, ZMOD_RW, "Receive queue fill fraction"); -ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, write_batch_size, INT, ZMOD_RW, +ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, write_batch_size, UINT, ZMOD_RW, "Maximum amount of writes to batch into one transaction"); + +ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, best_effort_corrective, INT, ZMOD_RW, + "Ignore errors during corrective receive"); +/* END CSTYLED */ |