diff options
| author | Susant Palai <spalai@redhat.com> | 2015-05-04 16:43:02 +0530 | 
|---|---|---|
| committer | Vijay Bellur <vbellur@redhat.com> | 2015-05-07 10:26:54 -0700 | 
| commit | 04a72476789d06c0e55b73f953ff1ad68739ebd4 (patch) | |
| tree | f9a79337f1a133d6cbcffe5a6a444412197a5990 | |
| parent | 641e91fa83ae70514745d55728e1e32a740c2401 (diff) | |
dht/rebalance: Throttle rebalance
The throttle value is "normal" by default. To throttle down,
migrator threads are put to sleep; to throttle up,
gf_defrag_process_dir wakes up the sleeping threads.
Change-Id: I4892ab14982a1ff305aeb2d8bbd33c79d6877b69
BUG: 1219579
Signed-off-by: Susant Palai <spalai@redhat.com>
Reviewed-on: http://review.gluster.org/10526
Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
Tested-by: Raghavendra G <rgowdapp@redhat.com>
Reviewed-on: http://review.gluster.org/10629
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Tested-by: Vijay Bellur <vbellur@redhat.com>
| -rw-r--r-- | tests/basic/distribute/throttle-rebal.t | 42 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.h | 8 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-rebalance.c | 68 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-shared.c | 61 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-volume-set.c | 36 | 
5 files changed, 211 insertions, 4 deletions
diff --git a/tests/basic/distribute/throttle-rebal.t b/tests/basic/distribute/throttle-rebal.t new file mode 100644 index 00000000000..89495aee71b --- /dev/null +++ b/tests/basic/distribute/throttle-rebal.t @@ -0,0 +1,42 @@ +#!/bin/bash +# Test to check +. $(dirname $0)/../../include.rc +. $(dirname $0)/../../volume.rc + +#Check rebal-throttle set option sanity +cleanup; + +TEST glusterd +TEST pidof glusterd +TEST $CLI volume create $V0 $H0:$B0/brick1 $H0:$B0/brick2 +TEST $CLI volume start $V0 + +function set_throttle { +        local level=$1 +        $CLI volume set $V0 cluster.rebal-throttle $level 2>&1 |grep -oE 'success|failed' +} + + +THROTTLE_LEVEL="lazy" +EXPECT "success" set_throttle $THROTTLE_LEVEL +EXPECT "$THROTTLE_LEVEL" echo `$CLI volume info | grep rebal-throttle | awk '{print $2}'` + +THROTTLE_LEVEL="normal" +EXPECT "success" set_throttle $THROTTLE_LEVEL +EXPECT "$THROTTLE_LEVEL" echo `$CLI volume info | grep rebal-throttle | awk '{print $2}'` + + +THROTTLE_LEVEL="aggressive" +EXPECT "success" set_throttle $THROTTLE_LEVEL +EXPECT "$THROTTLE_LEVEL" echo `$CLI volume info | grep rebal-throttle | awk '{print $2}'` + +THROTTLE_LEVEL="garbage" +EXPECT "failed" set_throttle $THROTTLE_LEVEL + +#check if throttle-level is still aggressive +EXPECT "aggressive" echo `$CLI volume info | grep rebal-throttle | awk '{print $2}'` + +TEST $CLI volume stop $V0; +TEST $CLI volume delete $V0; + +cleanup; diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h index 0e290465d44..3a1906c0108 100644 --- a/xlators/cluster/dht/src/dht-common.h +++ b/xlators/cluster/dht/src/dht-common.h @@ -342,6 +342,13 @@ struct gf_defrag_info_ {          int32_t                      abort;          int32_t                      wakeup_crawler; +        /*Throttle params*/ +        /*stands for reconfigured thread count*/ +        int32_t                      recon_thread_count; +        /*stands for current running thread count*/ +        int32_t           
           current_thread_count; +        pthread_cond_t               df_wakeup_thread; +          /* Hard link handle requirement */          synclock_t                   link_lock;  }; @@ -417,6 +424,7 @@ struct dht_conf {          /* Support size-weighted rebalancing (heterogeneous bricks). */          gf_boolean_t    do_weighting;          gf_boolean_t    randomize_by_gfid; +        char           *dthrottle;          dht_methods_t  *methods; diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c index 11b63c8ee01..d229dd69000 100644 --- a/xlators/cluster/dht/src/dht-rebalance.c +++ b/xlators/cluster/dht/src/dht-rebalance.c @@ -19,14 +19,17 @@  #include <signal.h>  #include <fnmatch.h>  #include <signal.h> +#include <sys/sysinfo.h>  #define GF_DISK_SECTOR_SIZE             512  #define DHT_REBALANCE_PID               4242 /* Change it if required */  #define DHT_REBALANCE_BLKSIZE           (128 * 1024) -#define MAX_MIGRATOR_THREAD_COUNT       20 +#define MAX_MIGRATOR_THREAD_COUNT       40  #define MAX_MIGRATE_QUEUE_COUNT         500  #define MIN_MIGRATE_QUEUE_COUNT         200 +#define MAX(a, b) (((a) > (b))?(a):(b)) +  #define GF_CRAWL_INDEX_MOVE(idx, sv_cnt)  {     \                  idx++;                          \                  idx %= sv_cnt;                  \ @@ -1725,6 +1728,36 @@ gf_defrag_task (void *opaque)                  pthread_mutex_lock (&defrag->dfq_mutex);                  { + +                        /*Throttle down: +                          If the reconfigured count is less than current thread +                          count, then the current thread will sleep */ + +                        /*TODO: Need to refactor the following block to work +                         *under defrag->lock. 
For now access +                         * defrag->current_thread_count and rthcount under +                         * dfq_mutex lock */ +                        while (!defrag->crawl_done && +                              (defrag->recon_thread_count < +                                        defrag->current_thread_count)) { +                                defrag->current_thread_count--; +                                gf_log ("DHT", GF_LOG_INFO, +                                        "Thread sleeping. " +                                        "defrag->current_thread_count: %d", +                                         defrag->current_thread_count); + +                                pthread_cond_wait ( +                                           &defrag->df_wakeup_thread, +                                           &defrag->dfq_mutex); + +                                defrag->current_thread_count++; + +                                gf_log ("DHT", GF_LOG_INFO, +                                        "Thread wokeup. 
" +                                        "defrag->current_thread_count: %d", +                                         defrag->current_thread_count); +                        } +                          if (defrag->q_entry_count) {                                  iterator = list_entry (q_head->next,                                                  typeof(*iterator), list); @@ -2068,9 +2101,11 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,          double                   elapsed           = {0,};          int                      local_subvols_cnt = 0;          int                      i                 = 0; +        int                      j                 = 0;          struct  dht_container   *container         = NULL;          int                      ldfq_count        = 0;          int                      dfc_index         = 0; +        int                      throttle_up       = 0;          struct dir_dfmeta       *dir_dfmeta        = NULL;          gf_log (this->name, GF_LOG_INFO, "migrate data called on %s", @@ -2198,6 +2233,25 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,                  pthread_mutex_lock (&defrag->dfq_mutex);                  { + +                      /*Throttle up: If reconfigured count is higher than +                        current thread count, wake up the sleeping threads +                        TODO: Need to refactor this. 
Instead of making the +                        thread sleep and wake, we should terminate and spawn +                        threads on-demand*/ + +                        if (defrag->recon_thread_count > +                                         defrag->current_thread_count) { +                                throttle_up = +                                        (defrag->recon_thread_count - +                                            defrag->current_thread_count); +                                for (j = 0; j < throttle_up; j++) { +                                        pthread_cond_signal ( +                                             &defrag->df_wakeup_thread); +                                } + +                        } +                          while (defrag->q_entry_count >                                          MAX_MIGRATE_QUEUE_COUNT) {                                  defrag->wakeup_crawler = 1; @@ -2474,6 +2528,7 @@ gf_defrag_start_crawl (void *data)          int                      i              = 0;          int                     thread_index    = 0;          int                     err             = 0; +        int                     thread_spawn_count = 0;          pthread_t tid[MAX_MIGRATOR_THREAD_COUNT];          this = data; @@ -2586,8 +2641,15 @@ gf_defrag_start_crawl (void *data)                  INIT_LIST_HEAD (&(defrag->queue[0].list)); +                thread_spawn_count = MAX ((get_nprocs() - 4), 4); + +                gf_log (this->name, GF_LOG_DEBUG, "thread_spawn_count: %d", +                        thread_spawn_count); + +                defrag->current_thread_count = thread_spawn_count; +                  /*Spawn Threads Here*/ -                while (thread_index < MAX_MIGRATOR_THREAD_COUNT) { +                while (thread_index < thread_spawn_count) {                          err = pthread_create(&(tid[thread_index]), NULL,                                       &gf_defrag_task, (void *)defrag);                         
 if (err != 0) { @@ -2636,6 +2698,8 @@ out:                  pthread_cond_broadcast (                          &defrag->parallel_migration_cond); +                pthread_cond_broadcast ( +                        &defrag->df_wakeup_thread);          }          pthread_mutex_unlock (&defrag->dfq_mutex); diff --git a/xlators/cluster/dht/src/dht-shared.c b/xlators/cluster/dht/src/dht-shared.c index 3eccff925fb..22c843a5c77 100644 --- a/xlators/cluster/dht/src/dht-shared.c +++ b/xlators/cluster/dht/src/dht-shared.c @@ -15,11 +15,32 @@  #endif  /* TODO: add NS locking */ - +#include <sys/sysinfo.h>  #include "statedump.h"  #include "dht-common.h"  #include "dht-messages.h" +#define MAX(a, b) (((a) > (b))?(a):(b)) + +#define GF_DECIDE_DEFRAG_THROTTLE_COUNT(throttle_count, conf) {         \ +                                                                        \ +                pthread_mutex_lock (&conf->defrag->dfq_mutex);          \ +                                                                        \ +                if (!strcasecmp (conf->dthrottle, "lazy"))              \ +                        conf->defrag->recon_thread_count = 1;           \ +                                                                        \ +                throttle_count = MAX ((get_nprocs() - 4), 4);           \ +                                                                        \ +                if (!strcasecmp (conf->dthrottle, "normal"))            \ +                        conf->defrag->recon_thread_count =              \ +                                                 (throttle_count / 2);  \ +                                                                        \ +                if (!strcasecmp (conf->dthrottle, "aggressive"))        \ +                        conf->defrag->recon_thread_count =              \ +                                                 throttle_count;        \ +                                                                        \ +    
            pthread_mutex_unlock (&conf->defrag->dfq_mutex);        \ +        }                                                               \  /* TODO:     - use volumename in xattr instead of "dht" @@ -374,6 +395,7 @@ dht_reconfigure (xlator_t *this, dict_t *options)          char            *temp_str = NULL;          gf_boolean_t     search_unhashed;          int              ret = -1; +        int              throttle_count = 0;          GF_VALIDATE_OR_GOTO ("dht", this, out);          GF_VALIDATE_OR_GOTO ("dht", options, out); @@ -426,6 +448,16 @@ dht_reconfigure (xlator_t *this, dict_t *options)                            conf->randomize_by_gfid,                            options, bool, out); +        GF_OPTION_RECONF ("rebal-throttle", conf->dthrottle, options, +                          str, out); + +        if (conf->defrag) { +                GF_DECIDE_DEFRAG_THROTTLE_COUNT (throttle_count, conf); +                gf_log ("DHT", GF_LOG_INFO, "conf->dthrottle: %s, " +                        "conf->defrag->recon_thread_count: %d", +                         conf->dthrottle, conf->defrag->recon_thread_count); +        } +          if (conf->defrag) {                  GF_OPTION_RECONF ("rebalance-stats", conf->defrag->stats,                                    options, bool, out); @@ -534,7 +566,7 @@ dht_init (xlator_t *this)          gf_defrag_info_t                *defrag         = NULL;          int                              cmd            = 0;          char                            *node_uuid      = NULL; - +        int                              throttle_count = 0;          GF_VALIDATE_OR_GOTO ("dht", this, err); @@ -604,6 +636,8 @@ dht_init (xlator_t *this)                  pthread_mutex_init (&defrag->dfq_mutex, 0);                  pthread_cond_init  (&defrag->parallel_migration_cond, 0);                  pthread_cond_init  (&defrag->rebalance_crawler_alarm, 0); +                pthread_cond_init  (&defrag->df_wakeup_thread, 0); +             
     defrag->global_error = 0;          } @@ -710,6 +744,17 @@ dht_init (xlator_t *this)          GF_OPTION_INIT ("randomize-hash-range-by-gfid",                          conf->randomize_by_gfid, bool, err); +        if (defrag) { +                GF_OPTION_INIT ("rebal-throttle", +                                 conf->dthrottle, str, err); + +                GF_DECIDE_DEFRAG_THROTTLE_COUNT(throttle_count, conf); + +                gf_log ("DHT", GF_LOG_DEBUG, "conf->dthrottle: %s, " +                        "conf->defrag->recon_thread_count: %d", +                         conf->dthrottle, conf->defrag->recon_thread_count); +        } +          GF_OPTION_INIT ("xattr-name", conf->xattr_name, str, err);          gf_asprintf (&conf->link_xattr_name, "%s."DHT_LINKFILE_STR,                       conf->xattr_name); @@ -922,5 +967,17 @@ struct volume_options options[] = {            "subvolume to which it hashes"          }, +        { .key =  {"rebal-throttle"}, +          .type = GF_OPTION_TYPE_STR, +          .default_value = "normal", +          .description = " Sets the maximum number of parallel file migrations " +                         "allowed on a node during the rebalance operation. The" +                         " default value is normal and allows a max of " +                         "[($(processing units) - 4) / 2), 2]  files to be " +                         "migrated at a time. 
Lazy will allow only one file to " +                         "be migrated at a time and aggressive will allow " +                         "max of [($(processing units) - 4) / 2), 4]" +        }, +          { .key  = {NULL} },  }; diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c index 9f751c90f3d..b5e65f00a0b 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c +++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c @@ -74,6 +74,35 @@ out:  }  static int +validate_defrag_throttle_option (glusterd_volinfo_t *volinfo, dict_t *dict, +                                 char *key, char *value, char **op_errstr) +{ +        char                 errstr[2048] = ""; +        glusterd_conf_t     *priv         = NULL; +        int                  ret          = 0; +        xlator_t            *this         = NULL; + +        this = THIS; +        GF_ASSERT (this); + +        if (!strcasecmp (value, "lazy") || +            !strcasecmp (value, "normal") || +            !strcasecmp (value, "aggressive")) { +                ret = 0; +        } else { +                ret = -1; +                snprintf (errstr, sizeof (errstr), "%s should be " +                          "{lazy|normal|aggressive}", key); +                gf_log (this->name, GF_LOG_ERROR, "%s", errstr); +                *op_errstr = gf_strdup (errstr); +        } + +        gf_log (this->name, GF_LOG_DEBUG, "Returning %d", ret); + +        return ret; +} + +static int  validate_quota (glusterd_volinfo_t *volinfo, dict_t *dict, char *key,                  char *value, char **op_errstr)  { @@ -357,6 +386,13 @@ struct volopt_map_entry glusterd_volopt_map[] = {            .op_version = GD_OP_VERSION_3_6_0,            .flags      = OPT_FLAG_CLIENT_OPT,          }, +        { .key         = "cluster.rebal-throttle", +          .voltype     = "cluster/distribute", +          .option      = "rebal-throttle", +          .op_version  = GD_OP_VERSION_3_7_0, +   
       .validate_fn = validate_defrag_throttle_option, +          .flags       = OPT_FLAG_CLIENT_OPT, +        },          /* NUFA xlator options (Distribute special case) */          { .key        = "cluster.nufa",            .voltype    = "cluster/distribute",  | 
