Diffstat (limited to 'libglusterfs/src/syncop.c')
-rw-r--r--  libglusterfs/src/syncop.c | 1127
 1 file changed, 994 insertions(+), 133 deletions(-)
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index c84832dfb..c1620bb70 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
+ Copyright (c) 2008-2013 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
@@ -15,6 +15,160 @@
#include "syncop.h"
+int
+syncopctx_setfsuid (void *uid)
+{
+ struct syncopctx *opctx = NULL;
+ int ret = 0;
+
+ /* In args check */
+ if (!uid) {
+ ret = -1;
+ errno = EINVAL;
+ goto out;
+ }
+
+ opctx = syncopctx_getctx ();
+
+ /* alloc for this thread the first time */
+ if (!opctx) {
+ opctx = GF_CALLOC (1, sizeof (*opctx), gf_common_mt_syncopctx);
+ if (!opctx) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncopctx_setctx (opctx);
+ if (ret != 0) {
+ GF_FREE (opctx);
+ opctx = NULL;
+ goto out;
+ }
+ }
+
+out:
+ if (opctx && uid) {
+ opctx->uid = *(uid_t *)uid;
+ opctx->valid |= SYNCOPCTX_UID;
+ }
+
+ return ret;
+}
+
+int
+syncopctx_setfsgid (void *gid)
+{
+ struct syncopctx *opctx = NULL;
+ int ret = 0;
+
+ /* In args check */
+ if (!gid) {
+ ret = -1;
+ errno = EINVAL;
+ goto out;
+ }
+
+ opctx = syncopctx_getctx ();
+
+ /* alloc for this thread the first time */
+ if (!opctx) {
+ opctx = GF_CALLOC (1, sizeof (*opctx), gf_common_mt_syncopctx);
+ if (!opctx) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncopctx_setctx (opctx);
+ if (ret != 0) {
+ GF_FREE (opctx);
+ opctx = NULL;
+ goto out;
+ }
+ }
+
+out:
+ if (opctx && gid) {
+ opctx->gid = *(gid_t *)gid;
+ opctx->valid |= SYNCOPCTX_GID;
+ }
+
+ return ret;
+}
+
+int
+syncopctx_setfsgroups (int count, const void *groups)
+{
+ struct syncopctx *opctx = NULL;
+ gid_t *tmpgroups = NULL;
+ int ret = 0;
+
+ /* In args check */
+ if (count != 0 && !groups) {
+ ret = -1;
+ errno = EINVAL;
+ goto out;
+ }
+
+ opctx = syncopctx_getctx ();
+
+ /* alloc for this thread the first time */
+ if (!opctx) {
+ opctx = GF_CALLOC (1, sizeof (*opctx), gf_common_mt_syncopctx);
+ if (!opctx) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncopctx_setctx (opctx);
+ if (ret != 0) {
+ GF_FREE (opctx);
+ opctx = NULL;
+ goto out;
+ }
+ }
+
+ /* resize internal groups as required */
+ if (count && opctx->grpsize < count) {
+ if (opctx->groups) {
+ tmpgroups = GF_REALLOC (opctx->groups,
+ (sizeof (gid_t) * count));
+ /* NOTE: Zeroing the reallocated memory is not required,
+ * as ngrps controls how much of the data is valid;
+ * noting this here regardless. */
+ if (tmpgroups == NULL) {
+ opctx->grpsize = 0;
+ GF_FREE (opctx->groups);
+ opctx->groups = NULL;
+ ret = -1;
+ goto out;
+ }
+ }
+ else {
+ tmpgroups = GF_CALLOC (count, sizeof (gid_t),
+ gf_common_mt_syncopctx);
+ if (tmpgroups == NULL) {
+ opctx->grpsize = 0;
+ ret = -1;
+ goto out;
+ }
+ }
+
+ opctx->groups = tmpgroups;
+ opctx->grpsize = count;
+ }
+
+ /* copy out the groups passed */
+ if (count)
+ memcpy (opctx->groups, groups, (sizeof (gid_t) * count));
+
+ /* set/reset ngrps; this is also where resetting the groups is handled */
+ opctx->ngrps = count;
+ opctx->valid |= SYNCOPCTX_GROUPS;
+
+out:
+ return ret;
+}
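
The three setters above mirror setfsuid()/setfsgid()/setgroups() semantics for the calling thread's syncop context. A minimal usage sketch follows, assuming the usual GlusterFS build environment; the helper name set_request_creds and its parameters are illustrative, not part of this patch.

#include "syncop.h"

/* Illustrative helper: stamp per-thread credentials before issuing
 * syncop_*() calls on behalf of a client request. */
static int
set_request_creds (uid_t uid, gid_t gid, int ngrps, const gid_t *groups)
{
        int ret = 0;

        ret = syncopctx_setfsuid (&uid);
        if (ret)
                return ret;

        ret = syncopctx_setfsgid (&gid);
        if (ret)
                return ret;

        /* a count of 0 resets any previously set auxiliary groups */
        return syncopctx_setfsgroups (ngrps, groups);
}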
+
static void
__run (struct synctask *task)
{
@@ -23,27 +177,31 @@ __run (struct synctask *task)
env = task->env;
list_del_init (&task->all_tasks);
- switch (task->state) {
- case SYNCTASK_INIT:
+ switch (task->state) {
+ case SYNCTASK_INIT:
case SYNCTASK_SUSPEND:
- break;
- case SYNCTASK_RUN:
- gf_log (task->xl->name, GF_LOG_WARNING,
- "re-running already running task");
- env->runcount--;
- break;
- case SYNCTASK_WAIT:
- env->waitcount--;
- break;
- case SYNCTASK_DONE:
+ break;
+ case SYNCTASK_RUN:
+ gf_log (task->xl->name, GF_LOG_DEBUG,
+ "re-running already running task");
+ env->runcount--;
+ break;
+ case SYNCTASK_WAIT:
+ env->waitcount--;
+ break;
+ case SYNCTASK_DONE:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "running completed task");
+ return;
+ case SYNCTASK_ZOMBIE:
gf_log (task->xl->name, GF_LOG_WARNING,
- "running completed task");
- break;
- }
+ "attempted to wake up zombie!!");
+ return;
+ }
list_add_tail (&task->all_tasks, &env->runq);
- env->runcount++;
- task->state = SYNCTASK_RUN;
+ env->runcount++;
+ task->state = SYNCTASK_RUN;
}
@@ -55,37 +213,52 @@ __wait (struct synctask *task)
env = task->env;
list_del_init (&task->all_tasks);
- switch (task->state) {
- case SYNCTASK_INIT:
+ switch (task->state) {
+ case SYNCTASK_INIT:
case SYNCTASK_SUSPEND:
- break;
- case SYNCTASK_RUN:
- env->runcount--;
- break;
- case SYNCTASK_WAIT:
- gf_log (task->xl->name, GF_LOG_WARNING,
- "re-waiting already waiting task");
- env->waitcount--;
- break;
- case SYNCTASK_DONE:
+ break;
+ case SYNCTASK_RUN:
+ env->runcount--;
+ break;
+ case SYNCTASK_WAIT:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "re-waiting already waiting task");
+ env->waitcount--;
+ break;
+ case SYNCTASK_DONE:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "running completed task");
+ return;
+ case SYNCTASK_ZOMBIE:
gf_log (task->xl->name, GF_LOG_WARNING,
- "running completed task");
- break;
- }
+ "attempted to sleep a zombie!!");
+ return;
+ }
list_add_tail (&task->all_tasks, &env->waitq);
- env->waitcount++;
- task->state = SYNCTASK_WAIT;
+ env->waitcount++;
+ task->state = SYNCTASK_WAIT;
}
void
synctask_yield (struct synctask *task)
{
+ xlator_t *oldTHIS = THIS;
+
+#if defined(__NetBSD__) && defined(_UC_TLSBASE)
+ /* Preserve pthread private pointer through swapcontext() */
+ task->proc->sched.uc_flags &= ~_UC_TLSBASE;
+#endif
+
+ if (task->state != SYNCTASK_DONE)
+ task->state = SYNCTASK_SUSPEND;
if (swapcontext (&task->ctx, &task->proc->sched) < 0) {
gf_log ("syncop", GF_LOG_ERROR,
"swapcontext failed (%s)", strerror (errno));
}
+
+ THIS = oldTHIS;
}
@@ -102,10 +275,10 @@ synctask_wake (struct synctask *task)
if (task->slept)
__run (task);
+
+ pthread_cond_broadcast (&env->cond);
}
pthread_mutex_unlock (&env->mutex);
-
- pthread_cond_broadcast (&env->cond);
}
void
@@ -118,8 +291,8 @@ synctask_wrap (struct synctask *old_task)
task = synctask_get ();
task->ret = task->syncfn (task->opaque);
- if (task->synccbk)
- task->synccbk (task->ret, task->frame, task->opaque);
+ if (task->synccbk)
+ task->synccbk (task->ret, task->frame, task->opaque);
task->state = SYNCTASK_DONE;
@@ -133,15 +306,15 @@ synctask_destroy (struct synctask *task)
if (!task)
return;
- if (task->stack)
- FREE (task->stack);
+ FREE (task->stack);
if (task->opframe)
STACK_DESTROY (task->opframe->root);
- pthread_mutex_destroy (&task->mutex);
-
- pthread_cond_destroy (&task->cond);
+ if (task->synccbk == NULL) {
+ pthread_mutex_destroy (&task->mutex);
+ pthread_cond_destroy (&task->cond);
+ }
FREE (task);
}
@@ -150,34 +323,50 @@ synctask_destroy (struct synctask *task)
void
synctask_done (struct synctask *task)
{
- if (task->synccbk) {
- synctask_destroy (task);
- return;
- }
+ if (task->synccbk) {
+ synctask_destroy (task);
+ return;
+ }
- pthread_mutex_lock (&task->mutex);
- {
- task->done = 1;
- pthread_cond_broadcast (&task->cond);
- }
- pthread_mutex_unlock (&task->mutex);
+ pthread_mutex_lock (&task->mutex);
+ {
+ task->state = SYNCTASK_ZOMBIE;
+ task->done = 1;
+ pthread_cond_broadcast (&task->cond);
+ }
+ pthread_mutex_unlock (&task->mutex);
}
int
-synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
- call_frame_t *frame, void *opaque)
+synctask_setid (struct synctask *task, uid_t uid, gid_t gid)
+{
+ if (!task)
+ return -1;
+
+ if (uid != -1)
+ task->uid = uid;
+
+ if (gid != -1)
+ task->gid = gid;
+
+ return 0;
+}
+
+
+struct synctask *
+synctask_create (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
+ call_frame_t *frame, void *opaque)
{
struct synctask *newtask = NULL;
xlator_t *this = THIS;
- int ret = 0;
VALIDATE_OR_GOTO (env, err);
VALIDATE_OR_GOTO (fn, err);
newtask = CALLOC (1, sizeof (*newtask));
if (!newtask)
- return -ENOMEM;
+ return NULL;
newtask->frame = frame;
if (!frame) {
@@ -190,10 +379,15 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
newtask->env = env;
newtask->xl = this;
newtask->syncfn = fn;
- newtask->synccbk = cbk;
+ newtask->synccbk = cbk;
newtask->opaque = opaque;
+ /* default to the uid/gid of the passed frame */
+ newtask->uid = newtask->opframe->root->uid;
+ newtask->gid = newtask->opframe->root->gid;
+
INIT_LIST_HEAD (&newtask->all_tasks);
+ INIT_LIST_HEAD (&newtask->waitq);
if (getcontext (&newtask->ctx) < 0) {
gf_log ("syncop", GF_LOG_ERROR,
@@ -212,67 +406,114 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
newtask->ctx.uc_stack.ss_sp = newtask->stack;
newtask->ctx.uc_stack.ss_size = env->stacksize;
- makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask);
+ makecontext (&newtask->ctx, (void (*)(void)) synctask_wrap, 2, newtask);
- newtask->state = SYNCTASK_INIT;
+ newtask->state = SYNCTASK_INIT;
newtask->slept = 1;
- if (!cbk) {
- pthread_mutex_init (&newtask->mutex, NULL);
- pthread_cond_init (&newtask->cond, NULL);
- newtask->done = 0;
- }
+ if (!cbk) {
+ pthread_mutex_init (&newtask->mutex, NULL);
+ pthread_cond_init (&newtask->cond, NULL);
+ newtask->done = 0;
+ }
synctask_wake (newtask);
+ /*
+ * Make sure someone's there to execute anything we just put on the
+ * run queue.
+ */
+ syncenv_scale(env);
- if (!cbk) {
- pthread_mutex_lock (&newtask->mutex);
- {
- while (!newtask->done) {
- pthread_cond_wait (&newtask->cond, &newtask->mutex);
- }
- }
- pthread_mutex_unlock (&newtask->mutex);
-
- ret = newtask->ret;
-
- synctask_destroy (newtask);
- }
-
- return ret;
+ return newtask;
err:
if (newtask) {
- if (newtask->stack)
- FREE (newtask->stack);
+ FREE (newtask->stack);
if (newtask->opframe)
STACK_DESTROY (newtask->opframe->root);
FREE (newtask);
}
- return -1;
+
+ return NULL;
+}
+
+
+int
+synctask_join (struct synctask *task)
+{
+ int ret = 0;
+
+ pthread_mutex_lock (&task->mutex);
+ {
+ while (!task->done)
+ pthread_cond_wait (&task->cond, &task->mutex);
+ }
+ pthread_mutex_unlock (&task->mutex);
+
+ ret = task->ret;
+
+ synctask_destroy (task);
+
+ return ret;
+}
+
+
+int
+synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
+ call_frame_t *frame, void *opaque)
+{
+ struct synctask *newtask = NULL;
+ int ret = 0;
+
+ newtask = synctask_create (env, fn, cbk, frame, opaque);
+ if (!newtask)
+ return -1;
+
+ if (!cbk)
+ ret = synctask_join (newtask);
+
+ return ret;
}
struct synctask *
syncenv_task (struct syncproc *proc)
{
- struct syncenv *env = NULL;
+ struct syncenv *env = NULL;
struct synctask *task = NULL;
+ struct timespec sleep_till = {0, };
+ int ret = 0;
- env = proc->env;
+ env = proc->env;
pthread_mutex_lock (&env->mutex);
{
- while (list_empty (&env->runq))
- pthread_cond_wait (&env->cond, &env->mutex);
+ while (list_empty (&env->runq)) {
+ sleep_till.tv_sec = time (NULL) + SYNCPROC_IDLE_TIME;
+ ret = pthread_cond_timedwait (&env->cond, &env->mutex,
+ &sleep_till);
+ if (!list_empty (&env->runq))
+ break;
+ if ((ret == ETIMEDOUT) &&
+ (env->procs > env->procmin)) {
+ task = NULL;
+ env->procs--;
+ memset (proc, 0, sizeof (*proc));
+ goto unlock;
+ }
+ }
task = list_entry (env->runq.next, struct synctask, all_tasks);
list_del_init (&task->all_tasks);
- env->runcount--;
+ env->runcount--;
+
+ task->woken = 0;
+ task->slept = 0;
- task->proc = proc;
+ task->proc = proc;
}
+unlock:
pthread_mutex_unlock (&env->mutex);
return task;
@@ -289,8 +530,10 @@ synctask_switchto (struct synctask *task)
synctask_set (task);
THIS = task->xl;
- task->woken = 0;
- task->slept = 0;
+#if defined(__NetBSD__) && defined(_UC_TLSBASE)
+ /* Preserve pthread private pointer through swapcontext() */
+ task->ctx.uc_flags &= ~_UC_TLSBASE;
+#endif
if (swapcontext (&task->proc->sched, &task->ctx) < 0) {
gf_log ("syncop", GF_LOG_ERROR,
@@ -326,10 +569,12 @@ syncenv_processor (void *thdata)
for (;;) {
task = syncenv_task (proc);
+ if (!task)
+ break;
synctask_switchto (task);
- syncenv_scale (env);
+ syncenv_scale (env);
}
return NULL;
@@ -339,27 +584,39 @@ syncenv_processor (void *thdata)
void
syncenv_scale (struct syncenv *env)
{
- int thmax = 0;
- int i = 0;
- int ret = 0;
+ int diff = 0;
+ int scale = 0;
+ int i = 0;
+ int ret = 0;
- pthread_mutex_lock (&env->mutex);
- {
- if (env->procs > env->runcount)
- goto unlock;
-
- thmax = min (env->runcount, SYNCENV_PROC_MAX);
- for (i = env->procs; i < thmax; i++) {
- env->proc[i].env = env;
- ret = pthread_create (&env->proc[i].processor, NULL,
- syncenv_processor, &env->proc[i]);
- if (ret)
- break;
- env->procs++;
- }
- }
+ pthread_mutex_lock (&env->mutex);
+ {
+ if (env->procs > env->runcount)
+ goto unlock;
+
+ scale = env->runcount;
+ if (scale > env->procmax)
+ scale = env->procmax;
+ if (scale > env->procs)
+ diff = scale - env->procs;
+ while (diff) {
+ diff--;
+ for (; (i < env->procmax); i++) {
+ if (env->proc[i].processor == 0)
+ break;
+ }
+
+ env->proc[i].env = env;
+ ret = gf_thread_create (&env->proc[i].processor, NULL,
+ syncenv_processor, &env->proc[i]);
+ if (ret)
+ break;
+ env->procs++;
+ i++;
+ }
+ }
unlock:
- pthread_mutex_unlock (&env->mutex);
+ pthread_mutex_unlock (&env->mutex);
}
@@ -371,12 +628,20 @@ syncenv_destroy (struct syncenv *env)
struct syncenv *
-syncenv_new (size_t stacksize)
+syncenv_new (size_t stacksize, int procmin, int procmax)
{
struct syncenv *newenv = NULL;
int ret = 0;
int i = 0;
+ if (!procmin || procmin < 0)
+ procmin = SYNCENV_PROC_MIN;
+ if (!procmax || procmax > SYNCENV_PROC_MAX)
+ procmax = SYNCENV_PROC_MAX;
+
+ if (procmin > procmax)
+ return NULL;
+
newenv = CALLOC (1, sizeof (*newenv));
if (!newenv)
@@ -391,11 +656,13 @@ syncenv_new (size_t stacksize)
newenv->stacksize = SYNCENV_DEFAULT_STACKSIZE;
if (stacksize)
newenv->stacksize = stacksize;
+ newenv->procmin = procmin;
+ newenv->procmax = procmax;
- for (i = 0; i < SYNCENV_PROC_MIN; i++) {
+ for (i = 0; i < newenv->procmin; i++) {
newenv->proc[i].env = newenv;
- ret = pthread_create (&newenv->proc[i].processor, NULL,
- syncenv_processor, &newenv->proc[i]);
+ ret = gf_thread_create (&newenv->proc[i].processor, NULL,
+ syncenv_processor, &newenv->proc[i]);
if (ret)
break;
newenv->procs++;
@@ -408,6 +675,268 @@ syncenv_new (size_t stacksize)
}
+int
+synclock_init (synclock_t *lock)
+{
+ if (!lock)
+ return -1;
+
+ pthread_cond_init (&lock->cond, 0);
+ lock->lock = 0;
+ INIT_LIST_HEAD (&lock->waitq);
+
+ return pthread_mutex_init (&lock->guard, 0);
+}
+
+
+int
+synclock_destroy (synclock_t *lock)
+{
+ if (!lock)
+ return -1;
+
+ pthread_cond_destroy (&lock->cond);
+ return pthread_mutex_destroy (&lock->guard);
+}
+
+
+static int
+__synclock_lock (struct synclock *lock)
+{
+ struct synctask *task = NULL;
+
+ if (!lock)
+ return -1;
+
+ task = synctask_get ();
+
+ while (lock->lock) {
+ if (task) {
+ /* called within a synctask */
+ list_add_tail (&task->waitq, &lock->waitq);
+ pthread_mutex_unlock (&lock->guard);
+ synctask_yield (task);
+ /* task is removed from waitq in unlock,
+ * under lock->guard. */
+ pthread_mutex_lock (&lock->guard);
+ } else {
+ /* called by a non-synctask */
+ pthread_cond_wait (&lock->cond, &lock->guard);
+ }
+ }
+
+ lock->lock = _gf_true;
+ lock->owner = task;
+
+ return 0;
+}
+
+
+int
+synclock_lock (synclock_t *lock)
+{
+ int ret = 0;
+
+ pthread_mutex_lock (&lock->guard);
+ {
+ ret = __synclock_lock (lock);
+ }
+ pthread_mutex_unlock (&lock->guard);
+
+ return ret;
+}
+
+
+int
+synclock_trylock (synclock_t *lock)
+{
+ int ret = 0;
+
+ errno = 0;
+
+ pthread_mutex_lock (&lock->guard);
+ {
+ if (lock->lock) {
+ errno = EBUSY;
+ ret = -1;
+ goto unlock;
+ }
+
+ ret = __synclock_lock (lock);
+ }
+unlock:
+ pthread_mutex_unlock (&lock->guard);
+
+ return ret;
+}
+
+
+static int
+__synclock_unlock (synclock_t *lock)
+{
+ struct synctask *task = NULL;
+ struct synctask *curr = NULL;
+
+ if (!lock)
+ return -1;
+
+ curr = synctask_get ();
+
+ if (lock->owner != curr) {
+ /* warn ? */
+ }
+
+ lock->lock = _gf_false;
+
+ /* There could be both synctasks and non-synctasks
+ waiting (or none, or either). As a mid-approach
+ between maintaining too many waiting counters
+ at one extreme and a thundering herd on unlock
+ at the other, call a cond_signal (which wakes
+ one waiter) and also wake the first waiting
+ synctask. So at most two threads wake up to
+ grab the just-released lock.
+ */
+ pthread_cond_signal (&lock->cond);
+ if (!list_empty (&lock->waitq)) {
+ task = list_entry (lock->waitq.next, struct synctask, waitq);
+ list_del_init (&task->waitq);
+ synctask_wake (task);
+ }
+
+ return 0;
+}
+
+
+int
+synclock_unlock (synclock_t *lock)
+{
+ int ret = 0;
+
+ pthread_mutex_lock (&lock->guard);
+ {
+ ret = __synclock_unlock (lock);
+ }
+ pthread_mutex_unlock (&lock->guard);
+
+ return ret;
+}
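
A brief sketch of how a synclock is meant to be used, under the assumption that synclock_init() has run once at setup time; big_lock and do_protected_work are illustrative names, not part of this patch.

#include "syncop.h"

static synclock_t big_lock;   /* synclock_init (&big_lock) at setup time */

static int
do_protected_work (void)
{
        int ret = 0;

        /* works from both synctask and plain pthread callers: a synctask
         * yields while waiting, a pthread blocks on the condvar */
        ret = synclock_lock (&big_lock);
        if (ret)
                return ret;

        /* ... critical section: safe to issue syncop_*() calls ... */

        return synclock_unlock (&big_lock);
}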
+
+/* Barriers */
+
+int
+syncbarrier_init (struct syncbarrier *barrier)
+{
+ if (!barrier) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ pthread_cond_init (&barrier->cond, 0);
+ barrier->count = 0;
+ INIT_LIST_HEAD (&barrier->waitq);
+
+ return pthread_mutex_init (&barrier->guard, 0);
+}
+
+
+int
+syncbarrier_destroy (struct syncbarrier *barrier)
+{
+ if (!barrier) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ pthread_cond_destroy (&barrier->cond);
+ return pthread_mutex_destroy (&barrier->guard);
+}
+
+
+static int
+__syncbarrier_wait (struct syncbarrier *barrier, int waitfor)
+{
+ struct synctask *task = NULL;
+
+ if (!barrier) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ task = synctask_get ();
+
+ while (barrier->count < waitfor) {
+ if (task) {
+ /* called within a synctask */
+ list_add_tail (&task->waitq, &barrier->waitq);
+ pthread_mutex_unlock (&barrier->guard);
+ synctask_yield (task);
+ pthread_mutex_lock (&barrier->guard);
+ } else {
+ /* called by a non-synctask */
+ pthread_cond_wait (&barrier->cond, &barrier->guard);
+ }
+ }
+
+ barrier->count = 0;
+
+ return 0;
+}
+
+
+int
+syncbarrier_wait (struct syncbarrier *barrier, int waitfor)
+{
+ int ret = 0;
+
+ pthread_mutex_lock (&barrier->guard);
+ {
+ ret = __syncbarrier_wait (barrier, waitfor);
+ }
+ pthread_mutex_unlock (&barrier->guard);
+
+ return ret;
+}
+
+
+static int
+__syncbarrier_wake (struct syncbarrier *barrier)
+{
+ struct synctask *task = NULL;
+
+ if (!barrier) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ barrier->count++;
+
+ pthread_cond_signal (&barrier->cond);
+ if (!list_empty (&barrier->waitq)) {
+ task = list_entry (barrier->waitq.next, struct synctask, waitq);
+ list_del_init (&task->waitq);
+ synctask_wake (task);
+ }
+
+ return 0;
+}
+
+
+int
+syncbarrier_wake (struct syncbarrier *barrier)
+{
+ int ret = 0;
+
+ pthread_mutex_lock (&barrier->guard);
+ {
+ ret = __syncbarrier_wake (barrier);
+ }
+ pthread_mutex_unlock (&barrier->guard);
+
+ return ret;
+}
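
The barrier pair above is intended for fan-out/fan-in of asynchronous fops from within a synctask. A sketch of that pattern follows, with the fan-out itself elided; fan_out_and_wait and n_ops are illustrative names.

#include "syncop.h"

static int
fan_out_and_wait (struct syncbarrier *barrier, int n_ops)
{
        int i = 0;

        if (syncbarrier_init (barrier))
                return -1;

        for (i = 0; i < n_ops; i++) {
                /* ... STACK_WIND an async fop whose callback finishes
                 *     with syncbarrier_wake (barrier); ... */
        }

        /* yields (or cond-waits) until n_ops wake-ups have arrived */
        return syncbarrier_wait (barrier, n_ops);
}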
+
+
/* FOPS */
@@ -470,6 +999,8 @@ entry_copy (gf_dirent_t *source)
sink->d_type = source->d_type;
sink->d_stat = source->d_stat;
+ if (source->inode)
+ sink->inode = inode_ref (source->inode);
return sink;
}
@@ -628,6 +1159,34 @@ syncop_opendir (xlator_t *subvol,
}
int
+syncop_fsyncdir_cbk (call_frame_t *frame, void* cookie, xlator_t *this,
+ int op_ret, int op_errno, dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ __wake (args);
+
+ return 0;
+}
+
+int
+syncop_fsyncdir (xlator_t *subvol, fd_t *fd, int datasync)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_fsyncdir_cbk, subvol->fops->fsyncdir,
+ fd, datasync, NULL);
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
+int
syncop_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int op_ret, int op_errno, dict_t *xdata)
{
@@ -921,9 +1480,6 @@ syncop_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
args->op_ret = op_ret;
args->op_errno = op_errno;
- if (op_ret != -1)
- fd_ref (fd);
-
__wake (args);
return 0;
@@ -981,9 +1537,12 @@ syncop_readv (xlator_t *subvol, fd_t *fd, size_t size, off_t off,
SYNCOP (subvol, (&args), syncop_readv_cbk, subvol->fops->readv,
fd, size, off, flags, NULL);
+ if (args.op_ret < 0)
+ goto out;
+
if (vector)
*vector = args.vector;
- else if (args.vector)
+ else
GF_FREE (args.vector);
if (count)
@@ -995,6 +1554,7 @@ syncop_readv (xlator_t *subvol, fd_t *fd, size_t size, off_t off,
else if (args.iobref)
iobref_unref (args.iobref);
+out:
errno = args.op_errno;
return args.op_ret;
@@ -1018,14 +1578,15 @@ syncop_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int
-syncop_writev (xlator_t *subvol, fd_t *fd, struct iovec *vector,
+syncop_writev (xlator_t *subvol, fd_t *fd, const struct iovec *vector,
int32_t count, off_t offset, struct iobref *iobref,
uint32_t flags)
{
struct syncargs args = {0, };
SYNCOP (subvol, (&args), syncop_writev_cbk, subvol->fops->writev,
- fd, vector, count, offset, flags, iobref, NULL);
+ fd, (struct iovec *) vector, count, offset, flags, iobref,
+ NULL);
errno = args.op_errno;
return args.op_ret;
@@ -1069,8 +1630,8 @@ syncop_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
args->op_ret = op_ret;
args->op_errno = op_errno;
- if (op_ret != -1)
- fd_ref (fd);
+ if (buf)
+ args->iatt1 = *buf;
__wake (args);
@@ -1079,7 +1640,7 @@ syncop_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int
syncop_create (xlator_t *subvol, loc_t *loc, int32_t flags, mode_t mode,
- fd_t *fd, dict_t *xdata)
+ fd_t *fd, dict_t *xdata, struct iatt *iatt)
{
struct syncargs args = {0, };
@@ -1087,6 +1648,9 @@ syncop_create (xlator_t *subvol, loc_t *loc, int32_t flags, mode_t mode,
loc, flags, mode, 0, fd, xdata);
errno = args.op_errno;
+ if (iatt)
+ *iatt = args.iatt1;
+
return args.op_ret;
}
@@ -1121,9 +1685,39 @@ syncop_unlink (xlator_t *subvol, loc_t *loc)
}
int
+syncop_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ __wake (args);
+
+ return 0;
+}
+
+int
+syncop_rmdir (xlator_t *subvol, loc_t *loc)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_rmdir_cbk, subvol->fops->rmdir, loc,
+ 0, NULL);
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
+
+int
syncop_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, inode_t *inode,
- struct iatt *buf, struct iatt *preparent,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *buf, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
struct syncargs *args = NULL;
@@ -1152,6 +1746,41 @@ syncop_link (xlator_t *subvol, loc_t *oldloc, loc_t *newloc)
return args.op_ret;
}
+
+int
+syncop_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *buf,
+ struct iatt *preoldparent, struct iatt *postoldparent,
+ struct iatt *prenewparent, struct iatt *postnewparent,
+ dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ __wake (args);
+
+ return 0;
+}
+
+
+int
+syncop_rename (xlator_t *subvol, loc_t *oldloc, loc_t *newloc)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_rename_cbk, subvol->fops->rename,
+ oldloc, newloc, NULL);
+
+ errno = args.op_errno;
+
+ return args.op_ret;
+}
+
+
int
syncop_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int op_ret, int op_errno, struct iatt *prebuf,
@@ -1212,12 +1841,43 @@ syncop_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int
-syncop_fsync (xlator_t *subvol, fd_t *fd)
+syncop_fsync (xlator_t *subvol, fd_t *fd, int dataonly)
{
struct syncargs args = {0, };
SYNCOP (subvol, (&args), syncop_fsync_cbk, subvol->fops->fsync,
- fd, 0, NULL);
+ fd, dataonly, NULL);
+
+ errno = args.op_errno;
+ return args.op_ret;
+
+}
+
+
+int
+syncop_flush_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ __wake (args);
+
+ return 0;
+
+}
+
+int
+syncop_flush (xlator_t *subvol, fd_t *fd)
+{
+ struct syncargs args = {0};
+
+ SYNCOP (subvol, (&args), syncop_flush_cbk, subvol->fops->flush,
+ fd, NULL);
errno = args.op_errno;
return args.op_ret;
@@ -1287,6 +1947,8 @@ syncop_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
args->op_ret = op_ret;
args->op_errno = op_errno;
+ if (buf)
+ args->iatt1 = *buf;
__wake (args);
@@ -1294,7 +1956,8 @@ syncop_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int
-syncop_symlink (xlator_t *subvol, loc_t *loc, char *newpath, dict_t *dict)
+syncop_symlink (xlator_t *subvol, loc_t *loc, const char *newpath, dict_t *dict,
+ struct iatt *iatt)
{
struct syncargs args = {0, };
@@ -1302,6 +1965,9 @@ syncop_symlink (xlator_t *subvol, loc_t *loc, char *newpath, dict_t *dict)
newpath, loc, 0, dict);
errno = args.op_errno;
+ if (iatt)
+ *iatt = args.iatt1;
+
return args.op_ret;
}
@@ -1336,8 +2002,7 @@ syncop_readlink (xlator_t *subvol, loc_t *loc, char **buffer, size_t size)
if (buffer)
*buffer = args.buffer;
- else if (args.buffer)
- GF_FREE (args.buffer);
+ else GF_FREE (args.buffer);
errno = args.op_errno;
return args.op_ret;
@@ -1356,6 +2021,9 @@ syncop_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
args->op_ret = op_ret;
args->op_errno = op_errno;
+ if (buf)
+ args->iatt1 = *buf;
+
__wake (args);
return 0;
@@ -1363,7 +2031,7 @@ syncop_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int
syncop_mknod (xlator_t *subvol, loc_t *loc, mode_t mode, dev_t rdev,
- dict_t *dict)
+ dict_t *dict, struct iatt *iatt)
{
struct syncargs args = {0, };
@@ -1371,6 +2039,199 @@ syncop_mknod (xlator_t *subvol, loc_t *loc, mode_t mode, dev_t rdev,
loc, mode, rdev, 0, dict);
errno = args.op_errno;
+ if (iatt)
+ *iatt = args.iatt1;
+
return args.op_ret;
}
+
+
+int
+syncop_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *buf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+ if (buf)
+ args->iatt1 = *buf;
+
+ __wake (args);
+
+ return 0;
+}
+
+
+int
+syncop_mkdir (xlator_t *subvol, loc_t *loc, mode_t mode, dict_t *dict,
+ struct iatt *iatt)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_mkdir_cbk, subvol->fops->mkdir,
+ loc, mode, 0, dict);
+
+ errno = args.op_errno;
+ if (iatt)
+ *iatt = args.iatt1;
+
+ return args.op_ret;
+
+}
+
+int
+syncop_access_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+ __wake (args);
+
+ return 0;
+}
+
+int
+syncop_access (xlator_t *subvol, loc_t *loc, int32_t mask)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_access_cbk, subvol->fops->access,
+ loc, mask, NULL);
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
+
+int
+syncop_fallocate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ __wake (args);
+
+ return 0;
+}
+
+int
+syncop_fallocate(xlator_t *subvol, fd_t *fd, int32_t keep_size, off_t offset,
+ size_t len)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_fallocate_cbk, subvol->fops->fallocate,
+ fd, keep_size, offset, len, NULL);
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
+
+int
+syncop_discard_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ __wake (args);
+
+ return 0;
+}
+
+int
+syncop_discard(xlator_t *subvol, fd_t *fd, off_t offset, size_t len)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_discard_cbk, subvol->fops->discard,
+ fd, offset, len, NULL);
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
+int
+syncop_zerofill_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ __wake (args);
+
+ return 0;
+}
+
+int
+syncop_zerofill(xlator_t *subvol, fd_t *fd, off_t offset, size_t len)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_zerofill_cbk, subvol->fops->zerofill,
+ fd, offset, len, NULL);
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
+
+int
+syncop_lk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, struct gf_flock *flock,
+ dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+ if (flock)
+ args->flock = *flock;
+ __wake (args);
+
+ return 0;
+}
+
+
+int
+syncop_lk (xlator_t *subvol, fd_t *fd, int cmd, struct gf_flock *flock)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_lk_cbk, subvol->fops->lk,
+ fd, cmd, flock, NULL);
+
+ errno = args.op_errno;
+ *flock = args.flock;
+
+ return args.op_ret;
+}
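
To tie a few of the newly added wrappers together, here is a hedged sketch of a synctask body; struct my_args and rename_and_flush are made-up names, and error handling is trimmed to the common return-negative-errno convention.

#include <errno.h>
#include "syncop.h"

struct my_args {
        xlator_t *subvol;
        loc_t     oldloc;
        loc_t     newloc;
        fd_t     *fd;
};

static int
rename_and_flush (void *opaque)
{
        struct my_args *a = opaque;

        if (syncop_rename (a->subvol, &a->oldloc, &a->newloc) < 0)
                return -errno;

        if (syncop_flush (a->subvol, a->fd) < 0)
                return -errno;

        return 0;
}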