From 02a235b5a5fcfffd17debfbf3fceeddffe171682 Mon Sep 17 00:00:00 2001 From: Pranith Kumar K Date: Thu, 17 Mar 2016 09:32:02 +0530 Subject: syncop: Add parallel dir scan functionality Most of this functionality's ideas are contributed by Richard Wareing, in his patch: https://bugzilla.redhat.com/show_bug.cgi?id=1221737#c1 VERY BIG thanks to him :-). After starting porting/testing the patch above, I found a few things we can improve in this patch based on the results we got in testing. 1) We are reading all the indices before we launch self-heals. In some customer cases I worked on there were almost 5million files/directories that needed heal. With such a big number self-heal daemon will be OOM killed if we go this route. So I modified this to launch heals based on a queue length limit. 2) We found that for directory hierarchies, multi-threaded self-heal patch was not giving better results compared to single-threaded self-heal because of the order problems. We improved index xlator to give gfid type to make sure that all directories in the indices are healed before the files that follow in that iteration of readdir output(http://review.gluster.org/13553). In our testing this lead to zero errors of self-heals as we were only doing self-heals in parallel for files and not directories. I think we can further improve self-heal speed for directories by doing name heals in parallel based on similar techniques Richard's patch showed. I think the best thing there would be to introduce synccond_t infra (pthread_cond_t kind of infra for syncops) which I am planning to implement for future releases. 3) Based on 1), 2) and the fact that afr already does retries of the indices in a loop I removed retries again in the threads. 4) After the refactor, the changes required to bring in multi-threaded self-heal for ec would just be ~10 lines, most of it will be about options initialization. Our tests found that we are able to easily saturate network :-). High level description of the final feature: Traditionally self-heal daemon reads the indices (gfids) that need to be healed from the brick and initiates heal one gfid at a time. Goal of this feature is to add parallelization to the way we do self-heals in a way we do not regress in any case but increase parallelization wherever we can. As part of this following knobs are introduced to improve parallelization: 1) We can launch 'max-jobs' number of heals in parallel. 2) We can keep reading indices as long as the wait-q for heals doesn't go over 'max-qlen' passed as arguments to multi-threaded dir_scan. As a first cut, we always do healing of directories in serial order one at a time but for files we launch heals in parallel. In future we can do name-heals of dir in parallel, but this is not implemented as of now. Reason for this is mentioned already in '2)' above. AFR/EC can introduce options like max-shd-threads/wait-qlength which can be set by users to increase the rate of heals when they want. Please note that the options will take effect only for the next crawl. >BUG: 1221737 >Change-Id: I8fc0afc334def87797f6d41e309cefc722a317d2 >Signed-off-by: Pranith Kumar K >Reviewed-on: http://review.gluster.org/13569 >NetBSD-regression: NetBSD Build System >CentOS-regression: Gluster Build System >Reviewed-by: Jeff Darcy >Smoke: Gluster Build System BUG: 1325857 Change-Id: I23235bbb923208eee6a8be711bbfb14350edb11b Signed-off-by: Pranith Kumar K Reviewed-on: http://review.gluster.org/13967 Smoke: Gluster Build System NetBSD-regression: NetBSD Build System CentOS-regression: Gluster Build System --- libglusterfs/src/mem-types.h | 1 + libglusterfs/src/syncop-utils.c | 238 ++++++++++++++++++++++++++++++++++++++++ libglusterfs/src/syncop-utils.h | 7 ++ 3 files changed, 246 insertions(+) (limited to 'libglusterfs') diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h index 84949c61487..dd96cc63545 100644 --- a/libglusterfs/src/mem-types.h +++ b/libglusterfs/src/mem-types.h @@ -155,6 +155,7 @@ enum gf_common_mem_types_ { gf_common_mt_synctask, gf_common_mt_syncstack, gf_common_mt_syncenv, + gf_common_mt_scan_data, gf_common_mt_end }; #endif diff --git a/libglusterfs/src/syncop-utils.c b/libglusterfs/src/syncop-utils.c index 16ae1f7d74d..5e6b9fa5bfe 100644 --- a/libglusterfs/src/syncop-utils.c +++ b/libglusterfs/src/syncop-utils.c @@ -14,9 +14,24 @@ #endif #include "syncop.h" +#include "syncop-utils.h" #include "common-utils.h" #include "libglusterfs-messages.h" +struct syncop_dir_scan_data { + xlator_t *subvol; + loc_t *parent; + void *data; + gf_dirent_t *q; + gf_dirent_t *entry; + pthread_cond_t *cond; + pthread_mutex_t *mut; + syncop_dir_scan_fn_t fn; + uint32_t *jobs_running; + uint32_t *qlen; + int32_t *retval; +}; + int syncop_dirfd (xlator_t *subvol, loc_t *loc, fd_t **fd, int pid) { @@ -224,6 +239,229 @@ out: return ret; } +static void +_scan_data_destroy (struct syncop_dir_scan_data *data) +{ + GF_FREE (data); +} + +static int +_dir_scan_job_fn_cbk (int ret, call_frame_t *frame, void *opaque) +{ + struct syncop_dir_scan_data *scan_data = opaque; + + _scan_data_destroy (scan_data); + return 0; +} + +static int +_dir_scan_job_fn (void *data) +{ + struct syncop_dir_scan_data *scan_data = data; + gf_dirent_t *entry = NULL; + int ret = 0; + + entry = scan_data->entry; + scan_data->entry = NULL; + do { + ret = scan_data->fn (scan_data->subvol, entry, + scan_data->parent, + scan_data->data); + gf_dirent_entry_free (entry); + entry = NULL; + pthread_mutex_lock (scan_data->mut); + { + if (ret || list_empty (&scan_data->q->list)) { + (*scan_data->jobs_running)--; + *scan_data->retval |= ret; + pthread_cond_broadcast (scan_data->cond); + } else { + entry = list_first_entry (&scan_data->q->list, + typeof (*scan_data->q), list); + list_del_init (&entry->list); + (*scan_data->qlen)--; + } + } + pthread_mutex_unlock (scan_data->mut); + } while (entry); + + return ret; +} + +static int +_run_dir_scan_task (xlator_t *subvol, loc_t *parent, gf_dirent_t *q, + gf_dirent_t *entry, int *retval, pthread_mutex_t *mut, + pthread_cond_t *cond, uint32_t *jobs_running, + uint32_t *qlen, syncop_dir_scan_fn_t fn, void *data) +{ + int ret = 0; + struct syncop_dir_scan_data *scan_data = NULL; + + + scan_data = GF_CALLOC (1, sizeof (struct syncop_dir_scan_data), + gf_common_mt_scan_data); + if (!scan_data) { + ret = -ENOMEM; + goto out; + } + + scan_data->subvol = subvol; + scan_data->parent = parent; + scan_data->data = data; + scan_data->mut = mut; + scan_data->cond = cond; + scan_data->fn = fn; + scan_data->jobs_running = jobs_running; + scan_data->entry = entry; + scan_data->q = q; + scan_data->qlen = qlen; + scan_data->retval = retval; + + ret = synctask_new (subvol->ctx->env, _dir_scan_job_fn, + _dir_scan_job_fn_cbk, NULL, scan_data); +out: + if (ret < 0) { + gf_dirent_entry_free (entry); + _scan_data_destroy (scan_data); + pthread_mutex_lock (mut); + { + *jobs_running = *jobs_running - 1; + } + pthread_mutex_unlock (mut); + /*No need to cond-broadcast*/ + } + return ret; +} + +int +syncop_mt_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data, + syncop_dir_scan_fn_t fn, dict_t *xdata, uint32_t max_jobs, + uint32_t max_qlen) +{ + fd_t *fd = NULL; + uint64_t offset = 0; + gf_dirent_t *last = NULL; + int ret = 0; + int retval = 0; + gf_dirent_t q; + gf_dirent_t *entry = NULL; + gf_dirent_t *tmp = NULL; + uint32_t jobs_running = 0; + uint32_t qlen = 0; + pthread_cond_t cond; + pthread_mutex_t mut; + gf_boolean_t cond_init = _gf_false; + gf_boolean_t mut_init = _gf_false; + gf_dirent_t entries; + + /*For this functionality to be implemented in general, we need + * synccond_t infra which doesn't block the executing thread. Until then + * return failures inside synctask if they use this.*/ + if (synctask_get()) + return -ENOTSUP; + + if (max_jobs == 0) + return -EINVAL; + + /*Code becomes simpler this way. cond_wait just on qlength. + * Little bit of cheating*/ + if (max_qlen == 0) + max_qlen = 1; + + ret = syncop_dirfd (subvol, loc, &fd, pid); + if (ret) + goto out; + + INIT_LIST_HEAD (&entries.list); + INIT_LIST_HEAD (&q.list); + ret = pthread_mutex_init (&mut, NULL); + if (ret) + goto out; + mut_init = _gf_true; + + ret = pthread_cond_init (&cond, NULL); + if (ret) + goto out; + cond_init = _gf_true; + + while ((ret = syncop_readdir (subvol, fd, 131072, offset, &entries, + xdata, NULL))) { + if (ret < 0) + break; + + if (ret > 0) { + /* If the entries are only '.', and '..' then ret + * value will be non-zero. so set it to zero here. */ + ret = 0; + } + + last = list_last_entry (&entries.list, typeof (*last), list); + offset = last->d_off; + + list_for_each_entry_safe (entry, tmp, &entries.list, list) { + list_del_init (&entry->list); + if (!strcmp (entry->d_name, ".") || + !strcmp (entry->d_name, "..")) { + gf_dirent_entry_free (entry); + continue; + } + + if (entry->d_type == IA_IFDIR) { + ret = fn (subvol, entry, loc, data); + gf_dirent_entry_free (entry); + if (ret) + break; + continue; + } + + pthread_mutex_lock (&mut); + { + while (qlen == max_qlen) + pthread_cond_wait (&cond, &mut); + if (max_jobs == jobs_running) { + list_add_tail (&entry->list, &q.list); + qlen++; + entry = NULL; + } else { + jobs_running++; + } + } + pthread_mutex_unlock (&mut); + if (retval) /*Any jobs failed?*/ + break; + + if (!entry) + continue; + + ret = _run_dir_scan_task (subvol, loc, &q, entry, + &retval, &mut, &cond, + &jobs_running, &qlen, fn, data); + if (ret) + break; + } + } + +out: + if (fd) + fd_unref (fd); + if (mut_init && cond_init) { + pthread_mutex_lock (&mut); + { + while (jobs_running) + pthread_cond_wait (&cond, &mut); + } + pthread_mutex_unlock (&mut); + gf_dirent_free (&q); + gf_dirent_free (&entries); + } + + if (mut_init) + pthread_mutex_destroy (&mut); + if (cond_init) + pthread_cond_destroy (&cond); + return ret|retval; +} + int syncop_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data, int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent, diff --git a/libglusterfs/src/syncop-utils.h b/libglusterfs/src/syncop-utils.h index 7a9ccacb285..52bcfd99429 100644 --- a/libglusterfs/src/syncop-utils.h +++ b/libglusterfs/src/syncop-utils.h @@ -11,11 +11,18 @@ #ifndef _SYNCOP_UTILS_H #define _SYNCOP_UTILS_H +typedef int (*syncop_dir_scan_fn_t) (xlator_t *subvol, gf_dirent_t *entry, + loc_t *parent, void *data); int syncop_ftw (xlator_t *subvol, loc_t *loc, int pid, void *data, int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent, void *data)); +int +syncop_mt_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data, + syncop_dir_scan_fn_t fn, dict_t *xdata, uint32_t max_jobs, + uint32_t max_qlen); + int syncop_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data, int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent, -- cgit