diff options
Diffstat (limited to 'libglusterfs/src/syncop.h')
| -rw-r--r-- | libglusterfs/src/syncop.h | 251 |
1 files changed, 229 insertions, 22 deletions
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h index 726e8c49a..f790981f0 100644 --- a/libglusterfs/src/syncop.h +++ b/libglusterfs/src/syncop.h @@ -23,6 +23,14 @@ #define SYNCENV_PROC_MAX 16 #define SYNCENV_PROC_MIN 2 +#define SYNCPROC_IDLE_TIME 600 + +/* + * Flags for syncopctx valid elements + */ +#define SYNCOPCTX_UID 0x00000001 +#define SYNCOPCTX_GID 0x00000002 +#define SYNCOPCTX_GROUPS 0x00000004 struct synctask; struct syncproc; @@ -35,11 +43,12 @@ typedef int (*synctask_fn_t) (void *opaque); typedef enum { - SYNCTASK_INIT = 0, - SYNCTASK_RUN, + SYNCTASK_INIT = 0, + SYNCTASK_RUN, SYNCTASK_SUSPEND, - SYNCTASK_WAIT, - SYNCTASK_DONE, + SYNCTASK_WAIT, + SYNCTASK_DONE, + SYNCTASK_ZOMBIE, } synctask_state_t; /* for one sequential execution of @syncfn */ @@ -51,19 +60,24 @@ struct synctask { call_frame_t *opframe; synctask_cbk_t synccbk; synctask_fn_t syncfn; - synctask_state_t state; + synctask_state_t state; void *opaque; void *stack; int woken; int slept; - int ret; + int ret; + + uid_t uid; + gid_t gid; ucontext_t ctx; - struct syncproc *proc; + struct syncproc *proc; - pthread_mutex_t mutex; /* for synchronous spawning of synctask */ - pthread_cond_t cond; - int done; + pthread_mutex_t mutex; /* for synchronous spawning of synctask */ + pthread_cond_t cond; + int done; + + struct list_head waitq; /* can wait only "once" at a time */ }; @@ -84,6 +98,9 @@ struct syncenv { struct list_head waitq; int waitcount; + int procmin; + int procmax; + pthread_mutex_t mutex; pthread_cond_t cond; @@ -91,6 +108,25 @@ struct syncenv { }; +struct synclock { + pthread_mutex_t guard; /* guard the remaining members, pair @cond */ + pthread_cond_t cond; /* waiting non-synctasks */ + struct list_head waitq; /* waiting synctasks */ + gf_boolean_t lock; /* _gf_true or _gf_false, lock status */ + struct synctask *owner; /* NULL if current owner is not a synctask */ +}; +typedef struct synclock synclock_t; + + +struct syncbarrier { + pthread_mutex_t guard; /* guard the remaining members, pair @cond */ + pthread_cond_t cond; /* waiting non-synctasks */ + struct list_head waitq; /* waiting synctasks */ + int count; /* count the number of wakes */ +}; +typedef struct syncbarrier syncbarrier_t; + + struct syncargs { int op_ret; int op_errno; @@ -104,42 +140,196 @@ struct syncargs { struct iobref *iobref; char *buffer; dict_t *xdata; + struct gf_flock flock; /* some more _cbk needs */ uuid_t uuid; char *errstr; dict_t *dict; + pthread_mutex_t lock_dict; + + syncbarrier_t barrier; /* do not touch */ struct synctask *task; + pthread_mutex_t mutex; + pthread_cond_t cond; + int done; +}; + +struct syncopctx { + unsigned int valid; /* valid flags for elements that are set */ + uid_t uid; + gid_t gid; + int grpsize; + int ngrps; + gid_t *groups; }; -#define __wake(args) synctask_wake(args->task) +#define __yawn(args) do { \ + args->task = synctask_get (); \ + if (args->task) \ + break; \ + pthread_mutex_init (&args->mutex, NULL); \ + pthread_cond_init (&args->cond, NULL); \ + args->done = 0; \ + } while (0) + + +#define __wake(args) do { \ + if (args->task) { \ + synctask_wake (args->task); \ + } else { \ + pthread_mutex_lock (&args->mutex); \ + { \ + args->done = 1; \ + pthread_cond_signal (&args->cond); \ + } \ + pthread_mutex_unlock (&args->mutex); \ + } \ + } while (0) + + +#define __yield(args) do { \ + if (args->task) { \ + synctask_yield (args->task); \ + } else { \ + pthread_mutex_lock (&args->mutex); \ + { \ + while (!args->done) \ + pthread_cond_wait (&args->cond, \ + &args->mutex); \ + } \ + pthread_mutex_unlock (&args->mutex); \ + pthread_mutex_destroy (&args->mutex); \ + pthread_cond_destroy (&args->cond); \ + } \ + } while (0) #define SYNCOP(subvol, stb, cbk, op, params ...) do { \ struct synctask *task = NULL; \ + call_frame_t *frame = NULL; \ \ task = synctask_get (); \ stb->task = task; \ + if (task) \ + frame = task->opframe; \ + else \ + frame = syncop_create_frame (THIS); \ + \ + if (task) { \ + frame->root->uid = task->uid; \ + frame->root->gid = task->gid; \ + } \ + \ + __yawn (stb); \ + \ + STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol, \ + op, params); \ \ - STACK_WIND_COOKIE (task->opframe, cbk, (void *)stb, \ - subvol, op, params); \ - task->state = SYNCTASK_SUSPEND; \ - synctask_yield (stb->task); \ - STACK_RESET (task->opframe->root); \ + __yield (stb); \ + if (task) \ + STACK_RESET (frame->root); \ + else \ + STACK_DESTROY (frame->root); \ } while (0) #define SYNCENV_DEFAULT_STACKSIZE (2 * 1024 * 1024) -struct syncenv * syncenv_new (); +struct syncenv * syncenv_new (size_t stacksize, int procmin, int procmax); void syncenv_destroy (struct syncenv *); void syncenv_scale (struct syncenv *env); int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *); +struct synctask *synctask_create (struct syncenv *, synctask_fn_t, + synctask_cbk_t, call_frame_t *, void *); +int synctask_join (struct synctask *task); void synctask_wake (struct synctask *task); void synctask_yield (struct synctask *task); +void synctask_waitfor (struct synctask *task, int count); + +#define synctask_barrier_init(args) syncbarrier_init (&args->barrier) +#define synctask_barrier_wait(args, n) syncbarrier_wait (&args->barrier, n) +#define synctask_barrier_wake(args) syncbarrier_wake (&args->barrier) + +int synctask_setid (struct synctask *task, uid_t uid, gid_t gid); +#define SYNCTASK_SETID(uid, gid) synctask_setid (synctask_get(), uid, gid); + +int syncopctx_setfsuid (void *uid); +int syncopctx_setfsgid (void *gid); +int syncopctx_setfsgroups (int count, const void *groups); + +static inline call_frame_t * +syncop_create_frame (xlator_t *this) +{ + call_frame_t *frame = NULL; + int ngrps = -1; + struct syncopctx *opctx = NULL; + + frame = create_frame (this, this->ctx->pool); + if (!frame) + return NULL; + + frame->root->pid = getpid (); + + opctx = syncopctx_getctx (); + if (opctx && (opctx->valid & SYNCOPCTX_UID)) + frame->root->uid = opctx->uid; + else + frame->root->uid = geteuid (); + + if (opctx && (opctx->valid & SYNCOPCTX_GID)) + frame->root->gid = opctx->gid; + else + frame->root->gid = getegid (); + + if (opctx && (opctx->valid & SYNCOPCTX_GROUPS)) { + ngrps = opctx->ngrps; + + if (ngrps != 0 && opctx->groups != NULL) { + if (call_stack_alloc_groups (frame->root, ngrps) != 0) { + STACK_DESTROY (frame->root); + return NULL; + } + + memcpy (frame->root->groups, opctx->groups, + (sizeof (gid_t) * ngrps)); + } + } + else { + ngrps = getgroups (0, 0); + if (ngrps < 0) { + STACK_DESTROY (frame->root); + return NULL; + } + + if (call_stack_alloc_groups (frame->root, ngrps) != 0) { + STACK_DESTROY (frame->root); + return NULL; + } + + if (getgroups (ngrps, frame->root->groups) < 0) { + STACK_DESTROY (frame->root); + return NULL; + } + } + + return frame; +} + +int synclock_init (synclock_t *lock); +int synclock_destory (synclock_t *lock); +int synclock_lock (synclock_t *lock); +int synclock_trylock (synclock_t *lock); +int synclock_unlock (synclock_t *lock); + + +int syncbarrier_init (syncbarrier_t *barrier); +int syncbarrier_wait (syncbarrier_t *barrier, int waitfor); +int syncbarrier_wake (syncbarrier_t *barrier); +int syncbarrier_destroy (syncbarrier_t *barrier); int syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req, /* out */ @@ -174,13 +364,13 @@ int syncop_removexattr (xlator_t *subvol, loc_t *loc, const char *name); int syncop_fremovexattr (xlator_t *subvol, fd_t *fd, const char *name); int syncop_create (xlator_t *subvol, loc_t *loc, int32_t flags, mode_t mode, - fd_t *fd, dict_t *dict); + fd_t *fd, dict_t *dict, struct iatt *iatt); int syncop_open (xlator_t *subvol, loc_t *loc, int32_t flags, fd_t *fd); int syncop_close (fd_t *fd); int syncop_write (xlator_t *subvol, fd_t *fd, const char *buf, int size, off_t offset, struct iobref *iobref, uint32_t flags); -int syncop_writev (xlator_t *subvol, fd_t *fd, struct iovec *vector, +int syncop_writev (xlator_t *subvol, fd_t *fd, const struct iovec *vector, int32_t count, off_t offset, struct iobref *iobref, uint32_t flags); int syncop_readv (xlator_t *subvol, fd_t *fd, size_t size, off_t off, @@ -192,14 +382,31 @@ int syncop_ftruncate (xlator_t *subvol, fd_t *fd, off_t offset); int syncop_truncate (xlator_t *subvol, loc_t *loc, off_t offset); int syncop_unlink (xlator_t *subvol, loc_t *loc); +int syncop_rmdir (xlator_t *subvol, loc_t *loc); -int syncop_fsync (xlator_t *subvol, fd_t *fd); +int syncop_fsync (xlator_t *subvol, fd_t *fd, int dataonly); +int syncop_flush (xlator_t *subvol, fd_t *fd); int syncop_fstat (xlator_t *subvol, fd_t *fd, struct iatt *stbuf); int syncop_stat (xlator_t *subvol, loc_t *loc, struct iatt *stbuf); -int syncop_symlink (xlator_t *subvol, loc_t *loc, char *newpath, dict_t *dict); +int syncop_symlink (xlator_t *subvol, loc_t *loc, const char *newpath, + dict_t *dict, struct iatt *iatt); int syncop_readlink (xlator_t *subvol, loc_t *loc, char **buffer, size_t size); int syncop_mknod (xlator_t *subvol, loc_t *loc, mode_t mode, dev_t rdev, - dict_t *dict); + dict_t *dict, struct iatt *iatt); +int syncop_mkdir (xlator_t *subvol, loc_t *loc, mode_t mode, dict_t *dict, + struct iatt *iatt); int syncop_link (xlator_t *subvol, loc_t *oldloc, loc_t *newloc); +int syncop_fsyncdir (xlator_t *subvol, fd_t *fd, int datasync); +int syncop_access (xlator_t *subvol, loc_t *loc, int32_t mask); +int syncop_fallocate(xlator_t *subvol, fd_t *fd, int32_t keep_size, off_t offset, + size_t len); +int syncop_discard(xlator_t *subvol, fd_t *fd, off_t offset, size_t len); + +int syncop_zerofill(xlator_t *subvol, fd_t *fd, off_t offset, size_t len); + +int syncop_rename (xlator_t *subvol, loc_t *oldloc, loc_t *newloc); + +int syncop_lk (xlator_t *subvol, fd_t *fd, int cmd, struct gf_flock *flock); + #endif /* _SYNCOP_H */ |
