From fac3ff8bfb3958a3bdc34dc9bff7cb281597e40f Mon Sep 17 00:00:00 2001 From: Pavan Sondur Date: Sun, 25 Jul 2010 18:13:08 +0000 Subject: syncop: initial implementation Resending Avati's syncop patch with a few bug fixes. (please do not skip the IMPORTANT NOTES section) * Framework for SYNChronous OPerations -------------------------------------- This patch provides a framework for performing synchronous operations over the underlying actual asynchronous GlusterFS FOPS. * Use cases ----------- 1. Convenient implementation of crawler thread in replicate/pump 2. Convenient implementation of high level control flow in DVM * Background ------------ Almost all threads in GlusterFS are hosts for executing asynchronous file operations using the STACK_WIND and STACK_UNWIND primitives - as calls and callbacks. While the STACK_WIND and STACK_UNWIND macros provide a high degree of control for efficiently implementing file operations in a clustered/parallel environment, there are tasks where the nature of the task itself is sequential and the execution performance of the task is not critical. In these cases the complexity of implementing the task with STACK_WIND/STACK_UNWIND based operations as calls and callbacks is overkill. * Introduction --------------- syncop: wrappers around the STACK_WIND/STACK_UNWIND based asynchronous fops. synctask: a sequential task (a C function) which uses syncops. syncenv: an environment to schedule and execute synctasks. The synchronicity is implemented via ucontext.h based continuations. Execution of synchronous tasks is possible only in a synchronous environment. Therefore, the first step is to create such an environment - struct syncenv *env = syncenv_new (0); This creates a synchronous environment, with a thread (scheduler) to host the synchronous tasks. Creation of this environment is generally to be done at the time of process initialization. 
Next is to spawn a synchronous task in this environment - int slow_self_heal (void *data); int completion_func (int ret, void *data); ret = synctask_new (env, slow_self_heal, completion_func, data); Here slow_self_heal is a task which is implemented using synchronous operations. When slow_self_heal() completes, completion_func() is called with the first parameter as the return value of slow_self_heal(). Both these functions get the @data argument as the same value passed to synctask_new(). int slow_self_heal (void *data) { xlator_t *child = FIRST_CHILD (THIS); fd_t *dir = NULL; ... dir = syncop_opendir (child, loc); entry = syncop_readdir (dir); ... return ret; } * IMPORTANT NOTES ----------------- - calling syncops in code executing outside the synchronous environment will very likely cause an undesired blocking of the executing thread leading to deadlocks!! The synchronous environment is a special thread where such sleeps are safe, and these sleeps result in the scheduler 'swapping in' other synctasks. - syncops can put the task to sleep. DO NOT issue syncops while holding mutexes. This is very similar to the blunder of holding a mutex and doing STACK_WIND. - It works best when synctasks use only syncops. If a call_frame is created and STACK_WIND'ed, the callback would very likely happen in a thread outside the synchronous environment, at an undefined time - as expected. So note that the synchronous environment does not tame the notorious behaviour of STACK_WIND. Signed-off-by: Anand V. Avati Signed-off-by: Anand V. 
Avati BUG: 971 (dynamic volume management) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=971 --- libglusterfs/src/syncop.c | 350 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 350 insertions(+) create mode 100644 libglusterfs/src/syncop.c (limited to 'libglusterfs/src/syncop.c') diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c new file mode 100644 index 00000000000..beb5d9db4a1 --- /dev/null +++ b/libglusterfs/src/syncop.c @@ -0,0 +1,350 @@ +/* + Copyright (c) 2010 Gluster, Inc. + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + . 
+*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "syncop.h" + + +void +synctask_yield (struct synctask *task) +{ + struct syncenv *env = NULL; + + env = task->env; + + if (swapcontext (&task->ctx, &env->sched) < 0) { + gf_log ("syncop", GF_LOG_ERROR, + "swapcontext failed (%s)", strerror (errno)); + } +} + + +void +synctask_yawn (struct synctask *task) +{ + struct syncenv *env = NULL; + + env = task->env; + + pthread_mutex_lock (&env->mutex); + { + list_del_init (&task->all_tasks); + list_add (&task->all_tasks, &env->waitq); + } + pthread_mutex_unlock (&env->mutex); +} + + +void +synctask_zzzz (struct synctask *task) +{ + synctask_yawn (task); + + synctask_yield (task); +} + + +void +synctask_wake (struct synctask *task) +{ + struct syncenv *env = NULL; + + env = task->env; + + pthread_mutex_lock (&env->mutex); + { + list_del_init (&task->all_tasks); + list_add_tail (&task->all_tasks, &env->runq); + } + pthread_mutex_unlock (&env->mutex); + + pthread_cond_broadcast (&env->cond); +} + + +void +synctask_wrap (struct synctask *task) +{ + int ret; + + ret = task->syncfn (task->opaque); + task->synccbk (ret, task->opaque); + + /* cannot destroy @task right here as we are + in the execution stack of @task itself + */ + task->complete = 1; + synctask_wake (task); +} + + +void +synctask_destroy (struct synctask *task) +{ + if (!task) + return; + + if (task->stack) + FREE (task); + FREE (task); +} + + +int +synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, + void *opaque) +{ + struct synctask *newtask = NULL; + + newtask = CALLOC (1, sizeof (*newtask)); + if (!newtask) + return -ENOMEM; + + newtask->env = env; + newtask->xl = THIS; + newtask->syncfn = fn; + newtask->synccbk = cbk; + newtask->opaque = opaque; + + INIT_LIST_HEAD (&newtask->all_tasks); + + if (getcontext (&newtask->ctx) < 0) { + gf_log ("syncop", GF_LOG_ERROR, + "getcontext failed (%s)", + strerror (errno)); + goto err; + } + + newtask->stack = CALLOC 
(1, env->stacksize); + if (!newtask->stack) { + gf_log ("syncop", GF_LOG_ERROR, + "out of memory for stack"); + goto err; + } + + newtask->ctx.uc_stack.ss_sp = newtask->stack; + newtask->ctx.uc_stack.ss_size = env->stacksize; + + makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask); + + synctask_wake (newtask); + + return 0; +err: + if (newtask) { + if (newtask->stack) + FREE (newtask->stack); + FREE (newtask); + } + return -1; +} + + +struct synctask * +syncenv_task (struct syncenv *env) +{ + struct synctask *task = NULL; + + pthread_mutex_lock (&env->mutex); + { + while (list_empty (&env->runq)) + pthread_cond_wait (&env->cond, &env->mutex); + + task = list_entry (env->runq.next, struct synctask, all_tasks); + + list_del_init (&task->all_tasks); + } + pthread_mutex_unlock (&env->mutex); + + return task; +} + + +void +synctask_switchto (struct synctask *task) +{ + struct syncenv *env = NULL; + + env = task->env; + + synctask_set (task); + THIS = task->xl; + + if (swapcontext (&env->sched, &task->ctx) < 0) { + gf_log ("syncop", GF_LOG_ERROR, + "swapcontext failed (%s)", strerror (errno)); + } +} + + +void * +syncenv_processor (void *thdata) +{ + struct syncenv *env = NULL; + struct synctask *task = NULL; + + env = thdata; + + for (;;) { + task = syncenv_task (env); + + if (task->complete) { + synctask_destroy (task); + continue; + } + + synctask_switchto (task); + } + + return NULL; +} + + +void +syncenv_destroy (struct syncenv *env) +{ + +} + + +struct syncenv * +syncenv_new (size_t stacksize) +{ + struct syncenv *newenv = NULL; + int ret = 0; + + newenv = CALLOC (1, sizeof (*newenv)); + + if (!newenv) + return NULL; + + pthread_mutex_init (&newenv->mutex, NULL); + pthread_cond_init (&newenv->cond, NULL); + + INIT_LIST_HEAD (&newenv->runq); + INIT_LIST_HEAD (&newenv->waitq); + + newenv->stacksize = SYNCENV_DEFAULT_STACKSIZE; + if (stacksize) + newenv->stacksize = stacksize; + + ret = pthread_create (&newenv->processor, NULL, + syncenv_processor, 
newenv); + + if (ret != 0) + syncenv_destroy (newenv); + + return newenv; +} + + +/* FOPS */ + + +int +syncop_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, inode_t *inode, + struct iatt *iatt, dict_t *xattr, struct iatt *parent) +{ + struct syncargs *args = NULL; + + args = cookie; + + args->op_ret = op_ret; + args->op_errno = op_errno; + + if (op_ret == 0) { + args->iatt1 = *iatt; + args->xattr = xattr; + args->iatt2 = *parent; + } + + __wake (args); + + return 0; +} + + +int +syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req, + struct iatt *iatt, dict_t **xattr_rsp, struct iatt *parent) +{ + struct syncargs args = {0, }; + + SYNCOP (subvol, (&args), syncop_lookup_cbk, subvol->fops->lookup, + loc, xattr_req); + + if (iatt) + *iatt = args.iatt1; + if (xattr_rsp) + *xattr_rsp = args.xattr; + if (parent) + *parent = args.iatt2; + + errno = args.op_errno; + return args.op_ret; +} + + + +int +syncop_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, + struct iatt *preop, struct iatt *postop) +{ + struct syncargs *args = NULL; + + args = cookie; + + args->op_ret = op_ret; + args->op_errno = op_errno; + + if (op_ret == 0) { + args->iatt1 = *preop; + args->iatt2 = *postop; + } + + __wake (args); + + return 0; +} + + +int +syncop_setattr (xlator_t *subvol, loc_t *loc, struct iatt *iatt, int valid, + struct iatt *preop, struct iatt *postop) +{ + struct syncargs args = {0, }; + + SYNCOP (subvol, (&args), syncop_setattr_cbk, subvol->fops->setattr, + loc, iatt, valid); + + if (preop) + *preop = args.iatt1; + if (postop) + *postop = args.iatt2; + + errno = args.op_errno; + return args.op_ret; +} + -- cgit