summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Jeff Darcy <jdarcy@redhat.com> 2016-07-22 13:11:15 -0400
committer Jeff Darcy <jdarcy@redhat.com> 2016-07-25 08:58:49 -0400
commit 375eb14b6e0594a0a2b7abe96bcd2d9ad626d516 (patch)
tree de16a6e68d07a98b87185b2fe0d32cc85907832a
parent b44305104d262ffd3b07345ce7867f9b0d0ed8f0 (diff)
io-threads: distribute work fairly among clients [round-robin2]
This is the full "queue of queues" approach, where each client gets its own queue (per priority) and we round-robin among them.

Change-Id: I73955d1b9bb93f2ff781b48dfe2509009c519ec6
Signed-off-by: Jeff Darcy <jdarcy@redhat.com>
-rw-r--r-- xlators/performance/io-threads/src/io-threads.c 103
-rw-r--r-- xlators/performance/io-threads/src/io-threads.h 16
-rw-r--r-- xlators/performance/io-threads/src/iot-mem-types.h 1
3 files changed, 107 insertions, 13 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index c0bcea4..c6a18fd 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -48,19 +48,68 @@ struct volume_options options[];
} \
} while (0)
+/*
+ * Return the array of per-priority queues that io-threads keeps for a
+ * client, creating and registering it the first time the client is seen.
+ *
+ * The array has IOT_PRI_MAX entries, one per priority level, each with
+ * both of its list heads initialised.
+ *
+ * Returns NULL when the array cannot be allocated or cannot be stored on
+ * the client; in the latter case the freshly allocated array is freed.
+ */
+iot_client_ctx_t *
+iot_get_ctx (xlator_t *this, client_t *client)
+{
+        iot_client_ctx_t *queues = NULL;
+        int               pri    = 0;
+
+        /* Fast path: a context is already stored for this client. */
+        if (client_ctx_get (client, this, (void **)&queues) == 0) {
+                return queues;
+        }
+
+        queues = GF_CALLOC (IOT_PRI_MAX, sizeof(*queues),
+                            gf_iot_mt_client_ctx_t);
+        if (!queues) {
+                return NULL;
+        }
+
+        for (pri = 0; pri < IOT_PRI_MAX; pri++) {
+                INIT_LIST_HEAD (&queues[pri].reqs);
+                INIT_LIST_HEAD (&queues[pri].clients);
+        }
+
+        /* Storing the context failed: do not leak the new array. */
+        if (client_ctx_set (client, this, queues) != 0) {
+                GF_FREE (queues);
+                return NULL;
+        }
+
+        return queues;
+}
+
call_stub_t *
__iot_dequeue (iot_conf_t *conf, int *pri)
{
- call_stub_t *stub = NULL;
- int i = 0;
+ call_stub_t *stub = NULL;
+ int i = 0;
+ iot_client_ctx_t *ctx;
*pri = -1;
for (i = 0; i < IOT_PRI_MAX; i++) {
- if (list_empty (&conf->reqs[i]) ||
- (conf->ac_iot_count[i] >= conf->ac_iot_limit[i]))
+
+ if (conf->ac_iot_count[i] >= conf->ac_iot_limit[i]) {
+ continue;
+ }
+
+ if (list_empty (&conf->clients[i])) {
+ continue;
+ }
+
+ /* Get the first per-client queue for this priority. */
+ ctx = list_first_entry (&conf->clients[i],
+ iot_client_ctx_t, clients);
+ if (!ctx) {
continue;
+ }
+
+ if (list_empty (&ctx->reqs)) {
+ continue;
+ }
+
+ /* Get the first request on that queue. */
+ stub = list_first_entry (&ctx->reqs, call_stub_t, list);
+ list_del_init (&stub->list);
+ if (list_empty (&ctx->reqs)) {
+ list_del_init (&ctx->clients);
+ } else {
+ list_rotate_left (&conf->clients[i]);
+ }
- stub = list_entry (conf->reqs[i].next, call_stub_t, list);
conf->ac_iot_count[i]++;
*pri = i;
break;
@@ -71,7 +120,6 @@ __iot_dequeue (iot_conf_t *conf, int *pri)
conf->queue_size--;
conf->queue_sizes[*pri]--;
- list_del_init (&stub->list);
return stub;
}
@@ -80,15 +128,31 @@ __iot_dequeue (iot_conf_t *conf, int *pri)
/*
 * Queue a request stub at priority `pri`.
 *
 * (Diff hunk: lines marked '+' are added by this commit, lines marked '-'
 * are removed, unmarked lines are unchanged context.)
 *
 * An out-of-range priority is clamped to IOT_PRI_MAX-1. The stub is
 * appended to the per-priority queue of the client that owns the frame;
 * when the frame has no client, or no per-client context can be obtained,
 * it goes on conf->no_client[pri] instead. A queue is linked into
 * conf->clients[pri] at the moment it goes from empty to non-empty.
 *
 * NOTE(review): the double-underscore name suggests the caller already
 * holds conf->mutex -- confirm against callers.
 */
void
__iot_enqueue (iot_conf_t *conf, call_stub_t *stub, int pri)
{
+ client_t *client = stub->frame->root->client; /* may be NULL, see below */
+ iot_client_ctx_t *ctx;
+
if (pri < 0 || pri >= IOT_PRI_MAX)
pri = IOT_PRI_MAX-1;
- list_add_tail (&stub->list, &conf->reqs[pri]);
+ if (client) {
+ ctx = iot_get_ctx (THIS, client);
+ if (ctx) {
+ ctx = &ctx[pri]; /* pick this priority's queue out of the client's array */
+ }
+ } else {
+ ctx = NULL;
+ }
+ if (!ctx) {
+ ctx = &conf->no_client[pri]; /* fallback queue for frames without a usable client context */
+ }
+
+ if (list_empty (&ctx->reqs)) { /* queue was idle: make it visible in the per-priority client list */
+ list_add_tail (&ctx->clients, &conf->clients[pri]);
+ }
+ list_add_tail (&stub->list, &ctx->reqs);
conf->queue_size++;
conf->queue_sizes[pri]++;
-
- return;
}
@@ -958,7 +1022,9 @@ init (xlator_t *this)
conf->this = this;
for (i = 0; i < IOT_PRI_MAX; i++) {
- INIT_LIST_HEAD (&conf->reqs[i]);
+ INIT_LIST_HEAD (&conf->clients[i]);
+ INIT_LIST_HEAD (&conf->no_client[i].clients);
+ INIT_LIST_HEAD (&conf->no_client[i].reqs);
}
ret = iot_workers_scale (conf);
@@ -991,6 +1057,19 @@ fini (xlator_t *this)
return;
}
+/*
+ * Detach the context io-threads stored on `client` (if any) and free it.
+ * Installed as the translator's client_destroy callback.
+ *
+ * Always returns 0.
+ */
+int
+iot_client_destroy (xlator_t *this, client_t *client)
+{
+        void *queues = NULL;
+
+        /* client_ctx_del reported failure: there is nothing for us to free. */
+        if (client_ctx_del (client, this, &queues) != 0) {
+                return 0;
+        }
+
+        GF_FREE (queues);
+        return 0;
+}
+
+
struct xlator_dumpops dumpops = {
.priv = iot_priv_dump,
};
@@ -1046,7 +1125,9 @@ struct xlator_fops fops = {
.setactivelk = iot_setactivelk,
};
-struct xlator_cbks cbks;
+struct xlator_cbks cbks = {
+ .client_destroy = iot_client_destroy, /* frees the per-client queue array stored by iot_get_ctx */
+};
struct volume_options options[] = {
{ .key = {"thread-count"},
diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h
index 6d9ea25..7c4ce78 100644
--- a/xlators/performance/io-threads/src/io-threads.h
+++ b/xlators/performance/io-threads/src/io-threads.h
@@ -48,6 +48,11 @@ typedef enum {
IOT_PRI_MAX,
} iot_pri_t;
+typedef struct { /* one request queue for one client at one priority level */
+ struct list_head clients; /* link in conf->clients[pri]; on that list only while reqs is non-empty */
+ struct list_head reqs; /* pending call stubs, chained through call_stub_t.list */
+} iot_client_ctx_t;
+
struct iot_conf {
pthread_mutex_t mutex;
pthread_cond_t cond;
@@ -58,7 +63,14 @@ struct iot_conf {
int32_t idle_time; /* in seconds */
- struct list_head reqs[IOT_PRI_MAX];
+ struct list_head clients[IOT_PRI_MAX];
+ /*
+ * It turns out that there are several ways a frame can get to us
+ * without having an associated client (server_first_lookup was the
+ * first one I hit). Instead of trying to update all such callers,
+ * we use this to queue them.
+ */
+ iot_client_ctx_t no_client[IOT_PRI_MAX];
int32_t ac_iot_limit[IOT_PRI_MAX];
int32_t ac_iot_count[IOT_PRI_MAX];
@@ -68,7 +80,7 @@ struct iot_conf {
gf_boolean_t least_priority; /*Enable/Disable least-priority */
xlator_t *this;
- size_t stack_size;
+ size_t stack_size;
};
typedef struct iot_conf iot_conf_t;
diff --git a/xlators/performance/io-threads/src/iot-mem-types.h b/xlators/performance/io-threads/src/iot-mem-types.h
index 4fa8302..fbf9188 100644
--- a/xlators/performance/io-threads/src/iot-mem-types.h
+++ b/xlators/performance/io-threads/src/iot-mem-types.h
@@ -16,6 +16,7 @@
enum gf_iot_mem_types_ {
gf_iot_mt_iot_conf_t = gf_common_mt_end + 1,
+ gf_iot_mt_client_ctx_t,
gf_iot_mt_end
};
#endif