diff options
| author | Anand Avati <avati@redhat.com> | 2012-02-21 09:25:14 +0530 | 
|---|---|---|
| committer | Anand Avati <avati@redhat.com> | 2012-02-20 21:12:08 -0800 | 
| commit | 1206437fcfc1f3e1bd4a6faec3341c240bae5cf2 (patch) | |
| tree | aec03c585583007ee57d3053b62dfe40e06700ef /libglusterfs/src/syncop.c | |
| parent | dfc88bf3727fb33e2fc273bd7f24401e0209f39e (diff) | |
syncop: Multi-processor support in syncenv
This patch introduces:
- multithreading of syncop processors permitting synctasks to be executed
  concurrently if the runqueue has many tasks.
- Auto scaling of syncop processors based on runqueue length.
- Execute a synctask (synctask_new) in a blocking way if callback function
  is set NULL. The return value of the syncfn will be the return value
  of synctask_new()
Change-Id: Iff369709af9adfd07be3386842876a24e1a5a9b5
BUG: 763820
Reviewed-on: http://review.gluster.com/443
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Anand Avati <avati@redhat.com>
Diffstat (limited to 'libglusterfs/src/syncop.c')
| -rw-r--r-- | libglusterfs/src/syncop.c | 232 | 
1 files changed, 184 insertions, 48 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index 712e5b1f239..096c29efe0a 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -39,42 +39,76 @@ syncop_create_frame ()          return (call_frame_t *)frame;  } -void -synctask_yield (struct synctask *task) + +static void +__run (struct synctask *task)  { -        struct syncenv   *env = NULL; +        struct syncenv *env = NULL;          env = task->env; -        if (swapcontext (&task->ctx, &env->sched) < 0) { -                gf_log ("syncop", GF_LOG_ERROR, -                        "swapcontext failed (%s)", strerror (errno)); -        } -} - - -void -synctask_yawn (struct synctask *task) +        list_del_init (&task->all_tasks); +	switch (task->state) { +	case SYNCTASK_INIT: +		break; +	case SYNCTASK_RUN: +		gf_log (task->xl->name, GF_LOG_WARNING, +			"re-running already running task"); +		env->runcount--; +		break; +	case SYNCTASK_WAIT: +		env->waitcount--; +		break; +	case SYNCTASK_DONE: +		gf_log (task->xl->name, GF_LOG_WARNING, +			"running completed task"); +		break; +	} + +        list_add_tail (&task->all_tasks, &env->runq); +	env->runcount++; +	task->state = SYNCTASK_RUN; +} + + +static void +__wait (struct synctask *task)  { -        struct syncenv  *env = NULL; +        struct syncenv *env = NULL; -        env  = task->env; +        env = task->env; -        pthread_mutex_lock (&env->mutex); -        { -                list_del_init (&task->all_tasks); -                list_add (&task->all_tasks, &env->waitq); -        } -        pthread_mutex_unlock (&env->mutex); +        list_del_init (&task->all_tasks); +	switch (task->state) { +	case SYNCTASK_INIT: +		break; +	case SYNCTASK_RUN: +		env->runcount--; +		break; +	case SYNCTASK_WAIT: +		gf_log (task->xl->name, GF_LOG_WARNING, +			"re-waiting already waiting task"); +		env->waitcount--; +		break; +	case SYNCTASK_DONE: +		gf_log (task->xl->name, GF_LOG_WARNING, +			"running completed task"); +		break; +	} + +        list_add_tail (&task->all_tasks, &env->waitq); +	env->waitcount++; +	task->state = SYNCTASK_WAIT;  }  void -synctask_zzzz (struct synctask *task) +synctask_yield (struct synctask *task)  { -        synctask_yawn (task); - -        synctask_yield (task); +        if (swapcontext (&task->ctx, &task->proc->sched) < 0) { +                gf_log ("syncop", GF_LOG_ERROR, +                        "swapcontext failed (%s)", strerror (errno)); +        }  } @@ -87,8 +121,10 @@ synctask_wake (struct synctask *task)          pthread_mutex_lock (&env->mutex);          { -                list_del_init (&task->all_tasks); -                list_add_tail (&task->all_tasks, &env->runq); +                task->woken = 1; + +                if (task->slept) +                        __run (task);          }          pthread_mutex_unlock (&env->mutex); @@ -99,21 +135,17 @@ synctask_wake (struct synctask *task)  void  synctask_wrap (struct synctask *old_task)  { -        int              ret;          struct synctask *task = NULL;          /* Do not trust the pointer received. It may be             wrong and can lead to crashes. */          task = synctask_get (); -        ret = task->syncfn (task->opaque); -        task->synccbk (ret, task->frame, task->opaque); +        task->ret = task->syncfn (task->opaque); +	if (task->synccbk) +		task->synccbk (task->ret, task->frame, task->opaque); -        /* cannot destroy @task right here as we are -           in the execution stack of @task itself -        */ -        task->complete = 1; -        synctask_wake (task); +        task->state = SYNCTASK_DONE;          synctask_yield (task);  } @@ -127,20 +159,42 @@ synctask_destroy (struct synctask *task)          if (task->stack)                  FREE (task->stack); + +	pthread_mutex_destroy (&task->mutex); + +	pthread_cond_destroy (&task->cond); +          FREE (task);  } +void +synctask_done (struct synctask *task) +{ +	if (task->synccbk) { +		synctask_destroy (task); +		return; +	} + +	pthread_mutex_lock (&task->mutex); +	{ +		task->done = 1; +		pthread_cond_broadcast (&task->cond); +	} +	pthread_mutex_unlock (&task->mutex); +} + +  int  synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,                call_frame_t *frame, void *opaque)  {          struct synctask *newtask = NULL;          xlator_t        *this    = THIS; +	int              ret     = 0;          VALIDATE_OR_GOTO (env, err);          VALIDATE_OR_GOTO (fn, err); -        VALIDATE_OR_GOTO (cbk, err);          VALIDATE_OR_GOTO (frame, err);          newtask = CALLOC (1, sizeof (*newtask)); @@ -150,7 +204,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,          newtask->env        = env;          newtask->xl         = this;          newtask->syncfn     = fn; -        newtask->synccbk    = cbk; +	newtask->synccbk    = cbk;          newtask->opaque     = opaque;          newtask->frame      = frame; @@ -175,9 +229,33 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,          makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask); +	newtask->state = SYNCTASK_INIT; + +        newtask->slept = 1; + +	if (!cbk) { +		pthread_mutex_init (&newtask->mutex, NULL); +		pthread_cond_init (&newtask->cond, NULL); +		newtask->done = 0; +	} +          synctask_wake (newtask); -        return 0; +	if (!cbk) { +		pthread_mutex_lock (&newtask->mutex); +		{ +			while (!newtask->done) { +				pthread_cond_wait (&newtask->cond, &newtask->mutex); +			} +		} +		pthread_mutex_unlock (&newtask->mutex); + +		ret = newtask->ret; + +		synctask_destroy (newtask); +	} + +        return ret;  err:          if (newtask) {                  if (newtask->stack) @@ -189,10 +267,13 @@ err:  struct synctask * -syncenv_task (struct syncenv *env) +syncenv_task (struct syncproc *proc)  { +	struct syncenv   *env = NULL;          struct synctask  *task = NULL; +	env = proc->env; +          pthread_mutex_lock (&env->mutex);          {                  while (list_empty (&env->runq)) @@ -201,6 +282,9 @@ syncenv_task (struct syncenv *env)                  task = list_entry (env->runq.next, struct synctask, all_tasks);                  list_del_init (&task->all_tasks); +		env->runcount--; + +		task->proc = proc;          }          pthread_mutex_unlock (&env->mutex); @@ -218,10 +302,29 @@ synctask_switchto (struct synctask *task)          synctask_set (task);          THIS = task->xl; -        if (swapcontext (&env->sched, &task->ctx) < 0) { +        task->woken = 0; +        task->slept = 0; + +        if (swapcontext (&task->proc->sched, &task->ctx) < 0) {                  gf_log ("syncop", GF_LOG_ERROR,                          "swapcontext failed (%s)", strerror (errno));          } + +        if (task->state == SYNCTASK_DONE) { +                synctask_done (task); +                return; +        } + +        pthread_mutex_lock (&env->mutex); +        { +                if (task->woken) { +                        __run (task); +                } else { +                        task->slept = 1; +                        __wait (task); +                } +        } +        pthread_mutex_unlock (&env->mutex);  } @@ -229,19 +332,18 @@ void *  syncenv_processor (void *thdata)  {          struct syncenv  *env = NULL; +        struct syncproc *proc = NULL;          struct synctask *task = NULL; -        env = thdata; +        proc = thdata; +        env = proc->env;          for (;;) { -                task = syncenv_task (env); - -                if (task->complete) { -                        synctask_destroy (task); -                        continue; -                } +                task = syncenv_task (proc);                  synctask_switchto (task); + +		syncenv_scale (env);          }          return NULL; @@ -249,6 +351,33 @@ syncenv_processor (void *thdata)  void +syncenv_scale (struct syncenv *env) +{ +	int  thmax = 0; +	int  i = 0; +	int  ret = 0; + +	pthread_mutex_lock (&env->mutex); +	{ +		if (env->procs > env->runcount) +			goto unlock; + +		thmax = max (env->runcount, SYNCENV_PROC_MAX); +		for (i = env->procs; i < thmax; i++) { +			env->proc[i].env = env; +			ret = pthread_create (&env->proc[i].processor, NULL, +					      syncenv_processor, &env->proc[i]); +			if (ret) +				break; +			env->procs++; +		} +	} +unlock: +	pthread_mutex_unlock (&env->mutex); +} + + +void  syncenv_destroy (struct syncenv *env)  { @@ -260,6 +389,7 @@ syncenv_new (size_t stacksize)  {          struct syncenv *newenv = NULL;          int             ret = 0; +        int             i = 0;          newenv = CALLOC (1, sizeof (*newenv)); @@ -276,8 +406,14 @@ syncenv_new (size_t stacksize)          if (stacksize)                  newenv->stacksize = stacksize; -        ret = pthread_create (&newenv->processor, NULL, -                              syncenv_processor, newenv); +        for (i = 0; i < SYNCENV_PROC_MIN; i++) { +                newenv->proc[i].env = newenv; +                ret = pthread_create (&newenv->proc[i].processor, NULL, +                                      syncenv_processor, &newenv->proc[i]); +                if (ret) +                        break; +                newenv->procs++; +        }          if (ret != 0)                  syncenv_destroy (newenv);  | 
