summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src/dht-common.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/dht/src/dht-common.c')
-rw-r--r--xlators/cluster/dht/src/dht-common.c192
1 files changed, 166 insertions, 26 deletions
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c
index ee37d2f36af..4c93084ec82 100644
--- a/xlators/cluster/dht/src/dht-common.c
+++ b/xlators/cluster/dht/src/dht-common.c
@@ -55,6 +55,9 @@ int32_t dht_set_fixed_dir_stat (struct iatt *stat)
int
+dht_rmdir_unlock (call_frame_t *frame, xlator_t *this);
+
+int
dht_aggregate_quota_xattr (dict_t *dst, char *key, data_t *value)
{
int ret = -1;
@@ -4669,6 +4672,10 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
if (IA_ISINVAL(orig_entry->d_stat.ia_type)) {
/*stat failed somewhere- ignore this entry*/
+ gf_msg_debug (this->name, EINVAL,
+ "Invalid stat, ignoring entry "
+ "%s gfid %s", orig_entry->d_name,
+ uuid_utoa (orig_entry->d_stat.ia_gfid));
continue;
}
@@ -4681,7 +4688,6 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
* corresponding hashed subvolume will take care of the
* directory entry.
*/
-
if (readdir_optimize) {
if (prev->this == local->first_up_subvol)
goto list;
@@ -5199,7 +5205,7 @@ out:
if (local && local->lock.locks) {
/* store op_errno for failure case*/
local->op_errno = op_errno;
- local->refresh_layout_unlock (frame, this, op_ret);
+ local->refresh_layout_unlock (frame, this, op_ret, 1);
if (op_ret == 0) {
DHT_STACK_UNWIND (mknod, frame, op_ret, op_errno,
@@ -5260,7 +5266,7 @@ dht_mknod_linkfile_create_cbk (call_frame_t *frame, void *cookie,
return 0;
err:
if (local && local->lock.locks) {
- local->refresh_layout_unlock (frame, this, -1);
+ local->refresh_layout_unlock (frame, this, -1, 1);
} else {
DHT_STACK_UNWIND (mknod, frame, -1,
op_errno, NULL, NULL, NULL,
@@ -5367,7 +5373,7 @@ dht_mknod_do (call_frame_t *frame)
local->umask, local->params);
return 0;
err:
- local->refresh_layout_unlock (frame, this, -1);
+ local->refresh_layout_unlock (frame, this, -1, 1);
return 0;
}
@@ -5382,7 +5388,8 @@ dht_mknod_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int32_t
-dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret)
+dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret,
+ int invoke_cbk)
{
dht_local_t *local = NULL, *lock_local = NULL;
call_frame_t *lock_frame = NULL;
@@ -5457,7 +5464,7 @@ dht_mknod_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
return 0;
err:
- dht_mknod_finish (frame, this, -1);
+ dht_mknod_finish (frame, this, -1, 0);
return 0;
}
@@ -5488,7 +5495,7 @@ dht_mknod_lock (call_frame_t *frame, xlator_t *subvol)
local->lock.lk_count = count;
ret = dht_blocking_inodelk (frame, lk_array, count,
- dht_mknod_lock_cbk);
+ IGNORE_ENOENT_ESTALE, dht_mknod_lock_cbk);
if (ret < 0) {
local->lock.locks = NULL;
@@ -6028,7 +6035,7 @@ out:
if (local && local->lock.locks) {
/* store op_errno for failure case*/
local->op_errno = op_errno;
- local->refresh_layout_unlock (frame, this, op_ret);
+ local->refresh_layout_unlock (frame, this, op_ret, 1);
if (op_ret == 0) {
DHT_STACK_UNWIND (create, frame, op_ret, op_errno, fd,
@@ -6087,7 +6094,7 @@ dht_create_linkfile_create_cbk (call_frame_t *frame, void *cookie,
return 0;
err:
if (local && local->lock.locks) {
- local->refresh_layout_unlock (frame, this, -1);
+ local->refresh_layout_unlock (frame, this, -1, 1);
} else {
DHT_STACK_UNWIND (create, frame, -1,
op_errno, NULL, NULL, NULL,
@@ -6253,7 +6260,7 @@ dht_create_do (call_frame_t *frame)
local->umask, local->fd, local->params);
return 0;
err:
- local->refresh_layout_unlock (frame, this, -1);
+ local->refresh_layout_unlock (frame, this, -1, 1);
return 0;
}
@@ -6267,7 +6274,8 @@ dht_create_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int32_t
-dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret)
+dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret,
+ int invoke_cbk)
{
dht_local_t *local = NULL, *lock_local = NULL;
call_frame_t *lock_frame = NULL;
@@ -6342,7 +6350,7 @@ dht_create_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
return 0;
err:
- dht_create_finish (frame, this, -1);
+ dht_create_finish (frame, this, -1, 0);
return 0;
}
@@ -6373,7 +6381,7 @@ dht_create_lock (call_frame_t *frame, xlator_t *subvol)
local->lock.lk_count = count;
ret = dht_blocking_inodelk (frame, lk_array, count,
- dht_create_lock_cbk);
+ IGNORE_ENOENT_ESTALE, dht_create_lock_cbk);
if (ret < 0) {
local->lock.locks = NULL;
@@ -6797,8 +6805,8 @@ dht_rmdir_selfheal_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int
dht_rmdir_hashed_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int op_ret, int op_errno, struct iatt *preparent,
- struct iatt *postparent, dict_t *xdata)
+ int op_ret, int op_errno, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
{
dht_local_t *local = NULL;
dht_conf_t *conf = NULL;
@@ -6818,7 +6826,8 @@ dht_rmdir_hashed_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
local->op_errno = op_errno;
local->op_ret = -1;
if (conf->subvolume_cnt != 1) {
- if (op_errno != ENOENT && op_errno != EACCES) {
+ if (op_errno != ENOENT && op_errno != EACCES
+ && op_errno != ESTALE) {
local->need_selfheal = 1;
}
}
@@ -6842,6 +6851,7 @@ unlock:
this_call_cnt = dht_frame_return (frame);
if (is_last_call (this_call_cnt)) {
if (local->need_selfheal) {
+ dht_rmdir_unlock (frame, this);
local->layout =
dht_layout_get (this, local->loc.inode);
@@ -6868,6 +6878,7 @@ unlock:
dht_set_fixed_dir_stat (&local->preparent);
dht_set_fixed_dir_stat (&local->postparent);
+ dht_rmdir_unlock (frame, this);
DHT_STACK_UNWIND (rmdir, frame, local->op_ret,
local->op_errno, &local->preparent,
&local->postparent, NULL);
@@ -6936,6 +6947,7 @@ unlock:
if (done) {
if (local->need_selfheal && local->fop_succeeded) {
+ dht_rmdir_unlock (frame, this);
local->layout =
dht_layout_get (this, local->loc.inode);
@@ -6973,6 +6985,7 @@ unlock:
dht_set_fixed_dir_stat (&local->preparent);
dht_set_fixed_dir_stat (&local->postparent);
+ dht_rmdir_unlock (frame, this);
DHT_STACK_UNWIND (rmdir, frame, local->op_ret,
local->op_errno, &local->preparent,
&local->postparent, NULL);
@@ -6984,11 +6997,110 @@ unlock:
int
+dht_rmdir_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ DHT_STACK_DESTROY (frame);
+ return 0;
+}
+
+
+int
+dht_rmdir_unlock (call_frame_t *frame, xlator_t *this)
+{
+ dht_local_t *local = NULL, *lock_local = NULL;
+ call_frame_t *lock_frame = NULL;
+ int lock_count = 0;
+
+ local = frame->local;
+ lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count);
+
+ if (lock_count == 0)
+ goto done;
+
+ lock_frame = copy_frame (frame);
+ if (lock_frame == NULL)
+ goto done;
+
+ lock_local = dht_local_init (lock_frame, &local->loc, NULL,
+ lock_frame->root->op);
+ if (lock_local == NULL)
+ goto done;
+
+ lock_local->lock.locks = local->lock.locks;
+ lock_local->lock.lk_count = local->lock.lk_count;
+
+ local->lock.locks = NULL;
+ local->lock.lk_count = 0;
+ dht_unlock_inodelk (lock_frame, lock_local->lock.locks,
+ lock_local->lock.lk_count,
+ dht_rmdir_unlock_cbk);
+ lock_frame = NULL;
+
+done:
+ if (lock_frame != NULL) {
+ DHT_STACK_DESTROY (lock_frame);
+ }
+
+ return 0;
+}
+
+
+int
+dht_rmdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ dht_local_t *local = NULL;
+ dht_conf_t *conf = NULL;
+ int i = 0;
+
+ VALIDATE_OR_GOTO (this->private, err);
+
+ conf = this->private;
+ local = frame->local;
+
+ if (op_ret < 0) {
+ gf_msg (this->name, GF_LOG_WARNING, op_errno,
+ DHT_MSG_INODE_LK_ERROR,
+ "acquiring inodelk failed rmdir for %s)",
+ local->loc.path);
+
+ local->op_ret = -1;
+ local->op_errno = op_errno;
+ goto err;
+ }
+
+ for (i = 0; i < conf->subvolume_cnt; i++) {
+ if (local->hashed_subvol &&
+ (local->hashed_subvol == conf->subvolumes[i]))
+ continue;
+
+ STACK_WIND (frame, dht_rmdir_cbk,
+ conf->subvolumes[i],
+ conf->subvolumes[i]->fops->rmdir,
+ &local->loc, local->flags, NULL);
+ }
+
+ return 0;
+
+err:
+ /* No harm in calling an extra rmdir unlock */
+ dht_rmdir_unlock (frame, this);
+ DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,
+ &local->preparent, &local->postparent, NULL);
+
+ return 0;
+}
+
+
+int
dht_rmdir_do (call_frame_t *frame, xlator_t *this)
{
dht_local_t *local = NULL;
dht_conf_t *conf = NULL;
- int i = 0;
+ dht_lock_t **lk_array = NULL;
+ int i = 0, ret = -1;
+ int count = 1;
xlator_t *hashed_subvol = NULL;
char gfid[GF_UUID_BUF_SIZE] ={0};
@@ -7002,7 +7114,6 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)
local->call_cnt = conf->subvolume_cnt;
-
/* first remove from non-hashed_subvol */
hashed_subvol = dht_subvol_get_hashed (this, &local->loc);
@@ -7026,15 +7137,39 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)
return 0;
}
- for (i = 0; i < conf->subvolume_cnt; i++) {
- if (hashed_subvol &&
- (hashed_subvol == conf->subvolumes[i]))
- continue;
+ count = conf->subvolume_cnt;
- STACK_WIND (frame, dht_rmdir_cbk,
- conf->subvolumes[i],
- conf->subvolumes[i]->fops->rmdir,
- &local->loc, local->flags, NULL);
+ lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char);
+ if (lk_array == NULL) {
+ local->op_ret = -1;
+ local->op_errno = ENOMEM;
+ goto err;
+ }
+
+ for (i = 0; i < count; i++) {
+ lk_array[i] = dht_lock_new (frame->this,
+ conf->subvolumes[i],
+ &local->loc, F_WRLCK,
+ DHT_LAYOUT_HEAL_DOMAIN);
+ if (lk_array[i] == NULL) {
+ local->op_ret = -1;
+ local->op_errno = EINVAL;
+ goto err;
+ }
+ }
+
+ local->lock.locks = lk_array;
+ local->lock.lk_count = count;
+
+ ret = dht_blocking_inodelk (frame, lk_array, count,
+ IGNORE_ENOENT_ESTALE,
+ dht_rmdir_lock_cbk);
+ if (ret < 0) {
+ local->lock.locks = NULL;
+ local->lock.lk_count = 0;
+ local->op_ret = -1;
+ local->op_errno = errno ? errno : EINVAL;
+ goto err;
}
return 0;
@@ -7043,6 +7178,11 @@ err:
dht_set_fixed_dir_stat (&local->preparent);
dht_set_fixed_dir_stat (&local->postparent);
+ if (lk_array != NULL) {
+ dht_lock_array_free (lk_array, count);
+ GF_FREE (lk_array);
+ }
+
DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,
&local->preparent, &local->postparent, NULL);
return 0;