summaryrefslogtreecommitdiffstats
path: root/xlators/features
diff options
context:
space:
mode:
authorRaghavendra G <rgowdapp@redhat.com>2013-12-02 09:32:53 +0530
committerVijay Bellur <vbellur@redhat.com>2014-01-28 08:47:26 -0800
commite0c07e6474956bc90449f36e4cc953dfe35a6fa4 (patch)
treec51b0860f19d36c1c1695cdf1f4bf741d219ec77 /xlators/features
parent0483c68e0d05f1285baff22609f2ec6be65a5306 (diff)
features/quota: remove in-memory accounting of files in enforcer
Accounting was done in enforcer (though marker is the ultimate source of truth) to offset cached directory size becoming stale. However, with enforcer being moved to brick we can no longer maintain correct cluster wide size for a directory. Hence removing accounting code from enforcer. Change-Id: I5ea94234da4da85ed5f5ced1354d8de3454b3fcb BUG: 969461 Signed-off-by: Raghavendra G <rgowdapp@redhat.com> Reviewed-on: http://review.gluster.org/6434 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com> Reviewed-on: http://review.gluster.org/6818
Diffstat (limited to 'xlators/features')
-rw-r--r--xlators/features/quota/src/quota.c385
-rw-r--r--xlators/features/quota/src/quota.h46
2 files changed, 210 insertions, 221 deletions
diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c
index 594c54d74..33a8d38bd 100644
--- a/xlators/features/quota/src/quota.c
+++ b/xlators/features/quota/src/quota.c
@@ -215,6 +215,7 @@ __quota_dentry_new (quota_inode_ctx_t *ctx, char *name, uuid_t par)
dentry->name = gf_strdup (name);
if (dentry->name == NULL) {
GF_FREE (dentry);
+ dentry = NULL;
goto err;
}
@@ -378,31 +379,51 @@ quota_timeout (struct timeval *tv, int32_t timeout)
return timed_out;
}
+inline void
+quota_add_parent (quota_dentry_t *dentry, struct list_head *list)
+{
+ quota_dentry_t *entry = NULL;
+ gf_boolean_t found = _gf_false;
+
+ if ((dentry == NULL) || (list == NULL)) {
+ goto out;
+ }
+
+ list_for_each_entry (entry, list, next) {
+ if (uuid_compare (dentry->par, entry->par) == 0) {
+ found = _gf_true;
+ goto out;
+ }
+ }
+
+ list_add_tail (&dentry->next, list);
+
+out:
+ return;
+}
+
int32_t
quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
gf_dirent_t *entries, dict_t *xdata)
{
- inode_t *parent = NULL;
- gf_dirent_t *entry = NULL;
- loc_t loc = {0, };
- quota_dentry_t *dentry = NULL, *tmp = NULL;
- quota_inode_ctx_t *ctx = NULL;
- struct list_head parents = {0, };
- quota_local_t *local = NULL;
- call_frame_t *continuation_frame = NULL;
+ inode_t *parent = NULL, *tmp_parent = NULL;
+ gf_dirent_t *entry = NULL;
+ loc_t loc = {0, };
+ quota_dentry_t *dentry = NULL, *tmp = NULL;
+ quota_inode_ctx_t *ctx = NULL;
+ struct list_head parents = {0, };
+ quota_local_t *local = NULL;
INIT_LIST_HEAD (&parents);
- continuation_frame = frame->local;
+ local = frame->local;
frame->local = NULL;
- local = continuation_frame->local;
-
if (op_ret < 0)
goto err;
- parent = inode_parent (local->validate_loc.inode, 0, NULL);
+ parent = inode_parent (local->loc.inode, 0, NULL);
if (parent == NULL) {
gf_log (this->name, GF_LOG_WARNING, "parent is NULL");
op_errno = EINVAL;
@@ -423,61 +444,77 @@ quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
* pointer and continue
*/
- parent = NULL;
+ tmp_parent = NULL;
}
uuid_copy (loc.gfid, entry->d_stat.ia_gfid);
loc.inode = inode_ref (entry->inode);
- loc.parent = inode_ref (parent);
+ loc.parent = inode_ref (tmp_parent);
loc.name = entry->d_name;
quota_fill_inodectx (this, entry->inode, entry->dict,
&loc, &entry->d_stat, &op_errno);
- parent = entry->inode;
+ tmp_parent = entry->inode;
loc_wipe (&loc);
}
}
- quota_inode_ctx_get (local->validate_loc.inode, this, &ctx, 0);
-
- local->link_count = 0;
+ quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);
if (ctx != NULL) {
LOCK (&ctx->lock);
{
list_for_each_entry (dentry, &ctx->parents, next) {
+ /* we built ancestry for a non-directory */
tmp = __quota_dentry_new (NULL, dentry->name,
dentry->par);
- list_add_tail (&tmp->next, &parents);
- local->link_count++;
+ quota_add_parent (tmp, &parents);
+
+ if (list_empty (&tmp->next)) {
+ __quota_dentry_free (tmp);
+ tmp = NULL;
+ }
}
}
UNLOCK (&ctx->lock);
}
- if (local->link_count != 0) {
- list_for_each_entry_safe (dentry, tmp, &parents, next) {
- quota_check_limit (continuation_frame,
- local->validate_loc.inode,
- this, dentry->name, dentry->par);
- __quota_dentry_free (dentry);
+ if (list_empty (&parents)) {
+ /* we built ancestry for a directory */
+ list_for_each_entry (entry, &entries->list, list) {
+ if (entry->inode == local->loc.inode)
+ break;
}
- } else {
- local->link_count = 1;
- quota_check_limit (continuation_frame, parent, this, NULL,
- NULL);
+
+ GF_ASSERT (&entry->list != &entries->list);
+
+ tmp = __quota_dentry_new (NULL, entry->d_name, parent->gfid);
+ quota_add_parent (tmp, &parents);
}
- STACK_DESTROY (frame->root);
- return 0;
+ local->ancestry_cbk (&parents, local->loc.inode, 0, 0,
+ local->ancestry_data);
+ goto cleanup;
err:
+ local->ancestry_cbk (NULL, NULL, -1, op_errno, local->ancestry_data);
+
+cleanup:
STACK_DESTROY (frame->root);
+ quota_local_cleanup (this, local);
+
+ if (parent != NULL) {
+ inode_unref (parent);
+ parent = NULL;
+ }
+
+ list_for_each_entry_safe (dentry, tmp, &parents, next) {
+ __quota_dentry_free (dentry);
+ }
- quota_handle_validate_error (local, -1, op_errno);
return 0;
}
@@ -486,85 +523,91 @@ quota_build_ancestry_open_cbk (call_frame_t *frame, void *cookie,
xlator_t *this, int32_t op_ret, int32_t op_errno,
fd_t *fd, dict_t *xdata)
{
- int ret = -1;
- dict_t *xdata_req = NULL;
- quota_local_t *local = NULL;
- call_frame_t *continuation_frame = NULL;
+ dict_t *xdata_req = NULL;
+ quota_local_t *local = NULL;
+
+ if (op_ret < 0) {
+ goto err;
+ }
xdata_req = dict_new ();
if (xdata_req == NULL) {
- ret = -ENOMEM;
+ op_ret = -ENOMEM;
goto err;
}
- ret = dict_set_int8 (xdata_req, QUOTA_LIMIT_KEY, 1);
- if (ret < 0)
+ op_ret = dict_set_int8 (xdata_req, QUOTA_LIMIT_KEY, 1);
+ if (op_ret < 0) {
+ op_errno = -op_ret;
goto err;
+ }
- ret = dict_set_int8 (xdata_req, GET_ANCESTRY_DENTRY_KEY, 1);
- if (ret < 0)
+ op_ret = dict_set_int8 (xdata_req, GET_ANCESTRY_DENTRY_KEY, 1);
+ if (op_ret < 0) {
+ op_errno = -op_ret;
goto err;
+ }
/* This would ask posix layer to construct dentry chain till root */
STACK_WIND (frame, quota_build_ancestry_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->readdirp, fd, 0, 0, xdata_req);
- ret = 0;
+ op_ret = 0;
err:
fd_unref (fd);
dict_unref (xdata_req);
- if (ret < 0) {
- continuation_frame = frame->local;
+ if (op_ret < 0) {
+ local = frame->local;
frame->local = NULL;
+ local->ancestry_cbk (NULL, NULL, -1, op_errno,
+ local->ancestry_data);
+ quota_local_cleanup (this, local);
STACK_DESTROY (frame->root);
-
- local = continuation_frame->local;
- quota_handle_validate_error (local, -1, op_errno);
}
- return ret;
+ return 0;
}
int
-quota_build_ancestry (call_frame_t *frame, inode_t *inode, xlator_t *this)
+quota_build_ancestry (inode_t *inode, quota_ancestry_built_t ancestry_cbk,
+ void *data)
{
loc_t loc = {0, };
fd_t *fd = NULL;
quota_local_t *local = NULL;
call_frame_t *new_frame = NULL;
- int ret = -1;
+ int op_errno = EINVAL;
+ xlator_t *this = NULL;
+
+ this = THIS;
loc.inode = inode_ref (inode);
uuid_copy (loc.gfid, inode->gfid);
- gf_log (this->name, GF_LOG_WARNING, "building ancestry");
-
- local = frame->local;
-
- LOCK (&local->lock);
- {
- loc_wipe (&local->validate_loc);
+ fd = fd_create (inode, 0);
- ret = quota_inode_loc_fill (inode, &local->validate_loc);
- if (ret < 0) {
- gf_log (this->name, GF_LOG_WARNING,
- "cannot fill loc for inode (gfid:%s), hence "
- "aborting quota-checks and continuing with fop",
- uuid_utoa (inode->gfid));
- }
+ new_frame = create_frame (this, this->ctx->pool);
+ if (new_frame == NULL) {
+ op_errno = ENOMEM;
+ goto err;
}
- UNLOCK (&local->lock);
- fd = fd_create (inode, 0);
-
- new_frame = copy_frame (frame);
new_frame->root->uid = new_frame->root->gid = 0;
- new_frame->local = frame;
+ local = quota_local_new ();
+ if (local == NULL) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ new_frame->local = local;
+ local->ancestry_cbk = ancestry_cbk;
+ local->ancestry_data = data;
+ local->loc.inode = inode_ref (inode);
if (IA_ISDIR (inode->ia_type)) {
STACK_WIND (new_frame, quota_build_ancestry_open_cbk,
@@ -579,7 +622,25 @@ quota_build_ancestry (call_frame_t *frame, inode_t *inode, xlator_t *this)
}
loc_wipe (&loc);
+ return 0;
+err:
+ ancestry_cbk (NULL, NULL, -1, op_errno, data);
+
+ fd_unref (fd);
+
+ local = new_frame->local;
+ new_frame->local = NULL;
+
+ if (local != NULL) {
+ quota_local_cleanup (this, local);
+ }
+
+ if (new_frame != NULL) {
+ STACK_DESTROY (new_frame->root);
+ }
+
+ loc_wipe (&loc);
return 0;
}
@@ -646,6 +707,56 @@ err:
return ret;
}
+void
+quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
+ int32_t op_ret, int32_t op_errno, void *data)
+{
+ call_frame_t *frame = NULL;
+ xlator_t *this = NULL;
+ quota_local_t *local = NULL;
+ quota_dentry_t *entry = NULL;
+ inode_t *parent = NULL;
+ int parent_count = 0;
+
+ frame = data;
+ local = frame->local;
+ this = THIS;
+
+ if ((op_ret < 0) || list_empty (parents)) {
+ if (list_empty (parents)) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Couldn't build ancestry for inode (gfid:%s). "
+ "Without knowing ancestors till root, quota "
+ "cannot be enforced. "
+ "Hence, failing fop with EIO",
+ uuid_utoa (inode->gfid));
+ op_errno = EIO;
+ }
+
+ quota_handle_validate_error (local, -1, op_errno);
+ goto out;
+ }
+
+ list_for_each_entry (entry, parents, next) {
+ parent_count++;
+ }
+
+ LOCK (&local->lock);
+ {
+ local->link_count += (parent_count - 1);
+ }
+ UNLOCK (&local->lock);
+
+ list_for_each_entry (entry, parents, next) {
+ parent = inode_find (inode->table, entry->par);
+
+ quota_check_limit (frame, parent, this, NULL, NULL);
+ }
+
+out:
+ return;
+}
+
int32_t
quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
char *name, uuid_t par)
@@ -789,7 +900,9 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
}
if (parent == NULL) {
- ret = quota_build_ancestry (frame, _inode, this);
+ ret = quota_build_ancestry (_inode,
+ quota_check_limit_continuation,
+ frame);
if (ret < 0) {
op_errno = -ret;
goto err;
@@ -1018,77 +1131,6 @@ off:
return 0;
}
-
-void
-quota_update_size (xlator_t *this, inode_t *inode, char *name, uuid_t par,
- int64_t delta)
-{
- inode_t *_inode = NULL;
- inode_t *parent = NULL;
- uint64_t value = 0;
- quota_inode_ctx_t *ctx = NULL;
- uuid_t trav_uuid = {0,};
-
- GF_VALIDATE_OR_GOTO ("quota", this, out);
- GF_VALIDATE_OR_GOTO (this->name, inode, out);
-
- inode_ctx_get (inode, this, &value);
- ctx = (quota_inode_ctx_t *)(unsigned long)value;
-
- _inode = inode_ref (inode);
-
- if ( par != NULL ) {
- uuid_copy (trav_uuid, par);
- }
-
- do {
- if ((ctx != NULL) && (ctx->hard_lim >= 0)) {
- quota_log_usage (this, ctx, _inode, delta);
- LOCK (&ctx->lock);
- {
- ctx->size += delta;
- if (ctx->size < 0)
- ctx->size = 0;
- }
- UNLOCK (&ctx->lock);
- }
-
- if (__is_root_gfid (_inode->gfid)) {
- break;
- }
-
- parent = inode_parent (_inode, trav_uuid, name);
- if (parent == NULL) {
- /* TODO: build ancestry and continue updating size */
- gf_log (this->name, GF_LOG_DEBUG,
- "cannot find parent for inode (gfid:%s), hence "
- "aborting size updation of parents",
- uuid_utoa (_inode->gfid));
- }
-
- if (name != NULL) {
- name = NULL;
- uuid_clear (trav_uuid);
- }
-
- inode_unref (_inode);
- _inode = parent;
-
- if (_inode == NULL) {
- break;
- }
-
- value = 0;
- ctx = NULL;
- inode_ctx_get (_inode, this, &value);
- ctx = (quota_inode_ctx_t *)(unsigned long)value;
- } while (1);
-
-out:
- return;
-}
-
-
int32_t
quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
@@ -1098,9 +1140,6 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
uint64_t ctx_int = 0;
quota_inode_ctx_t *ctx = NULL;
quota_local_t *local = NULL;
- quota_dentry_t *dentry = NULL, *tmp = NULL;
- int64_t delta = 0;
- struct list_head head = {0, };
local = frame->local;
@@ -1108,8 +1147,6 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
goto out;
}
- INIT_LIST_HEAD (&head);
-
ret = inode_ctx_get (local->loc.inode, this, &ctx_int);
if (ret) {
gf_log (this->name, GF_LOG_WARNING,
@@ -1129,25 +1166,9 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
LOCK (&ctx->lock);
{
ctx->buf = *postbuf;
-
- list_for_each_entry (dentry, &ctx->parents, next) {
- tmp = __quota_dentry_new (NULL, dentry->name,
- dentry->par);
- list_add_tail (&tmp->next, &head);
- }
-
}
UNLOCK (&ctx->lock);
- if (postbuf->ia_blocks != prebuf->ia_blocks)
- delta = local->delta;
-
- list_for_each_entry_safe (dentry, tmp, &head, next) {
- quota_update_size (this, local->loc.inode, dentry->name,
- dentry->par, delta);
- __quota_dentry_free (dentry);
- }
-
out:
QUOTA_STACK_UNWIND (writev, frame, op_ret, op_errno, prebuf, postbuf,
xdata);
@@ -1584,12 +1605,6 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
goto out;
}
- if (!local->skip_check)
- quota_update_size (this, local->loc.inode,
- (char *)local->loc.name,
- local->loc.parent->gfid,
- (-(ctx->buf.ia_blocks * 512)));
-
LOCK (&ctx->lock);
{
list_for_each_entry (dentry, &ctx->parents, next) {
@@ -1681,9 +1696,6 @@ quota_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
if (local->skip_check)
goto out;
- quota_update_size (this, local->loc.parent, NULL, NULL,
- (buf->ia_blocks * 512));
-
ret = quota_inode_ctx_get (inode, this, &ctx, 0);
if ((ret == -1) || (ctx == NULL)) {
gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on "
@@ -1869,13 +1881,6 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
size = buf->ia_blocks * 512;
}
- if (local->oldloc.parent != local->newloc.parent) {
- quota_update_size (this, local->oldloc.parent, NULL, NULL,
- (-size));
- quota_update_size (this, local->newloc.parent, NULL, NULL,
- size);
- }
-
if (!(IA_ISREG (local->oldloc.inode->ia_type)
|| IA_ISLNK (local->oldloc.inode->ia_type))) {
goto out;
@@ -2074,7 +2079,6 @@ quota_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
struct iatt *buf, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
- int64_t size = 0;
quota_local_t *local = NULL;
quota_inode_ctx_t *ctx = NULL;
quota_dentry_t *dentry = NULL;
@@ -2084,9 +2088,6 @@ quota_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
local = frame->local;
- size = buf->ia_blocks * 512;
-
- quota_update_size (this, local->loc.parent, NULL, NULL, size);
quota_inode_ctx_get (local->loc.inode, this, &ctx, 1);
if (ctx == NULL) {
@@ -2218,7 +2219,6 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
{
quota_local_t *local = NULL;
quota_inode_ctx_t *ctx = NULL;
- int64_t delta = 0;
if (op_ret < 0) {
goto out;
@@ -2230,10 +2230,6 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
goto out;
}
- delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512;
-
- quota_update_size (this, local->loc.inode, NULL, NULL, delta);
-
quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);
if (ctx == NULL) {
gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on "
@@ -2305,7 +2301,6 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
{
quota_local_t *local = NULL;
quota_inode_ctx_t *ctx = NULL;
- int64_t delta = 0;
if (op_ret < 0) {
goto out;
@@ -2317,10 +2312,6 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
goto out;
}
- delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512;
-
- quota_update_size (this, local->loc.inode, NULL, NULL, delta);
-
quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);
if (ctx == NULL) {
gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on "
@@ -3804,8 +3795,6 @@ quota_fallocate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
uint64_t ctx_int = 0;
quota_inode_ctx_t *ctx = NULL;
quota_local_t *local = NULL;
- quota_dentry_t *dentry = NULL;
- int64_t delta = 0;
local = frame->local;
@@ -3835,12 +3824,6 @@ quota_fallocate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
}
UNLOCK (&ctx->lock);
- list_for_each_entry (dentry, &ctx->parents, next) {
- delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512;
- quota_update_size (this, local->loc.inode,
- dentry->name, dentry->par, delta);
- }
-
out:
QUOTA_STACK_UNWIND (fallocate, frame, op_ret, op_errno, prebuf, postbuf,
xdata);
diff --git a/xlators/features/quota/src/quota.h b/xlators/features/quota/src/quota.h
index 96c19e77e..82ba90513 100644
--- a/xlators/features/quota/src/quota.h
+++ b/xlators/features/quota/src/quota.h
@@ -157,28 +157,34 @@ struct quota_limit {
} __attribute__ ((packed));
typedef struct quota_limit quota_limit_t;
+typedef void
+(*quota_ancestry_built_t) (struct list_head *parents, inode_t *inode,
+ int32_t op_ret, int32_t op_errno, void *data);
+
struct quota_local {
- gf_lock_t lock;
- uint32_t validate_count;
- uint32_t link_count;
- loc_t loc;
- loc_t oldloc;
- loc_t newloc;
- loc_t validate_loc;
- int64_t delta;
- int32_t op_ret;
- int32_t op_errno;
- int64_t size;
- gf_boolean_t skip_check;
- char just_validated;
- fop_lookup_cbk_t validate_cbk;
- inode_t *inode;
- call_stub_t *stub;
- struct iobref *iobref;
- quota_limit_t limit;
- int64_t space_available;
+ gf_lock_t lock;
+ uint32_t validate_count;
+ uint32_t link_count;
+ loc_t loc;
+ loc_t oldloc;
+ loc_t newloc;
+ loc_t validate_loc;
+ int64_t delta;
+ int32_t op_ret;
+ int32_t op_errno;
+ int64_t size;
+ gf_boolean_t skip_check;
+ char just_validated;
+ fop_lookup_cbk_t validate_cbk;
+ inode_t *inode;
+ call_stub_t *stub;
+ struct iobref *iobref;
+ quota_limit_t limit;
+ int64_t space_available;
+ quota_ancestry_built_t ancestry_cbk;
+ void *ancestry_data;
};
-typedef struct quota_local quota_local_t;
+typedef struct quota_local quota_local_t;
struct quota_priv {
uint32_t soft_timeout;