summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--xlators/performance/write-behind/src/write-behind.c452
1 files changed, 279 insertions, 173 deletions
diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c
index bd66a7ad5f3..a71d3a37855 100644
--- a/xlators/performance/write-behind/src/write-behind.c
+++ b/xlators/performance/write-behind/src/write-behind.c
@@ -52,6 +52,7 @@ typedef struct wb_file {
uint64_t disable_till;
size_t window_conf;
size_t window_current;
+ int32_t flags;
size_t aggregate_current;
int32_t refcount;
int32_t op_ret;
@@ -79,6 +80,14 @@ typedef struct wb_request {
char stack_wound;
char got_reply;
char virgin;
+ char flush_all; /* while trying to sync to back-end,
+ * don't wait till a data of size
+ * equal to configured aggregate-size
+ * is accumulated, instead sync
+ * whatever data currently present in
+ * request queue.
+ */
+
}write_request;
struct {
@@ -116,26 +125,28 @@ typedef struct wb_page wb_page_t;
int32_t
-wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all);
+wb_process_queue (call_frame_t *frame, wb_file_t *file);
ssize_t
wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds);
ssize_t
__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_size,
- char wind_all, char enable_trickling_writes);
+ char enable_trickling_writes);
-static void
+static int
__wb_request_unref (wb_request_t *this)
{
+ int ret = -1;
+
if (this->refcount <= 0) {
gf_log ("wb-request", GF_LOG_DEBUG,
"refcount(%d) is <= 0", this->refcount);
- return;
+ goto out;
}
- this->refcount--;
+ ret = --this->refcount;
if (this->refcount == 0) {
list_del_init (&this->list);
if (this->stub && this->stub->fop == GF_FOP_WRITE) {
@@ -144,25 +155,33 @@ __wb_request_unref (wb_request_t *this)
GF_FREE (this);
}
+
+out:
+ return ret;
}
-static void
+static int
wb_request_unref (wb_request_t *this)
{
wb_file_t *file = NULL;
+ int ret = 0;
+
if (this == NULL) {
gf_log ("wb-request", GF_LOG_DEBUG,
"request is NULL");
- return;
+ goto out;
}
file = this->file;
LOCK (&file->lock);
{
- __wb_request_unref (this);
+ ret = __wb_request_unref (this);
}
UNLOCK (&file->lock);
+
+out:
+ return ret;
}
@@ -204,11 +223,11 @@ wb_request_ref (wb_request_t *this)
wb_request_t *
wb_enqueue (wb_file_t *file, call_stub_t *stub)
{
- wb_request_t *request = NULL;
- call_frame_t *frame = NULL;
- wb_local_t *local = NULL;
- struct iovec *vector = NULL;
- int32_t count = 0;
+ wb_request_t *request = NULL, *tmp = NULL;
+ call_frame_t *frame = NULL;
+ wb_local_t *local = NULL;
+ struct iovec *vector = NULL;
+ int32_t count = 0;
request = GF_CALLOC (1, sizeof (*request), gf_wb_mt_wb_request_t);
if (request == NULL) {
@@ -254,6 +273,13 @@ wb_enqueue (wb_file_t *file, call_stub_t *stub)
file->aggregate_current += request->write_size;
} else {
+ list_for_each_entry (tmp, &file->request, list) {
+ if (tmp->stub && tmp->stub->fop
+ == GF_FOP_WRITE) {
+ tmp->flags.write_request.flush_all = 1;
+ }
+ }
+
/*reference for resuming */
__wb_request_ref (request);
}
@@ -266,7 +292,7 @@ out:
wb_file_t *
-wb_file_create (xlator_t *this, fd_t *fd)
+wb_file_create (xlator_t *this, fd_t *fd, int32_t flags)
{
wb_file_t *file = NULL;
wb_conf_t *conf = this->private;
@@ -288,6 +314,7 @@ wb_file_create (xlator_t *this, fd_t *fd)
file->this = this;
file->refcount = 1;
file->window_conf = conf->window_size;
+ file->flags = flags;
fd_ctx_set (fd, this, (uint64_t)(long)file);
@@ -359,7 +386,7 @@ wb_sync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
}
UNLOCK (&file->lock);
- ret = wb_process_queue (frame, file, 0);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
LOCK (&file->lock);
{
@@ -393,6 +420,12 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)
size_t bytecount = 0;
wb_conf_t *conf = NULL;
fd_t *fd = NULL;
+ int32_t op_errno = -1;
+
+ if (frame == NULL) {
+ op_errno = EINVAL;
+ goto out;
+ }
conf = file->this->private;
list_for_each_entry (request, winds, winds) {
@@ -403,6 +436,8 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)
}
if (total_count == 0) {
+ gf_log (file->this->name, GF_LOG_DEBUG, "no vectors are to be"
+ "synced");
goto out;
}
@@ -412,12 +447,18 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)
gf_wb_mt_iovec);
if (vector == NULL) {
bytes = -1;
+ op_errno = ENOMEM;
+ gf_log (file->this->name, GF_LOG_ERROR,
+ "out of memory");
goto out;
}
iobref = iobref_new ();
if (iobref == NULL) {
bytes = -1;
+ op_errno = ENOMEM;
+ gf_log (file->this->name, GF_LOG_ERROR,
+ "out of memory");
goto out;
}
@@ -425,6 +466,9 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)
gf_wb_mt_wb_local_t);
if (local == NULL) {
bytes = -1;
+ op_errno = ENOMEM;
+ gf_log (file->this->name, GF_LOG_ERROR,
+ "out of memory");
goto out;
}
@@ -466,6 +510,9 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)
sync_frame = copy_frame (frame);
if (sync_frame == NULL) {
bytes = -1;
+ op_errno = ENOMEM;
+ gf_log (file->this->name, GF_LOG_ERROR,
+ "out of memory");
goto out;
}
@@ -508,6 +555,14 @@ out:
}
if (local != NULL) {
+ /* had we winded these requests, we would have unrefed
+ * in wb_sync_cbk.
+ */
+ list_for_each_entry_safe (request, dummy, &local->winds,
+ winds) {
+ wb_request_unref (request);
+ }
+
GF_FREE (local);
}
@@ -519,6 +574,27 @@ out:
GF_FREE (vector);
}
+ if (bytes == -1) {
+ /*
+ * had we winded these requests, we would have unrefed
+ * in wb_sync_cbk.
+ */
+
+ list_for_each_entry_safe (request, dummy, &local->winds,
+ winds) {
+ wb_request_unref (request);
+ }
+
+ if (file != NULL) {
+ LOCK (&file->lock);
+ {
+ file->op_ret = -1;
+ file->op_errno = op_errno;
+ }
+ UNLOCK (&file->lock);
+ }
+ }
+
return bytes;
}
@@ -553,7 +629,7 @@ wb_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
}
if (process_frame != NULL) {
- ret = wb_process_queue (process_frame, file, 0);
+ ret = wb_process_queue (process_frame, file);
if ((ret == -1) && (errno == ENOMEM) && (file != NULL)) {
LOCK (&file->lock);
{
@@ -639,7 +715,7 @@ wb_stat (call_frame_t *frame, xlator_t *this, loc_t *loc)
goto unwind;
}
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_errno = ENOMEM;
goto unwind;
@@ -683,7 +759,7 @@ wb_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
request = local->request;
if ((file != NULL) && (request != NULL)) {
wb_request_unref (request);
- ret = wb_process_queue (frame, file, 0);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_ret = -1;
op_errno = ENOMEM;
@@ -757,7 +833,7 @@ wb_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd)
/*
FIXME:should the request queue be emptied in case of error?
*/
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_errno = ENOMEM;
goto unwind;
@@ -813,7 +889,7 @@ wb_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
if (process_frame != NULL) {
- ret = wb_process_queue (process_frame, file, 0);
+ ret = wb_process_queue (process_frame, file);
if ((ret == -1) && (errno == ENOMEM) && (file != NULL)) {
LOCK (&file->lock);
{
@@ -906,7 +982,7 @@ wb_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset)
goto unwind;
}
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_errno = ENOMEM;
goto unwind;
@@ -949,7 +1025,7 @@ wb_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
if ((request != NULL) && (file != NULL)) {
wb_request_unref (request);
- ret = wb_process_queue (frame, file, 0);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_ret = -1;
op_errno = ENOMEM;
@@ -1026,7 +1102,7 @@ wb_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset)
goto unwind;
}
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_errno = ENOMEM;
goto unwind;
@@ -1055,7 +1131,8 @@ unwind:
int32_t
wb_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct iatt *statpre, struct iatt *statpost)
+ int32_t op_ret, int32_t op_errno, struct iatt *statpre,
+ struct iatt *statpost)
{
wb_local_t *local = NULL;
wb_request_t *request = NULL;
@@ -1076,14 +1153,15 @@ wb_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
}
- STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, statpre, statpost);
+ STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, statpre,
+ statpost);
if (request) {
wb_request_unref (request);
}
if (request && (process_frame != NULL)) {
- ret = wb_process_queue (process_frame, file, 0);
+ ret = wb_process_queue (process_frame, file);
if ((ret == -1) && (errno == ENOMEM) && (file != NULL)) {
LOCK (&file->lock);
{
@@ -1188,7 +1266,7 @@ wb_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc,
goto unwind;
}
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_errno = ENOMEM;
goto unwind;
@@ -1236,7 +1314,7 @@ wb_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
wbflags = local->wbflags;
if (op_ret != -1) {
- file = wb_file_create (this, fd);
+ file = wb_file_create (this, fd, flags);
if (file == NULL) {
op_ret = -1;
op_errno = ENOMEM;
@@ -1307,7 +1385,11 @@ wb_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
wb_conf_t *conf = this->private;
if (op_ret != -1) {
- file = wb_file_create (this, fd);
+ if (frame->local) {
+ flags = (long) frame->local;
+ }
+
+ file = wb_file_create (this, fd, flags);
if (file == NULL) {
op_ret = -1;
op_errno = ENOMEM;
@@ -1316,7 +1398,6 @@ wb_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
/* If O_DIRECT then, we disable chaching */
if (frame->local) {
- flags = (long)frame->local;
if (((flags & O_DIRECT) == O_DIRECT)
|| ((flags & O_ACCMODE) == O_RDONLY)
|| (((flags & O_SYNC) == O_SYNC)
@@ -1331,8 +1412,8 @@ wb_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
frame->local = NULL;
out:
- STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode, buf, preparent,
- postparent);
+ STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode, buf,
+ preparent, postparent);
return 0;
}
@@ -1351,14 +1432,22 @@ wb_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
return 0;
}
-
+/* Mark all the contiguous write requests for winding starting from head of
+ * request list. Stops marking at the first non-write request found. If
+ * file is opened with O_APPEND, make sure all the writes marked for winding
+ * will fit into a single write call to server.
+ */
size_t
__wb_mark_wind_all (wb_file_t *file, list_head_t *list, list_head_t *winds)
{
- wb_request_t *request = NULL;
- size_t size = 0;
- char first_request = 1;
+ wb_request_t *request = NULL;
+ size_t size = 0;
+ char first_request = 1;
off_t offset_expected = 0;
+ wb_conf_t *conf = NULL;
+ int count = 0;
+
+ conf = file->this->private;
list_for_each_entry (request, list, list)
{
@@ -1377,9 +1466,18 @@ __wb_mark_wind_all (wb_file_t *file, list_head_t *list, list_head_t *winds)
break;
}
+ if ((file->flags & O_APPEND)
+ && (((size + request->write_size)
+ > conf->aggregate_size)
+ || ((count + request->stub->args.writev.count)
+ > MAX_VECTOR_COUNT))) {
+ break;
+ }
+
size += request->write_size;
offset_expected += request->write_size;
file->aggregate_current -= request->write_size;
+ count += request->stub->args.writev.count;
request->flags.write_request.stack_wound = 1;
list_add_tail (&request->winds, winds);
@@ -1392,7 +1490,8 @@ __wb_mark_wind_all (wb_file_t *file, list_head_t *list, list_head_t *winds)
void
__wb_can_wind (list_head_t *list, char *other_fop_in_queue,
- char *non_contiguous_writes, char *incomplete_writes)
+ char *non_contiguous_writes, char *incomplete_writes,
+ char *wind_all)
{
wb_request_t *request = NULL;
char first_request = 1;
@@ -1418,7 +1517,11 @@ __wb_can_wind (list_head_t *list, char *other_fop_in_queue,
if (!request->flags.write_request.stack_wound) {
if (first_request) {
first_request = 0;
- offset_expected = request->stub->args.writev.off;
+ offset_expected
+ = request->stub->args.writev.off;
+ if (wind_all != NULL) {
+ *wind_all = request->flags.write_request.flush_all;
+ }
}
if (offset_expected != request->stub->args.writev.off) {
@@ -1438,7 +1541,7 @@ __wb_can_wind (list_head_t *list, char *other_fop_in_queue,
ssize_t
__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf,
- char wind_all, char enable_trickling_writes)
+ char enable_trickling_writes)
{
size_t size = 0;
char other_fop_in_queue = 0;
@@ -1446,6 +1549,7 @@ __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf,
char non_contiguous_writes = 0;
wb_request_t *request = NULL;
wb_file_t *file = NULL;
+ char wind_all = 0;
if (list_empty (list)) {
goto out;
@@ -1454,17 +1558,16 @@ __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf,
request = list_entry (list->next, typeof (*request), list);
file = request->file;
- if (!wind_all && (file->aggregate_current < aggregate_conf)) {
- __wb_can_wind (list, &other_fop_in_queue,
- &non_contiguous_writes, &incomplete_writes);
- }
+ __wb_can_wind (list, &other_fop_in_queue,
+ &non_contiguous_writes, &incomplete_writes, &wind_all);
- if ((enable_trickling_writes && !incomplete_writes)
- || (wind_all) || (non_contiguous_writes)
- || (other_fop_in_queue)
- || (file->aggregate_current >= aggregate_conf)) {
+ if (!incomplete_writes && ((enable_trickling_writes)
+ || (wind_all) || (non_contiguous_writes)
+ || (other_fop_in_queue)
+ || (file->aggregate_current
+ >= aggregate_conf))) {
size = __wb_mark_wind_all (file, list, winds);
- }
+ }
out:
return size;
@@ -1565,6 +1668,7 @@ wb_stack_unwind (list_head_t *unwinds)
wb_request_t *request = NULL, *dummy = NULL;
call_frame_t *frame = NULL;
wb_local_t *local = NULL;
+ int ret = 0, write_requests_removed = 0;
list_for_each_entry_safe (request, dummy, unwinds, unwinds)
{
@@ -1574,10 +1678,13 @@ wb_stack_unwind (list_head_t *unwinds)
STACK_UNWIND (frame, local->op_ret, local->op_errno, &buf,
&buf);
- wb_request_unref (request);
+ ret = wb_request_unref (request);
+ if (ret == 0) {
+ write_requests_removed++;
+ }
}
- return 0;
+ return write_requests_removed;
}
@@ -1615,7 +1722,7 @@ wb_resume_other_requests (call_frame_t *frame, wb_file_t *file,
}
if (fops_removed > 0) {
- ret = wb_process_queue (frame, file, 0);
+ ret = wb_process_queue (frame, file);
}
out:
@@ -1627,19 +1734,27 @@ int32_t
wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds,
list_head_t *unwinds, list_head_t *other_requests)
{
- int32_t ret = -1;
+ int32_t ret = -1, write_requests_removed = 0;
ret = wb_stack_unwind (unwinds);
- if (ret == -1) {
- goto out;
- }
+
+ write_requests_removed = ret;
ret = wb_sync (frame, file, winds);
if (ret == -1) {
goto out;
}
- ret = wb_resume_other_requests (frame, file, other_requests);
+ wb_resume_other_requests (frame, file, other_requests);
+
+ /* wb_stack_unwind does wb_request_unref after unwinding a write
+ * request. Hence if a write-request was just freed in wb_stack_unwind,
+ * we have to process request queue once again to unblock requests
+ * blocked on the writes just unwound.
+ */
+ if (write_requests_removed > 0) {
+ ret = wb_process_queue (frame, file);
+ }
out:
return ret;
@@ -1763,7 +1878,7 @@ __wb_collapse_write_bufs (list_head_t *requests, size_t page_size)
int32_t
-wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all)
+wb_process_queue (call_frame_t *frame, wb_file_t *file)
{
list_head_t winds, unwinds, other_requests;
size_t size = 0;
@@ -1800,7 +1915,6 @@ wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all)
if (count == 0) {
__wb_mark_winds (&file->request, &winds, size,
- flush_all,
conf->enable_trickling_writes);
}
@@ -1927,7 +2041,7 @@ wb_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector,
goto unwind;
}
- ret = wb_process_queue (process_frame, file, 0);
+ ret = wb_process_queue (process_frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_errno = ENOMEM;
goto unwind;
@@ -1969,7 +2083,7 @@ wb_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
if ((request != NULL) && (file != NULL)) {
wb_request_unref (request);
- ret = wb_process_queue (frame, file, 0);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_ret = -1;
op_errno = ENOMEM;
@@ -2048,7 +2162,7 @@ wb_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
return 0;
}
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
STACK_UNWIND_STRICT (readv, frame, -1, ENOMEM,
NULL, 0, NULL, NULL);
@@ -2082,67 +2196,102 @@ wb_ffr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
int32_t op_errno)
{
wb_local_t *local = NULL;
- wb_file_t *file = NULL;
- wb_conf_t *conf = NULL;
- char unwind = 0;
- int32_t ret = -1;
- int disabled = 0;
- int64_t disable_till = 0;
+ wb_file_t *file = NULL;
+ wb_conf_t *conf = NULL;
conf = this->private;
local = frame->local;
+ file = local->file;
- if ((local != NULL) && (local->file != NULL)) {
- file = local->file;
-
+ if (file != NULL) {
LOCK (&file->lock);
{
- disabled = file->disabled;
- disable_till = file->disable_till;
+ if (file->op_ret == -1) {
+ op_ret = file->op_ret;
+ op_errno = file->op_errno;
+
+ file->op_ret = 0;
+ }
}
UNLOCK (&file->lock);
+ }
+
+ STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno);
- if (conf->flush_behind
- && (!disabled) && (disable_till == 0)) {
- unwind = 1;
- } else {
- local->reply_count++;
- /*
- * without flush-behind, unwind should wait for replies
- * of writes queued before and the flush
- */
- if (local->reply_count == 2) {
- unwind = 1;
- }
+ return 0;
+}
+
+
+int32_t
+wb_flush_helper (call_frame_t *frame, xlator_t *this, fd_t *fd)
+{
+ wb_conf_t *conf = NULL;
+ wb_local_t *local = NULL;
+ wb_file_t *file = NULL;
+ call_frame_t *flush_frame = NULL, *process_frame = NULL;
+ int32_t op_ret = -1, op_errno = -1, ret = -1;
+
+ conf = this->private;
+
+ local = frame->local;
+ file = local->file;
+
+ LOCK (&file->lock);
+ {
+ op_ret = file->op_ret;
+ op_errno = file->op_errno;
+ }
+ UNLOCK (&file->lock);
+
+ if (local && local->request) {
+ process_frame = copy_frame (frame);
+ if (process_frame == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "out of memory");
+ goto unwind;
+ }
+
+ wb_request_unref (local->request);
+ }
+
+ if (conf->flush_behind) {
+ flush_frame = copy_frame (frame);
+ if (flush_frame == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "out of memory");
+ goto unwind;
}
+
+ STACK_WIND (flush_frame,
+ wb_ffr_bg_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->flush,
+ fd);
} else {
- unwind = 1;
+ STACK_WIND (frame,
+ wb_ffr_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->flush,
+ fd);
}
- if (unwind) {
- if (file != NULL) {
- LOCK (&file->lock);
- {
- if (file->op_ret == -1) {
- op_ret = file->op_ret;
- op_errno = file->op_errno;
+ if (process_frame != NULL) {
+ ret = wb_process_queue (process_frame, file);
+ if ((ret == -1) && (errno == ENOMEM)) {
+ STACK_DESTROY (process_frame->root);
+ goto unwind;
+ }
- file->op_ret = 0;
- }
- }
- UNLOCK (&file->lock);
+ STACK_DESTROY (process_frame->root);
+ }
- ret = wb_process_queue (frame, file, 0);
- if ((ret == -1) && (errno == ENOMEM)) {
- op_ret = -1;
- op_errno = ENOMEM;
- }
- }
-
+ if (conf->flush_behind) {
STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno);
}
return 0;
+
+unwind:
+ STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
+ return 0;
}
@@ -2154,12 +2303,9 @@ wb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd)
wb_local_t *local = NULL;
uint64_t tmp_file = 0;
call_stub_t *stub = NULL;
- call_frame_t *process_frame = NULL;
- wb_local_t *tmp_local = NULL;
+ call_frame_t *flush_frame = NULL;
wb_request_t *request = NULL;
int32_t ret = 0;
- int disabled = 0;
- int64_t disable_till = 0;
conf = this->private;
@@ -2176,97 +2322,57 @@ wb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd)
file = (wb_file_t *)(long)tmp_file;
if (file != NULL) {
- local = GF_CALLOC (1, sizeof (*local),
- gf_wb_mt_wb_local_t);
+ local = GF_CALLOC (1, sizeof (*local), gf_wb_mt_wb_local_t);
if (local == NULL) {
- STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
return 0;
}
local->file = file;
frame->local = local;
- stub = fop_flush_cbk_stub (frame, wb_ffr_cbk, 0, 0);
- if (stub == NULL) {
- STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
- return 0;
- }
- process_frame = copy_frame (frame);
- if (process_frame == NULL) {
+ stub = fop_flush_stub (frame, wb_flush_helper, fd);
+ if (stub == NULL) {
STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
- call_stub_destroy (stub);
return 0;
}
- LOCK (&file->lock);
- {
- disabled = file->disabled;
- disable_till = file->disable_till;
- }
- UNLOCK (&file->lock);
-
- if (conf->flush_behind
- && (!disabled) && (disable_till == 0)) {
- tmp_local = GF_CALLOC (1, sizeof (*local),
- gf_wb_mt_wb_local_t);
- if (tmp_local == NULL) {
- STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
-
- STACK_DESTROY (process_frame->root);
- call_stub_destroy (stub);
- return 0;
- }
- tmp_local->file = file;
-
- process_frame->local = tmp_local;
- }
-
- fd_ref (fd);
-
request = wb_enqueue (file, stub);
if (request == NULL) {
STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
-
- fd_unref (fd);
call_stub_destroy (stub);
- STACK_DESTROY (process_frame->root);
return 0;
}
- ret = wb_process_queue (process_frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
-
- fd_unref (fd);
call_stub_destroy (stub);
- STACK_DESTROY (process_frame->root);
return 0;
}
- }
-
- if ((file != NULL) && conf->flush_behind
- && (!disabled) && (disable_till == 0)) {
- STACK_WIND (process_frame,
- wb_ffr_bg_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->flush,
- fd);
} else {
- STACK_WIND (frame,
- wb_ffr_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->flush,
- fd);
+ if (conf->flush_behind) {
+ flush_frame = copy_frame (frame);
+ if (flush_frame == NULL) {
+ STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
+ return 0;
+ }
- if (process_frame != NULL) {
- STACK_DESTROY (process_frame->root);
- }
- }
+ STACK_UNWIND_STRICT (flush, frame, 0, 0);
-
- if (file != NULL) {
- fd_unref (fd);
+ STACK_WIND (flush_frame,
+ wb_ffr_bg_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->flush,
+ fd);
+ } else {
+ STACK_WIND (frame,
+ wb_ffr_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->flush,
+ fd);
+ }
}
return 0;
@@ -2300,7 +2406,7 @@ wb_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
if (request) {
wb_request_unref (request);
- ret = wb_process_queue (frame, file, 0);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_ret = -1;
op_errno = ENOMEM;
@@ -2377,7 +2483,7 @@ wb_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync)
return 0;
}
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
STACK_UNWIND_STRICT (fsync, frame, -1, ENOMEM,
NULL, NULL);