From 7a871c27f7dedfbd228aac4c2eefd6b829aa619c Mon Sep 17 00:00:00 2001 From: Raghavendra G Date: Sat, 9 Nov 2013 13:44:18 +0530 Subject: features/quota: maintain correct link_count in ancestry building codepath. Ancestry building codepath can be executed simultaneously when a file has hardlinks. So, following fixes are needed: * modify local->link_count under locks in ancestry building codepath. * before invoking check_limit on newly constructed parents, link_count should not be set, but instead incremented by as many number of new invocations of check_limit. * decrementing link_count and the check whether the count has hit zero to resume the waiting call_stub should be atomic. Change-Id: I6f52e50547362a5ded6bd9f085570223aea372d1 BUG: 969461 Signed-off-by: Raghavendra G Reviewed-on: http://review.gluster.org/6491 Tested-by: Gluster Build System Reviewed-by: Vijay Bellur Reviewed-on: http://review.gluster.org/6819 --- xlators/features/quota/src/quota.c | 121 +++++++++++++++++++++---------------- 1 file changed, 68 insertions(+), 53 deletions(-) diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c index 33a8d38bd..665999860 100644 --- a/xlators/features/quota/src/quota.c +++ b/xlators/features/quota/src/quota.c @@ -245,7 +245,7 @@ out: } inline void -quota_resume_fop_if_validation_done (quota_local_t *local) +quota_link_count_decrement (quota_local_t *local) { call_stub_t *stub = NULL; int link_count = -1; @@ -255,7 +255,7 @@ quota_resume_fop_if_validation_done (quota_local_t *local) LOCK (&local->lock); { - link_count = local->link_count; + link_count = --local->link_count; if (link_count == 0) { stub = local->stub; local->stub = NULL; @@ -283,13 +283,11 @@ quota_handle_validate_error (quota_local_t *local, int32_t op_ret, local->op_ret = op_ret; local->op_errno = op_errno; } - - /* we abort checking limits on this path to root */ - local->link_count--; } UNLOCK (&local->lock); - quota_resume_fop_if_validation_done (local); + /* we abort checking limits on this path to root */ + quota_link_count_decrement (local); out: return; } @@ -795,12 +793,8 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, */ if (0 > frame->root->pid) { ret = 0; - LOCK (&local->lock); - { - --local->link_count; - } - UNLOCK (&local->lock); - goto resume; + quota_link_count_decrement (local); + goto done; } priv = this->private; @@ -883,12 +877,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, } if (__is_root_gfid (_inode->gfid)) { - LOCK (&local->lock); - { - --local->link_count; - } - UNLOCK (&local->lock); - + quota_link_count_decrement (local); break; } @@ -929,8 +918,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, _inode = NULL; } -resume: - quota_resume_fop_if_validation_done (local); +done: return 0; err: @@ -1304,13 +1292,18 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, UNLOCK (&ctx->lock); } - local->delta = size; - - local->link_count = parents; - local->stub = stub; + LOCK (&local->lock); + { + local->delta = size; + local->link_count = (parents != 0) ? parents : 1; + local->stub = stub; + } + UNLOCK (&local->lock); if (parents == 0) { - local->link_count = 1; + /* nameless lookup on this inode, allow quota to reconstruct + * ancestry as part of check_limit. + */ quota_check_limit (frame, fd->inode, this, NULL, NULL); } else { list_for_each_entry_safe (dentry, tmp, &head, next) { @@ -1413,9 +1406,13 @@ quota_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, goto err; } - local->stub = stub; - local->delta = 0; - local->link_count = 1; + LOCK (&local->lock); + { + local->stub = stub; + local->delta = 0; + local->link_count = 1; + } + UNLOCK (&local->lock); quota_check_limit (frame, loc->parent, this, NULL, NULL); return 0; @@ -1558,9 +1555,13 @@ quota_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, goto err; } - local->link_count = 1; - local->stub = stub; - local->delta = 0; + LOCK (&local->lock); + { + local->link_count = 1; + local->stub = stub; + local->delta = 0; + } + UNLOCK (&local->lock); quota_check_limit (frame, loc->parent, this, NULL, NULL); return 0; @@ -1830,9 +1831,13 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, goto err; } - local->link_count = 1; - local->stub = stub; - local->delta = (ctx != NULL) ? ctx->buf.ia_blocks * 512 : 0; + LOCK (&local->lock); + { + local->link_count = 1; + local->stub = stub; + local->delta = (ctx != NULL) ? ctx->buf.ia_blocks * 512 : 0; + } + UNLOCK (&local->lock); quota_check_limit (frame, newloc->parent, this, NULL, NULL); return 0; @@ -2035,8 +2040,12 @@ quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, goto err; } - local->link_count = 1; - local->stub = stub; + LOCK (&local->lock); + { + local->link_count = 1; + local->stub = stub; + } + UNLOCK (&local->lock); if (IA_ISREG (oldloc->inode->ia_type) || IA_ISLNK (oldloc->inode->ia_type)) { @@ -2191,9 +2200,13 @@ quota_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath, goto err; } - local->stub = stub; - local->delta = strlen (linkpath); - local->link_count = 1; + LOCK (&local->lock); + { + local->stub = stub; + local->delta = strlen (linkpath); + local->link_count = 1; + } + UNLOCK (&local->lock); quota_check_limit (frame, loc->parent, this, NULL, NULL); return 0; @@ -3150,9 +3163,13 @@ quota_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, goto err; } - local->link_count = 1; - local->stub = stub; - local->delta = 0; + LOCK (&local->lock); + { + local->link_count = 1; + local->stub = stub; + local->delta = 0; + } + UNLOCK (&local->lock); quota_check_limit (frame, loc->parent, this, NULL, NULL); return 0; @@ -3610,9 +3627,7 @@ quota_statfs_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, UNLOCK (&ctx->lock); resume: - --local->link_count; - - quota_resume_fop_if_validation_done (local); + quota_link_count_decrement (local); return 0; } @@ -3637,26 +3652,26 @@ quota_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) } frame->local = local; - local->inode = inode_ref (loc->inode); - local->link_count = 1; - stub = fop_statfs_stub (frame, quota_statfs_helper, loc, xdata); if (!stub) { op_errno = ENOMEM; goto err; } - local->stub = stub; + LOCK (&local->lock); + { + local->inode = inode_ref (loc->inode); + local->link_count = 1; + local->stub = stub; + } + UNLOCK (&local->lock); ret = quota_validate (frame, local->inode, this, quota_statfs_validate_cbk); if (0 > ret) { - op_errno = -ret; - --local->link_count; + quota_handle_validate_error (local, -1, -ret); } - quota_resume_fop_if_validation_done (local); - return 0; } -- cgit