summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src/dht-common.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/dht/src/dht-common.c')
-rw-r--r--xlators/cluster/dht/src/dht-common.c248
1 files changed, 203 insertions, 45 deletions
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c
index ba082e28956..e3af5a7fe8b 100644
--- a/xlators/cluster/dht/src/dht-common.c
+++ b/xlators/cluster/dht/src/dht-common.c
@@ -41,6 +41,11 @@ dht_setxattr2 (xlator_t *this, xlator_t *subvol, call_frame_t *frame,
int ret);
+int
+dht_rmdir_readdirp_do (call_frame_t *readdirp_frame, xlator_t *this);
+
+
+
/* Sets the blocks and size values to fixed values. This is to be called
* only for dirs. The caller is responsible for checking the type
*/
@@ -8211,8 +8216,8 @@ dht_rmdir_linkfile_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this
dht_local_t *local = NULL;
xlator_t *prev = NULL;
xlator_t *src = NULL;
- call_frame_t *main_frame = NULL;
- dht_local_t *main_local = NULL;
+ call_frame_t *readdirp_frame = NULL;
+ dht_local_t *readdirp_local = NULL;
int this_call_cnt = 0;
char gfid[GF_UUID_BUF_SIZE] ={0};
@@ -8221,8 +8226,9 @@ dht_rmdir_linkfile_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this
prev = cookie;
src = prev;
- main_frame = local->main_frame;
- main_local = main_frame->local;
+
+ readdirp_frame = local->main_frame;
+ readdirp_local = readdirp_frame->local;
gf_uuid_unparse(local->loc.gfid, gfid);
@@ -8231,16 +8237,18 @@ dht_rmdir_linkfile_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this
"Unlinked linkfile %s on %s, gfid = %s",
local->loc.path, src->name, gfid);
} else {
- main_local->op_ret = -1;
- main_local->op_errno = op_errno;
+ if (op_errno != ENOENT) {
+ readdirp_local->op_ret = -1;
+ readdirp_local->op_errno = op_errno;
+ }
gf_msg_debug (this->name, op_errno,
"Unlink of %s on %s failed. (gfid = %s)",
local->loc.path, src->name, gfid);
}
- this_call_cnt = dht_frame_return (main_frame);
+ this_call_cnt = dht_frame_return (readdirp_frame);
if (is_last_call (this_call_cnt))
- dht_rmdir_do (main_frame, this);
+ dht_rmdir_readdirp_do (readdirp_frame, this);
DHT_STACK_DESTROY (frame);
return 0;
@@ -8255,8 +8263,8 @@ dht_rmdir_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
dht_local_t *local = NULL;
xlator_t *prev = NULL;
xlator_t *src = NULL;
- call_frame_t *main_frame = NULL;
- dht_local_t *main_local = NULL;
+ call_frame_t *readdirp_frame = NULL;
+ dht_local_t *readdirp_local = NULL;
int this_call_cnt = 0;
dht_conf_t *conf = this->private;
char gfid[GF_UUID_BUF_SIZE] = {0};
@@ -8265,17 +8273,24 @@ dht_rmdir_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
prev = cookie;
src = prev;
- main_frame = local->main_frame;
- main_local = main_frame->local;
+gf_log ("dht", GF_LOG_INFO, "dht_rmdir_lookup_cbk %s", local->loc.path);
+ readdirp_frame = local->main_frame;
+ readdirp_local = readdirp_frame->local;
- if (op_ret != 0)
+ if (op_ret != 0) {
+
+ gf_msg (this->name, GF_LOG_WARNING, op_errno,
+ DHT_MSG_NOT_LINK_FILE_ERROR,
+ "lookup failed for %s on %s (type=0%o)",
+ local->loc.path, src->name, stbuf->ia_type);
goto err;
+ }
if (!check_is_linkfile (inode, stbuf, xattr, conf->link_xattr_name)) {
- main_local->op_ret = -1;
- main_local->op_errno = ENOTEMPTY;
+ readdirp_local->op_ret = -1;
+ readdirp_local->op_errno = ENOTEMPTY;
- gf_uuid_unparse(local->loc.gfid, gfid);
+ gf_uuid_unparse(local->loc.gfid, gfid);
gf_msg (this->name, GF_LOG_WARNING, 0,
DHT_MSG_NOT_LINK_FILE_ERROR,
@@ -8289,9 +8304,9 @@ dht_rmdir_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
return 0;
err:
- this_call_cnt = dht_frame_return (main_frame);
+ this_call_cnt = dht_frame_return (readdirp_frame);
if (is_last_call (this_call_cnt))
- dht_rmdir_do (main_frame, this);
+ dht_rmdir_readdirp_do (readdirp_frame, this);
DHT_STACK_DESTROY (frame);
return 0;
@@ -8304,24 +8319,30 @@ dht_rmdir_cached_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
struct iatt *stbuf, dict_t *xattr,
struct iatt *parent)
{
- dht_local_t *local = NULL;
- xlator_t *src = NULL;
- call_frame_t *main_frame = NULL;
- dht_local_t *main_local = NULL;
- int this_call_cnt = 0;
- dht_conf_t *conf = this->private;
- dict_t *xattrs = NULL;
- int ret = 0;
+ dht_local_t *local = NULL;
+ xlator_t *src = NULL;
+ call_frame_t *readdirp_frame = NULL;
+ dht_local_t *readdirp_local = NULL;
+ int this_call_cnt = 0;
+ dht_conf_t *conf = this->private;
+ dict_t *xattrs = NULL;
+ int ret = 0;
local = frame->local;
src = local->hashed_subvol;
- main_frame = local->main_frame;
- main_local = main_frame->local;
+
+ /* main_frame here is the readdirp_frame */
+
+ readdirp_frame = local->main_frame;
+ readdirp_local = readdirp_frame->local;
+
+ gf_msg_debug (this->name, 0, "returning for %s ",
+ local->loc.path);
if (op_ret == 0) {
- main_local->op_ret = -1;
- main_local->op_errno = ENOTEMPTY;
+ readdirp_local->op_ret = -1;
+ readdirp_local->op_errno = ENOTEMPTY;
gf_msg (this->name, GF_LOG_WARNING, 0,
DHT_MSG_SUBVOL_ERROR,
@@ -8329,8 +8350,13 @@ dht_rmdir_cached_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
local->loc.path, src->name);
goto err;
} else if (op_errno != ENOENT) {
- main_local->op_ret = -1;
- main_local->op_errno = op_errno;
+ readdirp_local->op_ret = -1;
+ readdirp_local->op_errno = op_errno;
+
+ gf_msg (this->name, GF_LOG_WARNING, op_errno,
+ DHT_MSG_SUBVOL_ERROR,
+ "%s not found on cached subvol %s",
+ local->loc.path, src->name);
goto err;
}
@@ -8351,7 +8377,6 @@ dht_rmdir_cached_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
dict_unref (xattrs);
goto err;
}
-
STACK_WIND_COOKIE (frame, dht_rmdir_lookup_cbk, src, src,
src->fops->lookup, &local->loc, xattrs);
if (xattrs)
@@ -8360,9 +8385,16 @@ dht_rmdir_cached_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
return 0;
err:
- this_call_cnt = dht_frame_return (main_frame);
+ this_call_cnt = dht_frame_return (readdirp_frame);
+
+ /* Once all the lookups/unlinks etc have returned, proceed to wind
+ * readdirp on the subvol again until no entries are returned.
+ * This is required if there are more entries than can be returned
+ * in a single readdirp call.
+ */
+
if (is_last_call (this_call_cnt))
- dht_rmdir_do (main_frame, this);
+ dht_rmdir_readdirp_do (readdirp_frame, this);
DHT_STACK_DESTROY (frame);
return 0;
@@ -8457,12 +8489,13 @@ dht_rmdir_is_subvol_empty (call_frame_t *frame, xlator_t *this,
gf_uuid_unparse(lookup_local->loc.gfid, gfid);
- gf_msg_trace (this->name, 0,
+ gf_msg_debug (this->name, 0,
"looking up %s on subvolume %s, gfid = %s",
lookup_local->loc.path, src->name, gfid);
LOCK (&frame->lock);
{
+ /* Increment the call count for the readdir frame */
local->call_cnt++;
}
UNLOCK (&frame->lock);
@@ -8470,15 +8503,27 @@ dht_rmdir_is_subvol_empty (call_frame_t *frame, xlator_t *this,
subvol = dht_linkfile_subvol (this, NULL, &trav->d_stat,
trav->dict);
if (!subvol) {
+
gf_msg (this->name, GF_LOG_INFO, 0,
DHT_MSG_INVALID_LINKFILE,
"Linkfile does not have link subvolume. "
"path = %s, gfid = %s",
lookup_local->loc.path, gfid);
+
+ gf_msg_debug (this->name, 0,
+ "looking up %s on subvolume %s, gfid = %s",
+ lookup_local->loc.path, src->name, gfid);
+
STACK_WIND_COOKIE (lookup_frame, dht_rmdir_lookup_cbk,
src, src, src->fops->lookup,
&lookup_local->loc, xattrs);
} else {
+ gf_msg_debug (this->name, 0,
+ "Looking up linkfile target %s on "
+ " subvolume %s, gfid = %s",
+ lookup_local->loc.path, subvol->name,
+ gfid);
+
STACK_WIND (lookup_frame, dht_rmdir_cached_lookup_cbk,
subvol, subvol->fops->lookup,
&lookup_local->loc, xattrs);
@@ -8500,17 +8545,56 @@ err:
}
+
+/*
+ * No more entries on this subvol. Proceed to the actual rmdir operation.
+ */
+
+void
+dht_rmdir_readdirp_done (call_frame_t *readdirp_frame, xlator_t *this)
+{
+
+ call_frame_t *main_frame = NULL;
+ dht_local_t *main_local = NULL;
+ dht_local_t *local = NULL;
+ int this_call_cnt = 0;
+
+
+ local = readdirp_frame->local;
+ main_frame = local->main_frame;
+ main_local = main_frame->local;
+
+ /* At least one readdirp failed.
+ * This is a bit hit or miss - if readdirp failed on more than
+ * one subvol, we don't know which error is returned.
+ */
+ if (local->op_ret == -1) {
+ main_local->op_ret = local->op_ret;
+ main_local->op_errno = local->op_errno;
+ }
+
+ this_call_cnt = dht_frame_return (main_frame);
+
+ if (is_last_call (this_call_cnt))
+ dht_rmdir_do (main_frame, this);
+
+
+ DHT_STACK_DESTROY (readdirp_frame);
+}
+
+
+
int
dht_rmdir_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int op_ret, int op_errno, gf_dirent_t *entries,
dict_t *xdata)
{
dht_local_t *local = NULL;
- int this_call_cnt = -1;
xlator_t *prev = NULL;
xlator_t *src = NULL;
int ret = 0;
+
local = frame->local;
prev = cookie;
src = prev;
@@ -8526,7 +8610,7 @@ dht_rmdir_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
local->loc.path, op_ret);
local->op_ret = -1;
local->op_errno = ENOTEMPTY;
- break;
+ goto done;
default:
/* @ret number of linkfiles are getting unlinked */
gf_msg_trace (this->name, 0,
@@ -8535,17 +8619,54 @@ dht_rmdir_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
local->loc.path, ret);
break;
}
+
}
- this_call_cnt = dht_frame_return (frame);
- if (is_last_call (this_call_cnt)) {
- dht_rmdir_do (frame, this);
+ if (ret) {
+ return 0;
}
+done:
+ /* readdirp failed or no linkto files were found on this subvol */
+
+ dht_rmdir_readdirp_done (frame, this);
return 0;
}
+/* Keep sending readdirp on the subvol until it returns no more entries
+ * It is possible that not all entries will fit in a single readdirp in which
+ * case the rmdir will keep failing with ENOTEMPTY
+ */
+
+int
+dht_rmdir_readdirp_do (call_frame_t *readdirp_frame, xlator_t *this)
+{
+ dht_local_t *local = NULL;
+
+ local = readdirp_frame->local;
+
+ if (local->op_ret == -1) {
+ /* there is no point doing another readdirp on this
+ * subvol . */
+ dht_rmdir_readdirp_done (readdirp_frame, this);
+ return 0;
+ }
+
+ gf_msg_debug ("this->name", 0, "Calling dht_rmdir_readdirp_do for %p",
+ readdirp_frame);
+
+ STACK_WIND_COOKIE (readdirp_frame, dht_rmdir_readdirp_cbk,
+ local->hashed_subvol,
+ local->hashed_subvol,
+ local->hashed_subvol->fops->readdirp,
+ local->fd, 4096, 0, local->xattr);
+
+
+ return 0;
+
+}
+
int
dht_rmdir_opendir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -8554,11 +8675,13 @@ dht_rmdir_opendir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
dht_local_t *local = NULL;
int this_call_cnt = -1;
xlator_t *prev = NULL;
- dict_t *dict = NULL;
int ret = 0;
dht_conf_t *conf = this->private;
+ dict_t *dict = NULL;
int i = 0;
- char gfid[GF_UUID_BUF_SIZE] = {0};
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+ dht_local_t *readdirp_local = NULL;
+ call_frame_t *readdirp_frame = NULL;
local = frame->local;
prev = cookie;
@@ -8586,6 +8709,7 @@ dht_rmdir_opendir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
goto err;
fd_bind (fd);
+
dict = dict_new ();
if (!dict) {
local->op_ret = -1;
@@ -8601,16 +8725,50 @@ dht_rmdir_opendir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
local->loc.path, conf->link_xattr_name);
local->call_cnt = conf->subvolume_cnt;
+
+
+ /* Create a separate frame per subvol as we might need
+ * to resend readdirp multiple times to get all the
+ * entries.
+ */
+
for (i = 0; i < conf->subvolume_cnt; i++) {
- STACK_WIND_COOKIE (frame, dht_rmdir_readdirp_cbk,
+
+ readdirp_frame = copy_frame (frame);
+
+ if (!readdirp_frame) {
+ local->call_cnt--;
+ continue;
+ }
+
+ readdirp_local = dht_local_init (readdirp_frame, &local->loc,
+ local->fd, 0);
+
+ if (!readdirp_local) {
+ DHT_STACK_DESTROY (readdirp_frame);
+ local->call_cnt--;
+ continue;
+ }
+ readdirp_local->main_frame = frame;
+ readdirp_local->op_ret = 0;
+ readdirp_local->xattr = dict_ref (dict);
+ /* overload this field to save the subvol info */
+ readdirp_local->hashed_subvol = conf->subvolumes[i];
+
+ STACK_WIND_COOKIE (readdirp_frame, dht_rmdir_readdirp_cbk,
conf->subvolumes[i], conf->subvolumes[i],
conf->subvolumes[i]->fops->readdirp,
- local->fd, 4096, 0, dict);
+ readdirp_local->fd, 4096, 0,
+ readdirp_local->xattr);
}
if (dict)
dict_unref (dict);
+ /* Could not wind readdirp to any subvol */
+ if (!local->call_cnt)
+ goto err;
+
return 0;
err: