diff options
Diffstat (limited to 'xlators/cluster')
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.h | 21 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-rebalance.c | 69 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/tier.c | 153 | 
3 files changed, 167 insertions, 76 deletions
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h index 0f06a4a6670..5fa97a41881 100644 --- a/xlators/cluster/dht/src/dht-common.h +++ b/xlators/cluster/dht/src/dht-common.h @@ -345,6 +345,12 @@ typedef enum tier_mode_ {          TIER_MODE_WM  } tier_mode_t; +typedef enum tier_pause_state_ { +        TIER_RUNNING = 0, +        TIER_REQUEST_PAUSE, +        TIER_PAUSED +} tier_pause_state_t; +  typedef struct gf_tier_conf {          int                          is_tier;          int                          watermark_hi; @@ -360,11 +366,12 @@ typedef struct gf_tier_conf {          int                          tier_demote_frequency;          uint64_t                     st_last_promoted_size;          uint64_t                     st_last_demoted_size; -        int                          request_pause; -        gf_boolean_t                 paused; +        tier_pause_state_t           pause_state;          struct synctask             *pause_synctask;          gf_timer_t                  *pause_timer;          pthread_mutex_t              pause_mutex; +        int                          promote_in_progress; +        int                          demote_in_progress;  } gf_tier_conf_t;  struct gf_defrag_info_ { @@ -1000,11 +1007,17 @@ int dht_newfile_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  int  gf_defrag_status_get (gf_defrag_info_t *defrag, dict_t *dict); +void +gf_defrag_set_pause_state (gf_tier_conf_t *tier_conf, tier_pause_state_t state); + +tier_pause_state_t +gf_defrag_get_pause_state (gf_tier_conf_t *tier_conf); +  int  gf_defrag_pause_tier (xlator_t *this, gf_defrag_info_t *defrag); -void -gf_defrag_wake_pause_tier (gf_tier_conf_t *defrag, gf_boolean_t pause); +tier_pause_state_t +gf_defrag_check_pause_tier (gf_tier_conf_t *defrag);  int  gf_defrag_resume_tier (xlator_t *this, gf_defrag_info_t *defrag); diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c index 3698b340fef..e28bb76be66 100644 --- a/xlators/cluster/dht/src/dht-rebalance.c +++ b/xlators/cluster/dht/src/dht-rebalance.c @@ -833,7 +833,7 @@ __tier_migrate_data (gf_defrag_info_t *defrag, xlator_t *from, xlator_t *to, fd_                  else                          ret = syncop_writev (to, dst, vector, count,                                               offset, iobref, 0, NULL, NULL); -                if (defrag->tier_conf.request_pause) { +                if (gf_defrag_get_pause_state (&defrag->tier_conf) != TIER_RUNNING) {                          gf_msg ("tier", GF_LOG_INFO, 0,                                  DHT_MSG_TIER_PAUSED,                                  "Migrate file paused"); @@ -3507,23 +3507,60 @@ out:  }  void -gf_defrag_wake_pause_tier (gf_tier_conf_t *tier_conf, gf_boolean_t pause) +gf_defrag_set_pause_state (gf_tier_conf_t *tier_conf, tier_pause_state_t state) +{ +        pthread_mutex_lock (&tier_conf->pause_mutex); +        tier_conf->pause_state = state; +        pthread_mutex_unlock (&tier_conf->pause_mutex); +} + +tier_pause_state_t +gf_defrag_get_pause_state (gf_tier_conf_t *tier_conf) +{ +        int state; + +        pthread_mutex_lock (&tier_conf->pause_mutex); +        state = tier_conf->pause_state; +        pthread_mutex_unlock (&tier_conf->pause_mutex); + +        return state; +} + +tier_pause_state_t +gf_defrag_check_pause_tier (gf_tier_conf_t *tier_conf)  {          int woke = 0; +        int state  = -1;          pthread_mutex_lock (&tier_conf->pause_mutex); + +        if (tier_conf->pause_state == TIER_RUNNING) +                goto out; + +        if (tier_conf->pause_state == TIER_PAUSED) +                goto out; + +        if (tier_conf->promote_in_progress || +            tier_conf->demote_in_progress) +                goto out; + +        tier_conf->pause_state = TIER_PAUSED; +          if (tier_conf->pause_synctask) { -                tier_conf->paused = pause;                  synctask_wake (tier_conf->pause_synctask);                  tier_conf->pause_synctask = 0;                  woke = 1;          } -        pthread_mutex_unlock (&tier_conf->pause_mutex); -        tier_conf->request_pause = 0;          gf_msg ("tier", GF_LOG_DEBUG, 0,                  DHT_MSG_TIER_PAUSED, -                "woken %d paused %d", woke, tier_conf->paused); +                "woken %d", woke); +out: +        state = tier_conf->pause_state; + +        pthread_mutex_unlock (&tier_conf->pause_mutex); + +        return state;  }  void @@ -3546,7 +3583,7 @@ gf_defrag_pause_tier_timeout (void *data)                  DHT_MSG_TIER_PAUSED,                  "Request pause timer timeout"); -        gf_defrag_wake_pause_tier (&defrag->tier_conf, _gf_false); +        gf_defrag_check_pause_tier (&defrag->tier_conf);  out:          return; @@ -3564,12 +3601,16 @@ gf_defrag_pause_tier (xlator_t *this, gf_defrag_info_t *defrag)          /*           * Set flag requesting to pause tiering. Wait 'delay' seconds for -         * tiering to actually stop as indicated by the "paused" boolean, +         * tiering to actually stop as indicated by the pause state           * before returning success or failure.           */ -        defrag->tier_conf.request_pause = 1; +        gf_defrag_set_pause_state (&defrag->tier_conf, TIER_REQUEST_PAUSE); -        if (defrag->tier_conf.paused == _gf_true) +        /* +         * If migration is not underway, can pause immediately. +         */ +        gf_defrag_check_pause_tier (&defrag->tier_conf); +        if (gf_defrag_get_pause_state (&defrag->tier_conf) == TIER_PAUSED)                  goto out;          gf_msg (this->name, GF_LOG_DEBUG, 0, @@ -3586,11 +3627,12 @@ gf_defrag_pause_tier (xlator_t *this, gf_defrag_info_t *defrag)          synctask_yield (defrag->tier_conf.pause_synctask); -        if (defrag->tier_conf.paused == _gf_true) +        if (gf_defrag_get_pause_state (&defrag->tier_conf) == TIER_PAUSED)                  goto out; -        ret = -1; +        gf_defrag_set_pause_state (&defrag->tier_conf, TIER_RUNNING); +        ret = -1;  out:          gf_msg (this->name, GF_LOG_DEBUG, 0, @@ -3607,8 +3649,7 @@ gf_defrag_resume_tier (xlator_t *this, gf_defrag_info_t *defrag)                  DHT_MSG_TIER_RESUME,                  "Pause end. Resume tiering"); -        defrag->tier_conf.request_pause = 0; -        defrag->tier_conf.paused = _gf_false; +        gf_defrag_set_pause_state (&defrag->tier_conf, TIER_RUNNING);          return 0;  } diff --git a/xlators/cluster/dht/src/tier.c b/xlators/cluster/dht/src/tier.c index 21410dd30dc..8353cdafb60 100644 --- a/xlators/cluster/dht/src/tier.c +++ b/xlators/cluster/dht/src/tier.c @@ -241,51 +241,6 @@ out:  }  int -tier_do_migration (xlator_t *this, int promote) -{ -        gf_defrag_info_t       *defrag = NULL; -        dht_conf_t             *conf   = NULL; -        long                    rand = 0; -        int                     migrate = 0; -        gf_tier_conf_t         *tier_conf = NULL; - -        conf = this->private; -        if (!conf) -                goto exit; - -        defrag = conf->defrag; -        if (!defrag) -                goto exit; - -        if (defrag->tier_conf.mode != TIER_MODE_WM) { -                migrate = 1; -                goto exit; -        } - -        tier_conf = &defrag->tier_conf; - -        switch (tier_conf->watermark_last) { -        case TIER_WM_LOW: -                migrate = promote ? 1 : 0; -                break; -        case TIER_WM_HI: -                migrate = promote ? 0 : 1; -                break; -        case TIER_WM_MID: -                rand = random() % 100; -                if (promote) { -                        migrate = (rand > tier_conf->percent_full); -                } else { -                        migrate = (rand <= tier_conf->percent_full); -                } -                break; -        } - -exit: -        return migrate; -} - -int  tier_check_watermark (xlator_t *this, loc_t *root_loc)  {          tier_watermark_op_t     wm = TIER_WM_NONE; @@ -377,6 +332,85 @@ exit:          return ret;  } +int +tier_do_migration (xlator_t *this, int promote, loc_t *root_loc) +{ +        gf_defrag_info_t       *defrag = NULL; +        dht_conf_t             *conf   = NULL; +        long                    rand = 0; +        int                     migrate = 0; +        gf_tier_conf_t         *tier_conf = NULL; + +        conf = this->private; +        if (!conf) +                goto exit; + +        defrag = conf->defrag; +        if (!defrag) +                goto exit; + +        if (defrag->tier_conf.mode != TIER_MODE_WM) { +                migrate = 1; +                goto exit; +        } + +        if (tier_check_watermark (this, root_loc) != 0) { +                gf_msg (this->name, GF_LOG_CRITICAL, errno, +                        DHT_MSG_LOG_TIER_ERROR, +                        "Failed to get watermark"); +                goto exit; +        } + +        tier_conf = &defrag->tier_conf; + +        switch (tier_conf->watermark_last) { +        case TIER_WM_LOW: +                migrate = promote ? 1 : 0; +                break; +        case TIER_WM_HI: +                migrate = promote ? 0 : 1; +                break; +        case TIER_WM_MID: +                rand = random() % 100; +                if (promote) { +                        migrate = (rand > tier_conf->percent_full); +                } else { +                        migrate = (rand <= tier_conf->percent_full); +                } +                break; +        } + +exit: +        return migrate; +} + +int +tier_migrate (xlator_t *this, int is_promotion, dict_t *migrate_data, +              loc_t *loc, gf_tier_conf_t *tier_conf) +{ +        int ret = -1; + +        pthread_mutex_lock (&tier_conf->pause_mutex); +        if (is_promotion) +                tier_conf->promote_in_progress = 1; +        else +                tier_conf->demote_in_progress = 1; +        pthread_mutex_unlock (&tier_conf->pause_mutex); + +        /* Data migration */ +        ret = syncop_setxattr (this, loc, migrate_data, 0, +                               NULL, NULL); + +        pthread_mutex_lock (&tier_conf->pause_mutex); +        if (is_promotion) +                tier_conf->promote_in_progress = 0; +        else +                tier_conf->demote_in_progress = 0; +        pthread_mutex_unlock (&tier_conf->pause_mutex); + +        return ret; +} +  static int  tier_migrate_using_query_file (void *_args)  { @@ -408,6 +442,7 @@ tier_migrate_using_query_file (void *_args)          dht_conf_t   *conf                      = NULL;          uint64_t total_migrated_bytes           = 0;          int total_files                         = 0; +        loc_t root_loc                          = { 0 };          GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);          GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out); @@ -420,6 +455,8 @@ tier_migrate_using_query_file (void *_args)          defrag = query_cbk_args->defrag; +        dht_build_root_loc (defrag->root_inode, &root_loc); +          migrate_data = dict_new ();          if (!migrate_data)                  goto out; @@ -461,7 +498,8 @@ tier_migrate_using_query_file (void *_args)                  dict_del (migrate_data, "from.migrator"); -                if (defrag->tier_conf.request_pause) { +                if (gf_defrag_get_pause_state (&defrag->tier_conf) +                    != TIER_RUNNING) {                          gf_msg (this->name, GF_LOG_INFO, 0,                                  DHT_MSG_LOG_TIER_STATUS,                                  "Tiering paused. " @@ -469,7 +507,7 @@ tier_migrate_using_query_file (void *_args)                          break;                  } -                if (!tier_do_migration (this, query_cbk_args->is_promotion)) { +                if (!tier_do_migration (this, query_cbk_args->is_promotion, &root_loc)) {                          gfdb_methods.gfdb_query_record_free (query_record);                          query_record = NULL;                          continue; @@ -653,7 +691,8 @@ tier_migrate_using_query_file (void *_args)                          gf_uuid_copy (loc.gfid, loc.inode->gfid); -                        if (defrag->tier_conf.request_pause) { +                        if (gf_defrag_get_pause_state (&defrag->tier_conf) +                            != TIER_RUNNING) {                                  gf_msg (this->name, GF_LOG_INFO, 0,                                          DHT_MSG_LOG_TIER_STATUS,                                          "Tiering paused. " @@ -662,9 +701,9 @@ tier_migrate_using_query_file (void *_args)                                  goto abort;                          } -                        /* Data migration */ -                        ret = syncop_setxattr (this, &loc, migrate_data, 0, -                                               NULL, NULL); +                        ret = tier_migrate (this, query_cbk_args->is_promotion, +                                            migrate_data, &loc, &defrag->tier_conf); +                          if (ret) {                                  gf_msg (this->name, GF_LOG_ERROR, -ret,                                          DHT_MSG_LOG_TIER_ERROR, "Failed to " @@ -1639,8 +1678,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag)                          goto out;                  } -                if (tier_conf->request_pause) -                        gf_defrag_wake_pause_tier (tier_conf, _gf_true); +                gf_defrag_check_pause_tier (tier_conf);                  sleep(1); @@ -1664,8 +1702,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag)                          goto out;                  } -                if ((defrag->tier_conf.paused) || -                    (defrag->tier_conf.request_pause)) +                if (gf_defrag_get_pause_state (&defrag->tier_conf) != TIER_RUNNING)                          continue; @@ -2069,15 +2106,15 @@ tier_init (xlator_t *this)                  defrag->tier_conf.mode = ret;          } -        defrag->tier_conf.request_pause = 0; -          pthread_mutex_init (&defrag->tier_conf.pause_mutex, 0); +        gf_defrag_set_pause_state (&defrag->tier_conf, TIER_RUNNING); +          ret = dict_get_str (this->options,                                "tier-pause", &paused);          if (paused && strcmp (paused, "on") == 0) -                defrag->tier_conf.request_pause = 1; +                gf_defrag_set_pause_state (&defrag->tier_conf, TIER_REQUEST_PAUSE);          ret = gf_asprintf(&voldir, "%s/%s",                            DEFAULT_VAR_RUN_DIRECTORY,  | 
