diff options
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-volgen.c | 1 | ||||
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 82 | ||||
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 10 |
3 files changed, 91 insertions, 2 deletions
diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c index 6ae32091a6c..09c5519fb6b 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.c +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c @@ -162,6 +162,7 @@ static struct volopt_map_entry glusterd_volopt_map[] = { {"performance.low-prio-threads", "performance/io-threads", NULL, NULL, DOC, 0}, {"performance.least-prio-threads", "performance/io-threads", NULL, NULL, DOC, 0}, {"performance.enable-least-priority", "performance/io-threads", NULL, NULL, DOC, 0}, + {"performance.least-rate-limit", "performance/io-threads", NULL, NULL, DOC, 0}, {"performance.disk-usage-limit", "performance/quota", NULL, NULL, NO_DOC, 0}, {"performance.min-free-disk-limit", "performance/quota", NULL, NULL, NO_DOC, 0}, {"performance.write-behind-window-size", "performance/write-behind", "cache-size", NULL, DOC}, diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index 7230c2eda42..5e1b9dcf8a8 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -30,16 +30,69 @@ int __iot_workers_scale (iot_conf_t *conf); struct volume_options options[]; call_stub_t * -__iot_dequeue (iot_conf_t *conf, int *pri) +__iot_dequeue (iot_conf_t *conf, int *pri, struct timespec *sleep) { call_stub_t *stub = NULL; int i = 0; + struct timeval curtv = {0,}, difftv = {0,}; *pri = -1; + sleep->tv_sec = 0; + sleep->tv_nsec = 0; for (i = 0; i < IOT_PRI_MAX; i++) { if (list_empty (&conf->reqs[i]) || (conf->ac_iot_count[i] >= conf->ac_iot_limit[i])) continue; + + if (i == IOT_PRI_LEAST) { + pthread_mutex_lock(&conf->throttle.lock); + if (!conf->throttle.sample_time.tv_sec) { + /* initialize */ + gettimeofday(&conf->throttle.sample_time, NULL); + } else { + /* + * Maintain a running count of least priority + * operations that are handled over a particular + * time interval. The count is provided via + * state dump and is used as a measure against + * least priority op throttling. + */ + gettimeofday(&curtv, NULL); + timersub(&curtv, &conf->throttle.sample_time, + &difftv); + if (difftv.tv_sec >= IOT_LEAST_THROTTLE_DELAY) { + conf->throttle.cached_rate = + conf->throttle.sample_cnt; + conf->throttle.sample_cnt = 0; + conf->throttle.sample_time = curtv; + } + + /* + * If we're over the configured rate limit, + * provide an absolute time to the caller that + * represents the soonest we're allowed to + * return another least priority request. + */ + if (conf->throttle.rate_limit && + conf->throttle.sample_cnt >= + conf->throttle.rate_limit) { + struct timeval delay; + delay.tv_sec = IOT_LEAST_THROTTLE_DELAY; + delay.tv_usec = 0; + + timeradd(&conf->throttle.sample_time, + &delay, &curtv); + TIMEVAL_TO_TIMESPEC(&curtv, sleep); + + pthread_mutex_unlock( + &conf->throttle.lock); + break; + } + } + conf->throttle.sample_cnt++; + pthread_mutex_unlock(&conf->throttle.lock); + } + stub = list_entry (conf->reqs[i].next, call_stub_t, list); conf->ac_iot_count[i]++; *pri = i; @@ -83,6 +136,7 @@ iot_worker (void *data) int pri = -1; char timeout = 0; char bye = 0; + struct timespec sleep = {0,}; conf = data; this = conf->this; @@ -123,7 +177,13 @@ iot_worker (void *data) } } - stub = __iot_dequeue (conf, &pri); + stub = __iot_dequeue (conf, &pri, &sleep); + if (!stub && (sleep.tv_sec || sleep.tv_nsec)) { + pthread_cond_timedwait(&conf->cond, + &conf->mutex, &sleep); + pthread_mutex_unlock(&conf->mutex); + continue; + } } pthread_mutex_unlock (&conf->mutex); @@ -2471,6 +2531,9 @@ reconfigure (xlator_t *this, dict_t *options) GF_OPTION_RECONF ("enable-least-priority", conf->least_priority, options, bool, out); + GF_OPTION_RECONF("least-rate-limit", conf->throttle.rate_limit, options, + int32, out); + ret = 0; out: return ret; @@ -2535,6 +2598,14 @@ init (xlator_t *this) GF_OPTION_INIT ("enable-least-priority", conf->least_priority, bool, out); + GF_OPTION_INIT("least-rate-limit", conf->throttle.rate_limit, int32, + out); + if ((ret = pthread_mutex_init(&conf->throttle.lock, NULL)) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "pthread_mutex_init failed (%d)", ret); + goto out; + } + conf->this = this; for (i = 0; i < IOT_PRI_MAX; i++) { @@ -2674,6 +2745,13 @@ struct volume_options options[] = { .max = 0x7fffffff, .default_value = "120", }, + {.key = {"least-rate-limit"}, + .type = GF_OPTION_TYPE_INT, + .min = 0, + .default_value = "0", + .description = "Max number of least priority operations to handle " + "per-second" + }, { .key = {NULL}, }, }; diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index feac5ae73fe..e5bf4d95087 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -52,6 +52,14 @@ typedef enum { IOT_PRI_MAX, } iot_pri_t; +#define IOT_LEAST_THROTTLE_DELAY 1 /* sample interval in seconds */ +struct iot_least_throttle { + struct timeval sample_time; /* timestamp of current sample */ + uint32_t sample_cnt; /* sample count for active interval */ + uint32_t cached_rate; /* the most recently measured rate */ + int32_t rate_limit; /* user-specified rate limit */ + pthread_mutex_t lock; +}; struct iot_conf { pthread_mutex_t mutex; @@ -73,6 +81,8 @@ struct iot_conf { gf_boolean_t least_priority; /*Enable/Disable least-priority */ xlator_t *this; + + struct iot_least_throttle throttle; }; typedef struct iot_conf iot_conf_t; |