summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src
diff options
context:
space:
mode:
authorN Balachandran <nbalacha@redhat.com>2018-06-06 12:57:50 +0530
committerN Balachandran <nbalacha@redhat.com>2018-06-13 03:49:00 +0000
commit5702ff3012f6b97f6b497b5c2e89e8700caf8bc1 (patch)
tree2a123da9fd17402728dd49a6e5972241e3ea7c5f /xlators/cluster/dht/src
parent9647f0c64bfb0af2e4354998d1be672f7101a358 (diff)
cluster/dht: Refactor rebalance code
Created init and cleanup functions for certain functionality in order to improve readability. Removed unused code. Change-Id: Ia6a2f4ab64923b6ea8e10487227fb5621eec1488 updates: bz#1586363 Signed-off-by: N Balachandran <nbalacha@redhat.com>
Diffstat (limited to 'xlators/cluster/dht/src')
-rw-r--r--xlators/cluster/dht/src/dht-rebalance.c562
1 files changed, 253 insertions, 309 deletions
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c
index 0f836522499..3b986be97e0 100644
--- a/xlators/cluster/dht/src/dht-rebalance.c
+++ b/xlators/cluster/dht/src/dht-rebalance.c
@@ -4369,26 +4369,74 @@ gf_tier_wait_fix_lookup (gf_defrag_info_t *defrag) {
}
/******************Tier background Fix layout functions END********************/
-
-uint64_t
-gf_defrag_subvol_file_size (xlator_t *this, loc_t *root_loc)
+int
+dht_init_local_subvols_and_nodeuuids (xlator_t *this, dht_conf_t *conf,
+ loc_t *loc)
{
- int ret = -1;
- struct statvfs buf = {0,};
- if (!this)
- return 0;
+ dict_t *dict = NULL;
+ gf_defrag_info_t *defrag = NULL;
+ uuid_t *uuid_ptr = NULL;
+ int ret = -1;
+ int i = 0;
+ int j = 0;
- ret = syncop_statfs (this, root_loc, &buf, NULL, NULL);
+ defrag = conf->defrag;
+
+ if (defrag->cmd != GF_DEFRAG_CMD_START_TIER) {
+ /* Find local subvolumes */
+ ret = syncop_getxattr (this, loc, &dict,
+ GF_REBAL_FIND_LOCAL_SUBVOL,
+ NULL, NULL);
+ if (ret && (ret != -ENODATA)) {
+ gf_msg (this->name, GF_LOG_ERROR, -ret, 0, "local "
+ "subvolume determination failed with error: %d",
+ -ret);
+ ret = -1;
+ goto out;
+ }
+
+ if (!ret)
+ goto out;
+ }
+
+ ret = syncop_getxattr (this, loc, &dict,
+ GF_REBAL_OLD_FIND_LOCAL_SUBVOL,
+ NULL, NULL);
if (ret) {
- /* Aargh! */
- return 0;
+ gf_msg (this->name, GF_LOG_ERROR, -ret, 0, "local "
+ "subvolume determination failed with error: %d",
+ -ret);
+ ret = -1;
+ goto out;
}
- return ((buf.f_blocks - buf.f_bfree) * buf.f_frsize);
+ ret = 0;
+
+out:
+ if (ret) {
+ return ret;
+ }
+
+ for (i = 0 ; i < conf->local_subvols_cnt; i++) {
+ gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvol: "
+ "%s", conf->local_subvols[i]->name);
+
+ for (j = 0; j < conf->local_nodeuuids[i].count; j++) {
+ uuid_ptr = &(conf->local_nodeuuids[i].elements[j].uuid);
+ gf_msg (this->name, GF_LOG_INFO, 0, 0,
+ "node uuid : %s",
+ uuid_utoa(*uuid_ptr));
+ }
+ }
+
+ return ret;
}
+
+/* Functions for the rebalance estimates feature */
+
uint64_t
-gf_defrag_subvol_file_cnt (xlator_t *this, loc_t *root_loc)
+gf_defrag_subvol_file_size (xlator_t *this, loc_t *root_loc)
{
int ret = -1;
struct statvfs buf = {0,};
@@ -4401,10 +4449,9 @@ gf_defrag_subvol_file_cnt (xlator_t *this, loc_t *root_loc)
/* Aargh! */
return 0;
}
- return (buf.f_files - buf.f_ffree);
+ return ((buf.f_blocks - buf.f_bfree) * buf.f_frsize);
}
-
uint64_t
gf_defrag_total_file_size (xlator_t *this, loc_t *root_loc)
{
@@ -4420,7 +4467,7 @@ gf_defrag_total_file_size (xlator_t *this, loc_t *root_loc)
for (i = 0 ; i < conf->local_subvols_cnt; i++) {
size_files = gf_defrag_subvol_file_size (conf->local_subvols[i],
- root_loc);
+ root_loc);
total_size += size_files;
gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvol: %s,"
"cnt = %"PRIu64, conf->local_subvols[i]->name,
@@ -4434,88 +4481,6 @@ gf_defrag_total_file_size (xlator_t *this, loc_t *root_loc)
}
-uint64_t
-gf_defrag_total_file_cnt (xlator_t *this, loc_t *root_loc)
-{
- dht_conf_t *conf = NULL;
- int i = 0;
- uint64_t num_files = 0;
- uint64_t total_entries = 0;
-
- conf = this->private;
- if (!conf) {
- return 0;
- }
-
- for (i = 0 ; i < conf->local_subvols_cnt; i++) {
- num_files = gf_defrag_subvol_file_cnt (conf->local_subvols[i],
- root_loc);
- total_entries += num_files;
- gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvol: %s,"
- "cnt = %"PRIu64, conf->local_subvols[i]->name,
- num_files);
- }
-
- /* FIXFIXFIX: halve the number of files to negate .glusterfs contents
- We need a better way to figure this out */
-
- total_entries = total_entries/2;
- if (total_entries > 20000)
- total_entries += 10000;
-
- gf_msg (this->name, GF_LOG_INFO, 0, 0,
- "Total number of files = %"PRIu64, total_entries);
-
- return total_entries;
-}
-
-
-
-int
-dht_get_local_subvols_and_nodeuuids (xlator_t *this, dht_conf_t *conf,
- loc_t *loc)
-{
-
- dict_t *dict = NULL;
- gf_defrag_info_t *defrag = NULL;
- int ret = -1;
-
- defrag = conf->defrag;
-
- if (defrag->cmd != GF_DEFRAG_CMD_START_TIER) {
- /* Find local subvolumes */
- ret = syncop_getxattr (this, loc, &dict,
- GF_REBAL_FIND_LOCAL_SUBVOL,
- NULL, NULL);
- if (ret && (ret != -ENODATA)) {
-
- gf_msg (this->name, GF_LOG_ERROR, -ret, 0, "local "
- "subvolume determination failed with error: %d",
- -ret);
- ret = -1;
- goto out;
- }
-
- if (!ret)
- goto out;
- }
-
- ret = syncop_getxattr (this, loc, &dict,
- GF_REBAL_OLD_FIND_LOCAL_SUBVOL,
- NULL, NULL);
- if (ret) {
-
- gf_msg (this->name, GF_LOG_ERROR, -ret, 0, "local "
- "subvolume determination failed with error: %d",
- -ret);
- ret = -1;
- goto out;
- }
- ret = 0;
-out:
- return ret;
-}
-
static void*
dht_file_counter_thread (void *args)
{
@@ -4572,6 +4537,176 @@ dht_file_counter_thread (void *args)
return NULL;
}
+int
+gf_defrag_estimates_cleanup (xlator_t *this, gf_defrag_info_t *defrag,
+ pthread_t filecnt_thread)
+{
+ int ret = -1;
+
+ /* Wake up the filecounter thread.
+ * By now the defrag status will no longer be
+ * GF_DEFRAG_STATUS_STARTED so the thread will exit the loop.
+ */
+ pthread_mutex_lock (&defrag->fc_mutex);
+ {
+ pthread_cond_broadcast (&defrag->fc_wakeup_cond);
+ }
+ pthread_mutex_unlock (&defrag->fc_mutex);
+
+ ret = pthread_join (filecnt_thread, NULL);
+ if (ret) {
+ gf_msg ("dht", GF_LOG_ERROR, ret, 0,
+ "file_counter_thread: pthread_join failed.");
+ ret = -1;
+ }
+ return ret;
+}
+
+
+int
+gf_defrag_estimates_init (xlator_t *this, loc_t *loc,
+ pthread_t *filecnt_thread)
+{
+ int ret = -1;
+ dht_conf_t *conf = NULL;
+ gf_defrag_info_t *defrag = NULL;
+
+ conf = this->private;
+ defrag = conf->defrag;
+
+ g_totalsize = gf_defrag_total_file_size (this, loc);
+ if (!g_totalsize) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Failed to get "
+ "the total data size. Unable to estimate "
+ "time to complete rebalance.");
+ goto out;
+ }
+
+ ret = gf_thread_create (filecnt_thread, NULL,
+ &dht_file_counter_thread,
+ (void *)defrag, "dhtfcnt");
+
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, ret, 0, "Failed to "
+ "create the file counter thread ");
+ ret = -1;
+ }
+ ret = 0;
+out:
+ return ret;
+}
+
+
+/* Init and cleanup functions for parallel file migration*/
+int
+gf_defrag_parallel_migration_init (xlator_t *this, gf_defrag_info_t *defrag,
+ pthread_t **tid_array, int *thread_index)
+{
+ int ret = -1;
+ int thread_spawn_count = 0;
+ int index = 0;
+ pthread_t *tid = NULL;
+ char thread_name[GF_THREAD_NAMEMAX] = {0,};
+
+ if (!defrag)
+ goto out;
+
+ /* Initialize global entry queue */
+ defrag->queue = GF_CALLOC (1, sizeof (struct dht_container),
+ gf_dht_mt_container_t);
+
+ if (!defrag->queue) {
+ gf_msg (this->name, GF_LOG_ERROR, ENOMEM, 0,
+ "Failed to initialise migration queue");
+ ret = -1;
+ goto out;
+ }
+
+ INIT_LIST_HEAD (&(defrag->queue[0].list));
+
+ thread_spawn_count = MAX (MAX_REBAL_THREADS, 4);
+
+ gf_msg_debug (this->name, 0, "thread_spawn_count: %d",
+ thread_spawn_count);
+
+ tid = GF_CALLOC (thread_spawn_count, sizeof (pthread_t),
+ gf_common_mt_pthread_t);
+ if (!tid) {
+ gf_msg (this->name, GF_LOG_ERROR, ENOMEM, 0,
+ "Failed to create migration threads");
+ ret = -1;
+ goto out;
+ }
+ defrag->current_thread_count = thread_spawn_count;
+
+ /*Spawn Threads Here*/
+ while (index < thread_spawn_count) {
+ snprintf (thread_name, sizeof(thread_name),
+ "%s%d", "dhtmig", index + 1);
+ ret = gf_thread_create (&(tid[index]), NULL,
+ &gf_defrag_task, (void *)defrag,
+ thread_name);
+ if (ret != 0) {
+ gf_msg ("DHT", GF_LOG_ERROR, ret, 0,
+ "Thread[%d] creation failed. ",
+ index);
+ ret = -1;
+ goto out;
+ } else {
+ gf_log ("DHT", GF_LOG_INFO, "Thread[%d] "
+ "creation successful", index);
+ }
+ index++;
+ }
+
+ ret = 0;
+out:
+ *thread_index = index;
+ *tid_array = tid;
+
+ return ret;
+}
+
+int
+gf_defrag_parallel_migration_cleanup (gf_defrag_info_t *defrag,
+ pthread_t *tid_array, int thread_index)
+{
+ int ret = -1;
+ int i = 0;
+
+ if (!defrag)
+ goto out;
+
+ /* Wake up all migration threads */
+ pthread_mutex_lock (&defrag->dfq_mutex);
+ {
+ defrag->crawl_done = 1;
+
+ pthread_cond_broadcast (&defrag->parallel_migration_cond);
+ pthread_cond_broadcast (&defrag->df_wakeup_thread);
+ }
+ pthread_mutex_unlock (&defrag->dfq_mutex);
+
+ /*Wait for all the threads to complete their task*/
+ for (i = 0; i < thread_index; i++) {
+ pthread_join (tid_array[i], NULL);
+ }
+
+ GF_FREE (tid_array);
+
+ /* Cleanup the migration queue */
+ if (defrag->queue) {
+ gf_dirent_free (defrag->queue[0].df_entry);
+ INIT_LIST_HEAD (&(defrag->queue[0].list));
+ }
+
+ GF_FREE (defrag->queue);
+
+ ret = 0;
+out:
+ return ret;
+}
+
int
@@ -4580,28 +4715,22 @@ gf_defrag_start_crawl (void *data)
xlator_t *this = NULL;
dht_conf_t *conf = NULL;
gf_defrag_info_t *defrag = NULL;
- int ret = -1;
- loc_t loc = {0,};
- struct iatt iatt = {0,};
- struct iatt parent = {0,};
dict_t *fix_layout = NULL;
dict_t *migrate_data = NULL;
dict_t *status = NULL;
glusterfs_ctx_t *ctx = NULL;
dht_methods_t *methods = NULL;
- int i = 0;
+ call_frame_t *statfs_frame = NULL;
+ xlator_t *old_THIS = NULL;
+ int ret = -1;
+ loc_t loc = {0,};
+ struct iatt iatt = {0,};
+ struct iatt parent = {0,};
int thread_index = 0;
- int err = 0;
- int thread_spawn_count = 0;
pthread_t *tid = NULL;
- char thread_name[GF_THREAD_NAMEMAX] = {0,};
pthread_t filecnt_thread;
gf_boolean_t is_tier_detach = _gf_false;
- call_frame_t *statfs_frame = NULL;
- xlator_t *old_THIS = NULL;
- int j = 0;
gf_boolean_t fc_thread_started = _gf_false;
- uuid_t *uuid_ptr = NULL;
this = data;
if (!this)
@@ -4717,6 +4846,8 @@ gf_defrag_start_crawl (void *data)
}
if (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX) {
+ /* We need to migrate files */
+
migrate_data = dict_new ();
if (!migrate_data) {
defrag->total_failures++;
@@ -4732,102 +4863,32 @@ gf_defrag_start_crawl (void *data)
goto out;
}
- ret = dht_get_local_subvols_and_nodeuuids (this, conf, &loc);
+ ret = dht_init_local_subvols_and_nodeuuids (this, conf, &loc);
if (ret) {
ret = -1;
goto out;
}
- for (i = 0 ; i < conf->local_subvols_cnt; i++) {
- gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvols "
- "are %s", conf->local_subvols[i]->name);
-
- for (j = 0; j < conf->local_nodeuuids[i].count; j++) {
- uuid_ptr = &(conf->local_nodeuuids[i].elements[j].uuid);
- gf_msg (this->name, GF_LOG_INFO, 0, 0,
- "node uuids are %s",
- uuid_utoa(*uuid_ptr));
- }
- }
-
- g_totalsize = gf_defrag_total_file_size (this, &loc);
- if (!g_totalsize) {
- gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Failed to get "
- "the total data size. Unable to estimate "
- "time to complete rebalance.");
- }
-
- g_totalfiles = gf_defrag_total_file_cnt (this, &loc);
- if (!g_totalfiles) {
- gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Failed to get "
- "the total number of files. Unable to estimate "
- "time to complete rebalance.");
+ /* Initialise the structures required for parallel migration */
+ ret = gf_defrag_parallel_migration_init (this, defrag, &tid,
+ &thread_index);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, 0,
+ "Aborting rebalance.");
+ goto out;
}
- ret = gf_thread_create (&filecnt_thread, NULL,
- &dht_file_counter_thread,
- (void *)defrag, "dhtfcnt");
-
+ ret = gf_defrag_estimates_init (this, &loc, &filecnt_thread);
if (ret) {
- gf_msg (this->name, GF_LOG_ERROR, ret, 0, "Failed to "
- "create the file counter thread ");
+ /* Not a fatal error. Allow the rebalance to proceed*/
ret = 0;
} else {
fc_thread_started = _gf_true;
}
-
- /* Initialize global entry queue */
- defrag->queue = GF_CALLOC (1, sizeof (struct dht_container),
- gf_dht_mt_container_t);
-
- if (!defrag->queue) {
- gf_log (this->name, GF_LOG_ERROR, "No memory for "
- "queue");
- ret = -1;
- goto out;
- }
-
- INIT_LIST_HEAD (&(defrag->queue[0].list));
-
- thread_spawn_count = MAX (MAX_REBAL_THREADS, 4);
-
- gf_msg_debug (this->name, 0, "thread_spawn_count: %d",
- thread_spawn_count);
-
- tid = GF_CALLOC (thread_spawn_count, sizeof (pthread_t),
- gf_common_mt_pthread_t);
- if (!tid) {
- gf_log (this->name, GF_LOG_ERROR, "Insufficient memory "
- "for tid");
- ret = -1;
- goto out;
- }
-
- defrag->current_thread_count = thread_spawn_count;
-
- /*Spawn Threads Here*/
- while (thread_index < thread_spawn_count) {
- snprintf (thread_name, sizeof(thread_name),
- "%s%d", "dhtdf", thread_index + 1);
- err = gf_thread_create (&(tid[thread_index]), NULL,
- &gf_defrag_task, (void *)defrag,
- thread_name);
- if (err != 0) {
- gf_log ("DHT", GF_LOG_ERROR,
- "Thread[%d] creation failed. "
- "Aborting Rebalance",
- thread_index);
- ret = -1;
- goto out;
- } else {
- gf_log ("DHT", GF_LOG_INFO, "Thread[%d] "
- "creation successful", thread_index);
- }
- thread_index++;
- }
}
+
if (defrag->cmd == GF_DEFRAG_CMD_START_TIER) {
/* Fix layout for attach tier */
ret = gf_tier_start_fix_layout (this, &loc, defrag, fix_layout);
@@ -4882,23 +4943,6 @@ out:
defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
}
- pthread_mutex_lock (&defrag->dfq_mutex);
- {
- defrag->crawl_done = 1;
-
- pthread_cond_broadcast (
- &defrag->parallel_migration_cond);
- pthread_cond_broadcast (
- &defrag->df_wakeup_thread);
- }
- pthread_mutex_unlock (&defrag->dfq_mutex);
-
- /*Wait for all the threads to complete their task*/
- for (i = 0; i < thread_index; i++) {
- pthread_join (tid[i], NULL);
- }
-
- GF_FREE (tid);
if (defrag->cmd == GF_DEFRAG_CMD_START_TIER) {
/* Wait for the tier fixlayout to
@@ -4913,10 +4957,7 @@ out:
gf_tier_clear_fix_layout (this, &loc, defrag);
}
- if (defrag->queue) {
- gf_dirent_free (defrag->queue[0].df_entry);
- INIT_LIST_HEAD (&(defrag->queue[0].list));
- }
+ gf_defrag_parallel_migration_cleanup (defrag, tid, thread_index);
if ((defrag->defrag_status != GF_DEFRAG_STATUS_STOPPED) &&
(defrag->defrag_status != GF_DEFRAG_STATUS_FAILED)) {
@@ -4924,13 +4965,7 @@ out:
}
if (fc_thread_started) {
- pthread_mutex_lock (&defrag->fc_mutex);
- {
- pthread_cond_broadcast (&defrag->fc_wakeup_cond);
- }
- pthread_mutex_unlock (&defrag->fc_mutex);
-
- pthread_join (filecnt_thread, NULL);
+ gf_defrag_estimates_cleanup (this, defrag, filecnt_thread);
}
dht_send_rebalance_event (this, defrag->cmd, defrag->defrag_status);
@@ -4947,7 +4982,6 @@ out:
}
UNLOCK (&defrag->lock);
- GF_FREE (defrag->queue);
GF_FREE (defrag);
conf->defrag = NULL;
@@ -5075,84 +5109,6 @@ out:
}
-
-uint64_t
-gf_defrag_get_estimates (dht_conf_t *conf)
-{
- gf_defrag_info_t *defrag = NULL;
- loc_t loc = {0,};
- double rate_lookedup = 0;
- uint64_t dirs_processed = 0;
- uint64_t files_processed = 0;
- uint64_t total_processed = 0;
- uint64_t tmp_count = 0;
- uint64_t time_to_complete = 0;
- struct timeval now = {0,};
- double elapsed = 0;
-
-
- defrag = conf->defrag;
-
- if (!g_totalfiles)
- goto out;
-
- gettimeofday (&now, NULL);
- elapsed = now.tv_sec - defrag->start_time.tv_sec;
-
- /* I tried locking before accessing num_files_lookedup and
- * num_dirs_processed but the status function
- * never seemed to get the lock, causing the status cli to
- * hang.
- */
-
- dirs_processed = defrag->num_dirs_processed;
- files_processed = defrag->num_files_lookedup;
-
- total_processed = files_processed + dirs_processed;
-
- if (total_processed > g_totalfiles) {
- /* lookup the number of files again
- * The problem here is that not all the newly added files
- * might need to be processed. So this need not work
- * in some cases
- */
- dht_build_root_loc (defrag->root_inode, &loc);
- g_totalfiles = gf_defrag_total_file_cnt (defrag->this, &loc);
- if (!g_totalfiles)
- goto out;
- }
-
- /* rate at which files looked up */
- rate_lookedup = (total_processed)/elapsed;
-
- /* We initially sum up dirs across all local subvols because we get the
- * file count from the inodes on each subvol.
- * The same directories will be counted for each subvol but
- * we want them to be counted once.
- */
-
- tmp_count = g_totalfiles
- - (dirs_processed * (conf->local_subvols_cnt - 1));
-
- if (rate_lookedup) {
- time_to_complete = (tmp_count)/rate_lookedup;
-
- } else {
-
- gf_msg (THIS->name, GF_LOG_ERROR, 0, 0,
- "Unable to calculate estimated time for rebalance");
- }
-
- gf_log (THIS->name, GF_LOG_INFO,
- "TIME: (count) total_processed=%"PRIu64" tmp_cnt = %"PRIu64","
- "rate_lookedup=%f", total_processed, tmp_count,
- rate_lookedup);
-
-out:
- return time_to_complete;
-}
-
-
int
gf_defrag_status_get (dht_conf_t *conf, dict_t *dict)
{
@@ -5196,18 +5152,6 @@ gf_defrag_status_get (dht_conf_t *conf, dict_t *dict)
if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER)
&& (defrag->defrag_status == GF_DEFRAG_STATUS_STARTED)) {
-/*
- time_to_complete = gf_defrag_get_estimates (conf);
-
- if (time_to_complete && (time_to_complete > elapsed))
- time_left = time_to_complete - elapsed;
-
- gf_log (THIS->name, GF_LOG_INFO,
- "TIME: Estimated total time to complete based on"
- " count = %"PRIu64 " seconds, seconds left = %"PRIu64"",
- time_to_complete, time_left);
-*/
-
time_to_complete = gf_defrag_get_estimates_based_on_size (conf);
if (time_to_complete && (time_to_complete > elapsed))