summaryrefslogtreecommitdiffstats
path: root/libglusterfs/src/syncop-utils.c
diff options
context:
space:
mode:
authorPranith Kumar K <pkarampu@redhat.com>2016-03-17 09:32:02 +0530
committerJeff Darcy <jdarcy@redhat.com>2016-04-04 17:36:33 -0700
commitc76a1690bbd909b1c2dd2c495e2a8352d599b14b (patch)
tree30906a06541623d7249e23134bd97fd17e562f9e /libglusterfs/src/syncop-utils.c
parent3394c555171775c6f74c2c556265b044a70a2fa9 (diff)
syncop: Add parallel dir scan functionality
Most of this functionality's ideas are contributed by Richard Wareing, in his patch: https://bugzilla.redhat.com/show_bug.cgi?id=1221737#c1 VERY BIG thanks to him :-). After starting porting/testing the patch above, I found a few things we can improve in this patch based on the results we got in testing. 1) We are reading all the indices before we launch self-heals. In some customer cases I worked on there were almost 5million files/directories that needed heal. With such a big number self-heal daemon will be OOM killed if we go this route. So I modified this to launch heals based on a queue length limit. 2) We found that for directory hierarchies, multi-threaded self-heal patch was not giving better results compared to single-threaded self-heal because of the order problems. We improved index xlator to give gfid type to make sure that all directories in the indices are healed before the files that follow in that iteration of readdir output(http://review.gluster.org/13553). In our testing this lead to zero errors of self-heals as we were only doing self-heals in parallel for files and not directories. I think we can further improve self-heal speed for directories by doing name heals in parallel based on similar techniques Richard's patch showed. I think the best thing there would be to introduce synccond_t infra (pthread_cond_t kind of infra for syncops) which I am planning to implement for future releases. 3) Based on 1), 2) and the fact that afr already does retries of the indices in a loop I removed retries again in the threads. 4) After the refactor, the changes required to bring in multi-threaded self-heal for ec would just be ~10 lines, most of it will be about options initialization. Our tests found that we are able to easily saturate network :-). High level description of the final feature: Traditionally self-heal daemon reads the indices (gfids) that need to be healed from the brick and initiates heal one gfid at a time. 
Goal of this feature is to add parallelization to the way we do self-heals in a way we do not regress in any case but increase parallelization wherever we can. As part of this following knobs are introduced to improve parallelization: 1) We can launch 'max-jobs' number of heals in parallel. 2) We can keep reading indices as long as the wait-q for heals doesn't go over 'max-qlen' passed as arguments to multi-threaded dir_scan. As a first cut, we always do healing of directories in serial order one at a time but for files we launch heals in parallel. In future we can do name-heals of dir in parallel, but this is not implemented as of now. Reason for this is mentioned already in '2)' above. AFR/EC can introduce options like max-shd-threads/wait-qlength which can be set by users to increase the rate of heals when they want. Please note that the options will take effect only for the next crawl. BUG: 1221737 Change-Id: I8fc0afc334def87797f6d41e309cefc722a317d2 Signed-off-by: Pranith Kumar K <pkarampu@redhat.com> Reviewed-on: http://review.gluster.org/13569 NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org> CentOS-regression: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Jeff Darcy <jdarcy@redhat.com> Smoke: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'libglusterfs/src/syncop-utils.c')
-rw-r--r--libglusterfs/src/syncop-utils.c238
1 file changed, 238 insertions, 0 deletions
diff --git a/libglusterfs/src/syncop-utils.c b/libglusterfs/src/syncop-utils.c
index 4e8849f06f8..7421f81f46c 100644
--- a/libglusterfs/src/syncop-utils.c
+++ b/libglusterfs/src/syncop-utils.c
@@ -9,9 +9,24 @@
*/
#include "syncop.h"
+#include "syncop-utils.h"
#include "common-utils.h"
#include "libglusterfs-messages.h"
+/* Per-worker context handed to each parallel dir-scan synctask.
+ * The pointer members q, cond, mut, jobs_running, qlen and retval all
+ * refer to locals on syncop_mt_dir_scan()'s stack, shared by every job
+ * of one scan; they stay valid because that function blocks until
+ * *jobs_running drops to zero before returning. */
+struct syncop_dir_scan_data {
+ xlator_t *subvol;
+ loc_t *parent;
+ /* opaque cookie passed through to fn */
+ void *data;
+ /* shared wait-queue of entries not yet claimed by any worker */
+ gf_dirent_t *q;
+ /* seed entry for this worker; consumed (set to NULL) by the job fn */
+ gf_dirent_t *entry;
+ pthread_cond_t *cond;
+ pthread_mutex_t *mut;
+ /* callback invoked once per directory entry */
+ syncop_dir_scan_fn_t fn;
+ uint32_t *jobs_running;
+ uint32_t *qlen;
+ /* OR-accumulated status of all workers; only tested for non-zero */
+ int32_t *retval;
+};
+
int
syncop_dirfd (xlator_t *subvol, loc_t *loc, fd_t **fd, int pid)
{
@@ -219,6 +234,229 @@ out:
return ret;
}
+/* Release a worker's context.  Trivial today (one GF_FREE), but kept as
+ * a helper so teardown stays in one place if the struct ever grows
+ * owned members. */
+static void
+_scan_data_destroy (struct syncop_dir_scan_data *data)
+{
+ GF_FREE (data);
+}
+
+/* synctask completion callback.  The job function has already merged
+ * its status into *retval and decremented *jobs_running under the
+ * mutex, so the only remaining work is freeing the context. */
+static int
+_dir_scan_job_fn_cbk (int ret, call_frame_t *frame, void *opaque)
+{
+ struct syncop_dir_scan_data *scan_data = opaque;
+
+ _scan_data_destroy (scan_data);
+ return 0;
+}
+
+/* Worker body executed inside a synctask.  Processes the entry it was
+ * seeded with, then keeps stealing further entries from the shared
+ * wait-queue until the queue is empty or the callback fails.  On exit
+ * it retires itself: decrements *jobs_running, ORs its status into
+ * *retval and broadcasts so syncop_mt_dir_scan() (which may be waiting
+ * either on qlen or for job drain) wakes up. */
+static int
+_dir_scan_job_fn (void *data)
+{
+ struct syncop_dir_scan_data *scan_data = data;
+ gf_dirent_t *entry = NULL;
+ int ret = 0;
+
+ /* Take ownership of the seed entry so cleanup paths elsewhere
+  * cannot double-free it. */
+ entry = scan_data->entry;
+ scan_data->entry = NULL;
+ do {
+ ret = scan_data->fn (scan_data->subvol, entry,
+ scan_data->parent,
+ scan_data->data);
+ gf_dirent_entry_free (entry);
+ entry = NULL;
+ /* Under the mutex: either retire this worker (on error or an
+  * empty queue) or dequeue the next entry to process.  Retiring
+  * and dequeuing are mutually exclusive, so jobs_running/qlen
+  * stay consistent. */
+ pthread_mutex_lock (scan_data->mut);
+ {
+ if (ret || list_empty (&scan_data->q->list)) {
+ (*scan_data->jobs_running)--;
+ /* OR loses the specific errno when several workers
+  * fail, but callers only test retval for non-zero. */
+ *scan_data->retval |= ret;
+ pthread_cond_broadcast (scan_data->cond);
+ } else {
+ entry = list_first_entry (&scan_data->q->list,
+ typeof (*scan_data->q), list);
+ list_del_init (&entry->list);
+ (*scan_data->qlen)--;
+ }
+ }
+ pthread_mutex_unlock (scan_data->mut);
+ } while (entry);
+
+ return ret;
+}
+
+/* Allocate a scan context for @entry and launch it as a new synctask.
+ * The caller has already incremented *jobs_running for this job; on any
+ * failure (allocation or synctask_new) this function takes back that
+ * increment, frees @entry and the context, and returns non-zero.
+ * Ownership of @entry passes to the job on success. */
+static int
+_run_dir_scan_task (xlator_t *subvol, loc_t *parent, gf_dirent_t *q,
+ gf_dirent_t *entry, int *retval, pthread_mutex_t *mut,
+ pthread_cond_t *cond, uint32_t *jobs_running,
+ uint32_t *qlen, syncop_dir_scan_fn_t fn, void *data)
+{
+ int ret = 0;
+ struct syncop_dir_scan_data *scan_data = NULL;
+
+
+ scan_data = GF_CALLOC (1, sizeof (struct syncop_dir_scan_data),
+ gf_common_mt_scan_data);
+ if (!scan_data) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ scan_data->subvol = subvol;
+ scan_data->parent = parent;
+ scan_data->data = data;
+ scan_data->mut = mut;
+ scan_data->cond = cond;
+ scan_data->fn = fn;
+ scan_data->jobs_running = jobs_running;
+ scan_data->entry = entry;
+ scan_data->q = q;
+ scan_data->qlen = qlen;
+ scan_data->retval = retval;
+
+ ret = synctask_new (subvol->ctx->env, _dir_scan_job_fn,
+ _dir_scan_job_fn_cbk, NULL, scan_data);
+out:
+ if (ret < 0) {
+ /* On the ENOMEM path scan_data is NULL here; presumably
+  * GF_FREE(NULL) is a no-op like free(NULL) — TODO confirm. */
+ gf_dirent_entry_free (entry);
+ _scan_data_destroy (scan_data);
+ pthread_mutex_lock (mut);
+ {
+ *jobs_running = *jobs_running - 1;
+ }
+ pthread_mutex_unlock (mut);
+ /* The only thread that ever waits on cond is the scanning
+  * thread itself, which is the one executing this function,
+  * so nobody can be blocked waiting for this wakeup. */
+ /*No need to cond-broadcast*/
+ }
+ return ret;
+}
+
+/* Multi-threaded directory scan: reads the entries of @loc via
+ * syncop_readdir and invokes @fn on each one.  Directory entries are
+ * processed inline (serially) by this thread so parent dirs are handled
+ * before their contents; non-directory entries are dispatched to up to
+ * @max_jobs parallel synctask workers, with at most @max_qlen entries
+ * parked in a wait-queue when all workers are busy.  Blocks until every
+ * launched job has finished before returning.  Returns 0 on success,
+ * -ENOTSUP when called from within a synctask, -EINVAL for
+ * max_jobs == 0, or a non-zero error from readdir/@fn otherwise. */
+int
+syncop_mt_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data,
+ syncop_dir_scan_fn_t fn, dict_t *xdata, uint32_t max_jobs,
+ uint32_t max_qlen)
+{
+ fd_t *fd = NULL;
+ uint64_t offset = 0;
+ gf_dirent_t *last = NULL;
+ int ret = 0;
+ int retval = 0;
+ gf_dirent_t q;
+ gf_dirent_t *entry = NULL;
+ gf_dirent_t *tmp = NULL;
+ uint32_t jobs_running = 0;
+ uint32_t qlen = 0;
+ pthread_cond_t cond;
+ pthread_mutex_t mut;
+ gf_boolean_t cond_init = _gf_false;
+ gf_boolean_t mut_init = _gf_false;
+ gf_dirent_t entries;
+
+ /*For this functionality to be implemented in general, we need
+ * synccond_t infra which doesn't block the executing thread. Until then
+ * return failures inside synctask if they use this.*/
+ if (synctask_get())
+ return -ENOTSUP;
+
+ if (max_jobs == 0)
+ return -EINVAL;
+
+ /*Code becomes simpler this way. cond_wait just on qlength.
+ * Little bit of cheating*/
+ if (max_qlen == 0)
+ max_qlen = 1;
+
+ ret = syncop_dirfd (subvol, loc, &fd, pid);
+ if (ret)
+ goto out;
+
+ INIT_LIST_HEAD (&entries.list);
+ INIT_LIST_HEAD (&q.list);
+ ret = pthread_mutex_init (&mut, NULL);
+ if (ret)
+ goto out;
+ mut_init = _gf_true;
+
+ ret = pthread_cond_init (&cond, NULL);
+ if (ret)
+ goto out;
+ cond_init = _gf_true;
+
+ /* 131072 is the readdir buffer-size hint passed to the brick per
+  * iteration; each batch of entries is drained before the next read. */
+ while ((ret = syncop_readdir (subvol, fd, 131072, offset, &entries,
+ xdata, NULL))) {
+ if (ret < 0)
+ break;
+
+ if (ret > 0) {
+ /* If the entries are only '.', and '..' then ret
+ * value will be non-zero. so set it to zero here. */
+ ret = 0;
+ }
+
+ last = list_last_entry (&entries.list, typeof (*last), list);
+ offset = last->d_off;
+
+ list_for_each_entry_safe (entry, tmp, &entries.list, list) {
+ list_del_init (&entry->list);
+ if (!strcmp (entry->d_name, ".") ||
+ !strcmp (entry->d_name, "..")) {
+ gf_dirent_entry_free (entry);
+ continue;
+ }
+
+ /* Directories are healed inline and in order; only
+  * non-directory entries go to the parallel workers. */
+ if (entry->d_type == IA_IFDIR) {
+ ret = fn (subvol, entry, loc, data);
+ gf_dirent_entry_free (entry);
+ if (ret)
+ break;
+ continue;
+ }
+
+ /* Throttle on queue length, then either park the entry
+  * in the wait-queue (all workers busy — an existing
+  * worker will pick it up) or claim a worker slot and
+  * launch a new job for it below. */
+ pthread_mutex_lock (&mut);
+ {
+ while (qlen == max_qlen)
+ pthread_cond_wait (&cond, &mut);
+ if (max_jobs == jobs_running) {
+ list_add_tail (&entry->list, &q.list);
+ qlen++;
+ entry = NULL;
+ } else {
+ jobs_running++;
+ }
+ }
+ pthread_mutex_unlock (&mut);
+ /* NOTE(review): retval is written by workers under mut
+  * but read here without it — strictly a data race even
+  * if benign on common platforms; confirm or read it
+  * inside the critical section above. */
+ if (retval) /*Any jobs failed?*/
+ break;
+
+ if (!entry)
+ continue;
+
+ ret = _run_dir_scan_task (subvol, loc, &q, entry,
+ &retval, &mut, &cond,
+ &jobs_running, &qlen, fn, data);
+ if (ret)
+ break;
+ }
+ }
+
+out:
+ if (fd)
+ fd_unref (fd);
+ if (mut_init && cond_init) {
+ /* Drain: wait for every launched job to retire before the
+  * stack-resident q/cond/mut they point at go out of scope. */
+ pthread_mutex_lock (&mut);
+ {
+ while (jobs_running)
+ pthread_cond_wait (&cond, &mut);
+ }
+ pthread_mutex_unlock (&mut);
+ gf_dirent_free (&q);
+ gf_dirent_free (&entries);
+ }
+
+ if (mut_init)
+ pthread_mutex_destroy (&mut);
+ if (cond_init)
+ pthread_cond_destroy (&cond);
+ /* NOTE(review): bitwise OR of two (possibly negative) error codes
+  * can yield a value that is not any real errno; callers appear to
+  * treat any non-zero as failure — confirm before relying on the
+  * specific code. */
+ return ret|retval;
+}
+
int
syncop_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data,
int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,