summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/afr/src/afr-inode-write.c
diff options
context:
space:
mode:
authorPranith Kumar K <pranithk@gluster.com>2011-08-20 15:48:27 +0530
committerVijay Bellur <vijay@gluster.com>2011-08-20 06:42:55 -0700
commit1af420c700fbc49b65cf7faceb3270e81cd991ce (patch)
treeee0dcfe62b4965191424b3121a4dd126e81260b8 /xlators/cluster/afr/src/afr-inode-write.c
parent2ebacdfdd3c39bf2d3139cb7d811356758a2350a (diff)
cluster/afr: Perform self-heal without locking the whole file
Change-Id: I206571c77f2d7b3c9f9d7bb82a936366fd99ce5c BUG: 3182 Reviewed-on: http://review.gluster.com/141 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vijay@gluster.com>
Diffstat (limited to 'xlators/cluster/afr/src/afr-inode-write.c')
-rw-r--r--xlators/cluster/afr/src/afr-inode-write.c302
1 files changed, 253 insertions, 49 deletions
diff --git a/xlators/cluster/afr/src/afr-inode-write.c b/xlators/cluster/afr/src/afr-inode-write.c
index c292b7493f6..faf3db40041 100644
--- a/xlators/cluster/afr/src/afr-inode-write.c
+++ b/xlators/cluster/afr/src/afr-inode-write.c
@@ -46,6 +46,7 @@
#include "afr.h"
#include "afr-transaction.h"
+#include "afr-self-heal-common.h"
/* {{{ writev */
@@ -125,19 +126,21 @@ afr_writev_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
return 0;
}
-
int
afr_writev_wind (call_frame_t *frame, xlator_t *this)
{
afr_local_t *local = NULL;
afr_private_t *priv = NULL;
+ afr_internal_lock_t *int_lock = NULL;
int i = 0;
int call_count = -1;
local = frame->local;
priv = this->private;
+ int_lock = &local->internal_lock;
- call_count = afr_up_children_count (priv->child_count, local->child_up);
+ call_count = afr_locked_children_count (int_lock->inode_locked_nodes,
+ priv->child_count);
if (call_count == 0) {
local->transaction.resume (frame, this);
@@ -147,7 +150,7 @@ afr_writev_wind (call_frame_t *frame, xlator_t *this)
local->call_count = call_count;
for (i = 0; i < priv->child_count; i++) {
- if (local->child_up[i]) {
+ if (local->child_up[i] && int_lock->inode_locked_nodes[i]) {
STACK_WIND_COOKIE (frame, afr_writev_wind_cbk,
(void *) (long) i,
priv->children[i],
@@ -188,10 +191,10 @@ afr_writev_done (call_frame_t *frame, xlator_t *this)
int
afr_do_writev (call_frame_t *frame, xlator_t *this)
{
- call_frame_t * transaction_frame = NULL;
- afr_local_t * local = NULL;
- int op_ret = -1;
- int op_errno = 0;
+ call_frame_t *transaction_frame = NULL;
+ afr_local_t *local = NULL;
+ int op_ret = -1;
+ int op_errno = 0;
local = frame->local;
@@ -235,6 +238,202 @@ out:
return 0;
}
+static int
+afr_prepare_loc (call_frame_t *frame, fd_t *fd)
+{
+ afr_local_t *local = NULL;
+ char *name = NULL;
+ char *path = NULL;
+ int ret = 0;
+
+ if ((!fd) || (!fd->inode))
+ return -1;
+
+ local = frame->local;
+ ret = inode_path (fd->inode, NULL, (char **)&path);
+ if (ret <= 0) {
+ gf_log (frame->this->name, GF_LOG_DEBUG,
+ "Unable to get path for gfid: %s",
+ uuid_utoa (fd->inode->gfid));
+ return -1;
+ }
+
+ if (local->loc.path) {
+ if (strcmp (path, local->loc.path))
+ gf_log (frame->this->name, GF_LOG_DEBUG,
+ "overwriting old loc->path %s with %s",
+ local->loc.path, path);
+ GF_FREE ((char *)local->loc.path);
+ }
+ local->loc.path = path;
+
+ name = strrchr (local->loc.path, '/');
+ if (name)
+ name++;
+ local->loc.name = name;
+
+ if (local->loc.inode) {
+ inode_unref (local->loc.inode);
+ }
+ local->loc.inode = inode_ref (fd->inode);
+
+ if (local->loc.parent) {
+ inode_unref (local->loc.parent);
+ }
+
+ local->loc.parent = inode_parent (local->loc.inode, 0, NULL);
+
+ return 0;
+}
+
+afr_fd_paused_call_t*
+afr_paused_call_create (call_frame_t *frame)
+{
+ afr_local_t *local = NULL;
+ afr_fd_paused_call_t *paused_call = NULL;
+
+ local = frame->local;
+ GF_ASSERT (local->fop_call_continue);
+
+ paused_call = GF_CALLOC (1, sizeof (*paused_call),
+ gf_afr_fd_paused_call_t);
+ if (paused_call) {
+ INIT_LIST_HEAD (&paused_call->call_list);
+ paused_call->frame = frame;
+ }
+
+ return paused_call;
+}
+
+static int
+afr_pause_fd_fop (call_frame_t *frame, xlator_t *this, afr_fd_ctx_t *fd_ctx)
+{
+ afr_fd_paused_call_t *paused_call = NULL;
+ int ret = 0;
+
+ paused_call = afr_paused_call_create (frame);
+ if (paused_call)
+ list_add (&paused_call->call_list, &fd_ctx->paused_calls);
+ else
+ ret = -ENOMEM;
+
+ return ret;
+}
+
+static void
+afr_trigger_open_fd_self_heal (call_frame_t *frame, xlator_t *this)
+{
+ afr_local_t *local = NULL;
+ afr_self_heal_t *sh = NULL;
+ char sh_type_str[256] = {0};
+
+ local = frame->local;
+ sh = &local->self_heal;
+
+ sh->need_missing_entry_self_heal = _gf_true;
+ sh->need_gfid_self_heal = _gf_true;
+ sh->need_data_self_heal = _gf_true;
+ afr_self_heal_type_str_get(&local->self_heal, sh_type_str,
+ sizeof(sh_type_str));
+ gf_log (this->name, GF_LOG_INFO, "%s self-heal triggered. "
+ "path: %s, reason: Replicate up down flush, data lock "
+ "is held", sh_type_str, local->loc.path);
+
+ afr_launch_self_heal (frame, this, local->fd->inode, _gf_true,
+ local->fd->inode->ia_type, NULL, NULL);
+}
+
+int
+afr_open_fd_fix (call_frame_t *frame, xlator_t *this, gf_boolean_t pause_fop)
+{
+ int ret = 0;
+ int i = 0;
+ afr_fd_ctx_t *fd_ctx = NULL;
+ gf_boolean_t need_self_heal = _gf_false;
+ int *need_open = NULL;
+ int need_open_count = 0;
+ afr_local_t *local = NULL;
+ afr_private_t *priv = NULL;
+ gf_boolean_t fop_continue = _gf_true;
+ gf_boolean_t queue_fop = _gf_false;
+
+ local = frame->local;
+ priv = this->private;
+
+ GF_ASSERT (local->fd);
+ if (pause_fop)
+ GF_ASSERT (local->fop_call_continue);
+
+ ret = afr_prepare_loc (frame, local->fd);
+ if (ret < 0) {
+ //File does not exist we cant open it.
+ ret = 0;
+ goto out;
+ }
+
+ fd_ctx = afr_fd_ctx_get (local->fd, this);
+ if (!fd_ctx) {
+ ret = -EINVAL;
+ goto unlock;
+ }
+
+ LOCK (&local->fd->lock);
+ {
+ if (fd_ctx->up_count < priv->up_count) {
+ need_self_heal = _gf_true;
+ fd_ctx->up_count = priv->up_count;
+ fd_ctx->down_count = priv->down_count;
+ }
+ for (i = 0; i < priv->child_count; i++) {
+ if ((fd_ctx->opened_on[i] == AFR_FD_NOT_OPENED) &&
+ local->child_up[i]) {
+ fd_ctx->opened_on[i] = AFR_FD_OPENING;
+ if (!need_open)
+ need_open = GF_CALLOC (priv->child_count,
+ sizeof (*need_open),
+ gf_afr_mt_int32_t);
+ need_open[i] = 1;
+ need_open_count++;
+ } else if (pause_fop && local->child_up[i] &&
+ (fd_ctx->opened_on[i] == AFR_FD_OPENING)) {
+ queue_fop = _gf_true;
+ }
+ }
+
+ if (queue_fop) {
+ GF_ASSERT (pause_fop);
+ gf_log (this->name, GF_LOG_INFO, "Pause fd %p",
+ local->fd);
+ ret = afr_pause_fd_fop (frame, this, fd_ctx);
+ if (ret)
+ goto unlock;
+ }
+ }
+unlock:
+ UNLOCK (&local->fd->lock);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to fix fd for %s",
+ local->loc.path);
+ fop_continue = _gf_false;
+ goto out;
+ }
+
+ if (need_self_heal)
+ afr_trigger_open_fd_self_heal (frame, this);
+
+ if (!need_open_count)
+ goto out;
+
+ gf_log (this->name, GF_LOG_INFO, "Opening fd %p", local->fd);
+ afr_fix_open (frame, this, fd_ctx, need_open_count, need_open);
+ fop_continue = _gf_false;
+out:
+ if (need_open)
+ GF_FREE (need_open);
+ if (fop_continue && local->fop_call_continue)
+ local->fop_call_continue (frame, this);
+ return ret;
+}
int
afr_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
@@ -246,8 +445,6 @@ afr_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
int ret = -1;
int op_ret = -1;
int op_errno = 0;
- uint64_t ctx = 0;
- afr_fd_ctx_t *fd_ctx = NULL;
VALIDATE_OR_GOTO (frame, out);
VALIDATE_OR_GOTO (this, out);
@@ -272,21 +469,14 @@ afr_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
local->cont.writev.iobref = iobref_ref (iobref);
local->fd = fd_ref (fd);
+ local->fop_call_continue = afr_do_writev;
- ret = fd_ctx_get (fd, this, &ctx);
- if (ret < 0) {
+ ret = afr_open_fd_fix (frame, this, _gf_true);
+ if (ret) {
+ op_errno = -ret;
goto out;
}
- fd_ctx = (afr_fd_ctx_t *)(long) ctx;
-
- if (fd_ctx->up_count < priv->up_count) {
- local->openfd_flush_cbk = afr_do_writev;
- afr_openfd_flush (frame, this, fd);
- } else {
- afr_do_writev (frame, this);
- }
-
op_ret = 0;
out:
if (op_ret == -1) {
@@ -395,13 +585,16 @@ afr_truncate_wind (call_frame_t *frame, xlator_t *this)
{
afr_local_t *local = NULL;
afr_private_t *priv = NULL;
+ afr_internal_lock_t *int_lock = NULL;
int call_count = -1;
int i = 0;
local = frame->local;
priv = this->private;
+ int_lock = &local->internal_lock;
- call_count = afr_up_children_count (priv->child_count, local->child_up);
+ call_count = afr_locked_children_count (int_lock->inode_locked_nodes,
+ priv->child_count);
if (call_count == 0) {
local->transaction.resume (frame, this);
@@ -411,7 +604,7 @@ afr_truncate_wind (call_frame_t *frame, xlator_t *this)
local->call_count = call_count;
for (i = 0; i < priv->child_count; i++) {
- if (local->child_up[i]) {
+ if (local->child_up[i] && int_lock->inode_locked_nodes[i]) {
STACK_WIND_COOKIE (frame, afr_truncate_wind_cbk,
(void *) (long) i,
priv->children[i],
@@ -602,13 +795,16 @@ afr_ftruncate_wind (call_frame_t *frame, xlator_t *this)
{
afr_local_t *local = NULL;
afr_private_t *priv = NULL;
+ afr_internal_lock_t *int_lock = NULL;
int call_count = -1;
int i = 0;
local = frame->local;
priv = this->private;
+ int_lock = &local->internal_lock;
- call_count = afr_up_children_count (priv->child_count, local->child_up);
+ call_count = afr_locked_children_count (int_lock->inode_locked_nodes,
+ priv->child_count);
if (call_count == 0) {
local->transaction.resume (frame, this);
@@ -618,7 +814,7 @@ afr_ftruncate_wind (call_frame_t *frame, xlator_t *this)
local->call_count = call_count;
for (i = 0; i < priv->child_count; i++) {
- if (local->child_up[i]) {
+ if (local->child_up[i] && int_lock->inode_locked_nodes[i]) {
STACK_WIND_COOKIE (frame, afr_ftruncate_wind_cbk,
(void *) (long) i,
priv->children[i],
@@ -702,8 +898,6 @@ afr_ftruncate (call_frame_t *frame, xlator_t *this,
int ret = -1;
int op_ret = -1;
int op_errno = 0;
- uint64_t ctx = 0;
- afr_fd_ctx_t *fd_ctx = NULL;
VALIDATE_OR_GOTO (frame, out);
VALIDATE_OR_GOTO (this, out);
@@ -725,21 +919,14 @@ afr_ftruncate (call_frame_t *frame, xlator_t *this,
local->cont.ftruncate.ino = fd->inode->ino;
local->fd = fd_ref (fd);
+ local->fop_call_continue = afr_do_ftruncate;
- ret = fd_ctx_get (fd, this, &ctx);
- if (ret < 0) {
+ ret = afr_open_fd_fix (frame, this, _gf_true);
+ if (ret) {
+ op_errno = -ret;
goto out;
}
- fd_ctx = (afr_fd_ctx_t *)(long) ctx;
-
- if (fd_ctx->up_count < priv->up_count) {
- local->openfd_flush_cbk = afr_do_ftruncate;
- afr_openfd_flush (frame, this, fd);
- } else {
- afr_do_ftruncate (frame, this);
- }
-
op_ret = 0;
out:
if (op_ret == -1) {
@@ -849,13 +1036,16 @@ afr_setattr_wind (call_frame_t *frame, xlator_t *this)
{
afr_local_t *local = NULL;
afr_private_t *priv = NULL;
+ afr_internal_lock_t *int_lock = NULL;
int call_count = -1;
int i = 0;
local = frame->local;
priv = this->private;
+ int_lock = &local->internal_lock;
- call_count = afr_up_children_count (priv->child_count, local->child_up);
+ call_count = afr_locked_children_count (int_lock->inode_locked_nodes,
+ priv->child_count);
if (call_count == 0) {
local->transaction.resume (frame, this);
@@ -865,7 +1055,7 @@ afr_setattr_wind (call_frame_t *frame, xlator_t *this)
local->call_count = call_count;
for (i = 0; i < priv->child_count; i++) {
- if (local->child_up[i]) {
+ if (local->child_up[i] && int_lock->inode_locked_nodes[i]) {
STACK_WIND_COOKIE (frame, afr_setattr_wind_cbk,
(void *) (long) i,
priv->children[i],
@@ -1056,13 +1246,16 @@ afr_fsetattr_wind (call_frame_t *frame, xlator_t *this)
{
afr_local_t *local = NULL;
afr_private_t *priv = NULL;
+ afr_internal_lock_t *int_lock = NULL;
int call_count = -1;
int i = 0;
local = frame->local;
priv = this->private;
+ int_lock = &local->internal_lock;
- call_count = afr_up_children_count (priv->child_count, local->child_up);
+ call_count = afr_locked_children_count (int_lock->inode_locked_nodes,
+ priv->child_count);
if (call_count == 0) {
local->transaction.resume (frame, this);
@@ -1072,7 +1265,7 @@ afr_fsetattr_wind (call_frame_t *frame, xlator_t *this)
local->call_count = call_count;
for (i = 0; i < priv->child_count; i++) {
- if (local->child_up[i]) {
+ if (local->child_up[i] && int_lock->inode_locked_nodes[i]) {
STACK_WIND_COOKIE (frame, afr_fsetattr_wind_cbk,
(void *) (long) i,
priv->children[i],
@@ -1104,7 +1297,6 @@ afr_fsetattr_done (call_frame_t *frame, xlator_t *this)
return 0;
}
-
int
afr_fsetattr (call_frame_t *frame, xlator_t *this,
fd_t *fd, struct iatt *buf, int32_t valid)
@@ -1124,10 +1316,12 @@ afr_fsetattr (call_frame_t *frame, xlator_t *this,
transaction_frame = copy_frame (frame);
if (!transaction_frame) {
+ op_errno = ENOMEM;
goto out;
}
ALLOC_OR_GOTO (local, afr_local_t, out);
+ transaction_frame->local = local;
ret = AFR_LOCAL_INIT (local, priv);
if (ret < 0) {
@@ -1135,12 +1329,9 @@ afr_fsetattr (call_frame_t *frame, xlator_t *this,
goto out;
}
- transaction_frame->local = local;
-
local->op_ret = -1;
local->cont.fsetattr.ino = fd->inode->ino;
-
local->cont.fsetattr.in_buf = *buf;
local->cont.fsetattr.valid = valid;
@@ -1150,6 +1341,13 @@ afr_fsetattr (call_frame_t *frame, xlator_t *this,
local->fd = fd_ref (fd);
+ op_ret = afr_open_fd_fix (frame, this, _gf_false);
+ if (ret) {
+ op_errno = -op_ret;
+ op_ret = -1;
+ goto out;
+ }
+
local->transaction.main_frame = frame;
local->transaction.start = LLONG_MAX - 1;
local->transaction.len = 0;
@@ -1242,13 +1440,16 @@ afr_setxattr_wind (call_frame_t *frame, xlator_t *this)
{
afr_local_t *local = NULL;
afr_private_t *priv = NULL;
+ afr_internal_lock_t *int_lock = NULL;
int call_count = -1;
int i = 0;
local = frame->local;
priv = this->private;
+ int_lock = &local->internal_lock;
- call_count = afr_up_children_count (priv->child_count, local->child_up);
+ call_count = afr_locked_children_count (int_lock->inode_locked_nodes,
+ priv->child_count);
if (call_count == 0) {
local->transaction.resume (frame, this);
@@ -1258,7 +1459,7 @@ afr_setxattr_wind (call_frame_t *frame, xlator_t *this)
local->call_count = call_count;
for (i = 0; i < priv->child_count; i++) {
- if (local->child_up[i]) {
+ if (local->child_up[i] && int_lock->inode_locked_nodes[i]) {
STACK_WIND_COOKIE (frame, afr_setxattr_wind_cbk,
(void *) (long) i,
priv->children[i],
@@ -1424,13 +1625,16 @@ afr_removexattr_wind (call_frame_t *frame, xlator_t *this)
{
afr_local_t *local = NULL;
afr_private_t *priv = NULL;
+ afr_internal_lock_t *int_lock = NULL;
int call_count = -1;
int i = 0;
local = frame->local;
priv = this->private;
+ int_lock = &local->internal_lock;
- call_count = afr_up_children_count (priv->child_count, local->child_up);
+ call_count = afr_locked_children_count (int_lock->inode_locked_nodes,
+ priv->child_count);
if (call_count == 0) {
local->transaction.resume (frame, this);
@@ -1440,7 +1644,7 @@ afr_removexattr_wind (call_frame_t *frame, xlator_t *this)
local->call_count = call_count;
for (i = 0; i < priv->child_count; i++) {
- if (local->child_up[i]) {
+ if (local->child_up[i] && int_lock->inode_locked_nodes[i]) {
STACK_WIND_COOKIE (frame, afr_removexattr_wind_cbk,
(void *) (long) i,
priv->children[i],