author    Susant Palai <spalai@redhat.com>  2015-04-12 15:55:02 +0530
committer Vijay Bellur <vbellur@redhat.com> 2015-05-07 02:37:02 -0700
commit    579186aeba940e3ec73093c48e17b5f6f94910d0
tree      5d86f55336c3d5be941718a40f91c4b3f0884f55
parent    2a4f346fe57fb21330857b7eb75153dc8abc4def
rebalance: Introducing local crawl and parallel migration
The current patch addresses two parts of the proposed design:

1. Rebalance multiple files in parallel
2. Crawl only bricks that belong to the current node

Brief design explanation for the above two points.

1. Rebalance multiple files in parallel:
----------------------------------------
The existing rebalance engine is single threaded. Hence, multiple
threads are introduced which run in parallel with the crawler.

The rebalance migration is thereby converted to a "Producer-Consumer"
framework, where:

    Producer is : Crawler
    Consumer is : Migrating threads

Crawler: The crawler is the main thread. Its job is now limited to
fix-layout of each directory and adding the files which are eligible
for migration to a global queue in a round-robin manner across
subvolumes, so that all the disk resources are used efficiently.
Hence, the crawler is not "blocked" by the migration process.

Consumer: The migrator threads monitor the global queue. Whenever a
file is added to this queue, one of them dequeues that entry and
migrates the file. Currently 20 migration threads are spawned at the
beginning of the rebalance process; hence, multiple files are
migrated in parallel.

2. Crawl only bricks that belong to the current node:
------------------------------------------------------
As a rebalance process is spawned per node, it migrates only the
files that belong to its own node for the sake of load balancing. But
it used to read entries from the whole cluster, which is unnecessary,
as readdir hits other nodes.

New design: the rebalancer decides the subvols that are local to the
rebalancer node by checking the node-uuid of the root directory
before the crawl starts. Hence, readdir does not hit the whole
cluster, as the rebalancer already has the context of the local
subvols, and a node-uuid request for each file can be avoided. This
makes the rebalance process more scalable.

Change-Id: I6f1b44086a09df8ca23935fd213509c70cc0c050
BUG: 1217381
Signed-off-by: Susant Palai <spalai@redhat.com>
Reviewed-on: http://review.gluster.org/10466
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Tested-by: NetBSD Build System
Reviewed-by: N Balachandran <nbalacha@redhat.com>
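To make point 1 concrete, here is a minimal standalone sketch of the watermarked producer-consumer queue. The names (enqueue, migrator, crawl_finished, LOW_WM, HIGH_WM) are invented for illustration; this is not the patch's code, only the scheme it implements.

#include <pthread.h>
#include <stdlib.h>

#define LOW_WM   200   /* crawler woken again below this count */
#define HIGH_WM  500   /* crawler blocked above this count     */

struct entry {
        struct entry *next;
        /* file details needed for migration would live here */
};

static struct entry    *q_head, *q_tail;
static int              q_count, crawl_done;
static pthread_mutex_t  q_mutex      = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t   not_empty    = PTHREAD_COND_INITIALIZER;
static pthread_cond_t   below_low_wm = PTHREAD_COND_INITIALIZER;

/* producer: called by the crawler for every eligible file */
void
enqueue (struct entry *e)
{
        pthread_mutex_lock (&q_mutex);
        while (q_count >= HIGH_WM)              /* high watermark hit */
                pthread_cond_wait (&below_low_wm, &q_mutex);
        e->next = NULL;
        if (q_tail)
                q_tail->next = e;
        else
                q_head = e;
        q_tail = e;
        q_count++;
        pthread_cond_signal (&not_empty);       /* wake one migrator */
        pthread_mutex_unlock (&q_mutex);
}

/* called once by the crawler when the walk is complete */
void
crawl_finished (void)
{
        pthread_mutex_lock (&q_mutex);
        crawl_done = 1;
        pthread_cond_broadcast (&not_empty);    /* release idle migrators */
        pthread_mutex_unlock (&q_mutex);
}

/* consumer: body of each migrator thread */
void *
migrator (void *arg)
{
        struct entry *e;

        for (;;) {
                pthread_mutex_lock (&q_mutex);
                while (!q_count && !crawl_done)
                        pthread_cond_wait (&not_empty, &q_mutex);
                if (!q_count && crawl_done) {   /* queue drained, crawl over */
                        pthread_mutex_unlock (&q_mutex);
                        break;
                }
                e = q_head;
                q_head = e->next;
                if (!q_head)
                        q_tail = NULL;
                if (--q_count < LOW_WM)         /* low watermark reached */
                        pthread_cond_signal (&below_low_wm);
                pthread_mutex_unlock (&q_mutex);

                /* migrate_file (e);  -- the actual data movement */
                free (e);
        }
        return NULL;
}

In the patch itself the corresponding pieces are gf_defrag_process_dir (producer), gf_defrag_task (consumers), defrag->dfq_mutex with parallel_migration_cond and rebalance_crawler_alarm, and the MIN_MIGRATE_QUEUE_COUNT/MAX_MIGRATE_QUEUE_COUNT watermarks.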
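For point 2, the decision reduces to a UUID comparison per child subvolume. Below is a hypothetical sketch using libuuid; filter_local_subvols and its parameters are invented for illustration. The patch instead fans out a getxattr for GF_REBAL_FIND_LOCAL_SUBVOL and collects matches in dht_find_local_subvol_cbk.

#include <uuid/uuid.h>

/* Keep only the subvolumes whose advertised node UUID matches the
 * UUID of the node this rebalance process runs on. Returns the
 * number of local subvolumes, or -1 if our own UUID is malformed. */
static int
filter_local_subvols (const char *my_uuid_str,
                      const char **subvol_uuid_strs, int subvol_cnt,
                      int *local_idx)
{
        uuid_t my_uuid, sv_uuid;
        int    local_cnt = 0;
        int    i;

        if (uuid_parse (my_uuid_str, my_uuid))
                return -1;

        for (i = 0; i < subvol_cnt; i++) {
                /* a subvolume that reports an unparseable UUID is
                 * simply not treated as local */
                if (uuid_parse (subvol_uuid_strs[i], sv_uuid))
                        continue;
                if (uuid_compare (my_uuid, sv_uuid) == 0)
                        local_idx[local_cnt++] = i;
        }

        return local_cnt;
}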
Diffstat (limited to 'xlators/cluster/dht')
-rw-r--r--  xlators/cluster/dht/src/dht-common.c    |  130
-rw-r--r--  xlators/cluster/dht/src/dht-common.h    |   47
-rw-r--r--  xlators/cluster/dht/src/dht-helper.c    |   23
-rw-r--r--  xlators/cluster/dht/src/dht-mem-types.h |    3
-rw-r--r--  xlators/cluster/dht/src/dht-rebalance.c | 1157
-rw-r--r--  xlators/cluster/dht/src/dht-shared.c    |   26
6 files changed, 1134 insertions, 252 deletions
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c
index 2cfd862de65..2122839c020 100644
--- a/xlators/cluster/dht/src/dht-common.c
+++ b/xlators/cluster/dht/src/dht-common.c
@@ -2534,6 +2534,87 @@ dht_vgetxattr_fill_and_set (dht_local_t *local, dict_t **dict, xlator_t *this,
out:
return ret;
}
+int
+dht_find_local_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, dict_t *xattr,
+ dict_t *xdata)
+{
+ dht_local_t *local = NULL;
+ dht_conf_t *conf = NULL;
+ call_frame_t *prev = NULL;
+ int this_call_cnt = 0;
+ int ret = 0;
+ char *uuid_str = NULL;
+ uuid_t node_uuid = {0,};
+
+
+ VALIDATE_OR_GOTO (frame, out);
+ VALIDATE_OR_GOTO (frame->local, out);
+
+ local = frame->local;
+ prev = cookie;
+ conf = this->private;
+
+ LOCK (&frame->lock);
+ {
+ this_call_cnt = --local->call_cnt;
+ if (op_ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "getxattr err (%s) for dir",
+ strerror (op_errno));
+ local->op_ret = -1;
+ local->op_errno = op_errno;
+ goto unlock;
+ }
+
+ ret = dict_get_str (xattr, local->xsel, &uuid_str);
+
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to "
+ "get %s", local->xsel);
+ local->op_ret = -1;
+ local->op_errno = EINVAL;
+ goto unlock;
+ }
+
+ if (gf_uuid_parse (uuid_str, node_uuid)) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to parse uuid"
+ " failed for %s", prev->this->name);
+ local->op_ret = -1;
+ local->op_errno = EINVAL;
+ goto unlock;
+ }
+
+ if (gf_uuid_compare (node_uuid, conf->defrag->node_uuid)) {
+ gf_log (this->name, GF_LOG_DEBUG, "subvol %s does not"
+ "belong to this node", prev->this->name);
+ } else {
+ conf->local_subvols[(conf->local_subvols_cnt)++]
+ = prev->this;
+ gf_log (this->name, GF_LOG_DEBUG, "subvol %s belongs to"
+ " this node", prev->this->name);
+ }
+ }
+
+ local->op_ret = 0;
+ unlock:
+ UNLOCK (&frame->lock);
+
+ if (!is_last_call (this_call_cnt))
+ goto out;
+
+ if (local->op_ret == -1) {
+ goto unwind;
+ }
+
+ DHT_STACK_UNWIND (getxattr, frame, 0, 0, NULL, NULL);
+ goto out;
+
+ unwind:
+ DHT_STACK_UNWIND (getxattr, frame, -1, local->op_errno, NULL, NULL);
+ out:
+ return 0;
+}
int
dht_vgetxattr_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -2899,7 +2980,8 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,
int op_errno = -1;
int i = 0;
int cnt = 0;
-
+ char *node_uuid_key = NULL;
+ int ret = -1;
VALIDATE_OR_GOTO (frame, err);
VALIDATE_OR_GOTO (this, err);
VALIDATE_OR_GOTO (loc, err);
@@ -2940,6 +3022,28 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,
return 0;
}
+ if (key && DHT_IS_DIR(layout) &&
+ (!strcmp (key, GF_REBAL_FIND_LOCAL_SUBVOL))) {
+ ret = gf_asprintf
+ (&node_uuid_key, "%s", GF_XATTR_NODE_UUID_KEY);
+ if (ret == -1 || !node_uuid_key) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to copy key");
+ op_errno = ENOMEM;
+ goto err;
+ }
+ (void) strncpy (local->xsel, node_uuid_key, 256);
+ cnt = local->call_cnt = conf->subvolume_cnt;
+ for (i = 0; i < cnt; i++) {
+ STACK_WIND (frame, dht_find_local_subvol_cbk,
+ conf->subvolumes[i],
+ conf->subvolumes[i]->fops->getxattr,
+ loc, node_uuid_key, xdata);
+ }
+ if (node_uuid_key)
+ GF_FREE (node_uuid_key);
+ return 0;
+ }
+
/* for file use cached subvolume (obviously!): see if {}
* below
* for directory:
@@ -2949,6 +3053,7 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,
* NOTE: Don't trust inode here, as that may not be valid
* (until inode_link() happens)
*/
+
if (key && DHT_IS_DIR(layout) &&
(XATTR_IS_PATHINFO (key)
|| (strcmp (key, GF_XATTR_NODE_UUID_KEY) == 0))) {
@@ -3838,13 +3943,24 @@ dht_opendir (call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd,
goto err;
}
- local->call_cnt = conf->subvolume_cnt;
+ if (!(conf->local_subvols_cnt) || !conf->defrag) {
+ local->call_cnt = conf->subvolume_cnt;
- for (i = 0; i < conf->subvolume_cnt; i++) {
- STACK_WIND (frame, dht_fd_cbk,
- conf->subvolumes[i],
- conf->subvolumes[i]->fops->opendir,
- loc, fd, xdata);
+ for (i = 0; i < conf->subvolume_cnt; i++) {
+ STACK_WIND (frame, dht_fd_cbk,
+ conf->subvolumes[i],
+ conf->subvolumes[i]->fops->opendir,
+ loc, fd, xdata);
+
+ }
+ } else {
+ local->call_cnt = conf->local_subvols_cnt;
+ for (i = 0; i < conf->local_subvols_cnt; i++) {
+ STACK_WIND (frame, dht_fd_cbk,
+ conf->local_subvols[i],
+ conf->local_subvols[i]->fops->opendir,
+ loc, fd, xdata);
+ }
}
return 0;
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
index 3ca626feec8..0e290465d44 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -290,6 +290,20 @@ struct gf_defrag_pattern_list {
gf_defrag_pattern_list_t *next;
};
+struct dht_container {
+ union {
+ struct list_head list;
+ struct {
+ struct _gf_dirent_t *next;
+ struct _gf_dirent_t *prev;
+ };
+ };
+ gf_dirent_t *df_entry;
+ xlator_t *this;
+ loc_t *parent_loc;
+ dict_t *migrate_data;
+};
+
struct gf_defrag_info_ {
uint64_t total_files;
uint64_t total_data;
@@ -317,6 +331,19 @@ struct gf_defrag_info_ {
uint64_t total_files_demoted;
int write_freq_threshold;
int read_freq_threshold;
+
+ pthread_cond_t parallel_migration_cond;
+ pthread_mutex_t dfq_mutex;
+ pthread_cond_t rebalance_crawler_alarm;
+ int32_t q_entry_count;
+ int32_t global_error;
+ struct dht_container *queue;
+ int32_t crawl_done;
+ int32_t abort;
+ int32_t wakeup_crawler;
+
+ /* Hard link handle requirement */
+ synclock_t link_lock;
};
typedef struct gf_defrag_info_ gf_defrag_info_t;
@@ -394,9 +421,19 @@ struct dht_conf {
dht_methods_t *methods;
struct mem_pool *lock_pool;
+
+ /*local subvol storage for rebalance*/
+ xlator_t **local_subvols;
+ int32_t local_subvols_cnt;
};
typedef struct dht_conf dht_conf_t;
+struct dht_dfoffset_ctx {
+ xlator_t *this;
+ off_t offset;
+ int32_t readdir_done;
+};
+typedef struct dht_dfoffset_ctx dht_dfoffset_ctx_t;
struct dht_disk_layout {
uint32_t cnt;
@@ -420,6 +457,14 @@ typedef enum {
GF_DHT_WEIGHTED_DISTRIBUTION
} dht_distribution_type_t;
+struct dir_dfmeta {
+ gf_dirent_t *equeue;
+ dht_dfoffset_ctx_t *offset_var;
+ struct list_head **head;
+ struct list_head **iterator;
+ int *fetch_entries;
+};
+
#define ENTRY_MISSING(op_ret, op_errno) (op_ret == -1 && op_errno == ENOENT)
#define is_revalidate(loc) (dht_inode_ctx_layout_get (loc->inode, this, NULL) == 0)
@@ -605,6 +650,8 @@ int dht_start_rebalance_task (xlator_t *this, call_frame_t *frame);
int dht_rebalance_in_progress_check (xlator_t *this, call_frame_t *frame);
int dht_rebalance_complete_check (xlator_t *this, call_frame_t *frame);
+int
+dht_init_local_subvolumes (xlator_t *this, dht_conf_t *conf);
/* FOPS */
int32_t dht_lookup (call_frame_t *frame,
diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c
index cab66017b84..b5114b620ce 100644
--- a/xlators/cluster/dht/src/dht-helper.c
+++ b/xlators/cluster/dht/src/dht-helper.c
@@ -731,7 +731,28 @@ err:
return -1;
}
+int
+dht_init_local_subvolumes (xlator_t *this, dht_conf_t *conf)
+{
+ xlator_list_t *subvols = NULL;
+ int cnt = 0;
+ if (!conf)
+ return -1;
+
+ for (subvols = this->children; subvols; subvols = subvols->next)
+ cnt++;
+
+ conf->local_subvols = GF_CALLOC (cnt, sizeof (xlator_t *),
+ gf_dht_mt_xlator_t);
+ if (!conf->local_subvols) {
+ return -1;
+ }
+
+ conf->local_subvols_cnt = 0;
+
+ return 0;
+}
int
dht_init_subvolumes (xlator_t *this, dht_conf_t *conf)
@@ -752,6 +773,8 @@ dht_init_subvolumes (xlator_t *this, dht_conf_t *conf)
}
conf->subvolume_cnt = cnt;
+ conf->local_subvols_cnt = 0;
+
dht_set_subvol_range(this);
cnt = 0;
diff --git a/xlators/cluster/dht/src/dht-mem-types.h b/xlators/cluster/dht/src/dht-mem-types.h
index e893eb48fd8..46028e6d9e0 100644
--- a/xlators/cluster/dht/src/dht-mem-types.h
+++ b/xlators/cluster/dht/src/dht-mem-types.h
@@ -30,6 +30,9 @@ enum gf_dht_mem_types_ {
gf_defrag_info_mt,
gf_dht_mt_inode_ctx_t,
gf_dht_mt_ctx_stat_time_t,
+ gf_dht_mt_dirent_t,
+ gf_dht_mt_container_t,
+ gf_dht_mt_octx_t,
gf_dht_mt_end
};
#endif
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c
index b0e21da8cb5..5e9f4d6e1a6 100644
--- a/xlators/cluster/dht/src/dht-rebalance.c
+++ b/xlators/cluster/dht/src/dht-rebalance.c
@@ -23,6 +23,36 @@
#define GF_DISK_SECTOR_SIZE 512
#define DHT_REBALANCE_PID 4242 /* Change it if required */
#define DHT_REBALANCE_BLKSIZE (128 * 1024)
+#define MAX_MIGRATOR_THREAD_COUNT 20
+#define MAX_MIGRATE_QUEUE_COUNT 500
+#define MIN_MIGRATE_QUEUE_COUNT 200
+
+#define GF_CRAWL_INDEX_MOVE(idx, sv_cnt) { \
+ idx++; \
+ idx %= sv_cnt; \
+ }
+
+#define GF_FREE_DIR_DFMETA(dir_dfmeta) { \
+ if (dir_dfmeta) { \
+ GF_FREE (dir_dfmeta->head); \
+ GF_FREE (dir_dfmeta->equeue); \
+ GF_FREE (dir_dfmeta->iterator); \
+ GF_FREE (dir_dfmeta->offset_var); \
+ GF_FREE (dir_dfmeta->fetch_entries); \
+ GF_FREE (dir_dfmeta); \
+ } \
+ } \
+
+void
+dht_set_global_defrag_error (gf_defrag_info_t *defrag, int ret)
+{
+ LOCK (&defrag->lock);
+ {
+ defrag->global_error = ret;
+ }
+ UNLOCK (&defrag->lock);
+ return;
+}
static int
dht_write_with_holes (xlator_t *to, fd_t *fd, struct iovec *vec, int count,
@@ -178,6 +208,47 @@ gf_defrag_handle_hardlink (xlator_t *this, loc_t *loc, dict_t *xattrs,
goto out;
}
+ /*
+ Parallel migration can migrate the same hard link multiple
+ times, which can lead to data loss. Hence, a fresh lookup is
+ added to decide whether migration is still required.
+
+ Elaborating the scenario for, say, 10 hardlinks [link{1..10}]:
+ Say the first hard link "link1" does the setxattr of the
+ new hashed subvolume info on the cached file. As multiple
+ threads are working, all the links may already have been created
+ on the new hashed subvolume by the time we reach, say, link5. Now
+ the number of links on hashed equals that on cached. Hence, file
+ migration will happen for link6.
+
+ Cached Hashed
+ --------T link6 rwxrwxrwx link6
+
+ After the above state, all the link files on the cached subvolume
+ are zero-byte linkto files. Hence, if we still migrate the
+ remaining files link{7..10}, we end up migrating 0 bytes of data,
+ leading to data loss.
+ Hence, a lookup can make sure whether we need to migrate the
+ file or not.
+ */
+
+ ret = syncop_lookup (this, loc, NULL, NULL,
+ NULL, NULL);
+ if (ret) {
+ /*Ignore ENOENT and ESTALE as file might have been
+ migrated already*/
+ if (-ret == ENOENT || -ret == ESTALE) {
+ ret = -2;
+ goto out;
+ }
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "Migrate file failed:%s lookup failed with ret = %d",
+ loc->path, ret);
+ ret = -1;
+ goto out;
+ }
+
cached_subvol = dht_subvol_get_cached (this, loc->inode);
if (!cached_subvol) {
gf_msg (this->name, GF_LOG_ERROR, 0,
@@ -198,6 +269,11 @@ gf_defrag_handle_hardlink (xlator_t *this, loc_t *loc, dict_t *xattrs,
goto out;
}
+ if (hashed_subvol == cached_subvol) {
+ ret = -2;
+ goto out;
+ }
+
gf_log (this->name, GF_LOG_INFO, "Attempting to migrate hardlink %s "
"with gfid %s from %s -> %s", loc->name, uuid_utoa (loc->gfid),
cached_subvol->name, hashed_subvol->name);
@@ -288,7 +364,8 @@ out:
*/
static inline int
__is_file_migratable (xlator_t *this, loc_t *loc,
- struct iatt *stbuf, dict_t *xattrs, int flags)
+ struct iatt *stbuf, dict_t *xattrs, int flags,
+ gf_defrag_info_t *defrag)
{
int ret = -1;
@@ -308,13 +385,14 @@ __is_file_migratable (xlator_t *this, loc_t *loc,
if (stbuf->ia_nlink > 1) {
/* support for decomission */
if (flags == GF_DHT_MIGRATE_HARDLINK) {
- ret = gf_defrag_handle_hardlink (this, loc,
- xattrs, stbuf);
-
- /*
- Returning zero will force the file to be remigrated.
- Checkout gf_defrag_handle_hardlink for more information.
- */
+ synclock_lock (&defrag->link_lock);
+ ret = gf_defrag_handle_hardlink
+ (this, loc, xattrs, stbuf);
+ synclock_unlock (&defrag->link_lock);
+ /*
+ Returning zero will force the file to be remigrated.
+ Checkout gf_defrag_handle_hardlink for more information.
+ */
if (ret && ret != -2) {
gf_msg (this->name, GF_LOG_WARNING, 0,
DHT_MSG_MIGRATE_FILE_FAILED,
@@ -610,6 +688,7 @@ __dht_rebalance_migrate_data (xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst
while (total < ia_size) {
read_size = (((ia_size - total) > DHT_REBALANCE_BLKSIZE) ?
DHT_REBALANCE_BLKSIZE : (ia_size - total));
+
ret = syncop_readv (from, src, read_size,
offset, 0, &vector, &count, &iobref, NULL,
NULL);
@@ -904,6 +983,11 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
loc_t tmp_loc = {0, };
gf_boolean_t locked = _gf_false;
int lk_ret = -1;
+ gf_defrag_info_t *defrag = NULL;
+
+ defrag = conf->defrag;
+ if (!defrag)
+ goto out;
gf_log (this->name, GF_LOG_INFO, "%s: attempting to move from %s to %s",
loc->path, from->name, to->name);
@@ -960,7 +1044,7 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
src_ia_prot = stbuf.ia_prot;
/* Check if file can be migrated */
- ret = __is_file_migratable (this, loc, &stbuf, xattr_rsp, flag);
+ ret = __is_file_migratable (this, loc, &stbuf, xattr_rsp, flag, defrag);
if (ret) {
if (ret == -2)
ret = 0;
@@ -1295,6 +1379,7 @@ dht_start_rebalance_task (xlator_t *this, call_frame_t *frame)
{
int ret = -1;
+ frame->root->pid = GF_CLIENT_PID_DEFRAG;
ret = synctask_new (this->ctx->env, rebalance_task,
rebalance_task_completion,
frame, frame);
@@ -1406,61 +1491,341 @@ gf_defrag_pattern_match (gf_defrag_info_t *defrag, char *name, uint64_t size)
return ret;
}
-/* We do a depth first traversal of directories. But before we move into
- * subdirs, we complete the data migration of those directories whose layouts
- * have been fixed
- */
+int dht_dfreaddirp_done (dht_dfoffset_ctx_t *offset_var, int cnt) {
+
+ int i;
+ int result = 1;
+
+ for (i = 0; i < cnt; i++) {
+ if (offset_var[i].readdir_done == 0) {
+ result = 0;
+ break;
+ }
+ }
+ return result;
+}
+
+static int
+gf_defrag_ctx_subvols_init (dht_dfoffset_ctx_t *offset_var, xlator_t *this) {
+
+ int i;
+ dht_conf_t *conf = NULL;
+
+ conf = this->private;
+
+ if (!conf)
+ return -1;
+
+ for (i = 0; i < conf->local_subvols_cnt; i++) {
+ offset_var[i].this = conf->local_subvols[i];
+ offset_var[i].offset = (off_t) 0;
+ offset_var[i].readdir_done = 0;
+ }
+
+ return 0;
+}
int
-gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
- dict_t *migrate_data)
+gf_defrag_migrate_single_file (void *opaque)
{
- int ret = -1;
- loc_t entry_loc = {0,};
- fd_t *fd = NULL;
- gf_dirent_t entries;
- gf_dirent_t *tmp = NULL;
+ xlator_t *this = NULL;
+ dht_conf_t *conf = NULL;
+ gf_defrag_info_t *defrag = NULL;
+ int ret = 0;
gf_dirent_t *entry = NULL;
- gf_boolean_t free_entries = _gf_false;
- off_t offset = 0;
- dict_t *dict = NULL;
+ struct timeval start = {0,};
+ loc_t entry_loc = {0,};
+ loc_t *loc = NULL;
struct iatt iatt = {0,};
+ dict_t *migrate_data = NULL;
int32_t op_errno = 0;
- char *uuid_str = NULL;
- uuid_t node_uuid = {0,};
- struct timeval dir_start = {0,};
struct timeval end = {0,};
double elapsed = {0,};
- struct timeval start = {0,};
- int loglevel = GF_LOG_TRACE;
+ struct dht_container *rebal_entry = NULL;
- gf_log (this->name, GF_LOG_INFO, "migrate data called on %s",
- loc->path);
- gettimeofday (&dir_start, NULL);
+ rebal_entry = (struct dht_container *)opaque;
+ if (!rebal_entry) {
+ gf_log (this->name, GF_LOG_ERROR, "rebal_entry is NULL");
+ ret = -1;
+ goto out;
+ }
- fd = fd_create (loc->inode, defrag->pid);
- if (!fd) {
- gf_log (this->name, GF_LOG_ERROR, "Failed to create fd");
+ this = rebal_entry->this;
+
+ conf = this->private;
+
+ defrag = conf->defrag;
+
+ loc = rebal_entry->parent_loc;
+
+ migrate_data = rebal_entry->migrate_data;
+
+ entry = rebal_entry->df_entry;
+
+ if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
+ ret = -1;
goto out;
}
- ret = syncop_opendir (this, loc, fd, NULL, NULL);
+ if (defrag->stats == _gf_true) {
+ gettimeofday (&start, NULL);
+ }
+
+ if (defrag->defrag_pattern &&
+ (gf_defrag_pattern_match (defrag, entry->d_name,
+ entry->d_stat.ia_size) == _gf_false)) {
+ gf_log (this->name, GF_LOG_ERROR, "pattern_match failed");
+ goto out;
+ }
+
+ memset (&entry_loc, 0, sizeof (entry_loc));
+
+ ret = dht_build_child_loc (this, &entry_loc, loc, entry->d_name);
+ if (ret) {
+ LOCK (&defrag->lock);
+ {
+ defrag->total_failures += 1;
+ }
+ UNLOCK (&defrag->lock);
+
+ ret = 0;
+
+ gf_log (this->name, GF_LOG_ERROR, "Child loc build failed");
+
+ goto out;
+ }
+
+ gf_uuid_copy (entry_loc.gfid, entry->d_stat.ia_gfid);
+
+ gf_uuid_copy (entry_loc.pargfid, loc->gfid);
+
+ entry_loc.inode->ia_type = entry->d_stat.ia_type;
+
+ ret = syncop_lookup (this, &entry_loc, &iatt, NULL, NULL, NULL);
if (ret) {
gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_MIGRATE_DATA_FAILED,
- "Migrate data failed: Failed to open dir %s",
- loc->path);
- ret = -1;
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "Migrate file failed: %s lookup failed",
+ entry_loc.name);
+ ret = 0;
goto out;
}
- INIT_LIST_HEAD (&entries.list);
+ ret = syncop_setxattr (this, &entry_loc, migrate_data, 0, NULL, NULL);
+ if (ret < 0) {
+ op_errno = -ret;
+ /* errno is overloaded. See
+ * rebalance_task_completion () */
+ if (op_errno == ENOSPC) {
+ gf_msg_debug (this->name, 0, "migrate-data skipped for"
+ " %s due to space constraints",
+ entry_loc.path);
+ LOCK (&defrag->lock);
+ {
+ defrag->skipped += 1;
+ }
+ UNLOCK (&defrag->lock);
+ } else if (op_errno != EEXIST) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "migrate-data failed for %s", entry_loc.path);
- while ((ret = syncop_readdirp (this, fd, 131072, offset, &entries,
- NULL, NULL)) != 0) {
+ LOCK (&defrag->lock);
+ {
+ defrag->total_failures += 1;
+ }
+ UNLOCK (&defrag->lock);
- if (ret < 0) {
+ }
+
+ ret = gf_defrag_handle_migrate_error (op_errno, defrag);
+
+ if (!ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "migrate-data on %s failed: %s", entry_loc.path,
+ strerror (op_errno));
+ } else if (ret == 1) {
+ ret = 0;
+ goto out;
+ } else if (ret == -1) {
+ goto out;
+ }
+ } else if (ret > 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "migrate-data failed for %s", entry_loc.path);
+ ret = 0;
+ LOCK (&defrag->lock);
+ {
+ defrag->total_failures += 1;
+ }
+ UNLOCK (&defrag->lock);
+ }
+
+ LOCK (&defrag->lock);
+ {
+ defrag->total_files += 1;
+ defrag->total_data += iatt.ia_size;
+ }
+ UNLOCK (&defrag->lock);
+
+ if (defrag->stats == _gf_true) {
+ gettimeofday (&end, NULL);
+ elapsed = (end.tv_sec - start.tv_sec) * 1e6 +
+ (end.tv_usec - start.tv_usec);
+ gf_log (this->name, GF_LOG_INFO, "Migration of "
+ "file:%s size:%"PRIu64" bytes took %.2f"
+ "secs and ret: %d", entry_loc.name,
+ iatt.ia_size, elapsed/1e6, ret);
+ }
+
+out:
+ loc_wipe (&entry_loc);
+
+ return ret;
+
+}
+
+void *
+gf_defrag_task (void *opaque)
+{
+ struct list_head *q_head = NULL;
+ struct dht_container *iterator = NULL;
+ gf_defrag_info_t *defrag = NULL;
+ int ret = 0;
+ gf_boolean_t true = _gf_true;
+
+
+ defrag = (gf_defrag_info_t *)opaque;
+ if (!defrag) {
+ gf_msg ("dht", GF_LOG_ERROR, 0, 0, "defrag is NULL");
+ goto out;
+ }
+
+ q_head = &(defrag->queue[0].list);
+
+ /* The following while loop dequeues one entry from the defrag->queue
+ under lock. We update defrag->global_error only on an error that
+ is critical enough to stop the rebalance process. The stop
+ is communicated to the other migrator threads by setting
+ defrag->defrag_status to GF_DEFRAG_STATUS_FAILED.
+
+ In defrag->queue, a low watermark (MIN_MIGRATE_QUEUE_COUNT) is
+ maintained so that the crawler does not starve the file migration
+ workers, and a high watermark (MAX_MIGRATE_QUEUE_COUNT) so that the
+ crawler does not run too far ahead in filling up the queue.
+ */
+
+ while (true) {
+
+ if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
+ goto out;
+ }
+
+ pthread_mutex_lock (&defrag->dfq_mutex);
+ {
+ if (defrag->q_entry_count) {
+ iterator = list_entry (q_head->next,
+ typeof(*iterator), list);
+
+ gf_log ("DHT", GF_LOG_DEBUG, "picking entry "
+ "%s", iterator->df_entry->d_name);
+
+ list_del_init (&(iterator->list));
+
+ defrag->q_entry_count--;
+
+ if ((defrag->q_entry_count <
+ MIN_MIGRATE_QUEUE_COUNT) &&
+ defrag->wakeup_crawler) {
+ pthread_cond_broadcast (
+ &defrag->rebalance_crawler_alarm);
+ }
+ pthread_mutex_unlock (&defrag->dfq_mutex);
+ ret = gf_defrag_migrate_single_file
+ ((void *)iterator);
+
+ /* Critical errors: ENOTCONN and ENOSPC */
+ if (ret) {
+ dht_set_global_defrag_error
+ (defrag, ret);
+
+ defrag->defrag_status =
+ GF_DEFRAG_STATUS_FAILED;
+ goto out;
+ }
+
+ gf_dirent_free (iterator->df_entry);
+ GF_FREE (iterator);
+ continue;
+ } else {
+
+ /* A set defrag->crawl_done flag means crawling the
+ file system is done; hence an empty queue while
+ the flag is set indicates that no more entries
+ will be added to the queue and the rebalance is
+ finished. */
+
+ if (!defrag->crawl_done) {
+ pthread_cond_wait (
+ &defrag->parallel_migration_cond,
+ &defrag->dfq_mutex);
+ }
+
+ if (defrag->crawl_done &&
+ !defrag->q_entry_count) {
+ pthread_cond_broadcast (
+ &defrag->parallel_migration_cond);
+ goto unlock;
+ } else {
+ pthread_mutex_unlock
+ (&defrag->dfq_mutex);
+ continue;
+ }
+ }
+
+ }
+unlock:
+ pthread_mutex_unlock (&defrag->dfq_mutex);
+ break;
+ }
+out:
+ return NULL;
+}
+
+static int
+gf_defrag_get_entry (xlator_t *this, int i, struct dht_container **container,
+ loc_t *loc, dht_conf_t *conf, gf_defrag_info_t *defrag,
+ fd_t *fd, dict_t *migrate_data,
+ struct dir_dfmeta *dir_dfmeta, dict_t *xattr_req)
+{
+ int ret = -1;
+ char is_linkfile = 0;
+ gf_dirent_t *df_entry = NULL;
+ loc_t entry_loc = {0,};
+ dict_t *xattr_rsp = NULL;
+ struct iatt iatt = {0,};
+ struct dht_container *tmp_container = NULL;
+ xlator_t *hashed_subvol = NULL;
+ xlator_t *cached_subvol = NULL;
+
+ if (dir_dfmeta->offset_var[i].readdir_done == 1) {
+ ret = 0;
+ goto out;
+ }
+ if (dir_dfmeta->fetch_entries[i] == 1) {
+ ret = syncop_readdirp (conf->local_subvols[i], fd, 131072,
+ dir_dfmeta->offset_var[i].offset,
+ &(dir_dfmeta->equeue[i]),
+ NULL, NULL);
+ if (ret == 0) {
+ dir_dfmeta->offset_var[i].readdir_done = 1;
+ ret = 0;
+ goto out;
+ }
+ if (ret < 0) {
gf_msg (this->name, GF_LOG_ERROR, 0,
DHT_MSG_MIGRATE_DATA_FAILED,
"%s: Migrate data failed: Readdir returned"
@@ -1470,213 +1835,429 @@ gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
goto out;
}
- if (list_empty (&entries.list))
- break;
+ if (list_empty (&(dir_dfmeta->equeue[i].list))) {
+ dir_dfmeta->offset_var[i].readdir_done = 1;
+ ret = 0;
+ goto out;
+ }
- free_entries = _gf_true;
+ dir_dfmeta->fetch_entries[i] = 0;
+ }
- list_for_each_entry_safe (entry, tmp, &entries.list, list) {
+ while (1) {
- if (dict) {
- dict_unref (dict);
- dict = NULL;
- }
+ if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
+ ret = -1;
+ goto out;
+ }
- if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
- ret = 1;
- goto out;
- }
+ df_entry = list_entry (dir_dfmeta->iterator[i]->next,
+ typeof (*df_entry), list);
- offset = entry->d_off;
+ if (&df_entry->list == dir_dfmeta->head[i]) {
+ gf_dirent_free (&(dir_dfmeta->equeue[i]));
+ INIT_LIST_HEAD (&(dir_dfmeta->equeue[i].list));
+ dir_dfmeta->fetch_entries[i] = 1;
+ dir_dfmeta->iterator[i] = dir_dfmeta->head[i];
+ ret = 0;
+ goto out;
+ }
- if (!strcmp (entry->d_name, ".") ||
- !strcmp (entry->d_name, ".."))
- continue;
+ dir_dfmeta->iterator[i] = dir_dfmeta->iterator[i]->next;
- if (IA_ISDIR (entry->d_stat.ia_type))
- continue;
+ dir_dfmeta->offset_var[i].offset = df_entry->d_off;
+ if (!strcmp (df_entry->d_name, ".") ||
+ !strcmp (df_entry->d_name, ".."))
+ continue;
- defrag->num_files_lookedup++;
- if (defrag->stats == _gf_true) {
- gettimeofday (&start, NULL);
- }
+ if (IA_ISDIR (df_entry->d_stat.ia_type))
+ continue;
- if (defrag->defrag_pattern &&
- (gf_defrag_pattern_match (defrag, entry->d_name,
- entry->d_stat.ia_size)
- == _gf_false)) {
- continue;
- }
+ defrag->num_files_lookedup++;
- loc_wipe (&entry_loc);
- ret =dht_build_child_loc (this, &entry_loc, loc,
- entry->d_name);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "Child loc"
- " build failed");
- goto out;
- }
+ if (defrag->defrag_pattern &&
+ (gf_defrag_pattern_match (defrag, df_entry->d_name,
+ df_entry->d_stat.ia_size)
+ == _gf_false)) {
+ continue;
+ }
- if (gf_uuid_is_null (entry->d_stat.ia_gfid)) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_GFID_NULL,
- "%s/%s gfid not present", loc->path,
- entry->d_name);
- continue;
- }
+ loc_wipe (&entry_loc);
+ ret = dht_build_child_loc (this, &entry_loc, loc,
+ df_entry->d_name);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Child loc"
+ " build failed");
+ ret = -1;
+ goto out;
+ }
- gf_uuid_copy (entry_loc.gfid, entry->d_stat.ia_gfid);
+ if (gf_uuid_is_null (df_entry->d_stat.ia_gfid)) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_GFID_NULL,
+ "%s/%s gfid not present", loc->path,
+ df_entry->d_name);
+ continue;
+ }
- if (gf_uuid_is_null (loc->gfid)) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_GFID_NULL,
- "%s/%s gfid not present", loc->path,
- entry->d_name);
- continue;
- }
+ gf_uuid_copy (entry_loc.gfid, df_entry->d_stat.ia_gfid);
- gf_uuid_copy (entry_loc.pargfid, loc->gfid);
+ if (gf_uuid_is_null (loc->gfid)) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_GFID_NULL,
+ "%s/%s gfid not present", loc->path,
+ df_entry->d_name);
+ continue;
+ }
- entry_loc.inode->ia_type = entry->d_stat.ia_type;
+ gf_uuid_copy (entry_loc.pargfid, loc->gfid);
- ret = syncop_lookup (this, &entry_loc, &iatt, NULL,
- NULL, NULL);
- if (ret) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_MIGRATE_FILE_FAILED,
- "Migrate file failed:%s lookup failed",
- entry_loc.path);
- ret = -1;
- continue;
- }
+ entry_loc.inode->ia_type = df_entry->d_stat.ia_type;
+ ret = syncop_lookup (conf->local_subvols[i], &entry_loc,
+ &iatt, NULL, xattr_req, &xattr_rsp);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "Migrate file failed:%s lookup failed",
+ entry_loc.path);
+ continue;
+ }
- ret = syncop_getxattr (this, &entry_loc, &dict,
- GF_XATTR_NODE_UUID_KEY, NULL,
- NULL);
- if(ret < 0) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_MIGRATE_FILE_FAILED,
- "Migrate file failed:"
- "Failed to get node-uuid for %s",
- entry_loc.path);
- ret = -1;
- continue;
- }
- ret = dict_get_str (dict, GF_XATTR_NODE_UUID_KEY,
- &uuid_str);
- if(ret < 0) {
- gf_log (this->name, GF_LOG_ERROR, "Failed to "
- "get node-uuid from dict for %s",
- entry_loc.path);
- ret = -1;
- continue;
- }
+ is_linkfile = check_is_linkfile (NULL, &iatt, xattr_rsp,
+ conf->link_xattr_name);
- if (gf_uuid_parse (uuid_str, node_uuid)) {
- gf_log (this->name, GF_LOG_ERROR, "gf_uuid_parse "
- "failed for %s", entry_loc.path);
- continue;
- }
+ if (is_linkfile) {
+ /* No need to add a linkto file to the queue for
+ migration. Only the actual data file needs to
+ be checked against the migration criteria.
+ */
+ gf_log (this->name, GF_LOG_INFO, "Skipping"
+ " linkfile %s", entry_loc.path);
+ continue;
+ }
- /* if file belongs to different node, skip migration
- * the other node will take responsibility of migration
- */
- if (gf_uuid_compare (node_uuid, defrag->node_uuid)) {
- gf_msg_trace (this->name, 0, "%s does not"
- "belong to this node",
- entry_loc.path);
- continue;
- }
- uuid_str = NULL;
+ ret = syncop_lookup (this, &entry_loc, NULL, NULL,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "Migrate file failed:%s lookup failed",
+ entry_loc.path);
+ continue;
+ }
- /* if distribute is present, it will honor this key.
- * -1, ENODATA is returned if distribute is not present
- * or file doesn't have a link-file. If file has
- * link-file, the path of link-file will be the value,
- * and also that guarantees that file has to be mostly
- * migrated */
+ /* if distribute is present, it will honor this key.
+ * -1, ENODATA is returned if distribute is not present
+ * or file doesn't have a link-file. If file has
+ * link-file, the path of link-file will be the value,
+ * and also that guarantees that file has to be mostly
+ * migrated */
- ret = syncop_getxattr (this, &entry_loc, NULL,
- GF_XATTR_LINKINFO_KEY, NULL,
- NULL);
- if (ret < 0) {
- if (-ret != ENODATA) {
- loglevel = GF_LOG_ERROR;
- defrag->total_failures += 1;
- } else {
- loglevel = GF_LOG_TRACE;
- }
- gf_log (this->name, loglevel, "%s: failed to "
- "get "GF_XATTR_LINKINFO_KEY" key - %s",
- entry_loc.path, strerror (-ret));
- ret = -1;
- continue;
+ hashed_subvol = dht_subvol_get_hashed (this, &entry_loc);
+ if (!hashed_subvol) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_HASHED_SUBVOL_GET_FAILED,
+ "Failed to get hashed subvol for %s",
+ loc->path);
+ continue;
+ }
+
+ cached_subvol = dht_subvol_get_cached (this, entry_loc.inode);
+ if (!cached_subvol) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_CACHED_SUBVOL_GET_FAILED,
+ "Failed to get cached subvol for %s",
+ loc->path);
+
+ continue;
+ }
+
+ if (hashed_subvol == cached_subvol) {
+ continue;
+ }
+
+ /*Build Container Structure */
+
+ tmp_container = GF_CALLOC (1, sizeof(struct dht_container),
+ gf_dht_mt_container_t);
+ if (!tmp_container) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to allocate "
+ "memory for container");
+ ret = -1;
+ goto out;
+ }
+ tmp_container->df_entry = gf_dirent_for_name (df_entry->d_name);
+ if (!tmp_container->df_entry) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to allocate "
+ "memory for df_entry");
+ ret = -1;
+ goto out;
+ }
+
+ tmp_container->df_entry->d_stat = df_entry->d_stat;
+
+ tmp_container->df_entry->d_ino = df_entry->d_ino;
+
+ tmp_container->df_entry->d_type = df_entry->d_type;
+
+ tmp_container->df_entry->d_len = df_entry->d_len;
+
+ tmp_container->parent_loc = GF_CALLOC(1, sizeof(*loc),
+ gf_dht_mt_loc_t);
+ if (!tmp_container->parent_loc) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to allocate "
+ "memory for loc");
+ ret = -1;
+ goto out;
+ }
+
+
+ ret = loc_copy (tmp_container->parent_loc, loc);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "loc_copy failed");
+ ret = -1;
+ goto out;
+ }
+
+ tmp_container->migrate_data = migrate_data;
+
+ tmp_container->this = this;
+
+ if (df_entry->dict)
+ tmp_container->df_entry->dict =
+ dict_ref (df_entry->dict);
+
+ /* Build Container Structure >> END */
+
+ ret = 0;
+ goto out;
+
+ }
+
+out:
+ if (ret == 0) {
+ *container = tmp_container;
+ } else {
+ if (tmp_container) {
+ GF_FREE (tmp_container->df_entry);
+ GF_FREE (tmp_container->parent_loc);
+ GF_FREE (tmp_container);
+ }
+ }
+
+ if (xattr_rsp)
+ dict_unref (xattr_rsp);
+ return ret;
+}
+
+int
+gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
+ dict_t *migrate_data)
+{
+ int ret = -1;
+ fd_t *fd = NULL;
+ dht_conf_t *conf = NULL;
+ gf_dirent_t entries;
+ dict_t *dict = NULL;
+ dict_t *xattr_req = NULL;
+ struct timeval dir_start = {0,};
+ struct timeval end = {0,};
+ double elapsed = {0,};
+ int local_subvols_cnt = 0;
+ int i = 0;
+ struct dht_container *container = NULL;
+ int ldfq_count = 0;
+ int dfc_index = 0;
+ struct dir_dfmeta *dir_dfmeta = NULL;
+
+ gf_log (this->name, GF_LOG_INFO, "migrate data called on %s",
+ loc->path);
+ gettimeofday (&dir_start, NULL);
+
+ conf = this->private;
+ local_subvols_cnt = conf->local_subvols_cnt;
+
+ if (!local_subvols_cnt) {
+ ret = 0;
+ goto out;
+ }
+
+ fd = fd_create (loc->inode, defrag->pid);
+ if (!fd) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to create fd");
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncop_opendir (this, loc, fd, NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_DATA_FAILED,
+ "Migrate data failed: Failed to open dir %s",
+ loc->path);
+ ret = -1;
+ goto out;
+ }
+
+ dir_dfmeta = GF_CALLOC (1, sizeof (*dir_dfmeta),
+ gf_common_mt_pointer);
+ if (!dir_dfmeta) {
+ gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta is NULL");
+ ret = -1;
+ goto out;
+ }
+
+
+ dir_dfmeta->head = GF_CALLOC (local_subvols_cnt,
+ sizeof (*(dir_dfmeta->head)),
+ gf_common_mt_pointer);
+ if (!dir_dfmeta->head) {
+ gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta->head is NULL");
+ ret = -1;
+ goto out;
+ }
+
+ dir_dfmeta->iterator = GF_CALLOC (local_subvols_cnt,
+ sizeof (*(dir_dfmeta->iterator)),
+ gf_common_mt_pointer);
+ if (!dir_dfmeta->iterator) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dir_dfmeta->iterator is NULL");
+ ret = -1;
+ goto out;
+ }
+
+ dir_dfmeta->equeue = GF_CALLOC (local_subvols_cnt, sizeof (entries),
+ gf_dht_mt_dirent_t);
+ if (!dir_dfmeta->equeue) {
+ gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta->equeue is NULL");
+ ret = -1;
+ goto out;
+ }
+
+ dir_dfmeta->offset_var = GF_CALLOC (local_subvols_cnt,
+ sizeof (dht_dfoffset_ctx_t),
+ gf_dht_mt_octx_t);
+ if (!dir_dfmeta->offset_var) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dir_dfmeta->offset_var is NULL");
+ ret = -1;
+ goto out;
+ }
+ ret = gf_defrag_ctx_subvols_init (dir_dfmeta->offset_var, this);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "dht_dfoffset_ctx_t"
+ "initialization failed");
+ ret = -1;
+ goto out;
+ }
+
+ dir_dfmeta->fetch_entries = GF_CALLOC (local_subvols_cnt,
+ sizeof (int), gf_common_mt_int);
+ if (!dir_dfmeta->fetch_entries) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dir_dfmeta->fetch_entries is NULL");
+ ret = -1;
+ goto out;
+ }
+
+ for (i = 0; i < local_subvols_cnt ; i++) {
+ INIT_LIST_HEAD (&(dir_dfmeta->equeue[i].list));
+ dir_dfmeta->head[i] = &(dir_dfmeta->equeue[i].list);
+ dir_dfmeta->iterator[i] = dir_dfmeta->head[i];
+ dir_dfmeta->fetch_entries[i] = 1;
+ }
+
+ xattr_req = dict_new ();
+ if (!xattr_req) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_set_uint32 (xattr_req,
+ conf->link_xattr_name, 256);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to set dict for "
+ "key: %s", conf->link_xattr_name);
+ ret = -1;
+ goto out;
+ }
+
+ /*
+ Job: read entries from each local subvol and store them in the
+ equeue array of linked lists. Then pick one entry at a time from
+ the equeue array on a round-robin basis and add it to the defrag queue.
+ */
+
+ while (!dht_dfreaddirp_done(dir_dfmeta->offset_var,
+ local_subvols_cnt)) {
+
+ pthread_mutex_lock (&defrag->dfq_mutex);
+ {
+ while (defrag->q_entry_count >
+ MAX_MIGRATE_QUEUE_COUNT) {
+ defrag->wakeup_crawler = 1;
+ pthread_cond_wait (
+ &defrag->rebalance_crawler_alarm,
+ &defrag->dfq_mutex);
}
- ret = syncop_setxattr (this, &entry_loc, migrate_data,
- 0, NULL, NULL);
- if (ret < 0) {
- op_errno = -ret;
- /* errno is overloaded. See
- * rebalance_task_completion () */
- if (op_errno == ENOSPC) {
- gf_msg_debug (this->name, 0,
- "migrate-data skipped for"
- " %s due to space "
- "constraints",
- entry_loc.path);
- defrag->skipped +=1;
- } else{
- gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_MIGRATE_FILE_FAILED,
- "migrate-data failed for %s",
- entry_loc.path);
- defrag->total_failures +=1;
- }
+ ldfq_count = defrag->q_entry_count;
- ret = gf_defrag_handle_migrate_error (op_errno,
- defrag);
+ if (defrag->wakeup_crawler) {
+ defrag->wakeup_crawler = 0;
+ }
- if (!ret)
- gf_msg_debug (this->name, 0,
- "migrate-data on %s "
- "failed: %s",
- entry_loc.path,
- strerror (op_errno));
- else if (ret == 1)
- continue;
- else if (ret == -1)
- goto out;
- } else if (ret > 0) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_MIGRATE_FILE_FAILED,
- "migrate-data failed for %s",
- entry_loc.path);
- defrag->total_failures +=1;
+ }
+ pthread_mutex_unlock (&defrag->dfq_mutex);
+
+ while (ldfq_count <= MAX_MIGRATE_QUEUE_COUNT &&
+ !dht_dfreaddirp_done(dir_dfmeta->offset_var,
+ local_subvols_cnt)) {
+
+ ret = gf_defrag_get_entry (this, dfc_index, &container,
+ loc, conf, defrag, fd,
+ migrate_data, dir_dfmeta,
+ xattr_req);
+ if (ret) {
+ gf_log ("DHT", GF_LOG_INFO, "Found critical "
+ "error from gf_defrag_get_entry");
+ ret = -1;
+ goto out;
}
- LOCK (&defrag->lock);
- {
- defrag->total_files += 1;
- defrag->total_data += iatt.ia_size;
+ /* Check if we got an entry, else we need to move the
+ index to the next subvol */
+ if (!container) {
+ GF_CRAWL_INDEX_MOVE(dfc_index,
+ local_subvols_cnt);
+ continue;
}
- UNLOCK (&defrag->lock);
- if (defrag->stats == _gf_true) {
- gettimeofday (&end, NULL);
- elapsed = (end.tv_sec - start.tv_sec) * 1e6 +
- (end.tv_usec - start.tv_usec);
- gf_log (this->name, GF_LOG_INFO, "Migration of "
- "file:%s size:%"PRIu64" bytes took %.2f"
- "secs", entry_loc.path, iatt.ia_size,
- elapsed/1e6);
+
+ /* Q this entry in the dfq */
+ pthread_mutex_lock (&defrag->dfq_mutex);
+ {
+ list_add_tail (&container->list,
+ &(defrag->queue[0].list));
+ defrag->q_entry_count++;
+ ldfq_count = defrag->q_entry_count;
+
+ gf_log (this->name, GF_LOG_DEBUG, "added "
+ "file:%s parent:%s to the queue ",
+ container->df_entry->d_name,
+ container->parent_loc->path);
+
+ pthread_cond_signal (
+ &defrag->parallel_migration_cond);
}
- }
+ pthread_mutex_unlock (&defrag->dfq_mutex);
- gf_dirent_free (&entries);
- free_entries = _gf_false;
- INIT_LIST_HEAD (&entries.list);
+ GF_CRAWL_INDEX_MOVE(dfc_index, local_subvols_cnt);
+ }
}
gettimeofday (&end, NULL);
@@ -1686,20 +2267,20 @@ gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
"%.2f secs", loc->path, elapsed/1e6);
ret = 0;
out:
- if (free_entries)
- gf_dirent_free (&entries);
- loc_wipe (&entry_loc);
+ GF_FREE_DIR_DFMETA (dir_dfmeta);
if (dict)
dict_unref(dict);
+ if (xattr_req)
+ dict_unref(xattr_req);
+
if (fd)
fd_unref (fd);
return ret;
}
-
int
gf_defrag_fix_layout (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
dict_t *fix_layout, dict_t *migrate_data)
@@ -1725,7 +2306,7 @@ gf_defrag_fix_layout (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER) &&
(defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX)) {
- ret = gf_defrag_migrate_data (this, defrag, loc, migrate_data);
+ ret = gf_defrag_process_dir (this, defrag, loc, migrate_data);
if (ret)
goto out;
}
@@ -1877,34 +2458,39 @@ out:
int
gf_defrag_start_crawl (void *data)
{
- xlator_t *this = NULL;
- dht_conf_t *conf = NULL;
- gf_defrag_info_t *defrag = NULL;
- int ret = -1;
- loc_t loc = {0,};
- struct iatt iatt = {0,};
- struct iatt parent = {0,};
- dict_t *fix_layout = NULL;
- dict_t *migrate_data = NULL;
- dict_t *status = NULL;
- glusterfs_ctx_t *ctx = NULL;
- dht_methods_t *methods = NULL;
+ xlator_t *this = NULL;
+ dht_conf_t *conf = NULL;
+ gf_defrag_info_t *defrag = NULL;
+ int ret = -1;
+ loc_t loc = {0,};
+ struct iatt iatt = {0,};
+ struct iatt parent = {0,};
+ dict_t *fix_layout = NULL;
+ dict_t *migrate_data = NULL;
+ dict_t *status = NULL;
+ dict_t *dict = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ dht_methods_t *methods = NULL;
+ int i = 0;
+ int thread_index = 0;
+ int err = 0;
+ pthread_t tid[MAX_MIGRATOR_THREAD_COUNT];
this = data;
if (!this)
- goto out;
+ goto exit;
ctx = this->ctx;
if (!ctx)
- goto out;
+ goto exit;
conf = this->private;
if (!conf)
- goto out;
+ goto exit;
defrag = conf->defrag;
if (!defrag)
- goto out;
+ goto exit;
gettimeofday (&defrag->start_time, NULL);
dht_build_root_inode (this, &defrag->root_inode);
@@ -1938,7 +2524,7 @@ gf_defrag_start_crawl (void *data)
"Failed to start rebalance:"
"Failed to set dictionary value: key = %s",
GF_XATTR_FIX_LAYOUT_KEY);
-
+ ret = -1;
goto out;
}
@@ -1970,6 +2556,53 @@ gf_defrag_start_crawl (void *data)
"non-force");
if (ret)
goto out;
+
+ /* Find local subvolumes */
+ ret = syncop_getxattr (this, &loc, &dict,
+ GF_REBAL_FIND_LOCAL_SUBVOL,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local "
+ "subvolume determination failed with error: %d",
+ -ret);
+ ret = -1;
+ goto out;
+ }
+
+ for (i = 0 ; i < conf->local_subvols_cnt; i++) {
+ gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvols "
+ "are %s", conf->local_subvols[i]->name);
+ }
+
+ /* Initialize global entry queue */
+ defrag->queue = GF_CALLOC (1, sizeof (struct dht_container),
+ gf_dht_mt_container_t);
+
+ if (!defrag->queue) {
+ gf_log (this->name, GF_LOG_INFO, "No memory for queue");
+ ret = -1;
+ goto out;
+ }
+
+ INIT_LIST_HEAD (&(defrag->queue[0].list));
+
+ /*Spawn Threads Here*/
+ while (thread_index < MAX_MIGRATOR_THREAD_COUNT) {
+ err = pthread_create(&(tid[thread_index]), NULL,
+ &gf_defrag_task, (void *)defrag);
+ if (err != 0) {
+ gf_log ("DHT", GF_LOG_ERROR,
+ "Thread[%d] creation failed. "
+ "Aborting Rebalance",
+ thread_index);
+ ret = -1;
+ goto out;
+ } else {
+ gf_log ("DHT", GF_LOG_INFO, "Thread[%d] "
+ "creation successful", thread_index);
+ }
+ thread_index++;
+ }
}
ret = gf_defrag_fix_layout (this, defrag, &loc, fix_layout,
@@ -1987,13 +2620,40 @@ gf_defrag_start_crawl (void *data)
}
methods->migration_other(this, defrag);
}
+ gf_log ("DHT", GF_LOG_INFO, "crawling file-system completed");
+out:
+ /* Reaching here means that crawling the entire file system is done,
+ or something failed. Set the defrag->crawl_done flag to tell the
+ migrator threads to drain the defrag->queue and terminate. */
+
+ if (ret) {
+ defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
+ }
+
+ pthread_mutex_lock (&defrag->dfq_mutex);
+ {
+ defrag->crawl_done = 1;
+
+ pthread_cond_broadcast (
+ &defrag->parallel_migration_cond);
+ }
+ pthread_mutex_unlock (&defrag->dfq_mutex);
+
+ /*Wait for all the threads to complete their task*/
+ for (i = 0; i < thread_index; i++) {
+ pthread_join (tid[i], NULL);
+ }
+
+ if (defrag->queue) {
+ gf_dirent_free (defrag->queue[0].df_entry);
+ INIT_LIST_HEAD (&(defrag->queue[0].list));
+ }
if ((defrag->defrag_status != GF_DEFRAG_STATUS_STOPPED) &&
(defrag->defrag_status != GF_DEFRAG_STATUS_FAILED)) {
defrag->defrag_status = GF_DEFRAG_STATUS_COMPLETE;
}
-out:
LOCK (&defrag->lock);
{
status = dict_new ();
@@ -2006,11 +2666,14 @@ out:
}
UNLOCK (&defrag->lock);
- if (defrag) {
- GF_FREE (defrag);
- conf->defrag = NULL;
- }
+ GF_FREE (defrag->queue);
+ GF_FREE (defrag);
+ conf->defrag = NULL;
+
+ if (dict)
+ dict_unref(dict);
+exit:
return ret;
}
@@ -2033,6 +2696,7 @@ gf_defrag_start (void *data)
dht_conf_t *conf = NULL;
gf_defrag_info_t *defrag = NULL;
xlator_t *this = NULL;
+ xlator_t *old_THIS = NULL;
this = data;
conf = this->private;
@@ -2058,6 +2722,8 @@ gf_defrag_start (void *data)
defrag->defrag_status = GF_DEFRAG_STATUS_STARTED;
+ old_THIS = THIS;
+ THIS = this;
ret = synctask_new (this->ctx->env, gf_defrag_start_crawl,
gf_defrag_done, frame, this);
@@ -2065,6 +2731,7 @@ gf_defrag_start (void *data)
gf_msg (this->name, GF_LOG_ERROR, 0,
DHT_MSG_REBALANCE_START_FAILED,
"Could not create task for rebalance");
+ THIS = old_THIS;
out:
return NULL;
}
diff --git a/xlators/cluster/dht/src/dht-shared.c b/xlators/cluster/dht/src/dht-shared.c
index fc281b80287..3eccff925fb 100644
--- a/xlators/cluster/dht/src/dht-shared.c
+++ b/xlators/cluster/dht/src/dht-shared.c
@@ -589,6 +589,23 @@ dht_init (xlator_t *this)
defrag->cmd = cmd;
defrag->stats = _gf_false;
+
+ defrag->queue = NULL;
+
+ defrag->crawl_done = 0;
+
+ defrag->global_error = 0;
+
+ defrag->q_entry_count = 0;
+
+ defrag->wakeup_crawler = 0;
+
+ synclock_init (&defrag->link_lock, SYNC_LOCK_DEFAULT);
+ pthread_mutex_init (&defrag->dfq_mutex, 0);
+ pthread_cond_init (&defrag->parallel_migration_cond, 0);
+ pthread_cond_init (&defrag->rebalance_crawler_alarm, 0);
+ defrag->global_error = 0;
+
}
conf->search_unhashed = GF_DHT_LOOKUP_UNHASHED_ON;
@@ -651,6 +668,15 @@ dht_init (xlator_t *this)
goto err;
}
+ if (cmd) {
+ ret = dht_init_local_subvolumes (this, conf);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dht_init_local_subvolumes failed");
+ goto err;
+ }
+ }
+
if (dict_get_str (this->options, "decommissioned-bricks", &temp_str) == 0) {
ret = dht_parse_decommissioned_bricks (this, conf, temp_str);
if (ret == -1)