diff options
| -rw-r--r-- | tests/bugs/quota/bug-1235182.t | 17 | ||||
| -rw-r--r-- | tests/bugs/quota/bug-1240991.t | 43 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota-helper.c | 5 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota.c | 549 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota.h | 5 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker.c | 453 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker.h | 4 | 
7 files changed, 540 insertions, 536 deletions
diff --git a/tests/bugs/quota/bug-1235182.t b/tests/bugs/quota/bug-1235182.t index e28b557f558..2f963e664c6 100644 --- a/tests/bugs/quota/bug-1235182.t +++ b/tests/bugs/quota/bug-1235182.t @@ -28,13 +28,22 @@ TEST $CLI volume quota $V0 limit-usage / 1GB  TEST $CLI volume quota $V0 hard-timeout 0  TEST $CLI volume quota $V0 soft-timeout 0 -$QDD $M0/f1 256 400& +TEST mkdir $M0/1 +$QDD $M0/1/f1 256 400&  PID=$! -EXPECT_WITHIN $PROCESS_UP_TIMEOUT "0" STAT $M0/f1 -TESTS_EXPECTED_IN_LOOP=50 +EXPECT_WITHIN $PROCESS_UP_TIMEOUT "0" STAT $M0/1/f1 +TESTS_EXPECTED_IN_LOOP=150  for i in {1..50}; do          ii=`expr $i + 1`; -        TEST_IN_LOOP mv $M0/f$i $M0/f$ii; +        touch $M0/$i/f$ii +        echo Hello > $M0/$i/f$ii + +        #rename within same dir +        TEST_IN_LOOP mv -f $M0/$i/f$i $M0/$i/f$ii; + +        #rename to different dir +        TEST_IN_LOOP mkdir $M0/$ii +        TEST_IN_LOOP mv -f $M0/$i/f$ii $M0/$ii/f$ii;  done  echo "Wait for process with pid $PID to complete" diff --git a/tests/bugs/quota/bug-1240991.t b/tests/bugs/quota/bug-1240991.t new file mode 100644 index 00000000000..2a1a6d1868e --- /dev/null +++ b/tests/bugs/quota/bug-1240991.t @@ -0,0 +1,43 @@ +#!/bin/bash + +# This regression test tries to ensure renaming a directory with content, and +# no limit set, is accounted properly, when moved into a directory with quota +# limit set. + +. $(dirname $0)/../../include.rc +. $(dirname $0)/../../volume.rc + +cleanup; + +TEST glusterd +TEST pidof glusterd; +TEST $CLI volume info; + +TEST $CLI volume create $V0 $H0:$B0/${V0} +TEST $CLI volume start $V0; + +TEST $CLI volume quota $V0 enable; + +TEST glusterfs --volfile-id=$V0 --volfile-server=$H0 $M0; + +TEST $CLI volume quota $V0 hard-timeout 0 +TEST $CLI volume quota $V0 soft-timeout 0 + +TEST mkdir -p $M0/dir/dir1 +TEST $CLI volume quota $V0 limit-objects /dir 20 + +TEST mkdir $M0/dir/dir1/d{1..5} +TEST touch $M0/dir/dir1/f{1..5} +TEST mv $M0/dir/dir1 $M0/dir/dir2 + +#Number of files under /dir is 5 +EXPECT_WITHIN $MARKER_UPDATE_TIMEOUT "5" quota_object_list_field "/dir" 4 + +#Number of directories under /dir is 7 +EXPECT_WITHIN $MARKER_UPDATE_TIMEOUT "7" quota_object_list_field "/dir" 5 + +TEST $CLI volume stop $V0 +TEST $CLI volume delete $V0 +EXPECT "1" get_aux + +cleanup; diff --git a/xlators/features/marker/src/marker-quota-helper.c b/xlators/features/marker/src/marker-quota-helper.c index a103bb945a2..df76f12cf47 100644 --- a/xlators/features/marker/src/marker-quota-helper.c +++ b/xlators/features/marker/src/marker-quota-helper.c @@ -174,6 +174,9 @@ mq_get_contribution_node (inode_t *inode, quota_inode_ctx_t *ctx)          LOCK (&ctx->lock);          { +                if (list_empty (&ctx->contribution_head)) +                        goto unlock; +                  list_for_each_entry (temp, &ctx->contribution_head,                                       contri_list) {                          if (gf_uuid_compare (temp->gfid, inode->gfid) == 0) { @@ -183,7 +186,9 @@ mq_get_contribution_node (inode_t *inode, quota_inode_ctx_t *ctx)                          }                  }          } +unlock:          UNLOCK (&ctx->lock); +  out:          return contri;  } diff --git a/xlators/features/marker/src/marker-quota.c b/xlators/features/marker/src/marker-quota.c index 2e51f338a31..9aae36963d7 100644 --- a/xlators/features/marker/src/marker-quota.c +++ b/xlators/features/marker/src/marker-quota.c @@ -2592,15 +2592,11 @@ out:  int32_t  mq_remove_contri (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx, -                  inode_contribution_t *contri, quota_meta_t *delta, -                  gf_boolean_t remove_xattr) +                  inode_contribution_t *contri, quota_meta_t *delta)  {          int32_t              ret                         = -1;          char                 contri_key[CONTRI_KEY_MAX]  = {0, }; -        if (remove_xattr == _gf_false) -                goto done; -          GET_CONTRI_KEY (contri_key, contri->gfid, ret);          if (ret < 0) {                  gf_log (this->name, GF_LOG_ERROR, "get contri_key " @@ -2627,7 +2623,6 @@ mq_remove_contri (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,                  }          } -done:          LOCK (&contri->lock);          {                  contri->contribution += delta->size; @@ -2781,8 +2776,8 @@ mq_synctask_cleanup (int ret, call_frame_t *frame, void *opaque)  }  int -mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc, -             int64_t contri) +mq_synctask1 (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, +              loc_t *loc, quota_meta_t *contri)  {          int32_t              ret         = -1;          quota_synctask_t    *args        = NULL; @@ -2798,7 +2793,14 @@ mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc,          args->this = this;          loc_copy (&args->loc, loc); -        args->contri = contri; + +        if (contri) { +                args->contri = *contri; +        } else { +                args->contri.size = -1; +                args->contri.file_count = -1; +                args->contri.dir_count = -1; +        }          if (spawn) {                  ret = synctask_new1 (this->ctx->env, 1024 * 16, task, @@ -2817,6 +2819,12 @@ out:          return ret;  } +int +mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc) +{ +        return mq_synctask1 (this, task, spawn, loc, NULL); +} +  int32_t  mq_prevalidate_txn (xlator_t *this, loc_t *origin_loc, loc_t *loc,                      quota_inode_ctx_t **ctx) @@ -2861,136 +2869,6 @@ out:  }  int -mq_start_quota_txn_v2 (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx, -                       inode_contribution_t *contri) -{ -        int32_t            ret        = -1; -        loc_t              child_loc  = {0,}; -        loc_t              parent_loc = {0,}; -        gf_boolean_t       locked     = _gf_false; -        gf_boolean_t       dirty      = _gf_false; -        gf_boolean_t       status     = _gf_false; -        quota_meta_t       delta      = {0, }; - -        GF_VALIDATE_OR_GOTO ("marker", contri, out); -        GF_REF_GET (contri); - -        GF_VALIDATE_OR_GOTO ("marker", loc, out); -        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); -        GF_VALIDATE_OR_GOTO ("marker", ctx, out); - -        ret = mq_loc_copy (&child_loc, loc); -        if (ret < 0) { -                gf_log (this->name, GF_LOG_ERROR, "loc copy failed"); -                goto out; -        } - -        while (!__is_root_gfid (child_loc.gfid)) { -                /* To improve performance, abort current transaction -                 * if one is already in progress for same inode -                 */ -                if (status == _gf_true) { -                        /* status will alreday set before txn start, -                         * so it should not be set in first -                         * loop iteration -                         */ -                        ret = mq_test_and_set_ctx_updation_status (ctx, -                                                                   &status); -                        if (ret < 0 || status == _gf_true) -                                goto out; -                } - -                ret = mq_inode_loc_fill (NULL, child_loc.parent, &parent_loc); -                if (ret < 0) { -                        gf_log (this->name, GF_LOG_ERROR, "loc fill failed"); -                        goto out; -                } - -                ret = mq_lock (this, &parent_loc, F_WRLCK); -                if (ret < 0) -                        goto out; -                locked = _gf_true; - -                mq_set_ctx_updation_status (ctx, _gf_false); -                status = _gf_true; - -                ret = mq_get_delta (this, &child_loc, &delta, ctx, contri); -                if (ret < 0) -                        goto out; - -                if (quota_meta_is_null (&delta)) -                        goto out; - -                ret = mq_mark_dirty (this, &parent_loc, 1); -                if (ret < 0) -                        goto out; -                dirty = _gf_true; - -                ret = mq_update_contri (this, &child_loc, contri, &delta); -                if (ret < 0) -                        goto out; - -                ret = mq_update_size (this, &parent_loc, &delta); -                if (ret < 0) { -                        gf_log (this->name, GF_LOG_DEBUG, "rollback " -                                "contri updation"); -                        mq_sub_meta (&delta, NULL); -                        mq_update_contri (this, &child_loc, contri, &delta); -                        goto out; -                } - -                ret = mq_mark_dirty (this, &parent_loc, 0); -                dirty = _gf_false; - -                ret = mq_lock (this, &parent_loc, F_UNLCK); -                locked = _gf_false; - -                if (__is_root_gfid (parent_loc.gfid)) -                        break; - -                /* Repeate above steps upwards till the root */ -                loc_wipe (&child_loc); -                ret = mq_loc_copy (&child_loc, &parent_loc); -                if (ret < 0) -                        goto out; -                loc_wipe (&parent_loc); - -                ret = mq_inode_ctx_get (child_loc.inode, this, &ctx); -                if (ret < 0) -                        goto out; - -                if (list_empty (&ctx->contribution_head)) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "contribution node list is empty (%s)", -                                uuid_utoa(child_loc.inode->gfid)); -                        ret = -1; -                        goto out; -                } - -                GF_REF_PUT (contri); -                contri = mq_get_contribution_node (child_loc.parent, ctx); -                GF_ASSERT (contri != NULL); -        } - -out: -        if (ret >= 0 && dirty) -                ret = mq_mark_dirty (this, &parent_loc, 0); - -        if (locked) -                ret = mq_lock (this, &parent_loc, F_UNLCK); - -        if (ctx && status == _gf_false) -                mq_set_ctx_updation_status (ctx, _gf_false); - -        loc_wipe (&child_loc); -        loc_wipe (&parent_loc); -        if (contri) -                GF_REF_PUT (contri); - -        return 0; -} - -int  mq_create_xattrs_task (void *opaque)  {          int32_t                  ret        = -1; @@ -3067,7 +2945,7 @@ _mq_create_xattrs_txn (xlator_t *this, loc_t *origin_loc, gf_boolean_t spawn)          if (ret < 0 || status == _gf_true)                  goto out; -        ret = mq_synctask (this, mq_create_xattrs_task, spawn, &loc, 0); +        ret = mq_synctask (this, mq_create_xattrs_task, spawn, &loc);  out:          if (ret < 0 && status == _gf_false)                  mq_set_ctx_create_status (ctx, _gf_false); @@ -3109,13 +2987,13 @@ mq_reduce_parent_size_task (void *opaque)          quota_inode_ctx_t       *ctx           = NULL;          inode_contribution_t    *contribution  = NULL;          quota_meta_t             delta         = {0, }; +        quota_meta_t             contri        = {0, };          loc_t                    parent_loc    = {0,};          gf_boolean_t             locked        = _gf_false;          gf_boolean_t             dirty         = _gf_false;          quota_synctask_t        *args          = NULL;          xlator_t                *this          = NULL;          loc_t                   *loc           = NULL; -        int64_t                  contri        = 0;          gf_boolean_t             remove_xattr  = _gf_true;          GF_ASSERT (opaque); @@ -3133,26 +3011,6 @@ mq_reduce_parent_size_task (void *opaque)                  goto out;          } -        contribution = mq_get_contribution_node (loc->parent, ctx); -        if (contribution == NULL) { -                ret = -1; -                gf_log_callingfn (this->name, GF_LOG_WARNING, -                                  "contribution for the node %s is NULL", -                                  loc->path); -                goto out; -        } - -        if (contri >= 0) { -                /* contri paramater is supplied only for rename operation. -                 * remove xattr is alreday performed, we need to skip -                 * removexattr for rename operation -                 */ -                remove_xattr = _gf_false; -                delta.size = contri; -                delta.file_count = 1; -                delta.dir_count = 0; -        } -          ret = mq_inode_loc_fill (NULL, loc->parent, &parent_loc);          if (ret < 0) {                  gf_log (this->name, GF_LOG_ERROR, "loc fill failed"); @@ -3164,7 +3022,26 @@ mq_reduce_parent_size_task (void *opaque)                  goto out;          locked = _gf_true; -        if (contri < 0) { +        if (contri.size >= 0) { +                /* contri paramater is supplied only for rename operation. +                 * remove xattr is alreday performed, we need to skip +                 * removexattr for rename operation +                 */ +                remove_xattr = _gf_false; +                delta.size = contri.size; +                delta.file_count = contri.file_count; +                delta.dir_count = contri.dir_count; +        } else { +                remove_xattr = _gf_true; +                contribution = mq_get_contribution_node (loc->parent, ctx); +                if (contribution == NULL) { +                        ret = -1; +                        gf_log (this->name, GF_LOG_DEBUG, +                                "contribution for the node %s is NULL", +                                loc->path); +                        goto out; +                } +                  LOCK (&contribution->lock);                  {                          delta.size = contribution->contribution; @@ -3174,26 +3051,6 @@ mq_reduce_parent_size_task (void *opaque)                  UNLOCK (&contribution->lock);          } -        /* TODO: Handle handlinks with better approach -           Iterating dentry_list without a lock is not a good idea -        if (loc->inode->ia_type != IA_IFDIR) { -                list_for_each_entry (dentry, &inode->dentry_list, inode_list) { -                        if (loc->parent == dentry->parent) { -                                * If the file has another link within the same -                                * directory, we should not be reducing the size -                                * of parent -                                * -                                delta = 0; -                                idelta = 0; -                                break; -                        } -                } -        } -        */ - -        if (quota_meta_is_null (&delta)) -                goto out; -          ret = mq_mark_dirty (this, &parent_loc, 1);          if (ret < 0)                  goto out; @@ -3201,9 +3058,13 @@ mq_reduce_parent_size_task (void *opaque)          mq_sub_meta (&delta, NULL); -        ret = mq_remove_contri (this, loc, ctx, contribution, &delta, -                                remove_xattr); -        if (ret < 0) +        if (remove_xattr) { +                ret = mq_remove_contri (this, loc, ctx, contribution, &delta); +                if (ret < 0) +                        goto out; +        } + +        if (quota_meta_is_null (&delta))                  goto out;          ret = mq_update_size (this, &parent_loc, &delta); @@ -3229,7 +3090,8 @@ out:  }  int32_t -mq_reduce_parent_size_txn (xlator_t *this, loc_t *origin_loc, int64_t contri) +mq_reduce_parent_size_txn (xlator_t *this, loc_t *origin_loc, +                           quota_meta_t *contri)  {          int32_t                  ret           = -1;          loc_t                    loc           = {0, }; @@ -3252,8 +3114,8 @@ mq_reduce_parent_size_txn (xlator_t *this, loc_t *origin_loc, int64_t contri)                  goto out;          } -        ret = mq_synctask (this, mq_reduce_parent_size_task, _gf_true, &loc, -                           contri); +        ret = mq_synctask1 (this, mq_reduce_parent_size_task, _gf_true, &loc, +                            contri);  out:          loc_wipe (&loc);          return ret; @@ -3262,74 +3124,204 @@ out:  int  mq_initiate_quota_task (void *opaque)  { -        int32_t                 ret          = -1; -        quota_inode_ctx_t      *ctx          = NULL; -        inode_contribution_t   *contribution = NULL; -        quota_synctask_t       *args         = NULL; -        xlator_t               *this         = NULL; -        loc_t                  *loc          = NULL; - -        GF_ASSERT (opaque); +        int32_t                ret        = -1; +        loc_t                  child_loc  = {0,}; +        loc_t                  parent_loc = {0,}; +        gf_boolean_t           locked     = _gf_false; +        gf_boolean_t           dirty      = _gf_false; +        gf_boolean_t           status     = _gf_false; +        quota_meta_t           delta      = {0, }; +        quota_synctask_t      *args       = NULL; +        xlator_t              *this       = NULL; +        loc_t                 *loc        = NULL; +        inode_contribution_t  *contri     = NULL; +        quota_inode_ctx_t     *ctx        = NULL; +        inode_t               *tmp_parent = NULL; + +        GF_VALIDATE_OR_GOTO ("marker", opaque, out);          args = (quota_synctask_t *) opaque;          loc = &args->loc;          this = args->this; + +        GF_VALIDATE_OR_GOTO ("marker", this, out);          THIS = this; -        ret = mq_inode_ctx_get (loc->inode, this, &ctx); -        if (ret == -1) { -                gf_log (this->name, GF_LOG_WARNING, -                        "inode ctx get failed, aborting quota txn"); +        GF_VALIDATE_OR_GOTO (this->name, loc, out); +        GF_VALIDATE_OR_GOTO (this->name, loc->inode, out); + +        ret = mq_loc_copy (&child_loc, loc); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "loc copy failed");                  goto out;          } -        /* Create the contribution node if its absent. Is it right to -           assume that if the contribution node is not there, then -           create one and proceed instead of returning? -           Reason for this assumption is for hard links. Suppose -           hard link for a file f1 present in a directory d1 is -           created in the directory d2 (as f2). Now, since d2's -           contribution is not there in f1's inode ctx, d2's -           contribution xattr wont be created and will create problems -           for quota operations. -        */ -        contribution = mq_get_contribution_node (loc->parent, ctx); -        if (!contribution) { -                if (!loc_is_root(loc)) -                        gf_log_callingfn (this->name, GF_LOG_TRACE, -                                          "contribution node for the " -                                          "path (%s) with parent (%s) " -                                          "not found", loc->path, -                                          loc->parent ? -                                          uuid_utoa (loc->parent->gfid) : -                                          NULL); +        while (!__is_root_gfid (child_loc.gfid)) { -                contribution = mq_add_new_contribution_node (this, ctx, loc); -                if (!contribution) { -                        if (!loc_is_root(loc)) -                                gf_log_callingfn (this->name, GF_LOG_WARNING, -                                                  "could not allocate " -                                                  " contribution node for (%s) " -                                                  "parent: (%s)", loc->path, -                                                  loc->parent ? -                                                 uuid_utoa (loc->parent->gfid) : -                                                  NULL); -                        ret = -1; +                ret = mq_inode_ctx_get (child_loc.inode, this, &ctx); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "inode ctx get failed for %s, " +                                "aborting update txn", child_loc.path);                          goto out;                  } -        } -        mq_start_quota_txn_v2 (this, loc, ctx, contribution); +                /* To improve performance, abort current transaction +                 * if one is already in progress for same inode +                 */ +                if (status == _gf_true) { +                        /* status will alreday set before txn start, +                         * so it should not be set in first +                         * loop iteration +                         */ +                        ret = mq_test_and_set_ctx_updation_status (ctx, +                                                                   &status); +                        if (ret < 0 || status == _gf_true) +                                goto out; +                } + +                ret = mq_inode_loc_fill (NULL, child_loc.parent, &parent_loc); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, "loc fill failed"); +                        goto out; +                } + +                ret = mq_lock (this, &parent_loc, F_WRLCK); +                if (ret < 0) +                        goto out; +                locked = _gf_true; + +                mq_set_ctx_updation_status (ctx, _gf_false); +                status = _gf_true; + +                /* Contribution node can be NULL in below scenarios and +                   create if needed: + +                   Scenario 1) +                   In this case create a new contribution node +                   Suppose hard link for a file f1 present in a directory d1 is +                   created in the directory d2 (as f2). Now, since d2's +                   contribution is not there in f1's inode ctx, d2's +                   contribution xattr wont be created and will create problems +                   for quota operations. + +                   Don't create contribution if parent has been changed after +                   taking a lock, this can happen when rename is performed +                   and writes is still in-progress for the same file + +                   Scenario 2) +                   When a rename operation is performed, contribution node +                   for olp path will be removed. + +                   Create contribution node only if oldparent is same as +                   newparent. +                   Consider below example +                   1) rename FOP invoked on file 'x' +                   2) write is still in progress for file 'x' +                   3) rename takes a lock on old-parent +                   4) write-update txn blocked on old-parent to acquire lock +                   5) in rename_cbk, contri xattrs are removed and contribution +                      is deleted and lock is released +                   6) now write-update txn gets the lock and updates the +                      wrong parent as it was holding lock on old parent +                      so validate parent once the lock is acquired + +                     For more information on thsi problem, please see +                     doc for marker_rename in file marker.c +                */ +                contri = mq_get_contribution_node (child_loc.parent, ctx); +                if (contri == NULL) { +                        tmp_parent = inode_parent (child_loc.inode, 0, NULL); +                        if (tmp_parent == NULL) { +                                ret = -1; +                                goto out; +                        } +                        if (gf_uuid_compare(tmp_parent->gfid, +                                            parent_loc.gfid)) { +                                /* abort txn if parent has changed */ +                                ret = 0; +                                goto out; +                        } + +                        inode_unref (tmp_parent); +                        tmp_parent = NULL; + +                        contri = mq_add_new_contribution_node (this, ctx, +                                                               &child_loc); +                        if (contri == NULL) { +                                gf_log (this->name, GF_LOG_ERROR, "Failed to " +                                        "create contribution node for %s, " +                                        "abort update txn", child_loc.path); +                                ret = -1; +                                goto out; +                        } +                } + +                ret = mq_get_delta (this, &child_loc, &delta, ctx, contri); +                if (ret < 0) +                        goto out; + +                if (quota_meta_is_null (&delta)) +                        goto out; + +                ret = mq_mark_dirty (this, &parent_loc, 1); +                if (ret < 0) +                        goto out; +                dirty = _gf_true; + +                ret = mq_update_contri (this, &child_loc, contri, &delta); +                if (ret < 0) +                        goto out; + +                ret = mq_update_size (this, &parent_loc, &delta); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_DEBUG, "rollback " +                                "contri updation"); +                        mq_sub_meta (&delta, NULL); +                        mq_update_contri (this, &child_loc, contri, &delta); +                        goto out; +                } + +                ret = mq_mark_dirty (this, &parent_loc, 0); +                dirty = _gf_false; + +                ret = mq_lock (this, &parent_loc, F_UNLCK); +                locked = _gf_false; + +                if (__is_root_gfid (parent_loc.gfid)) +                        break; + +                /* Repeate above steps upwards till the root */ +                loc_wipe (&child_loc); +                ret = mq_loc_copy (&child_loc, &parent_loc); +                if (ret < 0) +                        goto out; + +                loc_wipe (&parent_loc); +                GF_REF_PUT (contri); +                contri = NULL; +        } -        ret = 0;  out: -        if (contribution) -                GF_REF_PUT (contribution); +        if (ret >= 0 && dirty) +                ret = mq_mark_dirty (this, &parent_loc, 0); -        if (ctx && ret < 0) +        if (locked) +                ret = mq_lock (this, &parent_loc, F_UNLCK); + +        if (ctx && status == _gf_false)                  mq_set_ctx_updation_status (ctx, _gf_false); -        return ret; +        loc_wipe (&child_loc); +        loc_wipe (&parent_loc); + +        if (tmp_parent) +                inode_unref (tmp_parent); + +        if (contri) +                GF_REF_PUT (contri); + +        return 0;  }  int @@ -3359,7 +3351,7 @@ _mq_initiate_quota_txn (xlator_t *this, loc_t *origin_loc, gf_boolean_t spawn)          if (ret < 0 || status == _gf_true)                  goto out; -        ret = mq_synctask (this, mq_initiate_quota_task, spawn, &loc, 0); +        ret = mq_synctask (this, mq_initiate_quota_task, spawn, &loc);  out:          if (ret < 0 && status == _gf_false) @@ -3556,14 +3548,14 @@ mq_update_dirty_inode_txn (xlator_t *this, loc_t *loc)          GF_VALIDATE_OR_GOTO ("marker", loc, out);          GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); -        ret = mq_synctask (this, mq_update_dirty_inode_task, _gf_true, -                           loc, 0); +        ret = mq_synctask (this, mq_update_dirty_inode_task, _gf_true, loc);  out:          return ret;  }  int32_t -mq_inspect_directory_xattr (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx, +mq_inspect_directory_xattr (xlator_t *this, quota_inode_ctx_t *ctx, +                            inode_contribution_t *contribution, loc_t *loc,                              dict_t *dict, struct iatt buf)  {          int32_t               ret                          = 0; @@ -3572,19 +3564,6 @@ mq_inspect_directory_xattr (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,          quota_meta_t          contri                       = {0, };          quota_meta_t          delta                        = {0, };          char                  contri_key[CONTRI_KEY_MAX]   = {0, }; -        inode_contribution_t *contribution                 = NULL; - -        if (!loc_is_root(loc)) { -                contribution = mq_add_new_contribution_node (this, ctx, loc); -                if (contribution == NULL) { -                        if (!gf_uuid_is_null (loc->inode->gfid)) -                                gf_log (this->name, GF_LOG_DEBUG, -                                        "cannot add a new contribution node " -                                        "(%s)", uuid_utoa (loc->inode->gfid)); -                        ret = -1; -                        goto out; -                } -        }          ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty);          if (ret < 0) { @@ -3647,14 +3626,12 @@ create_xattr:                  ret = mq_create_xattrs_txn (this, loc);  out: -        if (contribution) -                GF_REF_PUT (contribution); -          return ret;  }  int32_t -mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc, +mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, +                       inode_contribution_t *contribution, loc_t *loc,                         dict_t *dict, struct iatt buf)  {          int32_t               ret                          = -1; @@ -3662,15 +3639,6 @@ mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc,          quota_meta_t          contri                       = {0, };          quota_meta_t          delta                        = {0, };          char                  contri_key[CONTRI_KEY_MAX]   = {0, }; -        inode_contribution_t *contribution                 = NULL; - -        contribution = mq_add_new_contribution_node (this, ctx, loc); -        if (contribution == NULL) { -                gf_log_callingfn (this->name, GF_LOG_DEBUG, "cannot allocate " -                                  "contribution node (path:%s)", loc->path); -                ret = -1; -                goto out; -        }          LOCK (&ctx->lock);          { @@ -3708,9 +3676,6 @@ mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc,          /* TODO: revist this code when fixing hardlinks */  out: -        if (contribution) -                GF_REF_PUT (contribution); -          return ret;  } @@ -3718,22 +3683,48 @@ int32_t  mq_xattr_state (xlator_t *this, loc_t *origin_loc, dict_t *dict,                  struct iatt buf)  { -        int32_t               ret     = -1; -        quota_inode_ctx_t    *ctx     = NULL; -        loc_t                 loc     = {0, }; +        int32_t               ret             = -1; +        quota_inode_ctx_t    *ctx             = NULL; +        loc_t                 loc             = {0, }; +        inode_contribution_t *contribution    = NULL; + +        if (((buf.ia_type == IA_IFREG) && !dht_is_linkfile (&buf, dict)) +            || (buf.ia_type == IA_IFLNK) || (buf.ia_type == IA_IFDIR)) { +                /* do healing only for these type of files */ +        } else { +                ret = 0; +                goto out; +        }          ret = mq_prevalidate_txn (this, origin_loc, &loc, &ctx);          if (ret < 0)                  goto out; -        if (((buf.ia_type == IA_IFREG) && !dht_is_linkfile (&buf, dict)) -            || (buf.ia_type == IA_IFLNK))  { -                mq_inspect_file_xattr (this, ctx, &loc, dict, buf); -        } else if (buf.ia_type == IA_IFDIR) -                mq_inspect_directory_xattr (this, &loc, ctx, dict, buf); +        if (!loc_is_root(&loc)) { +                contribution = mq_add_new_contribution_node (this, ctx, &loc); +                if (contribution == NULL) { +                        if (!gf_uuid_is_null (loc.inode->gfid)) +                                gf_log (this->name, GF_LOG_WARNING, +                                        "cannot add a new contribution node " +                                        "(%s)", uuid_utoa (loc.gfid)); +                        ret = -1; +                        goto out; +                } +        } + +        if (buf.ia_type == IA_IFDIR) +                mq_inspect_directory_xattr (this, ctx, contribution, &loc, dict, +                                            buf); +        else +                mq_inspect_file_xattr (this, ctx, contribution, &loc, dict, +                                       buf);  out:          loc_wipe (&loc); + +        if (contribution) +                GF_REF_PUT (contribution); +          return ret;  } diff --git a/xlators/features/marker/src/marker-quota.h b/xlators/features/marker/src/marker-quota.h index 89ac559954d..65cb0b297ac 100644 --- a/xlators/features/marker/src/marker-quota.h +++ b/xlators/features/marker/src/marker-quota.h @@ -18,6 +18,7 @@  #include "xlator.h"  #include "marker-mem-types.h"  #include "refcount.h" +#include "quota-common-utils.h"  #define QUOTA_XATTR_PREFIX "trusted.glusterfs"  #define QUOTA_DIRTY_KEY "trusted.glusterfs.quota.dirty" @@ -98,7 +99,7 @@ typedef struct quota_inode_ctx quota_inode_ctx_t;  struct quota_synctask {          xlator_t      *this;          loc_t          loc; -        int64_t        contri; +        quota_meta_t   contri;          gf_boolean_t   is_static;  };  typedef struct quota_synctask quota_synctask_t; @@ -146,7 +147,7 @@ int32_t  mq_reduce_parent_size (xlator_t *, loc_t *, int64_t);  int32_t -mq_reduce_parent_size_txn (xlator_t *, loc_t *, int64_t); +mq_reduce_parent_size_txn (xlator_t *, loc_t *, quota_meta_t *);  int32_t  mq_rename_update_newpath (xlator_t *, loc_t *); diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c index ba599da2d5f..6265028a9df 100644 --- a/xlators/features/marker/src/marker.c +++ b/xlators/features/marker/src/marker.c @@ -210,6 +210,11 @@ marker_local_unref (marker_local_t *local)          if (local->xdata)                  dict_unref (local->xdata); +        if (local->lk_frame) { +                STACK_DESTROY (local->lk_frame->root); +                local->lk_frame = NULL; +        } +          if (local->oplocal) {                  marker_local_unref (local->oplocal);                  local->oplocal = NULL; @@ -836,7 +841,7 @@ marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          priv = this->private;          if (priv->feature_enabled & GF_QUOTA) -                mq_reduce_parent_size_txn (this, &local->loc, -1); +                mq_reduce_parent_size_txn (this, &local->loc, NULL);          if (priv->feature_enabled & GF_XTIME)                  marker_xtime_update_marks (this, local); @@ -905,7 +910,7 @@ marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          if (priv->feature_enabled & GF_QUOTA) {                  if (!local->skip_txn) -                        mq_reduce_parent_size_txn (this, &local->loc, -1); +                        mq_reduce_parent_size_txn (this, &local->loc, NULL);          }          if (priv->feature_enabled & GF_XTIME) @@ -1048,35 +1053,23 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,          frame->local = NULL;          if (op_ret < 0) { -                if (local->err == 0) { -                        local->err =  op_errno ? op_errno : EINVAL; -                } -                  gf_log (this->name, GF_LOG_WARNING,                          "inodelk (UNLOCK) failed on path:%s (gfid:%s) (%s)", -                        local->parent_loc.path, -                        uuid_utoa (local->parent_loc.inode->gfid), +                        oplocal->parent_loc.path, +                        uuid_utoa (oplocal->parent_loc.inode->gfid),                          strerror (op_errno));          } -        if (local->stub != NULL) { -                call_resume (local->stub); -                local->stub = NULL; -        } else if (local->err != 0) { -                STACK_UNWIND_STRICT (rename, frame, -1, local->err, NULL, NULL, -                                     NULL, NULL, NULL, NULL); -        } else { -                gf_log (this->name, GF_LOG_CRITICAL, -                        "continuation stub to unwind the call is absent, hence " -                        "call will be hung (call-stack id = %"PRIu64")", -                        frame->root->unique); -        } +        if (local->err != 0) +                goto err; -        mq_reduce_parent_size_txn (this, &oplocal->loc, oplocal->contribution); +        mq_reduce_parent_size_txn (this, &oplocal->loc, &oplocal->contribution);          if (local->loc.inode != NULL) { -                mq_reduce_parent_size_txn (this, &local->loc, -                                           local->contribution); +                /* If destination file exits before rename, it would have +                 * been unlinked while renaming a file +                 */ +                mq_reduce_parent_size_txn (this, &local->loc, NULL);          }          newloc.inode = inode_ref (oplocal->loc.inode); @@ -1097,39 +1090,26 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,                  marker_xtime_update_marks (this, local);          } +err:          marker_local_unref (local);          marker_local_unref (oplocal); +          return 0;  } -int32_t -marker_rename_release_newp_lock (call_frame_t *frame, void *cookie, -                                 xlator_t *this, int32_t op_ret, -                                 int32_t op_errno, dict_t *xdata) +void +marker_rename_release_oldp_lock (marker_local_t *local, xlator_t *this)  { -        marker_local_t  *local = NULL, *oplocal = NULL; -        struct gf_flock  lock  = {0, }; +        marker_local_t        *oplocal  = NULL; +        call_frame_t          *lk_frame = NULL; +        struct gf_flock        lock     = {0, }; -        local = frame->local;          oplocal = local->oplocal; +        lk_frame = local->lk_frame; -        if (op_ret < 0) { -                if (local->err == 0) { -                        local->err = op_errno ? op_errno : EINVAL; -                } - -                gf_log (this->name, GF_LOG_WARNING, -                        "inodelk (UNLOCK) failed on %s (gfid:%s) (%s)", -                        oplocal->parent_loc.path, -                        uuid_utoa (oplocal->parent_loc.inode->gfid), -                        strerror (op_errno)); -        } - -        if (local->next_lock_on == NULL) { -                marker_rename_done (frame, NULL, this, 0, 0, NULL); -                goto out; -        } +        if (lk_frame == NULL) +                goto err;          lock.l_type   = F_UNLCK;          lock.l_whence = SEEK_SET; @@ -1137,47 +1117,75 @@ marker_rename_release_newp_lock (call_frame_t *frame, void *cookie,          lock.l_len    = 0;          lock.l_pid    = 0; -        STACK_WIND (frame, +        STACK_WIND (lk_frame,                      marker_rename_done,                      FIRST_CHILD(this),                      FIRST_CHILD(this)->fops->inodelk, -                    this->name, &local->parent_loc, F_SETLKW, &lock, NULL); +                    this->name, &oplocal->parent_loc, F_SETLKW, &lock, NULL); -out: -        return 0; +        return; + +err: +        marker_local_unref (local); +        marker_local_unref (oplocal);  }  int32_t -marker_rename_release_oldp_lock (call_frame_t *frame, void *cookie, -                                 xlator_t *this, int32_t op_ret, -                                 int32_t op_errno, dict_t *xdata) +marker_rename_unwind (call_frame_t *frame, void *cookie, xlator_t *this, +                      int32_t op_ret, int32_t op_errno, dict_t *xdata)  { -        marker_local_t  *local = NULL, *oplocal = NULL; -        struct gf_flock  lock  = {0, }; +        marker_local_t       *local    = NULL; +        marker_local_t       *oplocal  = NULL; +        quota_inode_ctx_t    *ctx      = NULL; +        inode_contribution_t *contri   = NULL;          local = frame->local;          oplocal = local->oplocal; - -        if ((op_ret < 0) && (op_errno != ENOATTR) && (op_errno != ENODATA)) { -                local->err = op_errno; -        } +        frame->local = NULL;          //Reset frame uid and gid if set.          if (cookie == (void *) _GF_UID_GID_CHANGED)                  MARKER_RESET_UID_GID (frame, frame->root, local); -        lock.l_type   = F_UNLCK; -        lock.l_whence = SEEK_SET; -        lock.l_start  = 0; -        lock.l_len    = 0; -        lock.l_pid    = 0; +        if (op_ret < 0) +                local->err =  op_errno ? op_errno : EINVAL; + +        if (local->stub != NULL) { +                /* Remove contribution node from in-memory even if +                 * remove-xattr has failed as the rename is already performed +                 * if local->stub is set, which means rename was sucessful +                 */ +                mq_inode_ctx_get (oplocal->loc.inode, this, &ctx); +                if (ctx) { +                        contri = mq_get_contribution_node (oplocal->loc.parent, +                                                           ctx); +                        if (contri) { +                                QUOTA_FREE_CONTRIBUTION_NODE (ctx, contri); +                                GF_REF_PUT (contri); +                        } +                } + +                call_resume (local->stub); +                local->stub = NULL; +                local->err = 0; +        } else if (local->err != 0) { +                STACK_UNWIND_STRICT (rename, frame, -1, local->err, NULL, NULL, +                                     NULL, NULL, NULL, NULL); +        } else { +                gf_log (this->name, GF_LOG_CRITICAL, +                        "continuation stub to unwind the call is absent, hence " +                        "call will be hung (call-stack id = %"PRIu64")", +                        frame->root->unique); +        } + +        /* If there are in-progress writes on old-path when during rename +         * operation, update txn will update the wrong path if lock +         * is released before rename unwind. +         * So release lock only after rename unwind +         */ +        marker_rename_release_oldp_lock (local, this); -        STACK_WIND (frame, -                    marker_rename_release_newp_lock, -                    FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->inodelk, -                    this->name, &oplocal->parent_loc, F_SETLKW, &lock, NULL);          return 0;  } @@ -1249,7 +1257,7 @@ marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  newloc.parent = inode_ref (local->loc.parent);                  gf_uuid_copy (newloc.gfid, oplocal->loc.inode->gfid); -                STACK_WIND_COOKIE (frame, marker_rename_release_oldp_lock, +                STACK_WIND_COOKIE (frame, marker_rename_unwind,                                     frame->cookie, FIRST_CHILD(this),                                     FIRST_CHILD(this)->fops->removexattr,                                     &newloc, contri_key, NULL); @@ -1283,7 +1291,7 @@ out:          return 0;  quota_err: -        marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); +        marker_rename_unwind (frame, NULL, this, 0, 0, NULL);          return 0;  } @@ -1296,60 +1304,7 @@ marker_do_rename (call_frame_t *frame, void *cookie, xlator_t *this,          marker_local_t       *oplocal                    = NULL;          char                  contri_key[CONTRI_KEY_MAX] = {0, };          int32_t               ret                        = 0; -        int64_t              *contribution               = 0; - -        local = frame->local; -        oplocal = local->oplocal; - -        //Reset frame uid and gid if set. -        if (cookie == (void *) _GF_UID_GID_CHANGED) -                MARKER_RESET_UID_GID (frame, frame->root, local); - -        if ((op_ret < 0) && (op_errno != ENOATTR) && (op_errno != ENODATA)) { -                local->err = op_errno ? op_errno : EINVAL; -                gf_log (this->name, GF_LOG_WARNING, -                        "fetching contribution values from %s (gfid:%s) " -                        "failed (%s)", local->loc.path, -                        uuid_utoa (local->loc.inode->gfid), -                        strerror (op_errno)); -                goto err; -        } - -        if (local->loc.inode != NULL) { -                GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret); -                if (ret < 0) { -                        local->err = errno ? errno : ENOMEM; -                        goto err; -                } - -                if (dict_get_bin (dict, contri_key, -                                  (void **) &contribution) == 0) { -                        local->contribution = ntoh64 (*contribution); -                } -        } - -        STACK_WIND (frame, marker_rename_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->rename, &oplocal->loc, -                    &local->loc, local->xdata); - -        return 0; - -err: -        marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); -        return 0; -} - - -int32_t -marker_get_newpath_contribution (call_frame_t *frame, void *cookie, -                                 xlator_t *this, int32_t op_ret, -                                 int32_t op_errno, dict_t *dict, dict_t *xdata) -{ -        marker_local_t *local                      = NULL; -        marker_local_t *oplocal                    = NULL; -        char            contri_key[CONTRI_KEY_MAX] = {0, }; -        int32_t         ret                        = 0; -        int64_t        *contribution               = 0; +        quota_meta_t          contribution               = {0, };          local = frame->local;          oplocal = local->oplocal; @@ -1373,68 +1328,51 @@ marker_get_newpath_contribution (call_frame_t *frame, void *cookie,                  local->err = errno ? errno : ENOMEM;                  goto err;          } +        quota_dict_get_meta (dict, contri_key, &contribution); +        oplocal->contribution = contribution; -        if (dict_get_bin (dict, contri_key, (void **) &contribution) == 0) -                oplocal->contribution = ntoh64 (*contribution); - -        if (local->loc.inode != NULL) { -                GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret); -                if (ret < 0) { -                        local->err = errno ? errno : ENOMEM; -                        goto err; -                } - -                /* getxattr requires uid and gid to be 0, -                 * reset them in the callback. -                 */ -                MARKER_SET_UID_GID (frame, local, frame->root); -                if (gf_uuid_is_null (local->loc.gfid)) -                        gf_uuid_copy (local->loc.gfid, local->loc.inode->gfid); - -                GF_UUID_ASSERT (local->loc.gfid); - -                STACK_WIND_COOKIE (frame, marker_do_rename, -                                   frame->cookie, FIRST_CHILD(this), -                                   FIRST_CHILD(this)->fops->getxattr, -                                   &local->loc, contri_key, NULL); -        } else { -                marker_do_rename (frame, NULL, this, 0, 0, NULL, NULL); -        } +        STACK_WIND (frame, marker_rename_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->rename, &oplocal->loc, +                    &local->loc, local->xdata);          return 0; +  err: -        marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); +        marker_rename_unwind (frame, NULL, this, 0, 0, NULL);          return 0;  } -  int32_t -marker_get_oldpath_contribution (call_frame_t *frame, void *cookie, +marker_get_oldpath_contribution (call_frame_t *lk_frame, void *cookie,                                   xlator_t *this, int32_t op_ret,                                   int32_t op_errno, dict_t *xdata)  { -        marker_local_t *local                      = NULL; -        marker_local_t *oplocal                    = NULL; -        char            contri_key[CONTRI_KEY_MAX] = {0, }; -        int32_t         ret                        = 0; +        call_frame_t    *frame                      = NULL; +        marker_local_t  *local                      = NULL; +        marker_local_t  *oplocal                    = NULL; +        char             contri_key[CONTRI_KEY_MAX] = {0, }; +        int32_t          ret                        = 0; -        local = frame->local; +        local = lk_frame->local;          oplocal = local->oplocal; +        frame = local->frame;          if (op_ret < 0) {                  local->err = op_errno ? op_errno : EINVAL;                  gf_log (this->name, GF_LOG_WARNING,                          "cannot hold inodelk on %s (gfid:%s) (%s)", -                        local->next_lock_on->path, -                        uuid_utoa (local->next_lock_on->inode->gfid), +                        oplocal->loc.path, uuid_utoa (oplocal->loc.inode->gfid),                          strerror (op_errno)); -                goto lock_err; +                goto err; + +                STACK_DESTROY (local->lk_frame->root); +                local->lk_frame = NULL;          }          GET_CONTRI_KEY (contri_key, oplocal->loc.parent->gfid, ret);          if (ret < 0) {                  local->err = errno ? errno : ENOMEM; -                goto quota_err; +                goto err;          }          /* getxattr requires uid and gid to be 0, @@ -1448,79 +1386,102 @@ marker_get_oldpath_contribution (call_frame_t *frame, void *cookie,          GF_UUID_ASSERT (oplocal->loc.gfid); -        STACK_WIND_COOKIE (frame, marker_get_newpath_contribution, +        STACK_WIND_COOKIE (frame, marker_do_rename,                             frame->cookie, FIRST_CHILD(this),                             FIRST_CHILD(this)->fops->getxattr,                             &oplocal->loc, contri_key, NULL); -        return 0; - -quota_err: -        marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); -        return 0; - -lock_err: -        if ((local->next_lock_on == NULL) -            || (local->next_lock_on == &local->parent_loc)) { -                local->next_lock_on = NULL; -                marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); -        } else { -                marker_rename_release_newp_lock (frame, NULL, this, 0, 0, NULL); -        }          return 0; -} - - -int32_t -marker_rename_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -                           int32_t op_ret, int32_t op_errno, dict_t *xdata) -{ -        marker_local_t  *local = NULL, *oplocal = NULL; -        loc_t           *loc   = NULL; -        struct gf_flock  lock  = {0, }; - -        local = frame->local; -        oplocal = local->oplocal; - -        if (op_ret < 0) { -                if (local->next_lock_on != &oplocal->parent_loc) { -                        loc = &oplocal->parent_loc; -                } else { -                        loc = &local->parent_loc; -                } - -                local->err = op_errno ? op_errno : EINVAL; -                gf_log (this->name, GF_LOG_WARNING, -                        "cannot hold inodelk on %s (gfid:%s) (%s)", -                        loc->path, uuid_utoa (loc->inode->gfid), -                        strerror (op_errno)); -                goto err; -        } - -        if (local->next_lock_on != NULL) { -                lock.l_len    = 0; -                lock.l_start  = 0; -                lock.l_type   = F_WRLCK; -                lock.l_whence = SEEK_SET; - -                STACK_WIND (frame, -                            marker_get_oldpath_contribution, -                            FIRST_CHILD(this), -                            FIRST_CHILD(this)->fops->inodelk, -                            this->name, local->next_lock_on, -                            F_SETLKW, &lock, NULL); -        } else { -                marker_get_oldpath_contribution (frame, 0, this, 0, 0, NULL); -        } - -        return 0; -  err: -        marker_rename_done (frame, NULL, this, 0, 0, NULL); -        return 0; -} - - +        marker_rename_unwind (frame, NULL, this, 0, 0, NULL); +        return 0; +} + + +/* For a marker_rename FOP, following is the algorithm used for Quota + * accounting. The use-case considered is: + * 1. rename (src, dst) + * 2. both src and dst exist + * 3. there are parallel operations on src and dst (lets say through fds + *    opened on them before rename was initiated). + * + * PS: We've not thought through whether this algo works in the presence of + *     hardlinks to src and/or dst. + * + * Algorithm: + * ========== + * + * 1) set inodelk on src-parent + *    As part of rename operation, parent can change for the file. + *    We need to remove contribution (both on disk xattr and in-memory one) + *    to src-parent (and its ancestors) and add the contribution to dst-parent + *    (and its ancestors). While we are doing these operations, contribution of + *    the file/directory shouldn't be changing as we want to be sure that + *      a) what we subtract from src-parent is exactly what we add to dst-parent + *      b) we should subtract from src-parent exactly what we contributed to + *         src-parent + *    So, We hold a lock on src-parent to block any parallel transcations on + *    src-inode (since thats the one which survives rename). + * + *    If there are any parallel transactions on dst-inode they keep succeeding + *    till the association of dst-inode with dst-parent is broken because of an + *    inode_rename after unwind of rename fop from marker. Only after unwind + *    (and hence inode_rename), we delete and subtract the contribution of + *    dst-inode to dst-parent. That way we are making sure we subtract exactly + *    what dst-inode contributed to dst-parent. + * + * 2) lookup contribution to src-parent on src-inode. + *    We need to save the contribution info for use at step-8. + * + * 3) wind rename + *    Perform rename on disk + * + * 4) remove xattr on src-loc + *    After rename, parent can change, so + *    need to remove xattrs storing contribution to src-parent. + * + * 5) remove contribution node corresponding to src-parent from the in-memory + *    list. + *    After rename, contri gfid can change and we have + *    also removed xattr from file. + *    We need to remove in-memory contribution node to prevent updations to + *    src-parent even after a successful rename + * + * 6) unwind rename + *    This will ensure that rename is done in the server + *    inode table. An inode_rename disassociates src-inode from src-parent and + *    associates it with dst-parent. It also disassociates dst-inode from + *    dst-parent. After inode_rename, inode_parent on src-inode will give + *    dst-parent and inode_parent on dst-inode will return NULL (assuming + *    dst-inode doesn't have any hardlinks). + * + * 7) release inodelk on src-parent + *    Lock on src-parent should be released only after + *    rename on disk, remove xattr and rename_unwind (and hence inode_rename) + *    operations. If lock is released before inode_rename, a parallel + *    transaction on src-inode can still update src-parent (as inode_parent on + *    src-inode can still return src-parent). This would make the + *    contribution from src-inode to src-parent stored in step-2 stale. + * + * 8) Initiate mq_reduce_parent_size_txn on src-parent to remove contribution + *    of src-inode to src-parent. We use the contribution stored in step-2. + *    Since, we had acquired the lock on src-parent all along step-2 through + *    inode_rename, we can be sure that a parallel transaction wouldn't have + *    added a delta to src-parent. + * + * 9) Initiate mq_reduce_parent_size_txn on dst-parent if dst-inode exists. + *    The size reduced from dst-parent and its ancestors is the + *    size stored as contribution to dst-parent in dst-inode. + *    If the destination file had existed, rename will unlink the + *    destination file as part of its operation. + *    We need to reduce the size on the dest parent similarly to + *    unlink. Since, we are initiating reduce-parent-size transaction after + *    inode_rename, we can be sure that a parallel transaction wouldn't add + *    delta to dst-parent while we are reducing the contribution of dst-inode + *    from its ancestors before rename. + * + * 10) create contribution xattr to dst-parent on src-inode. + */  int32_t  marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,                 loc_t *newloc, dict_t *xdata) @@ -1530,7 +1491,6 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,          marker_local_t *oplocal          = NULL;          marker_conf_t  *priv             = NULL;          struct gf_flock lock             = {0, }; -        loc_t          *lock_on          = NULL;          priv = this->private; @@ -1569,19 +1529,6 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,          if (ret < 0)                  goto err; -        if ((newloc->inode != NULL) && (newloc->parent != oldloc->parent) -            && (gf_uuid_compare (newloc->parent->gfid, -                              oldloc->parent->gfid) < 0)) { -                lock_on = &local->parent_loc; -                local->next_lock_on = &oplocal->parent_loc; -        } else { -                lock_on = &oplocal->parent_loc; -                if ((newloc->inode != NULL) && (newloc->parent -                                                != oldloc->parent)) { -                        local->next_lock_on = &local->parent_loc; -                } -        } -          lock.l_len    = 0;          lock.l_start  = 0;          lock.l_type   = F_WRLCK; @@ -1589,14 +1536,22 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,          local->xdata = dict_ref (xdata); -        if (is_lk_owner_null (&frame->root->lk_owner)) -                set_lk_owner_from_ptr (&frame->root->lk_owner, frame->root); +        local->frame = frame; +        local->lk_frame = create_frame (this, this->ctx->pool); +        if (local->lk_frame == NULL) +                goto err; + +        local->lk_frame->root->uid = 0; +        local->lk_frame->root->gid = 0; +        local->lk_frame->local = local; +        set_lk_owner_from_ptr (&local->lk_frame->root->lk_owner, +                               local->lk_frame->root); -        STACK_WIND (frame, -                    marker_rename_inodelk_cbk, +        STACK_WIND (local->lk_frame, +                    marker_get_oldpath_contribution,                      FIRST_CHILD(this),                      FIRST_CHILD(this)->fops->inodelk, -                    this->name, lock_on, +                    this->name, &oplocal->parent_loc,                      F_SETLKW, &lock, NULL);          return 0; diff --git a/xlators/features/marker/src/marker.h b/xlators/features/marker/src/marker.h index f198859062e..12a51bc1484 100644 --- a/xlators/features/marker/src/marker.h +++ b/xlators/features/marker/src/marker.h @@ -97,7 +97,6 @@ struct marker_local{          pid_t           pid;          loc_t           loc;          loc_t           parent_loc; -        loc_t          *next_lock_on;          uid_t           uid;          gid_t           gid;          int32_t         ref; @@ -106,7 +105,8 @@ struct marker_local{          mode_t          mode;          int32_t         err;          call_stub_t    *stub; -        int64_t         contribution; +        call_frame_t   *lk_frame; +        quota_meta_t    contribution;          struct marker_local *oplocal;          /* marker quota specific */  | 
