summaryrefslogtreecommitdiffstats
path: root/libglusterfs/src/throttle-tbf.c
diff options
context:
space:
mode:
Diffstat (limited to 'libglusterfs/src/throttle-tbf.c')
-rw-r--r--libglusterfs/src/throttle-tbf.c375
1 files changed, 187 insertions, 188 deletions
diff --git a/libglusterfs/src/throttle-tbf.c b/libglusterfs/src/throttle-tbf.c
index a425166b681..9519defa37f 100644
--- a/libglusterfs/src/throttle-tbf.c
+++ b/libglusterfs/src/throttle-tbf.c
@@ -27,93 +27,93 @@
#include "throttle-tbf.h"
typedef struct tbf_throttle {
- char done;
+ char done;
- pthread_mutex_t mutex;
- pthread_cond_t cond;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
- unsigned long tokens;
+ unsigned long tokens;
- struct list_head list;
+ struct list_head list;
} tbf_throttle_t;
static tbf_throttle_t *
-tbf_init_throttle (unsigned long tokens_required)
+tbf_init_throttle(unsigned long tokens_required)
{
- tbf_throttle_t *throttle = NULL;
+ tbf_throttle_t *throttle = NULL;
- throttle = GF_CALLOC (1, sizeof (*throttle),
- gf_common_mt_tbf_throttle_t);
- if (!throttle)
- return NULL;
+ throttle = GF_CALLOC(1, sizeof(*throttle), gf_common_mt_tbf_throttle_t);
+ if (!throttle)
+ return NULL;
- throttle->done = 0;
- throttle->tokens = tokens_required;
- INIT_LIST_HEAD (&throttle->list);
+ throttle->done = 0;
+ throttle->tokens = tokens_required;
+ INIT_LIST_HEAD(&throttle->list);
- (void) pthread_mutex_init (&throttle->mutex, NULL);
- (void) pthread_cond_init (&throttle->cond, NULL);
+ (void)pthread_mutex_init(&throttle->mutex, NULL);
+ (void)pthread_cond_init(&throttle->cond, NULL);
- return throttle;
+ return throttle;
}
void
-_tbf_dispatch_queued (tbf_bucket_t *bucket)
+_tbf_dispatch_queued(tbf_bucket_t *bucket)
{
- gf_boolean_t xcont = _gf_false;
- tbf_throttle_t *tmp = NULL;
- tbf_throttle_t *throttle = NULL;
-
- list_for_each_entry_safe (throttle, tmp, &bucket->queued, list) {
-
- pthread_mutex_lock (&throttle->mutex);
- {
- if (bucket->tokens < throttle->tokens) {
- xcont = _gf_true;
- goto unblock;
- }
-
- /* this request can now be serviced */
- throttle->done = 1;
- list_del_init (&throttle->list);
-
- bucket->tokens -= throttle->tokens;
- pthread_cond_signal (&throttle->cond);
- }
- unblock:
- pthread_mutex_unlock (&throttle->mutex);
- if (xcont)
- break;
+ gf_boolean_t xcont = _gf_false;
+ tbf_throttle_t *tmp = NULL;
+ tbf_throttle_t *throttle = NULL;
+
+ list_for_each_entry_safe(throttle, tmp, &bucket->queued, list)
+ {
+ pthread_mutex_lock(&throttle->mutex);
+ {
+ if (bucket->tokens < throttle->tokens) {
+ xcont = _gf_true;
+ goto unblock;
+ }
+
+ /* this request can now be serviced */
+ throttle->done = 1;
+ list_del_init(&throttle->list);
+
+ bucket->tokens -= throttle->tokens;
+ pthread_cond_signal(&throttle->cond);
}
+ unblock:
+ pthread_mutex_unlock(&throttle->mutex);
+ if (xcont)
+ break;
+ }
}
-void *tbf_tokengenerator (void *arg)
+void *
+tbf_tokengenerator(void *arg)
{
- unsigned long tokenrate = 0;
- unsigned long maxtokens = 0;
- unsigned long token_gen_interval = 0;
- tbf_bucket_t *bucket = arg;
-
- tokenrate = bucket->tokenrate;
- maxtokens = bucket->maxtokens;
- token_gen_interval = bucket->token_gen_interval;
-
- while (1) {
- usleep (token_gen_interval);
-
- LOCK (&bucket->lock);
- {
- bucket->tokens += tokenrate;
- if (bucket->tokens > maxtokens)
- bucket->tokens = maxtokens;
-
- if (!list_empty (&bucket->queued))
- _tbf_dispatch_queued (bucket);
- }
- UNLOCK (&bucket->lock);
+ unsigned long tokenrate = 0;
+ unsigned long maxtokens = 0;
+ unsigned long token_gen_interval = 0;
+ tbf_bucket_t *bucket = arg;
+
+ tokenrate = bucket->tokenrate;
+ maxtokens = bucket->maxtokens;
+ token_gen_interval = bucket->token_gen_interval;
+
+ while (1) {
+ usleep(token_gen_interval);
+
+ LOCK(&bucket->lock);
+ {
+ bucket->tokens += tokenrate;
+ if (bucket->tokens > maxtokens)
+ bucket->tokens = maxtokens;
+
+ if (!list_empty(&bucket->queued))
+ _tbf_dispatch_queued(bucket);
}
+ UNLOCK(&bucket->lock);
+ }
- return NULL;
+ return NULL;
}
/**
@@ -122,170 +122,169 @@ void *tbf_tokengenerator (void *arg)
* updated _after_ all the required variables are initialized.
*/
static int32_t
-tbf_init_bucket (tbf_t *tbf, tbf_opspec_t *spec)
+tbf_init_bucket(tbf_t *tbf, tbf_opspec_t *spec)
{
- int ret = 0;
- tbf_bucket_t *curr = NULL;
- tbf_bucket_t **bucket = NULL;
+ int ret = 0;
+ tbf_bucket_t *curr = NULL;
+ tbf_bucket_t **bucket = NULL;
- GF_ASSERT (spec->op >= TBF_OP_MIN);
- GF_ASSERT (spec->op <= TBF_OP_MAX);
+ GF_ASSERT(spec->op >= TBF_OP_MIN);
+ GF_ASSERT(spec->op <= TBF_OP_MAX);
- /* no rate? no throttling. */
- if (!spec->rate)
- return 0;
+ /* no rate? no throttling. */
+ if (!spec->rate)
+ return 0;
- bucket = tbf->bucket + spec->op;
+ bucket = tbf->bucket + spec->op;
- curr = GF_CALLOC (1, sizeof (*curr), gf_common_mt_tbf_bucket_t);
- if (!curr)
- goto error_return;
+ curr = GF_CALLOC(1, sizeof(*curr), gf_common_mt_tbf_bucket_t);
+ if (!curr)
+ goto error_return;
- LOCK_INIT (&curr->lock);
- INIT_LIST_HEAD (&curr->queued);
+ LOCK_INIT(&curr->lock);
+ INIT_LIST_HEAD(&curr->queued);
- curr->tokens = 0;
- curr->tokenrate = spec->rate;
- curr->maxtokens = spec->maxlimit;
- curr->token_gen_interval = spec->token_gen_interval;
+ curr->tokens = 0;
+ curr->tokenrate = spec->rate;
+ curr->maxtokens = spec->maxlimit;
+ curr->token_gen_interval = spec->token_gen_interval;
- ret = gf_thread_create (&curr->tokener,
- NULL, tbf_tokengenerator, curr, "tbfclock");
- if (ret != 0)
- goto freemem;
+ ret = gf_thread_create(&curr->tokener, NULL, tbf_tokengenerator, curr,
+ "tbfclock");
+ if (ret != 0)
+ goto freemem;
- *bucket = curr;
- return 0;
+ *bucket = curr;
+ return 0;
- freemem:
- LOCK_DESTROY (&curr->lock);
- GF_FREE (curr);
- error_return:
- return -1;
+freemem:
+ LOCK_DESTROY(&curr->lock);
+ GF_FREE(curr);
+error_return:
+ return -1;
}
-#define TBF_ALLOC_SIZE \
- (sizeof (tbf_t) + (TBF_OP_MAX * sizeof (tbf_bucket_t)))
+#define TBF_ALLOC_SIZE (sizeof(tbf_t) + (TBF_OP_MAX * sizeof(tbf_bucket_t)))
tbf_t *
-tbf_init (tbf_opspec_t *tbfspec, unsigned int count)
+tbf_init(tbf_opspec_t *tbfspec, unsigned int count)
{
- int32_t i = 0;
- int32_t ret = 0;
- tbf_t *tbf = NULL;
- tbf_opspec_t *opspec = NULL;
-
- tbf = GF_CALLOC (1, TBF_ALLOC_SIZE, gf_common_mt_tbf_t);
- if (!tbf)
- goto error_return;
-
- tbf->bucket = (tbf_bucket_t **) ((char *)tbf + sizeof (*tbf));
- for (i = 0; i < TBF_OP_MAX; i++) {
- *(tbf->bucket + i) = NULL;
- }
+ int32_t i = 0;
+ int32_t ret = 0;
+ tbf_t *tbf = NULL;
+ tbf_opspec_t *opspec = NULL;
- for (i = 0; i < count; i++) {
- opspec = tbfspec + i;
+ tbf = GF_CALLOC(1, TBF_ALLOC_SIZE, gf_common_mt_tbf_t);
+ if (!tbf)
+ goto error_return;
- ret = tbf_init_bucket (tbf, opspec);
- if (ret)
- break;
- }
+ tbf->bucket = (tbf_bucket_t **)((char *)tbf + sizeof(*tbf));
+ for (i = 0; i < TBF_OP_MAX; i++) {
+ *(tbf->bucket + i) = NULL;
+ }
+ for (i = 0; i < count; i++) {
+ opspec = tbfspec + i;
+
+ ret = tbf_init_bucket(tbf, opspec);
if (ret)
- goto error_return;
+ break;
+ }
- return tbf;
+ if (ret)
+ goto error_return;
- error_return:
- return NULL;
+ return tbf;
+
+error_return:
+ return NULL;
}
static void
-tbf_mod_bucket (tbf_bucket_t *bucket, tbf_opspec_t *spec)
+tbf_mod_bucket(tbf_bucket_t *bucket, tbf_opspec_t *spec)
{
- LOCK (&bucket->lock);
- {
- bucket->tokens = 0;
- bucket->tokenrate = spec->rate;
- bucket->maxtokens = spec->maxlimit;
- }
- UNLOCK (&bucket->lock);
-
- /* next token tick would unqueue pending operations */
+ LOCK(&bucket->lock);
+ {
+ bucket->tokens = 0;
+ bucket->tokenrate = spec->rate;
+ bucket->maxtokens = spec->maxlimit;
+ }
+ UNLOCK(&bucket->lock);
+
+ /* next token tick would unqueue pending operations */
}
int
-tbf_mod (tbf_t *tbf, tbf_opspec_t *tbfspec)
+tbf_mod(tbf_t *tbf, tbf_opspec_t *tbfspec)
{
- int ret = 0;
- tbf_bucket_t *bucket = NULL;
- tbf_ops_t op = TBF_OP_MIN;
+ int ret = 0;
+ tbf_bucket_t *bucket = NULL;
+ tbf_ops_t op = TBF_OP_MIN;
- if (!tbf || !tbfspec)
- return -1;
+ if (!tbf || !tbfspec)
+ return -1;
- op = tbfspec->op;
+ op = tbfspec->op;
- GF_ASSERT (op >= TBF_OP_MIN);
- GF_ASSERT (op <= TBF_OP_MAX);
+ GF_ASSERT(op >= TBF_OP_MIN);
+ GF_ASSERT(op <= TBF_OP_MAX);
- bucket = *(tbf->bucket + op);
- if (bucket) {
- tbf_mod_bucket (bucket, tbfspec);
- } else {
- ret = tbf_init_bucket (tbf, tbfspec);
- }
+ bucket = *(tbf->bucket + op);
+ if (bucket) {
+ tbf_mod_bucket(bucket, tbfspec);
+ } else {
+ ret = tbf_init_bucket(tbf, tbfspec);
+ }
- return ret;
+ return ret;
}
void
-tbf_throttle (tbf_t *tbf, tbf_ops_t op, unsigned long tokens_requested)
+tbf_throttle(tbf_t *tbf, tbf_ops_t op, unsigned long tokens_requested)
{
- char waitq = 0;
- tbf_bucket_t *bucket = NULL;
- tbf_throttle_t *throttle = NULL;
-
- GF_ASSERT (op >= TBF_OP_MIN);
- GF_ASSERT (op <= TBF_OP_MAX);
-
- bucket = *(tbf->bucket + op);
- if (!bucket)
- return;
+ char waitq = 0;
+ tbf_bucket_t *bucket = NULL;
+ tbf_throttle_t *throttle = NULL;
+
+ GF_ASSERT(op >= TBF_OP_MIN);
+ GF_ASSERT(op <= TBF_OP_MAX);
+
+ bucket = *(tbf->bucket + op);
+ if (!bucket)
+ return;
+
+ LOCK(&bucket->lock);
+ {
+ /**
+ * if there are enough tokens in the bucket there is no need
+ * to throttle the request: therefore, consume the required
+ * number of tokens and continue.
+ */
+ if (tokens_requested <= bucket->tokens) {
+ bucket->tokens -= tokens_requested;
+ } else {
+ throttle = tbf_init_throttle(tokens_requested);
+ if (!throttle) /* let it slip through for now.. */
+ goto unblock;
- LOCK (&bucket->lock);
- {
- /**
- * if there are enough tokens in the bucket there is no need
- * to throttle the request: therefore, consume the required
- * number of tokens and continue.
- */
- if (tokens_requested <= bucket->tokens) {
- bucket->tokens -= tokens_requested;
- } else {
- throttle = tbf_init_throttle (tokens_requested);
- if (!throttle) /* let it slip through for now.. */
- goto unblock;
-
- waitq = 1;
- pthread_mutex_lock (&throttle->mutex);
- list_add_tail (&throttle->list, &bucket->queued);
- }
+ waitq = 1;
+ pthread_mutex_lock(&throttle->mutex);
+ list_add_tail(&throttle->list, &bucket->queued);
}
- unblock:
- UNLOCK (&bucket->lock);
+ }
+unblock:
+ UNLOCK(&bucket->lock);
- if (waitq) {
- while (!throttle->done) {
- pthread_cond_wait (&throttle->cond, &throttle->mutex);
- }
+ if (waitq) {
+ while (!throttle->done) {
+ pthread_cond_wait(&throttle->cond, &throttle->mutex);
+ }
- pthread_mutex_unlock (&throttle->mutex);
+ pthread_mutex_unlock(&throttle->mutex);
- pthread_mutex_destroy (&throttle->mutex);
- pthread_cond_destroy (&throttle->cond);
+ pthread_mutex_destroy(&throttle->mutex);
+ pthread_cond_destroy(&throttle->cond);
- GF_FREE (throttle);
- }
+ GF_FREE(throttle);
+ }
}