diff options
| author | Raghavendra G <rgowdapp@redhat.com> | 2013-12-02 09:32:53 +0530 | 
|---|---|---|
| committer | Vijay Bellur <vbellur@redhat.com> | 2014-01-25 07:51:10 -0800 | 
| commit | 49eb5ea29cf8d6eab0c5f60d70fe8d6a0113b61e (patch) | |
| tree | a95529ae5f7cbfffe0e3a0876f2397c6d85091f7 /xlators/features/quota | |
| parent | 9a34ea6a0a95154013676cabf8528b2679fb36c4 (diff) | |
features/quota: remove in-memory accounting of files in enforcer
Accounting was done in enforcer (though marker is the ultimate source
of truth) to offset cached directory size becoming stale. However,
with enforcer being moved to brick we can no longer maintain correct
cluster wide size for a directory. Hence removing accounting code from
enforcer.
Change-Id: I5ea94234da4da85ed5f5ced1354d8de3454b3fcb
BUG: 969461
Signed-off-by: Raghavendra G <rgowdapp@redhat.com>
Reviewed-on: http://review.gluster.org/6434
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/quota')
| -rw-r--r-- | xlators/features/quota/src/quota.c | 385 | ||||
| -rw-r--r-- | xlators/features/quota/src/quota.h | 46 | 
2 files changed, 210 insertions, 221 deletions
| diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c index 2c7e2984938..0af514cef0c 100644 --- a/xlators/features/quota/src/quota.c +++ b/xlators/features/quota/src/quota.c @@ -215,6 +215,7 @@ __quota_dentry_new (quota_inode_ctx_t *ctx, char *name, uuid_t par)          dentry->name = gf_strdup (name);          if (dentry->name == NULL) {                  GF_FREE (dentry); +                dentry = NULL;                  goto err;          } @@ -378,31 +379,51 @@ quota_timeout (struct timeval *tv, int32_t timeout)          return timed_out;  } +inline void +quota_add_parent (quota_dentry_t *dentry, struct list_head *list) +{ +        quota_dentry_t *entry = NULL; +        gf_boolean_t    found = _gf_false; + +        if ((dentry == NULL) || (list == NULL)) { +                goto out; +        } + +        list_for_each_entry (entry, list, next) { +                if (uuid_compare (dentry->par, entry->par) == 0) { +                        found = _gf_true; +                        goto out; +                } +        } + +        list_add_tail (&dentry->next, list); + +out: +        return; +} +  int32_t  quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                            int32_t op_ret, int32_t op_errno,                            gf_dirent_t *entries, dict_t *xdata)  { -        inode_t           *parent             = NULL; -        gf_dirent_t       *entry              = NULL; -        loc_t              loc                = {0, }; -        quota_dentry_t    *dentry             = NULL, *tmp = NULL; -        quota_inode_ctx_t *ctx                = NULL; -        struct list_head   parents            = {0, }; -        quota_local_t     *local              = NULL; -        call_frame_t      *continuation_frame = NULL; +        inode_t           *parent  = NULL, *tmp_parent = NULL; +        gf_dirent_t       *entry   = NULL; +        loc_t              loc     = {0, }; +        quota_dentry_t    *dentry  = NULL, *tmp = NULL; +        quota_inode_ctx_t *ctx     = NULL; +        struct list_head   parents = {0, }; +        quota_local_t     *local   = NULL;          INIT_LIST_HEAD (&parents); -        continuation_frame = frame->local; +        local = frame->local;          frame->local = NULL; -        local = continuation_frame->local; -          if (op_ret < 0)                  goto err; -        parent = inode_parent (local->validate_loc.inode, 0, NULL); +        parent = inode_parent (local->loc.inode, 0, NULL);          if (parent == NULL) {                  gf_log (this->name, GF_LOG_WARNING, "parent is NULL");                  op_errno = EINVAL; @@ -423,61 +444,77 @@ quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                                   * pointer and continue                                   */ -                                parent = NULL; +                                tmp_parent = NULL;                          }                          uuid_copy (loc.gfid, entry->d_stat.ia_gfid);                          loc.inode = inode_ref (entry->inode); -                        loc.parent = inode_ref (parent); +                        loc.parent = inode_ref (tmp_parent);                          loc.name = entry->d_name;                          quota_fill_inodectx (this, entry->inode, entry->dict,                                               &loc, &entry->d_stat, &op_errno); -                        parent = entry->inode; +                        tmp_parent = entry->inode;                          loc_wipe (&loc);                  }          } -        quota_inode_ctx_get (local->validate_loc.inode, this, &ctx, 0); - -        local->link_count = 0; +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx != NULL) {                  LOCK (&ctx->lock);                  {                          list_for_each_entry (dentry, &ctx->parents, next) { +                                /* we built ancestry for a non-directory */                                  tmp = __quota_dentry_new (NULL, dentry->name,                                                            dentry->par); -                                list_add_tail (&tmp->next, &parents); -                                local->link_count++; +                                quota_add_parent (tmp, &parents); + +                                if (list_empty (&tmp->next)) { +                                        __quota_dentry_free (tmp); +                                        tmp = NULL; +                                }                          }                  }                  UNLOCK (&ctx->lock);          } -        if (local->link_count != 0) { -                list_for_each_entry_safe (dentry, tmp, &parents, next) { -                        quota_check_limit (continuation_frame, -                                           local->validate_loc.inode, -                                           this, dentry->name, dentry->par); -                        __quota_dentry_free (dentry); +        if (list_empty (&parents)) { +                /* we built ancestry for a directory */ +                list_for_each_entry (entry, &entries->list, list) { +                        if (entry->inode == local->loc.inode) +                                break;                  } -        } else { -                local->link_count = 1; -                quota_check_limit (continuation_frame, parent, this, NULL, -                                   NULL); + +                GF_ASSERT (&entry->list != &entries->list); + +                tmp = __quota_dentry_new (NULL, entry->d_name, parent->gfid); +                quota_add_parent (tmp, &parents);          } -        STACK_DESTROY (frame->root); -        return 0; +        local->ancestry_cbk (&parents, local->loc.inode, 0, 0, +                             local->ancestry_data); +        goto cleanup;  err: +        local->ancestry_cbk (NULL, NULL, -1, op_errno, local->ancestry_data); + +cleanup:          STACK_DESTROY (frame->root); +        quota_local_cleanup (this, local); + +        if (parent != NULL) { +                inode_unref (parent); +                parent = NULL; +        } + +        list_for_each_entry_safe (dentry, tmp, &parents, next) { +                __quota_dentry_free (dentry); +        } -        quota_handle_validate_error (local, -1, op_errno);          return 0;  } @@ -486,85 +523,91 @@ quota_build_ancestry_open_cbk (call_frame_t *frame, void *cookie,                                 xlator_t *this, int32_t op_ret, int32_t op_errno,                                 fd_t *fd, dict_t *xdata)  { -        int            ret                = -1; -        dict_t        *xdata_req          = NULL; -        quota_local_t *local              = NULL; -        call_frame_t  *continuation_frame = NULL; +        dict_t        *xdata_req = NULL; +        quota_local_t *local     = NULL; + +        if (op_ret < 0) { +                goto err; +        }          xdata_req = dict_new ();          if (xdata_req == NULL) { -                ret = -ENOMEM; +                op_ret = -ENOMEM;                  goto err;          } -        ret = dict_set_int8 (xdata_req, QUOTA_LIMIT_KEY, 1); -        if (ret < 0) +        op_ret = dict_set_int8 (xdata_req, QUOTA_LIMIT_KEY, 1); +        if (op_ret < 0) { +                op_errno = -op_ret;                  goto err; +        } -        ret = dict_set_int8 (xdata_req, GET_ANCESTRY_DENTRY_KEY, 1); -        if (ret < 0) +        op_ret = dict_set_int8 (xdata_req, GET_ANCESTRY_DENTRY_KEY, 1); +        if (op_ret < 0) { +                op_errno = -op_ret;                  goto err; +        }          /* This would ask posix layer to construct dentry chain till root */          STACK_WIND (frame, quota_build_ancestry_cbk, FIRST_CHILD(this),                      FIRST_CHILD(this)->fops->readdirp, fd, 0, 0, xdata_req); -        ret = 0; +        op_ret = 0;  err:          fd_unref (fd);          dict_unref (xdata_req); -        if (ret < 0) { -                continuation_frame = frame->local; +        if (op_ret < 0) { +                local = frame->local;                  frame->local = NULL; +                local->ancestry_cbk (NULL, NULL, -1, op_errno, +                                     local->ancestry_data); +                quota_local_cleanup (this, local);                  STACK_DESTROY (frame->root); - -                local = continuation_frame->local; -                quota_handle_validate_error (local, -1, op_errno);          } -        return ret; +        return 0;  }  int -quota_build_ancestry (call_frame_t *frame, inode_t *inode, xlator_t *this) +quota_build_ancestry (inode_t *inode, quota_ancestry_built_t ancestry_cbk, +                      void *data)  {          loc_t          loc       = {0, };          fd_t          *fd        = NULL;          quota_local_t *local     = NULL;          call_frame_t  *new_frame = NULL; -        int            ret       = -1; +        int            op_errno  = EINVAL; +        xlator_t      *this      = NULL; + +        this = THIS;          loc.inode = inode_ref (inode);          uuid_copy (loc.gfid, inode->gfid); -        gf_log (this->name, GF_LOG_WARNING, "building ancestry"); - -        local = frame->local; - -        LOCK (&local->lock); -        { -                loc_wipe (&local->validate_loc); +        fd = fd_create (inode, 0); -                ret = quota_inode_loc_fill (inode, &local->validate_loc); -                if (ret < 0) { -                        gf_log (this->name, GF_LOG_WARNING, -                                "cannot fill loc for inode (gfid:%s), hence " -                                "aborting quota-checks and continuing with fop", -                                uuid_utoa (inode->gfid)); -                } +        new_frame = create_frame (this, this->ctx->pool); +        if (new_frame == NULL) { +                op_errno = ENOMEM; +                goto err;          } -        UNLOCK (&local->lock); -        fd = fd_create (inode, 0); - -        new_frame = copy_frame (frame);          new_frame->root->uid = new_frame->root->gid = 0; -        new_frame->local = frame; +        local = quota_local_new (); +        if (local == NULL) { +                op_errno = ENOMEM; +                goto err; +        } + +        new_frame->local = local; +        local->ancestry_cbk = ancestry_cbk; +        local->ancestry_data = data; +        local->loc.inode = inode_ref (inode);          if (IA_ISDIR (inode->ia_type)) {                  STACK_WIND (new_frame, quota_build_ancestry_open_cbk, @@ -579,7 +622,25 @@ quota_build_ancestry (call_frame_t *frame, inode_t *inode, xlator_t *this)          }          loc_wipe (&loc); +        return 0; +err: +        ancestry_cbk (NULL, NULL, -1, op_errno, data); + +        fd_unref (fd); + +        local = new_frame->local; +        new_frame->local = NULL; + +        if (local != NULL) { +                quota_local_cleanup (this, local); +        } + +        if (new_frame != NULL) { +                STACK_DESTROY (new_frame->root); +        } + +        loc_wipe (&loc);          return 0;  } @@ -646,6 +707,56 @@ err:          return ret;  } +void +quota_check_limit_continuation (struct list_head *parents, inode_t *inode, +                                int32_t op_ret, int32_t op_errno, void *data) +{ +        call_frame_t   *frame        = NULL; +        xlator_t       *this         = NULL; +        quota_local_t  *local        = NULL; +        quota_dentry_t *entry        = NULL; +        inode_t        *parent       = NULL; +        int             parent_count = 0; + +        frame = data; +        local = frame->local; +        this = THIS; + +        if ((op_ret < 0) || list_empty (parents)) { +                if (list_empty (parents)) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "Couldn't build ancestry for inode (gfid:%s). " +                                "Without knowing ancestors till root, quota " +                                "cannot be enforced. " +                                "Hence, failing fop with EIO", +                                uuid_utoa (inode->gfid)); +                        op_errno = EIO; +                } + +                quota_handle_validate_error (local, -1, op_errno); +                goto out; +        } + +        list_for_each_entry (entry, parents, next) { +                parent_count++; +        } + +        LOCK (&local->lock); +        { +                local->link_count += (parent_count - 1); +        } +        UNLOCK (&local->lock); + +        list_for_each_entry (entry, parents, next) { +                parent = inode_find (inode->table, entry->par); + +                quota_check_limit (frame, parent, this, NULL, NULL); +        } + +out: +        return; +} +  int32_t  quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                     char *name, uuid_t par) @@ -789,7 +900,9 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                  }                  if (parent == NULL) { -                        ret = quota_build_ancestry (frame, _inode, this); +                        ret = quota_build_ancestry (_inode, +                                                    quota_check_limit_continuation, +                                                    frame);                          if (ret < 0) {                                  op_errno = -ret;                                  goto err; @@ -1017,77 +1130,6 @@ off:          return 0;  } - -void -quota_update_size (xlator_t *this, inode_t *inode, char *name, uuid_t par, -                   int64_t delta) -{ -        inode_t           *_inode    = NULL; -        inode_t           *parent    = NULL; -        uint64_t           value     = 0; -        quota_inode_ctx_t *ctx       = NULL; -        uuid_t             trav_uuid = {0,}; - -        GF_VALIDATE_OR_GOTO ("quota", this, out); -        GF_VALIDATE_OR_GOTO (this->name, inode, out); - -        inode_ctx_get (inode, this, &value); -        ctx = (quota_inode_ctx_t *)(unsigned long)value; - -        _inode = inode_ref (inode); - -        if ( par != NULL ) { -                uuid_copy (trav_uuid, par); -        } - -        do { -                if ((ctx != NULL) && (ctx->hard_lim >= 0)) { -                        quota_log_usage (this, ctx, _inode, delta); -                        LOCK (&ctx->lock); -                        { -                                ctx->size += delta; -                                if (ctx->size < 0) -                                        ctx->size = 0; -                        } -                        UNLOCK (&ctx->lock); -                } - -                if (__is_root_gfid (_inode->gfid)) { -                        break; -                } - -                parent = inode_parent (_inode, trav_uuid, name); -                if (parent == NULL) { -                        /* TODO: build ancestry and continue updating size */ -                        gf_log (this->name, GF_LOG_DEBUG, -                                "cannot find parent for inode (gfid:%s), hence " -                                "aborting size updation of parents", -                                uuid_utoa (_inode->gfid)); -                } - -                if (name != NULL) { -                        name = NULL; -                        uuid_clear (trav_uuid); -                } - -                inode_unref (_inode); -                _inode = parent; - -                if (_inode == NULL) { -                        break; -                } - -                value = 0; -                ctx = NULL; -                inode_ctx_get (_inode, this, &value); -                ctx = (quota_inode_ctx_t *)(unsigned long)value; -        } while (1); - -out: -        return; -} - -  int32_t  quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                    int32_t op_ret, int32_t op_errno, struct iatt *prebuf, @@ -1097,9 +1139,6 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          uint64_t                 ctx_int        = 0;          quota_inode_ctx_t       *ctx            = NULL;          quota_local_t           *local          = NULL; -        quota_dentry_t          *dentry         = NULL, *tmp = NULL; -        int64_t                  delta          = 0; -        struct list_head         head           = {0, };          local = frame->local; @@ -1107,8 +1146,6 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        INIT_LIST_HEAD (&head); -          ret = inode_ctx_get (local->loc.inode, this, &ctx_int);          if (ret) {                  gf_log (this->name, GF_LOG_WARNING, @@ -1128,25 +1165,9 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          LOCK (&ctx->lock);          {                  ctx->buf = *postbuf; - -                list_for_each_entry (dentry, &ctx->parents, next) { -                        tmp = __quota_dentry_new (NULL, dentry->name, -                                                  dentry->par); -                        list_add_tail (&tmp->next, &head); -                } -          }          UNLOCK (&ctx->lock); -        if (postbuf->ia_blocks != prebuf->ia_blocks) -                delta = local->delta; - -        list_for_each_entry_safe (dentry, tmp, &head, next) { -                quota_update_size (this, local->loc.inode, dentry->name, -                                   dentry->par, delta); -                __quota_dentry_free (dentry); -        } -  out:          QUOTA_STACK_UNWIND (writev, frame, op_ret, op_errno, prebuf, postbuf,                              xdata); @@ -1583,12 +1604,6 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        if (!local->skip_check) -                quota_update_size (this, local->loc.inode, -                                   (char *)local->loc.name, -                                   local->loc.parent->gfid, -                                   (-(ctx->buf.ia_blocks * 512))); -          LOCK (&ctx->lock);          {                  list_for_each_entry (dentry, &ctx->parents, next) { @@ -1680,9 +1695,6 @@ quota_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          if (local->skip_check)                  goto out; -        quota_update_size (this, local->loc.parent, NULL, NULL, -                           (buf->ia_blocks * 512)); -          ret = quota_inode_ctx_get (inode, this, &ctx, 0);          if ((ret == -1) || (ctx == NULL)) {                  gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " @@ -1867,13 +1879,6 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  size = buf->ia_blocks * 512;          } -        if (local->oldloc.parent != local->newloc.parent) { -                quota_update_size (this, local->oldloc.parent, NULL, NULL, -                                   (-size)); -                quota_update_size (this, local->newloc.parent, NULL, NULL, -                                   size); -        } -          if (!QUOTA_REG_OR_LNK_FILE (local->oldloc.inode->ia_type)) {                  goto out;          } @@ -2131,7 +2136,6 @@ quota_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                     struct iatt *buf, struct iatt *preparent,                     struct iatt *postparent, dict_t *xdata)  { -        int64_t            size   = 0;          quota_local_t     *local  = NULL;          quota_inode_ctx_t *ctx    = NULL;          quota_dentry_t    *dentry = NULL; @@ -2141,9 +2145,6 @@ quota_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          }          local = frame->local; -        size = buf->ia_blocks * 512; - -        quota_update_size (this, local->loc.parent, NULL, NULL, size);          quota_inode_ctx_get (local->loc.inode, this, &ctx, 1);          if (ctx == NULL) { @@ -2275,7 +2276,6 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  {          quota_local_t     *local = NULL;          quota_inode_ctx_t *ctx   = NULL; -        int64_t            delta = 0;          if (op_ret < 0) {                  goto out; @@ -2287,10 +2287,6 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512; - -        quota_update_size (this, local->loc.inode, NULL, NULL, delta); -          quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " @@ -2362,7 +2358,6 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  {          quota_local_t     *local = NULL;          quota_inode_ctx_t *ctx   = NULL; -        int64_t            delta = 0;          if (op_ret < 0) {                  goto out; @@ -2374,10 +2369,6 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512; - -        quota_update_size (this, local->loc.inode, NULL, NULL, delta); -          quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " @@ -3849,8 +3840,6 @@ quota_fallocate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,          uint64_t                 ctx_int        = 0;          quota_inode_ctx_t       *ctx            = NULL;          quota_local_t           *local          = NULL; -        quota_dentry_t          *dentry         = NULL; -        int64_t                  delta          = 0;          local = frame->local; @@ -3880,12 +3869,6 @@ quota_fallocate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,          }          UNLOCK (&ctx->lock); -        list_for_each_entry (dentry, &ctx->parents, next) { -                delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512; -                quota_update_size (this, local->loc.inode, -                                   dentry->name, dentry->par, delta); -        } -  out:          QUOTA_STACK_UNWIND (fallocate, frame, op_ret, op_errno, prebuf, postbuf,                              xdata); diff --git a/xlators/features/quota/src/quota.h b/xlators/features/quota/src/quota.h index 02bc0d8b6b4..84c3257feec 100644 --- a/xlators/features/quota/src/quota.h +++ b/xlators/features/quota/src/quota.h @@ -160,28 +160,34 @@ struct quota_limit {  } __attribute__ ((packed));  typedef struct quota_limit quota_limit_t; +typedef void +(*quota_ancestry_built_t) (struct list_head *parents, inode_t *inode, +                           int32_t op_ret, int32_t op_errno, void *data); +  struct quota_local { -        gf_lock_t           lock; -        uint32_t            validate_count; -        uint32_t            link_count; -        loc_t               loc; -        loc_t               oldloc; -        loc_t               newloc; -        loc_t               validate_loc; -        int64_t             delta; -        int32_t             op_ret; -        int32_t             op_errno; -        int64_t             size; -        gf_boolean_t        skip_check; -        char                just_validated; -        fop_lookup_cbk_t    validate_cbk; -        inode_t            *inode; -        call_stub_t        *stub; -        struct iobref      *iobref; -        quota_limit_t       limit; -        int64_t             space_available; +        gf_lock_t               lock; +        uint32_t                validate_count; +        uint32_t                link_count; +        loc_t                   loc; +        loc_t                   oldloc; +        loc_t                   newloc; +        loc_t                   validate_loc; +        int64_t                 delta; +        int32_t                 op_ret; +        int32_t                 op_errno; +        int64_t                 size; +        gf_boolean_t            skip_check; +        char                    just_validated; +        fop_lookup_cbk_t        validate_cbk; +        inode_t                *inode; +        call_stub_t            *stub; +        struct iobref          *iobref; +        quota_limit_t           limit; +        int64_t                 space_available; +        quota_ancestry_built_t  ancestry_cbk; +        void                   *ancestry_data;  }; -typedef struct quota_local  quota_local_t; +typedef struct quota_local      quota_local_t;  struct quota_priv {          uint32_t                soft_timeout; | 
