diff options
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 103 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 16 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/iot-mem-types.h | 1 | 
3 files changed, 107 insertions, 13 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index c0bcea48e3d..c6a18fdc0b3 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -48,19 +48,68 @@ struct volume_options options[];                  }                                                              \          } while (0) +iot_client_ctx_t * +iot_get_ctx (xlator_t *this, client_t *client) +{ +        iot_client_ctx_t        *ctx    = NULL; +        int                      i; + +        if (client_ctx_get (client, this, (void **)&ctx) != 0) { +                ctx = GF_CALLOC (IOT_PRI_MAX, sizeof(*ctx), +                                 gf_iot_mt_client_ctx_t); +                if (ctx) { +                        for (i = 0; i < IOT_PRI_MAX; ++i) { +                                INIT_LIST_HEAD (&ctx[i].clients); +                                INIT_LIST_HEAD (&ctx[i].reqs); +                        } +                        if (client_ctx_set (client, this, ctx) != 0) { +                                GF_FREE (ctx); +                                ctx = NULL; +                        } +                } +        } + +        return ctx; +} +  call_stub_t *  __iot_dequeue (iot_conf_t *conf, int *pri)  { -        call_stub_t  *stub = NULL; -        int           i = 0; +        call_stub_t             *stub = NULL; +        int                     i = 0; +        iot_client_ctx_t        *ctx;          *pri = -1;          for (i = 0; i < IOT_PRI_MAX; i++) { -                if (list_empty (&conf->reqs[i]) || -                   (conf->ac_iot_count[i] >= conf->ac_iot_limit[i])) + +                if (conf->ac_iot_count[i] >= conf->ac_iot_limit[i]) { +                        continue; +                } + +                if (list_empty (&conf->clients[i])) { +                        continue; +                } + +                /* Get the first per-client queue for this priority. */ +                ctx = list_first_entry (&conf->clients[i], +                                        iot_client_ctx_t, clients); +                if (!ctx) {                          continue; +                } + +                if (list_empty (&ctx->reqs)) { +                        continue; +                } + +                /* Get the first request on that queue. */ +                stub = list_first_entry (&ctx->reqs, call_stub_t, list); +                list_del_init (&stub->list); +                if (list_empty (&ctx->reqs)) { +                        list_del_init (&ctx->clients); +                } else { +                        list_rotate_left (&conf->clients[i]); +                } -                stub = list_entry (conf->reqs[i].next, call_stub_t, list);                  conf->ac_iot_count[i]++;                  *pri = i;                  break; @@ -71,7 +120,6 @@ __iot_dequeue (iot_conf_t *conf, int *pri)          conf->queue_size--;          conf->queue_sizes[*pri]--; -        list_del_init (&stub->list);          return stub;  } @@ -80,15 +128,31 @@ __iot_dequeue (iot_conf_t *conf, int *pri)  void  __iot_enqueue (iot_conf_t *conf, call_stub_t *stub, int pri)  { +        client_t                *client = stub->frame->root->client; +        iot_client_ctx_t        *ctx; +          if (pri < 0 || pri >= IOT_PRI_MAX)                  pri = IOT_PRI_MAX-1; -        list_add_tail (&stub->list, &conf->reqs[pri]); +        if (client) { +                ctx = iot_get_ctx (THIS, client); +                if (ctx) { +                        ctx = &ctx[pri]; +                } +        } else { +                ctx = NULL; +        } +        if (!ctx) { +                ctx = &conf->no_client[pri]; +        } + +        if (list_empty (&ctx->reqs)) { +                list_add_tail (&ctx->clients, &conf->clients[pri]); +        } +        list_add_tail (&stub->list, &ctx->reqs);          conf->queue_size++;          conf->queue_sizes[pri]++; - -        return;  } @@ -958,7 +1022,9 @@ init (xlator_t *this)          conf->this = this;          for (i = 0; i < IOT_PRI_MAX; i++) { -                INIT_LIST_HEAD (&conf->reqs[i]); +                INIT_LIST_HEAD (&conf->clients[i]); +                INIT_LIST_HEAD (&conf->no_client[i].clients); +                INIT_LIST_HEAD (&conf->no_client[i].reqs);          }  	ret = iot_workers_scale (conf); @@ -991,6 +1057,19 @@ fini (xlator_t *this)  	return;  } +int +iot_client_destroy (xlator_t *this, client_t *client) +{ +        void    *tmp    = NULL; + +        if (client_ctx_del (client, this, &tmp) == 0) { +                GF_FREE (tmp); +        } + +        return 0; +} + +  struct xlator_dumpops dumpops = {          .priv    = iot_priv_dump,  }; @@ -1046,7 +1125,9 @@ struct xlator_fops fops = {          .setactivelk = iot_setactivelk,  }; -struct xlator_cbks cbks; +struct xlator_cbks cbks = { +        .client_destroy = iot_client_destroy, +};  struct volume_options options[] = {  	{ .key  = {"thread-count"}, diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index 6d9ea255909..7c4ce7849b4 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -48,6 +48,11 @@ typedef enum {          IOT_PRI_MAX,  } iot_pri_t; +typedef struct { +        struct list_head        clients; +        struct list_head        reqs; +} iot_client_ctx_t; +  struct iot_conf {          pthread_mutex_t      mutex;          pthread_cond_t       cond; @@ -58,7 +63,14 @@ struct iot_conf {          int32_t              idle_time;   /* in seconds */ -        struct list_head     reqs[IOT_PRI_MAX]; +        struct list_head     clients[IOT_PRI_MAX]; +        /* +         * It turns out that there are several ways a frame can get to us +         * without having an associated client (server_first_lookup was the +         * first one I hit).  Instead of trying to update all such callers, +         * we use this to queue them. +         */ +        iot_client_ctx_t     no_client[IOT_PRI_MAX];          int32_t              ac_iot_limit[IOT_PRI_MAX];          int32_t              ac_iot_count[IOT_PRI_MAX]; @@ -68,7 +80,7 @@ struct iot_conf {          gf_boolean_t         least_priority; /*Enable/Disable least-priority */          xlator_t            *this; -        size_t              stack_size; +        size_t               stack_size;  };  typedef struct iot_conf iot_conf_t; diff --git a/xlators/performance/io-threads/src/iot-mem-types.h b/xlators/performance/io-threads/src/iot-mem-types.h index 4fa8302d1f4..fbf9188f9cd 100644 --- a/xlators/performance/io-threads/src/iot-mem-types.h +++ b/xlators/performance/io-threads/src/iot-mem-types.h @@ -16,6 +16,7 @@  enum gf_iot_mem_types_ {          gf_iot_mt_iot_conf_t  = gf_common_mt_end + 1, +        gf_iot_mt_client_ctx_t,          gf_iot_mt_end  };  #endif  | 
