diff options
| author | Anand Avati <avati@redhat.com> | 2012-02-21 09:25:14 +0530 | 
|---|---|---|
| committer | Anand Avati <avati@redhat.com> | 2012-02-20 21:12:08 -0800 | 
| commit | 1206437fcfc1f3e1bd4a6faec3341c240bae5cf2 (patch) | |
| tree | aec03c585583007ee57d3053b62dfe40e06700ef /libglusterfs/src | |
| parent | dfc88bf3727fb33e2fc273bd7f24401e0209f39e (diff) | |
syncop: Multi-processor support in syncenv
This patch introduces:
- multithreading of syncop processors permitting synctasks to be executed
  concurrently if the runqueue has many tasks.
- Auto scaling of syncop processors based on runqueue length.
- Execute a synctask (synctask_new) in a blocking way if callback function
  is set NULL. The return value of the syncfn will be the return value
  of synctask_new()
Change-Id: Iff369709af9adfd07be3386842876a24e1a5a9b5
BUG: 763820
Reviewed-on: http://review.gluster.com/443
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Anand Avati <avati@redhat.com>
Diffstat (limited to 'libglusterfs/src')
| -rw-r--r-- | libglusterfs/src/syncop.c | 232 | ||||
| -rw-r--r-- | libglusterfs/src/syncop.h | 51 | 
2 files changed, 216 insertions, 67 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index 712e5b1f2..096c29efe 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -39,42 +39,76 @@ syncop_create_frame ()          return (call_frame_t *)frame;  } -void -synctask_yield (struct synctask *task) + +static void +__run (struct synctask *task)  { -        struct syncenv   *env = NULL; +        struct syncenv *env = NULL;          env = task->env; -        if (swapcontext (&task->ctx, &env->sched) < 0) { -                gf_log ("syncop", GF_LOG_ERROR, -                        "swapcontext failed (%s)", strerror (errno)); -        } -} - - -void -synctask_yawn (struct synctask *task) +        list_del_init (&task->all_tasks); +	switch (task->state) { +	case SYNCTASK_INIT: +		break; +	case SYNCTASK_RUN: +		gf_log (task->xl->name, GF_LOG_WARNING, +			"re-running already running task"); +		env->runcount--; +		break; +	case SYNCTASK_WAIT: +		env->waitcount--; +		break; +	case SYNCTASK_DONE: +		gf_log (task->xl->name, GF_LOG_WARNING, +			"running completed task"); +		break; +	} + +        list_add_tail (&task->all_tasks, &env->runq); +	env->runcount++; +	task->state = SYNCTASK_RUN; +} + + +static void +__wait (struct synctask *task)  { -        struct syncenv  *env = NULL; +        struct syncenv *env = NULL; -        env  = task->env; +        env = task->env; -        pthread_mutex_lock (&env->mutex); -        { -                list_del_init (&task->all_tasks); -                list_add (&task->all_tasks, &env->waitq); -        } -        pthread_mutex_unlock (&env->mutex); +        list_del_init (&task->all_tasks); +	switch (task->state) { +	case SYNCTASK_INIT: +		break; +	case SYNCTASK_RUN: +		env->runcount--; +		break; +	case SYNCTASK_WAIT: +		gf_log (task->xl->name, GF_LOG_WARNING, +			"re-waiting already waiting task"); +		env->waitcount--; +		break; +	case SYNCTASK_DONE: +		gf_log (task->xl->name, GF_LOG_WARNING, +			"running completed task"); +		break; +	} + +        list_add_tail (&task->all_tasks, &env->waitq); +	env->waitcount++; +	task->state = SYNCTASK_WAIT;  }  void -synctask_zzzz (struct synctask *task) +synctask_yield (struct synctask *task)  { -        synctask_yawn (task); - -        synctask_yield (task); +        if (swapcontext (&task->ctx, &task->proc->sched) < 0) { +                gf_log ("syncop", GF_LOG_ERROR, +                        "swapcontext failed (%s)", strerror (errno)); +        }  } @@ -87,8 +121,10 @@ synctask_wake (struct synctask *task)          pthread_mutex_lock (&env->mutex);          { -                list_del_init (&task->all_tasks); -                list_add_tail (&task->all_tasks, &env->runq); +                task->woken = 1; + +                if (task->slept) +                        __run (task);          }          pthread_mutex_unlock (&env->mutex); @@ -99,21 +135,17 @@ synctask_wake (struct synctask *task)  void  synctask_wrap (struct synctask *old_task)  { -        int              ret;          struct synctask *task = NULL;          /* Do not trust the pointer received. It may be             wrong and can lead to crashes. */          task = synctask_get (); -        ret = task->syncfn (task->opaque); -        task->synccbk (ret, task->frame, task->opaque); +        task->ret = task->syncfn (task->opaque); +	if (task->synccbk) +		task->synccbk (task->ret, task->frame, task->opaque); -        /* cannot destroy @task right here as we are -           in the execution stack of @task itself -        */ -        task->complete = 1; -        synctask_wake (task); +        task->state = SYNCTASK_DONE;          synctask_yield (task);  } @@ -127,20 +159,42 @@ synctask_destroy (struct synctask *task)          if (task->stack)                  FREE (task->stack); + +	pthread_mutex_destroy (&task->mutex); + +	pthread_cond_destroy (&task->cond); +          FREE (task);  } +void +synctask_done (struct synctask *task) +{ +	if (task->synccbk) { +		synctask_destroy (task); +		return; +	} + +	pthread_mutex_lock (&task->mutex); +	{ +		task->done = 1; +		pthread_cond_broadcast (&task->cond); +	} +	pthread_mutex_unlock (&task->mutex); +} + +  int  synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,                call_frame_t *frame, void *opaque)  {          struct synctask *newtask = NULL;          xlator_t        *this    = THIS; +	int              ret     = 0;          VALIDATE_OR_GOTO (env, err);          VALIDATE_OR_GOTO (fn, err); -        VALIDATE_OR_GOTO (cbk, err);          VALIDATE_OR_GOTO (frame, err);          newtask = CALLOC (1, sizeof (*newtask)); @@ -150,7 +204,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,          newtask->env        = env;          newtask->xl         = this;          newtask->syncfn     = fn; -        newtask->synccbk    = cbk; +	newtask->synccbk    = cbk;          newtask->opaque     = opaque;          newtask->frame      = frame; @@ -175,9 +229,33 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,          makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask); +	newtask->state = SYNCTASK_INIT; + +        newtask->slept = 1; + +	if (!cbk) { +		pthread_mutex_init (&newtask->mutex, NULL); +		pthread_cond_init (&newtask->cond, NULL); +		newtask->done = 0; +	} +          synctask_wake (newtask); -        return 0; +	if (!cbk) { +		pthread_mutex_lock (&newtask->mutex); +		{ +			while (!newtask->done) { +				pthread_cond_wait (&newtask->cond, &newtask->mutex); +			} +		} +		pthread_mutex_unlock (&newtask->mutex); + +		ret = newtask->ret; + +		synctask_destroy (newtask); +	} + +        return ret;  err:          if (newtask) {                  if (newtask->stack) @@ -189,10 +267,13 @@ err:  struct synctask * -syncenv_task (struct syncenv *env) +syncenv_task (struct syncproc *proc)  { +	struct syncenv   *env = NULL;          struct synctask  *task = NULL; +	env = proc->env; +          pthread_mutex_lock (&env->mutex);          {                  while (list_empty (&env->runq)) @@ -201,6 +282,9 @@ syncenv_task (struct syncenv *env)                  task = list_entry (env->runq.next, struct synctask, all_tasks);                  list_del_init (&task->all_tasks); +		env->runcount--; + +		task->proc = proc;          }          pthread_mutex_unlock (&env->mutex); @@ -218,10 +302,29 @@ synctask_switchto (struct synctask *task)          synctask_set (task);          THIS = task->xl; -        if (swapcontext (&env->sched, &task->ctx) < 0) { +        task->woken = 0; +        task->slept = 0; + +        if (swapcontext (&task->proc->sched, &task->ctx) < 0) {                  gf_log ("syncop", GF_LOG_ERROR,                          "swapcontext failed (%s)", strerror (errno));          } + +        if (task->state == SYNCTASK_DONE) { +                synctask_done (task); +                return; +        } + +        pthread_mutex_lock (&env->mutex); +        { +                if (task->woken) { +                        __run (task); +                } else { +                        task->slept = 1; +                        __wait (task); +                } +        } +        pthread_mutex_unlock (&env->mutex);  } @@ -229,19 +332,18 @@ void *  syncenv_processor (void *thdata)  {          struct syncenv  *env = NULL; +        struct syncproc *proc = NULL;          struct synctask *task = NULL; -        env = thdata; +        proc = thdata; +        env = proc->env;          for (;;) { -                task = syncenv_task (env); - -                if (task->complete) { -                        synctask_destroy (task); -                        continue; -                } +                task = syncenv_task (proc);                  synctask_switchto (task); + +		syncenv_scale (env);          }          return NULL; @@ -249,6 +351,33 @@ syncenv_processor (void *thdata)  void +syncenv_scale (struct syncenv *env) +{ +	int  thmax = 0; +	int  i = 0; +	int  ret = 0; + +	pthread_mutex_lock (&env->mutex); +	{ +		if (env->procs > env->runcount) +			goto unlock; + +		thmax = max (env->runcount, SYNCENV_PROC_MAX); +		for (i = env->procs; i < thmax; i++) { +			env->proc[i].env = env; +			ret = pthread_create (&env->proc[i].processor, NULL, +					      syncenv_processor, &env->proc[i]); +			if (ret) +				break; +			env->procs++; +		} +	} +unlock: +	pthread_mutex_unlock (&env->mutex); +} + + +void  syncenv_destroy (struct syncenv *env)  { @@ -260,6 +389,7 @@ syncenv_new (size_t stacksize)  {          struct syncenv *newenv = NULL;          int             ret = 0; +        int             i = 0;          newenv = CALLOC (1, sizeof (*newenv)); @@ -276,8 +406,14 @@ syncenv_new (size_t stacksize)          if (stacksize)                  newenv->stacksize = stacksize; -        ret = pthread_create (&newenv->processor, NULL, -                              syncenv_processor, newenv); +        for (i = 0; i < SYNCENV_PROC_MIN; i++) { +                newenv->proc[i].env = newenv; +                ret = pthread_create (&newenv->proc[i].processor, NULL, +                                      syncenv_processor, &newenv->proc[i]); +                if (ret) +                        break; +                newenv->procs++; +        }          if (ret != 0)                  syncenv_destroy (newenv); diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h index 7d8a2cb02..9554edb72 100644 --- a/libglusterfs/src/syncop.h +++ b/libglusterfs/src/syncop.h @@ -30,8 +30,11 @@  #include <pthread.h>  #include <ucontext.h> +#define SYNCENV_PROC_MAX 16 +#define SYNCENV_PROC_MIN 2  struct synctask; +struct syncproc;  struct syncenv; @@ -40,6 +43,13 @@ typedef int (*synctask_cbk_t) (int ret, call_frame_t *frame, void *opaque);  typedef int (*synctask_fn_t) (void *opaque); +typedef enum { +	SYNCTASK_INIT = 0, +	SYNCTASK_RUN, +	SYNCTASK_WAIT, +	SYNCTASK_DONE, +} synctask_state_t; +  /* for one sequential execution of @syncfn */  struct synctask {          struct list_head    all_tasks; @@ -48,25 +58,43 @@ struct synctask {          call_frame_t       *frame;          synctask_cbk_t      synccbk;          synctask_fn_t       syncfn; +	synctask_state_t    state;          void               *opaque;          void               *stack; +        int                 woken; +        int                 slept;          int                 complete; +	int                 ret;          ucontext_t          ctx; +	struct syncproc    *proc; + +	pthread_mutex_t     mutex; /* for synchronous spawning of synctask */ +	pthread_cond_t      cond; +	int                 done;  }; -/* hosts the scheduler thread and framework for executing synctasks */ -struct syncenv { + +struct syncproc {          pthread_t           processor; +        ucontext_t          sched; +        struct syncenv     *env;          struct synctask    *current; +}; + +/* hosts the scheduler thread and framework for executing synctasks */ +struct syncenv { +        struct syncproc     proc[SYNCENV_PROC_MAX]; +        int                 procs;          struct list_head    runq; +        int                 runcount;          struct list_head    waitq; +        int                 waitcount;          pthread_mutex_t     mutex;          pthread_cond_t      cond; -        ucontext_t          sched;          size_t              stacksize;  }; @@ -92,20 +120,6 @@ struct syncargs {  }; -#define __yawn(args) do {                                               \ -                struct synctask *task = NULL;                           \ -                                                                        \ -                task = synctask_get ();                                 \ -                if (task) {                                             \ -                        args->task = task;                              \ -                        synctask_yawn (task);                           \ -                } else {                                                \ -                        pthread_mutex_init (&args->mutex, NULL);        \ -                        pthread_cond_init (&args->cond, NULL);          \ -                }                                                       \ -        } while (0) - -  #define __yield(args) do {                                              \                  if (args->task) {                                       \                          synctask_yield (args->task);                    \ @@ -143,7 +157,6 @@ struct syncargs {                                                                          \                  frame = syncop_create_frame ();                         \                                                                          \ -                __yawn (stb);                                           \                  STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol, op, params); \                  __yield (stb);                                          \          } while (0) @@ -153,10 +166,10 @@ struct syncargs {  struct syncenv * syncenv_new ();  void syncenv_destroy (struct syncenv *); +void syncenv_scale (struct syncenv *env);  int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *);  void synctask_zzzz (struct synctask *task); -void synctask_yawn (struct synctask *task);  void synctask_wake (struct synctask *task);  void synctask_yield (struct synctask *task);  | 
