summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src/dht-common.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/dht/src/dht-common.c')
-rw-r--r--xlators/cluster/dht/src/dht-common.c246
1 files changed, 246 insertions, 0 deletions
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c
index 785ecbc615c..d371fc442da 100644
--- a/xlators/cluster/dht/src/dht-common.c
+++ b/xlators/cluster/dht/src/dht-common.c
@@ -147,6 +147,246 @@ out:
int
+dht_discover_complete (xlator_t *this, call_frame_t *discover_frame)
+{
+ dht_local_t *local = NULL;
+ call_frame_t *main_frame = NULL;
+ int op_errno = 0;
+ int ret = -1;
+ dht_layout_t *layout = NULL;
+
+ local = discover_frame->local;
+ layout = local->layout;
+
+ LOCK(&discover_frame->lock);
+ {
+ main_frame = local->main_frame;
+ local->main_frame = NULL;
+ }
+ UNLOCK(&discover_frame->lock);
+
+ if (!main_frame)
+ return 0;
+
+ if (local->file_count && local->dir_count) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "path %s exists as a file on one subvolume "
+ "and directory on another. "
+ "Please fix it manually",
+ local->loc.path);
+ op_errno = EIO;
+ goto out;
+ }
+
+ if (local->cached_subvol) {
+ ret = dht_layout_preset (this, local->cached_subvol,
+ local->inode);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "failed to set layout for subvolume %s",
+ local->cached_subvol ? local->cached_subvol->name : "<nil>");
+ op_errno = EINVAL;
+ goto out;
+ }
+ } else {
+ ret = dht_layout_normalize (this, &local->loc, layout);
+
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "normalizing failed on %s",
+ local->loc.path);
+ op_errno = EINVAL;
+ goto out;
+ }
+
+ dht_layout_set (this, local->inode, layout);
+ }
+
+ DHT_STACK_UNWIND (lookup, main_frame, local->op_ret, local->op_errno,
+ local->inode, &local->stbuf, local->xattr,
+ &local->postparent);
+ return 0;
+out:
+ DHT_STACK_UNWIND (lookup, main_frame, -1, op_errno, NULL, NULL, NULL,
+ NULL);
+
+ return ret;
+}
+
+
+int
+dht_discover_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno,
+ inode_t *inode, struct iatt *stbuf, dict_t *xattr,
+ struct iatt *postparent)
+{
+ dht_local_t *local = NULL;
+ int this_call_cnt = 0;
+ call_frame_t *prev = NULL;
+ dht_layout_t *layout = NULL;
+ int ret = -1;
+ int is_dir = 0;
+ int is_linkfile = 0;
+ int attempt_unwind = 0;
+
+ GF_VALIDATE_OR_GOTO ("dht", frame, out);
+ GF_VALIDATE_OR_GOTO ("dht", this, out);
+ GF_VALIDATE_OR_GOTO ("dht", frame->local, out);
+ GF_VALIDATE_OR_GOTO ("dht", this->private, out);
+ GF_VALIDATE_OR_GOTO ("dht", cookie, out);
+
+ local = frame->local;
+ prev = cookie;
+
+ layout = local->layout;
+
+ /* Check if the gfid is different for file from other node */
+ if (!op_ret && uuid_compare (local->gfid, stbuf->ia_gfid)) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "%s: gfid different on %s",
+ local->loc.path, prev->this->name);
+ }
+
+
+ LOCK (&frame->lock);
+ {
+ /* TODO: assert equal mode on stbuf->st_mode and
+ local->stbuf->st_mode
+
+ else mkdir/chmod/chown and fix
+ */
+ ret = dht_layout_merge (this, layout, prev->this,
+ op_ret, op_errno, xattr);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "%s: failed to merge layouts", local->loc.path);
+
+ if (op_ret == -1) {
+ local->op_errno = op_errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "lookup of %s on %s returned error (%s)",
+ local->loc.path, prev->this->name,
+ strerror (op_errno));
+
+ goto unlock;
+ }
+
+ is_linkfile = check_is_linkfile (inode, stbuf, xattr);
+ is_dir = check_is_dir (inode, stbuf, xattr);
+
+ if (is_dir) {
+ local->dir_count ++;
+ } else {
+ local->file_count ++;
+
+ if (!is_linkfile) {
+ /* real file */
+ local->cached_subvol = prev->this;
+ attempt_unwind = 1;
+ } else {
+ goto unlock;
+ }
+ }
+
+ local->op_ret = 0;
+
+ if (local->xattr == NULL) {
+ local->xattr = dict_ref (xattr);
+ } else {
+ dht_aggregate_xattr (local->xattr, xattr);
+ }
+
+ if (local->inode == NULL)
+ local->inode = inode_ref (inode);
+
+ dht_iatt_merge (this, &local->stbuf, stbuf, prev->this);
+ dht_iatt_merge (this, &local->postparent, postparent,
+ prev->this);
+ }
+unlock:
+ UNLOCK (&frame->lock);
+out:
+ this_call_cnt = dht_frame_return (frame);
+
+ if (is_last_call (this_call_cnt) || attempt_unwind) {
+ dht_discover_complete (this, frame);
+ }
+
+ if (is_last_call (this_call_cnt))
+ DHT_STACK_DESTROY (frame);
+
+ return 0;
+}
+
+
+int
+dht_discover (call_frame_t *frame, xlator_t *this, loc_t *loc)
+{
+ int ret;
+ dht_local_t *local = NULL;
+ dht_conf_t *conf = NULL;
+ int call_cnt = 0;
+ int op_errno = EINVAL;
+ int i = 0;
+ call_frame_t *discover_frame = NULL;
+
+
+ conf = this->private;
+ local = frame->local;
+
+ ret = dict_set_uint32 (local->xattr_req,
+ "trusted.glusterfs.dht", 4 * 4);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "%s: failed to set 'trusted.glusterfs.dht' key",
+ loc->path);
+
+ ret = dict_set_uint32 (local->xattr_req,
+ "trusted.glusterfs.dht.linkto", 256);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "%s: failed to set 'trusted.glusterfs.dht.linkto' key",
+ loc->path);
+
+ call_cnt = conf->subvolume_cnt;
+ local->call_cnt = call_cnt;
+
+ local->layout = dht_layout_new (this, conf->subvolume_cnt);
+
+ if (!local->layout) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ uuid_copy (local->gfid, loc->gfid);
+
+ discover_frame = copy_frame (frame);
+ if (!discover_frame) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ discover_frame->local = local;
+ frame->local = NULL;
+ local->main_frame = frame;
+
+ for (i = 0; i < call_cnt; i++) {
+ STACK_WIND (discover_frame, dht_discover_cbk,
+ conf->subvolumes[i],
+ conf->subvolumes[i]->fops->lookup,
+ &local->loc, local->xattr_req);
+ }
+
+ return 0;
+
+err:
+ DHT_STACK_UNWIND (lookup, frame, -1, op_errno, NULL, NULL, NULL, NULL);
+
+ return 0;
+}
+
+
+int
dht_lookup_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int op_ret, int op_errno,
inode_t *inode, struct iatt *stbuf, dict_t *xattr,
@@ -1086,6 +1326,12 @@ dht_lookup (call_frame_t *frame, xlator_t *this,
local->xattr_req = dict_new ();
}
+ if (uuid_is_null (loc->pargfid) && !uuid_is_null (loc->gfid) &&
+ !__is_root_gfid (loc->inode->gfid)) {
+ local->cached_subvol = NULL;
+ dht_discover (frame, this, loc);
+ return 0;
+ }
if (!hashed_subvol)
hashed_subvol = dht_subvol_get_hashed (this, loc);