diff options
Diffstat (limited to 'libglusterfs/src/syncop.c')
-rw-r--r-- | libglusterfs/src/syncop.c | 359 |
1 files changed, 288 insertions, 71 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index 2eb7b49fc4c..df20cec559f 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -11,6 +11,10 @@ #include "glusterfs/syncop.h" #include "glusterfs/libglusterfs-messages.h" +#ifdef HAVE_TSAN_API +#include <sanitizer/tsan_interface.h> +#endif + int syncopctx_setfsuid(void *uid) { @@ -97,6 +101,13 @@ syncopctx_setfsgroups(int count, const void *groups) /* set/reset the ngrps, this is where reset of groups is handled */ opctx->ngrps = count; + + if ((opctx->valid & SYNCOPCTX_GROUPS) == 0) { + /* This is the first time we are storing groups into the TLS structure + * so we mark the current thread so that it will be properly cleaned + * up when the thread terminates. */ + gf_thread_needs_cleanup(); + } opctx->valid |= SYNCOPCTX_GROUPS; out: @@ -147,10 +158,14 @@ out: return ret; } +void * +syncenv_processor(void *thdata); + static void __run(struct synctask *task) { struct syncenv *env = NULL; + int32_t total, ret, i; env = task->env; @@ -166,7 +181,6 @@ __run(struct synctask *task) env->runcount--; break; case SYNCTASK_WAIT: - env->waitcount--; break; case SYNCTASK_DONE: gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_COMPLETED_TASK, @@ -180,8 +194,27 @@ __run(struct synctask *task) } list_add_tail(&task->all_tasks, &env->runq); - env->runcount++; task->state = SYNCTASK_RUN; + + env->runcount++; + + total = env->procs + env->runcount - env->procs_idle; + if (total > env->procmax) { + total = env->procmax; + } + if (total > env->procs) { + for (i = 0; i < env->procmax; i++) { + if (env->proc[i].env == NULL) { + env->proc[i].env = env; + ret = gf_thread_create(&env->proc[i].processor, NULL, + syncenv_processor, &env->proc[i], + "sproc%d", i); + if ((ret < 0) || (++env->procs >= total)) { + break; + } + } + } + } } static void @@ -203,7 +236,6 @@ __wait(struct synctask *task) gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_REWAITING_TASK, "re-waiting already waiting " "task"); - env->waitcount--; break; case SYNCTASK_DONE: gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_COMPLETED_TASK, @@ -216,12 +248,11 @@ __wait(struct synctask *task) } list_add_tail(&task->all_tasks, &env->waitq); - env->waitcount++; task->state = SYNCTASK_WAIT; } void -synctask_yield(struct synctask *task) +synctask_yield(struct synctask *task, struct timespec *delta) { xlator_t *oldTHIS = THIS; @@ -230,9 +261,16 @@ synctask_yield(struct synctask *task) task->proc->sched.uc_flags &= ~_UC_TLSBASE; #endif + task->delta = delta; + if (task->state != SYNCTASK_DONE) { task->state = SYNCTASK_SUSPEND; } + +#ifdef HAVE_TSAN_API + __tsan_switch_to_fiber(task->proc->tsan.fiber, 0); +#endif + if (swapcontext(&task->ctx, &task->proc->sched) < 0) { gf_msg("syncop", GF_LOG_ERROR, errno, LG_MSG_SWAPCONTEXT_FAILED, "swapcontext failed"); @@ -242,6 +280,35 @@ synctask_yield(struct synctask *task) } void +synctask_sleep(int32_t secs) +{ + struct timespec delta; + struct synctask *task; + + task = synctask_get(); + + if (task == NULL) { + sleep(secs); + } else { + delta.tv_sec = secs; + delta.tv_nsec = 0; + + synctask_yield(task, &delta); + } +} + +static void +__synctask_wake(struct synctask *task) +{ + task->woken = 1; + + if (task->slept) + __run(task); + + pthread_cond_broadcast(&task->env->cond); +} + +void synctask_wake(struct synctask *task) { struct syncenv *env = NULL; @@ -250,13 +317,18 @@ synctask_wake(struct synctask *task) pthread_mutex_lock(&env->mutex); { - task->woken = 1; + if (task->timer != NULL) { + if (gf_timer_call_cancel(task->xl->ctx, task->timer) != 0) { + goto unlock; + } - if (task->slept) - __run(task); + task->timer = NULL; + task->synccond = NULL; + } - pthread_cond_broadcast(&env->cond); + __synctask_wake(task); } +unlock: pthread_mutex_unlock(&env->mutex); } @@ -275,7 +347,7 @@ synctask_wrap(void) task->state = SYNCTASK_DONE; - synctask_yield(task); + synctask_yield(task, NULL); } void @@ -294,6 +366,10 @@ synctask_destroy(struct synctask *task) pthread_cond_destroy(&task->cond); } +#ifdef HAVE_TSAN_API + __tsan_destroy_fiber(task->tsan.fiber); +#endif + GF_FREE(task); } @@ -404,6 +480,13 @@ synctask_create(struct syncenv *env, size_t stacksize, synctask_fn_t fn, makecontext(&newtask->ctx, (void (*)(void))synctask_wrap, 0); +#ifdef HAVE_TSAN_API + newtask->tsan.fiber = __tsan_create_fiber(0); + snprintf(newtask->tsan.name, TSAN_THREAD_NAMELEN, "<synctask of %s>", + this->name); + __tsan_set_fiber_name(newtask->tsan.fiber, newtask->tsan.name); +#endif + newtask->state = SYNCTASK_INIT; newtask->slept = 1; @@ -415,11 +498,6 @@ synctask_create(struct syncenv *env, size_t stacksize, synctask_fn_t fn, } synctask_wake(newtask); - /* - * Make sure someone's there to execute anything we just put on the - * run queue. - */ - syncenv_scale(env); return newtask; err: @@ -513,8 +591,12 @@ syncenv_task(struct syncproc *proc) goto unlock; } - sleep_till.tv_sec = time(NULL) + SYNCPROC_IDLE_TIME; + env->procs_idle++; + + sleep_till.tv_sec = gf_time() + SYNCPROC_IDLE_TIME; ret = pthread_cond_timedwait(&env->cond, &env->mutex, &sleep_till); + + env->procs_idle--; } task = list_entry(env->runq.next, struct synctask, all_tasks); @@ -533,6 +615,34 @@ unlock: return task; } +static void +synctask_timer(void *data) +{ + struct synctask *task = data; + struct synccond *cond; + + cond = task->synccond; + if (cond != NULL) { + pthread_mutex_lock(&cond->pmutex); + + list_del_init(&task->waitq); + task->synccond = NULL; + + pthread_mutex_unlock(&cond->pmutex); + + task->ret = -ETIMEDOUT; + } + + pthread_mutex_lock(&task->env->mutex); + + gf_timer_call_cancel(task->xl->ctx, task->timer); + task->timer = NULL; + + __synctask_wake(task); + + pthread_mutex_unlock(&task->env->mutex); +} + void synctask_switchto(struct synctask *task) { @@ -548,6 +658,10 @@ synctask_switchto(struct synctask *task) task->ctx.uc_flags &= ~_UC_TLSBASE; #endif +#ifdef HAVE_TSAN_API + __tsan_switch_to_fiber(task->tsan.fiber, 0); +#endif + if (swapcontext(&task->proc->sched, &task->ctx) < 0) { gf_msg("syncop", GF_LOG_ERROR, errno, LG_MSG_SWAPCONTEXT_FAILED, "swapcontext failed"); @@ -565,7 +679,14 @@ synctask_switchto(struct synctask *task) } else { task->slept = 1; __wait(task); + + if (task->delta != NULL) { + task->timer = gf_timer_call_after(task->xl->ctx, *task->delta, + synctask_timer, task); + } } + + task->delta = NULL; } pthread_mutex_unlock(&env->mutex); } @@ -573,63 +694,27 @@ synctask_switchto(struct synctask *task) void * syncenv_processor(void *thdata) { - struct syncenv *env = NULL; struct syncproc *proc = NULL; struct synctask *task = NULL; proc = thdata; - env = proc->env; - for (;;) { - task = syncenv_task(proc); - if (!task) - break; +#ifdef HAVE_TSAN_API + proc->tsan.fiber = __tsan_create_fiber(0); + snprintf(proc->tsan.name, TSAN_THREAD_NAMELEN, "<sched of syncenv@%p>", + proc); + __tsan_set_fiber_name(proc->tsan.fiber, proc->tsan.name); +#endif + while ((task = syncenv_task(proc)) != NULL) { synctask_switchto(task); - - syncenv_scale(env); } - return NULL; -} - -void -syncenv_scale(struct syncenv *env) -{ - int diff = 0; - int scale = 0; - int i = 0; - int ret = 0; - - pthread_mutex_lock(&env->mutex); - { - if (env->procs > env->runcount) - goto unlock; - - scale = env->runcount; - if (scale > env->procmax) - scale = env->procmax; - if (scale > env->procs) - diff = scale - env->procs; - while (diff) { - diff--; - for (; (i < env->procmax); i++) { - if (env->proc[i].processor == 0) - break; - } +#ifdef HAVE_TSAN_API + __tsan_destroy_fiber(proc->tsan.fiber); +#endif - env->proc[i].env = env; - ret = gf_thread_create(&env->proc[i].processor, NULL, - syncenv_processor, &env->proc[i], - "sproc%03hx", env->procs & 0x3ff); - if (ret) - break; - env->procs++; - i++; - } - } -unlock: - pthread_mutex_unlock(&env->mutex); + return NULL; } /* The syncenv threads are cleaned up in this routine. @@ -708,12 +793,13 @@ syncenv_new(size_t stacksize, int procmin, int procmax) newenv->stacksize = stacksize; newenv->procmin = procmin; newenv->procmax = procmax; + newenv->procs_idle = 0; for (i = 0; i < newenv->procmin; i++) { newenv->proc[i].env = newenv; ret = gf_thread_create(&newenv->proc[i].processor, NULL, syncenv_processor, &newenv->proc[i], "sproc%d", - newenv->procs); + i); if (ret) break; newenv->procs++; @@ -803,7 +889,7 @@ __synclock_lock(struct synclock *lock) task->woken = 0; list_add_tail(&task->waitq, &lock->waitq); pthread_mutex_unlock(&lock->guard); - synctask_yield(task); + synctask_yield(task, NULL); /* task is removed from waitq in unlock, * under lock->guard.*/ pthread_mutex_lock(&lock->guard); @@ -956,6 +1042,136 @@ synclock_unlock(synclock_t *lock) return ret; } +/* Condition variables */ + +int32_t +synccond_init(synccond_t *cond) +{ + int32_t ret; + + INIT_LIST_HEAD(&cond->waitq); + + ret = pthread_mutex_init(&cond->pmutex, NULL); + if (ret != 0) { + return -ret; + } + + ret = pthread_cond_init(&cond->pcond, NULL); + if (ret != 0) { + pthread_mutex_destroy(&cond->pmutex); + } + + return -ret; +} + +void +synccond_destroy(synccond_t *cond) +{ + pthread_cond_destroy(&cond->pcond); + pthread_mutex_destroy(&cond->pmutex); +} + +int +synccond_timedwait(synccond_t *cond, synclock_t *lock, struct timespec *delta) +{ + struct timespec now; + struct synctask *task = NULL; + int ret; + + task = synctask_get(); + + if (task == NULL) { + if (delta != NULL) { + timespec_now_realtime(&now); + timespec_adjust_delta(&now, *delta); + } + + pthread_mutex_lock(&cond->pmutex); + + if (delta == NULL) { + ret = -pthread_cond_wait(&cond->pcond, &cond->pmutex); + } else { + ret = -pthread_cond_timedwait(&cond->pcond, &cond->pmutex, &now); + } + } else { + pthread_mutex_lock(&cond->pmutex); + + list_add_tail(&task->waitq, &cond->waitq); + task->synccond = cond; + + ret = synclock_unlock(lock); + if (ret == 0) { + pthread_mutex_unlock(&cond->pmutex); + + synctask_yield(task, delta); + + ret = synclock_lock(lock); + if (ret == 0) { + ret = task->ret; + } + task->ret = 0; + + return ret; + } + + list_del_init(&task->waitq); + } + + pthread_mutex_unlock(&cond->pmutex); + + return ret; +} + +int +synccond_wait(synccond_t *cond, synclock_t *lock) +{ + return synccond_timedwait(cond, lock, NULL); +} + +void +synccond_signal(synccond_t *cond) +{ + struct synctask *task; + + pthread_mutex_lock(&cond->pmutex); + + if (!list_empty(&cond->waitq)) { + task = list_first_entry(&cond->waitq, struct synctask, waitq); + list_del_init(&task->waitq); + + pthread_mutex_unlock(&cond->pmutex); + + synctask_wake(task); + } else { + pthread_cond_signal(&cond->pcond); + + pthread_mutex_unlock(&cond->pmutex); + } +} + +void +synccond_broadcast(synccond_t *cond) +{ + struct list_head list; + struct synctask *task; + + INIT_LIST_HEAD(&list); + + pthread_mutex_lock(&cond->pmutex); + + list_splice_init(&cond->waitq, &list); + pthread_cond_broadcast(&cond->pcond); + + pthread_mutex_unlock(&cond->pmutex); + + while (!list_empty(&list)) { + task = list_first_entry(&list, struct synctask, waitq); + list_del_init(&task->waitq); + + synctask_wake(task); + } +} + /* Barriers */ int @@ -1025,7 +1241,7 @@ __syncbarrier_wait(struct syncbarrier *barrier, int waitfor) /* called within a synctask */ list_add_tail(&task->waitq, &barrier->waitq); pthread_mutex_unlock(&barrier->guard); - synctask_yield(task); + synctask_yield(task, NULL); pthread_mutex_lock(&barrier->guard); } else { /* called by a non-synctask */ @@ -2874,12 +3090,13 @@ syncop_seek(xlator_t *subvol, fd_t *fd, off_t offset, gf_seek_what_t what, SYNCOP(subvol, (&args), syncop_seek_cbk, subvol->fops->seek, fd, offset, what, xdata_in); - if (*off) - *off = args.offset; - - if (args.op_ret == -1) + if (args.op_ret < 0) { return -args.op_errno; - return args.op_ret; + } else { + if (off) + *off = args.offset; + return args.op_ret; + } } int |