diff options
Diffstat (limited to 'libglusterfs/src')
| -rw-r--r-- | libglusterfs/src/syncop.c | 43 | ||||
| -rw-r--r-- | libglusterfs/src/syncop.h | 155 | 
2 files changed, 122 insertions, 76 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index b15ee31ba07..a8faa65e08a 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -80,15 +80,24 @@ __wait (struct synctask *task)  void -synctask_yield (struct synctask *task) +synctask_waitfor (struct synctask *task, int waitfor)  { -	xlator_t *oldTHIS = THIS; +	struct syncenv *env = NULL; +        xlator_t *oldTHIS = THIS; + +	env = task->env;  #if defined(__NetBSD__) && defined(_UC_TLSBASE)  	/* Preserve pthread private pointer through swapcontex() */  	task->proc->sched.uc_flags &= ~_UC_TLSBASE;  #endif +	pthread_mutex_lock (&env->mutex); +	{ +		task->waitfor = waitfor; +	} +	pthread_mutex_unlock (&env->mutex); +          if (swapcontext (&task->ctx, &task->proc->sched) < 0) {                  gf_log ("syncop", GF_LOG_ERROR,                          "swapcontext failed (%s)", strerror (errno)); @@ -99,6 +108,29 @@ synctask_yield (struct synctask *task)  void +synctask_yield (struct synctask *task) +{ +	synctask_waitfor (task, 1); +} + + +void +synctask_yawn (struct synctask *task) +{ +	struct syncenv *env = NULL; + +	env = task->env; + +	pthread_mutex_lock (&env->mutex); +	{ +		task->woken = 0; +		task->waitfor = 0; +	} +	pthread_mutex_unlock (&env->mutex); +} + + +void  synctask_wake (struct synctask *task)  {          struct syncenv *env = NULL; @@ -107,9 +139,9 @@ synctask_wake (struct synctask *task)          pthread_mutex_lock (&env->mutex);          { -                task->woken = 1; +                task->woken++; -                if (task->slept) +                if (task->slept && task->woken >= task->waitfor)                          __run (task);          }          pthread_mutex_unlock (&env->mutex); @@ -338,6 +370,7 @@ synctask_switchto (struct synctask *task)          task->woken = 0;          task->slept = 0; +	task->waitfor = 0;  #if defined(__NetBSD__) && defined(_UC_TLSBASE)  	/* Preserve pthread private pointer through swapcontex() */ @@ -356,7 +389,7 @@ synctask_switchto (struct synctask *task)          pthread_mutex_lock (&env->mutex);          { -                if (task->woken) { +                if (task->woken >= task->waitfor) {                          __run (task);                  } else {                          task->slept = 1; diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h index 37e2b0e288a..d4086291a47 100644 --- a/libglusterfs/src/syncop.h +++ b/libglusterfs/src/syncop.h @@ -36,11 +36,11 @@ typedef int (*synctask_fn_t) (void *opaque);  typedef enum { -	SYNCTASK_INIT = 0, -	SYNCTASK_RUN, +        SYNCTASK_INIT = 0, +        SYNCTASK_RUN,          SYNCTASK_SUSPEND, -	SYNCTASK_WAIT, -	SYNCTASK_DONE, +        SYNCTASK_WAIT, +        SYNCTASK_DONE,  } synctask_state_t;  /* for one sequential execution of @syncfn */ @@ -52,22 +52,23 @@ struct synctask {          call_frame_t       *opframe;          synctask_cbk_t      synccbk;          synctask_fn_t       syncfn; -	synctask_state_t    state; +        synctask_state_t    state;          void               *opaque;          void               *stack;          int                 woken;          int                 slept; -	int                 ret; +        int                 waitfor; +        int                 ret; -	uid_t               uid; -	gid_t               gid; +        uid_t               uid; +        gid_t               gid;          ucontext_t          ctx; -	struct syncproc    *proc; +        struct syncproc    *proc; -	pthread_mutex_t     mutex; /* for synchronous spawning of synctask */ -	pthread_cond_t      cond; -	int                 done; +        pthread_mutex_t     mutex; /* for synchronous spawning of synctask */ +        pthread_cond_t      cond; +        int                 done;  }; @@ -116,79 +117,85 @@ struct syncargs {          /* do not touch */          struct synctask    *task; -	pthread_mutex_t     mutex; -	pthread_cond_t      cond; -	int                 done; +        pthread_mutex_t     mutex; +        pthread_cond_t      cond; +        int                 wakecnt;  }; -#define __yawn(args) do {					\ -	if (!args->task) {					\ -		pthread_mutex_init (&args->mutex, NULL);	\ -		pthread_cond_init (&args->cond, NULL);		\ -		args->done = 0;					\ -	}							\ -	} while (0) - - -#define __wake(args) do {					\ -	if (args->task) {					\ -		synctask_wake (args->task);			\ -	} else {						\ -		pthread_mutex_lock (&args->mutex);		\ -		{						\ -			args->done = 1;				\ -			pthread_cond_signal (&args->cond);	\ -		}						\ -		pthread_mutex_unlock (&args->mutex);		\ -	}							\ -	} while (0) - - -#define __yield(args) do {						\ -	if (args->task) {						\ -		synctask_yield (args->task);				\ -	} else {							\ -		pthread_mutex_lock (&args->mutex);			\ -		{							\ -			while (!args->done)				\ -				pthread_cond_wait (&args->cond,		\ -						   &args->mutex);	\ -		}							\ -		pthread_mutex_unlock (&args->mutex);			\ -		pthread_mutex_destroy (&args->mutex);			\ -		pthread_cond_destroy (&args->cond);			\ -	}								\ -	} while (0) +#define __yawn(args) do {                                       \ +        args->task = synctask_get ();                           \ +        if (args->task) {                                       \ +                synctask_yawn (args->task);                     \ +        } else {                                                \ +                pthread_mutex_init (&args->mutex, NULL);        \ +                pthread_cond_init (&args->cond, NULL);          \ +                args->wakecnt = 0;                              \ +        }                                                       \ +        } while (0) + + +#define __wake(args) do {                                       \ +        if (args->task) {                                       \ +                synctask_wake (args->task);                     \ +        } else {                                                \ +                pthread_mutex_lock (&args->mutex);              \ +                {                                               \ +                        args->wakecnt++;                        \ +                        pthread_cond_signal (&args->cond);      \ +                }                                               \ +                pthread_mutex_unlock (&args->mutex);            \ +        }                                                       \ +        } while (0) + + +#define __waitfor(args, cnt) do {                                       \ +        if (args->task) {                                               \ +                synctask_waitfor (args->task, cnt);                     \ +        } else {                                                        \ +                pthread_mutex_lock (&args->mutex);                      \ +                {                                                       \ +                        while (args->wakecnt < cnt)                     \ +                                pthread_cond_wait (&args->cond,         \ +                                                   &args->mutex);       \ +                }                                                       \ +                pthread_mutex_unlock (&args->mutex);                    \ +                pthread_mutex_destroy (&args->mutex);                   \ +                pthread_cond_destroy (&args->cond);                     \ +        }                                                               \ +        } while (0) + + +#define __yield(args) __waitfor(args, 1)  #define SYNCOP(subvol, stb, cbk, op, params ...) do {                   \                  struct  synctask        *task = NULL;                   \ -		call_frame_t            *frame = NULL;			\ +                call_frame_t            *frame = NULL;                  \                                                                          \                  task = synctask_get ();                                 \                  stb->task = task;                                       \ -		if (task)						\ -			frame = task->opframe;				\ -		else							\ -			frame = create_frame (THIS, THIS->ctx->pool);	\ +                if (task)                                               \ +                        frame = task->opframe;                          \ +                else                                                    \ +                        frame = create_frame (THIS, THIS->ctx->pool);   \                  if (task) {                                             \                          frame->root->uid = task->uid;                   \                          frame->root->gid = task->gid;                   \                  }                                                       \ -									\ -		__yawn (stb);						\                                                                          \ -                STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol,	\ -				   op, params);				\ -		if (task)						\ -			task->state = SYNCTASK_SUSPEND;			\ -									\ -                __yield (stb);						\ -		if (task)						\ -			STACK_RESET (frame->root);			\ -		else							\ -			STACK_DESTROY (frame->root);			\ +                __yawn (stb);                                           \ +                                                                        \ +                STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol,     \ +                                   op, params);                         \ +                if (task)                                               \ +                        task->state = SYNCTASK_SUSPEND;                 \ +                                                                        \ +                __yield (stb);                                          \ +                if (task)                                               \ +                        STACK_RESET (frame->root);                      \ +                else                                                    \ +                        STACK_DESTROY (frame->root);                    \          } while (0) @@ -201,6 +208,12 @@ void syncenv_scale (struct syncenv *env);  int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *);  void synctask_wake (struct synctask *task);  void synctask_yield (struct synctask *task); +void synctask_yawn (struct synctask *task); +void synctask_waitfor (struct synctask *task, int count); + +#define synctask_barrier_init(args) __yawn (args) +#define synctask_barrier_wait(args, n) __waitfor (args, n) +#define synctask_barrier_wake(args) __wake (args)  int synctask_setid (struct synctask *task, uid_t uid, gid_t gid);  #define SYNCTASK_SETID(uid, gid) synctask_setid (synctask_get(), uid, gid); @@ -264,7 +277,7 @@ int syncop_fstat (xlator_t *subvol, fd_t *fd, struct iatt *stbuf);  int syncop_stat (xlator_t *subvol, loc_t *loc, struct iatt *stbuf);  int syncop_symlink (xlator_t *subvol, loc_t *loc, const char *newpath, -		    dict_t *dict); +                    dict_t *dict);  int syncop_readlink (xlator_t *subvol, loc_t *loc, char **buffer, size_t size);  int syncop_mknod (xlator_t *subvol, loc_t *loc, mode_t mode, dev_t rdev,                    dict_t *dict);  | 
