summaryrefslogtreecommitdiffstats
path: root/xlators/performance/io-threads/src/io-threads.c
diff options
context:
space:
mode:
authorJeff Darcy <jdarcy@redhat.com>2016-07-22 13:11:15 -0400
committerJeff Darcy <jdarcy@redhat.com>2016-07-28 11:52:06 -0700
commit1faf452818fe51fae1f07d17afab910a5e0da0cd (patch)
tree4731e6094e8a31625481567527986248e588b4d9 /xlators/performance/io-threads/src/io-threads.c
parent25d1bdc094a8408afc074817ec6d63c812c571f9 (diff)
io-threads: distribute work fairly among clients
This is the full "queue of queues" approach where each client gets its own queue (per priority) and we round-robin among them. Change-Id: I73955d1b9bb93f2ff781b48dfe2509009c519ec6 BUG: 1360402 Signed-off-by: Jeff Darcy <jdarcy@redhat.com> Reviewed-on: http://review.gluster.org/14904 NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org> CentOS-regression: Gluster Build System <jenkins@build.gluster.org> Smoke: Gluster Build System <jenkins@build.gluster.org>
Diffstat (limited to 'xlators/performance/io-threads/src/io-threads.c')
-rw-r--r--xlators/performance/io-threads/src/io-threads.c103
1 files changed, 92 insertions, 11 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index c0bcea48e3d..c6a18fdc0b3 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -48,19 +48,68 @@ struct volume_options options[];
} \
} while (0)
+iot_client_ctx_t *
+iot_get_ctx (xlator_t *this, client_t *client)
+{
+ iot_client_ctx_t *ctx = NULL;
+ int i;
+
+ if (client_ctx_get (client, this, (void **)&ctx) != 0) {
+ ctx = GF_CALLOC (IOT_PRI_MAX, sizeof(*ctx),
+ gf_iot_mt_client_ctx_t);
+ if (ctx) {
+ for (i = 0; i < IOT_PRI_MAX; ++i) {
+ INIT_LIST_HEAD (&ctx[i].clients);
+ INIT_LIST_HEAD (&ctx[i].reqs);
+ }
+ if (client_ctx_set (client, this, ctx) != 0) {
+ GF_FREE (ctx);
+ ctx = NULL;
+ }
+ }
+ }
+
+ return ctx;
+}
+
call_stub_t *
__iot_dequeue (iot_conf_t *conf, int *pri)
{
- call_stub_t *stub = NULL;
- int i = 0;
+ call_stub_t *stub = NULL;
+ int i = 0;
+ iot_client_ctx_t *ctx;
*pri = -1;
for (i = 0; i < IOT_PRI_MAX; i++) {
- if (list_empty (&conf->reqs[i]) ||
- (conf->ac_iot_count[i] >= conf->ac_iot_limit[i]))
+
+ if (conf->ac_iot_count[i] >= conf->ac_iot_limit[i]) {
+ continue;
+ }
+
+ if (list_empty (&conf->clients[i])) {
+ continue;
+ }
+
+ /* Get the first per-client queue for this priority. */
+ ctx = list_first_entry (&conf->clients[i],
+ iot_client_ctx_t, clients);
+ if (!ctx) {
continue;
+ }
+
+ if (list_empty (&ctx->reqs)) {
+ continue;
+ }
+
+ /* Get the first request on that queue. */
+ stub = list_first_entry (&ctx->reqs, call_stub_t, list);
+ list_del_init (&stub->list);
+ if (list_empty (&ctx->reqs)) {
+ list_del_init (&ctx->clients);
+ } else {
+ list_rotate_left (&conf->clients[i]);
+ }
- stub = list_entry (conf->reqs[i].next, call_stub_t, list);
conf->ac_iot_count[i]++;
*pri = i;
break;
@@ -71,7 +120,6 @@ __iot_dequeue (iot_conf_t *conf, int *pri)
conf->queue_size--;
conf->queue_sizes[*pri]--;
- list_del_init (&stub->list);
return stub;
}
@@ -80,15 +128,31 @@ __iot_dequeue (iot_conf_t *conf, int *pri)
void
__iot_enqueue (iot_conf_t *conf, call_stub_t *stub, int pri)
{
+ client_t *client = stub->frame->root->client;
+ iot_client_ctx_t *ctx;
+
if (pri < 0 || pri >= IOT_PRI_MAX)
pri = IOT_PRI_MAX-1;
- list_add_tail (&stub->list, &conf->reqs[pri]);
+ if (client) {
+ ctx = iot_get_ctx (THIS, client);
+ if (ctx) {
+ ctx = &ctx[pri];
+ }
+ } else {
+ ctx = NULL;
+ }
+ if (!ctx) {
+ ctx = &conf->no_client[pri];
+ }
+
+ if (list_empty (&ctx->reqs)) {
+ list_add_tail (&ctx->clients, &conf->clients[pri]);
+ }
+ list_add_tail (&stub->list, &ctx->reqs);
conf->queue_size++;
conf->queue_sizes[pri]++;
-
- return;
}
@@ -958,7 +1022,9 @@ init (xlator_t *this)
conf->this = this;
for (i = 0; i < IOT_PRI_MAX; i++) {
- INIT_LIST_HEAD (&conf->reqs[i]);
+ INIT_LIST_HEAD (&conf->clients[i]);
+ INIT_LIST_HEAD (&conf->no_client[i].clients);
+ INIT_LIST_HEAD (&conf->no_client[i].reqs);
}
ret = iot_workers_scale (conf);
@@ -991,6 +1057,19 @@ fini (xlator_t *this)
return;
}
+int
+iot_client_destroy (xlator_t *this, client_t *client)
+{
+ void *tmp = NULL;
+
+ if (client_ctx_del (client, this, &tmp) == 0) {
+ GF_FREE (tmp);
+ }
+
+ return 0;
+}
+
+
struct xlator_dumpops dumpops = {
.priv = iot_priv_dump,
};
@@ -1046,7 +1125,9 @@ struct xlator_fops fops = {
.setactivelk = iot_setactivelk,
};
-struct xlator_cbks cbks;
+struct xlator_cbks cbks = {
+ .client_destroy = iot_client_destroy,
+};
struct volume_options options[] = {
{ .key = {"thread-count"},