summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src/dht-rebalance.c
diff options
context:
space:
mode:
authorSusant Palai <spalai@redhat.com>2015-05-04 16:43:02 +0530
committerVijay Bellur <vbellur@redhat.com>2015-05-07 10:26:54 -0700
commit04a72476789d06c0e55b73f953ff1ad68739ebd4 (patch)
treef9a79337f1a133d6cbcffe5a6a444412197a5990 /xlators/cluster/dht/src/dht-rebalance.c
parent641e91fa83ae70514745d55728e1e32a740c2401 (diff)
dht/rebalance: Throttle rebalance
Throttle value will be "normal" by default. For throttling down, a thread will be put in to sleep. And for throttling up, gf_defrag_process_dir will wake up the sleeping threads. Change-Id: I4892ab14982a1ff305aeb2d8bbd33c79d6877b69 BUG: 1219579 Signed-off-by: Susant Palai <spalai@redhat.com> Reviewed-on: http://review.gluster.org/10526 Reviewed-by: Raghavendra G <rgowdapp@redhat.com> Tested-by: Raghavendra G <rgowdapp@redhat.com> Reviewed-on: http://review.gluster.org/10629 Reviewed-by: Vijay Bellur <vbellur@redhat.com> Tested-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/cluster/dht/src/dht-rebalance.c')
-rw-r--r--xlators/cluster/dht/src/dht-rebalance.c68
1 files changed, 66 insertions, 2 deletions
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c
index 11b63c8ee01..d229dd69000 100644
--- a/xlators/cluster/dht/src/dht-rebalance.c
+++ b/xlators/cluster/dht/src/dht-rebalance.c
@@ -19,14 +19,17 @@
#include <signal.h>
#include <fnmatch.h>
#include <signal.h>
+#include <sys/sysinfo.h>
#define GF_DISK_SECTOR_SIZE 512
#define DHT_REBALANCE_PID 4242 /* Change it if required */
#define DHT_REBALANCE_BLKSIZE (128 * 1024)
-#define MAX_MIGRATOR_THREAD_COUNT 20
+#define MAX_MIGRATOR_THREAD_COUNT 40
#define MAX_MIGRATE_QUEUE_COUNT 500
#define MIN_MIGRATE_QUEUE_COUNT 200
+#define MAX(a, b) (((a) > (b))?(a):(b))
+
#define GF_CRAWL_INDEX_MOVE(idx, sv_cnt) { \
idx++; \
idx %= sv_cnt; \
@@ -1725,6 +1728,36 @@ gf_defrag_task (void *opaque)
pthread_mutex_lock (&defrag->dfq_mutex);
{
+
+ /*Throttle down:
+ If the reconfigured count is less than current thread
+ count, then the current thread will sleep */
+
+ /*TODO: Need to refactor the following block to work
+ *under defrag->lock. For now access
+ * defrag->current_thread_count and rthcount under
+ * dfq_mutex lock */
+ while (!defrag->crawl_done &&
+ (defrag->recon_thread_count <
+ defrag->current_thread_count)) {
+ defrag->current_thread_count--;
+ gf_log ("DHT", GF_LOG_INFO,
+ "Thread sleeping. "
+ "defrag->current_thread_count: %d",
+ defrag->current_thread_count);
+
+ pthread_cond_wait (
+ &defrag->df_wakeup_thread,
+ &defrag->dfq_mutex);
+
+ defrag->current_thread_count++;
+
+ gf_log ("DHT", GF_LOG_INFO,
+ "Thread wokeup. "
+ "defrag->current_thread_count: %d",
+ defrag->current_thread_count);
+ }
+
if (defrag->q_entry_count) {
iterator = list_entry (q_head->next,
typeof(*iterator), list);
@@ -2068,9 +2101,11 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
double elapsed = {0,};
int local_subvols_cnt = 0;
int i = 0;
+ int j = 0;
struct dht_container *container = NULL;
int ldfq_count = 0;
int dfc_index = 0;
+ int throttle_up = 0;
struct dir_dfmeta *dir_dfmeta = NULL;
gf_log (this->name, GF_LOG_INFO, "migrate data called on %s",
@@ -2198,6 +2233,25 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
pthread_mutex_lock (&defrag->dfq_mutex);
{
+
+ /*Throttle up: If reconfigured count is higher than
+ current thread count, wake up the sleeping threads
+ TODO: Need to refactor this. Instead of making the
+ thread sleep and wake, we should terminate and spawn
+ threads on-demand*/
+
+ if (defrag->recon_thread_count >
+ defrag->current_thread_count) {
+ throttle_up =
+ (defrag->recon_thread_count -
+ defrag->current_thread_count);
+ for (j = 0; j < throttle_up; j++) {
+ pthread_cond_signal (
+ &defrag->df_wakeup_thread);
+ }
+
+ }
+
while (defrag->q_entry_count >
MAX_MIGRATE_QUEUE_COUNT) {
defrag->wakeup_crawler = 1;
@@ -2474,6 +2528,7 @@ gf_defrag_start_crawl (void *data)
int i = 0;
int thread_index = 0;
int err = 0;
+ int thread_spawn_count = 0;
pthread_t tid[MAX_MIGRATOR_THREAD_COUNT];
this = data;
@@ -2586,8 +2641,15 @@ gf_defrag_start_crawl (void *data)
INIT_LIST_HEAD (&(defrag->queue[0].list));
+ thread_spawn_count = MAX ((get_nprocs() - 4), 4);
+
+ gf_log (this->name, GF_LOG_DEBUG, "thread_spawn_count: %d",
+ thread_spawn_count);
+
+ defrag->current_thread_count = thread_spawn_count;
+
/*Spawn Threads Here*/
- while (thread_index < MAX_MIGRATOR_THREAD_COUNT) {
+ while (thread_index < thread_spawn_count) {
err = pthread_create(&(tid[thread_index]), NULL,
&gf_defrag_task, (void *)defrag);
if (err != 0) {
@@ -2636,6 +2698,8 @@ out:
pthread_cond_broadcast (
&defrag->parallel_migration_cond);
+ pthread_cond_broadcast (
+ &defrag->df_wakeup_thread);
}
pthread_mutex_unlock (&defrag->dfq_mutex);