summaryrefslogtreecommitdiffstats
path: root/xlators/performance/io-threads/src/io-threads.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/performance/io-threads/src/io-threads.c')
-rw-r--r--xlators/performance/io-threads/src/io-threads.c1246
1 files changed, 934 insertions, 312 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index 04d94691ba3..c0ebdd33caf 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -8,6 +8,11 @@
cases as published by the Free Software Foundation.
*/
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
#include "call-stub.h"
#include "defaults.h"
#include "glusterfs.h"
@@ -20,12 +25,14 @@
#include <sys/time.h>
#include <time.h>
#include "locking.h"
-#include "io-threads-messages.h"
#include "timespec.h"
+#include "hashfn.h"
void *iot_worker (void *arg);
int iot_workers_scale (iot_conf_t *conf);
int __iot_workers_scale (iot_conf_t *conf);
+int32_t iot_reinit_clock (iot_conf_t *conf, int i, uint32_t total_weight);
+int32_t __iot_reinit_clock (iot_conf_t *conf, int i, uint32_t total_weight);
struct volume_options options[];
#define IOT_FOP(name, frame, this, args ...) \
@@ -50,84 +57,622 @@ struct volume_options options[];
} \
} while (0)
+gf_boolean_t
+__iot_ns_queue_empty (iot_ns_queue_t *queue)
+{
+ return (queue->size == 0);
+}
+
+/* Fetch a namespace queue for the given ns_info struct. If the hash was not
+ * found or the clock is not enabled, then return NULL which will be substituted
+ * by the unknown queue. */
+iot_ns_queue_t *
+__iot_get_ns_queue (iot_conf_t *conf, ns_info_t *info)
+{
+ char ns_key[DICT_UINT32_KEY_SIZE];
+ int32_t ret = -1;
+ iot_ns_queue_t *queue = NULL;
+
+ if (!conf->ns_weighted_queueing || !info->found) {
+ gf_log (GF_IO_THREADS, GF_LOG_TRACE,
+ "Namespace not found for %u", info->hash);
+ return NULL;
+ }
+
+ dict_uint32_to_key (info->hash, ns_key);
+ ret = dict_get_ptr (conf->ns_queues, ns_key, (void **)&queue);
+
+ if (ret) {
+ gf_log (GF_IO_THREADS, GF_LOG_TRACE,
+ "Namespace not found for %u", info->hash);
+ }
+
+ /* If successful, return `queue`. */
+ return (!ret && queue) ? queue : NULL;
+}
+
+/* When we parse a new namespace conf file, we create a whole new set of queue
+ * structs; however, old requests may be sitting on old queues. This function
+ * drains the old requests lists into the new queue, or alternatively appends
+ * it onto the unknown queue if there is no corresponding new queue.
+ * (i.e. if it was removed from the conf file) */
+int
+__iot_drain_ns_queue_foreach (dict_t *this, char *key, data_t *value, void *data)
+{
+ int ret = 0;
+ iot_conf_t *conf = data;
+ dict_t *new_dict = conf->ns_queues;
+ iot_ns_queue_t *old_queue = data_to_ptr (value);
+ iot_ns_queue_t *new_queue = NULL;
+ int i;
+
+ ret = dict_get_ptr (new_dict, key, (void **)&new_queue);
+
+ /* Don't drain the unknown queue. */
+ if (old_queue == conf->ns_unknown_queue) {
+ return 0;
+ }
+
+ for (i = 0; i < IOT_PRI_MAX; i++) {
+ /* If we didn't find a "new queue" corresponding to the old,
+ * then drain into the unknown queue of that priority level. */
+ if (!ret && new_queue) {
+ list_append_init (&old_queue[i].reqs,
+ &new_queue[i].reqs);
+ new_queue[i].size += old_queue[i].size;
+ } else {
+ list_append_init (&old_queue[i].reqs,
+ &conf->ns_unknown_queue[i].reqs);
+ conf->ns_unknown_queue[i].size += old_queue[i].size;
+ }
+ }
+
+ return 0;
+}
+
+/* Drain the namespace queues in old_dict, if there are any. Then free the dict
+ * and clear the clock structs. */
+void
+__iot_drain_and_clear_clock (iot_conf_t *conf, dict_t *old_dict)
+{
+ int i;
+
+ if (old_dict) {
+ dict_foreach (old_dict, __iot_drain_ns_queue_foreach, conf);
+ dict_destroy (old_dict);
+ }
+
+ for (i = 0; i < IOT_PRI_MAX; i++) {
+ GF_FREE (conf->ns_clocks[i].slots);
+ conf->ns_clocks[i].slots = NULL;
+ conf->ns_clocks[i].idx = 0;
+ conf->ns_clocks[i].size = 0;
+ }
+}
+
+/* Parse a single namespace conf line. This constructs a new queue and puts it
+ * into the namespace dictionary as well, skipping duplicated namespaces with
+ * a warning. */
+int32_t
+__iot_ns_parse_conf_line (iot_conf_t *conf, char *file_line, uint32_t *total_weight)
+{
+ char ns_key[DICT_UINT32_KEY_SIZE];
+ char *ns_name = NULL;
+ iot_ns_queue_t *queue = NULL;
+ int32_t queue_weight = 1;
+ uint32_t ns_hash = 0;
+ int i, ret = -1, scanned = -1;
+
+ ns_name = GF_CALLOC (strlen (file_line), sizeof (char), 0);
+ if (!ns_name) {
+ gf_log (GF_IO_THREADS, GF_LOG_WARNING,
+ "Memory allocation error!");
+ ret = ENOMEM;
+ goto out;
+ }
+
+ /* Scan the line, skipping the second column which corresponds to a
+ * throttling rate, which we don't particularly care about. */
+ scanned = sscanf (file_line, "%s %*d %d", ns_name, &queue_weight);
+ if (scanned < 1 || strlen (ns_name) < 1) {
+ gf_log (GF_IO_THREADS, GF_LOG_WARNING,
+ "Empty or malformatted line \"%s\" while parsing", file_line);
+ goto out;
+ }
+
+ /* Hash the namespace name, convert it to a key, then search the dict
+ * for an entry matching this key. */
+ ns_hash = SuperFastHash (ns_name, strlen (ns_name));
+
+ gf_log (GF_IO_THREADS, GF_LOG_INFO,
+ "Parsed namespace \'%s\' (%u)", ns_name, ns_hash);
+
+ dict_uint32_to_key (ns_hash, ns_key);
+ ret = dict_get_ptr (conf->ns_queues, ns_key, (void **)&queue);
+ if (!ret && queue) {
+ gf_log (GF_IO_THREADS, GF_LOG_WARNING,
+ "Duplicate-hashed queue found for namespace %s", ns_name);
+ /* Since ret == 0, we won't free the queue inadvertently. */
+ goto out;
+ }
+
+ queue = GF_CALLOC (IOT_PRI_MAX, sizeof (iot_ns_queue_t), 0);
+ if (!queue) {
+ gf_log (GF_IO_THREADS, GF_LOG_WARNING,
+ "Memory allocation error!");
+ ret = -(ENOMEM);
+ goto out;
+ }
+
+ /* Init queues. */
+ for (i = 0; i < IOT_PRI_MAX; i++) {
+ INIT_LIST_HEAD (&queue[i].reqs);
+ queue[i].hash = ns_hash;
+ queue[i].weight = conf->ns_default_weight;
+ queue[i].size = 0;
+ }
+
+ ret = dict_set_ptr (conf->ns_queues, ns_key, queue);
+ if (ret) {
+ goto out;
+ }
+
+ ret = dict_set_dynstr_with_alloc (conf->hash_to_ns, ns_key, ns_name);
+ if (ret) {
+ goto out;
+ }
+
+ if (scanned < 2 || queue_weight < 1) {
+ gf_log (GF_IO_THREADS, GF_LOG_WARNING,
+ "No weight (or too low) found in config line for namespace %s, "
+ "defaulting to weight of %u.", ns_name, conf->ns_default_weight);
+ } else {
+ gf_log (GF_IO_THREADS, GF_LOG_INFO,
+ "Parsed weight \'%s\' = %u", ns_name, queue_weight);
+ for (i = 0; i < IOT_PRI_MAX; i++) {
+ queue[i].weight = (uint32_t) queue_weight;
+ }
+ }
+
+ *total_weight += queue->weight;
+
+out:
+ if (ns_name) {
+ GF_FREE (ns_name);
+ }
+
+ if (ret && queue) {
+ GF_FREE (queue);
+ }
+
+ return ret;
+}
+
+/* This function (re)initializes the clock that is used by the en/de-queue
+ * operations. */
+void
+__iot_reinit_ns_conf (iot_conf_t *conf)
+{
+ char ns_unknown_key[DICT_UINT32_KEY_SIZE];
+ dict_t *old_dict, *new_dict;
+ FILE *fp = NULL;
+ char *line = NULL;
+ size_t len = 0;
+ uint32_t total_weight;
+ int i, ret = 0;
+
+ if (!conf) {
+ return;
+ }
+
+ if (conf->ns_weighted_queueing) {
+ gf_log (GF_IO_THREADS, GF_LOG_INFO,
+ "Loading %s from disk.",
+ _IOT_NAMESPACE_CONF);
+
+ fp = fopen (_IOT_NAMESPACE_CONF, "r");
+ if (!fp) {
+ gf_log (GF_IO_THREADS, GF_LOG_INFO,
+ "Cannot open file for reading.");
+ ret = ENOENT;
+ goto out;
+ }
+
+ /* Save the old queues; we will need to drain old requests out
+ * of it once we make new queues. */
+ old_dict = conf->ns_queues;
+ conf->ns_queues = new_dict = get_new_dict ();
+
+ /* Include the unknown queue weight, which isn't a parsed line. */
+ total_weight = conf->ns_default_weight;
+
+ /* Parse the new config file line-by-line, making a new queue
+ * for each namespace that is parsed. */
+ while (getline (&line, &len, fp) != -1) {
+ __iot_ns_parse_conf_line (conf, line, &total_weight);
+ }
+ free (line);
+
+ /* Drain old queues into new queues, or into unknown queue. */
+ __iot_drain_and_clear_clock (conf, old_dict);
+
+ /* We will add the unknown queue manually into the dictionaries. */
+ dict_uint32_to_key (0, ns_unknown_key);
+ ret = dict_set_dynstr_with_alloc (conf->hash_to_ns,
+ ns_unknown_key, "unknown");
+ if (ret) {
+ goto out;
+ }
+
+ /* Set the ns_unknown_queue as static pointer so it's not freed
+ * in the queue drain step next time the automation. */
+ ret = dict_set_static_ptr (conf->ns_queues, ns_unknown_key,
+ conf->ns_unknown_queue);
+ if (ret) {
+ goto out;
+ }
+
+ for (i = 0; i < IOT_PRI_MAX; i++) {
+ ret = __iot_reinit_clock (conf, i, total_weight);
+ if (ret) {
+ goto out;
+ }
+ }
+
+ /* Finally, keep our conf struct updated so we don't spuriously
+ * reconfigure the clock. */
+ get_file_mtime (_IOT_NAMESPACE_CONF, &conf->ns_conf_mtime);
+ }
+
+ ret = 0;
+out:
+ if (fp) {
+ fclose (fp);
+ }
+
+ if (ret) {
+ gf_log (GF_IO_THREADS, GF_LOG_INFO,
+ "There was an error loading the namespaces conf file, "
+ "disabling clock.");
+ conf->ns_weighted_queueing = _gf_false;
+ }
+
+ /* If our clock isn't enabled (or it was disabled because of an error)
+ * then drain the queues if there are any. */
+ if (!conf->ns_weighted_queueing) {
+ old_dict = conf->ns_queues;
+ /* This NULL signals to the drain that we're not populating any
+ * new queues... */
+ conf->ns_queues = NULL;
+ __iot_drain_and_clear_clock (conf, old_dict);
+ }
+}
+
+/* This is a simple iterative algorithm which tries to allocate the lowest
+ * amount of slots that maintains the same proportional amount of work given
+ * to each namespace.
+ *
+ * Each namespace has a weight, and the proportion of "weight / total weight"
+ * is something we'll call the "ideal" work ratio. If we have no latency and
+ * infinite queues, this ratio is the amount of requests we serve (out of total)
+ * for the given namespace.
+ *
+ * For a given namespace idx i, let the namespace's weight be W_i, and the total
+ * weight be TW. The "ideal" percentage is thus given by W_i/TW. Now if we
+ * choose some arbitrary total number of slots TS, the number of slots we
+ * should give to the namespace is given by S_i = TS*(W_i/TW). Since this is
+ * probably not an integer, we'll actually floor the number, meaning that the
+ * sum of S_i for each namespace most likely doesn't equal the TS we chose
+ * earlier. Let this "real" total sum of slots be RS.
+ *
+ * Now we have the concept of an "realized" percentage given by the ratio of
+ * _allocated slots_ instead of just ideal weights, given by S_i/RS. We consider
+ * this set of slot allocations to be ideal if these two ratios (slots and
+ * weights) are "close enough", given by our ns-weight-tolerance option.
+ *
+ * We then use a loop to distribute the shares, using some modulo magic to
+ * get a good, semi-even distribution in the slots array. The main concern here
+ * is trying to make sure that no single share is bunched up.
+ * If we have namespaces A and B with 2 and 8 slots respectively, we should
+ * shoot for a distribution like [A B B B B A B B B] unstead of like
+ * [A A B B B B B B B B]. This loop tries its best to do that. We don't want
+ * to shuffle randomly either, since there is still a risk of having a bad
+ * bunching if our RNG is bad or we're unlucky... */
+int32_t
+__iot_reinit_clock (iot_conf_t *conf, int i, uint32_t total_weight)
+{
+ int32_t ret = -1;
+ uint32_t try_slots, total_slots, slots_idx, rep;
+ data_pair_t *curr = NULL;
+ iot_ns_queue_t *curr_queue = NULL;
+ iot_ns_queue_t **slots = NULL;
+ char *ns_name = NULL;
+ gf_boolean_t fail;
+ double real_percentage;
+
+ /* Initialize the "ideal" percentage each queue should have. */
+ dict_foreach_inline (conf->ns_queues, curr) {
+ curr_queue = data_to_ptr (curr->value);
+ curr_queue = &curr_queue[i];
+
+ curr_queue->percentage = ((double) curr_queue->weight)
+ / total_weight;
+ }
+
+ /* We try to satisfy each percentage within some margin, first trying
+ * 1 slot, until we get up to the total sum of all weights, which will
+ * obviously satisfy each percentage but in many cases is far too large
+ * for a slots matrix. */
+ for (try_slots = 1; try_slots <= total_weight; try_slots++) {
+ fail = _gf_false;
+ total_slots = 0;
+
+ /* Calculate how many slots each namespace much get. Since we're
+ * rounding integers, we keep track of the actual total number
+ * of slots in `total_slots`. */
+ dict_foreach_inline (conf->ns_queues, curr) {
+ curr_queue = data_to_ptr (curr->value);
+ curr_queue = &curr_queue[i];
+
+ curr_queue->slots = (int) (curr_queue->percentage * try_slots);
+ total_slots += curr_queue->slots;
+
+ /* If we've allocated less than 1 slot for the queue,
+ * we should find a larger size. */
+ if (curr_queue->slots < 1) {
+ fail = _gf_true;
+ break;
+ }
+ }
+
+ if (fail) {
+ continue;
+ }
+
+ dict_foreach_inline (conf->ns_queues, curr) {
+ curr_queue = data_to_ptr (curr->value);
+ curr_queue = &curr_queue[i];
+
+ real_percentage = ((double) curr_queue->slots) / total_slots;
+ /* If the realized percentage is more than ns_weight_tolerance
+ * percent away from the ideal percentage, then let's try
+ * another number. */
+ if (abs (real_percentage - curr_queue->percentage) * 100.0
+ > conf->ns_weight_tolerance) {
+ fail = _gf_true;
+ break;
+ }
+ }
+
+ if (!fail) {
+ break;
+ }
+ }
+
+ /* Report the fits that we have found. */
+ dict_foreach_inline (conf->ns_queues, curr) {
+ curr_queue = data_to_ptr (curr->value);
+ curr_queue = &curr_queue[i];
+
+ real_percentage = ((double) curr_queue->slots) / total_slots;
+ ret = dict_get_str (conf->hash_to_ns, curr->key, &ns_name);
+
+ if (ret || !ns_name) {
+ continue;
+ }
+
+ gf_log (GF_IO_THREADS, GF_LOG_INFO,
+ "Initializing namespace \'%s\' (weight: %d) with %d slots. "
+ "Ideal percentage: %0.2f%%, real percentage: %0.2f%%.",
+ ns_name, curr_queue->weight, curr_queue->slots,
+ curr_queue->percentage * 100.0, real_percentage * 100.0);
+ }
+
+ /* At this point, we've either found a good fit, or gotten all the way to
+ * total_weight. In either case, we can start allocating slots. */
+ slots = GF_CALLOC (total_slots, sizeof (iot_ns_queue_t *), 0);
+ slots_idx = 0;
+ rep = 0;
+
+ if (!slots) {
+ ret = -(ENOMEM);
+ goto out;
+ }
+
+ /* Allocate slots, with some fun modulo-math to make sure that they're
+ * well distributed. */
+ while (total_slots != slots_idx) {
+ dict_foreach_inline (conf->ns_queues, curr) {
+ curr_queue = data_to_ptr (curr->value);
+ curr_queue = &curr_queue[i];
+
+ if (curr_queue->slots == 0) {
+ continue;
+ }
+
+ if (rep % (total_slots / curr_queue->slots) == 0) {
+ slots[slots_idx++] = curr_queue;
+ curr_queue->slots--;
+ }
+ }
+
+ rep++;
+ }
+
+ /* Set the slots into the queue, and we're ready to go! */
+ conf->ns_clocks[i].slots = slots;
+ conf->ns_clocks[i].size = total_slots;
+ conf->ns_clocks[i].idx = 0;
+
+ ret = 0;
+out:
+ if (ret && slots) {
+ GF_FREE (slots);
+ }
+
+ return ret;
+}
+
+
+void *
+iot_reinit_ns_conf_thread (void *arg)
+{
+ xlator_t *this = arg;
+ iot_conf_t *conf = this->private;
+ time_t curr_mtime = {0, };
+
+ while (_gf_true) {
+ sleep (conf->ns_conf_reinit_secs);
+
+ get_file_mtime (_IOT_NAMESPACE_CONF, &curr_mtime);
+ if (conf->ns_weighted_queueing && curr_mtime != conf->ns_conf_mtime) {
+ pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL);
+ pthread_mutex_lock (&conf->mutex);
+ {
+ __iot_reinit_ns_conf (conf);
+ }
+ pthread_mutex_unlock (&conf->mutex);
+ pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL);
+ } else {
+ gf_log (GF_IO_THREADS, GF_LOG_DEBUG,
+ "Config file %s not modified, skipping.",
+ _IOT_NAMESPACE_CONF);
+ }
+ }
+}
+
+static void
+start_iot_reinit_ns_conf_thread (xlator_t *this)
+{
+ iot_conf_t *priv = this->private;
+ int ret;
+
+ if (!priv) {
+ return;
+ }
+
+ if (priv->reinit_ns_conf_thread_running) {
+ gf_log (GF_IO_THREADS, GF_LOG_INFO, "reinit_ns_conf_thread already started.");
+ return;
+ }
+
+ gf_log (GF_IO_THREADS, GF_LOG_INFO, "Starting reinit_ns_conf_thread.");
+
+ ret = pthread_create (&priv->reinit_ns_conf_thread, NULL, iot_reinit_ns_conf_thread, this);
+ if (ret == 0) {
+ priv->reinit_ns_conf_thread_running = _gf_true;
+ } else {
+ gf_log (this->name, GF_LOG_WARNING,
+ "pthread_create(iot_reinit_ns_conf_thread) failed");
+ }
+}
+
+static void
+stop_iot_reinit_ns_conf_thread (xlator_t *this)
+{
+ iot_conf_t *priv = this->private;
+
+ if (!priv) {
+ return;
+ }
+
+ if (!priv->reinit_ns_conf_thread_running) {
+ gf_log (GF_IO_THREADS, GF_LOG_INFO, "reinit_ns_conf_thread already stopped.");
+ return;
+ }
+
+ gf_log (GF_IO_THREADS, GF_LOG_INFO, "Stopping reinit_ns_conf_thread.");
+
+ if (pthread_cancel (priv->reinit_ns_conf_thread) != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "pthread_cancel(iot_reinit_ns_conf_thread) failed");
+ }
+
+ if (pthread_join (priv->reinit_ns_conf_thread, NULL) != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "pthread_join(iot_reinit_ns_conf_thread) failed");
+ }
+
+ /* Failure probably means it's already dead. */
+ priv->reinit_ns_conf_thread_running = _gf_false;
+}
+
call_stub_t *
-__iot_dequeue (iot_conf_t *conf, int *pri, struct timespec *sleep)
+__iot_dequeue (iot_conf_t *conf, int *pri)
{
- call_stub_t *stub = NULL;
- int i = 0;
- struct timeval curtv = {0,}, difftv = {0,};
+ call_stub_t *stub = NULL;
+ iot_ns_clock_t *curr_clock;
+ iot_ns_queue_t *queue = NULL;
+ int i, start_idx;
*pri = -1;
- sleep->tv_sec = 0;
- sleep->tv_nsec = 0;
+
for (i = 0; i < IOT_PRI_MAX; i++) {
- if (list_empty (&conf->reqs[i]) ||
- (conf->ac_iot_count[i] >= conf->ac_iot_limit[i]))
+ if (conf->ac_iot_count[i] >= conf->ac_iot_limit[i] ||
+ conf->queue_sizes[i] == 0) {
+ /* If we have too many workers currently serving this
+ * priority level, or no reqs at this level, skip. */
continue;
+ }
- if (i == IOT_PRI_LEAST) {
- pthread_mutex_lock(&conf->throttle.lock);
- if (!conf->throttle.sample_time.tv_sec) {
- /* initialize */
- gettimeofday(&conf->throttle.sample_time, NULL);
- } else {
- /*
- * Maintain a running count of least priority
- * operations that are handled over a particular
- * time interval. The count is provided via
- * state dump and is used as a measure against
- * least priority op throttling.
- */
- gettimeofday(&curtv, NULL);
- timersub(&curtv, &conf->throttle.sample_time,
- &difftv);
- if (difftv.tv_sec >= IOT_LEAST_THROTTLE_DELAY) {
- conf->throttle.cached_rate =
- conf->throttle.sample_cnt;
- conf->throttle.sample_cnt = 0;
- conf->throttle.sample_time = curtv;
- }
-
- /*
- * If we're over the configured rate limit,
- * provide an absolute time to the caller that
- * represents the soonest we're allowed to
- * return another least priority request.
- */
- if (conf->throttle.rate_limit &&
- conf->throttle.sample_cnt >=
- conf->throttle.rate_limit) {
- struct timeval delay;
- delay.tv_sec = IOT_LEAST_THROTTLE_DELAY;
- delay.tv_usec = 0;
-
- timeradd(&conf->throttle.sample_time,
- &delay, &curtv);
- TIMEVAL_TO_TIMESPEC(&curtv, sleep);
-
- pthread_mutex_unlock(
- &conf->throttle.lock);
- break;
- }
- }
- conf->throttle.sample_cnt++;
- pthread_mutex_unlock(&conf->throttle.lock);
- }
-
- stub = list_entry (conf->reqs[i].next, call_stub_t, list);
+ if (conf->ns_weighted_queueing) {
+ /* Get the clock for this priority level, and keep track
+ * of where the search started, so we know if we've
+ * searched through the whole clock. */
+ curr_clock = &conf->ns_clocks[i];
+ start_idx = curr_clock->idx;
+
+ do {
+ /* Get the queue for the current index (modulo
+ * size), and increment the clock forward. */
+ queue = curr_clock->slots[curr_clock->idx];
+ curr_clock->idx = (curr_clock->idx + 1) % curr_clock->size;
+
+ /* If we have a request to serve, then we're done. */
+ if (!__iot_ns_queue_empty (queue)) {
+ break;
+ }
+
+ /* Otherwise, keep searching until we've looped
+ * back to the start. */
+ queue = NULL;
+ } while (curr_clock->idx != start_idx);
+
+ /* If we have no queue, we must've not found a req to
+ * serve. Let's try the next priority. */
+ if (!queue) {
+ continue;
+ }
+ } else {
+ /* If our unknown queue is empty, we have no other
+ * queues to serve. */
+ if (__iot_ns_queue_empty (&conf->ns_unknown_queue[i])) {
+ continue;
+ }
+
+ /* Select the unknown queue as the next queue to
+ * serve a request from. */
+ queue = &conf->ns_unknown_queue[i];
+ }
+
+ /* Otherwise take a request off of the queue. */
+ stub = list_first_entry (&queue->reqs, call_stub_t, list);
+ list_del_init (&stub->list);
+
+ /* Increment the number of workers serving this priority,
+ * and record which priority we are serving. Update queue
+ * sizes and set `pri` variable for the caller. */
conf->ac_iot_count[i]++;
conf->queue_marked[i] = _gf_false;
+
+ conf->queue_size--;
+ conf->queue_sizes[i]--;
+ queue->size--;
+
*pri = i;
break;
}
- if (!stub)
- return NULL;
-
- conf->queue_size--;
- conf->queue_sizes[*pri]--;
- list_del_init (&stub->list);
-
return stub;
}
@@ -135,15 +680,29 @@ __iot_dequeue (iot_conf_t *conf, int *pri, struct timespec *sleep)
void
__iot_enqueue (iot_conf_t *conf, call_stub_t *stub, int pri)
{
- if (pri < 0 || pri >= IOT_PRI_MAX)
- pri = IOT_PRI_MAX-1;
+ ns_info_t *info = &stub->frame->root->ns_info;
+ iot_ns_queue_t *queue = 0;
+
+ /* If we have an invalid priority, set it to LEAST. */
+ if (pri < 0 || pri >= IOT_PRI_MAX) {
+ pri = IOT_PRI_MAX - 1;
+ }
- list_add_tail (&stub->list, &conf->reqs[pri]);
+ /* Get the queue for this namespace. If we don't have one,
+ * use the unknown queue that always exists in the conf struct. */
+ queue = __iot_get_ns_queue (conf, info);
+ if (!queue) {
+ queue = conf->ns_unknown_queue;
+ }
+
+ /* Get the (pri)'th level queue, and add the request to the queue. */
+ queue = &queue[pri];
+ list_add_tail (&stub->list, &queue->reqs);
+ /* Update size records. */
conf->queue_size++;
conf->queue_sizes[pri]++;
-
- return;
+ queue->size++;
}
@@ -156,28 +715,24 @@ iot_worker (void *data)
struct timespec sleep_till = {0, };
int ret = 0;
int pri = -1;
- struct timespec sleep = {0,};
- gf_boolean_t bye = _gf_false;
+ char timeout = 0;
+ char bye = 0;
conf = data;
this = conf->this;
THIS = this;
+ gf_log (GF_IO_THREADS, GF_LOG_DEBUG, "IOT worker spawned.");
- for (;;) {
+ while (_gf_true) {
pthread_mutex_lock (&conf->mutex);
{
if (pri != -1) {
conf->ac_iot_count[pri]--;
pri = -1;
}
- while (conf->queue_size == 0) {
- if (conf->down) {
- bye = _gf_true;/*Avoid sleep*/
- break;
- }
- clock_gettime (CLOCK_REALTIME_COARSE,
- &sleep_till);
+ while (conf->queue_size == 0) {
+ clock_gettime (CLOCK_REALTIME_COARSE, &sleep_till);
sleep_till.tv_sec += conf->idle_time;
conf->sleep_count++;
@@ -186,55 +741,52 @@ iot_worker (void *data)
&sleep_till);
conf->sleep_count--;
- if (conf->down || ret == ETIMEDOUT) {
- bye = _gf_true;
+ if (ret == ETIMEDOUT) {
+ timeout = 1;
break;
}
}
- if (bye) {
- if (conf->down ||
- conf->curr_count > IOT_MIN_THREADS) {
+ if (timeout) {
+ if (conf->curr_count > IOT_MIN_THREADS) {
conf->curr_count--;
- if (conf->curr_count == 0)
- pthread_cond_broadcast (&conf->cond);
- gf_msg_debug (conf->this->name, 0,
- "terminated. "
- "conf->curr_count=%d",
- conf->curr_count);
+ bye = 1;
+ gf_log (conf->this->name, GF_LOG_DEBUG,
+ "timeout, terminated. conf->curr_count=%d",
+ conf->curr_count);
} else {
- bye = _gf_false;
+ timeout = 0;
}
}
- if (!bye) {
- stub = __iot_dequeue (conf, &pri, &sleep);
- if (!stub && (sleep.tv_sec || sleep.tv_nsec)) {
- pthread_cond_timedwait(&conf->cond,
- &conf->mutex,
- &sleep);
- pthread_mutex_unlock(&conf->mutex);
- continue;
- }
- }
+ stub = __iot_dequeue (conf, &pri);
}
pthread_mutex_unlock (&conf->mutex);
if (stub) { /* guard against spurious wakeups */
if (stub->poison) {
gf_log (this->name, GF_LOG_INFO,
- "Dropping poisoned request %p.", stub);
+ "dropping poisoned request %p", stub);
call_stub_destroy (stub);
} else {
call_resume (stub);
}
}
- stub = NULL;
- if (bye)
+ if (bye) {
break;
+ }
+ }
+
+ if (pri != -1) {
+ pthread_mutex_lock (&conf->mutex);
+ {
+ conf->ac_iot_count[pri]--;
+ }
+ pthread_mutex_unlock (&conf->mutex);
}
+ gf_log (GF_IO_THREADS, GF_LOG_DEBUG, "IOT worker died.");
return NULL;
}
@@ -273,6 +825,9 @@ iot_get_pri_meaning (iot_pri_t pri)
{
char *name = NULL;
switch (pri) {
+ case IOT_PRI_UNSPEC:
+ name = "unspecified";
+ break;
case IOT_PRI_HI:
name = "fast";
break;
@@ -288,13 +843,11 @@ iot_get_pri_meaning (iot_pri_t pri)
case IOT_PRI_MAX:
name = "invalid";
break;
- case IOT_PRI_UNSPEC:
- name = "unspecified";
- break;
}
return name;
}
+
int
iot_schedule (call_frame_t *frame, xlator_t *this, call_stub_t *stub)
{
@@ -307,75 +860,17 @@ iot_schedule (call_frame_t *frame, xlator_t *this, call_stub_t *stub)
goto out;
}
- switch (stub->fop) {
- case GF_FOP_OPEN:
- case GF_FOP_STAT:
- case GF_FOP_FSTAT:
- case GF_FOP_LOOKUP:
- case GF_FOP_ACCESS:
- case GF_FOP_READLINK:
- case GF_FOP_OPENDIR:
- case GF_FOP_STATFS:
- case GF_FOP_READDIR:
- case GF_FOP_READDIRP:
- case GF_FOP_GETACTIVELK:
- case GF_FOP_SETACTIVELK:
- pri = IOT_PRI_HI;
- break;
-
- case GF_FOP_CREATE:
- case GF_FOP_FLUSH:
- case GF_FOP_LK:
- case GF_FOP_INODELK:
- case GF_FOP_FINODELK:
- case GF_FOP_ENTRYLK:
- case GF_FOP_FENTRYLK:
- case GF_FOP_LEASE:
- case GF_FOP_UNLINK:
- case GF_FOP_SETATTR:
- case GF_FOP_FSETATTR:
- case GF_FOP_MKNOD:
- case GF_FOP_MKDIR:
- case GF_FOP_RMDIR:
- case GF_FOP_SYMLINK:
- case GF_FOP_RENAME:
- case GF_FOP_LINK:
- case GF_FOP_SETXATTR:
- case GF_FOP_GETXATTR:
- case GF_FOP_FGETXATTR:
- case GF_FOP_FSETXATTR:
- case GF_FOP_REMOVEXATTR:
- case GF_FOP_FREMOVEXATTR:
- pri = IOT_PRI_NORMAL;
- break;
+ if (frame->pri != IOT_PRI_UNSPEC) {
+ pri = frame->pri;
+ goto out;
+ }
- case GF_FOP_READ:
- case GF_FOP_WRITE:
- case GF_FOP_FSYNC:
- case GF_FOP_TRUNCATE:
- case GF_FOP_FTRUNCATE:
- case GF_FOP_FSYNCDIR:
- case GF_FOP_XATTROP:
- case GF_FOP_FXATTROP:
- case GF_FOP_RCHECKSUM:
- case GF_FOP_FALLOCATE:
- case GF_FOP_DISCARD:
- case GF_FOP_ZEROFILL:
- pri = IOT_PRI_LO;
- break;
+ // Retrieve the fop priority
+ pri = iot_fop_to_pri (stub->fop);
- case GF_FOP_FORGET:
- case GF_FOP_RELEASE:
- case GF_FOP_RELEASEDIR:
- case GF_FOP_GETSPEC:
- break;
- case GF_FOP_IPC:
- default:
- return -EINVAL;
- }
out:
- gf_msg_debug (this->name, 0, "%s scheduled as %s fop",
- gf_fop_list[stub->fop], iot_get_pri_meaning (pri));
+ gf_log (this->name, GF_LOG_DEBUG, "%s scheduled as %s fop",
+ gf_fop_list[stub->fop], iot_get_pri_meaning (pri));
ret = do_iot_schedule (this->private, stub, pri);
return ret;
}
@@ -619,34 +1114,115 @@ iot_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
}
+/* Populate all queue size keys for a specific priority level. */
int
-iot_getxattr (call_frame_t *frame, xlator_t *this, loc_t *loc,
- const char *name, dict_t *xdata)
+__iot_populate_ns_queue_sizes (iot_conf_t *conf, dict_t *depths, int pri)
+{
+ int ret = 0;
+ data_pair_t *curr = NULL;
+ iot_ns_queue_t *curr_queue = NULL;
+ char *temp_key = NULL;
+ char *ns_name = NULL;
+ const char *pri_str = fop_pri_to_string (pri);;
+
+ /* For each namespace at this priority level, we try to grab the n
+ * namespace name and record a key like `namespaces.NAME.PRI_LEVEL`. */
+ dict_foreach_inline (conf->ns_queues, curr) {
+ curr_queue = data_to_ptr (curr->value);
+ curr_queue = &curr_queue[pri];
+
+ /* Try to retrieve the namespace's un-hashed real name. */
+ ret = dict_get_str (conf->hash_to_ns, curr->key, &ns_name);
+ if (ret || !ns_name) {
+ continue;
+ }
+
+ /* Give the root namespace a readable name. */
+ if (strncmp (ns_name, "/", 1) == 0) {
+ ns_name = "root";
+ }
+
+ /* Print a new temporary key for the namespace and priority level. */
+ ret = gf_asprintf (&temp_key, "namespaces.%s.%s", ns_name, pri_str);
+ if (ret == -1 || !temp_key) {
+ ret = -(ENOMEM);
+ goto out;
+ }
+
+ /* Insert the key and queue-size. */
+ ret = dict_set_int32 (depths, temp_key, curr_queue->size);
+ GF_FREE (temp_key);
+ temp_key = NULL;
+
+ if (ret) {
+ goto out;
+ }
+ }
+
+out:
+ return ret;
+}
+
+/* Populate global queue counts (per-priority) and if namespace queueing is
+ * enabled, then also add those keys to the dictionary as well. */
+int
+__iot_populate_queue_sizes (iot_conf_t *conf, dict_t **depths)
{
- iot_conf_t *conf = NULL;
- dict_t *depths = NULL;
- int i = 0;
+ int ret = 0, pri = 0;
+ const char *pri_str = NULL;
- conf = this->private;
+ /* We explicitly do not want a reference count for this dict
+ * in this translator, since it will get freed in io_stats later. */
+ *depths = get_new_dict ();
+ if (!*depths) {
+ return -(ENOMEM);
+ }
- if (conf && name && strcmp (name, IO_THREADS_QUEUE_SIZE_KEY) == 0) {
- // We explicitly do not want a reference count
- // for this dict in this translator
- depths = get_new_dict ();
- if (!depths)
- goto unwind_special_getxattr;
+ for (pri = 0; pri < IOT_PRI_MAX; pri++) {
+ pri_str = fop_pri_to_string (pri);
- for (i = 0; i < IOT_PRI_MAX; i++) {
- if (dict_set_int32 (depths,
- (char *)fop_pri_to_string (i),
- conf->queue_sizes[i]) != 0) {
- dict_destroy (depths);
- depths = NULL;
- goto unwind_special_getxattr;
+ /* First, let's add an entry for the number of requests
+ * per prority (globally). */
+ ret = dict_set_int32 (*depths, (char *)pri_str,
+ conf->queue_sizes[pri]);
+ if (ret) {
+ goto out;
+ }
+
+ /* If namespace queueing is enabled, then try to populate
+ * per-namespace queue keys as well. */
+ if (conf->ns_weighted_queueing) {
+ ret = __iot_populate_ns_queue_sizes (conf, *depths, pri);
+ if (ret) {
+ goto out;
}
}
+ }
+
+out:
+ if (ret) {
+ dict_destroy (*depths);
+ *depths = NULL;
+ }
+
+ return ret;
+}
+
+int
+iot_getxattr (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ const char *name, dict_t *xdata)
+{
+ iot_conf_t *conf = this->private;
+ dict_t *depths = NULL;
+
+ if (name && strcmp (name, IO_THREADS_QUEUE_SIZE_KEY) == 0) {
+ /* Populate depths dict, or it'll stay NULL if an error occurred. */
+ pthread_mutex_lock (&conf->mutex);
+ {
+ __iot_populate_queue_sizes (conf, &depths);
+ }
+ pthread_mutex_unlock (&conf->mutex);
-unwind_special_getxattr:
STACK_UNWIND_STRICT (getxattr, frame, 0, 0, depths, xdata);
return 0;
}
@@ -824,10 +1400,9 @@ __iot_workers_scale (iot_conf_t *conf)
ret = gf_thread_create (&thread, &conf->w_attr, iot_worker, conf);
if (ret == 0) {
conf->curr_count++;
- gf_msg_debug (conf->this->name, 0,
- "scaled threads to %d (queue_size=%d/%d)",
- conf->curr_count,
- conf->queue_size, scale);
+ gf_log (conf->this->name, GF_LOG_DEBUG,
+ "scaled threads to %d (queue_size=%d/%d)",
+ conf->curr_count, conf->queue_size, scale);
} else {
break;
}
@@ -872,13 +1447,11 @@ set_stack_size (iot_conf_t *conf)
if (err == EINVAL) {
err = pthread_attr_getstacksize (&conf->w_attr, &stacksize);
if (!err)
- gf_msg (this->name, GF_LOG_WARNING,
- 0, IO_THREADS_MSG_SIZE_NOT_SET,
+ gf_log (this->name, GF_LOG_WARNING,
"Using default thread stack size %zd",
stacksize);
else
- gf_msg (this->name, GF_LOG_WARNING,
- 0, IO_THREADS_MSG_SIZE_NOT_SET,
+ gf_log (this->name, GF_LOG_WARNING,
"Using default thread stack size");
}
@@ -897,9 +1470,8 @@ mem_acct_init (xlator_t *this)
ret = xlator_mem_acct_init (this, gf_iot_mt_end + 1);
if (ret != 0) {
- gf_msg (this->name, GF_LOG_ERROR,
- ENOMEM, IO_THREADS_MSG_NO_MEMORY,
- "Memory accounting init failed");
+ gf_log (this->name, GF_LOG_ERROR, "Memory accounting init"
+ "failed");
return ret;
}
@@ -938,10 +1510,6 @@ iot_priv_dump (xlator_t *this)
gf_proc_dump_write("least_priority_threads", "%d",
conf->ac_iot_limit[IOT_PRI_LEAST]);
- gf_proc_dump_write("cached least rate", "%u",
- conf->throttle.cached_rate);
- gf_proc_dump_write("least rate limit", "%u", conf->throttle.rate_limit);
-
return 0;
}
@@ -1107,13 +1675,16 @@ stop_iot_watchdog (xlator_t *this)
int
reconfigure (xlator_t *this, dict_t *options)
{
- iot_conf_t *conf = NULL;
- int ret = -1;
+ iot_conf_t *conf = NULL;
+ int i, ret = -1;
+ gf_boolean_t ns_weighted_queueing = _gf_false;
conf = this->private;
if (!conf)
goto out;
+ GF_OPTION_RECONF ("iam-brick-daemon", conf->iambrickd, options, bool, out);
+
GF_OPTION_RECONF ("thread-count", conf->max_count, options, int32, out);
GF_OPTION_RECONF ("fops-per-thread-ratio", conf->fops_per_thread_ratio,
@@ -1132,25 +1703,52 @@ reconfigure (xlator_t *this, dict_t *options)
GF_OPTION_RECONF ("least-prio-threads",
conf->ac_iot_limit[IOT_PRI_LEAST], options, int32,
out);
+
GF_OPTION_RECONF ("enable-least-priority", conf->least_priority,
options, bool, out);
- GF_OPTION_RECONF ("least-rate-limit", conf->throttle.rate_limit,
- options, int32, out);
-
GF_OPTION_RECONF ("cleanup-disconnected-reqs",
conf->cleanup_disconnected_reqs, options, bool, out);
GF_OPTION_RECONF ("watchdog-secs", conf->watchdog_secs, options,
int32, out);
-
if (conf->watchdog_secs > 0) {
start_iot_watchdog (this);
} else {
stop_iot_watchdog (this);
}
- ret = 0;
+ GF_OPTION_RECONF ("ns-weighted-queueing", ns_weighted_queueing, options, bool, out);
+ GF_OPTION_RECONF ("ns-default-weight", conf->ns_default_weight, options, uint32, out);
+ GF_OPTION_RECONF ("ns-weight-tolerance", conf->ns_weight_tolerance, options, double, out);
+ GF_OPTION_RECONF ("ns-conf-reinit-secs", conf->ns_conf_reinit_secs, options,
+ uint32, out);
+
+ if (!conf->iambrickd) {
+ ns_weighted_queueing = _gf_false;
+ }
+
+ /* Reinit the default weight, which is the weight of the unknown queue. */
+ for (i = 0; i < IOT_PRI_MAX; i++) {
+ conf->ns_unknown_queue[i].weight = conf->ns_default_weight;
+ }
+
+ if (ns_weighted_queueing != conf->ns_weighted_queueing) {
+ pthread_mutex_lock (&conf->mutex);
+ {
+ conf->ns_weighted_queueing = ns_weighted_queueing;
+ __iot_reinit_ns_conf (conf);
+ }
+ pthread_mutex_unlock (&conf->mutex);
+ }
+
+ if (conf->ns_weighted_queueing) {
+ start_iot_reinit_ns_conf_thread (this);
+ } else {
+ stop_iot_reinit_ns_conf_thread (this);
+ }
+
+ ret = 0;
out:
return ret;
}
@@ -1160,49 +1758,43 @@ int
init (xlator_t *this)
{
iot_conf_t *conf = NULL;
- int ret = -1;
- int i = 0;
+ int i, ret = -1;
if (!this->children || this->children->next) {
- gf_msg ("io-threads", GF_LOG_ERROR, 0,
- IO_THREADS_MSG_XLATOR_CHILD_MISCONFIGURED,
- "FATAL: iot not configured "
- "with exactly one child");
+ gf_log ("io-threads", GF_LOG_ERROR,
+ "FATAL: iot not configured with exactly one child");
goto out;
}
if (!this->parents) {
- gf_msg (this->name, GF_LOG_WARNING, 0,
- IO_THREADS_MSG_VOL_MISCONFIGURED,
- "dangling volume. check volfile ");
+ gf_log (this->name, GF_LOG_WARNING,
+ "dangling volume. check volfile ");
}
conf = (void *) GF_CALLOC (1, sizeof (*conf),
gf_iot_mt_iot_conf_t);
if (conf == NULL) {
- gf_msg (this->name, GF_LOG_ERROR, ENOMEM,
- IO_THREADS_MSG_NO_MEMORY, "out of memory");
+ gf_log (this->name, GF_LOG_ERROR,
+ "out of memory");
goto out;
}
- if ((ret = pthread_cond_init(&conf->cond, NULL)) != 0) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- IO_THREADS_MSG_INIT_FAILED,
+ if ((ret = pthread_cond_init (&conf->cond, NULL)) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
"pthread_cond_init failed (%d)", ret);
goto out;
}
- conf->cond_inited = _gf_true;
- if ((ret = pthread_mutex_init(&conf->mutex, NULL)) != 0) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- IO_THREADS_MSG_INIT_FAILED,
+ if ((ret = pthread_mutex_init (&conf->mutex, NULL)) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
"pthread_mutex_init failed (%d)", ret);
goto out;
}
- conf->mutex_inited = _gf_true;
set_stack_size (conf);
+ GF_OPTION_INIT ("iam-brick-daemon", conf->iambrickd, bool, out);
+
GF_OPTION_INIT ("thread-count", conf->max_count, int32, out);
GF_OPTION_INIT ("fops-per-thread-ratio", conf->fops_per_thread_ratio,
@@ -1228,33 +1820,45 @@ init (xlator_t *this)
GF_OPTION_INIT ("cleanup-disconnected-reqs",
conf->cleanup_disconnected_reqs, bool, out);
- GF_OPTION_INIT ("least-rate-limit", conf->throttle.rate_limit, int32,
- out);
+ conf->this = this;
+
+ GF_OPTION_INIT ("ns-weighted-queueing", conf->ns_weighted_queueing, bool, out);
+ GF_OPTION_INIT ("ns-default-weight", conf->ns_default_weight, uint32, out);
+ GF_OPTION_INIT ("ns-weight-tolerance", conf->ns_weight_tolerance, double, out);
+ GF_OPTION_INIT ("ns-conf-reinit-secs", conf->ns_conf_reinit_secs, uint32, out);
- if ((ret = pthread_mutex_init(&conf->throttle.lock, NULL)) != 0) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- IO_THREADS_MSG_INIT_FAILED,
- "pthread_mutex_init failed (%d)", ret);
- goto out;
+ for (i = 0; i < IOT_PRI_MAX; i++) {
+ INIT_LIST_HEAD (&conf->ns_unknown_queue[i].reqs);
+ conf->ns_unknown_queue[i].hash = 0;
+ conf->ns_unknown_queue[i].weight = conf->ns_default_weight;
+ conf->ns_unknown_queue[i].size = 0;
}
- conf->this = this;
+ if (!conf->iambrickd) {
+ conf->ns_weighted_queueing = _gf_false;
+ }
- for (i = 0; i < IOT_PRI_MAX; i++) {
- INIT_LIST_HEAD (&conf->reqs[i]);
+ conf->hash_to_ns = dict_new ();
+
+ pthread_mutex_lock (&conf->mutex);
+ {
+ __iot_reinit_ns_conf (conf);
}
+ pthread_mutex_unlock (&conf->mutex);
ret = iot_workers_scale (conf);
-
if (ret == -1) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- IO_THREADS_MSG_INIT_FAILED,
+ gf_log (this->name, GF_LOG_ERROR,
"cannot initialize worker threads, exiting init");
goto out;
}
this->private = conf;
+ if (conf->ns_weighted_queueing) {
+ start_iot_reinit_ns_conf_thread (this);
+ }
+
conf->watchdog_secs = 0;
GF_OPTION_INIT ("watchdog-secs", conf->watchdog_secs, int32, out);
if (conf->watchdog_secs > 0) {
@@ -1263,57 +1867,24 @@ init (xlator_t *this)
ret = 0;
out:
- if (ret)
+ if (ret) {
GF_FREE (conf);
+ }
return ret;
}
-static void
-iot_exit_threads (iot_conf_t *conf)
-{
- pthread_mutex_lock (&conf->mutex);
- {
- conf->down = _gf_true;
- /*Let all the threads know that xl is going down*/
- pthread_cond_broadcast (&conf->cond);
- while (conf->curr_count)/*Wait for threads to exit*/
- pthread_cond_wait (&conf->cond, &conf->mutex);
- }
- pthread_mutex_unlock (&conf->mutex);
-}
-
-int
-notify (xlator_t *this, int32_t event, void *data, ...)
-{
- iot_conf_t *conf = this->private;
-
- if (GF_EVENT_PARENT_DOWN == event)
- iot_exit_threads (conf);
-
- default_notify (this, event, data);
-
- return 0;
-}
void
fini (xlator_t *this)
{
iot_conf_t *conf = this->private;
- if (!conf)
- return;
-
- if (conf->mutex_inited && conf->cond_inited)
- iot_exit_threads (conf);
-
- if (conf->cond_inited)
- pthread_cond_destroy (&conf->cond);
-
- if (conf->mutex_inited)
- pthread_mutex_destroy (&conf->mutex);
-
stop_iot_watchdog (this);
+ stop_iot_reinit_ns_conf_thread (this);
+
+ dict_unref (conf->hash_to_ns);
+ conf->hash_to_ns = NULL;
GF_FREE (conf);
@@ -1321,13 +1892,32 @@ fini (xlator_t *this)
return;
}
+/* Clears a queue of requests from the given client (which is assumed to
+ * have disconnected or otherwise stopped needing these requests...) */
+void
+clear_reqs_from_queue (xlator_t *this, client_t *client, struct list_head *queue)
+{
+ call_stub_t *curr;
+ call_stub_t *next;
+
+ list_for_each_entry_safe (curr, next, queue, list) {
+ if (curr->frame->root->client != client) {
+ continue;
+ }
+ gf_log (this->name, GF_LOG_INFO,
+ "poisoning %s fop at %p for client %s",
+ gf_fop_list[curr->fop], curr, client->client_uid);
+ curr->poison = _gf_true;
+ }
+}
+
static int
iot_disconnect_cbk (xlator_t *this, client_t *client)
{
+ iot_conf_t *conf = this->private;
+ data_pair_t *curr = NULL;
+ iot_ns_queue_t *curr_queue = NULL;
int i;
- call_stub_t *curr;
- call_stub_t *next;
- iot_conf_t *conf = this->private;
if (!conf || !conf->cleanup_disconnected_reqs) {
goto out;
@@ -1335,15 +1925,15 @@ iot_disconnect_cbk (xlator_t *this, client_t *client)
pthread_mutex_lock (&conf->mutex);
for (i = 0; i < IOT_PRI_MAX; i++) {
- list_for_each_entry_safe (curr, next, &conf->reqs[i], list) {
- if (curr->frame->root->client != client) {
- continue;
+ /* Clear client reqs from the unknown queue. */
+ clear_reqs_from_queue (this, client, &conf->ns_unknown_queue[i].reqs);
+ /* Clear client reqs from each of the namespace queues. */
+ if (conf->ns_weighted_queueing && conf->ns_queues) {
+ dict_foreach_inline (conf->ns_queues, curr) {
+ curr_queue = data_to_ptr (curr->value);
+ curr_queue = &curr_queue[i];
+ clear_reqs_from_queue (this, client, &curr_queue->reqs);
}
- gf_log (this->name, GF_LOG_INFO,
- "poisoning %s fop at %p for client %s",
- gf_fop_list[curr->fop], curr,
- client->client_uid);
- curr->poison = _gf_true;
}
}
pthread_mutex_unlock (&conf->mutex);
@@ -1412,7 +2002,7 @@ struct volume_options options[] = {
.type = GF_OPTION_TYPE_INT,
.min = IOT_MIN_THREADS,
.max = IOT_MAX_THREADS,
- .default_value = "16",
+ .default_value = "32",
.description = "Number of threads in IO threads translator which "
"perform concurrent IO operations"
@@ -1435,7 +2025,7 @@ struct volume_options options[] = {
.type = GF_OPTION_TYPE_INT,
.min = IOT_MIN_THREADS,
.max = IOT_MAX_THREADS,
- .default_value = "16",
+ .default_value = "32",
.description = "Max number of threads in IO threads translator which "
"perform high priority IO operations at a given time"
@@ -1444,7 +2034,7 @@ struct volume_options options[] = {
.type = GF_OPTION_TYPE_INT,
.min = IOT_MIN_THREADS,
.max = IOT_MAX_THREADS,
- .default_value = "16",
+ .default_value = "32",
.description = "Max number of threads in IO threads translator which "
"perform normal priority IO operations at a given time"
@@ -1453,7 +2043,7 @@ struct volume_options options[] = {
.type = GF_OPTION_TYPE_INT,
.min = IOT_MIN_THREADS,
.max = IOT_MAX_THREADS,
- .default_value = "16",
+ .default_value = "32",
.description = "Max number of threads in IO threads translator which "
"perform low priority IO operations at a given time"
@@ -1477,14 +2067,6 @@ struct volume_options options[] = {
.max = 0x7fffffff,
.default_value = "120",
},
- { .key = {"least-rate-limit"},
- .type = GF_OPTION_TYPE_INT,
- .min = 0,
- .max = INT_MAX,
- .default_value = "0",
- .description = "Max number of least priority operations to handle "
- "per-second"
- },
{ .key = {"watchdog-secs"},
.type = GF_OPTION_TYPE_INT,
.min = 0,
@@ -1495,7 +2077,47 @@ struct volume_options options[] = {
{ .key = {"cleanup-disconnected-reqs"},
.type = GF_OPTION_TYPE_BOOL,
.default_value = "off",
- .description = "'Poison' queued requests when a client disconnects"
+ .description = "Enable/Disable least priority"
+ },
+ { .key = {"ns-weighted-queueing"},
+ .type = GF_OPTION_TYPE_BOOL,
+ .default_value = "off",
+ .description = "Enables the namespace queues clock."
+ },
+ { .key = {"ns-default-weight"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 1,
+ .max = 0x7fffffff,
+ .default_value = "100",
+ .description = "The default weight of a queue which doesn't have a "
+ "weight specified in the namespace conf. This is also "
+ "the weight of the unknown (default) queue."
+ },
+ { .key = {"ns-weight-tolerance"},
+ .type = GF_OPTION_TYPE_DOUBLE,
+ .default_value = "0.5",
+ .description = "The tolerance in percentage (out of 100) that the "
+ "slot-allocation algorithm will tolerate for weight/total "
+ "and slots/total percentages. This corresponds to "
+ "ideal and realized workload percentages allocated "
+ "to the namespace."
+ },
+ { .key = {"ns-conf-reinit-secs"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 1,
+ .max = 0x7fffffff,
+ .default_value = "5",
+ .description = "Number of seconds that the ns conf reinit thread "
+ "sleeps before trying to detect changes in the "
+ "namespace config file."
+ },
+ {
+ .key = {"iam-brick-daemon"},
+ .type = GF_OPTION_TYPE_BOOL,
+ .default_value = "off",
+ .description = "This option differentiates if the io-stats "
+ "translator is running as part of brick daemon "
+ "or not."
},
{ .key = {NULL},
},