summaryrefslogtreecommitdiffstats
path: root/xlators/performance/write-behind
diff options
context:
space:
mode:
authorVikas Gorur <vikas@zresearch.com>2009-02-18 17:36:07 +0530
committerVikas Gorur <vikas@zresearch.com>2009-02-18 17:36:07 +0530
commit77adf4cd648dce41f89469dd185deec6b6b53a0b (patch)
tree02e155a5753b398ee572b45793f889b538efab6b /xlators/performance/write-behind
parentf3b2e6580e5663292ee113c741343c8a43ee133f (diff)
Added all files
Diffstat (limited to 'xlators/performance/write-behind')
-rw-r--r--xlators/performance/write-behind/Makefile.am3
-rw-r--r--xlators/performance/write-behind/src/Makefile.am12
-rw-r--r--xlators/performance/write-behind/src/write-behind.c1444
3 files changed, 1459 insertions, 0 deletions
diff --git a/xlators/performance/write-behind/Makefile.am b/xlators/performance/write-behind/Makefile.am
new file mode 100644
index 00000000000..d471a3f9243
--- /dev/null
+++ b/xlators/performance/write-behind/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/performance/write-behind/src/Makefile.am b/xlators/performance/write-behind/src/Makefile.am
new file mode 100644
index 00000000000..f800abad50d
--- /dev/null
+++ b/xlators/performance/write-behind/src/Makefile.am
@@ -0,0 +1,12 @@
+xlator_LTLIBRARIES = write-behind.la
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/performance
+
+write_behind_la_LDFLAGS = -module -avoidversion
+
+write_behind_la_SOURCES = write-behind.c
+write_behind_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\
+ -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS)
+
+CLEANFILES =
diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c
new file mode 100644
index 00000000000..04a447d49e9
--- /dev/null
+++ b/xlators/performance/write-behind/src/write-behind.c
@@ -0,0 +1,1444 @@
+/*
+ Copyright (c) 2006, 2007, 2008, 2009 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+/*TODO: check for non null wb_file_data before getting wb_file */
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "glusterfs.h"
+#include "logging.h"
+#include "dict.h"
+#include "xlator.h"
+#include "list.h"
+#include "compat.h"
+#include "compat-errno.h"
+#include "common-utils.h"
+
+#define MAX_VECTOR_COUNT 8
+
+typedef struct list_head list_head_t;
+struct wb_conf;
+struct wb_page;
+struct wb_file;
+
+
+struct wb_conf {
+ uint64_t aggregate_size;
+ uint64_t window_size;
+ uint64_t disable_till;
+ gf_boolean_t enable_O_SYNC;
+ gf_boolean_t flush_behind;
+};
+
+
+typedef struct wb_local {
+ list_head_t winds;
+ struct wb_file *file;
+ list_head_t unwind_frames;
+ int op_ret;
+ int op_errno;
+ call_frame_t *frame;
+} wb_local_t;
+
+
+typedef struct write_request {
+ call_frame_t *frame;
+ off_t offset;
+ /* int32_t op_ret;
+ int32_t op_errno; */
+ struct iovec *vector;
+ int32_t count;
+ dict_t *refs;
+ char write_behind;
+ char stack_wound;
+ char got_reply;
+ list_head_t list;
+ list_head_t winds;
+ /* list_head_t unwinds;*/
+} wb_write_request_t;
+
+
+struct wb_file {
+ int disabled;
+ uint64_t disable_till;
+ off_t offset;
+ size_t window_size;
+ int32_t refcount;
+ int32_t op_ret;
+ int32_t op_errno;
+ list_head_t request;
+ fd_t *fd;
+ gf_lock_t lock;
+ xlator_t *this;
+};
+
+
+typedef struct wb_conf wb_conf_t;
+typedef struct wb_page wb_page_t;
+typedef struct wb_file wb_file_t;
+
+
+int32_t
+wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all);
+
+int32_t
+wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds);
+
+int32_t
+wb_sync_all (call_frame_t *frame, wb_file_t *file);
+
+int32_t
+__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_size);
+
+
+wb_file_t *
+wb_file_create (xlator_t *this,
+ fd_t *fd)
+{
+ wb_file_t *file = NULL;
+ wb_conf_t *conf = this->private;
+
+ file = CALLOC (1, sizeof (*file));
+ INIT_LIST_HEAD (&file->request);
+
+ /* fd_ref() not required, file should never decide the existance of
+ * an fd */
+ file->fd= fd;
+ file->disable_till = conf->disable_till;
+ file->this = this;
+ file->refcount = 1;
+
+ fd_ctx_set (fd, this, (uint64_t)(long)file);
+
+ return file;
+}
+
+void
+wb_file_destroy (wb_file_t *file)
+{
+ int32_t refcount = 0;
+
+ LOCK (&file->lock);
+ {
+ refcount = --file->refcount;
+ }
+ UNLOCK (&file->lock);
+
+ if (!refcount){
+ LOCK_DESTROY (&file->lock);
+ FREE (file);
+ }
+
+ return;
+}
+
+
+int32_t
+wb_sync_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *stbuf)
+{
+ wb_local_t *local = NULL;
+ list_head_t *winds = NULL;
+ wb_file_t *file = NULL;
+ wb_write_request_t *request = NULL, *dummy = NULL;
+
+ local = frame->local;
+ winds = &local->winds;
+ file = local->file;
+
+ LOCK (&file->lock);
+ {
+ list_for_each_entry_safe (request, dummy, winds, winds) {
+ request->got_reply = 1;
+ if (!request->write_behind && (op_ret == -1)) {
+ wb_local_t *per_request_local = request->frame->local;
+ per_request_local->op_ret = op_ret;
+ per_request_local->op_errno = op_errno;
+ }
+
+ /*
+ request->op_ret = op_ret;
+ request->op_errno = op_errno;
+ */
+ }
+ }
+ UNLOCK (&file->lock);
+
+ if (op_ret == -1)
+ {
+ file->op_ret = op_ret;
+ file->op_errno = op_errno;
+ }
+
+ wb_process_queue (frame, file, 0);
+
+ /* safe place to do fd_unref */
+ fd_unref (file->fd);
+
+ STACK_DESTROY (frame->root);
+
+ return 0;
+}
+
+int32_t
+wb_sync_all (call_frame_t *frame, wb_file_t *file)
+{
+ list_head_t winds;
+ int32_t bytes = 0;
+
+ INIT_LIST_HEAD (&winds);
+
+ LOCK (&file->lock);
+ {
+ bytes = __wb_mark_winds (&file->request, &winds, 0);
+ }
+ UNLOCK (&file->lock);
+
+ wb_sync (frame, file, &winds);
+
+ return bytes;
+}
+
+
+int32_t
+wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)
+{
+ wb_write_request_t *dummy = NULL, *request = NULL, *first_request = NULL, *next = NULL;
+ size_t total_count = 0, count = 0;
+ size_t copied = 0;
+ call_frame_t *sync_frame = NULL;
+ dict_t *refs = NULL;
+ wb_local_t *local = NULL;
+ struct iovec *vector = NULL;
+ int32_t bytes = 0;
+ size_t bytecount = 0;
+
+ list_for_each_entry (request, winds, winds)
+ {
+ total_count += request->count;
+ bytes += iov_length (request->vector, request->count);
+ }
+
+ if (!total_count) {
+ return 0;
+ }
+
+ list_for_each_entry_safe (request, dummy, winds, winds) {
+ if (!vector) {
+ vector = MALLOC (VECTORSIZE (MAX_VECTOR_COUNT));
+ refs = get_new_dict ();
+
+ local = CALLOC (1, sizeof (*local));
+ INIT_LIST_HEAD (&local->winds);
+
+ first_request = request;
+ }
+
+ count += request->count;
+ bytecount = VECTORSIZE (request->count);
+ memcpy (((char *)vector)+copied,
+ request->vector,
+ bytecount);
+ copied += bytecount;
+
+ if (request->refs) {
+ dict_copy (request->refs, refs);
+ }
+
+ next = NULL;
+ if (request->winds.next != winds) {
+ next = list_entry (request->winds.next, struct write_request, winds);
+ }
+
+ list_del_init (&request->winds);
+ list_add_tail (&request->winds, &local->winds);
+
+ if (!next || ((count + next->count) > MAX_VECTOR_COUNT)) {
+ sync_frame = copy_frame (frame);
+ sync_frame->local = local;
+ local->file = file;
+ sync_frame->root->req_refs = dict_ref (refs);
+ fd_ref (file->fd);
+ STACK_WIND (sync_frame,
+ wb_sync_cbk,
+ FIRST_CHILD(sync_frame->this),
+ FIRST_CHILD(sync_frame->this)->fops->writev,
+ file->fd, vector,
+ count, first_request->offset);
+
+ dict_unref (refs);
+ FREE (vector);
+ first_request = NULL;
+ refs = NULL;
+ vector = NULL;
+ copied = count = 0;
+ }
+ }
+
+ return bytes;
+}
+
+
+int32_t
+wb_stat_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ wb_local_t *local = NULL;
+
+ local = frame->local;
+
+ if (local->file)
+ fd_unref (local->file->fd);
+
+ STACK_UNWIND (frame, op_ret, op_errno, buf);
+
+ return 0;
+}
+
+
+int32_t
+wb_stat (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc)
+{
+ wb_file_t *file = NULL;
+ fd_t *iter_fd = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+
+ if (loc->inode)
+ {
+ iter_fd = fd_lookup (loc->inode, frame->root->pid);
+ if (iter_fd) {
+ if (!fd_ctx_get (iter_fd, this, &tmp_file)) {
+ file = (wb_file_t *)(long)tmp_file;
+ } else {
+ fd_unref (iter_fd);
+ }
+ }
+ if (file) {
+ wb_sync_all (frame, file);
+ }
+ }
+
+ local = CALLOC (1, sizeof (*local));
+ local->file = file;
+
+ frame->local = local;
+
+ STACK_WIND (frame, wb_stat_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->stat,
+ loc);
+ return 0;
+}
+
+
+int32_t
+wb_fstat (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd)
+{
+ wb_file_t *file = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+
+ if (fd_ctx_get (fd, this, &tmp_file)) {
+ gf_log (this->name, GF_LOG_ERROR, "returning EBADFD");
+ STACK_UNWIND (frame, -1, EBADFD, NULL);
+ return 0;
+ }
+
+ file = (wb_file_t *)(long)tmp_file;
+ if (file) {
+ fd_ref (file->fd);
+ wb_sync_all (frame, file);
+ }
+
+ local = CALLOC (1, sizeof (*local));
+ local->file = file;
+
+ frame->local = local;
+
+ STACK_WIND (frame,
+ wb_stat_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fstat,
+ fd);
+ return 0;
+}
+
+
+int32_t
+wb_truncate_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ wb_local_t *local = NULL;
+
+ local = frame->local;
+ if (local->file)
+ fd_unref (local->file->fd);
+
+ STACK_UNWIND (frame, op_ret, op_errno, buf);
+ return 0;
+}
+
+
+int32_t
+wb_truncate (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ off_t offset)
+{
+ wb_file_t *file = NULL;
+ fd_t *iter_fd = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+
+ if (loc->inode)
+ {
+ iter_fd = fd_lookup (loc->inode, frame->root->pid);
+ if (iter_fd) {
+ if (!fd_ctx_get (iter_fd, this, &tmp_file)){
+ file = (wb_file_t *)(long)tmp_file;
+ } else {
+ fd_unref (iter_fd);
+ }
+ }
+
+ if (file)
+ {
+ wb_sync_all (frame, file);
+ }
+ }
+
+ local = CALLOC (1, sizeof (*local));
+ local->file = file;
+
+ frame->local = local;
+
+ STACK_WIND (frame,
+ wb_truncate_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->truncate,
+ loc,
+ offset);
+ return 0;
+}
+
+
+int32_t
+wb_ftruncate (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ off_t offset)
+{
+ wb_file_t *file = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+
+ if (fd_ctx_get (fd, this, &tmp_file)) {
+ gf_log (this->name, GF_LOG_ERROR, "returning EBADFD");
+ STACK_UNWIND (frame, -1, EBADFD, NULL);
+ return 0;
+ }
+
+ file = (wb_file_t *)(long)tmp_file;
+ if (file)
+ wb_sync_all (frame, file);
+
+ local = CALLOC (1, sizeof (*local));
+ local->file = file;
+
+ if (file)
+ fd_ref (file->fd);
+
+ frame->local = local;
+
+ STACK_WIND (frame,
+ wb_truncate_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->ftruncate,
+ fd,
+ offset);
+ return 0;
+}
+
+
+int32_t
+wb_utimens_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ wb_local_t *local = NULL;
+
+ local = frame->local;
+ if (local->file)
+ fd_unref (local->file->fd);
+
+ STACK_UNWIND (frame, op_ret, op_errno, buf);
+ return 0;
+}
+
+
+int32_t
+wb_utimens (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ struct timespec tv[2])
+{
+ wb_file_t *file = NULL;
+ fd_t *iter_fd = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+
+ if (loc->inode) {
+ iter_fd = fd_lookup (loc->inode, frame->root->pid);
+ if (iter_fd) {
+ if (!fd_ctx_get (iter_fd, this, &tmp_file)) {
+ file = (wb_file_t *)(long)tmp_file;
+ } else {
+ fd_unref (iter_fd);
+ }
+ }
+
+ if (file)
+ wb_sync_all (frame, file);
+ }
+
+ local = CALLOC (1, sizeof (*local));
+ local->file = file;
+
+ frame->local = local;
+
+ STACK_WIND (frame,
+ wb_utimens_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->utimens,
+ loc,
+ tv);
+ return 0;
+}
+
+int32_t
+wb_open_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ fd_t *fd)
+{
+ int32_t flags = 0;
+ wb_file_t *file = NULL;
+ wb_conf_t *conf = this->private;
+
+ if (op_ret != -1)
+ {
+ file = wb_file_create (this, fd);
+
+ /* If mandatory locking has been enabled on this file,
+ we disable caching on it */
+
+ if ((fd->inode->st_mode & S_ISGID) && !(fd->inode->st_mode & S_IXGRP))
+ file->disabled = 1;
+
+ /* If O_DIRECT then, we disable chaching */
+ if (frame->local)
+ {
+ flags = *((int32_t *)frame->local);
+ if (((flags & O_DIRECT) == O_DIRECT) ||
+ ((flags & O_RDONLY) == O_RDONLY) ||
+ (((flags & O_SYNC) == O_SYNC) &&
+ conf->enable_O_SYNC == _gf_true)) {
+ file->disabled = 1;
+ }
+ }
+
+ LOCK_INIT (&file->lock);
+ }
+
+ STACK_UNWIND (frame, op_ret, op_errno, fd);
+ return 0;
+}
+
+
+int32_t
+wb_open (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ int32_t flags,
+ fd_t *fd)
+{
+ frame->local = CALLOC (1, sizeof(int32_t));
+ *((int32_t *)frame->local) = flags;
+
+ STACK_WIND (frame,
+ wb_open_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->open,
+ loc, flags, fd);
+ return 0;
+}
+
+
+int32_t
+wb_create_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ fd_t *fd,
+ inode_t *inode,
+ struct stat *buf)
+{
+ wb_file_t *file = NULL;
+
+ if (op_ret != -1)
+ {
+ file = wb_file_create (this, fd);
+ /*
+ * If mandatory locking has been enabled on this file,
+ * we disable caching on it
+ */
+ if ((fd->inode->st_mode & S_ISGID) &&
+ !(fd->inode->st_mode & S_IXGRP))
+ {
+ file->disabled = 1;
+ }
+
+ LOCK_INIT (&file->lock);
+ }
+
+ STACK_UNWIND (frame, op_ret, op_errno, fd, inode, buf);
+ return 0;
+}
+
+
+int32_t
+wb_create (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ int32_t flags,
+ mode_t mode,
+ fd_t *fd)
+{
+ STACK_WIND (frame,
+ wb_create_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->create,
+ loc, flags, mode, fd);
+ return 0;
+}
+
+
+int32_t
+__wb_cleanup_queue (wb_file_t *file)
+{
+ wb_write_request_t *request = NULL, *dummy = NULL;
+ int32_t bytes = 0;
+
+ list_for_each_entry_safe (request, dummy, &file->request, list)
+ {
+ if (request->got_reply && request->write_behind)
+ {
+ bytes += iov_length (request->vector, request->count);
+ list_del_init (&request->list);
+
+ FREE (request->vector);
+ dict_unref (request->refs);
+
+ FREE (request);
+ }
+ }
+
+ return bytes;
+}
+
+
+int32_t
+__wb_mark_wind_all (list_head_t *list, list_head_t *winds)
+{
+ wb_write_request_t *request = NULL;
+ size_t size = 0;
+
+ list_for_each_entry (request, list, list)
+ {
+ if (!request->stack_wound)
+ {
+ size += iov_length (request->vector, request->count);
+ request->stack_wound = 1;
+ list_add_tail (&request->winds, winds);
+ }
+ }
+
+ return size;
+}
+
+
+size_t
+__wb_get_aggregate_size (list_head_t *list)
+{
+ wb_write_request_t *request = NULL;
+ size_t size = 0;
+
+ list_for_each_entry (request, list, list)
+ {
+ if (!request->stack_wound)
+ {
+ size += iov_length (request->vector, request->count);
+ }
+ }
+
+ return size;
+}
+
+uint32_t
+__wb_get_incomplete_writes (list_head_t *list)
+{
+ wb_write_request_t *request = NULL;
+ uint32_t count = 0;
+
+ list_for_each_entry (request, list, list)
+ {
+ if (request->stack_wound && !request->got_reply)
+ {
+ count++;
+ }
+ }
+
+ return count;
+}
+
+int32_t
+__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf)
+{
+ size_t aggregate_current = 0;
+ uint32_t incomplete_writes = 0;
+
+ incomplete_writes = __wb_get_incomplete_writes (list);
+
+ aggregate_current = __wb_get_aggregate_size (list);
+
+ if ((incomplete_writes == 0) || (aggregate_current >= aggregate_conf))
+ {
+ __wb_mark_wind_all (list, winds);
+ }
+
+ return aggregate_current;
+}
+
+
+size_t
+__wb_get_window_size (list_head_t *list)
+{
+ wb_write_request_t *request = NULL;
+ size_t size = 0;
+
+ list_for_each_entry (request, list, list)
+ {
+ if (request->write_behind && !request->got_reply)
+ {
+ size += iov_length (request->vector, request->count);
+ }
+ }
+
+ return size;
+}
+
+
+size_t
+__wb_mark_unwind_till (list_head_t *list, list_head_t *unwinds, size_t size)
+{
+ size_t written_behind = 0;
+ wb_write_request_t *request = NULL;
+
+ list_for_each_entry (request, list, list)
+ {
+ if (written_behind <= size)
+ {
+ if (!request->write_behind)
+ {
+ wb_local_t *local = request->frame->local;
+ written_behind += iov_length (request->vector, request->count);
+ request->write_behind = 1;
+ list_add_tail (&local->unwind_frames, unwinds);
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ return written_behind;
+}
+
+
+int32_t
+__wb_mark_unwinds (list_head_t *list, list_head_t *unwinds, size_t window_conf)
+{
+ size_t window_current = 0;
+
+ window_current = __wb_get_window_size (list);
+ if (window_current <= window_conf)
+ {
+ window_current += __wb_mark_unwind_till (list, unwinds,
+ window_conf - window_current);
+ }
+
+ return window_current;
+}
+
+
+int32_t
+wb_stack_unwind (list_head_t *unwinds)
+{
+ struct stat buf = {0,};
+ wb_local_t *local = NULL, *dummy = NULL;
+
+ list_for_each_entry_safe (local, dummy, unwinds, unwind_frames)
+ {
+ list_del_init (&local->unwind_frames);
+ STACK_UNWIND (local->frame, local->op_ret, local->op_errno, &buf);
+ }
+
+ return 0;
+}
+
+
+int32_t
+wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds, list_head_t *unwinds)
+{
+ /* copy the frame before calling wb_stack_unwind, since this request containing current frame might get unwound */
+ /* call_frame_t *sync_frame = copy_frame (frame); */
+
+ wb_stack_unwind (unwinds);
+ wb_sync (frame, file, winds);
+
+ return 0;
+}
+
+
+int32_t
+wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all)
+{
+ list_head_t winds, unwinds;
+ size_t size = 0;
+ wb_conf_t *conf = file->this->private;
+
+ INIT_LIST_HEAD (&winds);
+ INIT_LIST_HEAD (&unwinds);
+
+ if (!file)
+ {
+ return -1;
+ }
+
+ size = flush_all ? 0 : conf->aggregate_size;
+ LOCK (&file->lock);
+ {
+ __wb_cleanup_queue (file);
+ __wb_mark_winds (&file->request, &winds, size);
+ __wb_mark_unwinds (&file->request, &unwinds, conf->window_size);
+ }
+ UNLOCK (&file->lock);
+
+ wb_do_ops (frame, file, &winds, &unwinds);
+ return 0;
+}
+
+
+wb_write_request_t *
+wb_enqueue (wb_file_t *file,
+ call_frame_t *frame,
+ struct iovec *vector,
+ int32_t count,
+ off_t offset)
+{
+ wb_write_request_t *request = NULL;
+ wb_local_t *local = CALLOC (1, sizeof (*local));
+
+ request = CALLOC (1, sizeof (*request));
+
+ INIT_LIST_HEAD (&request->list);
+ INIT_LIST_HEAD (&request->winds);
+
+ request->frame = frame;
+ request->vector = iov_dup (vector, count);
+ request->count = count;
+ request->offset = offset;
+ request->refs = dict_ref (frame->root->req_refs);
+
+ frame->local = local;
+ local->frame = frame;
+ local->op_ret = iov_length (vector, count);
+ local->op_errno = 0;
+ INIT_LIST_HEAD (&local->unwind_frames);
+
+ LOCK (&file->lock);
+ {
+ list_add_tail (&request->list, &file->request);
+ file->offset = offset + iov_length (vector, count);
+ }
+ UNLOCK (&file->lock);
+
+ return request;
+}
+
+
+int32_t
+wb_writev_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *stbuf)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, stbuf);
+ return 0;
+}
+
+
+int32_t
+wb_writev (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ struct iovec *vector,
+ int32_t count,
+ off_t offset)
+{
+ wb_file_t *file = NULL;
+ char offset_expected = 1, wb_disabled = 0;
+ call_frame_t *process_frame = NULL;
+ size_t size = 0;
+ uint64_t tmp_file = 0;
+
+ if (vector != NULL)
+ size = iov_length (vector, count);
+
+ if (fd_ctx_get (fd, this, &tmp_file)) {
+ gf_log (this->name, GF_LOG_ERROR, "returning EBADFD");
+ STACK_UNWIND (frame, -1, EBADFD, NULL);
+ return 0;
+ }
+
+ file = (wb_file_t *)(long)tmp_file;
+ if (!file) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "wb_file not found for fd %p", fd);
+ STACK_UNWIND (frame, -1, EBADFD, NULL);
+ return 0;
+ }
+
+ LOCK (&file->lock);
+ {
+ if (file->disabled || file->disable_till) {
+ if (size > file->disable_till) {
+ file->disable_till = 0;
+ } else {
+ file->disable_till -= size;
+ }
+ wb_disabled = 1;
+ }
+
+ if (file->offset != offset)
+ offset_expected = 0;
+ }
+ UNLOCK (&file->lock);
+
+ if (wb_disabled) {
+ STACK_WIND (frame,
+ wb_writev_cbk,
+ FIRST_CHILD (frame->this),
+ FIRST_CHILD (frame->this)->fops->writev,
+ file->fd,
+ vector,
+ count,
+ offset);
+ return 0;
+ }
+
+ process_frame = copy_frame (frame);
+
+ if (!offset_expected)
+ wb_process_queue (process_frame, file, 1);
+
+ wb_enqueue (file, frame, vector, count, offset);
+ wb_process_queue (process_frame, file, 0);
+
+ STACK_DESTROY (process_frame->root);
+
+ return 0;
+}
+
+
+int32_t
+wb_readv_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct iovec *vector,
+ int32_t count,
+ struct stat *stbuf)
+{
+ wb_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf);
+ return 0;
+}
+
+
+int32_t
+wb_readv (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ size_t size,
+ off_t offset)
+{
+ wb_file_t *file = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+
+ if (fd_ctx_get (fd, this, &tmp_file)) {
+ gf_log (this->name, GF_LOG_ERROR, "returning EBADFD");
+ STACK_UNWIND (frame, -1, EBADFD, NULL);
+ return 0;
+ }
+
+ file = (wb_file_t *)(long)tmp_file;
+ if (file)
+ wb_sync_all (frame, file);
+
+ local = CALLOC (1, sizeof (*local));
+ local->file = file;
+
+ frame->local = local;
+
+ STACK_WIND (frame,
+ wb_readv_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readv,
+ fd, size, offset);
+
+ return 0;
+}
+
+
+int32_t
+wb_ffr_bg_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ wb_local_t *local = NULL;
+ wb_file_t *file = NULL;
+
+ local = frame->local;
+ file = local->file;
+
+ if (file) {
+ fd_unref (file->fd);
+ }
+
+ if (file->op_ret == -1)
+ {
+ op_ret = file->op_ret;
+ op_errno = file->op_errno;
+
+ file->op_ret = 0;
+ }
+
+ STACK_DESTROY (frame->root);
+ return 0;
+}
+
+
+int32_t
+wb_ffr_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ wb_local_t *local = NULL;
+ wb_file_t *file = NULL;
+
+ local = frame->local;
+ file = local->file;
+ if (file) {
+ /* corresponds to the fd_ref() done during wb_file_create() */
+ fd_unref (file->fd);
+ }
+
+ if (file->op_ret == -1)
+ {
+ op_ret = file->op_ret;
+ op_errno = file->op_errno;
+
+ file->op_ret = 0;
+ }
+
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+}
+
+
+int32_t
+wb_flush (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd)
+{
+ wb_conf_t *conf = NULL;
+ wb_file_t *file = NULL;
+ call_frame_t *flush_frame = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+
+ conf = this->private;
+
+ if (fd_ctx_get (fd, this, &tmp_file)) {
+ gf_log (this->name, GF_LOG_ERROR, "returning EBADFD");
+ STACK_UNWIND (frame, -1, EBADFD);
+ return 0;
+ }
+
+ file = (wb_file_t *)(long)tmp_file;
+
+ local = CALLOC (1, sizeof (*local));
+ local->file = file;
+ if (file)
+ fd_ref (file->fd);
+
+ if (&file->request != file->request.next) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "request queue is not empty, it has to be synced");
+ }
+
+ if (conf->flush_behind &&
+ (!file->disabled) && (file->disable_till == 0)) {
+ flush_frame = copy_frame (frame);
+ STACK_UNWIND (frame, file->op_ret,
+ file->op_errno); // liar! liar! :O
+
+ flush_frame->local = local;
+ wb_sync_all (flush_frame, file);
+
+ STACK_WIND (flush_frame,
+ wb_ffr_bg_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->flush,
+ fd);
+ } else {
+ wb_sync_all (frame, file);
+
+ frame->local = local;
+ STACK_WIND (frame,
+ wb_ffr_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->flush,
+ fd);
+ }
+
+ return 0;
+}
+
+
+int32_t
+wb_fsync_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ wb_local_t *local = NULL;
+ wb_file_t *file = NULL;
+
+ local = frame->local;
+ file = local->file;
+
+ if (file->op_ret == -1)
+ {
+ op_ret = file->op_ret;
+ op_errno = file->op_errno;
+
+ file->op_ret = 0;
+ }
+
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+}
+
+int32_t
+wb_fsync (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ int32_t datasync)
+{
+ wb_file_t *file = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+
+ if (fd_ctx_get (fd, this, &tmp_file)) {
+ gf_log (this->name, GF_LOG_ERROR, "returning EBADFD");
+ STACK_UNWIND (frame, -1, EBADFD);
+ return 0;
+ }
+
+ file = (wb_file_t *)(long)tmp_file;
+ if (file)
+ wb_sync_all (frame, file);
+
+ local = CALLOC (1, sizeof (*local));
+ local->file = file;
+
+ frame->local = local;
+
+ STACK_WIND (frame,
+ wb_fsync_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fsync,
+ fd, datasync);
+ return 0;
+}
+
+
+int32_t
+wb_release (xlator_t *this,
+ fd_t *fd)
+{
+ uint64_t file = 0;
+
+ fd_ctx_get (fd, this, &file);
+ wb_file_destroy ((wb_file_t *)(long)file);
+
+ return 0;
+}
+
+
+int32_t
+init (xlator_t *this)
+{
+ dict_t *options = NULL;
+ wb_conf_t *conf = NULL;
+ char *aggregate_size_string = NULL;
+ char *window_size_string = NULL;
+ char *flush_behind_string = NULL;
+ char *disable_till_string = NULL;
+ char *enable_O_SYNC_string = NULL;
+ int32_t ret = -1;
+
+ if ((this->children == NULL)
+ || this->children->next) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "FATAL: write-behind (%s) not configured with exactly one child",
+ this->name);
+ return -1;
+ }
+
+ if (this->parents == NULL) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "dangling volume. check volfile");
+ }
+
+ options = this->options;
+
+ conf = CALLOC (1, sizeof (*conf));
+
+ conf->enable_O_SYNC = _gf_false;
+ ret = dict_get_str (options, "enable-O_SYNC",
+ &enable_O_SYNC_string);
+ if (ret == 0) {
+ ret = gf_string2boolean (enable_O_SYNC_string,
+ &conf->enable_O_SYNC);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'enable-O_SYNC' takes only boolean arguments");
+ return -1;
+ }
+ }
+
+ /* configure 'options aggregate-size <size>' */
+ conf->aggregate_size = 0;
+ ret = dict_get_str (options, "block-size",
+ &aggregate_size_string);
+ if (ret == 0) {
+ ret = gf_string2bytesize (aggregate_size_string,
+ &conf->aggregate_size);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid number format \"%s\" of \"option aggregate-size\"",
+ aggregate_size_string);
+ return -1;
+ }
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "using aggregate-size = %"PRIu64"",
+ conf->aggregate_size);
+
+ conf->disable_till = 1;
+ ret = dict_get_str (options, "disable-for-first-nbytes",
+ &disable_till_string);
+ if (ret == 0) {
+ ret = gf_string2bytesize (disable_till_string,
+ &conf->disable_till);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid number format \"%s\" of \"option disable-for-first-nbytes\"",
+ disable_till_string);
+ return -1;
+ }
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "disabling write-behind for first %"PRIu64" bytes",
+ conf->disable_till);
+
+ /* configure 'option window-size <size>' */
+ conf->window_size = 0;
+ ret = dict_get_str (options, "cache-size",
+ &window_size_string);
+ if (ret == 0) {
+ ret = gf_string2bytesize (window_size_string,
+ &conf->window_size);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid number format \"%s\" of \"option window-size\"",
+ window_size_string);
+ FREE (conf);
+ return -1;
+ }
+ }
+
+ if (!conf->window_size && conf->aggregate_size) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "setting window-size to be equal to aggregate-size(%"PRIu64")",
+ conf->aggregate_size);
+ conf->window_size = conf->aggregate_size;
+ }
+
+ if (conf->window_size < conf->aggregate_size) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "aggregate-size(%"PRIu64") cannot be more than window-size"
+ "(%"PRIu64")", conf->window_size, conf->aggregate_size);
+ FREE (conf);
+ return -1;
+ }
+
+ /* configure 'option flush-behind <on/off>' */
+ conf->flush_behind = 0;
+ ret = dict_get_str (options, "flush-behind",
+ &flush_behind_string);
+ if (ret == 0) {
+ ret = gf_string2boolean (flush_behind_string,
+ &conf->flush_behind);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'flush-behind' takes only boolean arguments");
+ return -1;
+ }
+
+ if (conf->flush_behind) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "enabling flush-behind");
+ }
+ }
+ this->private = conf;
+ return 0;
+}
+
+
+void
+fini (xlator_t *this)
+{
+ wb_conf_t *conf = this->private;
+
+ FREE (conf);
+ return;
+}
+
+
+struct xlator_fops fops = {
+ .writev = wb_writev,
+ .open = wb_open,
+ .create = wb_create,
+ .readv = wb_readv,
+ .flush = wb_flush,
+ .fsync = wb_fsync,
+ .stat = wb_stat,
+ .fstat = wb_fstat,
+ .truncate = wb_truncate,
+ .ftruncate = wb_ftruncate,
+ .utimens = wb_utimens,
+};
+
+struct xlator_mops mops = {
+};
+
+struct xlator_cbks cbks = {
+ .release = wb_release
+};
+
+struct volume_options options[] = {
+ { .key = {"flush-behind"},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {"block-size", "aggregate-size"},
+ .type = GF_OPTION_TYPE_SIZET,
+ .min = 128 * GF_UNIT_KB,
+ .max = 4 * GF_UNIT_MB
+ },
+ { .key = {"cache-size", "window-size"},
+ .type = GF_OPTION_TYPE_SIZET,
+ .min = 512 * GF_UNIT_KB,
+ .max = 1 * GF_UNIT_GB
+ },
+ { .key = {"disable-for-first-nbytes"},
+ .type = GF_OPTION_TYPE_SIZET,
+ .min = 1,
+ .max = 1 * GF_UNIT_MB,
+ },
+ { .key = {"enable-O_SYNC"},
+ .type = GF_OPTION_TYPE_BOOL,
+ },
+ { .key = {NULL} },
+};