From 08fadcc2a706342e4eee79dc7d9b48ba01fcb312 Mon Sep 17 00:00:00 2001 From: Krutika Dhananjay Date: Tue, 9 Jan 2018 15:11:00 +0530 Subject: mount/fuse: Add support for multi-threaded fuse readers Usage: Use 'reader-thread-count=' as command line option to set the thread count at the time of mounting the volume. Next task is to make these threads auto-scale based on the load, instead of having the user remount the volume everytime to change the thread count. Updates #412 Change-Id: I94aa1505e5ae6a133683d473e0e4e0edd139b76b Signed-off-by: Krutika Dhananjay --- xlators/mount/fuse/src/fuse-bridge.c | 231 +++++++++++++++++++++++------------ 1 file changed, 150 insertions(+), 81 deletions(-) (limited to 'xlators/mount/fuse/src/fuse-bridge.c') diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c index dee9b16abf3..9e9fb815080 100644 --- a/xlators/mount/fuse/src/fuse-bridge.c +++ b/xlators/mount/fuse/src/fuse-bridge.c @@ -690,7 +690,8 @@ fuse_lookup_resume (fuse_state_t *state) } static void -fuse_lookup (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_lookup (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; fuse_state_t *state = NULL; @@ -718,7 +719,8 @@ do_forget(xlator_t *this, uint64_t unique, uint64_t nodeid, uint64_t nlookup) } static void -fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_forget_in *ffi = msg; @@ -739,7 +741,8 @@ fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg) #if FUSE_KERNEL_MINOR_VERSION >= 16 static void -fuse_batch_forget(xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_batch_forget(xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_batch_forget_in *fbfi = msg; struct fuse_forget_one *ffo = (struct fuse_forget_one *) (fbfi + 1); @@ -957,7 +960,8 @@ fuse_getattr_resume (fuse_state_t *state) } static void -fuse_getattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_getattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { #if FUSE_KERNEL_MINOR_VERSION >= 9 struct fuse_getattr_in *fgi = msg; @@ -1287,7 +1291,8 @@ fuse_setattr_resume (fuse_state_t *state) } static void -fuse_setattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_setattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_setattr_in *fsi = msg; @@ -1514,7 +1519,8 @@ fuse_access_resume (fuse_state_t *state) } static void -fuse_access (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_access (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_access_in *fai = msg; fuse_state_t *state = NULL; @@ -1588,7 +1594,8 @@ fuse_readlink_resume (fuse_state_t *state) } static void -fuse_readlink (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readlink (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { fuse_state_t *state = NULL; @@ -1638,7 +1645,8 @@ fuse_mknod_resume (fuse_state_t *state) } static void -fuse_mknod (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_mknod (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_mknod_in *fmi = msg; char *name = (char *)(fmi + 1); @@ -1708,7 +1716,8 @@ fuse_mkdir_resume (fuse_state_t *state) } static void -fuse_mkdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_mkdir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_mkdir_in *fmi = msg; char *name = (char *)(fmi + 1); @@ -1760,7 +1769,8 @@ fuse_unlink_resume (fuse_state_t *state) } static void -fuse_unlink (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_unlink (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; fuse_state_t *state = NULL; @@ -1797,7 +1807,8 @@ fuse_rmdir_resume (fuse_state_t *state) } static void -fuse_rmdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_rmdir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; fuse_state_t *state = NULL; @@ -1847,7 +1858,8 @@ fuse_symlink_resume (fuse_state_t *state) } static void -fuse_symlink (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_symlink (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; char *linkname = name + strlen (name) + 1; @@ -1969,7 +1981,8 @@ fuse_rename_resume (fuse_state_t *state) } static void -fuse_rename (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_rename (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_rename_in *fri = msg; char *oldname = (char *)(fri + 1); @@ -2019,7 +2032,8 @@ fuse_link_resume (fuse_state_t *state) } static void -fuse_link (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_link (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_link_in *fli = msg; char *name = (char *)(fli + 1); @@ -2208,7 +2222,8 @@ fuse_create_resume (fuse_state_t *state) } static void -fuse_create (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_create (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { #if FUSE_KERNEL_MINOR_VERSION >= 12 struct fuse_create_in *fci = msg; @@ -2298,7 +2313,8 @@ fuse_open_resume (fuse_state_t *state) } static void -fuse_open (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_open (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_open_in *foi = msg; fuse_state_t *state = NULL; @@ -2375,7 +2391,8 @@ fuse_readv_resume (fuse_state_t *state) } static void -fuse_readv (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readv (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_read_in *fri = msg; @@ -2451,8 +2468,6 @@ void fuse_write_resume (fuse_state_t *state) { struct iobref *iobref = NULL; - struct iobuf *iobuf = NULL; - iobref = iobref_new (); if (!iobref) { @@ -2465,8 +2480,7 @@ fuse_write_resume (fuse_state_t *state) return; } - iobuf = ((fuse_private_t *) (state->this->private))->iobuf; - iobref_add (iobref, iobuf); + iobref_add (iobref, state->iobuf); gf_log ("glusterfs-fuse", GF_LOG_TRACE, "%"PRIu64": WRITE (%p, size=%"GF_PRI_SIZET", offset=%"PRId64")", @@ -2480,7 +2494,8 @@ fuse_write_resume (fuse_state_t *state) } static void -fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { /* WRITE is special, metadata is attached to in_header, * and msg is the payload as-is. @@ -2523,6 +2538,7 @@ fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg) state->vector.iov_base = msg; state->vector.iov_len = fwi->size; + state->iobuf = iobuf; fuse_resolve_and_resume (state, fuse_write_resume); @@ -2561,7 +2577,8 @@ fuse_lseek_resume (fuse_state_t *state) } static void -fuse_lseek (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_lseek (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_lseek_in *ffi = msg; fuse_state_t *state = NULL; @@ -2597,7 +2614,8 @@ fuse_flush_resume (fuse_state_t *state) } static void -fuse_flush (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_flush (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_flush_in *ffi = msg; @@ -2633,7 +2651,8 @@ fuse_internal_release (xlator_t *this, fd_t *fd) } static void -fuse_release (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_release (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_release_in *fri = msg; fd_t *fd = NULL; @@ -2678,7 +2697,8 @@ fuse_fsync_resume (fuse_state_t *state) } static void -fuse_fsync (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_fsync (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_fsync_in *fsi = msg; @@ -2748,7 +2768,8 @@ fuse_opendir_resume (fuse_state_t *state) } static void -fuse_opendir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_opendir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { /* struct fuse_open_in *foi = msg; @@ -2890,7 +2911,8 @@ fuse_readdir_resume (fuse_state_t *state) } static void -fuse_readdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readdir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_read_in *fri = msg; @@ -3041,7 +3063,8 @@ fuse_readdirp_resume (fuse_state_t *state) static void -fuse_readdirp (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readdirp (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_read_in *fri = msg; @@ -3088,7 +3111,8 @@ fuse_fallocate_resume(fuse_state_t *state) } static void -fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_fallocate_in *ffi = msg; fuse_state_t *state = NULL; @@ -3106,7 +3130,8 @@ fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg) #endif /* FUSE minor version >= 19 */ static void -fuse_releasedir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_releasedir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_release_in *fri = msg; fuse_state_t *state = NULL; @@ -3147,7 +3172,8 @@ fuse_fsyncdir_resume (fuse_state_t *state) } static void -fuse_fsyncdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_fsyncdir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_fsync_in *fsi = msg; @@ -3247,7 +3273,8 @@ fuse_statfs_resume (fuse_state_t *state) static void -fuse_statfs (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_statfs (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { fuse_state_t *state = NULL; @@ -3299,7 +3326,8 @@ fuse_setxattr_resume (fuse_state_t *state) static void -fuse_setxattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_setxattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_setxattr_in *fsi = msg; char *name = (char *)(fsi + 1); @@ -3630,7 +3658,8 @@ fuse_getxattr_resume (fuse_state_t *state) static void -fuse_getxattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_getxattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_getxattr_in *fgxi = msg; char *name = (char *)(fgxi + 1); @@ -3736,7 +3765,8 @@ fuse_listxattr_resume (fuse_state_t *state) static void -fuse_listxattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_listxattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_getxattr_in *fgxi = msg; fuse_state_t *state = NULL; @@ -3792,7 +3822,8 @@ fuse_removexattr_resume (fuse_state_t *state) static void -fuse_removexattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_removexattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; @@ -3891,7 +3922,8 @@ fuse_getlk_resume (fuse_state_t *state) static void -fuse_getlk (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_getlk (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_lk_in *fli = msg; @@ -3983,7 +4015,8 @@ fuse_setlk_resume (fuse_state_t *state) static void -fuse_setlk (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_setlk (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_lk_in *fli = msg; @@ -4086,7 +4119,8 @@ notify_kernel_loop (void *data) #endif static void -fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_init_in *fini = msg; struct fuse_init_out fino = {0,}; @@ -4257,7 +4291,8 @@ fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg) static void -fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { send_fuse_err (this, finh, ENOSYS); @@ -4266,7 +4301,8 @@ fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg) static void -fuse_destroy (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_destroy (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { send_fuse_err (this, finh, 0); @@ -4857,6 +4893,7 @@ fuse_graph_sync (xlator_t *this) new_graph_id = priv->next_graph->id; priv->next_graph = NULL; need_first_lookup = 1; + priv->handle_graph_switch = _gf_true; while (!priv->event_recvd) { ret = pthread_cond_wait (&priv->sync_cond, @@ -4885,6 +4922,8 @@ unlock: { old_subvol->switched = 1; winds_on_old_subvol = old_subvol->winds; + priv->handle_graph_switch = _gf_false; + pthread_cond_broadcast (&priv->migrate_cond); } pthread_mutex_unlock (&priv->sync_mutex); @@ -4892,6 +4931,13 @@ unlock: xlator_notify (old_subvol, GF_EVENT_PARENT_DOWN, old_subvol, NULL); } + } else { + pthread_mutex_lock (&priv->sync_mutex); + { + priv->handle_graph_switch = _gf_false; + pthread_cond_broadcast (&priv->migrate_cond); + } + pthread_mutex_unlock (&priv->sync_mutex); } return 0; @@ -4928,7 +4974,6 @@ fuse_thread_proc (void *data) const size_t msg0_size = sizeof (*finh) + 128; fuse_handler_t **fuse_ops = NULL; struct pollfd pfd[2] = {{0,}}; - gf_boolean_t mount_finished = _gf_false; this = data; priv = this->private; @@ -4945,32 +4990,40 @@ fuse_thread_proc (void *data) /* THIS has to be reset here */ THIS = this; - if (!mount_finished) { - memset(pfd,0,sizeof(pfd)); - pfd[0].fd = priv->status_pipe[0]; - pfd[0].events = POLLIN | POLLHUP | POLLERR; - pfd[1].fd = priv->fd; - pfd[1].events = POLLIN | POLLHUP | POLLERR; - if (poll(pfd,2,-1) < 0) { - gf_log (this->name, GF_LOG_ERROR, - "poll error %s", strerror(errno)); - break; - } - if (pfd[0].revents & POLLIN) { - if (fuse_get_mount_status(this) != 0) { + pthread_mutex_lock (&priv->sync_mutex); + { + if (!priv->mount_finished) { + memset(pfd, 0, sizeof(pfd)); + pfd[0].fd = priv->status_pipe[0]; + pfd[0].events = POLLIN | POLLHUP | POLLERR; + pfd[1].fd = priv->fd; + pfd[1].events = POLLIN | POLLHUP | POLLERR; + if (poll(pfd, 2, -1) < 0) { + gf_log (this->name, GF_LOG_ERROR, + "poll error %s", + strerror(errno)); + pthread_mutex_unlock (&priv->sync_mutex); break; } - mount_finished = _gf_true; - } - else if (pfd[0].revents) { - gf_log (this->name, GF_LOG_ERROR, - "mount pipe closed without status"); - break; - } - if (!pfd[1].revents) { - continue; + if (pfd[0].revents & POLLIN) { + if (fuse_get_mount_status(this) != 0) { + pthread_mutex_unlock (&priv->sync_mutex); + break; + } + priv->mount_finished = _gf_true; + } else if (pfd[0].revents) { + gf_log (this->name, GF_LOG_ERROR, + "mount pipe closed without status"); + pthread_mutex_unlock (&priv->sync_mutex); + break; + } + if (!pfd[1].revents) { + pthread_mutex_unlock (&priv->sync_mutex); + continue; + } } } + pthread_mutex_unlock (&priv->sync_mutex); /* * We don't want to block on readv while we're still waiting @@ -5065,8 +5118,6 @@ fuse_thread_proc (void *data) break; } - priv->iobuf = iobuf; - /* * This can be moved around a bit, but it's important to do it * *after* the readv. Otherwise, a graph switch could occur @@ -5109,9 +5160,9 @@ fuse_thread_proc (void *data) if (finh->opcode >= FUSE_OP_HIGH) /* turn down MacFUSE specific messages */ - fuse_enosys (this, finh, msg); + fuse_enosys (this, finh, msg, NULL); else - fuse_ops[finh->opcode] (this, finh, msg); + fuse_ops[finh->opcode] (this, finh, msg, iobuf); iobuf_unref (iobuf); continue; @@ -5183,8 +5234,6 @@ fuse_priv_dump (xlator_t *this) private->volfile_size); gf_proc_dump_write("mount_point", "%s", private->mount_point); - gf_proc_dump_write("iobuf", "%u", - private->iobuf); gf_proc_dump_write("fuse_thread_started", "%d", (int)private->fuse_thread_started); gf_proc_dump_write("direct_io_mode", "%d", @@ -5310,6 +5359,7 @@ unlock: int notify (xlator_t *this, int32_t event, void *data, ...) { + int i = 0; int32_t ret = 0; fuse_private_t *private = NULL; gf_boolean_t start_thread = _gf_false; @@ -5358,14 +5408,21 @@ notify (xlator_t *this, int32_t event, void *data, ...) pthread_mutex_unlock (&private->sync_mutex); if (start_thread) { - ret = gf_thread_create (&private->fuse_thread, NULL, - fuse_thread_proc, this, - "fuseproc"); - if (ret != 0) { - gf_log (this->name, GF_LOG_DEBUG, - "pthread_create() failed (%s)", - strerror (errno)); - break; + private->fuse_thread = GF_CALLOC (private->reader_thread_count, + sizeof (pthread_t), + gf_fuse_mt_pthread_t); + for (i = 0; i < private->reader_thread_count; i++) { + + ret = gf_thread_create (&private->fuse_thread[i], + NULL, + fuse_thread_proc, this, + "fuseproc"); + if (ret != 0) { + gf_log (this->name, GF_LOG_DEBUG, + "pthread_create() failed (%s)", + strerror (errno)); + break; + } } } @@ -5472,7 +5529,8 @@ static fuse_handler_t *fuse_dump_ops[FUSE_OP_HIGH]; static void -fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { fuse_private_t *priv = NULL; struct iovec diov[6] = {{0,},}; @@ -5504,7 +5562,7 @@ fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg) "failed to dump fuse message (R): %s", strerror (errno)); - priv->fuse_ops0[finh->opcode] (this, finh, msg); + priv->fuse_ops0[finh->opcode] (this, finh, msg, NULL); } @@ -5609,6 +5667,9 @@ init (xlator_t *this_xl) GF_OPTION_INIT (ZR_ATTR_TIMEOUT_OPT, priv->attribute_timeout, double, cleanup_exit); + GF_OPTION_INIT ("reader-thread-count", priv->reader_thread_count, uint32, + cleanup_exit); + GF_OPTION_INIT (ZR_ENTRY_TIMEOUT_OPT, priv->entry_timeout, double, cleanup_exit); @@ -5830,6 +5891,7 @@ init (xlator_t *this_xl) pthread_mutex_init (&priv->fuse_dump_mutex, NULL); pthread_cond_init (&priv->sync_cond, NULL); + pthread_cond_init (&priv->migrate_cond, NULL); pthread_mutex_init (&priv->sync_mutex, NULL); priv->event_recvd = 0; @@ -6034,5 +6096,12 @@ struct volume_options options[] = { .default_value = "false", .description = "Enables thin mount and connects via gfproxyd daemon.", }, + { .key = {"reader-thread-count"}, + .type = GF_OPTION_TYPE_INT, + .default_value = "4", + .min = 1, + .max = 64, + .description = "Sets fuse reader thread count.", + }, { .key = {NULL} }, }; -- cgit