diff options
Diffstat (limited to 'xlators/performance/write-behind/src/write-behind.c')
| -rw-r--r-- | xlators/performance/write-behind/src/write-behind.c | 129 |
1 file changed, 102 insertions, 27 deletions
diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c index 7c2cb467c..95c5921c6 100644 --- a/xlators/performance/write-behind/src/write-behind.c +++ b/xlators/performance/write-behind/src/write-behind.c @@ -82,6 +82,12 @@ typedef struct wb_inode { write-behind from this list, and therefore get "upgraded" to the "liability" list. */ + list_head_t wip; /* List of write calls in progress, SYNC or non-SYNC + which are currently STACK_WIND'ed towards the server. + This is for guaranteeing that no two overlapping + writes are in progress at the same time. Modules + like eager-lock in AFR depend on this behavior. + */ uint64_t gen; /* Liability generation number. Represents the current 'state' of liability. Every new addition to the liability list bumps @@ -113,6 +119,7 @@ typedef struct wb_request { list_head_t lie; /* either in @liability or @temptation */ list_head_t winds; list_head_t unwinds; + list_head_t wip; call_stub_t *stub; @@ -206,11 +213,7 @@ wb_fd_err (fd_t *fd, xlator_t *this, int32_t *op_errno) int32_t tmp = 0; if (fd_ctx_get (fd, this, &value) == 0) { - if (value != EBADF) { - fd_ctx_set (fd, this, EBADF); - } - - if (op_errno != NULL) { + if (op_errno) { tmp = value; *op_errno = tmp; } @@ -319,6 +322,30 @@ wb_liability_has_conflict (wb_inode_t *wb_inode, wb_request_t *req) } +gf_boolean_t +wb_wip_has_conflict (wb_inode_t *wb_inode, wb_request_t *req) +{ + wb_request_t *each = NULL; + + if (req->stub->fop != GF_FOP_WRITE) + /* non-writes fundamentally never conflict with WIP requests */ + return _gf_false; + + list_for_each_entry (each, &wb_inode->wip, wip) { + if (each == req) + /* request never conflicts with itself, + though this condition should never occur. 
+ */ + continue; + + if (wb_requests_overlap (each, req)) + return _gf_true; + } + + return _gf_false; +} + + static int __wb_request_unref (wb_request_t *req) { @@ -337,6 +364,7 @@ __wb_request_unref (wb_request_t *req) if (req->refcount == 0) { list_del_init (&req->todo); list_del_init (&req->lie); + list_del_init (&req->wip); list_del_init (&req->all); if (list_empty (&wb_inode->all)) { @@ -442,6 +470,7 @@ wb_enqueue_common (wb_inode_t *wb_inode, call_stub_t *stub, int tempted) INIT_LIST_HEAD (&req->lie); INIT_LIST_HEAD (&req->winds); INIT_LIST_HEAD (&req->unwinds); + INIT_LIST_HEAD (&req->wip); req->stub = stub; req->wb_inode = wb_inode; @@ -558,6 +587,7 @@ __wb_inode_create (xlator_t *this, inode_t *inode) INIT_LIST_HEAD (&wb_inode->todo); INIT_LIST_HEAD (&wb_inode->liability); INIT_LIST_HEAD (&wb_inode->temptation); + INIT_LIST_HEAD (&wb_inode->wip); wb_inode->this = this; @@ -715,7 +745,7 @@ wb_fulfill_cbk (call_frame_t *frame, void *cookie, xlator_t *this, } while (0) -void +int wb_fulfill_head (wb_inode_t *wb_inode, wb_request_t *head) { struct iovec vector[MAX_VECTOR_COUNT]; @@ -765,22 +795,23 @@ wb_fulfill_head (wb_inode_t *wb_inode, wb_request_t *head) head->stub->args.flags, head->stub->args.iobref, NULL); - return; + return 0; err: if (!fderr) { /* frame creation failure */ - wb_fulfill_err (head, ENOMEM); + fderr = ENOMEM; + wb_fulfill_err (head, fderr); } wb_head_done (head); - return; + return fderr; } #define NEXT_HEAD(head, req) do { \ if (head) \ - wb_fulfill_head (wb_inode, head); \ + ret |= wb_fulfill_head (wb_inode, head); \ head = req; \ expected_offset = req->stub->args.offset + \ req->write_size; \ @@ -789,7 +820,7 @@ err: } while (0) -void +int wb_fulfill (wb_inode_t *wb_inode, list_head_t *liabilities) { wb_request_t *req = NULL; @@ -799,6 +830,7 @@ wb_fulfill (wb_inode_t *wb_inode, list_head_t *liabilities) off_t expected_offset = 0; size_t curr_aggregate = 0; size_t vector_count = 0; + int ret = 0; conf = wb_inode->this->private; @@ 
-842,8 +874,9 @@ wb_fulfill (wb_inode_t *wb_inode, list_head_t *liabilities) } if (head) - wb_fulfill_head (wb_inode, head); - return; + ret |= wb_fulfill_head (wb_inode, head); + + return ret; } @@ -1084,6 +1117,18 @@ __wb_pick_winds (wb_inode_t *wb_inode, list_head_t *tasks, /* wait some more */ continue; + if (req->stub->fop == GF_FOP_WRITE) { + if (wb_wip_has_conflict (wb_inode, req)) + continue; + + list_add_tail (&req->wip, &wb_inode->wip); + + if (!req->ordering.tempted) + /* unrefed in wb_writev_cbk */ + req->stub->frame->local = + __wb_request_ref (req); + } + list_del_init (&req->todo); if (req->ordering.tempted) @@ -1116,38 +1161,69 @@ wb_process_queue (wb_inode_t *wb_inode) list_head_t tasks = {0, }; list_head_t lies = {0, }; list_head_t liabilities = {0, }; + int retry = 0; INIT_LIST_HEAD (&tasks); INIT_LIST_HEAD (&lies); INIT_LIST_HEAD (&liabilities); - LOCK (&wb_inode->lock); - { - __wb_preprocess_winds (wb_inode); + do { + LOCK (&wb_inode->lock); + { + __wb_preprocess_winds (wb_inode); - __wb_pick_winds (wb_inode, &tasks, &liabilities); + __wb_pick_winds (wb_inode, &tasks, &liabilities); - __wb_pick_unwinds (wb_inode, &lies); + __wb_pick_unwinds (wb_inode, &lies); - } - UNLOCK (&wb_inode->lock); + } + UNLOCK (&wb_inode->lock); - wb_do_unwinds (wb_inode, &lies); + wb_do_unwinds (wb_inode, &lies); - wb_do_winds (wb_inode, &tasks); + wb_do_winds (wb_inode, &tasks); - wb_fulfill (wb_inode, &liabilities); + /* fd might've been marked bad due to previous errors. + * Since, caller of wb_process_queue might be the last fop on + * inode, make sure we keep processing request queue, till there + * are no requests left. 
+ */ + retry = wb_fulfill (wb_inode, &liabilities); + } while (retry); return; } int +wb_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt *prebuf, struct iatt *postbuf, dict_t *xdata) +{ + wb_request_t *req = NULL; + wb_inode_t *wb_inode; + + req = frame->local; + frame->local = NULL; + wb_inode = req->wb_inode; + + wb_request_unref (req); + + /* requests could be pending while this was in progress */ + wb_process_queue(wb_inode); + + STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, prebuf, postbuf, + xdata); + return 0; +} + + +int wb_writev_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, int32_t count, off_t offset, uint32_t flags, struct iobref *iobref, dict_t *xdata) { - STACK_WIND (frame, default_writev_cbk, + STACK_WIND (frame, wb_writev_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->writev, fd, vector, count, offset, flags, iobref, xdata); return 0; @@ -1185,8 +1261,7 @@ wb_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, if (fd->flags & (O_SYNC|O_DSYNC|o_direct)) wb_disabled = 1; - if (flags & (O_SYNC|O_DSYNC|O_DIRECT)) - /* O_DIRECT flag in params of writev must _always_ be honored */ + if (flags & (O_SYNC|O_DSYNC|o_direct)) wb_disabled = 1; if (wb_disabled) @@ -1345,7 +1420,7 @@ wb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) if (!wb_enqueue (wb_inode, stub)) goto unwind; - wb_process_queue (wb_inode); + wb_process_queue (wb_inode); return 0; |
