summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--xlators/cluster/dht/src/dht-common.h2
-rw-r--r--xlators/cluster/dht/src/dht-inode-read.c103
-rw-r--r--xlators/features/locks/src/posix.c27
3 files changed, 125 insertions, 7 deletions
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
index 0ef5c81a608..b2e9df68996 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -128,6 +128,8 @@ struct dht_rebalance_ {
dict_t *xdata;
dict_t *xattr;
int32_t set;
+ struct gf_flock flock;
+ int lock_cmd;
};
/**
diff --git a/xlators/cluster/dht/src/dht-inode-read.c b/xlators/cluster/dht/src/dht-inode-read.c
index 72c81c400af..16a82cd5b98 100644
--- a/xlators/cluster/dht/src/dht-inode-read.c
+++ b/xlators/cluster/dht/src/dht-inode-read.c
@@ -692,8 +692,9 @@ int
dht_flush_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int op_ret, int op_errno, dict_t *xdata)
{
- dht_local_t *local = NULL;
- xlator_t *subvol = 0;
+ dht_local_t *local = NULL;
+ xlator_t *subvol = 0;
+ int ret = 0;
local = frame->local;
@@ -702,6 +703,14 @@ dht_flush_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
if (local->call_cnt != 1)
goto out;
+ local->rebalance.target_op_fn = dht_flush2;
+
+ local->op_ret = op_ret;
+ local->op_errno = op_errno;
+
+ if (xdata)
+ local->rebalance.xdata = dict_ref (xdata);
+
/* If context is set, then send flush() it to the destination */
dht_inode_ctx_get_mig_info (this, local->fd->inode, NULL, &subvol);
if (subvol && dht_fd_open_on_dst (this, local->fd, subvol)) {
@@ -709,6 +718,13 @@ dht_flush_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
return 0;
}
+ if (op_errno == EREMOTE) {
+ ret = dht_rebalance_complete_check (this, frame);
+ if (!ret) {
+ return 0;
+ }
+ }
+
out:
DHT_STACK_UNWIND (flush, frame, op_ret, op_errno, xdata);
@@ -734,7 +750,8 @@ dht_flush2 (xlator_t *this, xlator_t *subvol, call_frame_t *frame, int ret)
local->call_cnt = 2; /* This is the second attempt */
STACK_WIND (frame, dht_flush_cbk,
- subvol, subvol->fops->flush, local->fd, NULL);
+ subvol, subvol->fops->flush, local->fd,
+ local->rebalance.xdata);
return 0;
@@ -942,24 +959,92 @@ int
dht_lk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int op_ret, int op_errno, struct gf_flock *flock, dict_t *xdata)
{
+ dht_local_t *local = NULL;
+ int ret = -1;
+ xlator_t *subvol = NULL;
+
+ local = frame->local;
+
+ if (!local) {
+ op_ret = -1;
+ op_errno = EINVAL;
+ goto out;
+ }
+
+ if (local->call_cnt != 1)
+ goto out;
+
+ local->rebalance.target_op_fn = dht_lk2;
+
+ local->op_ret = op_ret;
+ local->op_errno = op_errno;
+
+ if (xdata)
+ local->rebalance.xdata = dict_ref (xdata);
+
+ if (op_errno == EREMOTE) {
+ dht_inode_ctx_get_mig_info (this, local->fd->inode,
+ NULL, &subvol);
+ if (subvol && dht_fd_open_on_dst (this, local->fd, subvol)) {
+ dht_lk2 (this, subvol, frame, 0);
+ return 0;
+ } else {
+ ret = dht_rebalance_complete_check (this, frame);
+ if (!ret) {
+ return 0;
+ }
+ }
+ }
+
+out:
DHT_STACK_UNWIND (lk, frame, op_ret, op_errno, flock, xdata);
return 0;
}
+int
+dht_lk2 (xlator_t *this, xlator_t *subvol, call_frame_t *frame, int ret)
+{
+ dht_local_t *local = NULL;
+ int32_t op_errno = EINVAL;
+
+ if ((frame == NULL) || (frame->local == NULL))
+ goto out;
+
+ local = frame->local;
+
+ op_errno = local->op_errno;
+
+ if (subvol == NULL)
+ goto out;
+
+ local->call_cnt = 2; /* This is the second attempt */
+
+ STACK_WIND (frame, dht_lk_cbk, subvol, subvol->fops->lk, local->fd,
+ local->rebalance.lock_cmd, &local->rebalance.flock,
+ local->rebalance.xdata);
+
+ return 0;
+
+out:
+ DHT_STACK_UNWIND (lk, frame, -1, op_errno, NULL, NULL);
+ return 0;
+}
int
dht_lk (call_frame_t *frame, xlator_t *this,
fd_t *fd, int cmd, struct gf_flock *flock, dict_t *xdata)
{
- xlator_t *subvol = NULL;
- int op_errno = -1;
-
+ xlator_t *subvol = NULL;
+ int op_errno = -1;
+ dht_local_t *local = NULL;
VALIDATE_OR_GOTO (frame, err);
VALIDATE_OR_GOTO (this, err);
VALIDATE_OR_GOTO (fd, err);
+ local = dht_local_init (frame, NULL, fd, GF_FOP_LK);
+
subvol = dht_subvol_get_cached (this, fd->inode);
if (!subvol) {
gf_msg_debug (this->name, 0,
@@ -968,7 +1053,11 @@ dht_lk (call_frame_t *frame, xlator_t *this,
goto err;
}
- /* TODO: for rebalance, we need to preserve the fop arguments */
+ local->rebalance.flock = *flock;
+ local->rebalance.lock_cmd = cmd;
+
+ local->call_cnt = 1;
+
STACK_WIND (frame, dht_lk_cbk, subvol, subvol->fops->lk, fd,
cmd, flock, xdata);
diff --git a/xlators/features/locks/src/posix.c b/xlators/features/locks/src/posix.c
index a48d1c49240..1d40c154162 100644
--- a/xlators/features/locks/src/posix.c
+++ b/xlators/features/locks/src/posix.c
@@ -1268,6 +1268,17 @@ pl_flush (call_frame_t *frame, xlator_t *this,
return 0;
}
+ pthread_mutex_lock (&pl_inode->mutex);
+ {
+ if (pl_inode->migrated) {
+ pthread_mutex_unlock (&pl_inode->mutex);
+ STACK_UNWIND_STRICT (flush, frame, -1, EREMOTE,
+ NULL);
+ return 0;
+ }
+ }
+ pthread_mutex_unlock (&pl_inode->mutex);
+
pl_trace_flush (this, frame, fd);
if (frame->root->lk_owner.len == 0) {
@@ -1968,6 +1979,22 @@ pl_lk (call_frame_t *frame, xlator_t *this,
#endif
case F_SETLK:
memcpy (&reqlock->user_flock, flock, sizeof (struct gf_flock));
+
+ pthread_mutex_lock (&pl_inode->mutex);
+ {
+ if (pl_inode->migrated) {
+ op_errno = EREMOTE;
+ pthread_mutex_unlock (&pl_inode->mutex);
+ STACK_UNWIND_STRICT (lk, frame, op_ret, op_errno,
+ flock, xdata);
+
+ __destroy_lock (reqlock);
+ goto out;
+ }
+ }
+ pthread_mutex_unlock (&pl_inode->mutex);
+
+
ret = pl_verify_reservelk (this, pl_inode, reqlock, can_block);
if (ret < 0) {
gf_log (this->name, GF_LOG_TRACE,