summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src/dht-rename.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/dht/src/dht-rename.c')
-rw-r--r--xlators/cluster/dht/src/dht-rename.c61
1 files changed, 54 insertions, 7 deletions
diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c
index 7e7e7151af7..53c61f8a714 100644
--- a/xlators/cluster/dht/src/dht-rename.c
+++ b/xlators/cluster/dht/src/dht-rename.c
@@ -25,17 +25,22 @@ dht_rename_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
struct iatt *prenewparent, struct iatt *postnewparent,
dict_t *xdata)
{
- dht_local_t *local = NULL;
- int this_call_cnt = 0;
- xlator_t *prev = NULL;
- char gfid[GF_UUID_BUF_SIZE] = {0};
+ dht_conf_t *conf = NULL;
+ dht_local_t *local = NULL;
+ int this_call_cnt = 0;
+ xlator_t *prev = NULL;
+ int i = 0;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+ int subvol_cnt = -1;
+ conf = this->private;
local = frame->local;
prev = cookie;
+ subvol_cnt = dht_subvol_cnt (this, prev);
+ local->ret_cache[subvol_cnt] = op_ret;
if (op_ret == -1) {
- /* TODO: undo the damage */
gf_uuid_unparse(local->loc.inode->gfid, gfid);
gf_msg (this->name, GF_LOG_INFO, op_errno,
@@ -63,6 +68,42 @@ dht_rename_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
unwind:
this_call_cnt = dht_frame_return (frame);
if (is_last_call (this_call_cnt)) {
+ /* We get here with local->call_cnt == 0. Which means
+ * we are the only one executing this code, there is
+ * no contention. Therefore it's safe to manipulate or
+ * deref local->call_cnt directly (without locking).
+ */
+ if (local->ret_cache[conf->subvolume_cnt] == 0) {
+ /* count errant subvols in last field of ret_cache */
+ for (i = 0; i < conf->subvolume_cnt; i++) {
+ if (local->ret_cache[i] != 0)
+ ++local->ret_cache[conf->subvolume_cnt];
+ }
+ if (local->ret_cache[conf->subvolume_cnt]) {
+ /* undoing the damage:
+ * for all subvolumes, where rename
+ * succeeded, we perform the reverse operation
+ */
+ for (i = 0; i < conf->subvolume_cnt; i++) {
+ if (local->ret_cache[i] == 0)
+ ++local->call_cnt;
+ }
+ for (i = 0; i < conf->subvolume_cnt; i++) {
+ if (local->ret_cache[i])
+ continue;
+
+ STACK_WIND (frame,
+ dht_rename_dir_cbk,
+ conf->subvolumes[i],
+ conf->subvolumes[i]->fops->rename,
+ &local->loc2, &local->loc,
+ NULL);
+ }
+
+ return 0;
+ }
+ }
+
WIPE (&local->preoldparent);
WIPE (&local->postoldparent);
WIPE (&local->preparent);
@@ -96,8 +137,6 @@ dht_rename_hashed_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
if (op_ret == -1) {
- /* TODO: undo the damage */
-
gf_uuid_unparse(local->loc.inode->gfid, gfid);
gf_msg (this->name, GF_LOG_INFO, op_errno,
@@ -324,6 +363,14 @@ dht_rename_dir (call_frame_t *frame, xlator_t *this)
conf = frame->this->private;
local = frame->local;
+ local->ret_cache = GF_CALLOC (conf->subvolume_cnt + 1, sizeof (int),
+ gf_dht_ret_cache_t);
+
+ if (local->ret_cache == NULL) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
/* We must take a lock on all the subvols with src gfid.
* Along with this if dst exists we must take lock on
* any one subvol with dst gfid.