summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/ec/src/ec-common.c
diff options
context:
space:
mode:
authorPranith Kumar K <pkarampu@redhat.com>2015-05-13 16:57:49 +0530
committerPranith Kumar Karampuri <pkarampu@redhat.com>2015-05-28 04:12:06 -0700
commit3a57ca8ee29ea8e3d3c5bbf28a56a821bfa99d99 (patch)
tree7e919238192422d4bb1f8a86a950013b286b41b3 /xlators/cluster/ec/src/ec-common.c
parent2b8fdde926532014f19d850b1321a4c7046dc001 (diff)
cluster/ec: Fix all EIO errors in EC
Backport of http://review.gluster.org/10770 Backport of http://review.gluster.org/10806 Backport of http://review.gluster.org/10787 Backport of http://review.gluster.org/10868 Backport of http://review.gluster.com/10852 - When a blocking lock is requested, lock request is succeeded even when ec->fragment number of locks are acquired successfully in non-blocking locking phase. This will lead to fop succeeding only on the bricks where the locks are acquired, leading to the necessity of self-heals. To prevent these un-necessary self-heals, if the remaining locks fail with EAGAIN in non-blocking lock phase try blocking locking phase instead. - Handle lookup failures while op in progress - cluster/ec: Correctly cleanup delayed locks When a delayed lock is pending, a graph switch doesn't correctly terminate it. This means that the update of version and size xattrs is lost, causing EIO errors. This patch handles GF_EVENT_PARENT_DOWN event to correctly finish pending udpdates before completing the graph switch. - Fix use after free crash ec_heal creates ec_fop_data but doesn't run ec_manager. ec_fop_data_allocate adds this fop to ec->pending_fops, because ec_manager is not run on this heal fop it is never removed from ec->pending_fops. When it is accessed after free it leads to crash. It is better to not to add HEAL fops to ec->pending_fops because we don't want graph switch to hang the mount because of a BIG file/directory heal. - Forced unlock when lock contention is detected EC uses an eager lock mechanism to optimize multiple read/write requests on the same entry or inode. This increases performance but can have adverse results when other clients try to access the same entry/inode. To solve this, this patch adds a functionality to detect when this happens and force an earlier release to not block other clients. The method consists on requesting GF_GLUSTERFS_INODELK_COUNT and GF_GLUSTERFS_ENTRYLK_COUNT for all fops that take a lock. When this count is greater than one, the lock is marked to be released. All fops already waiting for this lock will be executed normally before releasing the lock, but new requests that also require it will be blocked and restarted after the lock has been released and reacquired again. Another problem was that some operations did correctly lock the parent of an entry when needed, but got the size and version xattrs from the entry instead of the parent. This patch solves this problem by binding all queries of size and version to each lock and replacing all entrylk calls by inodelk ones to remove concurrent updates on directory metadata. This also allows rename to correctly update source and destination directories. BUG: 1225279 Change-Id: I02a6084b138dd38e018a462347cd9ce38610c7ef Reviewed-on: http://review.gluster.org/10926 Tested-by: NetBSD Build System Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Pranith Kumar Karampuri <pkarampu@redhat.com>
Diffstat (limited to 'xlators/cluster/ec/src/ec-common.c')
-rw-r--r--xlators/cluster/ec/src/ec-common.c1509
1 files changed, 819 insertions, 690 deletions
diff --git a/xlators/cluster/ec/src/ec-common.c b/xlators/cluster/ec/src/ec-common.c
index 393d9142797..ba81fc7313f 100644
--- a/xlators/cluster/ec/src/ec-common.c
+++ b/xlators/cluster/ec/src/ec-common.c
@@ -368,24 +368,34 @@ int32_t ec_child_select(ec_fop_data_t * fop)
uintptr_t mask = 0;
int32_t first = 0, num = 0;
+ ec_fop_cleanup(fop);
+
fop->mask &= ec->node_mask;
mask = ec->xl_up;
if (fop->parent == NULL)
{
- if (fop->loc[0].inode != NULL) {
+ if ((fop->flags & EC_FLAG_UPDATE_LOC_PARENT) && fop->loc[0].parent)
+ mask &= ec_inode_good(fop->loc[0].parent, fop->xl);
+
+ if ((fop->flags & EC_FLAG_UPDATE_LOC_INODE) && fop->loc[0].inode) {
mask &= ec_inode_good(fop->loc[0].inode, fop->xl);
}
- if (fop->loc[1].inode != NULL) {
+
+ if ((fop->flags & EC_FLAG_UPDATE_LOC_INODE) && fop->loc[1].inode) {
mask &= ec_inode_good(fop->loc[1].inode, fop->xl);
}
- if (fop->fd != NULL) {
- if (fop->fd->inode != NULL) {
+
+ if (fop->fd) {
+ if ((fop->flags & EC_FLAG_UPDATE_FD_INODE) && fop->fd->inode) {
mask &= ec_inode_good(fop->fd->inode, fop->xl);
}
- mask &= ec_fd_good(fop->fd, fop->xl);
+ if (fop->flags & fop->flags & EC_FLAG_UPDATE_FD) {
+ mask &= ec_fd_good(fop->fd, fop->xl);
+ }
}
}
+
if ((fop->mask & ~mask) != 0)
{
gf_log(fop->xl->name, GF_LOG_WARNING, "Executing operation with "
@@ -420,6 +430,7 @@ int32_t ec_child_select(ec_fop_data_t * fop)
/*Unconditionally wind on healing subvolumes*/
fop->mask |= fop->healing;
fop->remaining = fop->mask;
+ fop->received = 0;
ec_trace("SELECT", fop, "");
@@ -585,7 +596,7 @@ void ec_dispatch_min(ec_fop_data_t * fop)
}
}
-ec_lock_t * ec_lock_allocate(xlator_t * xl, int32_t kind, loc_t * loc)
+ec_lock_t *ec_lock_allocate(xlator_t *xl, loc_t *loc)
{
ec_t * ec = xl->private;
ec_lock_t * lock;
@@ -602,9 +613,9 @@ ec_lock_t * ec_lock_allocate(xlator_t * xl, int32_t kind, loc_t * loc)
lock = mem_get0(ec->lock_pool);
if (lock != NULL)
{
- lock->kind = kind;
lock->good_mask = -1ULL;
INIT_LIST_HEAD(&lock->waiting);
+ INIT_LIST_HEAD(&lock->frozen);
if (ec_loc_from_loc(xl, &lock->loc, loc) != 0)
{
mem_put(lock);
@@ -618,6 +629,9 @@ ec_lock_t * ec_lock_allocate(xlator_t * xl, int32_t kind, loc_t * loc)
void ec_lock_destroy(ec_lock_t * lock)
{
loc_wipe(&lock->loc);
+ if (lock->fd != NULL) {
+ fd_unref(lock->fd);
+ }
mem_put(lock);
}
@@ -627,166 +641,96 @@ int32_t ec_lock_compare(ec_lock_t * lock1, ec_lock_t * lock2)
return gf_uuid_compare(lock1->loc.gfid, lock2->loc.gfid);
}
-ec_lock_link_t *ec_lock_insert(ec_fop_data_t *fop, ec_lock_t *lock,
- int32_t update)
+void ec_lock_insert(ec_fop_data_t *fop, ec_lock_t *lock, uint32_t flags,
+ loc_t *base)
{
- ec_lock_t *new_lock, *tmp;
- ec_lock_link_t *link = NULL;
- int32_t tmp_update;
+ ec_lock_link_t *link;
- new_lock = lock;
+ /* This check is only prepared for up to 2 locks per fop. If more locks
+ * are needed this must be changed. */
if ((fop->lock_count > 0) &&
- (ec_lock_compare(fop->locks[0].lock, new_lock) > 0))
- {
- tmp = fop->locks[0].lock;
- fop->locks[0].lock = new_lock;
- new_lock = tmp;
-
- tmp_update = fop->locks_update;
- fop->locks_update = update;
- update = tmp_update;
- }
- fop->locks[fop->lock_count].lock = new_lock;
- fop->locks[fop->lock_count].fop = fop;
-
- fop->locks_update |= update << fop->lock_count;
-
- fop->lock_count++;
-
- if (lock->timer != NULL) {
- link = lock->timer->data;
- ec_trace("UNLOCK_CANCELLED", link->fop, "lock=%p", lock);
- gf_timer_call_cancel(fop->xl->ctx, lock->timer);
- lock->timer = NULL;
+ (ec_lock_compare(fop->locks[0].lock, lock) < 0)) {
+ fop->first_lock = fop->lock_count;
} else {
- lock->refs++;
- }
-
- return link;
-}
-
-void ec_lock_prepare_entry(ec_fop_data_t *fop, loc_t *loc, int32_t update)
-{
- ec_lock_t * lock = NULL;
- ec_inode_t * ctx = NULL;
- ec_lock_link_t *link = NULL;
- loc_t tmp;
-
- if ((fop->parent != NULL) || (fop->error != 0))
- {
- return;
- }
-
- /* update is only 0 for 'opendir', which needs to lock the entry pointed
- * by loc instead of its parent.
- */
- if (update)
- {
- if (ec_loc_parent(fop->xl, loc, &tmp) != 0) {
- ec_fop_set_error(fop, EIO);
-
- return;
- }
-
- /* If there's another lock, make sure that it's not the same. Otherwise
- * do not insert it.
- *
- * This can only happen on renames where source and target names are
- * in the same directory. */
- if ((fop->lock_count > 0) &&
- (fop->locks[0].lock->loc.inode == tmp.inode)) {
- goto wipe;
+ /* When the first lock is added to the current fop, request lock
+ * counts from locks xlator to be able to determine if there is
+ * contention and release the lock sooner. */
+ if (fop->xdata == NULL) {
+ fop->xdata = dict_new();
+ if (fop->xdata == NULL) {
+ ec_fop_set_error(fop, ENOMEM);
+ return;
+ }
}
- } else {
- if (ec_loc_from_loc(fop->xl, &tmp, loc) != 0) {
- ec_fop_set_error(fop, EIO);
-
+ if (dict_set_str(fop->xdata, GLUSTERFS_INODELK_DOM_COUNT,
+ fop->xl->name) != 0) {
+ ec_fop_set_error(fop, ENOMEM);
return;
}
}
- LOCK(&tmp.inode->lock);
-
- ctx = __ec_inode_get(tmp.inode, fop->xl);
- if (ctx == NULL)
- {
- __ec_fop_set_error(fop, EIO);
-
- goto unlock;
- }
-
- if (ctx->entry_lock != NULL)
- {
- lock = ctx->entry_lock;
- ec_trace("LOCK_ENTRYLK", fop, "lock=%p, inode=%p, path=%s"
- "Lock already acquired",
- lock, tmp.inode, tmp.path);
+ link = &fop->locks[fop->lock_count++];
- goto insert;
- }
+ link->lock = lock;
+ link->fop = fop;
+ link->update[EC_DATA_TXN] = (flags & EC_UPDATE_DATA) != 0;
+ link->update[EC_METADATA_TXN] = (flags & EC_UPDATE_META) != 0;
+ link->base = base;
- lock = ec_lock_allocate(fop->xl, EC_LOCK_ENTRY, &tmp);
- if (lock == NULL)
- {
- __ec_fop_set_error(fop, EIO);
-
- goto unlock;
- }
-
- ec_trace("LOCK_CREATE", fop, "lock=%p", lock);
-
- lock->type = ENTRYLK_WRLCK;
-
- lock->plock = &ctx->entry_lock;
- ctx->entry_lock = lock;
-
-insert:
- link = ec_lock_insert(fop, lock, update);
-
-unlock:
- UNLOCK(&tmp.inode->lock);
-
-wipe:
- loc_wipe(&tmp);
-
- if (link != NULL) {
- ec_resume(link->fop, 0);
- }
+ lock->refs++;
+ lock->inserted++;
}
-void ec_lock_prepare_inode(ec_fop_data_t *fop, loc_t *loc, int32_t update)
+void ec_lock_prepare_inode_internal(ec_fop_data_t *fop, loc_t *loc,
+ uint32_t flags, loc_t *base)
{
- ec_lock_link_t *link = NULL;
- ec_lock_t * lock;
- ec_inode_t * ctx;
+ ec_lock_t *lock = NULL;
+ ec_inode_t *ctx;
- if ((fop->parent != NULL) || (fop->error != 0) || (loc->inode == NULL))
- {
+ if ((fop->parent != NULL) || (fop->error != 0) || (loc->inode == NULL)) {
return;
}
LOCK(&loc->inode->lock);
ctx = __ec_inode_get(loc->inode, fop->xl);
- if (ctx == NULL)
- {
+ if (ctx == NULL) {
__ec_fop_set_error(fop, EIO);
goto unlock;
}
- if (ctx->inode_lock != NULL)
- {
+ if (ctx->inode_lock != NULL) {
lock = ctx->inode_lock;
+
+ /* If there's another lock, make sure that it's not the same. Otherwise
+ * do not insert it.
+ *
+ * This can only happen on renames where source and target names are
+ * in the same directory. */
+ if ((fop->lock_count > 0) && (fop->locks[0].lock == lock)) {
+ /* Combine data/meta updates */
+ fop->locks[0].update[EC_DATA_TXN] |= (flags & EC_UPDATE_DATA) != 0;
+ fop->locks[0].update[EC_METADATA_TXN] |=
+ (flags & EC_UPDATE_META) != 0;
+
+ /* Only one base inode is allowed per fop, so there shouldn't be
+ * overwrites here. */
+ if (base != NULL) {
+ fop->locks[0].base = base;
+ }
+
+ goto update_query;
+ }
+
ec_trace("LOCK_INODELK", fop, "lock=%p, inode=%p. Lock already "
"acquired", lock, loc->inode);
goto insert;
}
- lock = ec_lock_allocate(fop->xl, EC_LOCK_INODE, loc);
- if (lock == NULL)
- {
+ lock = ec_lock_allocate(fop->xl, loc);
+ if (lock == NULL) {
__ec_fop_set_error(fop, EIO);
goto unlock;
@@ -797,154 +741,78 @@ void ec_lock_prepare_inode(ec_fop_data_t *fop, loc_t *loc, int32_t update)
lock->flock.l_type = F_WRLCK;
lock->flock.l_whence = SEEK_SET;
- lock->plock = &ctx->inode_lock;
+ lock->ctx = ctx;
ctx->inode_lock = lock;
insert:
- link = ec_lock_insert(fop, lock, update);
-
+ ec_lock_insert(fop, lock, flags, base);
+update_query:
+ lock->query |= (flags & EC_QUERY_INFO) != 0;
unlock:
UNLOCK(&loc->inode->lock);
+}
- if (link != NULL) {
- ec_resume(link->fop, 0);
- }
+void ec_lock_prepare_inode(ec_fop_data_t *fop, loc_t *loc, uint32_t flags)
+{
+ ec_lock_prepare_inode_internal(fop, loc, flags, NULL);
}
-void ec_lock_prepare_fd(ec_fop_data_t *fop, fd_t *fd, int32_t update)
+void ec_lock_prepare_parent_inode(ec_fop_data_t *fop, loc_t *loc,
+ uint32_t flags)
{
- loc_t loc;
+ loc_t tmp, *base = NULL;
- if ((fop->parent != NULL) || (fop->error != 0))
- {
+ if (fop->error != 0) {
return;
}
- if (ec_loc_from_fd(fop->xl, &loc, fd) == 0)
- {
- ec_lock_prepare_inode(fop, &loc, update);
-
- loc_wipe(&loc);
- }
- else
- {
+ if (ec_loc_parent(fop->xl, loc, &tmp) != 0) {
ec_fop_set_error(fop, EIO);
- }
-}
-
-int32_t ec_locked(call_frame_t * frame, void * cookie, xlator_t * this,
- int32_t op_ret, int32_t op_errno, dict_t * xdata)
-{
- ec_fop_data_t * fop = cookie;
- ec_lock_t * lock = NULL;
-
- if (op_ret >= 0)
- {
- lock = fop->data;
- lock->mask = fop->good;
- lock->acquired = 1;
- fop->parent->mask &= fop->good;
- fop->parent->locked++;
-
- ec_trace("LOCKED", fop->parent, "lock=%p", lock);
-
- ec_lock(fop->parent);
+ return;
}
- else
- {
- gf_log(this->name, GF_LOG_WARNING, "Failed to complete preop lock");
+
+ if ((flags & EC_INODE_SIZE) != 0) {
+ base = loc;
+ flags ^= EC_INODE_SIZE;
}
- return 0;
+ ec_lock_prepare_inode_internal(fop, &tmp, flags, base);
+
+ loc_wipe(&tmp);
}
-void ec_lock(ec_fop_data_t * fop)
+void ec_lock_prepare_fd(ec_fop_data_t *fop, fd_t *fd, uint32_t flags)
{
- ec_lock_t * lock;
-
- while (fop->locked < fop->lock_count)
- {
- lock = fop->locks[fop->locked].lock;
-
- LOCK(&lock->loc.inode->lock);
-
- if (lock->owner != NULL)
- {
- ec_trace("LOCK_WAIT", fop, "lock=%p", lock);
-
- list_add_tail(&fop->locks[fop->locked].wait_list, &lock->waiting);
-
- ec_sleep(fop);
-
- UNLOCK(&lock->loc.inode->lock);
-
- break;
- }
- lock->owner = fop;
-
- UNLOCK(&lock->loc.inode->lock);
-
- if (!lock->acquired)
- {
- ec_owner_set(fop->frame, lock);
-
- if (lock->kind == EC_LOCK_ENTRY)
- {
- ec_trace("LOCK_ACQUIRE", fop, "lock=%p, inode=%p, path=%s",
- lock, lock->loc.inode, lock->loc.path);
-
- ec_entrylk(fop->frame, fop->xl, -1, EC_MINIMUM_ALL, ec_locked,
- lock, fop->xl->name, &lock->loc, NULL,
- ENTRYLK_LOCK, lock->type, NULL);
- }
- else
- {
- ec_trace("LOCK_ACQUIRE", fop, "lock=%p, inode=%p", lock,
- lock->loc.inode);
+ loc_t loc;
- ec_inodelk(fop->frame, fop->xl, -1, EC_MINIMUM_ALL, ec_locked,
- lock, fop->xl->name, &lock->loc, F_SETLKW,
- &lock->flock, NULL);
- }
+ if (fop->error != 0) {
+ return;
+ }
- break;
- }
+ if (ec_loc_from_fd(fop->xl, &loc, fd) != 0) {
+ ec_fop_set_error(fop, EIO);
- ec_trace("LOCK_REUSE", fop, "lock=%p", lock);
+ return;
+ }
- if (lock->have_size)
- {
- fop->pre_size = fop->post_size = lock->size;
- fop->have_size = 1;
- }
- fop->mask &= lock->good_mask;
+ ec_lock_prepare_inode_internal(fop, &loc, flags, NULL);
- fop->locked++;
- }
+ loc_wipe(&loc);
}
gf_boolean_t
-ec_config_check (ec_fop_data_t *fop, dict_t *xdata)
+ec_config_check (ec_fop_data_t *fop, ec_config_t *config)
{
ec_t *ec;
- if (ec_dict_del_config(xdata, EC_XATTR_CONFIG, &fop->config) < 0) {
- gf_log(fop->xl->name, GF_LOG_ERROR, "Failed to get a valid "
- "config");
-
- ec_fop_set_error(fop, EIO);
-
- return _gf_false;
- }
-
ec = fop->xl->private;
- if ((fop->config.version != EC_CONFIG_VERSION) ||
- (fop->config.algorithm != EC_CONFIG_ALGORITHM) ||
- (fop->config.gf_word_size != EC_GF_BITS) ||
- (fop->config.bricks != ec->nodes) ||
- (fop->config.redundancy != ec->redundancy) ||
- (fop->config.chunk_size != EC_METHOD_CHUNK_SIZE)) {
+ if ((config->version != EC_CONFIG_VERSION) ||
+ (config->algorithm != EC_CONFIG_ALGORITHM) ||
+ (config->gf_word_size != EC_GF_BITS) ||
+ (config->bricks != ec->nodes) ||
+ (config->redundancy != ec->redundancy) ||
+ (config->chunk_size != EC_METHOD_CHUNK_SIZE)) {
uint32_t data_bricks;
/* This combination of version/algorithm requires the following
@@ -957,271 +825,201 @@ ec_config_check (ec_fop_data_t *fop, dict_t *xdata)
chunk_size (in bits) must be a multiple of gf_word_size *
(bricks - redundancy) */
- data_bricks = fop->config.bricks - fop->config.redundancy;
- if ((fop->config.redundancy < 1) ||
- (fop->config.redundancy * 2 >= fop->config.bricks) ||
- !ec_is_power_of_2(fop->config.gf_word_size) ||
- ((fop->config.chunk_size * 8) % (fop->config.gf_word_size *
- data_bricks) != 0)) {
+ data_bricks = config->bricks - config->redundancy;
+ if ((config->redundancy < 1) ||
+ (config->redundancy * 2 >= config->bricks) ||
+ !ec_is_power_of_2(config->gf_word_size) ||
+ ((config->chunk_size * 8) % (config->gf_word_size * data_bricks)
+ != 0)) {
gf_log(fop->xl->name, GF_LOG_ERROR, "Invalid or corrupted config");
} else {
gf_log(fop->xl->name, GF_LOG_ERROR, "Unsupported config "
"(V=%u, A=%u, W=%u, "
"N=%u, R=%u, S=%u)",
- fop->config.version, fop->config.algorithm,
- fop->config.gf_word_size, fop->config.bricks,
- fop->config.redundancy, fop->config.chunk_size);
+ config->version, config->algorithm,
+ config->gf_word_size, config->bricks,
+ config->redundancy, config->chunk_size);
}
- ec_fop_set_error(fop, EIO);
-
return _gf_false;
}
return _gf_true;
}
-int32_t ec_get_size_version_set(call_frame_t * frame, void * cookie,
- xlator_t * this, int32_t op_ret,
- int32_t op_errno, inode_t * inode,
- struct iatt * buf, dict_t * xdata,
- struct iatt * postparent)
+int32_t
+ec_prepare_update_cbk (call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret, int32_t op_errno,
+ dict_t *dict, dict_t *xdata)
{
- ec_fop_data_t * fop = cookie;
- ec_inode_t * ctx;
+ ec_fop_data_t *fop = cookie, *parent;
+ ec_lock_link_t *link = fop->data;
ec_lock_t *lock = NULL;
+ ec_inode_t *ctx;
- if (op_ret >= 0)
- {
- if ((buf->ia_type == IA_IFREG) && !ec_config_check(fop, xdata)) {
- return 0;
- }
+ lock = link->lock;
+ parent = link->fop;
+ ctx = lock->ctx;
- LOCK(&inode->lock);
+ if (op_ret < 0) {
+ gf_log(this->name, GF_LOG_WARNING,
+ "Failed to get size and version (error %d: %s)", op_errno,
+ strerror (op_errno));
- ctx = __ec_inode_get(inode, this);
- if (ctx != NULL) {
- if (ctx->inode_lock != NULL) {
- lock = ctx->inode_lock;
- lock->version[0] = fop->answer->version[0];
- lock->version[1] = fop->answer->version[1];
+ goto out;
+ }
- if (buf->ia_type == IA_IFREG) {
- lock->have_size = 1;
- lock->size = buf->ia_size;
- }
- }
- if (ctx->entry_lock != NULL) {
- lock = ctx->entry_lock;
- lock->version[0] = fop->answer->version[0];
- lock->version[1] = fop->answer->version[1];
- }
- }
+ op_errno = EIO;
- UNLOCK(&inode->lock);
+ LOCK(&lock->loc.inode->lock);
- if (lock != NULL)
- {
- // Only update parent mask if the lookup has been made with
- // inode locked.
- fop->parent->mask &= fop->good;
- }
+ if (ec_dict_del_array(dict, EC_XATTR_VERSION, ctx->pre_version,
+ EC_VERSION_SIZE) != 0) {
+ gf_log(this->name, GF_LOG_ERROR, "Unable to get version xattr");
- if (buf->ia_type == IA_IFREG) {
- fop->parent->pre_size = fop->parent->post_size = buf->ia_size;
- fop->parent->have_size = 1;
- }
- }
- else
- {
- gf_log(this->name, GF_LOG_WARNING, "Failed to get size and version "
- "(error %d)", op_errno);
- ec_fop_set_error(fop, op_errno);
+ goto unlock;
}
+ ctx->post_version[0] += ctx->pre_version[0];
+ ctx->post_version[1] += ctx->pre_version[1];
- return 0;
-}
+ ctx->have_version = _gf_true;
-gf_boolean_t
-ec_is_data_fop (glusterfs_fop_t fop)
-{
- switch (fop) {
- case GF_FOP_WRITE:
- case GF_FOP_TRUNCATE:
- case GF_FOP_FTRUNCATE:
- case GF_FOP_FALLOCATE:
- case GF_FOP_DISCARD:
- case GF_FOP_ZEROFILL:
- return _gf_true;
- default:
- return _gf_false;
- }
- return _gf_false;
-}
+ if (ec_dict_del_array(dict, EC_XATTR_DIRTY, ctx->pre_dirty,
+ EC_VERSION_SIZE) == 0) {
+ ctx->post_dirty[0] += ctx->pre_dirty[0];
+ ctx->post_dirty[1] += ctx->pre_dirty[1];
-gf_boolean_t
-ec_is_metadata_fop (glusterfs_fop_t fop)
-{
- switch (fop) {
- case GF_FOP_SETATTR:
- case GF_FOP_FSETATTR:
- case GF_FOP_SETXATTR:
- case GF_FOP_FSETXATTR:
- case GF_FOP_REMOVEXATTR:
- case GF_FOP_FREMOVEXATTR:
- return _gf_true;
- default:
- return _gf_false;
- }
- return _gf_false;
-}
+ ctx->have_dirty = _gf_true;
+ }
-int32_t
-ec_prepare_update_cbk (call_frame_t *frame, void *cookie,
- xlator_t *this, int32_t op_ret, int32_t op_errno,
- dict_t *dict, dict_t *xdata)
-{
- ec_fop_data_t *fop = cookie, *parent;
- ec_lock_t *lock = NULL;
- uint64_t size = 0;
- uint64_t version[EC_VERSION_SIZE] = {0, 0};
+ if (lock->loc.inode->ia_type == IA_IFREG) {
+ if (ec_dict_del_number(dict, EC_XATTR_SIZE, &ctx->pre_size) != 0) {
+ gf_log(this->name, GF_LOG_ERROR, "Unable to get size xattr");
- if (op_ret >= 0) {
- parent = fop->parent;
- while ((parent != NULL) && (parent->locks[0].lock == NULL)) {
- parent = parent->parent;
- }
- if (parent == NULL) {
- return 0;
+ goto unlock;
}
+ ctx->post_size = ctx->pre_size;
- lock = parent->locks[0].lock;
- if (ec_is_metadata_fop (fop->parent->id))
- lock->is_dirty[EC_METADATA_TXN] = _gf_true;
- else
- lock->is_dirty[EC_DATA_TXN] = _gf_true;
-
- if (lock->loc.inode->ia_type == IA_IFREG) {
- if (!ec_config_check(fop, dict) ||
- (ec_dict_del_number(dict, EC_XATTR_SIZE, &size) != 0)) {
- ec_fop_set_error(fop, EIO);
- return 0;
- }
- }
+ ctx->have_size = _gf_true;
- if (ec_dict_del_array(dict, EC_XATTR_VERSION, version,
- EC_VERSION_SIZE) != 0) {
- ec_fop_set_error(fop, EIO);
- return 0;
+ if ((ec_dict_del_config(dict, EC_XATTR_CONFIG, &ctx->config) != 0) ||
+ !ec_config_check(parent, &ctx->config)) {
+ gf_log(this->name, GF_LOG_ERROR, "Unable to get config xattr");
+
+ goto unlock;
}
- LOCK(&lock->loc.inode->lock);
+ ctx->have_config = _gf_true;
+ }
- if (lock->loc.inode->ia_type == IA_IFREG) {
- lock->size = size;
- fop->parent->pre_size = fop->parent->post_size = size;
- fop->parent->have_size = lock->have_size = 1;
- }
- lock->version[0] = version[0];
- lock->version[1] = version[1];
+ ctx->have_info = _gf_true;
- UNLOCK(&lock->loc.inode->lock);
+ op_errno = 0;
+
+unlock:
+ UNLOCK(&lock->loc.inode->lock);
+out:
+ if (op_errno == 0) {
+ parent->mask &= fop->good;
- fop->parent->mask &= fop->good;
/*As of now only data healing marks bricks as healing*/
- if (ec_is_data_fop (fop->parent->id))
- fop->parent->healing |= fop->healing;
+ if (ec_is_data_fop (parent->id)) {
+ parent->healing |= fop->healing;
+ }
} else {
- gf_log(this->name, GF_LOG_WARNING,
- "Failed to get size and version (error %d: %s)", op_errno,
- strerror (op_errno));
- ec_fop_set_error(fop, op_errno);
+ ec_fop_set_error(parent, op_errno);
}
return 0;
}
-void ec_get_size_version(ec_fop_data_t * fop)
+void ec_get_size_version(ec_lock_link_t *link)
{
loc_t loc;
- dict_t * xdata;
+ ec_lock_t *lock;
+ ec_inode_t *ctx;
+ ec_fop_data_t *fop;
+ dict_t *dict = NULL;
uid_t uid;
gid_t gid;
int32_t error = ENOMEM;
uint64_t allzero[EC_VERSION_SIZE] = {0, 0};
- if (fop->have_size)
- {
+ lock = link->lock;
+ ctx = lock->ctx;
+
+ /* If ec metadata has already been retrieved, do not try again. */
+ if (ctx->have_info) {
return;
}
- if ((fop->parent != NULL) && fop->parent->have_size)
- {
- fop->pre_size = fop->parent->pre_size;
- fop->post_size = fop->parent->post_size;
-
- fop->have_size = 1;
-
+ /* Determine if there's something we need to retrieve for the current
+ * operation. */
+ if (!lock->query && (lock->loc.inode->ia_type != IA_IFREG)) {
return;
}
+
+ fop = link->fop;
+
uid = fop->frame->root->uid;
gid = fop->frame->root->gid;
- fop->frame->root->uid = 0;
- fop->frame->root->gid = 0;
-
memset(&loc, 0, sizeof(loc));
- xdata = dict_new();
- if (xdata == NULL)
- {
+ dict = dict_new();
+ if (dict == NULL) {
goto out;
}
- if ((ec_dict_set_array(xdata, EC_XATTR_VERSION,
- allzero, EC_VERSION_SIZE) != 0) ||
- (ec_dict_set_number(xdata, EC_XATTR_SIZE, 0) != 0) ||
- (ec_dict_set_number(xdata, EC_XATTR_CONFIG, 0) != 0) ||
- (ec_dict_set_array(xdata, EC_XATTR_DIRTY, allzero,
- EC_VERSION_SIZE) != 0))
- {
+
+ /* Once we know that an xattrop will be needed, we try to get all available
+ * information in a single call. */
+ if ((ec_dict_set_array(dict, EC_XATTR_VERSION, allzero,
+ EC_VERSION_SIZE) != 0) ||
+ (ec_dict_set_array(dict, EC_XATTR_DIRTY, allzero,
+ EC_VERSION_SIZE) != 0)) {
goto out;
}
- error = EIO;
+ if (lock->loc.inode->ia_type == IA_IFREG) {
+ if ((ec_dict_set_number(dict, EC_XATTR_SIZE, 0) != 0) ||
+ (ec_dict_set_number(dict, EC_XATTR_CONFIG, 0) != 0)) {
+ goto out;
+ }
+ }
- if (!fop->use_fd)
- {
- if (ec_loc_from_loc(fop->xl, &loc, &fop->loc[0]) != 0)
- {
+ fop->frame->root->uid = 0;
+ fop->frame->root->gid = 0;
+
+ /* For normal fops, ec_[f]xattrop() must succeed on at least
+ * EC_MINIMUM_MIN bricks, however when this is called as part of a
+ * self-heal operation the mask of target bricks (fop->mask) could
+ * contain less than EC_MINIMUM_MIN bricks, causing the lookup to
+ * always fail. Thus we always use the same minimum used for the main
+ * fop.
+ */
+ if (lock->fd == NULL) {
+ if (ec_loc_from_loc(fop->xl, &loc, &lock->loc) != 0) {
goto out;
}
- if (gf_uuid_is_null(loc.pargfid))
- {
- if (loc.parent != NULL)
- {
+ if (gf_uuid_is_null(loc.pargfid)) {
+ if (loc.parent != NULL) {
inode_unref(loc.parent);
loc.parent = NULL;
}
GF_FREE((char *)loc.path);
- loc.path = NULL;
- loc.name = NULL;
+ loc.path = NULL;
+ loc.name = NULL;
}
- /* For normal fops, ec_lookup() must succeed on at least EC_MINIMUM_MIN
- * bricks, however when this is called as part of a self-heal operation
- * the mask of target bricks (fop->mask) could contain less than
- * EC_MINIMUM_MIN bricks, causing the lookup to always fail. Thus we
- * always use the same minimum used for the main fop.
- */
- ec_lookup(fop->frame, fop->xl, fop->mask, fop->minimum,
- ec_get_size_version_set, NULL, &loc, xdata);
+
+ ec_xattrop (fop->frame, fop->xl, fop->mask, fop->minimum,
+ ec_prepare_update_cbk, link, &loc,
+ GF_XATTROP_ADD_ARRAY64, dict, NULL);
} else {
- if (ec_loc_from_fd(fop->xl, &loc, fop->fd) != 0) {
- goto out;
- }
ec_fxattrop(fop->frame, fop->xl, fop->mask, fop->minimum,
- ec_prepare_update_cbk, NULL, fop->fd,
- GF_XATTROP_ADD_ARRAY64, xdata, NULL);
+ ec_prepare_update_cbk, link, lock->fd,
+ GF_XATTROP_ADD_ARRAY64, dict, NULL);
}
+
error = 0;
out:
@@ -1230,9 +1028,8 @@ out:
loc_wipe(&loc);
- if (xdata != NULL)
- {
- dict_unref(xdata);
+ if (dict != NULL) {
+ dict_unref(dict);
}
if (error != 0) {
@@ -1240,147 +1037,408 @@ out:
}
}
-void ec_prepare_update(ec_fop_data_t *fop)
+gf_boolean_t ec_get_inode_size(ec_fop_data_t *fop, inode_t *inode,
+ uint64_t *size)
{
- loc_t loc;
- dict_t *xdata;
- ec_fop_data_t *tmp;
- ec_lock_t *lock;
- ec_t *ec;
- uid_t uid;
- gid_t gid;
- uint64_t version[2] = {0, 0};
- uint64_t dirty[2] = {0, 0};
- int32_t error = ENOMEM;
+ ec_inode_t *ctx;
+ gf_boolean_t found = _gf_false;
- tmp = fop;
- while ((tmp != NULL) && (tmp->locks[0].lock == NULL)) {
- tmp = tmp->parent;
- }
- if ((tmp != NULL) &&
- (tmp->locks[0].lock->is_dirty[0] || tmp->locks[0].lock->is_dirty[1])) {
- lock = tmp->locks[0].lock;
+ LOCK(&inode->lock);
- fop->pre_size = fop->post_size = lock->size;
- fop->have_size = 1;
+ ctx = __ec_inode_get(inode, fop->xl);
+ if (ctx == NULL) {
+ goto unlock;
+ }
- return;
+ if (ctx->have_size) {
+ *size = ctx->post_size;
+ found = _gf_true;
}
- uid = fop->frame->root->uid;
- gid = fop->frame->root->gid;
- fop->frame->root->uid = 0;
- fop->frame->root->gid = 0;
+unlock:
+ UNLOCK(&inode->lock);
- memset(&loc, 0, sizeof(loc));
+ return found;
+}
- ec = fop->xl->private;
- if (ec_bits_count (fop->mask) >= ec->fragments) {
- /* It is changing data only if the update happens on at least
- * fragment number of bricks. Otherwise it probably is healing*/
- if (ec_is_metadata_fop (fop->id))
- dirty[EC_METADATA_TXN] = 1;
- else
- dirty[EC_DATA_TXN] = 1;
+gf_boolean_t ec_set_inode_size(ec_fop_data_t *fop, inode_t *inode,
+ uint64_t size)
+{
+ ec_inode_t *ctx;
+ gf_boolean_t found = _gf_false;
+
+ LOCK(&inode->lock);
+
+ ctx = __ec_inode_get(inode, fop->xl);
+ if (ctx == NULL) {
+ goto unlock;
}
- xdata = dict_new();
- if (xdata == NULL) {
- goto out;
+ /* Normal fops always have ctx->have_size set. However self-heal calls this
+ * to prepare the inode, so ctx->have_size will be false. In this case we
+ * prepare both pre_size and post_size, and set have_size and have_info to
+ * true. */
+ if (!ctx->have_size) {
+ ctx->pre_size = size;
+ ctx->have_size = ctx->have_info = _gf_true;
}
- if ((ec_dict_set_array(xdata, EC_XATTR_VERSION,
- version, EC_VERSION_SIZE) != 0) ||
- (ec_dict_set_number(xdata, EC_XATTR_SIZE, 0) != 0) ||
- (ec_dict_set_number(xdata, EC_XATTR_CONFIG, 0) != 0) ||
- (ec_dict_set_array(xdata, EC_XATTR_DIRTY, dirty,
- EC_VERSION_SIZE) != 0)) {
- goto out;
+ ctx->post_size = size;
+
+ found = _gf_true;
+
+unlock:
+ UNLOCK(&inode->lock);
+
+ return found;
+}
+
+void ec_clear_inode_info(ec_fop_data_t *fop, inode_t *inode)
+{
+ ec_inode_t *ctx;
+
+ LOCK(&inode->lock);
+
+ ctx = __ec_inode_get(inode, fop->xl);
+ if (ctx == NULL) {
+ goto unlock;
}
- error = EIO;
+ ctx->have_info = _gf_false;
+ ctx->have_config = _gf_false;
+ ctx->have_version = _gf_false;
+ ctx->have_size = _gf_false;
+ ctx->have_dirty = _gf_false;
- if (!fop->use_fd) {
- if (ec_loc_from_loc(fop->xl, &loc, &fop->loc[0]) != 0) {
- goto out;
- }
+ memset(&ctx->config, 0, sizeof(ctx->config));
+ memset(ctx->pre_version, 0, sizeof(ctx->pre_version));
+ memset(ctx->post_version, 0, sizeof(ctx->post_version));
+ ctx->pre_size = ctx->post_size = 0;
+ memset(ctx->pre_dirty, 0, sizeof(ctx->pre_dirty));
+ memset(ctx->post_dirty, 0, sizeof(ctx->post_dirty));
+
+unlock:
+ UNLOCK(&inode->lock);
+}
- ec_xattrop(fop->frame, fop->xl, fop->mask, fop->minimum,
- ec_prepare_update_cbk, NULL, &loc, GF_XATTROP_ADD_ARRAY64,
- xdata, NULL);
+int32_t ec_get_real_size_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *buf, dict_t *xdata,
+ struct iatt *postparent)
+{
+ ec_fop_data_t *fop = cookie;
+ ec_lock_link_t *link;
+
+ if (op_ret >= 0) {
+ link = fop->data;
+ if (ec_dict_del_number(xdata, EC_XATTR_SIZE, &link->size) != 0) {
+ gf_log(this->name, GF_LOG_WARNING,
+ "Unable to determine real file size");
+ }
} else {
- ec_fxattrop(fop->frame, fop->xl, fop->mask, fop->minimum,
- ec_prepare_update_cbk, NULL, fop->fd,
- GF_XATTROP_ADD_ARRAY64, xdata, NULL);
+ /* Prevent failure of parent fop. */
+ fop->error = 0;
}
- error = 0;
+ return 0;
+}
-out:
+/* This function is used to get the trusted.ec.size xattr from a file when
+ * no lock is needed on the inode. This is only required to maintan iatt
+ * structs on fops that manipulate directory entries but do not operate
+ * directly on the inode, like link, rename, ...
+ *
+ * Any error processing this request is ignored. In the worst case, an invalid
+ * or not up to date value in the iatt could cause some cache invalidation.
+ */
+void ec_get_real_size(ec_lock_link_t *link)
+{
+ ec_fop_data_t *fop;
+ dict_t *xdata;
- fop->frame->root->uid = uid;
- fop->frame->root->gid = gid;
+ if (link->base == NULL) {
+ return;
+ }
- loc_wipe(&loc);
+ if (link->base->inode->ia_type != IA_IFREG) {
+ return;
+ }
+ fop = link->fop;
+
+ if (ec_get_inode_size(fop, link->base->inode, &link->size)) {
+ return;
+ }
+
+ xdata = dict_new();
+ if (xdata == NULL) {
+ return;
+ }
+ if (ec_dict_set_number(xdata, EC_XATTR_SIZE, 0) != 0) {
+ goto out;
+ }
+
+ /* Send a simple lookup. A single answer is considered ok since this value
+ * is only used to return an iatt struct related to an inode that is not
+ * locked and have not suffered any operation. */
+ ec_lookup(fop->frame, fop->xl, fop->mask, 1, ec_get_real_size_cbk, link,
+ link->base, xdata);
+
+out:
if (xdata != NULL) {
dict_unref(xdata);
}
+}
- if (error != 0) {
- ec_fop_set_error(fop, error);
+void ec_lock_acquired(ec_lock_link_t *link)
+{
+ ec_lock_t *lock;
+ ec_fop_data_t *fop;
+
+ lock = link->lock;
+ fop = link->fop;
+
+ ec_trace("LOCKED", link->fop, "lock=%p", lock);
+
+ /* If the fop has an fd available, attach it to the lock structure to be
+ * able to do fxattrop calls instead of xattrop. It's safe to change this
+ * here because no xattrop using the fd can start concurrently at this
+ * point. */
+ if (fop->use_fd) {
+ if (lock->fd != NULL) {
+ fd_unref(lock->fd);
+ }
+ lock->fd = fd_ref(fop->fd);
}
+ lock->acquired = _gf_true;
+
+ fop->mask &= lock->good_mask;
+
+ fop->locked++;
+
+ ec_get_size_version(link);
+ ec_get_real_size(link);
}
-int32_t ec_unlocked(call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, dict_t *xdata)
+int32_t ec_locked(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
ec_fop_data_t *fop = cookie;
+ ec_lock_link_t *link = NULL;
+ ec_lock_t *lock = NULL;
- if (op_ret < 0) {
- gf_log(this->name, GF_LOG_WARNING, "entry/inode unlocking failed (%s)",
- ec_fop_name(fop->parent->id));
+ if (op_ret >= 0) {
+ link = fop->data;
+ lock = link->lock;
+ lock->mask = lock->good_mask = fop->good;
+
+ ec_lock_acquired(link);
+ ec_lock(fop->parent);
} else {
- ec_trace("UNLOCKED", fop->parent, "lock=%p", fop->data);
+ gf_log(this->name, GF_LOG_WARNING, "Failed to complete preop lock");
}
return 0;
}
-void ec_unlock_lock(ec_fop_data_t *fop, ec_lock_t *lock)
+gf_boolean_t ec_lock_acquire(ec_lock_link_t *link)
{
- if ((lock->mask != 0) && lock->acquired) {
+ ec_lock_t *lock;
+ ec_fop_data_t *fop;
+
+ lock = link->lock;
+ fop = link->fop;
+ if (!lock->acquired) {
ec_owner_set(fop->frame, lock);
- switch (lock->kind) {
- case EC_LOCK_ENTRY:
- ec_trace("UNLOCK_ENTRYLK", fop, "lock=%p, inode=%p, path=%s", lock,
- lock->loc.inode, lock->loc.path);
+ ec_trace("LOCK_ACQUIRE", fop, "lock=%p, inode=%p", lock,
+ lock->loc.inode);
+
+ lock->flock.l_type = F_WRLCK;
+ ec_inodelk(fop->frame, fop->xl, -1, EC_MINIMUM_ALL, ec_locked,
+ link, fop->xl->name, &lock->loc, F_SETLKW, &lock->flock,
+ NULL);
+
+ return _gf_false;
+ }
+
+ ec_trace("LOCK_REUSE", fop, "lock=%p", lock);
+
+ ec_lock_acquired(link);
+
+ return _gf_true;
+}
+
+void ec_lock(ec_fop_data_t *fop)
+{
+ ec_lock_link_t *link;
+ ec_lock_link_t *timer_link = NULL;
+ ec_lock_t *lock;
+
+ /* There is a chance that ec_resume is called on fop even before ec_sleep.
+ * Which can result in refs == 0 for fop leading to use after free in this
+ * function when it calls ec_sleep so do ec_sleep at start and end of this
+ * function.*/
+ ec_sleep (fop);
+ while (fop->locked < fop->lock_count) {
+ /* Since there are only up to 2 locks per fop, this xor will change
+ * the order of the locks if fop->first_lock is 1. */
+ link = &fop->locks[fop->locked ^ fop->first_lock];
+ lock = link->lock;
+
+ timer_link = NULL;
+
+ LOCK(&lock->loc.inode->lock);
+ GF_ASSERT (lock->inserted > 0);
+ lock->inserted--;
+
+ if (lock->timer != NULL) {
+ GF_ASSERT (lock->release == _gf_false);
+ timer_link = lock->timer->data;
+ ec_trace("UNLOCK_CANCELLED", timer_link->fop, "lock=%p", lock);
+ gf_timer_call_cancel(fop->xl->ctx, lock->timer);
+ lock->timer = NULL;
+
+ lock->refs--;
+ /* There should remain at least 1 ref, the current one. */
+ GF_ASSERT(lock->refs > 0);
+ }
+
+ GF_ASSERT(list_empty(&link->wait_list));
+
+ if ((lock->owner != NULL) || lock->release) {
+ if (lock->release) {
+ ec_trace("LOCK_QUEUE_FREEZE", fop, "lock=%p", lock);
- ec_entrylk(fop->frame, fop->xl, lock->mask, EC_MINIMUM_ALL,
- ec_unlocked, lock, fop->xl->name, &lock->loc, NULL,
- ENTRYLK_UNLOCK, lock->type, NULL);
+ list_add_tail(&link->wait_list, &lock->frozen);
+
+ /* The lock is frozen, so we move the current reference to
+ * refs_frozen. After that, there should remain at least one
+ * ref belonging to the lock that is processing the release. */
+ lock->refs--;
+ GF_ASSERT(lock->refs > 0);
+ lock->refs_frozen++;
+ } else {
+ ec_trace("LOCK_QUEUE_WAIT", fop, "lock=%p", lock);
+
+ list_add_tail(&link->wait_list, &lock->waiting);
+ }
+
+ UNLOCK(&lock->loc.inode->lock);
+
+ ec_sleep(fop);
break;
+ }
- case EC_LOCK_INODE:
- lock->flock.l_type = F_UNLCK;
- ec_trace("UNLOCK_INODELK", fop, "lock=%p, inode=%p", lock,
- lock->loc.inode);
+ lock->owner = fop;
- ec_inodelk(fop->frame, fop->xl, lock->mask, EC_MINIMUM_ALL,
- ec_unlocked, lock, fop->xl->name, &lock->loc, F_SETLK,
- &lock->flock, NULL);
+ UNLOCK(&lock->loc.inode->lock);
+ if (!ec_lock_acquire(link)) {
break;
+ }
- default:
- gf_log(fop->xl->name, GF_LOG_ERROR, "Invalid lock type");
+ if (timer_link != NULL) {
+ ec_resume(timer_link->fop, 0);
+ timer_link = NULL;
}
}
+ ec_resume (fop, 0);
+
+ if (timer_link != NULL) {
+ ec_resume(timer_link->fop, 0);
+ }
+}
+
+void
+ec_lock_unfreeze(ec_lock_link_t *link)
+{
+ ec_lock_t *lock;
- ec_trace("LOCK_DESTROY", fop, "lock=%p", lock);
+ lock = link->lock;
+
+ LOCK(&lock->loc.inode->lock);
+
+ lock->acquired = _gf_false;
+ lock->release = _gf_false;
+
+ lock->refs--;
+ GF_ASSERT (lock->refs == lock->inserted);
+
+ GF_ASSERT(list_empty(&lock->waiting) && (lock->owner == NULL));
+
+ list_splice_init(&lock->frozen, &lock->waiting);
+ lock->refs += lock->refs_frozen;
+ lock->refs_frozen = 0;
+
+ if (!list_empty(&lock->waiting)) {
+ link = list_entry(lock->waiting.next, ec_lock_link_t, wait_list);
+ list_del_init(&link->wait_list);
+
+ lock->owner = link->fop;
+
+ UNLOCK(&lock->loc.inode->lock);
+
+ ec_trace("LOCK_UNFREEZE", link->fop, "lock=%p", lock);
+
+ if (ec_lock_acquire(link)) {
+ ec_lock(link->fop);
+ }
+ ec_resume(link->fop, 0);
+ } else if (lock->refs == 0) {
+ ec_trace("LOCK_DESTROY", link->fop, "lock=%p", lock);
+
+ lock->ctx->inode_lock = NULL;
+
+ UNLOCK(&lock->loc.inode->lock);
- ec_lock_destroy(lock);
+ ec_lock_destroy(lock);
+ } else {
+ UNLOCK(&lock->loc.inode->lock);
+ }
+}
+
+int32_t ec_unlocked(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ ec_fop_data_t *fop = cookie;
+ ec_lock_link_t *link = fop->data;
+
+ if (op_ret < 0) {
+ gf_log(this->name, GF_LOG_WARNING, "entry/inode unlocking failed (%s)",
+ ec_fop_name(link->fop->id));
+ } else {
+ ec_trace("UNLOCKED", link->fop, "lock=%p", link->lock);
+ }
+
+ ec_lock_unfreeze(link);
+
+ return 0;
+}
+
+void ec_unlock_lock(ec_lock_link_t *link)
+{
+ ec_lock_t *lock;
+ ec_fop_data_t *fop;
+
+ lock = link->lock;
+ fop = link->fop;
+
+ ec_clear_inode_info(fop, lock->loc.inode);
+
+ if ((lock->mask != 0) && lock->acquired) {
+ ec_owner_set(fop->frame, lock);
+
+ lock->flock.l_type = F_UNLCK;
+ ec_trace("UNLOCK_INODELK", fop, "lock=%p, inode=%p", lock,
+ lock->loc.inode);
+
+ ec_inodelk(fop->frame, fop->xl, lock->mask, EC_MINIMUM_ALL,
+ ec_unlocked, link, fop->xl->name, &lock->loc, F_SETLK,
+ &lock->flock, NULL);
+ } else {
+ ec_lock_unfreeze(link);
+ }
}
int32_t ec_update_size_version_done(call_frame_t * frame, void * cookie,
@@ -1388,111 +1446,128 @@ int32_t ec_update_size_version_done(call_frame_t * frame, void * cookie,
int32_t op_errno, dict_t * xattr,
dict_t * xdata)
{
- ec_fop_data_t * fop = cookie;
+ ec_fop_data_t *fop = cookie;
+ ec_lock_link_t *link;
+ ec_lock_t *lock;
+ ec_inode_t *ctx;
- if (op_ret < 0)
- {
+ if (op_ret < 0) {
gf_log(fop->xl->name, GF_LOG_ERROR, "Failed to update version and "
"size (error %d)", op_errno);
- }
- else
- {
+ } else {
fop->parent->mask &= fop->good;
+ link = fop->data;
+ lock = link->lock;
+ ctx = lock->ctx;
+
+ if (ec_dict_del_array(xattr, EC_XATTR_VERSION, ctx->post_version,
+ EC_VERSION_SIZE) == 0) {
+ ctx->pre_version[0] = ctx->post_version[0];
+ ctx->pre_version[1] = ctx->post_version[1];
+
+ ctx->have_version = _gf_true;
+ }
+ if (ec_dict_del_number(xattr, EC_XATTR_SIZE, &ctx->post_size) == 0) {
+ ctx->pre_size = ctx->post_size;
+
+ ctx->have_size = _gf_true;
+ }
+ if (ec_dict_del_array(xattr, EC_XATTR_DIRTY, ctx->post_dirty,
+ EC_VERSION_SIZE) == 0) {
+ ctx->pre_dirty[0] = ctx->post_dirty[0];
+ ctx->pre_dirty[1] = ctx->post_dirty[1];
+
+ ctx->have_dirty = _gf_true;
+ }
+ if ((ec_dict_del_config(xdata, EC_XATTR_CONFIG, &ctx->config) == 0) &&
+ ec_config_check(fop->parent, &ctx->config)) {
+ ctx->have_config = _gf_true;
+ }
+
+ ctx->have_info = _gf_true;
}
- if (fop->data != NULL) {
- ec_unlock_lock(fop->parent, fop->data);
+ if ((fop->parent->id != GF_FOP_FLUSH) &&
+ (fop->parent->id != GF_FOP_FSYNC) &&
+ (fop->parent->id != GF_FOP_FSYNCDIR)) {
+ ec_unlock_lock(fop->data);
}
return 0;
}
-uint64_t
-ec_get_dirty_value (ec_t *ec, uintptr_t fop_mask, uint64_t version_delta,
- gf_boolean_t dirty)
-{
- uint64_t dirty_val = 0;
-
- if (version_delta) {
- if (~fop_mask & ec->node_mask) {
- /* fop didn't succeed on all subvols so 'dirty' xattr
- * shouldn't be cleared */
- if (!dirty)
- dirty_val = 1;
- } else {
- /* fop succeed on all subvols so 'dirty' xattr
- * should be cleared */
- if (dirty)
- dirty_val = -1;
- }
- }
- return dirty_val;
-}
-
void
-ec_update_size_version(ec_fop_data_t *fop, loc_t *loc, uint64_t version[2],
- uint64_t size, gf_boolean_t dirty[2], ec_lock_t *lock)
+ec_update_size_version(ec_lock_link_t *link, uint64_t *version,
+ uint64_t size, uint64_t *dirty)
{
- ec_t *ec = fop->xl->private;
+ ec_fop_data_t *fop;
+ ec_lock_t *lock;
+ ec_inode_t *ctx;
dict_t * dict;
uid_t uid;
gid_t gid;
- uint64_t dirty_values[2] = {0};
- int i = 0;
- if (fop->parent != NULL)
- {
- fop->parent->post_size = fop->post_size;
+ fop = link->fop;
- return;
- }
-
- ec_trace("UPDATE", fop, "version=%ld, size=%ld, dirty=%u", version, size,
- dirty);
+ ec_trace("UPDATE", fop, "version=%ld/%ld, size=%ld, dirty=%ld/%ld",
+ version[0], version[1], size, dirty[0], dirty[1]);
dict = dict_new();
- if (dict == NULL)
- {
+ if (dict == NULL) {
goto out;
}
- if (version[0] != 0 || version[1] != 0) {
+ lock = link->lock;
+ ctx = lock->ctx;
+
+ /* If we don't have version information or it has been modified, we
+ * update it. */
+ if (!ctx->have_version || (version[0] != 0) || (version[1] != 0)) {
if (ec_dict_set_array(dict, EC_XATTR_VERSION,
version, EC_VERSION_SIZE) != 0) {
goto out;
}
}
+
if (size != 0) {
+ /* If size has been changed, we should already know the previous size
+ * of the file. */
+ GF_ASSERT(ctx->have_size);
+
if (ec_dict_set_number(dict, EC_XATTR_SIZE, size) != 0) {
goto out;
}
}
- for (i = 0; i < sizeof (dirty_values)/sizeof (dirty_values[0]); i++) {
- dirty_values[i] = ec_get_dirty_value (ec, fop->mask, version[i],
- dirty[i]);
- }
-
- if (dirty_values[0] || dirty_values[1]) {
- if (ec_dict_set_array(dict, EC_XATTR_DIRTY, dirty_values,
+ /* If we don't have dirty information or it has been modified, we update
+ * it. */
+ if (!ctx->have_dirty || (dirty[0] != 0) || (dirty[1] != 0)) {
+ if (ec_dict_set_array(dict, EC_XATTR_DIRTY, dirty,
EC_VERSION_SIZE) != 0) {
goto out;
}
}
+ /* If config information is not know, we request it now. */
+ if ((lock->loc.inode->ia_type == IA_IFREG) && !ctx->have_config) {
+ /* A failure requesting this xattr is ignored because it's not
+ * absolutely required right now. */
+ ec_dict_set_number(dict, EC_XATTR_CONFIG, 0);
+ }
+
uid = fop->frame->root->uid;
gid = fop->frame->root->gid;
fop->frame->root->uid = 0;
fop->frame->root->gid = 0;
- if (fop->use_fd) {
- ec_fxattrop(fop->frame, fop->xl, fop->mask, EC_MINIMUM_MIN,
- ec_update_size_version_done, lock, fop->fd,
+ if (link->lock->fd == NULL) {
+ ec_xattrop(fop->frame, fop->xl, fop->mask, EC_MINIMUM_MIN,
+ ec_update_size_version_done, link, &link->lock->loc,
GF_XATTROP_ADD_ARRAY64, dict, NULL);
} else {
- ec_xattrop(fop->frame, fop->xl, fop->mask, EC_MINIMUM_MIN,
- ec_update_size_version_done, lock, loc,
+ ec_fxattrop(fop->frame, fop->xl, fop->mask, EC_MINIMUM_MIN,
+ ec_update_size_version_done, link, link->lock->fd,
GF_XATTROP_ADD_ARRAY64, dict, NULL);
}
@@ -1504,8 +1579,7 @@ ec_update_size_version(ec_fop_data_t *fop, loc_t *loc, uint64_t version[2],
return;
out:
- if (dict != NULL)
- {
+ if (dict != NULL) {
dict_unref(dict);
}
@@ -1514,46 +1588,99 @@ out:
gf_log(fop->xl->name, GF_LOG_ERROR, "Unable to update version and size");
}
-void ec_unlock_now(ec_fop_data_t *fop, ec_lock_t *lock)
+gf_boolean_t
+ec_update_info(ec_lock_link_t *link)
{
- ec_trace("UNLOCK_NOW", fop, "lock=%p", lock);
+ ec_lock_t *lock;
+ ec_inode_t *ctx;
+ uint64_t version[2];
+ uint64_t dirty[2];
+ uint64_t size;
- if ((lock->version_delta[0] != 0) || (lock->version_delta[1] != 0) ||
- lock->is_dirty[0] || lock->is_dirty[1]) {
- ec_update_size_version(fop, &lock->loc, lock->version_delta,
- lock->size_delta, lock->is_dirty, lock);
- } else {
- ec_unlock_lock(fop, lock);
+ lock = link->lock;
+ ctx = lock->ctx;
+
+ /* pre_version[*] will be 0 if have_version is false */
+ version[0] = ctx->post_version[0] - ctx->pre_version[0];
+ version[1] = ctx->post_version[1] - ctx->pre_version[1];
+
+ size = ctx->post_size - ctx->pre_size;
+
+ /* pre_dirty[*] will be 0 if have_dirty is false */
+ dirty[0] = ctx->post_dirty[0] - ctx->pre_dirty[0];
+ dirty[1] = ctx->post_dirty[1] - ctx->pre_dirty[1];
+
+ if ((version[0] != 0) || (version[1] != 0) ||
+ (dirty[0] != 0) || (dirty[1] != 0)) {
+ ec_update_size_version(link, version, size, dirty);
+
+ return _gf_true;
}
- ec_resume(fop, 0);
+ return _gf_false;
}
-void ec_unlock_timer_cbk(void *data)
+void
+ec_unlock_now(ec_lock_link_t *link)
{
- ec_lock_link_t *link = data;
- ec_lock_t *lock = link->lock;
- ec_fop_data_t *fop = NULL;
+ ec_trace("UNLOCK_NOW", link->fop, "lock=%p", link->lock);
- LOCK(&lock->loc.inode->lock);
+ if (!ec_update_info(link)) {
+ ec_unlock_lock(link);
+ }
- if (lock->timer != NULL) {
- fop = link->fop;
+ ec_resume(link->fop, 0);
+}
- ec_trace("UNLOCK_DELAYED", fop, "lock=%p", lock);
+void
+ec_unlock_timer_del(ec_lock_link_t *link)
+{
+ int32_t before = 0;
+ ec_lock_t *lock;
+ inode_t *inode;
+ gf_boolean_t now = _gf_false;
+
+ lock = link->lock;
+
+ /* A race condition can happen if timer expires, calls this function
+ * and the lock is released (lock->loc is wiped) but the fop is not
+ * fully completed yet (it's still on the list of pending fops). In
+ * this case, this function can also be called if ec_unlock_force() is
+ * called. */
+ inode = lock->loc.inode;
+ if (inode == NULL) {
+ return;
+ }
- GF_ASSERT(lock->refs == 1);
+ LOCK(&inode->lock);
- gf_timer_call_cancel(fop->xl->ctx, lock->timer);
- lock->timer = NULL;
- *lock->plock = NULL;
- }
+ if (lock->timer != NULL) {
+ ec_trace("UNLOCK_DELAYED", link->fop, "lock=%p", lock);
- UNLOCK(&lock->loc.inode->lock);
+ gf_timer_call_cancel(link->fop->xl->ctx, lock->timer);
+ lock->timer = NULL;
- if (fop != NULL) {
- ec_unlock_now(fop, lock);
- }
+ lock->release = now = _gf_true;
+
+ before = lock->refs + lock->refs_frozen;
+ list_splice_init(&lock->waiting, &lock->frozen);
+ lock->refs_frozen += lock->refs - lock->inserted - 1;
+ lock->refs = 1 + lock->inserted;
+ /* We moved around the locks, so total number of locks shouldn't
+ * change by this operation*/
+ GF_ASSERT (before == (lock->refs + lock->refs_frozen));
+ }
+
+ UNLOCK(&inode->lock);
+
+ if (now) {
+ ec_unlock_now(link);
+ }
+}
+
+void ec_unlock_timer_cbk(void *data)
+{
+ ec_unlock_timer_del(data);
}
void ec_unlock_timer_add(ec_lock_link_t *link)
@@ -1561,28 +1688,28 @@ void ec_unlock_timer_add(ec_lock_link_t *link)
struct timespec delay;
ec_fop_data_t *fop = link->fop;
ec_lock_t *lock = link->lock;
- int32_t refs = 1;
+ gf_boolean_t now = _gf_false;
LOCK(&lock->loc.inode->lock);
GF_ASSERT(lock->timer == NULL);
- if (lock->refs != 1) {
+ if ((lock->refs - lock->inserted) > 1) {
ec_trace("UNLOCK_SKIP", fop, "lock=%p", lock);
lock->refs--;
UNLOCK(&lock->loc.inode->lock);
} else if (lock->acquired) {
- delay.tv_sec = 1;
- delay.tv_nsec = 0;
+ ec_t *ec = fop->xl->private;
ec_sleep(fop);
- /* If healing is needed, do not delay lock release to let self-heal
- * start working as soon as possible. */
- if (!ec_fop_needs_heal(fop)) {
- ec_trace("UNLOCK_DELAY", fop, "lock=%p", lock);
+ /* If healing is needed, the lock needs to be released due to
+ * contention, or ec is shutting down, do not delay lock release. */
+ if (!lock->release && !ec_fop_needs_heal(fop) && !ec->shutdown) {
+ ec_trace("UNLOCK_DELAY", fop, "lock=%p, release=%d", lock,
+ lock->release);
delay.tv_sec = 1;
delay.tv_nsec = 0;
@@ -1592,26 +1719,25 @@ void ec_unlock_timer_add(ec_lock_link_t *link)
gf_log(fop->xl->name, GF_LOG_WARNING, "Unable to delay an "
"unlock");
- *lock->plock = NULL;
- refs = 0;
+ lock->release = now = _gf_true;
}
} else {
- ec_trace("UNLOCK_FORCE", fop, "lock=%p", lock);
- *lock->plock = NULL;
- refs = 0;
+ ec_trace("UNLOCK_FORCE", fop, "lock=%p, release=%d", lock,
+ lock->release);
+ lock->release = now = _gf_true;
}
UNLOCK(&lock->loc.inode->lock);
- if (refs == 0) {
- ec_unlock_now(fop, lock);
+ if (now) {
+ ec_unlock_now(link);
}
} else {
- *lock->plock = NULL;
+ lock->release = _gf_true;
UNLOCK(&lock->loc.inode->lock);
- ec_lock_destroy(lock);
+ ec_lock_unfreeze(link);
}
}
@@ -1624,52 +1750,60 @@ void ec_unlock(ec_fop_data_t *fop)
}
}
-void ec_flush_size_version(ec_fop_data_t * fop)
+void
+ec_unlock_force(ec_fop_data_t *fop)
{
- ec_lock_t * lock;
- uint64_t version[2], delta;
- gf_boolean_t dirty[2] = {_gf_false, _gf_false};
+ int32_t i;
- GF_ASSERT(fop->lock_count == 1);
+ for (i = 0; i < fop->lock_count; i++) {
+ ec_trace("UNLOCK_FORCED", fop, "lock=%p", &fop->locks[i]);
- lock = fop->locks[0].lock;
-
- LOCK(&lock->loc.inode->lock);
-
- GF_ASSERT(lock->owner == fop);
-
- version[0] = lock->version_delta[0];
- version[1] = lock->version_delta[1];
- dirty[0] = lock->is_dirty[0];
- dirty[1] = lock->is_dirty[1];
- delta = lock->size_delta;
- lock->version_delta[0] = 0;
- lock->version_delta[1] = 0;
- lock->size_delta = 0;
- lock->is_dirty[0] = _gf_false;
- lock->is_dirty[1] = _gf_false;
+ ec_unlock_timer_del(&fop->locks[i]);
+ }
+}
- UNLOCK(&lock->loc.inode->lock);
+void ec_flush_size_version(ec_fop_data_t *fop)
+{
+ GF_ASSERT(fop->lock_count == 1);
- if (version[0] > 0 || version[1] > 0 || dirty[0] || dirty[1])
- {
- ec_update_size_version(fop, &lock->loc, version, delta, dirty,
- NULL);
- }
+ ec_update_info(&fop->locks[0]);
}
void ec_lock_reuse(ec_fop_data_t *fop)
{
- ec_fop_data_t * wait_fop;
- ec_lock_t * lock;
- ec_lock_link_t * link;
- int32_t i;
+ ec_t *ec;
+ ec_cbk_data_t *cbk;
+ ec_lock_t *lock;
+ ec_lock_link_t *link;
+ ec_inode_t *ctx;
+ int32_t i, count;
+ gf_boolean_t release = _gf_false;
+
+ cbk = fop->answer;
+ if (cbk != NULL) {
+ if (cbk->xdata != NULL) {
+ if ((dict_get_int32(cbk->xdata, GLUSTERFS_INODELK_COUNT,
+ &count) == 0) && (count > 1)) {
+ release = _gf_true;
+ }
+ if (release) {
+ gf_log(fop->xl->name, GF_LOG_DEBUG,
+ "Lock contention detected");
+ }
+ }
+ } else {
+ /* If we haven't get an answer with enough quorum, we always release
+ * the lock. */
+ release = _gf_true;
+ }
+
+ ec = fop->xl->private;
for (i = 0; i < fop->lock_count; i++)
{
- wait_fop = NULL;
-
- lock = fop->locks[i].lock;
+ link = &fop->locks[i];
+ lock = link->lock;
+ ctx = lock->ctx;
LOCK(&lock->loc.inode->lock);
@@ -1677,47 +1811,42 @@ void ec_lock_reuse(ec_fop_data_t *fop)
GF_ASSERT(lock->owner == fop);
lock->owner = NULL;
+ lock->release |= release;
- if (((fop->locks_update >> i) & 1) != 0) {
- if (fop->error == 0)
- {
- if (ec_is_metadata_fop (fop->id)) {
- lock->version_delta[1]++;
- } else {
- lock->version_delta[0]++;
- }
- lock->size_delta += fop->post_size - fop->pre_size;
- if (fop->have_size) {
- lock->size = fop->post_size;
- lock->have_size = 1;
+ if ((fop->error == 0) && (cbk != NULL) && (cbk->op_ret >= 0)) {
+ if (link->update[0]) {
+ ctx->post_version[0]++;
+ if (ec->node_mask & ~fop->mask) {
+ ctx->post_dirty[0]++;
+ }
+ }
+ if (link->update[1]) {
+ ctx->post_version[1]++;
+ if (ec->node_mask & ~fop->mask) {
+ ctx->post_dirty[1]++;
}
}
}
lock->good_mask &= fop->mask;
+ link = NULL;
if (!list_empty(&lock->waiting))
{
link = list_entry(lock->waiting.next, ec_lock_link_t, wait_list);
list_del_init(&link->wait_list);
- wait_fop = link->fop;
-
- if (lock->kind == EC_LOCK_INODE)
- {
- wait_fop->pre_size = wait_fop->post_size = fop->post_size;
- wait_fop->have_size = fop->have_size;
- }
- wait_fop->mask &= fop->mask;
+ lock->owner = link->fop;
}
UNLOCK(&lock->loc.inode->lock);
- if (wait_fop != NULL)
+ if (link != NULL)
{
- ec_lock(wait_fop);
-
- ec_resume(wait_fop, 0);
+ if (ec_lock_acquire(link)) {
+ ec_lock(link->fop);
+ }
+ ec_resume(link->fop, 0);
}
}
}