summaryrefslogtreecommitdiffstats
path: root/xlators
diff options
context:
space:
mode:
Diffstat (limited to 'xlators')
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-volgen.c1
-rw-r--r--xlators/performance/io-threads/src/io-threads.c86
-rw-r--r--xlators/performance/io-threads/src/io-threads.h10
3 files changed, 95 insertions, 2 deletions
diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c
index a8c71e29c4c..6f7b3034cbf 100644
--- a/xlators/mgmt/glusterd/src/glusterd-volgen.c
+++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c
@@ -158,6 +158,7 @@ static struct volopt_map_entry glusterd_volopt_map[] = {
{"performance.low-prio-threads", "performance/io-threads", NULL, NULL, DOC, 0, 1},
{"performance.least-prio-threads", "performance/io-threads", NULL, NULL, DOC, 0, 1},
{"performance.enable-least-priority", "performance/io-threads", NULL, NULL, DOC, 0, 2},
+ {"performance.least-rate-limit", "performance/io-threads", NULL, NULL, DOC, 0, 1},
/* Other perf xlators' options */
{"performance.cache-size", "performance/quick-read", NULL, NULL, NO_DOC, 0, 1},
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index 2a5cec7edc2..dbf1929e897 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -30,16 +30,69 @@ int __iot_workers_scale (iot_conf_t *conf);
struct volume_options options[];
call_stub_t *
-__iot_dequeue (iot_conf_t *conf, int *pri)
+__iot_dequeue (iot_conf_t *conf, int *pri, struct timespec *sleep)
{
call_stub_t *stub = NULL;
int i = 0;
+ struct timeval curtv = {0,}, difftv = {0,};
*pri = -1;
+ sleep->tv_sec = 0;
+ sleep->tv_nsec = 0;
for (i = 0; i < IOT_PRI_MAX; i++) {
if (list_empty (&conf->reqs[i]) ||
(conf->ac_iot_count[i] >= conf->ac_iot_limit[i]))
continue;
+
+ if (i == IOT_PRI_LEAST) {
+ pthread_mutex_lock(&conf->throttle.lock);
+ if (!conf->throttle.sample_time.tv_sec) {
+ /* initialize */
+ gettimeofday(&conf->throttle.sample_time, NULL);
+ } else {
+ /*
+ * Maintain a running count of least priority
+ * operations that are handled over a particular
+ * time interval. The count is provided via
+ * state dump and is used as a measure against
+ * least priority op throttling.
+ */
+ gettimeofday(&curtv, NULL);
+ timersub(&curtv, &conf->throttle.sample_time,
+ &difftv);
+ if (difftv.tv_sec >= IOT_LEAST_THROTTLE_DELAY) {
+ conf->throttle.cached_rate =
+ conf->throttle.sample_cnt;
+ conf->throttle.sample_cnt = 0;
+ conf->throttle.sample_time = curtv;
+ }
+
+ /*
+ * If we're over the configured rate limit,
+ * provide an absolute time to the caller that
+ * represents the soonest we're allowed to
+ * return another least priority request.
+ */
+ if (conf->throttle.rate_limit &&
+ conf->throttle.sample_cnt >=
+ conf->throttle.rate_limit) {
+ struct timeval delay;
+ delay.tv_sec = IOT_LEAST_THROTTLE_DELAY;
+ delay.tv_usec = 0;
+
+ timeradd(&conf->throttle.sample_time,
+ &delay, &curtv);
+ TIMEVAL_TO_TIMESPEC(&curtv, sleep);
+
+ pthread_mutex_unlock(
+ &conf->throttle.lock);
+ break;
+ }
+ }
+ conf->throttle.sample_cnt++;
+ pthread_mutex_unlock(&conf->throttle.lock);
+ }
+
stub = list_entry (conf->reqs[i].next, call_stub_t, list);
conf->ac_iot_count[i]++;
*pri = i;
@@ -83,6 +136,7 @@ iot_worker (void *data)
int pri = -1;
char timeout = 0;
char bye = 0;
+ struct timespec sleep = {0,};
conf = data;
this = conf->this;
@@ -123,7 +177,13 @@ iot_worker (void *data)
}
}
- stub = __iot_dequeue (conf, &pri);
+ stub = __iot_dequeue (conf, &pri, &sleep);
+ if (!stub && (sleep.tv_sec || sleep.tv_nsec)) {
+ pthread_cond_timedwait(&conf->cond,
+ &conf->mutex, &sleep);
+ pthread_mutex_unlock(&conf->mutex);
+ continue;
+ }
}
pthread_mutex_unlock (&conf->mutex);
@@ -2485,6 +2545,10 @@ iot_priv_dump (xlator_t *this)
gf_proc_dump_write("least_priority_threads", "%d",
conf->ac_iot_limit[IOT_PRI_LEAST]);
+ gf_proc_dump_write("cached least rate", "%u",
+ conf->throttle.cached_rate);
+ gf_proc_dump_write("least rate limit", "%u", conf->throttle.rate_limit);
+
return 0;
}
@@ -2516,6 +2580,9 @@ reconfigure (xlator_t *this, dict_t *options)
GF_OPTION_RECONF ("enable-least-priority", conf->least_priority,
options, bool, out);
+ GF_OPTION_RECONF("least-rate-limit", conf->throttle.rate_limit, options,
+ int32, out);
+
ret = 0;
out:
return ret;
@@ -2580,6 +2647,14 @@ init (xlator_t *this)
GF_OPTION_INIT ("enable-least-priority", conf->least_priority,
bool, out);
+ GF_OPTION_INIT("least-rate-limit", conf->throttle.rate_limit, int32,
+ out);
+ if ((ret = pthread_mutex_init(&conf->throttle.lock, NULL)) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "pthread_mutex_init failed (%d)", ret);
+ goto out;
+ }
+
conf->this = this;
for (i = 0; i < IOT_PRI_MAX; i++) {
@@ -2722,6 +2797,13 @@ struct volume_options options[] = {
.max = 0x7fffffff,
.default_value = "120",
},
+ {.key = {"least-rate-limit"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 0,
+ .default_value = "0",
+ .description = "Max number of least priority operations to handle "
+ "per-second"
+ },
{ .key = {NULL},
},
};
diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h
index a6b640884d0..1a9dee9ae2c 100644
--- a/xlators/performance/io-threads/src/io-threads.h
+++ b/xlators/performance/io-threads/src/io-threads.h
@@ -53,6 +53,14 @@ typedef enum {
IOT_PRI_MAX,
} iot_pri_t;
+#define IOT_LEAST_THROTTLE_DELAY 1 /* sample interval in seconds */
+struct iot_least_throttle {
+ struct timeval sample_time; /* timestamp of current sample */
+ uint32_t sample_cnt; /* sample count for active interval */
+ uint32_t cached_rate; /* the most recently measured rate */
+ int32_t rate_limit; /* user-specified rate limit */
+ pthread_mutex_t lock;
+};
struct iot_conf {
pthread_mutex_t mutex;
@@ -75,6 +83,8 @@ struct iot_conf {
xlator_t *this;
size_t stack_size;
+
+ struct iot_least_throttle throttle;
};
typedef struct iot_conf iot_conf_t;