summaryrefslogtreecommitdiffstats
path: root/libglusterfs/src/syncop.c
diff options
context:
space:
mode:
Diffstat (limited to 'libglusterfs/src/syncop.c')
-rw-r--r--libglusterfs/src/syncop.c232
1 files changed, 184 insertions, 48 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index 712e5b1f239..096c29efe0a 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -39,42 +39,76 @@ syncop_create_frame ()
return (call_frame_t *)frame;
}
-void
-synctask_yield (struct synctask *task)
+
+static void
+__run (struct synctask *task)
{
- struct syncenv *env = NULL;
+ struct syncenv *env = NULL;
env = task->env;
- if (swapcontext (&task->ctx, &env->sched) < 0) {
- gf_log ("syncop", GF_LOG_ERROR,
- "swapcontext failed (%s)", strerror (errno));
- }
-}
-
-
-void
-synctask_yawn (struct synctask *task)
+ list_del_init (&task->all_tasks);
+ switch (task->state) {
+ case SYNCTASK_INIT:
+ break;
+ case SYNCTASK_RUN:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "re-running already running task");
+ env->runcount--;
+ break;
+ case SYNCTASK_WAIT:
+ env->waitcount--;
+ break;
+ case SYNCTASK_DONE:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "running completed task");
+ break;
+ }
+
+ list_add_tail (&task->all_tasks, &env->runq);
+ env->runcount++;
+ task->state = SYNCTASK_RUN;
+}
+
+
+static void
+__wait (struct synctask *task)
{
- struct syncenv *env = NULL;
+ struct syncenv *env = NULL;
- env = task->env;
+ env = task->env;
- pthread_mutex_lock (&env->mutex);
- {
- list_del_init (&task->all_tasks);
- list_add (&task->all_tasks, &env->waitq);
- }
- pthread_mutex_unlock (&env->mutex);
+ list_del_init (&task->all_tasks);
+ switch (task->state) {
+ case SYNCTASK_INIT:
+ break;
+ case SYNCTASK_RUN:
+ env->runcount--;
+ break;
+ case SYNCTASK_WAIT:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "re-waiting already waiting task");
+ env->waitcount--;
+ break;
+ case SYNCTASK_DONE:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "running completed task");
+ break;
+ }
+
+ list_add_tail (&task->all_tasks, &env->waitq);
+ env->waitcount++;
+ task->state = SYNCTASK_WAIT;
}
void
-synctask_zzzz (struct synctask *task)
+synctask_yield (struct synctask *task)
{
- synctask_yawn (task);
-
- synctask_yield (task);
+ if (swapcontext (&task->ctx, &task->proc->sched) < 0) {
+ gf_log ("syncop", GF_LOG_ERROR,
+ "swapcontext failed (%s)", strerror (errno));
+ }
}
@@ -87,8 +121,10 @@ synctask_wake (struct synctask *task)
pthread_mutex_lock (&env->mutex);
{
- list_del_init (&task->all_tasks);
- list_add_tail (&task->all_tasks, &env->runq);
+ task->woken = 1;
+
+ if (task->slept)
+ __run (task);
}
pthread_mutex_unlock (&env->mutex);
@@ -99,21 +135,17 @@ synctask_wake (struct synctask *task)
void
synctask_wrap (struct synctask *old_task)
{
- int ret;
struct synctask *task = NULL;
/* Do not trust the pointer received. It may be
wrong and can lead to crashes. */
task = synctask_get ();
- ret = task->syncfn (task->opaque);
- task->synccbk (ret, task->frame, task->opaque);
+ task->ret = task->syncfn (task->opaque);
+ if (task->synccbk)
+ task->synccbk (task->ret, task->frame, task->opaque);
- /* cannot destroy @task right here as we are
- in the execution stack of @task itself
- */
- task->complete = 1;
- synctask_wake (task);
+ task->state = SYNCTASK_DONE;
synctask_yield (task);
}
@@ -127,20 +159,42 @@ synctask_destroy (struct synctask *task)
if (task->stack)
FREE (task->stack);
+
+ pthread_mutex_destroy (&task->mutex);
+
+ pthread_cond_destroy (&task->cond);
+
FREE (task);
}
+void
+synctask_done (struct synctask *task)
+{
+ if (task->synccbk) {
+ synctask_destroy (task);
+ return;
+ }
+
+ pthread_mutex_lock (&task->mutex);
+ {
+ task->done = 1;
+ pthread_cond_broadcast (&task->cond);
+ }
+ pthread_mutex_unlock (&task->mutex);
+}
+
+
int
synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
call_frame_t *frame, void *opaque)
{
struct synctask *newtask = NULL;
xlator_t *this = THIS;
+ int ret = 0;
VALIDATE_OR_GOTO (env, err);
VALIDATE_OR_GOTO (fn, err);
- VALIDATE_OR_GOTO (cbk, err);
VALIDATE_OR_GOTO (frame, err);
newtask = CALLOC (1, sizeof (*newtask));
@@ -150,7 +204,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
newtask->env = env;
newtask->xl = this;
newtask->syncfn = fn;
- newtask->synccbk = cbk;
+ newtask->synccbk = cbk;
newtask->opaque = opaque;
newtask->frame = frame;
@@ -175,9 +229,33 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask);
+ newtask->state = SYNCTASK_INIT;
+
+ newtask->slept = 1;
+
+ if (!cbk) {
+ pthread_mutex_init (&newtask->mutex, NULL);
+ pthread_cond_init (&newtask->cond, NULL);
+ newtask->done = 0;
+ }
+
synctask_wake (newtask);
- return 0;
+ if (!cbk) {
+ pthread_mutex_lock (&newtask->mutex);
+ {
+ while (!newtask->done) {
+ pthread_cond_wait (&newtask->cond, &newtask->mutex);
+ }
+ }
+ pthread_mutex_unlock (&newtask->mutex);
+
+ ret = newtask->ret;
+
+ synctask_destroy (newtask);
+ }
+
+ return ret;
err:
if (newtask) {
if (newtask->stack)
@@ -189,10 +267,13 @@ err:
struct synctask *
-syncenv_task (struct syncenv *env)
+syncenv_task (struct syncproc *proc)
{
+ struct syncenv *env = NULL;
struct synctask *task = NULL;
+ env = proc->env;
+
pthread_mutex_lock (&env->mutex);
{
while (list_empty (&env->runq))
@@ -201,6 +282,9 @@ syncenv_task (struct syncenv *env)
task = list_entry (env->runq.next, struct synctask, all_tasks);
list_del_init (&task->all_tasks);
+ env->runcount--;
+
+ task->proc = proc;
}
pthread_mutex_unlock (&env->mutex);
@@ -218,10 +302,29 @@ synctask_switchto (struct synctask *task)
synctask_set (task);
THIS = task->xl;
- if (swapcontext (&env->sched, &task->ctx) < 0) {
+ task->woken = 0;
+ task->slept = 0;
+
+ if (swapcontext (&task->proc->sched, &task->ctx) < 0) {
gf_log ("syncop", GF_LOG_ERROR,
"swapcontext failed (%s)", strerror (errno));
}
+
+ if (task->state == SYNCTASK_DONE) {
+ synctask_done (task);
+ return;
+ }
+
+ pthread_mutex_lock (&env->mutex);
+ {
+ if (task->woken) {
+ __run (task);
+ } else {
+ task->slept = 1;
+ __wait (task);
+ }
+ }
+ pthread_mutex_unlock (&env->mutex);
}
@@ -229,19 +332,18 @@ void *
syncenv_processor (void *thdata)
{
struct syncenv *env = NULL;
+ struct syncproc *proc = NULL;
struct synctask *task = NULL;
- env = thdata;
+ proc = thdata;
+ env = proc->env;
for (;;) {
- task = syncenv_task (env);
-
- if (task->complete) {
- synctask_destroy (task);
- continue;
- }
+ task = syncenv_task (proc);
synctask_switchto (task);
+
+ syncenv_scale (env);
}
return NULL;
@@ -249,6 +351,33 @@ syncenv_processor (void *thdata)
void
+syncenv_scale (struct syncenv *env)
+{
+ int thmax = 0;
+ int i = 0;
+ int ret = 0;
+
+ pthread_mutex_lock (&env->mutex);
+ {
+ if (env->procs > env->runcount)
+ goto unlock;
+
+ thmax = max (env->runcount, SYNCENV_PROC_MAX);
+ for (i = env->procs; i < thmax; i++) {
+ env->proc[i].env = env;
+ ret = pthread_create (&env->proc[i].processor, NULL,
+ syncenv_processor, &env->proc[i]);
+ if (ret)
+ break;
+ env->procs++;
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&env->mutex);
+}
+
+
+void
syncenv_destroy (struct syncenv *env)
{
@@ -260,6 +389,7 @@ syncenv_new (size_t stacksize)
{
struct syncenv *newenv = NULL;
int ret = 0;
+ int i = 0;
newenv = CALLOC (1, sizeof (*newenv));
@@ -276,8 +406,14 @@ syncenv_new (size_t stacksize)
if (stacksize)
newenv->stacksize = stacksize;
- ret = pthread_create (&newenv->processor, NULL,
- syncenv_processor, newenv);
+ for (i = 0; i < SYNCENV_PROC_MIN; i++) {
+ newenv->proc[i].env = newenv;
+ ret = pthread_create (&newenv->proc[i].processor, NULL,
+ syncenv_processor, &newenv->proc[i]);
+ if (ret)
+ break;
+ newenv->procs++;
+ }
if (ret != 0)
syncenv_destroy (newenv);