From 04a72476789d06c0e55b73f953ff1ad68739ebd4 Mon Sep 17 00:00:00 2001 From: Susant Palai Date: Mon, 4 May 2015 16:43:02 +0530 Subject: dht/rebalance: Throttle rebalance Throttle value will be "normal" by default. For throttling down, a thread will be put in to sleep. And for throttling up, gf_defrag_process_dir will wake up the sleeping threads. Change-Id: I4892ab14982a1ff305aeb2d8bbd33c79d6877b69 BUG: 1219579 Signed-off-by: Susant Palai Reviewed-on: http://review.gluster.org/10526 Reviewed-by: Raghavendra G Tested-by: Raghavendra G Reviewed-on: http://review.gluster.org/10629 Reviewed-by: Vijay Bellur Tested-by: Vijay Bellur --- xlators/cluster/dht/src/dht-rebalance.c | 68 ++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 2 deletions(-) (limited to 'xlators/cluster/dht/src/dht-rebalance.c') diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c index 11b63c8ee01..d229dd69000 100644 --- a/xlators/cluster/dht/src/dht-rebalance.c +++ b/xlators/cluster/dht/src/dht-rebalance.c @@ -19,14 +19,17 @@ #include #include #include +#include #define GF_DISK_SECTOR_SIZE 512 #define DHT_REBALANCE_PID 4242 /* Change it if required */ #define DHT_REBALANCE_BLKSIZE (128 * 1024) -#define MAX_MIGRATOR_THREAD_COUNT 20 +#define MAX_MIGRATOR_THREAD_COUNT 40 #define MAX_MIGRATE_QUEUE_COUNT 500 #define MIN_MIGRATE_QUEUE_COUNT 200 +#define MAX(a, b) (((a) > (b))?(a):(b)) + #define GF_CRAWL_INDEX_MOVE(idx, sv_cnt) { \ idx++; \ idx %= sv_cnt; \ @@ -1725,6 +1728,36 @@ gf_defrag_task (void *opaque) pthread_mutex_lock (&defrag->dfq_mutex); { + + /*Throttle down: + If the reconfigured count is less than current thread + count, then the current thread will sleep */ + + /*TODO: Need to refactor the following block to work + *under defrag->lock. For now access + * defrag->current_thread_count and rthcount under + * dfq_mutex lock */ + while (!defrag->crawl_done && + (defrag->recon_thread_count < + defrag->current_thread_count)) { + defrag->current_thread_count--; + gf_log ("DHT", GF_LOG_INFO, + "Thread sleeping. " + "defrag->current_thread_count: %d", + defrag->current_thread_count); + + pthread_cond_wait ( + &defrag->df_wakeup_thread, + &defrag->dfq_mutex); + + defrag->current_thread_count++; + + gf_log ("DHT", GF_LOG_INFO, + "Thread wokeup. " + "defrag->current_thread_count: %d", + defrag->current_thread_count); + } + if (defrag->q_entry_count) { iterator = list_entry (q_head->next, typeof(*iterator), list); @@ -2068,9 +2101,11 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, double elapsed = {0,}; int local_subvols_cnt = 0; int i = 0; + int j = 0; struct dht_container *container = NULL; int ldfq_count = 0; int dfc_index = 0; + int throttle_up = 0; struct dir_dfmeta *dir_dfmeta = NULL; gf_log (this->name, GF_LOG_INFO, "migrate data called on %s", @@ -2198,6 +2233,25 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, pthread_mutex_lock (&defrag->dfq_mutex); { + + /*Throttle up: If reconfigured count is higher than + current thread count, wake up the sleeping threads + TODO: Need to refactor this. Instead of making the + thread sleep and wake, we should terminate and spawn + threads on-demand*/ + + if (defrag->recon_thread_count > + defrag->current_thread_count) { + throttle_up = + (defrag->recon_thread_count - + defrag->current_thread_count); + for (j = 0; j < throttle_up; j++) { + pthread_cond_signal ( + &defrag->df_wakeup_thread); + } + + } + while (defrag->q_entry_count > MAX_MIGRATE_QUEUE_COUNT) { defrag->wakeup_crawler = 1; @@ -2474,6 +2528,7 @@ gf_defrag_start_crawl (void *data) int i = 0; int thread_index = 0; int err = 0; + int thread_spawn_count = 0; pthread_t tid[MAX_MIGRATOR_THREAD_COUNT]; this = data; @@ -2586,8 +2641,15 @@ gf_defrag_start_crawl (void *data) INIT_LIST_HEAD (&(defrag->queue[0].list)); + thread_spawn_count = MAX ((get_nprocs() - 4), 4); + + gf_log (this->name, GF_LOG_DEBUG, "thread_spawn_count: %d", + thread_spawn_count); + + defrag->current_thread_count = thread_spawn_count; + /*Spawn Threads Here*/ - while (thread_index < MAX_MIGRATOR_THREAD_COUNT) { + while (thread_index < thread_spawn_count) { err = pthread_create(&(tid[thread_index]), NULL, &gf_defrag_task, (void *)defrag); if (err != 0) { @@ -2636,6 +2698,8 @@ out: pthread_cond_broadcast ( &defrag->parallel_migration_cond); + pthread_cond_broadcast ( + &defrag->df_wakeup_thread); } pthread_mutex_unlock (&defrag->dfq_mutex); -- cgit