summaryrefslogtreecommitdiffstats
path: root/xlators/features/marker/src/marker-quota.c
diff options
context:
space:
mode:
authorRaghavendra G <raghavendra@gluster.com>2011-05-08 23:41:23 +0000
committerAnand Avati <avati@gluster.com>2011-07-20 06:12:42 -0700
commite559ea5f8056472a6df8fde942239cb6342dc23e (patch)
treef781e2ac4e2381f3f245dcbf4afeae3607995f0f /xlators/features/marker/src/marker-quota.c
parentfd60df8798b74ea2a9c793798d4ad2ab3fbf9394 (diff)
features/marker: fix race-conditions, memory corruptions and fd-leaks.
- remove xattrs from newpath after rename is complete. - hold inodelk on both parents (if they are different) before doing rename and gather contribution values of oldpath and newpath to their parents while still holding the locks. Use these contribution values to reduce parent sizes. - performance optimization: abort updation process if delta is zero. - libglusterfs/call-stub: Allow unwinding of frames for rename during call_resume_unwind. - fixes in dirty inode healing codepath: - fix fd-leak of fd opened on the directory. - don't add (instead just assign) next offset at which readdir has to be sent to local->d_off. - assign to local->d_off before winding lookup call to get child contribution. - use mutexes while accessing contribution values stored in inode context. - use contribution values from backend instead of in-memory while reducing parent sizes during rename - wipe parent_loc in marker_local_unref. - check for refcount being zero holding lock in quota_local_unref. - hold parent inodelk during creation of xattrs on directory. - use contribution value to reduce parent's size, if the value to be subtracted is not passed as argument to reduce_parent_size. - skip contribution creation on root. Change-Id: I97a9ac7efc5cf82abd3837fa6f9766c35676a908 BUG: 2697 Signed-off-by: Junaid <junaid@gluster.com> Reviewed-on: http://review.gluster.com/31 Tested-by: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'xlators/features/marker/src/marker-quota.c')
-rw-r--r--xlators/features/marker/src/marker-quota.c609
1 files changed, 415 insertions, 194 deletions
diff --git a/xlators/features/marker/src/marker-quota.c b/xlators/features/marker/src/marker-quota.c
index 8d4f17ea583..16171bfb8aa 100644
--- a/xlators/features/marker/src/marker-quota.c
+++ b/xlators/features/marker/src/marker-quota.c
@@ -34,7 +34,7 @@ void
mq_assign_lk_owner (xlator_t *this, call_frame_t *frame)
{
marker_conf_t *conf = NULL;
- uint64_t lk_owner = 0;
+ uint64_t lk_owner = 0;
conf = this->private;
@@ -108,11 +108,12 @@ dirty_inode_updation_done (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno)
{
quota_local_t *local = NULL;
+ int32_t value = 0;
local = frame->local;
if (!local->err)
- QUOTA_SAFE_DECREMENT (&local->lock, local->ref);
+ QUOTA_SAFE_DECREMENT (&local->lock, local->ref, value);
else
frame->local = NULL;
@@ -125,7 +126,7 @@ int32_t
release_lock_on_dirty_inode (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno)
{
- struct gf_flock lock;
+ struct gf_flock lock = {0, };
quota_local_t *local = NULL;
local = frame->local;
@@ -158,13 +159,13 @@ release_lock_on_dirty_inode (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t
mark_inode_undirty (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, dict_t *dict)
+ int32_t op_ret, int32_t op_errno, dict_t *dict)
{
- int32_t ret = -1;
- int64_t *size = NULL;
- dict_t *newdict = NULL;
- quota_local_t *local = NULL;
- marker_conf_t *priv = NULL;
+ int32_t ret = -1;
+ int64_t *size = NULL;
+ dict_t *newdict = NULL;
+ quota_local_t *local = NULL;
+ marker_conf_t *priv = NULL;
local = (quota_local_t *) frame->local;
@@ -180,7 +181,11 @@ mark_inode_undirty (call_frame_t *frame, void *cookie, xlator_t *this,
if (ret)
goto wind;
- local->ctx->size = ntoh64 (*size);
+ LOCK (&local->ctx->lock);
+ {
+ local->ctx->size = ntoh64 (*size);
+ }
+ UNLOCK (&local->ctx->lock);
wind:
newdict = dict_new ();
@@ -282,10 +287,10 @@ err:
int32_t
get_dirty_inode_size (call_frame_t *frame, xlator_t *this)
{
- int32_t ret = -1;
- dict_t *dict = NULL;
- quota_local_t *local = NULL;
- marker_conf_t *priv = NULL;
+ int32_t ret = -1;
+ dict_t *dict = NULL;
+ quota_local_t *local = NULL;
+ marker_conf_t *priv = NULL;
local = (quota_local_t *) frame->local;
@@ -326,11 +331,11 @@ get_child_contribution (call_frame_t *frame,
dict_t *dict,
struct iatt *postparent)
{
- int32_t ret = -1;
- int32_t val = 0;
- char contri_key [512] = {0, };
- int64_t *contri = NULL;
- quota_local_t *local = NULL;
+ int32_t ret = -1;
+ int32_t val = 0;
+ char contri_key [512] = {0, };
+ int64_t *contri = NULL;
+ quota_local_t *local = NULL;
local = frame->local;
@@ -370,7 +375,7 @@ out:
if (val== 0) {
if (local->err) {
- QUOTA_SAFE_DECREMENT (&local->lock, local->ref);
+ QUOTA_SAFE_DECREMENT (&local->lock, local->ref, val);
quota_local_unref (this, local);
} else
@@ -389,15 +394,15 @@ quota_readdir_cbk (call_frame_t *frame,
int32_t op_errno,
gf_dirent_t *entries)
{
- char contri_key [512] = {0, };
- loc_t loc;
- int32_t ret = 0;
- off_t offset = 0;
- int32_t count = 0;
- dict_t *dict = NULL;
- quota_local_t *local = NULL;
- gf_dirent_t *entry = NULL;
- call_frame_t *newframe = NULL;
+ char contri_key [512] = {0, };
+ int32_t ret = 0;
+ off_t offset = 0;
+ int32_t count = 0;
+ dict_t *dict = NULL;
+ quota_local_t *local = NULL;
+ gf_dirent_t *entry = NULL;
+ call_frame_t *newframe = NULL;
+ loc_t loc = {0, };
local = frame->local;
@@ -418,15 +423,17 @@ quota_readdir_cbk (call_frame_t *frame,
local->dentry_child_count = 0;
list_for_each_entry (entry, (&entries->list), list) {
- gf_log (this->name, GF_LOG_DEBUG, "entry = %s", entry->d_name);
+ gf_log (this->name, GF_LOG_DEBUG, "entry = %s", entry->d_name);
- if ((!strcmp (entry->d_name, ".")) || (!strcmp (entry->d_name,
- ".."))) {
- gf_log (this->name, GF_LOG_DEBUG, "entry = %s",
- entry->d_name);
- continue;
- }
- count++;
+ if ((!strcmp (entry->d_name, ".")) || (!strcmp (entry->d_name,
+ ".."))) {
+ gf_log (this->name, GF_LOG_DEBUG, "entry = %s",
+ entry->d_name);
+ continue;
+ }
+
+ offset = entry->d_off;
+ count++;
}
local->frame = frame;
@@ -435,6 +442,7 @@ quota_readdir_cbk (call_frame_t *frame,
LOCK (&local->lock);
{
local->dentry_child_count = count;
+ local->d_off = offset;
}
UNLOCK (&local->lock);
}
@@ -447,7 +455,6 @@ quota_readdir_cbk (call_frame_t *frame,
".."))) {
gf_log (this->name, GF_LOG_DEBUG, "entry = %s",
entry->d_name);
- offset = entry->d_off;
continue;
}
@@ -513,17 +520,12 @@ quota_readdir_cbk (call_frame_t *frame,
break;
}
}
- gf_log (this->name, GF_LOG_DEBUG, "offset before =%"PRIu64,
- local->d_off);
- local->d_off +=offset;
- gf_log (this->name, GF_LOG_DEBUG, "offset after = %"PRIu64,
- local->d_off);
- if (ret)
+ if (ret) {
release_lock_on_dirty_inode (frame, NULL, this, 0, 0);
-
- else if (count == 0 )
+ } else if (count == 0 ) {
get_dirty_inode_size (frame, this);
+ }
return 0;
}
@@ -586,8 +588,8 @@ check_if_still_dirty (call_frame_t *frame,
priv = this->private;
if (!dict) {
- ret = -1;
- goto err;
+ ret = -1;
+ goto err;
}
ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty);
@@ -619,6 +621,10 @@ err:
release_lock_on_dirty_inode (frame, NULL, this, 0, 0);
}
+ if (fd != NULL) {
+ fd_unref (fd);
+ }
+
return 0;
}
@@ -626,10 +632,10 @@ int32_t
get_dirty_xattr (call_frame_t *frame, void *cookie,
xlator_t *this, int32_t op_ret, int32_t op_errno)
{
- int32_t ret = -1;
- dict_t *xattr_req = NULL;
- quota_local_t *local = NULL;
- marker_conf_t *priv = NULL;
+ int32_t ret = -1;
+ dict_t *xattr_req = NULL;
+ quota_local_t *local = NULL;
+ marker_conf_t *priv = NULL;
if (op_ret == -1) {
dirty_inode_updation_done (frame, NULL, this, 0, 0);
@@ -716,7 +722,7 @@ update_dirty_inode (xlator_t *this,
return 0;
fr_destroy:
- QUOTA_STACK_DESTROY (frame, this);
+ QUOTA_STACK_DESTROY (frame, this);
out:
return 0;
@@ -727,18 +733,42 @@ int32_t
quota_inode_creation_done (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno)
{
- quota_local_t *local = NULL;
-
if (frame == NULL)
return 0;
+ QUOTA_STACK_DESTROY (frame, this);
+
+ return 0;
+}
+
+
+int32_t
+quota_xattr_creation_release_lock (call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret,
+ int32_t op_errno)
+{
+ struct gf_flock lock = {0, };
+ quota_local_t *local = NULL;
+
local = frame->local;
- QUOTA_STACK_DESTROY (frame, this);
+ lock.l_type = F_UNLCK;
+ lock.l_whence = SEEK_SET;
+ lock.l_start = 0;
+ lock.l_len = 0;
+ lock.l_pid = 0;
+
+ STACK_WIND (frame,
+ quota_inode_creation_done,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->inodelk,
+ this->name, &local->loc,
+ F_SETLKW, &lock);
return 0;
}
+
int32_t
create_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *dict)
@@ -748,8 +778,9 @@ create_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this,
quota_local_t *local = NULL;
marker_conf_t *priv = NULL;
- if (op_ret == -1 && op_errno == ENOTCONN)
+ if (op_ret < 0) {
goto err;
+ }
local = frame->local;
@@ -757,27 +788,31 @@ create_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this,
if (local->loc.inode->ia_type == IA_IFDIR) {
newdict = dict_new ();
- if (!newdict)
+ if (!newdict) {
goto err;
+ }
ret = dict_set_int8 (newdict, QUOTA_DIRTY_KEY, 0);
- if (ret == -1)
+ if (ret == -1) {
goto err;
+ }
- STACK_WIND (frame, quota_inode_creation_done,
+ STACK_WIND (frame, quota_xattr_creation_release_lock,
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->setxattr,
&local->loc, newdict, 0);
- } else
- quota_inode_creation_done (frame, NULL, this, 0, 0);
+ } else {
+ quota_xattr_creation_release_lock (frame, NULL, this, 0, 0);
+ }
ret = 0;
err:
- if (ret == -1)
- quota_inode_creation_done (frame, NULL, this, -1, 0);
+ if (ret < 0) {
+ quota_xattr_creation_release_lock (frame, NULL, this, 0, 0);
+ }
- if (newdict)
+ if (newdict != NULL)
dict_unref (newdict);
return 0;
@@ -785,27 +820,28 @@ err:
int32_t
-quota_set_inode_xattr (xlator_t *this, loc_t *loc)
+quota_create_xattr (xlator_t *this, call_frame_t *frame)
{
int32_t ret = 0;
int64_t *value = NULL;
int64_t *size = NULL;
dict_t *dict = NULL;
char key[512] = {0, };
- call_frame_t *frame = NULL;
quota_local_t *local = NULL;
marker_conf_t *priv = NULL;
quota_inode_ctx_t *ctx = NULL;
inode_contribution_t *contri = NULL;
- if (loc == NULL || this == NULL)
+ if (frame == NULL || this == NULL)
return 0;
+ local = frame->local;
+
priv = (marker_conf_t *) this->private;
- ret = quota_inode_ctx_get (loc->inode, this, &ctx);
+ ret = quota_inode_ctx_get (local->loc.inode, this, &ctx);
if (ret < 0) {
- ctx = quota_inode_ctx_new (loc->inode, this);
+ ctx = quota_inode_ctx_new (local->loc.inode, this);
if (ctx == NULL) {
gf_log (this->name, GF_LOG_WARNING,
"quota_inode_ctx_new failed");
@@ -818,29 +854,150 @@ quota_set_inode_xattr (xlator_t *this, loc_t *loc)
if (!dict)
goto out;
- if (loc->inode->ia_type == IA_IFDIR) {
+ if (local->loc.inode->ia_type == IA_IFDIR) {
QUOTA_ALLOC_OR_GOTO (size, int64_t, ret, err);
ret = dict_set_bin (dict, QUOTA_SIZE_KEY, size, 8);
if (ret < 0)
goto free_size;
}
- //if '/' then dont set contribution xattr
- if (strcmp (loc->path, "/") == 0)
- goto wind;
+ if (strcmp (local->loc.path, "/") != 0) {
+ contri = add_new_contribution_node (this, ctx, &local->loc);
+ if (contri == NULL)
+ goto err;
- contri = add_new_contribution_node (this, ctx, loc);
- if (contri == NULL)
- goto err;
+ QUOTA_ALLOC_OR_GOTO (value, int64_t, ret, err);
+ GET_CONTRI_KEY (key, local->loc.parent->gfid, ret);
+
+ ret = dict_set_bin (dict, key, value, 8);
+ if (ret < 0)
+ goto free_value;
+ }
+
+ STACK_WIND (frame, create_dirty_xattr, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->xattrop, &local->loc,
+ GF_XATTROP_ADD_ARRAY64, dict);
+ ret = 0;
+
+free_size:
+ if (ret < 0) {
+ GF_FREE (size);
+ }
+
+free_value:
+ if (ret < 0) {
+ GF_FREE (value);
+ }
+
+err:
+ dict_unref (dict);
+
+out:
+ if (ret < 0) {
+ quota_xattr_creation_release_lock (frame, NULL, this, 0, 0);
+ }
+
+ return 0;
+}
+
+
+int32_t
+quota_check_n_set_inode_xattr (call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret, int32_t op_errno,
+ inode_t *inode, struct iatt *buf, dict_t *dict,
+ struct iatt *postparent)
+{
+ quota_local_t *local = NULL;
+ int64_t *size = NULL, *contri = NULL;
+ int8_t dirty = 0;
+ marker_conf_t *priv = NULL;
+ int32_t ret = 0;
+ char contri_key[512] = {0, };
+
+ if (op_ret < 0) {
+ goto out;
+ }
- QUOTA_ALLOC_OR_GOTO (value, int64_t, ret, err);
- GET_CONTRI_KEY (key, loc->parent->gfid, ret);
+ local = frame->local;
+ priv = this->private;
- ret = dict_set_bin (dict, key, value, 8);
+ ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size);
if (ret < 0)
- goto free_value;
+ goto create_xattr;
+
+ ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty);
+ if (ret < 0)
+ goto create_xattr;
+
+ //check contribution xattr if not root
+ if (strcmp (local->loc.path, "/") != 0) {
+ GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret);
+ if (ret < 0)
+ goto out;
+
+ ret = dict_get_bin (dict, contri_key, (void **) &contri);
+ if (ret < 0)
+ goto create_xattr;
+ }
+
+out:
+ quota_xattr_creation_release_lock (frame, NULL, this, 0, 0);
+ return 0;
+
+create_xattr:
+ quota_create_xattr (this, frame);
+ return 0;
+}
+
+
+int32_t
+quota_get_xattr (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno)
+{
+ dict_t *xattr_req = NULL;
+ quota_local_t *local = NULL;
+ int32_t ret = 0;
+
+ if (op_ret < 0) {
+ goto lock_err;
+ }
+
+ local = frame->local;
+
+ xattr_req = dict_new ();
+ if (xattr_req == NULL) {
+ goto err;
+ }
+
+ ret = quota_req_xattr (this, &local->loc, xattr_req);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING, "cannot request xattr");
+ goto err;
+ }
+
+ STACK_WIND (frame, quota_check_n_set_inode_xattr, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->lookup, &local->loc, xattr_req);
+
+ return 0;
+
+err:
+ quota_xattr_creation_release_lock (frame, NULL, this, 0, 0);
+ return 0;
+
+lock_err:
+ quota_inode_creation_done (frame, NULL, this, 0, 0);
+ return 0;
+}
+
+
+int32_t
+quota_set_inode_xattr (xlator_t *this, loc_t *loc)
+{
+ struct gf_flock lock = {0, };
+ quota_local_t *local = NULL;
+ int32_t ret = 0;
+ call_frame_t *frame = NULL;
-wind:
frame = create_frame (this, this->ctx->pool);
if (!frame) {
ret = -1;
@@ -848,38 +1005,34 @@ wind:
}
local = quota_local_new ();
- if (local == NULL)
- goto free_size;
-
- local->ctx = ctx;
+ if (local == NULL) {
+ goto err;
+ }
- local->contri = contri;
+ frame->local = local;
ret = loc_copy (&local->loc, loc);
- if (ret < 0)
- quota_local_unref (this, local);
+ if (ret < 0) {
+ goto err;
+ }
frame->local = local;
- STACK_WIND (frame, create_dirty_xattr, FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->xattrop, &local->loc,
- GF_XATTROP_ADD_ARRAY64, dict);
- ret = 0;
+ lock.l_len = 0;
+ lock.l_start = 0;
+ lock.l_type = F_WRLCK;
+ lock.l_whence = SEEK_SET;
-free_size:
- if (ret < 0)
- GF_FREE (size);
+ STACK_WIND (frame,
+ quota_get_xattr,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->inodelk,
+ this->name, &local->loc, F_SETLKW, &lock);
-free_value:
- if (ret < 0)
- GF_FREE (value);
+ return 0;
err:
- dict_unref (dict);
-
-out:
- if (ret < 0)
- quota_inode_creation_done (NULL, NULL, this, -1, 0);
+ QUOTA_STACK_DESTROY (frame, this);
return 0;
}
@@ -945,7 +1098,8 @@ quota_inodelk_cbk (call_frame_t *frame, void *cookie,
gf_log (this->name, GF_LOG_DEBUG,
"inodelk released on %s", local->parent_loc.path);
- if (strcmp (local->parent_loc.path, "/") == 0) {
+ if ((strcmp (local->parent_loc.path, "/") == 0)
+ || (local->delta == 0)) {
xattr_updation_done (frame, NULL, this, 0, 0, NULL);
} else {
ret = get_parent_inode_local (this, local);
@@ -968,9 +1122,9 @@ quota_release_parent_lock (call_frame_t *frame, void *cookie,
int32_t op_errno)
{
int32_t ret = 0;
- struct gf_flock lock;
quota_local_t *local = NULL;
quota_inode_ctx_t *ctx = NULL;
+ struct gf_flock lock = {0, };
local = frame->local;
@@ -1029,7 +1183,7 @@ quota_mark_undirty (call_frame_t *frame,
priv = this->private;
- //update the size of the parent inode
+ //update the size of the parent inode
if (dict != NULL) {
ret = quota_inode_ctx_get (local->parent_loc.inode, this, &ctx);
if (ret < 0)
@@ -1104,7 +1258,11 @@ quota_update_parent_size (call_frame_t *frame,
goto err;
}
- local->contri->contribution += local->delta;
+ LOCK (&local->contri->lock);
+ {
+ local->contri->contribution += local->delta;
+ }
+ UNLOCK (&local->contri->lock);
gf_log (this->name, GF_LOG_DEBUG, "%s %"PRId64 "%"PRId64,
local->loc.path, local->ctx->size,
@@ -1160,16 +1318,16 @@ quota_update_inode_contribution (call_frame_t *frame, void *cookie,
struct iatt *buf, dict_t *dict,
struct iatt *postparent)
{
- int32_t ret = -1;
- int64_t *size = NULL;
- int64_t *contri = NULL;
- int64_t *delta = NULL;
+ int32_t ret = -1;
+ int64_t *size = NULL, size_int = 0, contri_int = 0;
+ int64_t *contri = NULL;
+ int64_t *delta = NULL;
char contri_key [512] = {0, };
- dict_t *newdict = NULL;
- quota_local_t *local = NULL;
- quota_inode_ctx_t *ctx = NULL;
- marker_conf_t *priv = NULL;
- inode_contribution_t *contribution = NULL;
+ dict_t *newdict = NULL;
+ quota_local_t *local = NULL;
+ quota_inode_ctx_t *ctx = NULL;
+ marker_conf_t *priv = NULL;
+ inode_contribution_t *contribution = NULL;
local = frame->local;
@@ -1203,30 +1361,44 @@ quota_update_inode_contribution (call_frame_t *frame, void *cookie,
} else
ctx->size = buf->ia_blocks * 512;
- ret = dict_get_bin (dict, contri_key, (void **) &contri);
+ size_int = ctx->size;
+ }
+unlock:
+ UNLOCK (&ctx->lock);
+
+ if (ret < 0) {
+ goto err;
+ }
+
+ ret = dict_get_bin (dict, contri_key, (void **) &contri);
+
+ LOCK (&contribution->lock);
+ {
if (ret < 0)
contribution->contribution = 0;
else
contribution->contribution = ntoh64 (*contri);
- ret = 0;
+ contri_int = contribution->contribution;
}
-unlock:
- UNLOCK (&ctx->lock);
-
- if (ret < 0)
- goto err;
+ UNLOCK (&contribution->lock);
gf_log (this->name, GF_LOG_DEBUG, "%s %"PRId64 "%"PRId64,
- local->loc.path, ctx->size, contribution->contribution);
+ local->loc.path, size_int, contri_int);
+
+ local->delta = size_int - contri_int;
+
+ if (local->delta == 0) {
+ quota_mark_undirty (frame, NULL, this, 0, 0, NULL);
+ return 0;
+ }
+
newdict = dict_new ();
if (newdict == NULL) {
ret = -1;
goto err;
}
- local->delta = ctx->size - contribution->contribution;
-
QUOTA_ALLOC_OR_GOTO (delta, int64_t, ret, err);
*delta = hton64 (local->delta);
@@ -1264,12 +1436,12 @@ quota_fetch_child_size_and_contri (call_frame_t *frame, void *cookie,
xlator_t *this, int32_t op_ret,
int32_t op_errno)
{
- int32_t ret = -1;
+ int32_t ret = -1;
char contri_key [512] = {0, };
- dict_t *newdict = NULL;
- quota_local_t *local = NULL;
- marker_conf_t *priv = NULL;
- quota_inode_ctx_t *ctx = NULL;
+ dict_t *newdict = NULL;
+ quota_local_t *local = NULL;
+ marker_conf_t *priv = NULL;
+ quota_inode_ctx_t *ctx = NULL;
local = frame->local;
@@ -1390,7 +1562,7 @@ err:
int32_t
get_lock_on_parent (call_frame_t *frame, xlator_t *this)
{
- struct gf_flock lock;
+ struct gf_flock lock = {0, };
quota_local_t *local = NULL;
GF_VALIDATE_OR_GOTO ("marker", frame, fr_destroy);
@@ -1467,8 +1639,8 @@ err:
int
initiate_quota_txn (xlator_t *this, loc_t *loc)
{
- int32_t ret = -1;
- quota_inode_ctx_t *ctx = NULL;
+ int32_t ret = -1;
+ quota_inode_ctx_t *ctx = NULL;
inode_contribution_t *contribution = NULL;
VALIDATE_OR_GOTO (loc, out);
@@ -1491,34 +1663,33 @@ out:
}
-int32_t
-validate_inode_size_contribution (xlator_t *this,
- loc_t *loc,
- quota_inode_ctx_t *ctx,
- inode_contribution_t *contribution)
-{
- if (ctx->size != contribution->contribution)
- initiate_quota_txn (this, loc);
+/* int32_t */
+/* validate_inode_size_contribution (xlator_t *this, loc_t *loc, int64_t size, */
+/* int64_t contribution) */
+/* { */
+/* if (size != contribution) { */
+/* initiate_quota_txn (this, loc); */
+/* } */
- return 0;
-}
+/* return 0; */
+/* } */
int32_t
inspect_directory_xattr (xlator_t *this,
- loc_t *loc,
- dict_t *dict,
- struct iatt buf)
+ loc_t *loc,
+ dict_t *dict,
+ struct iatt buf)
{
- int32_t ret = 0;
- int8_t dirty = -1;
- int64_t *size = NULL;
- int64_t *contri = NULL;
- char contri_key [512] = {0, };
- marker_conf_t *priv = NULL;
- gf_boolean_t not_root = _gf_false;
- quota_inode_ctx_t *ctx = NULL;
- inode_contribution_t *contribution = NULL;
+ int32_t ret = 0;
+ int8_t dirty = -1;
+ int64_t *size = NULL, size_int = 0;
+ int64_t *contri = NULL, contri_int = 0;
+ char contri_key [512] = {0, };
+ marker_conf_t *priv = NULL;
+ gf_boolean_t not_root = _gf_false;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
priv = this->private;
@@ -1559,20 +1730,28 @@ inspect_directory_xattr (xlator_t *this,
if (ret < 0)
goto out;
- contribution->contribution = ntoh64 (*contri);
+ LOCK (&contribution->lock);
+ {
+ contribution->contribution = ntoh64 (*contri);
+ contri_int = contribution->contribution;
+ }
+ UNLOCK (&contribution->lock);
}
- ctx->size = ntoh64 (*size);
+ LOCK (&ctx->lock);
+ {
+ ctx->size = ntoh64 (*size);
+ ctx->dirty = dirty;
+ size_int = ctx->size;
+ }
+ UNLOCK (&ctx->lock);
gf_log (this->name, GF_LOG_DEBUG, "size=%"PRId64
- " contri=%"PRId64, ctx->size,
- contribution?contribution->contribution:0);
+ " contri=%"PRId64, size_int, contri_int);
- ctx->dirty = dirty;
- if (ctx->dirty == 1) {
+ if (dirty) {
update_dirty_inode (this, loc, ctx, contribution);
- } else if (not_root == _gf_true &&
- ctx->size != contribution->contribution) {
+ } else if ((not_root == _gf_true) && (size_int != contri_int)) {
initiate_quota_txn (this, loc);
}
@@ -1590,13 +1769,13 @@ inspect_file_xattr (xlator_t *this,
dict_t *dict,
struct iatt buf)
{
- int32_t ret = -1;
- uint64_t contri_int = 0;
- int64_t *contri_ptr = NULL;
+ int32_t ret = -1;
+ uint64_t contri_int = 0, size = 0;
+ int64_t *contri_ptr = NULL;
char contri_key [512] = {0, };
- marker_conf_t *priv = NULL;
- quota_inode_ctx_t *ctx = NULL;
- inode_contribution_t *contribution = NULL;
+ marker_conf_t *priv = NULL;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
priv = this->private;
@@ -1618,10 +1797,12 @@ inspect_file_xattr (xlator_t *this,
LOCK (&ctx->lock);
{
ctx->size = 512 * buf.ia_blocks;
+ size = ctx->size;
}
UNLOCK (&ctx->lock);
- list_for_each_entry (contribution, &ctx->contribution_head, contri_list) {
+ list_for_each_entry (contribution, &ctx->contribution_head,
+ contri_list) {
GET_CONTRI_KEY (contri_key, contribution->gfid, ret);
if (ret < 0)
continue;
@@ -1630,14 +1811,19 @@ inspect_file_xattr (xlator_t *this,
if (ret == 0) {
contri_ptr = (int64_t *)(unsigned long)contri_int;
- contribution->contribution = ntoh64 (*contri_ptr);
+ LOCK (&contribution->lock);
+ {
+ contribution->contribution = ntoh64 (*contri_ptr);
+ contri_int = contribution->contribution;
+ }
+ UNLOCK (&contribution->lock);
gf_log (this->name, GF_LOG_DEBUG,
- "size=%"PRId64 " contri=%"PRId64, ctx->size,
- contribution->contribution);
+ "size=%"PRId64 " contri=%"PRId64, size, contri_int);
- ret = validate_inode_size_contribution
- (this, loc, ctx, contribution);
+ if (size != contri_int) {
+ initiate_quota_txn (this, loc);
+ }
} else
initiate_quota_txn (this, loc);
}
@@ -1717,9 +1903,9 @@ int32_t
quota_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno)
{
- int32_t ret = 0;
- char contri_key [512] = {0, };
- quota_local_t *local = NULL;
+ int32_t ret = 0;
+ char contri_key [512] = {0, };
+ quota_local_t *local = NULL;
local = (quota_local_t *) frame->local;
@@ -1734,8 +1920,8 @@ quota_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this,
GET_CONTRI_KEY (contri_key, local->contri->gfid, ret);
STACK_WIND (frame, quota_removexattr_cbk, FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->removexattr,
- &local->loc, contri_key);
+ FIRST_CHILD(this)->fops->removexattr,
+ &local->loc, contri_key);
ret = 0;
} else {
quota_removexattr_cbk (frame, NULL, this, 0, 0);
@@ -1758,20 +1944,39 @@ int32_t
mq_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *dict)
{
- int32_t ret;
- struct gf_flock lock;
- quota_inode_ctx_t *ctx;
- quota_local_t *local = NULL;
+ int32_t ret = -1;
+ struct gf_flock lock = {0, };
+ quota_inode_ctx_t *ctx = NULL;
+ quota_local_t *local = NULL;
+ int64_t contribution = 0;
local = frame->local;
if (op_ret == -1)
local->err = -1;
ret = quota_inode_ctx_get (local->parent_loc.inode, this, &ctx);
- if (ret == 0)
- ctx->size -= local->contri->contribution;
- local->contri->contribution = 0;
+ LOCK (&local->contri->lock);
+ {
+ contribution = local->contri->contribution;
+ }
+ UNLOCK (&local->contri->lock);
+
+ if (contribution == local->size) {
+ if (ret == 0) {
+ LOCK (&ctx->lock);
+ {
+ ctx->size -= contribution;
+ }
+ UNLOCK (&ctx->lock);
+
+ LOCK (&local->contri->lock);
+ {
+ local->contri->contribution = 0;
+ }
+ UNLOCK (&local->contri->lock);
+ }
+ }
lock.l_type = F_UNLCK;
lock.l_whence = SEEK_SET;
@@ -1788,7 +1993,7 @@ mq_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this,
return 0;
}
-static int32_t
+int32_t
mq_reduce_parent_size_xattr (call_frame_t *frame, void *cookie,
xlator_t *this, int32_t op_ret, int32_t op_errno)
{
@@ -1821,8 +2026,7 @@ mq_reduce_parent_size_xattr (call_frame_t *frame, void *cookie,
QUOTA_ALLOC_OR_GOTO (size, int64_t, ret, err);
- *size = hton64 (-contribution->contribution);
-
+ *size = hton64 (-local->size);
ret = dict_set_bin (dict, QUOTA_SIZE_KEY, size, 8);
if (ret < 0)
@@ -1844,7 +2048,7 @@ err:
}
int32_t
-reduce_parent_size (xlator_t *this, loc_t *loc)
+reduce_parent_size (xlator_t *this, loc_t *loc, int64_t contri)
{
int32_t ret = -1;
struct gf_flock lock = {0,};
@@ -1873,6 +2077,21 @@ reduce_parent_size (xlator_t *this, loc_t *loc)
goto out;
}
+ if (contri >= 0) {
+ local->size = contri;
+ } else {
+ LOCK (&contribution->lock);
+ {
+ local->size = contribution->contribution;
+ }
+ UNLOCK (&contribution->lock);
+ }
+
+ if (local->size == 0) {
+ ret = 0;
+ goto out;
+ }
+
ret = loc_copy (&local->loc, loc);
if (ret < 0)
goto out;
@@ -1904,13 +2123,15 @@ reduce_parent_size (xlator_t *this, loc_t *loc)
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->inodelk,
this->name, &local->parent_loc, F_SETLKW, &lock);
+ local = NULL;
ret = 0;
out:
- if (ret < 0) {
+ if (local != NULL) {
quota_local_unref (this, local);
GF_FREE (local);
}
+
return ret;
}