/* Copyright (c) 2008-2012 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ /* TODO: link(oldpath, newpath) fails if newpath already exists. DHT should * delete the newpath if it gets EEXISTS from link() call. */ #include "glusterfs.h" #include "xlator.h" #include "dht-common.h" #include "dht-lock.h" #include "defaults.h" int dht_rename_unlock (call_frame_t *frame, xlator_t *this); int32_t dht_rename_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata); int dht_rename_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { dht_local_t *local = NULL; local = frame->local; dht_set_fixed_dir_stat (&local->preoldparent); dht_set_fixed_dir_stat (&local->postoldparent); dht_set_fixed_dir_stat (&local->preparent); dht_set_fixed_dir_stat (&local->postparent); if (IA_ISREG (local->stbuf.ia_type)) DHT_STRIP_PHASE1_FLAGS (&local->stbuf); DHT_STACK_UNWIND (rename, frame, local->op_ret, local->op_errno, &local->stbuf, &local->preoldparent, &local->postoldparent, &local->preparent, &local->postparent, local->xattr); return 0; } static void dht_rename_dir_unlock_src (call_frame_t *frame, xlator_t *this) { dht_local_t *local = NULL; local = frame->local; dht_unlock_namespace (frame, &local->lock[0]); return; } static void dht_rename_dir_unlock_dst (call_frame_t *frame, xlator_t *this) { dht_local_t *local = NULL; int op_ret = -1; char src_gfid[GF_UUID_BUF_SIZE] = {0}; char dst_gfid[GF_UUID_BUF_SIZE] = {0}; local = frame->local; /* Unlock entrylk */ dht_unlock_entrylk_wrapper (frame, &local->lock[1].ns.directory_ns); /* Unlock inodelk */ op_ret = dht_unlock_inodelk (frame, local->lock[1].ns.parent_layout.locks, local->lock[1].ns.parent_layout.lk_count, dht_rename_unlock_cbk); if (op_ret < 0) { uuid_utoa_r (local->loc.inode->gfid, src_gfid); if (local->loc2.inode) uuid_utoa_r (local->loc2.inode->gfid, dst_gfid); if (IA_ISREG (local->stbuf.ia_type)) gf_msg (this->name, GF_LOG_WARNING, 0, DHT_MSG_UNLOCKING_FAILED, "winding unlock inodelk failed " "rename (%s:%s:%s %s:%s:%s), " "stale locks left on bricks", local->loc.path, src_gfid, local->src_cached->name, local->loc2.path, dst_gfid, local->dst_cached ? local->dst_cached->name : NULL); else gf_msg (this->name, GF_LOG_WARNING, 0, DHT_MSG_UNLOCKING_FAILED, "winding unlock inodelk failed " "rename (%s:%s %s:%s), " "stale locks left on bricks", local->loc.path, src_gfid, local->loc2.path, dst_gfid); dht_rename_unlock_cbk (frame, NULL, this, 0, 0, NULL); } return; } static int dht_rename_dir_unlock (call_frame_t *frame, xlator_t *this) { dht_rename_dir_unlock_src (frame, this); dht_rename_dir_unlock_dst (frame, this); return 0; } int dht_rename_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *stbuf, struct iatt *preoldparent, struct iatt *postoldparent, struct iatt *prenewparent, struct iatt *postnewparent, dict_t *xdata) { dht_conf_t *conf = NULL; dht_local_t *local = NULL; int this_call_cnt = 0; xlator_t *prev = NULL; int i = 0; char gfid[GF_UUID_BUF_SIZE] = {0}; int subvol_cnt = -1; conf = this->private; local = frame->local; prev = cookie; subvol_cnt = dht_subvol_cnt (this, prev); local->ret_cache[subvol_cnt] = op_ret; if (op_ret == -1) { gf_uuid_unparse(local->loc.inode->gfid, gfid); gf_msg (this->name, GF_LOG_INFO, op_errno, DHT_MSG_RENAME_FAILED, "Rename %s -> %s on %s failed, (gfid = %s)", local->loc.path, local->loc2.path, prev->name, gfid); local->op_ret = op_ret; local->op_errno = op_errno; goto unwind; } /* TODO: construct proper stbuf for dir */ /* * FIXME: is this the correct way to build stbuf and * parent bufs? */ dht_iatt_merge (this, &local->stbuf, stbuf); dht_iatt_merge (this, &local->preoldparent, preoldparent); dht_iatt_merge (this, &local->postoldparent, postoldparent); dht_iatt_merge (this, &local->preparent, prenewparent); dht_iatt_merge (this, &local->postparent, postnewparent); unwind: this_call_cnt = dht_frame_return (frame); if (is_last_call (this_call_cnt)) { /* We get here with local->call_cnt == 0. Which means * we are the only one executing this code, there is * no contention. Therefore it's safe to manipulate or * deref local->call_cnt directly (without locking). */ if (local->ret_cache[conf->subvolume_cnt] == 0) { /* count errant subvols in last field of ret_cache */ for (i = 0; i < conf->subvolume_cnt; i++) { if (local->ret_cache[i] != 0) ++local->ret_cache[conf->subvolume_cnt]; } if (local->ret_cache[conf->subvolume_cnt]) { /* undoing the damage: * for all subvolumes, where rename * succeeded, we perform the reverse operation */ for (i = 0; i < conf->subvolume_cnt; i++) { if (local->ret_cache[i] == 0) ++local->call_cnt; } for (i = 0; i < conf->subvolume_cnt; i++) { if (local->ret_cache[i]) continue; STACK_WIND (frame, dht_rename_dir_cbk, conf->subvolumes[i], conf->subvolumes[i]->fops->rename, &local->loc2, &local->loc, NULL); } return 0; } } WIPE (&local->preoldparent); WIPE (&local->postoldparent); WIPE (&local->preparent); WIPE (&local->postparent); dht_rename_dir_unlock (frame, this); } return 0; } int dht_rename_hashed_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *stbuf, struct iatt *preoldparent, struct iatt *postoldparent, struct iatt *prenewparent, struct iatt *postnewparent, dict_t *xdata) { dht_conf_t *conf = NULL; dht_local_t *local = NULL; int call_cnt = 0; xlator_t *prev = NULL; int i = 0; char gfid[GF_UUID_BUF_SIZE] = {0}; conf = this->private; local = frame->local; prev = cookie; if (op_ret == -1) { gf_uuid_unparse(local->loc.inode->gfid, gfid); gf_msg (this->name, GF_LOG_INFO, op_errno, DHT_MSG_RENAME_FAILED, "rename %s -> %s on %s failed, (gfid = %s) ", local->loc.path, local->loc2.path, prev->name, gfid); local->op_ret = op_ret; local->op_errno = op_errno; goto unwind; } /* TODO: construct proper stbuf for dir */ /* * FIXME: is this the correct way to build stbuf and * parent bufs? */ dht_iatt_merge (this, &local->stbuf, stbuf); dht_iatt_merge (this, &local->preoldparent, preoldparent); dht_iatt_merge (this, &local->postoldparent, postoldparent); dht_iatt_merge (this, &local->preparent, prenewparent); dht_iatt_merge (this, &local->postparent, postnewparent); call_cnt = local->call_cnt = conf->subvolume_cnt - 1; if (!local->call_cnt) goto unwind; for (i = 0; i < conf->subvolume_cnt; i++) { if (conf->subvolumes[i] == local->dst_hashed) continue; STACK_WIND_COOKIE (frame, dht_rename_dir_cbk, conf->subvolumes[i], conf->subvolumes[i], conf->subvolumes[i]->fops->rename, &local->loc, &local->loc2, NULL); if (!--call_cnt) break; } return 0; unwind: WIPE (&local->preoldparent); WIPE (&local->postoldparent); WIPE (&local->preparent); WIPE (&local->postparent); dht_rename_dir_unlock (frame, this); return 0; } int dht_rename_dir_do (call_frame_t *frame, xlator_t *this) { dht_local_t *local = NULL; local = frame->local; if (local->op_ret == -1) goto err; local->op_ret = 0; STACK_WIND_COOKIE (frame, dht_rename_hashed_dir_cbk, local->dst_hashed, local->dst_hashed, local->dst_hashed->fops->rename, &local->loc, &local->loc2, NULL); return 0; err: dht_rename_dir_unlock (frame, this); return 0; } int dht_rename_readdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret, int op_errno, gf_dirent_t *entries, dict_t *xdata) { dht_local_t *local = NULL; int this_call_cnt = -1; xlator_t *prev = NULL; local = frame->local; prev = cookie; if (op_ret > 2) { gf_msg_trace (this->name, 0, "readdir on %s for %s returned %d entries", prev->name, local->loc.path, op_ret); local->op_ret = -1; local->op_errno = ENOTEMPTY; } this_call_cnt = dht_frame_return (frame); if (is_last_call (this_call_cnt)) { dht_rename_dir_do (frame, this); } return 0; } int dht_rename_opendir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret, int op_errno, fd_t *fd, dict_t *xdata) { dht_local_t *local = NULL; int this_call_cnt = -1; xlator_t *prev = NULL; char gfid[GF_UUID_BUF_SIZE] = {0}; local = frame->local; prev = cookie; if (op_ret == -1) { gf_uuid_unparse(local->loc.inode->gfid, gfid); gf_msg (this->name, GF_LOG_INFO, op_errno, DHT_MSG_OPENDIR_FAILED, "opendir on %s for %s failed,(gfid = %s) ", prev->name, local->loc.path, gfid); goto err; } fd_bind (fd); STACK_WIND_COOKIE (frame, dht_rename_readdir_cbk, prev, prev, prev->fops->readdir, local->fd, 4096, 0, NULL); return 0; err: this_call_cnt = dht_frame_return (frame); if (is_last_call (this_call_cnt)) { dht_rename_dir_do (frame, this); } return 0; } int dht_rename_dir_lock2_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { dht_local_t *local = NULL; char src_gfid[GF_UUID_BUF_SIZE] = {0}; char dst_gfid[GF_UUID_BUF_SIZE] = {0}; dht_conf_t *conf = NULL; int i = 0; local = frame->local; conf = this->private; if (op_ret < 0) { uuid_utoa_r (local->loc.inode->gfid, src_gfid); if (local->loc2.inode) uuid_utoa_r (local->loc2.inode->gfid, dst_gfid); gf_msg (this->name, GF_LOG_WARNING, op_errno, DHT_MSG_INODE_LK_ERROR, "acquiring entrylk after inodelk failed" "rename (%s:%s:%s %s:%s:%s)", local->loc.path, src_gfid, local->src_cached->name, local->loc2.path, dst_gfid, local->dst_cached ? local->dst_cached->name : NULL); local->op_ret = -1; local->op_errno = op_errno; goto err; } local->fd = fd_create (local->loc.inode, frame->root->pid); if (!local->fd) { op_errno = ENOMEM; goto err; } local->op_ret = 0; if (!local->dst_cached) { dht_rename_dir_do (frame, this); return 0; } for (i = 0; i < conf->subvolume_cnt; i++) { STACK_WIND_COOKIE (frame, dht_rename_opendir_cbk, conf->subvolumes[i], conf->subvolumes[i], conf->subvolumes[i]->fops->opendir, &local->loc2, local->fd, NULL); } return 0; err: /* No harm in calling an extra unlock */ dht_rename_dir_unlock (frame, this); return 0; } int dht_rename_dir_lock1_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { dht_local_t *local = NULL; char src_gfid[GF_UUID_BUF_SIZE] = {0}; char dst_gfid[GF_UUID_BUF_SIZE] = {0}; int ret = 0; loc_t *loc = NULL; xlator_t *subvol = NULL; local = frame->local; if (op_ret < 0) { uuid_utoa_r (local->loc.inode->gfid, src_gfid); if (local->loc2.inode) uuid_utoa_r (local->loc2.inode->gfid, dst_gfid); gf_msg (this->name, GF_LOG_WARNING, op_errno, DHT_MSG_INODE_LK_ERROR, "acquiring entrylk after inodelk failed" "rename (%s:%s:%s %s:%s:%s)", local->loc.path, src_gfid, local->src_cached->name, local->loc2.path, dst_gfid, local->dst_cached ? local->dst_cached->name : NULL); local->op_ret = -1; local->op_errno = op_errno; goto err; } if (local->current == &local->lock[0]) { loc = &local->loc2; subvol = local->dst_hashed; local->current = &local->lock[1]; } else { loc = &local->loc; subvol = local->src_hashed; local->current = &local->lock[0]; } ret = dht_protect_namespace (frame, loc, subvol, &local->current->ns, dht_rename_dir_lock2_cbk); if (ret < 0) { op_errno = EINVAL; goto err; } return 0; err: /* No harm in calling an extra unlock */ dht_rename_dir_unlock (frame, this); return 0; } /* * If the hashed subvolumes of both source and dst are the different, * lock in dictionary order of hashed subvol->name. This is important * in case the parent directory is the same for both src and dst to * prevent inodelk deadlocks when racing with a fix-layout op on the parent. * * If the hashed subvols are the same, use the gfid/name to determine * the order of taking locks to prevent entrylk deadlocks when the parent * dirs are the same. * */ static void dht_order_rename_lock (call_frame_t *frame, loc_t **loc, xlator_t **subvol) { int ret = 0; dht_local_t *local = NULL; char src[GF_UUID_BNAME_BUF_SIZE] = {0}; char dst[GF_UUID_BNAME_BUF_SIZE] = {0}; local = frame->local; if (local->src_hashed->name == local->dst_hashed->name) { ret = 0; } else { ret = strcmp (local->src_hashed->name, local->dst_hashed->name); } if (ret == 0) { /* hashed subvols are the same for src and dst */ /* Entrylks need to be ordered*/ if (local->loc.pargfid) uuid_utoa_r (local->loc.pargfid, src); else if (local->loc.parent) uuid_utoa_r (local->loc.parent->gfid, src); strcat (src, local->loc.name); if (local->loc2.pargfid) uuid_utoa_r (local->loc2.pargfid, dst); else if (local->loc2.parent) uuid_utoa_r (local->loc2.parent->gfid, dst); strcat (dst, local->loc2.name); ret = strcmp (src, dst); } if (ret <= 0) { /*inodelk in dictionary order of hashed subvol names*/ /*entrylk in dictionary order of gfid/basename */ local->current = &local->lock[0]; *loc = &local->loc; *subvol = local->src_hashed; } else { local->current = &local->lock[1]; *loc = &local->loc2; *subvol = local->dst_hashed; } return; } int dht_rename_dir (call_frame_t *frame, xlator_t *this) { dht_conf_t *conf = NULL; dht_local_t *local = NULL; loc_t *loc = NULL; xlator_t *subvol = NULL; int i = 0; int ret = 0; int op_errno = -1; conf = frame->this->private; local = frame->local; local->ret_cache = GF_CALLOC (conf->subvolume_cnt + 1, sizeof (int), gf_dht_ret_cache_t); if (local->ret_cache == NULL) { op_errno = ENOMEM; goto err; } local->call_cnt = conf->subvolume_cnt; for (i = 0; i < conf->subvolume_cnt; i++) { if (!conf->subvolume_status[i]) { gf_msg (this->name, GF_LOG_INFO, 0, DHT_MSG_RENAME_FAILED, "Rename dir failed: subvolume down (%s)", conf->subvolumes[i]->name); op_errno = ENOTCONN; goto err; } } /* Locks on src and dst needs to ordered which otherwise might cause * deadlocks when rename (src, dst) and rename (dst, src) is done from * two different clients */ dht_order_rename_lock (frame, &loc, &subvol); /* Rename must take locks on src to avoid lookup selfheal from * recreating src on those subvols where the rename was successful. * The locks can't be issued parallel as two different clients might * attempt same rename command and be in dead lock. */ ret = dht_protect_namespace (frame, loc, subvol, &local->current->ns, dht_rename_dir_lock1_cbk); if (ret < 0) { op_errno = EINVAL; goto err; } return 0; err: op_errno = (op_errno == -1) ? errno : op_errno; DHT_STACK_UNWIND (rename, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL, NULL); return 0; } static int dht_rename_track_for_changelog (xlator_t *this, dict_t *xattr, loc_t *oldloc, loc_t *newloc) { int ret = -1; dht_changelog_rename_info_t *info = NULL; char *name = NULL; int len1 = 0; int len2 = 0; int size = 0; if (!xattr || !oldloc || !newloc || !this) return ret; len1 = strlen (oldloc->name) + 1; len2 = strlen (newloc->name) + 1; size = sizeof (dht_changelog_rename_info_t) + len1 + len2; info = GF_CALLOC (size, sizeof(char), gf_common_mt_char); if (!info) { gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED, "Failed to calloc memory"); return ret; } gf_uuid_copy (info->old_pargfid, oldloc->pargfid); gf_uuid_copy (info->new_pargfid, newloc->pargfid); info->oldname_len = len1; info->newname_len = len2; strncpy (info->buffer, oldloc->name, len1); name = info->buffer + len1; strncpy (name, newloc->name, len2); ret = dict_set_bin (xattr, DHT_CHANGELOG_RENAME_OP_KEY, info, size); if (ret) { gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED, "Failed to set dictionary value: key = %s," " path = %s", DHT_CHANGELOG_RENAME_OP_KEY, oldloc->name); GF_FREE (info); } return ret; } #define DHT_MARKER_DONT_ACCOUNT(xattr) do { \ int tmp = -1; \ if (!xattr) { \ xattr = dict_new (); \ if (!xattr) \ break; \ } \ tmp = dict_set_str (xattr, GLUSTERFS_MARKER_DONT_ACCOUNT_KEY, \ "yes"); \ if (tmp) { \ gf_msg (this->name, GF_LOG_ERROR, 0, \ DHT_MSG_DICT_SET_FAILED, \ "Failed to set dictionary value: key = %s," \ " path = %s",GLUSTERFS_MARKER_DONT_ACCOUNT_KEY, \ local->loc.path); \ } \ }while (0) #define DHT_CHANGELOG_TRACK_AS_RENAME(xattr, oldloc, newloc) do { \ int tmp = -1; \ if (!xattr) { \ xattr = dict_new (); \ if (!xattr) { \ gf_msg (this->name, GF_LOG_ERROR, 0, \ DHT_MSG_DICT_SET_FAILED, \ "Failed to create dictionary to " \ "track rename"); \ break; \ } \ } \ \ tmp = dht_rename_track_for_changelog (this, xattr, \ oldloc, newloc); \ \ if (tmp) { \ gf_msg (this->name, GF_LOG_ERROR, 0, \ DHT_MSG_DICT_SET_FAILED, \ "Failed to set dictionary value: key = %s," \ " path = %s", DHT_CHANGELOG_RENAME_OP_KEY, \ (oldloc)->path); \ } \ } while (0) int dht_rename_unlock (call_frame_t *frame, xlator_t *this) { dht_local_t *local = NULL; int op_ret = -1; char src_gfid[GF_UUID_BUF_SIZE] = {0}; char dst_gfid[GF_UUID_BUF_SIZE] = {0}; dht_ilock_wrap_t inodelk_wrapper = {0, }; local = frame->local; inodelk_wrapper.locks = local->rename_inodelk_backward_compatible; inodelk_wrapper.lk_count = local->rename_inodelk_bc_count; op_ret = dht_unlock_inodelk_wrapper (frame, &inodelk_wrapper); if (op_ret < 0) { uuid_utoa_r (local->loc.inode->gfid, src_gfid); if (local->loc2.inode) uuid_utoa_r (local->loc2.inode->gfid, dst_gfid); if (IA_ISREG (local->stbuf.ia_type)) gf_msg (this->name, GF_LOG_WARNING, 0, DHT_MSG_UNLOCKING_FAILED, "winding unlock inodelk failed " "rename (%s:%s:%s %s:%s:%s), " "stale locks left on bricks", local->loc.path, src_gfid, local->src_cached->name, local->loc2.path, dst_gfid, local->dst_cached ? local->dst_cached->name : NULL); else gf_msg (this->name, GF_LOG_WARNING, 0, DHT_MSG_UNLOCKING_FAILED, "winding unlock inodelk failed " "rename (%s:%s %s:%s), " "stale locks left on bricks", local->loc.path, src_gfid, local->loc2.path, dst_gfid); } dht_unlock_namespace (frame, &local->lock[0]); dht_unlock_namespace (frame, &local->lock[1]); dht_rename_unlock_cbk (frame, NULL, this, local->op_ret, local->op_errno, NULL); return 0; } int dht_rename_done (call_frame_t *frame, xlator_t *this) { dht_local_t *local = NULL; local = frame->local; if (local->linked == _gf_true) { local->linked = _gf_false; dht_linkfile_attr_heal (frame, this); } dht_rename_unlock (frame, this); return 0; } int dht_rename_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { dht_local_t *local = NULL; xlator_t *prev = NULL; int this_call_cnt = 0; local = frame->local; prev = cookie; FRAME_SU_UNDO (frame, dht_local_t); if (!local) { gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_INVALID_VALUE, "!local, should not happen"); goto out; } this_call_cnt = dht_frame_return (frame); if (op_ret == -1) { gf_msg (this->name, GF_LOG_WARNING, op_errno, DHT_MSG_UNLINK_FAILED, "%s: Rename: unlink on %s failed ", local->loc.path, prev->name); } WIPE (&local->preoldparent); WIPE (&local->postoldparent); WIPE (&local->preparent); WIPE (&local->postparent); if (is_last_call (this_call_cnt)) { dht_rename_done (frame, this); } out: return 0; } int dht_rename_cleanup (call_frame_t *frame) { dht_local_t *local = NULL; xlator_t *this = NULL; xlator_t *src_hashed = NULL; xlator_t *src_cached = NULL; xlator_t *dst_hashed = NULL; xlator_t *dst_cached = NULL; int call_cnt = 0; dict_t *xattr = NULL; char gfid[GF_UUID_BUF_SIZE] = {0}; local = frame->local; this = frame->this; src_hashed = local->src_hashed; src_cached = local->src_cached; dst_hashed = local->dst_hashed; dst_cached = local->dst_cached; if (src_cached == dst_cached) goto nolinks; if (local->linked && (dst_hashed != src_hashed )&& (dst_hashed != src_cached)) { call_cnt++; } if (local->added_link && (src_cached != dst_hashed)) { call_cnt++; } local->call_cnt = call_cnt; if (!call_cnt) goto nolinks; DHT_MARK_FOP_INTERNAL (xattr); gf_uuid_unparse(local->loc.inode->gfid, gfid); if (local->linked && (dst_hashed != src_hashed) && (dst_hashed != src_cached)) { dict_t *xattr_new = NULL; gf_msg_trace (this->name, 0, "unlinking linkfile %s @ %s => %s, (gfid = %s)", local->loc.path, dst_hashed->name, src_cached->name, gfid); xattr_new = dict_copy_with_ref (xattr, NULL); DHT_MARKER_DONT_ACCOUNT(xattr_new); FRAME_SU_DO (frame, dht_local_t); STACK_WIND_COOKIE (frame, dht_rename_unlink_cbk, dst_hashed, dst_hashed, dst_hashed->fops->unlink, &local->loc, 0, xattr_new); dict_unref (xattr_new); xattr_new = NULL; } if (local->added_link && (src_cached != dst_hashed)) { dict_t *xattr_new = NULL; gf_msg_trace (this->name, 0, "unlinking link %s => %s (%s), (gfid = %s)", local->loc.path, local->loc2.path, src_cached->name, gfid); xattr_new = dict_copy_with_ref (xattr, NULL); if (gf_uuid_compare (local->loc.pargfid, local->loc2.pargfid) == 0) { DHT_MARKER_DONT_ACCOUNT(xattr_new); } /* * * The link to file is created using root permission. * Hence deletion should happen using root. Otherwise * it will fail. */ FRAME_SU_DO (frame, dht_local_t); STACK_WIND_COOKIE (frame, dht_rename_unlink_cbk, src_cached, src_cached, src_cached->fops->unlink, &local->loc2, 0, xattr_new); dict_unref (xattr_new); xattr_new = NULL; } if (xattr) dict_unref (xattr); return 0; nolinks: WIPE (&local->preoldparent); WIPE (&local->postoldparent); WIPE (&local->preparent); WIPE (&local->postparent); dht_rename_unlock (frame, this); return 0; } int dht_rename_links_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t *inode, struct iatt *stbuf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { xlator_t *prev = NULL; dht_local_t *local = NULL; prev = cookie; local = frame->local; if (op_ret == -1) { gf_msg (this->name, GF_LOG_WARNING, op_errno, DHT_MSG_CREATE_LINK_FAILED, "link/file %s on %s failed", local->loc.path, prev->name); } if (local->linked == _gf_true) { local->linked = _gf_false; dht_linkfile_attr_heal (frame, this); } DHT_STACK_DESTROY (frame); return 0; } int dht_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *stbuf, struct iatt *preoldparent, struct iatt *postoldparent, struct iatt *prenewparent, struct iatt *postnewparent, dict_t *xdata) { dht_local_t *local = NULL; xlator_t *prev = NULL; xlator_t *src_hashed = NULL; xlator_t *src_cached = NULL; xlator_t *dst_hashed = NULL; xlator_t *dst_cached = NULL; xlator_t *rename_subvol = NULL; call_frame_t *link_frame = NULL; dht_local_t *link_local = NULL; dict_t *xattr = NULL; local = frame->local; prev = cookie; src_hashed = local->src_hashed; src_cached = local->src_cached; dst_hashed = local->dst_hashed; dst_cached = local->dst_cached; if (local->linked == _gf_true) FRAME_SU_UNDO (frame, dht_local_t); /* It is a critical failure iff we fail to rename the cached file * if the rename of the linkto failed, it is not a critical failure, * and we do not want to lose the created hard link for the new * name as that could have been read by other clients. * * NOTE: If another client is attempting the same oldname -> newname * rename, and finds both file names as existing, and are hard links * to each other, then FUSE would send in an unlink for oldname. In * this time duration if we treat the linkto as a critical error and * unlink the newname we created, we would have effectively lost the * file to rename operations. * * Repercussions of treating this as a non-critical error is that * we could leave behind a stale linkto file and/or not create the new * linkto file, the second case would be rectified by a subsequent * lookup, the first case by a rebalance, like for all stale linkto * files */ if (op_ret == -1) { /* Critical failure: unable to rename the cached file */ if (prev == src_cached) { gf_msg (this->name, GF_LOG_WARNING, op_errno, DHT_MSG_RENAME_FAILED, "%s: Rename on %s failed, (gfid = %s) ", local->loc.path, prev->name, local->loc.inode ? uuid_utoa(local->loc.inode->gfid):""); local->op_ret = op_ret; local->op_errno = op_errno; goto cleanup; } else { /* Non-critical failure, unable to rename the linkto * file */ gf_msg (this->name, GF_LOG_INFO, op_errno, DHT_MSG_RENAME_FAILED, "%s: Rename (linkto file) on %s failed, " "(gfid = %s) ", local->loc.path, prev->name, local->loc.inode ? uuid_utoa(local->loc.inode->gfid):""); } } if (xdata) { if (!local->xattr) local->xattr = dict_ref (xdata); else local->xattr = dict_copy_with_ref (xdata, local->xattr); } if ((src_cached == dst_cached) && (dst_hashed != dst_cached)) { link_frame = copy_frame (frame); if (!link_frame) { goto err; } /* fop value sent as maxvalue because it is not used anywhere in this case */ link_local = dht_local_init (link_frame, &local->loc2, NULL, GF_FOP_MAXVALUE); if (!link_local) { goto err; } if (link_local->loc.inode) inode_unref (link_local->loc.inode); link_local->loc.inode = inode_ref (local->loc.inode); gf_uuid_copy (link_local->gfid, local->loc.inode->gfid); dht_linkfile_create (link_frame, dht_rename_links_create_cbk, this, src_cached, dst_hashed, &link_local->loc); } err: /* Merge attrs only from src_cached. In case there of src_cached != * dst_hashed, this ignores linkfile attrs. */ if (prev == src_cached) { dht_iatt_merge (this, &local->stbuf, stbuf); dht_iatt_merge (this, &local->preoldparent, preoldparent); dht_iatt_merge (this, &local->postoldparent, postoldparent); dht_iatt_merge (this, &local->preparent, prenewparent); dht_iatt_merge (this, &local->postparent, postnewparent); } /* NOTE: rename_subvol is the same subvolume from which dht_rename_cbk * is called. since rename has already happened on rename_subvol, * unlink should not be sent for oldpath (either linkfile or cached-file) * on rename_subvol. */ if (src_cached == dst_cached) rename_subvol = src_cached; else rename_subvol = dst_hashed; /* TODO: delete files in background */ if (src_cached != dst_hashed && src_cached != dst_cached) local->call_cnt++; if (src_hashed != rename_subvol && src_hashed != src_cached) local->call_cnt++; if (dst_cached && dst_cached != dst_hashed && dst_cached != src_cached) local->call_cnt++; if (local->call_cnt == 0) goto unwind; DHT_MARK_FOP_INTERNAL (xattr); if (src_cached != dst_hashed && src_cached != dst_cached) { dict_t *xattr_new = NULL; xattr_new = dict_copy_with_ref (xattr, NULL); gf_msg_trace (this->name, 0, "deleting old src datafile %s @ %s", local->loc.path, src_cached->name); if (gf_uuid_compare (local->loc.pargfid, local->loc2.pargfid) == 0) { DHT_MARKER_DONT_ACCOUNT(xattr_new); } DHT_CHANGELOG_TRACK_AS_RENAME(xattr_new, &local->loc, &local->loc2); STACK_WIND_COOKIE (frame, dht_rename_unlink_cbk, src_cached, src_cached, src_cached->fops->unlink, &local->loc, 0, xattr_new); dict_unref (xattr_new); xattr_new = NULL; } if (src_hashed != rename_subvol && src_hashed != src_cached) { dict_t *xattr_new = NULL; xattr_new = dict_copy_with_ref (xattr, NULL); gf_msg_trace (this->name, 0, "deleting old src linkfile %s @ %s", local->loc.path, src_hashed->name); DHT_MARKER_DONT_ACCOUNT(xattr_new); STACK_WIND_COOKIE (frame, dht_rename_unlink_cbk, src_hashed, src_hashed, src_hashed->fops->unlink, &local->loc, 0, xattr_new); dict_unref (xattr_new); xattr_new = NULL; } if (dst_cached && (dst_cached != dst_hashed) && (dst_cached != src_cached)) { gf_msg_trace (this->name, 0, "deleting old dst datafile %s @ %s", local->loc2.path, dst_cached->name); STACK_WIND_COOKIE (frame, dht_rename_unlink_cbk, dst_cached, dst_cached, dst_cached->fops->unlink, &local->loc2, 0, xattr); } if (xattr) dict_unref (xattr); return 0; unwind: WIPE (&local->preoldparent); WIPE (&local->postoldparent); WIPE (&local->preparent); WIPE (&local->postparent); dht_rename_done (frame, this); return 0; cleanup: dht_rename_cleanup (frame); return 0; } int dht_do_rename (call_frame_t *frame) { dht_local_t *local = NULL; xlator_t *dst_hashed = NULL; xlator_t *src_cached = NULL; xlator_t *dst_cached = NULL; xlator_t *this = NULL; xlator_t *rename_subvol = NULL; local = frame->local; this = frame->this; dst_hashed = local->dst_hashed; dst_cached = local->dst_cached; src_cached = local->src_cached; if (src_cached == dst_cached) rename_subvol = src_cached; else rename_subvol = dst_hashed; if ((src_cached != dst_hashed) && (rename_subvol == dst_hashed)) { DHT_MARKER_DONT_ACCOUNT(local->xattr_req); } if (rename_subvol == src_cached) { DHT_CHANGELOG_TRACK_AS_RENAME(local->xattr_req, &local->loc, &local->loc2); } gf_msg_trace (this->name, 0, "renaming %s => %s (%s)", local->loc.path, local->loc2.path, rename_subvol->name); if (local->linked == _gf_true) FRAME_SU_DO (frame, dht_local_t); STACK_WIND_COOKIE (frame, dht_rename_cbk, rename_subvol, rename_subvol, rename_subvol->fops->rename, &local->loc, &local->loc2, local->xattr_req); return 0; } int dht_rename_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t *inode, struct iatt *stbuf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { dht_local_t *local = NULL; xlator_t *prev = NULL; local = frame->local; prev = cookie; if (op_ret == -1) { gf_msg_debug (this->name, 0, "link/file on %s failed (%s)", prev->name, strerror (op_errno)); local->op_ret = -1; local->op_errno = op_errno; local->added_link = _gf_false; } else dht_iatt_merge (this, &local->stbuf, stbuf); if (local->op_ret == -1) goto cleanup; dht_do_rename (frame); return 0; cleanup: dht_rename_cleanup (frame); return 0; } int dht_rename_linkto_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t *inode, struct iatt *stbuf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { dht_local_t *local = NULL; xlator_t *prev = NULL; xlator_t *src_cached = NULL; dict_t *xattr = NULL; local = frame->local; DHT_MARK_FOP_INTERNAL (xattr); prev = cookie; src_cached = local->src_cached; if (op_ret == -1) { gf_msg_debug (this->name, 0, "link/file on %s failed (%s)", prev->name, strerror (op_errno)); local->op_ret = -1; local->op_errno = op_errno; } /* If linkto creation failed move to failure cleanup code, * instead of continuing with creating the link file */ if (local->op_ret != 0) { goto cleanup; } gf_msg_trace (this->name, 0, "link %s => %s (%s)", local->loc.path, local->loc2.path, src_cached->name); if (gf_uuid_compare (local->loc.pargfid, local->loc2.pargfid) == 0) { DHT_MARKER_DONT_ACCOUNT(xattr); } local->added_link = _gf_true; STACK_WIND_COOKIE (frame, dht_rename_link_cbk, src_cached, src_cached, src_cached->fops->link, &local->loc, &local->loc2, xattr); if (xattr) dict_unref (xattr); return 0; cleanup: dht_rename_cleanup (frame); if (xattr) dict_unref (xattr); return 0; } int dht_rename_unlink_links_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { dht_local_t *local = NULL; xlator_t *prev = NULL; local = frame->local; prev = cookie; if ((op_ret == -1) && (op_errno != ENOENT)) { gf_msg_debug (this->name, 0, "unlink of %s on %s failed (%s)", local->loc2.path, prev->name, strerror (op_errno)); local->op_ret = -1; local->op_errno = op_errno; } if (local->op_ret == -1) goto cleanup; dht_do_rename (frame); return 0; cleanup: dht_rename_cleanup (frame); return 0; } int dht_rename_create_links (call_frame_t *frame) { dht_local_t *local = NULL; xlator_t *this = NULL; xlator_t *src_hashed = NULL; xlator_t *src_cached = NULL; xlator_t *dst_hashed = NULL; xlator_t *dst_cached = NULL; int call_cnt = 0; dict_t *xattr = NULL; local = frame->local; this = frame->this; src_hashed = local->src_hashed; src_cached = local->src_cached; dst_hashed = local->dst_hashed; dst_cached = local->dst_cached; DHT_MARK_FOP_INTERNAL (xattr); if (src_cached == dst_cached) { dict_t *xattr_new = NULL; if (dst_hashed == dst_cached) goto nolinks; xattr_new = dict_copy_with_ref (xattr, NULL); gf_msg_trace (this->name, 0, "unlinking dst linkfile %s @ %s", local->loc2.path, dst_hashed->name); DHT_MARKER_DONT_ACCOUNT(xattr_new); STACK_WIND_COOKIE (frame, dht_rename_unlink_links_cbk, dst_hashed, dst_hashed, dst_hashed->fops->unlink, &local->loc2, 0, xattr_new); dict_unref (xattr_new); if (xattr) dict_unref (xattr); return 0; } if (src_cached != dst_hashed) { /* needed to create the link file */ call_cnt++; if (dst_hashed != src_hashed) /* needed to create the linkto file */ call_cnt ++; } /* We should not have any failures post the link creation, as this * introduces the newname into the namespace. Clients could have cached * the existence of the newname and may start taking actions based on * the same. Hence create the linkto first, and then attempt the link. * * NOTE: If another client is attempting the same oldname -> newname * rename, and finds both file names as existing, and are hard links * to each other, then FUSE would send in an unlink for oldname. In * this time duration if we treat the linkto as a critical error and * unlink the newname we created, we would have effectively lost the * file to rename operations. */ if (dst_hashed != src_hashed && src_cached != dst_hashed) { gf_msg_trace (this->name, 0, "linkfile %s @ %s => %s", local->loc.path, dst_hashed->name, src_cached->name); memcpy (local->gfid, local->loc.inode->gfid, 16); dht_linkfile_create (frame, dht_rename_linkto_cbk, this, src_cached, dst_hashed, &local->loc); } else if (src_cached != dst_hashed) { dict_t *xattr_new = NULL; xattr_new = dict_copy_with_ref (xattr, NULL); gf_msg_trace (this->name, 0, "link %s => %s (%s)", local->loc.path, local->loc2.path, src_cached->name); if (gf_uuid_compare (local->loc.pargfid, local->loc2.pargfid) == 0) { DHT_MARKER_DONT_ACCOUNT(xattr_new); } local->added_link = _gf_true; STACK_WIND_COOKIE (frame, dht_rename_link_cbk, src_cached, src_cached, src_cached->fops->link, &local->loc, &local->loc2, xattr_new); dict_unref (xattr_new); } nolinks: if (!call_cnt) { /* skip to next step */ dht_do_rename (frame); } if (xattr) dict_unref (xattr); return 0; } int dht_rename_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret, int op_errno, inode_t *inode, struct iatt *stbuf, dict_t *xattr, struct iatt *postparent) { dht_local_t *local = NULL; int call_cnt = 0; dht_conf_t *conf = NULL; char gfid_local[GF_UUID_BUF_SIZE] = {0}; char gfid_server[GF_UUID_BUF_SIZE] = {0}; int child_index = -1; gf_boolean_t is_src = _gf_false; loc_t *loc = NULL; child_index = (long)cookie; local = frame->local; conf = this->private; is_src = (child_index == 0); if (is_src) loc = &local->loc; else loc = &local->loc2; if (op_ret >= 0) { if (is_src) local->src_cached = dht_subvol_get_cached (this, local->loc.inode); else { if (loc->inode) gf_uuid_unparse (loc->inode->gfid, gfid_local); gf_msg_debug (this->name, 0, "dst_cached before lookup: %s, " "(path:%s)(gfid:%s),", local->loc2.path, local->dst_cached ? local->dst_cached->name : NULL, local->dst_cached ? gfid_local : NULL); local->dst_cached = dht_subvol_get_cached (this, local->loc2_copy.inode); gf_uuid_unparse (stbuf->ia_gfid, gfid_local); gf_msg_debug (this->name, GF_LOG_WARNING, "dst_cached after lookup: %s, " "(path:%s)(gfid:%s)", local->loc2.path, local->dst_cached ? local->dst_cached->name : NULL, local->dst_cached ? gfid_local : NULL); if ((local->loc2.inode == NULL) || gf_uuid_compare (stbuf->ia_gfid, local->loc2.inode->gfid)) { if (local->loc2.inode != NULL) { inode_unlink (local->loc2.inode, local->loc2.parent, local->loc2.name); inode_unref (local->loc2.inode); } local->loc2.inode = inode_link (local->loc2_copy.inode, local->loc2_copy.parent, local->loc2_copy.name, stbuf); gf_uuid_copy (local->loc2.gfid, stbuf->ia_gfid); } } } if (op_ret < 0) { if (is_src) { /* The meaning of is_linkfile is overloaded here. For locking * to work properly both rebalance and rename should acquire * lock on datafile. The reason for sending this lookup is to * find out whether we've acquired a lock on data file. * Between the lookup before rename and this rename, the * file could be migrated by a rebalance process and now this * file this might be a linkto file. We verify that by sending * this lookup. However, if this lookup fails we cannot really * say whether we've acquired lock on a datafile or linkto file. * So, we act conservatively and _assume_ * that this is a linkfile and fail the rename operation. */ local->is_linkfile = _gf_true; local->op_errno = op_errno; } else { if (local->dst_cached) gf_msg_debug (this->name, op_errno, "file %s (gfid:%s) was present " "(hashed-subvol=%s, " "cached-subvol=%s) before rename," " but lookup failed", local->loc2.path, uuid_utoa (local->loc2.inode->gfid), local->dst_hashed->name, local->dst_cached->name); if (dht_inode_missing (op_errno)) local->dst_cached = NULL; } } else if (is_src && xattr && check_is_linkfile (inode, stbuf, xattr, conf->link_xattr_name)) { local->is_linkfile = _gf_true; /* Found linkto file instead of data file, passdown ENOENT * based on the above comment */ local->op_errno = ENOENT; } if (!local->is_linkfile && (op_ret >= 0) && gf_uuid_compare (loc->gfid, stbuf->ia_gfid)) { gf_uuid_unparse (loc->gfid, gfid_local); gf_uuid_unparse (stbuf->ia_gfid, gfid_server); gf_msg (this->name, GF_LOG_WARNING, 0, DHT_MSG_GFID_MISMATCH, "path:%s, received a different gfid, local_gfid= %s" " server_gfid: %s", local->loc.path, gfid_local, gfid_server); /* Will passdown ENOENT anyway since the file we sent on * rename is replaced with a different file */ local->op_errno = ENOENT; /* Since local->is_linkfile is used here to detect failure, * marking this to true */ local->is_linkfile = _gf_true; } call_cnt = dht_frame_return (frame); if (is_last_call (call_cnt)) { if (local->is_linkfile) { local->op_ret = -1; goto fail; } dht_rename_create_links (frame); } return 0; fail: dht_rename_unlock (frame, this); return 0; } int dht_rename_file_lock1_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { dht_local_t *local = NULL; char src_gfid[GF_UUID_BUF_SIZE] = {0}; char dst_gfid[GF_UUID_BUF_SIZE] = {0}; int ret = 0; loc_t *loc = NULL; xlator_t *subvol = NULL; local = frame->local; if (op_ret < 0) { uuid_utoa_r (local->loc.inode->gfid, src_gfid); if (local->loc2.inode) uuid_utoa_r (local->loc2.inode->gfid, dst_gfid); gf_msg (this->name, GF_LOG_WARNING, op_errno, DHT_MSG_INODE_LK_ERROR, "protecting namespace of %s failed" "rename (%s:%s:%s %s:%s:%s)", local->current == &local->lock[0] ? local->loc.path : local->loc2.path, local->loc.path, src_gfid, local->src_hashed->name, local->loc2.path, dst_gfid, local->dst_hashed ? local->dst_hashed->name : NULL); local->op_ret = -1; local->op_errno = op_errno; goto err; } if (local->current == &local->lock[0]) { loc = &local->loc2; subvol = local->dst_hashed; local->current = &local->lock[1]; } else { loc = &local->loc; subvol = local->src_hashed; local->current = &local->lock[0]; } ret = dht_protect_namespace (frame, loc, subvol, &local->current->ns, dht_rename_lock_cbk); if (ret < 0) { op_errno = EINVAL; goto err; } return 0; err: /* No harm in calling an extra unlock */ dht_rename_unlock (frame, this); return 0; } int32_t dht_rename_file_protect_namespace (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { dht_local_t *local = NULL; char src_gfid[GF_UUID_BUF_SIZE] = {0}; char dst_gfid[GF_UUID_BUF_SIZE] = {0}; int ret = 0; loc_t *loc = NULL; xlator_t *subvol = NULL; local = frame->local; if (op_ret < 0) { uuid_utoa_r (local->loc.inode->gfid, src_gfid); if (local->loc2.inode) uuid_utoa_r (local->loc2.inode->gfid, dst_gfid); gf_msg (this->name, GF_LOG_WARNING, op_errno, DHT_MSG_INODE_LK_ERROR, "acquiring inodelk failed " "rename (%s:%s:%s %s:%s:%s)", local->loc.path, src_gfid, local->src_cached->name, local->loc2.path, dst_gfid, local->dst_cached ? local->dst_cached->name : NULL); local->op_ret = -1; local->op_errno = op_errno; goto err; } /* Locks on src and dst needs to ordered which otherwise might cause * deadlocks when rename (src, dst) and rename (dst, src) is done from * two different clients */ dht_order_rename_lock (frame, &loc, &subvol); ret = dht_protect_namespace (frame, loc, subvol, &local->current->ns, dht_rename_file_lock1_cbk); if (ret < 0) { op_errno = EINVAL; goto err; } return 0; err: /* Its fine to call unlock even when no locks are acquired, as we check * for lock->locked before winding a unlock call. */ dht_rename_unlock (frame, this); return 0; } int32_t dht_rename_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { dht_local_t *local = NULL; char src_gfid[GF_UUID_BUF_SIZE] = {0}; char dst_gfid[GF_UUID_BUF_SIZE] = {0}; dict_t *xattr_req = NULL; dht_conf_t *conf = NULL; int i = 0; xlator_t *subvol = NULL; dht_lock_t *lock = NULL; local = frame->local; conf = this->private; if (op_ret < 0) { uuid_utoa_r (local->loc.inode->gfid, src_gfid); if (local->loc2.inode) uuid_utoa_r (local->loc2.inode->gfid, dst_gfid); gf_msg (this->name, GF_LOG_WARNING, op_errno, DHT_MSG_INODE_LK_ERROR, "protecting namespace of %s failed. " "rename (%s:%s:%s %s:%s:%s)", local->current == &local->lock[0] ? local->loc.path : local->loc2.path, local->loc.path, src_gfid, local->src_hashed->name, local->loc2.path, dst_gfid, local->dst_hashed ? local->dst_hashed->name : NULL); local->op_ret = -1; local->op_errno = op_errno; goto done; } xattr_req = dict_new (); if (xattr_req == NULL) { local->op_ret = -1; local->op_errno = ENOMEM; goto done; } op_ret = dict_set_uint32 (xattr_req, conf->link_xattr_name, 256); if (op_ret < 0) { local->op_ret = -1; local->op_errno = -op_ret; goto done; } /* dst_cached might've changed. This normally happens for two reasons: * 1. rebalance migrated dst * 2. Another parallel rename was done overwriting dst * * Doing a lookup on local->loc2 when dst exists, but is associated * with a different gfid will result in an ESTALE error. So, do a fresh * lookup with a new inode on dst-path and handle change of dst-cached * in the cbk. Also, to identify dst-cached changes we do a lookup on * "this" rather than the subvol. */ loc_copy (&local->loc2_copy, &local->loc2); inode_unref (local->loc2_copy.inode); local->loc2_copy.inode = inode_new (local->loc.inode->table); /* Why not use local->lock.locks[?].loc for lookup post lock phase * --------------------------------------------------------------- * "layout.parent_layout.locks[?].loc" does not have the name and pargfid * populated. * Reason: If we had populated the name and pargfid, server might * resolve to a successful lookup even if there is a file with same name * with a different gfid(unlink & create) as server does name based * resolution on first priority. And this can result in operating on a * different inode entirely. * * Now consider a scenario where source file was renamed by some other * client to a new name just before this lock was granted. So if a * lookup would be done on local->lock[0].layout.parent_layout.locks[?].loc, * server will send success even if the entry was renamed (since server will * do a gfid based resolution). So once a lock is granted, make sure the file * exists with the name that the client requested with. * */ local->call_cnt = 2; for (i = 0; i < 2; i++) { if (i == 0) { lock = local->rename_inodelk_backward_compatible[0]; if (gf_uuid_compare (local->loc.gfid, lock->loc.gfid) == 0) subvol = lock->xl; else { lock = local->rename_inodelk_backward_compatible[1]; subvol = lock->xl; } } else { subvol = this; } STACK_WIND_COOKIE (frame, dht_rename_lookup_cbk, (void *)(long)i, subvol, subvol->fops->lookup, (i == 0) ? &local->loc : &local->loc2_copy, xattr_req); } dict_unref (xattr_req); return 0; done: /* Its fine to call unlock even when no locks are acquired, as we check * for lock->locked before winding a unlock call. */ dht_rename_unlock (frame, this); if (xattr_req) dict_unref (xattr_req); return 0; } int dht_rename_lock (call_frame_t *frame) { dht_local_t *local = NULL; int count = 1, ret = -1; dht_lock_t **lk_array = NULL; local = frame->local; if (local->dst_cached) count++; lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_pointer); if (lk_array == NULL) goto err; lk_array[0] = dht_lock_new (frame->this, local->src_cached, &local->loc, F_WRLCK, DHT_FILE_MIGRATE_DOMAIN, NULL, FAIL_ON_ANY_ERROR); if (lk_array[0] == NULL) goto err; if (local->dst_cached) { /* dst might be removed by the time inodelk reaches bricks, * which can result in ESTALE errors. POSIX imposes no * restriction for dst to be present for renames to be * successful. So, we'll ignore ESTALE errors. As far as * synchronization on dst goes, we'll achieve the same by * holding entrylk on parent directory of dst in the namespace * of basename(dst). Also, there might not be quorum in cluster * xlators like EC/disperse on errno, in which case they return * EIO. For eg., in a disperse (4 + 2), 3 might return success * and three might return ESTALE. Disperse, having no Quorum * unwinds inodelk with EIO. So, ignore EIO too. */ lk_array[1] = dht_lock_new (frame->this, local->dst_cached, &local->loc2, F_WRLCK, DHT_FILE_MIGRATE_DOMAIN, NULL, IGNORE_ENOENT_ESTALE_EIO); if (lk_array[1] == NULL) goto err; } local->rename_inodelk_backward_compatible = lk_array; local->rename_inodelk_bc_count = count; /* retaining inodelks for the sake of backward compatibility. Please * make sure to remove this inodelk once all of 3.10, 3.12 and 3.13 * reach EOL. Better way of getting synchronization would be to acquire * entrylks on src and dst parent directories in the namespace of * basenames of src and dst */ ret = dht_blocking_inodelk (frame, lk_array, count, dht_rename_file_protect_namespace); if (ret < 0) { local->rename_inodelk_backward_compatible = NULL; local->rename_inodelk_bc_count = 0; goto err; } return 0; err: if (lk_array != NULL) { int tmp_count = 0, i = 0; for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++); dht_lock_array_free (lk_array, tmp_count); GF_FREE (lk_array); } return -1; } int dht_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { xlator_t *src_cached = NULL; xlator_t *src_hashed = NULL; xlator_t *dst_cached = NULL; xlator_t *dst_hashed = NULL; int op_errno = -1; int ret = -1; dht_local_t *local = NULL; dht_conf_t *conf = NULL; char gfid[GF_UUID_BUF_SIZE] = {0}; char newgfid[GF_UUID_BUF_SIZE] = {0}; gf_boolean_t free_xdata = _gf_false; VALIDATE_OR_GOTO (frame, err); VALIDATE_OR_GOTO (this, err); VALIDATE_OR_GOTO (oldloc, err); VALIDATE_OR_GOTO (newloc, err); conf = this->private; if (conf->subvolume_cnt == 1) { if (!IA_ISDIR (oldloc->inode->ia_type)) { if (!xdata) { free_xdata = _gf_true; } DHT_CHANGELOG_TRACK_AS_RENAME(xdata, oldloc, newloc); } default_rename (frame, this, oldloc, newloc, xdata); if (free_xdata && xdata) { dict_unref(xdata); xdata = NULL; } return 0; } gf_uuid_unparse(oldloc->inode->gfid, gfid); src_hashed = dht_subvol_get_hashed (this, oldloc); if (!src_hashed) { gf_msg (this->name, GF_LOG_INFO, 0, DHT_MSG_RENAME_FAILED, "No hashed subvolume in layout for path=%s," "(gfid = %s)", oldloc->path, gfid); op_errno = EINVAL; goto err; } src_cached = dht_subvol_get_cached (this, oldloc->inode); if (!src_cached) { gf_msg (this->name, GF_LOG_INFO, 0, DHT_MSG_RENAME_FAILED, "No cached subvolume for path = %s," "(gfid = %s)", oldloc->path, gfid); op_errno = EINVAL; goto err; } dst_hashed = dht_subvol_get_hashed (this, newloc); if (!dst_hashed) { gf_msg (this->name, GF_LOG_INFO, 0, DHT_MSG_RENAME_FAILED, "No hashed subvolume in layout for path=%s", newloc->path); op_errno = EINVAL; goto err; } if (newloc->inode) dst_cached = dht_subvol_get_cached (this, newloc->inode); local = dht_local_init (frame, oldloc, NULL, GF_FOP_RENAME); if (!local) { op_errno = ENOMEM; goto err; } /* cached_subvol will be set from dht_local_init, reset it to NULL, as the logic of handling rename is different */ local->cached_subvol = NULL; ret = loc_copy (&local->loc2, newloc); if (ret == -1) { op_errno = ENOMEM; goto err; } local->src_hashed = src_hashed; local->src_cached = src_cached; local->dst_hashed = dst_hashed; local->dst_cached = dst_cached; if (xdata) local->xattr_req = dict_ref (xdata); if (newloc->inode) gf_uuid_unparse(newloc->inode->gfid, newgfid); gf_msg (this->name, GF_LOG_INFO, 0, DHT_MSG_RENAME_INFO, "renaming %s (%s) (hash=%s/cache=%s) => %s (%s) " "(hash=%s/cache=%s) ", oldloc->path, gfid, src_hashed->name, src_cached->name, newloc->path, newloc->inode ? newgfid : NULL, dst_hashed->name, dst_cached ? dst_cached->name : ""); if (IA_ISDIR (oldloc->inode->ia_type)) { dht_rename_dir (frame, this); } else { local->op_ret = 0; ret = dht_rename_lock (frame); if (ret < 0) { op_errno = ENOMEM; goto err; } } return 0; err: op_errno = (op_errno == -1) ? errno : op_errno; DHT_STACK_UNWIND (rename, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL, NULL); return 0; }