/* Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */
#include <glusterfs/glusterfs.h>
#include <glusterfs/logging.h>
#include <glusterfs/dict.h>
#include <glusterfs/xlator.h>
#include <glusterfs/list.h>
#include <glusterfs/compat.h>
#include <glusterfs/compat-errno.h>
#include <glusterfs/common-utils.h>
#include <glusterfs/call-stub.h>
#include <glusterfs/statedump.h>
#include <glusterfs/defaults.h>
#include "write-behind-mem-types.h"
#include "write-behind-messages.h"
#define MAX_VECTOR_COUNT 8
#define WB_AGGREGATE_SIZE 131072 /* 128 KB */
#define WB_WINDOW_SIZE 1048576   /* 1 MB */
typedef struct list_head list_head_t; struct wb_conf; struct wb_inode; typedef struct wb_inode { ssize_t window_conf; ssize_t window_current; ssize_t transit; /* size of data stack_wound, and yet to be fulfilled (wb_fulfill_cbk). used for trickling_writes */ list_head_t all; /* All requests, from enqueue() till destroy(). Used only for resetting generation number when empty. */ list_head_t todo; /* Work to do (i.e., STACK_WIND to server). Once we STACK_WIND, the entry is taken off the list. If it is a non-sync write, then we continue to track it via @liability or @temptation depending on the status of its writeback. */ list_head_t liability; /* Non-sync writes which are lied (STACK_UNWIND'ed to caller) but whose ack from the server is not yet complete. This is the "liability" which we hold, and must guarantee that dependent operations which arrive later (which overlap, etc.) are issued only after their dependencies in this list are "fulfilled". Server acks for entries in this list shrink the window. The sum total of all req->write_size of entries in this list must be kept less than the permitted window size. */ list_head_t temptation; /* Operations for which we are tempted to 'lie' (write-behind), but are temporarily holding off (because of insufficient window capacity, etc.) This is the list to look at to grow the window (in __wb_pick_unwinds()). Entries typically get chosen for write-behind from this list, and therefore get "upgraded" to the "liability" list. */ list_head_t wip; /* List of write calls in progress, SYNC or non-SYNC, which are currently STACK_WIND'ed towards the server. This is for guaranteeing that no two overlapping writes are in progress at the same time. Modules like eager-lock in AFR depend on this behavior. */ list_head_t invalidate_list; /* list of wb_inodes that were marked for * iatt invalidation due to requests in * liability queue fulfilled while there * was a readdirp session on parent * directory. For a directory inode, this * list points to list of children. */ uint64_t gen; /* Liability generation number. Represents the current 'state' of liability. Every new addition to the liability list bumps the generation number. a newly arrived request is only required to perform causal checks against the entries in the liability list which were present at the time of its addition. the generation number at the time of its addition is stored in the request and used during checks. the liability list can grow while the request waits in the todo list waiting for its dependent operations to complete. however it is not of the request's concern to depend itself on those new entries which arrived after it arrived (i.e., those that have a liability generation higher than itself) */ size_t size; /* Size of the file to catch write after EOF. */ gf_lock_t lock; xlator_t *this; inode_t *inode; int dontsync; /* If positive, don't pick lies for * winding.
This is needed to break infinite * recursion during invocation of * wb_process_queue from * wb_fulfill_cbk in case of an * error during fulfill. */ gf_atomic_int32_t readdirps; gf_atomic_int8_t invalidate; } wb_inode_t; typedef struct wb_request { list_head_t all; list_head_t todo; list_head_t lie; /* either in @liability or @temptation */ list_head_t winds; list_head_t unwinds; list_head_t wip; call_stub_t *stub; ssize_t write_size; /* currently held size (after collapsing) */ size_t orig_size; /* size which arrived with the request. This is the size by which we grow the window when unwinding the frame. */ size_t total_size; /* valid only in @head in wb_fulfill(). This is the size with which we perform STACK_WIND to server and therefore the amount by which we shrink the window. */ int op_ret; int op_errno; int32_t refcount; wb_inode_t *wb_inode; glusterfs_fop_t fop; gf_lkowner_t lk_owner; pid_t client_pid; struct iobref *iobref; uint64_t gen; /* inode liability state at the time of request arrival */ fd_t *fd; int wind_count; /* number of sync-attempts. Only for debug purposes */ struct { size_t size; /* 0 size == till infinity */ off_t off; int append : 1; /* offset is invalid. only one outstanding append at a time */ int tempted : 1; /* true only for non-sync writes */ int lied : 1; /* sin committed */ int fulfilled : 1; /* got server acknowledgement */ int go : 1; /* enough aggregating, good to go */ } ordering; /* for debug purposes. A request might outlive the fop it is * representing. So, preserve essential info for logging. */ uint64_t unique; uuid_t gfid; } wb_request_t; typedef struct wb_conf { uint64_t aggregate_size; uint64_t page_size; uint64_t window_size; gf_boolean_t flush_behind; gf_boolean_t trickling_writes; gf_boolean_t strict_write_ordering; gf_boolean_t strict_O_DIRECT; gf_boolean_t resync_after_fsync; } wb_conf_t; wb_inode_t * __wb_inode_ctx_get(xlator_t *this, inode_t *inode) { uint64_t value = 0; wb_inode_t *wb_inode = NULL; int ret = 0; ret = __inode_ctx_get(inode, this, &value); if (ret) return NULL; wb_inode = (wb_inode_t *)(unsigned long)value; return wb_inode; } wb_inode_t * wb_inode_ctx_get(xlator_t *this, inode_t *inode) { wb_inode_t *wb_inode = NULL; GF_VALIDATE_OR_GOTO("write-behind", this, out); GF_VALIDATE_OR_GOTO(this->name, inode, out); LOCK(&inode->lock); { wb_inode = __wb_inode_ctx_get(this, inode); } UNLOCK(&inode->lock); out: return wb_inode; } static void wb_set_invalidate(wb_inode_t *wb_inode, int set) { int readdirps = 0; inode_t *parent_inode = NULL; wb_inode_t *wb_parent_inode = NULL; parent_inode = inode_parent(wb_inode->inode, NULL, NULL); if (parent_inode) wb_parent_inode = wb_inode_ctx_get(wb_inode->this, parent_inode); if (wb_parent_inode) { LOCK(&wb_parent_inode->lock); { readdirps = GF_ATOMIC_GET(wb_parent_inode->readdirps); if (readdirps && set) { GF_ATOMIC_SWAP(wb_inode->invalidate, 1); list_del_init(&wb_inode->invalidate_list); list_add(&wb_inode->invalidate_list, &wb_parent_inode->invalidate_list); } else if (readdirps == 0) { GF_ATOMIC_SWAP(wb_inode->invalidate, 0); list_del_init(&wb_inode->invalidate_list); } } UNLOCK(&wb_parent_inode->lock); } else { GF_ATOMIC_SWAP(wb_inode->invalidate, 0); } return; } void wb_process_queue(wb_inode_t *wb_inode); /* Below is a succinct explanation of the code deciding whether two regions overlap, from Pavan . For any two ranges to be non-overlapping, either the end of the first range is lesser than the start of the second, or vice versa. 
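For instance, byte ranges [0, 99] and [100, 199] satisfy (99 < 100) and hence do not overlap, while [0, 99] and [50, 149] fail both tests and hence overlap.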
Example -

        <--------->       <-------------->
        p         q       x              y

   (q < x) or (y < p) => No overlap.

   To check for *overlap*, we can negate this (using De Morgan's laws), and it becomes - (q >= x) and (y >= p) Either that, or you write the negation using - if (!((q < x) or (y < p))) { "Overlap" } */ gf_boolean_t wb_requests_overlap(wb_request_t *req1, wb_request_t *req2) { uint64_t r1_start = 0; uint64_t r1_end = 0; uint64_t r2_start = 0; uint64_t r2_end = 0; gf_boolean_t do_overlap = _gf_false; r1_start = req1->ordering.off; if (req1->ordering.size) r1_end = r1_start + req1->ordering.size - 1; else r1_end = ULLONG_MAX; r2_start = req2->ordering.off; if (req2->ordering.size) r2_end = r2_start + req2->ordering.size - 1; else r2_end = ULLONG_MAX; do_overlap = ((r1_end >= r2_start) && (r2_end >= r1_start)); return do_overlap; } gf_boolean_t wb_requests_conflict(wb_request_t *lie, wb_request_t *req) { wb_conf_t *conf = NULL; conf = req->wb_inode->this->private; if (lie == req) /* request cannot conflict with itself */ return _gf_false; if (lie->gen >= req->gen) /* this liability entry was behind us in the todo list */ return _gf_false; if (lie->ordering.append) /* all modifications wait for the completion of outstanding append */ return _gf_true; if (conf->strict_write_ordering) /* We are sure (lie->gen < req->gen) by now. So skip overlap check if strict write ordering is requested and always return "conflict" against a lower generation lie. */ return _gf_true; return wb_requests_overlap(lie, req); } wb_request_t * wb_liability_has_conflict(wb_inode_t *wb_inode, wb_request_t *req) { wb_request_t *each = NULL; list_for_each_entry(each, &wb_inode->liability, lie) { if (wb_requests_conflict(each, req) && (!each->ordering.fulfilled)) /* A fulfilled request shouldn't block another * request (even a dependent one) from winding. */ return each; } return NULL; } wb_request_t * wb_wip_has_conflict(wb_inode_t *wb_inode, wb_request_t *req) { wb_request_t *each = NULL; if (req->stub->fop != GF_FOP_WRITE) /* non-writes fundamentally never conflict with WIP requests */ return NULL; list_for_each_entry(each, &wb_inode->wip, wip) { if (each == req) /* request never conflicts with itself, though this condition should never occur. */ continue; if (wb_requests_overlap(each, req)) return each; } return NULL; } static int __wb_request_unref(wb_request_t *req) { int ret = -1; wb_inode_t *wb_inode = NULL; char gfid[64] = { 0, }; wb_inode = req->wb_inode; if (req->refcount <= 0) { uuid_utoa_r(req->gfid, gfid); gf_msg( "wb-request", GF_LOG_WARNING, 0, WRITE_BEHIND_MSG_RES_UNAVAILABLE, "(unique=%" PRIu64 ", fop=%s, gfid=%s, gen=%" PRIu64 "): " "refcount(%d) is <= 0", req->unique, gf_fop_list[req->fop], gfid, req->gen, req->refcount); goto out; } ret = --req->refcount; if (req->refcount == 0) { uuid_utoa_r(req->gfid, gfid); gf_log_callingfn(wb_inode->this->name, GF_LOG_DEBUG, "(unique = %" PRIu64 ", fop=%s, gfid=%s, " "gen=%" PRIu64 "): destroying request, " "removing from all queues", req->unique, gf_fop_list[req->fop], gfid, req->gen); list_del_init(&req->todo); list_del_init(&req->lie); list_del_init(&req->wip); list_del_init(&req->all); if (list_empty(&wb_inode->all)) { wb_inode->gen = 0; /* in case of accounting errors?
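Resetting the generation number on an empty queue is safe, since a request compares itself only against liabilities present at its arrival and no request remains that could hold a stale generation (see the comment on the 'all' member above).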
*/ wb_inode->window_current = 0; } list_del_init(&req->winds); list_del_init(&req->unwinds); if (req->stub) { call_stub_destroy(req->stub); req->stub = NULL; } if (req->iobref) iobref_unref(req->iobref); if (req->fd) fd_unref(req->fd); GF_FREE(req); } out: return ret; } static int wb_request_unref(wb_request_t *req) { wb_inode_t *wb_inode = NULL; int ret = -1; GF_VALIDATE_OR_GOTO("write-behind", req, out); wb_inode = req->wb_inode; LOCK(&wb_inode->lock); { ret = __wb_request_unref(req); } UNLOCK(&wb_inode->lock); out: return ret; } static wb_request_t * __wb_request_ref(wb_request_t *req) { GF_VALIDATE_OR_GOTO("write-behind", req, out); if (req->refcount < 0) { gf_msg("wb-request", GF_LOG_WARNING, 0, WRITE_BEHIND_MSG_RES_UNAVAILABLE, "refcount(%d) is < 0", req->refcount); req = NULL; goto out; } req->refcount++; out: return req; } wb_request_t * wb_request_ref(wb_request_t *req) { wb_inode_t *wb_inode = NULL; GF_VALIDATE_OR_GOTO("write-behind", req, out); wb_inode = req->wb_inode; LOCK(&wb_inode->lock); { req = __wb_request_ref(req); } UNLOCK(&wb_inode->lock); out: return req; } gf_boolean_t wb_enqueue_common(wb_inode_t *wb_inode, call_stub_t *stub, int tempted) { wb_request_t *req = NULL; inode_t *inode = NULL; GF_VALIDATE_OR_GOTO("write-behind", wb_inode, out); GF_VALIDATE_OR_GOTO(wb_inode->this->name, stub, out); req = GF_CALLOC(1, sizeof(*req), gf_wb_mt_wb_request_t); if (!req) goto out; INIT_LIST_HEAD(&req->all); INIT_LIST_HEAD(&req->todo); INIT_LIST_HEAD(&req->lie); INIT_LIST_HEAD(&req->winds); INIT_LIST_HEAD(&req->unwinds); INIT_LIST_HEAD(&req->wip); req->stub = stub; req->wb_inode = wb_inode; req->fop = stub->fop; req->ordering.tempted = tempted; req->unique = stub->frame->root->unique; inode = ((stub->args.fd != NULL) ? stub->args.fd->inode : stub->args.loc.inode); if (inode) gf_uuid_copy(req->gfid, inode->gfid); if (stub->fop == GF_FOP_WRITE) { req->write_size = iov_length(stub->args.vector, stub->args.count); /* req->write_size can change as we collapse small writes. But the window needs to grow only by how much we acknowledge the app. so copy the original size in orig_size for the purpose of accounting. 
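For example (illustrative numbers): if two 4KB writes are collapsed into one 8KB request, the holder's write_size becomes 8KB, but each request keeps its own 4KB orig_size, so the window still grows by exactly the number of bytes acknowledged to the application.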
*/ req->orig_size = req->write_size; /* Let's be optimistic that we can lie about it */ req->op_ret = req->write_size; req->op_errno = 0; if (stub->args.fd && (stub->args.fd->flags & O_APPEND)) req->ordering.append = 1; } req->lk_owner = stub->frame->root->lk_owner; req->client_pid = stub->frame->root->pid; switch (stub->fop) { case GF_FOP_WRITE: LOCK(&wb_inode->lock); { if (wb_inode->size < stub->args.offset) { req->ordering.off = wb_inode->size; req->ordering.size = stub->args.offset + req->write_size - wb_inode->size; } else { req->ordering.off = stub->args.offset; req->ordering.size = req->write_size; } if (wb_inode->size < stub->args.offset + req->write_size) wb_inode->size = stub->args.offset + req->write_size; } UNLOCK(&wb_inode->lock); req->fd = fd_ref(stub->args.fd); break; case GF_FOP_READ: req->ordering.off = stub->args.offset; req->ordering.size = stub->args.size; req->fd = fd_ref(stub->args.fd); break; case GF_FOP_TRUNCATE: req->ordering.off = stub->args.offset; req->ordering.size = 0; /* till infinity */ LOCK(&wb_inode->lock); { wb_inode->size = req->ordering.off; } UNLOCK(&wb_inode->lock); break; case GF_FOP_FTRUNCATE: req->ordering.off = stub->args.offset; req->ordering.size = 0; /* till infinity */ LOCK(&wb_inode->lock); { wb_inode->size = req->ordering.off; } UNLOCK(&wb_inode->lock); req->fd = fd_ref(stub->args.fd); break; default: if (stub && stub->args.fd) req->fd = fd_ref(stub->args.fd); break; } LOCK(&wb_inode->lock); { list_add_tail(&req->all, &wb_inode->all); req->gen = wb_inode->gen; list_add_tail(&req->todo, &wb_inode->todo); __wb_request_ref(req); /* for wind */ if (req->ordering.tempted) { list_add_tail(&req->lie, &wb_inode->temptation); __wb_request_ref(req); /* for unwind */ } } UNLOCK(&wb_inode->lock); out: if (!req) return _gf_false; return _gf_true; } gf_boolean_t wb_enqueue(wb_inode_t *wb_inode, call_stub_t *stub) { return wb_enqueue_common(wb_inode, stub, 0); } gf_boolean_t wb_enqueue_tempted(wb_inode_t *wb_inode, call_stub_t *stub) { return wb_enqueue_common(wb_inode, stub, 1); } wb_inode_t * __wb_inode_create(xlator_t *this, inode_t *inode) { wb_inode_t *wb_inode = NULL; wb_conf_t *conf = NULL; int ret = 0; GF_VALIDATE_OR_GOTO(this->name, inode, out); conf = this->private; wb_inode = GF_CALLOC(1, sizeof(*wb_inode), gf_wb_mt_wb_inode_t); if (!wb_inode) goto out; INIT_LIST_HEAD(&wb_inode->all); INIT_LIST_HEAD(&wb_inode->todo); INIT_LIST_HEAD(&wb_inode->liability); INIT_LIST_HEAD(&wb_inode->temptation); INIT_LIST_HEAD(&wb_inode->wip); INIT_LIST_HEAD(&wb_inode->invalidate_list); wb_inode->this = this; wb_inode->window_conf = conf->window_size; wb_inode->inode = inode; LOCK_INIT(&wb_inode->lock); GF_ATOMIC_INIT(wb_inode->invalidate, 0); GF_ATOMIC_INIT(wb_inode->readdirps, 0); ret = __inode_ctx_put(inode, this, (uint64_t)(unsigned long)wb_inode); if (ret) { GF_FREE(wb_inode); wb_inode = NULL; } out: return wb_inode; } wb_inode_t * wb_inode_create(xlator_t *this, inode_t *inode) { wb_inode_t *wb_inode = NULL; GF_VALIDATE_OR_GOTO(this->name, inode, out); LOCK(&inode->lock); { wb_inode = __wb_inode_ctx_get(this, inode); if (!wb_inode) wb_inode = __wb_inode_create(this, inode); } UNLOCK(&inode->lock); out: return wb_inode; } void wb_inode_destroy(wb_inode_t *wb_inode) { GF_VALIDATE_OR_GOTO("write-behind", wb_inode, out); LOCK_DESTROY(&wb_inode->lock); GF_FREE(wb_inode); out: return; } void __wb_fulfill_request(wb_request_t *req) { wb_inode_t *wb_inode = NULL; char gfid[64] = { 0, }; wb_inode = req->wb_inode; req->ordering.fulfilled = 1; wb_inode->window_current 
-= req->total_size; wb_inode->transit -= req->total_size; uuid_utoa_r(req->gfid, gfid); gf_log_callingfn(wb_inode->this->name, GF_LOG_DEBUG, "(unique=%" PRIu64 ", fop=%s, gfid=%s, " "gen=%" PRIu64 "): request fulfilled. " "removing the request from liability queue? = %s", req->unique, gf_fop_list[req->fop], gfid, req->gen, req->ordering.lied ? "yes" : "no"); if (req->ordering.lied) { /* 1. If yes, request is in liability queue and hence can be safely removed from list. 2. If no, request is in temptation queue and hence should be left in the queue so that wb_pick_unwinds picks it up */ list_del_init(&req->lie); } else { /* TODO: fail the req->frame with error if necessary */ } list_del_init(&req->wip); __wb_request_unref(req); } /* get a flush/fsync waiting on req */ wb_request_t * __wb_request_waiting_on(wb_request_t *req) { wb_inode_t *wb_inode = NULL; wb_request_t *trav = NULL; wb_inode = req->wb_inode; list_for_each_entry(trav, &wb_inode->todo, todo) { if (((trav->stub->fop == GF_FOP_FLUSH) || (trav->stub->fop == GF_FOP_FSYNC)) && (trav->gen >= req->gen)) return trav; } return NULL; } void __wb_add_request_for_retry(wb_request_t *req) { wb_inode_t *wb_inode = NULL; if (!req) goto out; wb_inode = req->wb_inode; /* response was unwound and no waiter waiting on this request, retry till a flush or fsync (subject to conf->resync_after_fsync). */ wb_inode->transit -= req->total_size; req->total_size = 0; list_del_init(&req->winds); list_del_init(&req->todo); list_del_init(&req->wip); /* sanitize ordering flags to retry */ req->ordering.go = 0; /* Add back to todo list to retry */ list_add(&req->todo, &wb_inode->todo); out: return; } void __wb_add_head_for_retry(wb_request_t *head) { wb_request_t *req = NULL, *tmp = NULL; if (!head) goto out; list_for_each_entry_safe_reverse(req, tmp, &head->winds, winds) { __wb_add_request_for_retry(req); } __wb_add_request_for_retry(head); out: return; } void wb_add_head_for_retry(wb_request_t *head) { if (!head) goto out; LOCK(&head->wb_inode->lock); { __wb_add_head_for_retry(head); } UNLOCK(&head->wb_inode->lock); out: return; } void __wb_fulfill_request_err(wb_request_t *req, int32_t op_errno) { wb_inode_t *wb_inode = NULL; wb_request_t *waiter = NULL; wb_conf_t *conf = NULL; wb_inode = req->wb_inode; conf = wb_inode->this->private; req->op_ret = -1; req->op_errno = op_errno; if (req->ordering.lied) waiter = __wb_request_waiting_on(req); if (!req->ordering.lied || waiter) { if (!req->ordering.lied) { /* response to app is still pending, send failure in * response. */ } else { /* response was sent, store the error in a * waiter (either an fsync or flush). 
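The stored error is reported when wb_do_winds later resumes that flush/fsync via call_unwind_error_keep_stub, so the application still observes the failure of the earlier lied write; this mirrors the usual POSIX pattern of a deferred write error surfacing at a later fsync() or close().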
*/ waiter->op_ret = -1; waiter->op_errno = op_errno; } if (!req->ordering.lied || (waiter->stub->fop == GF_FOP_FLUSH) || ((waiter->stub->fop == GF_FOP_FSYNC) && !conf->resync_after_fsync)) { /* No retry needed, forget the request */ __wb_fulfill_request(req); return; } } __wb_add_request_for_retry(req); return; } void wb_head_done(wb_request_t *head) { wb_request_t *req = NULL; wb_request_t *tmp = NULL; wb_inode_t *wb_inode = NULL; wb_inode = head->wb_inode; LOCK(&wb_inode->lock); { list_for_each_entry_safe(req, tmp, &head->winds, winds) { __wb_fulfill_request(req); } __wb_fulfill_request(head); } UNLOCK(&wb_inode->lock); } void __wb_fulfill_err(wb_request_t *head, int op_errno) { wb_request_t *req = NULL, *tmp = NULL; if (!head) goto out; head->wb_inode->dontsync++; list_for_each_entry_safe_reverse(req, tmp, &head->winds, winds) { __wb_fulfill_request_err(req, op_errno); } __wb_fulfill_request_err(head, op_errno); out: return; } void wb_fulfill_err(wb_request_t *head, int op_errno) { wb_inode_t *wb_inode = NULL; wb_inode = head->wb_inode; LOCK(&wb_inode->lock); { __wb_fulfill_err(head, op_errno); } UNLOCK(&wb_inode->lock); } void __wb_modify_write_request(wb_request_t *req, int synced_size) { struct iovec *vector = NULL; int count = 0; if (!req || synced_size == 0) goto out; req->write_size -= synced_size; req->stub->args.offset += synced_size; vector = req->stub->args.vector; count = req->stub->args.count; req->stub->args.count = iov_subset(vector, count, synced_size, iov_length(vector, count), vector); out: return; } int __wb_fulfill_short_write(wb_request_t *req, int size, gf_boolean_t *fulfilled) { int accounted_size = 0; if (req == NULL) goto out; if (req->write_size <= size) { accounted_size = req->write_size; __wb_fulfill_request(req); *fulfilled = 1; } else { accounted_size = size; __wb_modify_write_request(req, size); *fulfilled = 0; } out: return accounted_size; } void wb_fulfill_short_write(wb_request_t *head, int size) { wb_inode_t *wb_inode = NULL; wb_request_t *req = NULL, *next = NULL; int accounted_size = 0; gf_boolean_t fulfilled = _gf_false; if (!head) goto out; wb_inode = head->wb_inode; req = head; LOCK(&wb_inode->lock); { /* hold a reference to head so that __wb_fulfill_short_write * won't free it. We need head for a cleaner list traversal as * list_for_each_entry_safe doesn't iterate over "head" member. * So, if we pass "next->winds" as head to list_for_each_entry, * "next" is skipped. For a simpler logic we need to traverse * the list in the order. So, we start traversal from * "head->winds" and hence we want head to be alive. */ __wb_request_ref(head); next = list_entry(head->winds.next, wb_request_t, winds); accounted_size = __wb_fulfill_short_write(head, size, &fulfilled); size -= accounted_size; if (size == 0) { if (fulfilled && (next != head)) req = next; goto done; } list_for_each_entry_safe(req, next, &head->winds, winds) { accounted_size = __wb_fulfill_short_write(req, size, &fulfilled); size -= accounted_size; if (size == 0) { if (fulfilled && (next != head)) req = next; break; } } done: __wb_request_unref(head); } UNLOCK(&wb_inode->lock); wb_add_head_for_retry(req); out: return; } int wb_fulfill_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *prebuf, struct iatt *postbuf, dict_t *xdata) { wb_inode_t *wb_inode = NULL; wb_request_t *head = NULL; head = frame->local; frame->local = NULL; wb_inode = head->wb_inode; /* There could be a readdirp session in progress. 
Since wb_fulfill_cbk * can potentially remove a request from liability queue, * wb_readdirp_cbk will miss writes on this inode (as it invalidates * stats only if liability queue is not empty) and hence mark inode * for invalidation of stats in readdirp response. Specifically this * code fixes the following race mentioned in wb_readdirp_cbk: */ /* * We cannot guarantee integrity of entry->d_stat as there are cached * writes. The stat is most likely stale as it doesn't account the * cached writes. However, checking for non-empty liability list here is * not a fool-proof solution as there can be races like, * 1. readdirp is successful on posix * 2. sync of cached write is successful on posix * 3. write-behind received sync response and removed the request from * liability queue * 4. readdirp response is processed at write-behind * * In the above scenario, stat for the file is sent back in readdirp * response but it is stale. * */ wb_set_invalidate(wb_inode, 1); if (op_ret == -1) { wb_fulfill_err(head, op_errno); } else if (op_ret < head->total_size) { wb_fulfill_short_write(head, op_ret); } else { wb_head_done(head); } wb_process_queue(wb_inode); STACK_DESTROY(frame->root); return 0; } #define WB_IOV_LOAD(vec, cnt, req, head) \ do { \ memcpy(&vec[cnt], req->stub->args.vector, \ (req->stub->args.count * sizeof(vec[0]))); \ cnt += req->stub->args.count; \ head->total_size += req->write_size; \ } while (0) int wb_fulfill_head(wb_inode_t *wb_inode, wb_request_t *head) { struct iovec vector[MAX_VECTOR_COUNT]; int count = 0; wb_request_t *req = NULL; call_frame_t *frame = NULL; /* make sure head->total_size is updated before we run into any * errors */ WB_IOV_LOAD(vector, count, head, head); list_for_each_entry(req, &head->winds, winds) { WB_IOV_LOAD(vector, count, req, head); if (iobref_merge(head->stub->args.iobref, req->stub->args.iobref)) goto err; } frame = create_frame(wb_inode->this, wb_inode->this->ctx->pool); if (!frame) goto err; frame->root->lk_owner = head->lk_owner; frame->root->pid = head->client_pid; frame->local = head; LOCK(&wb_inode->lock); { wb_inode->transit += head->total_size; } UNLOCK(&wb_inode->lock); STACK_WIND(frame, wb_fulfill_cbk, FIRST_CHILD(frame->this), FIRST_CHILD(frame->this)->fops->writev, head->fd, vector, count, head->stub->args.offset, head->stub->args.flags, head->stub->args.iobref, NULL); return 0; err: /* frame creation failure */ wb_fulfill_err(head, ENOMEM); return ENOMEM; } #define NEXT_HEAD(head, req) \ do { \ if (head) \ ret |= wb_fulfill_head(wb_inode, head); \ head = req; \ expected_offset = req->stub->args.offset + req->write_size; \ curr_aggregate = 0; \ vector_count = 0; \ } while (0) int wb_fulfill(wb_inode_t *wb_inode, list_head_t *liabilities) { wb_request_t *req = NULL; wb_request_t *head = NULL; wb_request_t *tmp = NULL; wb_conf_t *conf = NULL; off_t expected_offset = 0; size_t curr_aggregate = 0; size_t vector_count = 0; int ret = 0; conf = wb_inode->this->private; list_for_each_entry_safe(req, tmp, liabilities, winds) { list_del_init(&req->winds); if (!head) { NEXT_HEAD(head, req); continue; } if (req->fd != head->fd) { NEXT_HEAD(head, req); continue; } if (!is_same_lkowner(&req->lk_owner, &head->lk_owner)) { NEXT_HEAD(head, req); continue; } if (expected_offset != req->stub->args.offset) { NEXT_HEAD(head, req); continue; } if ((curr_aggregate + req->write_size) > conf->aggregate_size) { NEXT_HEAD(head, req); continue; } if (vector_count + req->stub->args.count > MAX_VECTOR_COUNT) { NEXT_HEAD(head, req); continue; } list_add_tail(&req->winds, 
&head->winds); curr_aggregate += req->write_size; vector_count += req->stub->args.count; } if (head) ret |= wb_fulfill_head(wb_inode, head); return ret; } void wb_do_unwinds(wb_inode_t *wb_inode, list_head_t *lies) { wb_request_t *req = NULL; wb_request_t *tmp = NULL; call_frame_t *frame = NULL; struct iatt buf = { 0, }; list_for_each_entry_safe(req, tmp, lies, unwinds) { frame = req->stub->frame; STACK_UNWIND_STRICT(writev, frame, req->op_ret, req->op_errno, &buf, &buf, NULL); /* :O */ req->stub->frame = NULL; list_del_init(&req->unwinds); wb_request_unref(req); } return; } void __wb_pick_unwinds(wb_inode_t *wb_inode, list_head_t *lies) { wb_request_t *req = NULL; wb_request_t *tmp = NULL; char gfid[64] = { 0, }; list_for_each_entry_safe(req, tmp, &wb_inode->temptation, lie) { if (!req->ordering.fulfilled && wb_inode->window_current > wb_inode->window_conf) continue; list_del_init(&req->lie); list_move_tail(&req->unwinds, lies); wb_inode->window_current += req->orig_size; if (!req->ordering.fulfilled) { /* burden increased */ list_add_tail(&req->lie, &wb_inode->liability); req->ordering.lied = 1; wb_inode->gen++; uuid_utoa_r(req->gfid, gfid); gf_msg_debug(wb_inode->this->name, 0, "(unique=%" PRIu64 ", fop=%s, gfid=%s, " "gen=%" PRIu64 "): added req to liability " "queue. inode-generation-number=%" PRIu64, req->stub->frame->root->unique, gf_fop_list[req->fop], gfid, req->gen, wb_inode->gen); } } return; } int __wb_collapse_small_writes(wb_conf_t *conf, wb_request_t *holder, wb_request_t *req) { char *ptr = NULL; struct iobuf *iobuf = NULL; struct iobref *iobref = NULL; int ret = -1; ssize_t required_size = 0; size_t holder_len = 0; size_t req_len = 0; if (!holder->iobref) { holder_len = iov_length(holder->stub->args.vector, holder->stub->args.count); req_len = iov_length(req->stub->args.vector, req->stub->args.count); required_size = max((conf->page_size), (holder_len + req_len)); iobuf = iobuf_get2(req->wb_inode->this->ctx->iobuf_pool, required_size); if (iobuf == NULL) { goto out; } iobref = iobref_new(); if (iobref == NULL) { iobuf_unref(iobuf); goto out; } ret = iobref_add(iobref, iobuf); if (ret != 0) { gf_msg(req->wb_inode->this->name, GF_LOG_WARNING, -ret, WRITE_BEHIND_MSG_INVALID_ARGUMENT, "cannot add iobuf (%p) into iobref (%p)", iobuf, iobref); iobuf_unref(iobuf); iobref_unref(iobref); goto out; } iov_unload(iobuf->ptr, holder->stub->args.vector, holder->stub->args.count); holder->stub->args.vector[0].iov_base = iobuf->ptr; holder->stub->args.count = 1; iobref_unref(holder->stub->args.iobref); holder->stub->args.iobref = iobref; iobuf_unref(iobuf); holder->iobref = iobref_ref(iobref); } ptr = holder->stub->args.vector[0].iov_base + holder->write_size; iov_unload(ptr, req->stub->args.vector, req->stub->args.count); holder->stub->args.vector[0].iov_len += req->write_size; holder->write_size += req->write_size; holder->ordering.size += req->write_size; ret = 0; out: return ret; } void __wb_preprocess_winds(wb_inode_t *wb_inode) { off_t offset_expected = 0; ssize_t space_left = 0; wb_request_t *req = NULL; wb_request_t *tmp = NULL; wb_request_t *holder = NULL; wb_conf_t *conf = NULL; int ret = 0; ssize_t page_size = 0; char gfid[64] = { 0, }; /* With asynchronous IO from a VM guest (as a file), there can be two sequential writes happening in two regions of the file. But individual (broken down) IO requests can arrive interleaved. 
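For example, guest streams at offsets 0 and 128M can reach us as 0, 128M, 64K, 128M+64K, ...; each stream is sequential but the single todo queue is not, so the coalescing below (which merges a request only when it starts exactly at the holder's expected offset) is interrupted at every switch between streams.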
TODO: cycle for each such sequence sifting through the interleaved ops */ conf = wb_inode->this->private; page_size = conf->page_size; list_for_each_entry_safe(req, tmp, &wb_inode->todo, todo) { if (wb_inode->dontsync && req->ordering.lied) { /* sync has failed. Don't pick lies _again_ for winding * as winding these lies again will trigger an infinite * recursion of wb_process_queue being called from a * failed fulfill. However, pick non-lied requests for * winding so that application won't block indefinitely * waiting for write result. */ uuid_utoa_r(req->gfid, gfid); gf_msg_debug(wb_inode->this->name, 0, "(unique=%" PRIu64 ", fop=%s, gfid=%s, " "gen=%" PRIu64 "): not setting ordering.go " "as dontsync is set", req->unique, gf_fop_list[req->fop], gfid, req->gen); continue; } if (!req->ordering.tempted) { if (holder) { if (wb_requests_conflict(holder, req)) /* do not hold on write if a dependent write is in queue */ holder->ordering.go = 1; } /* collapse only non-sync writes */ continue; } else if (!holder) { /* holder is always a non-sync write */ holder = req; continue; } offset_expected = holder->stub->args.offset + holder->write_size; if (req->stub->args.offset != offset_expected) { holder->ordering.go = 1; holder = req; continue; } if (!is_same_lkowner(&req->lk_owner, &holder->lk_owner)) { holder->ordering.go = 1; holder = req; continue; } if (req->fd != holder->fd) { holder->ordering.go = 1; holder = req; continue; } space_left = page_size - holder->write_size; if (space_left < req->write_size) { holder->ordering.go = 1; holder = req; continue; } ret = __wb_collapse_small_writes(conf, holder, req); if (ret) continue; /* collapsed request is as good as wound (from its p.o.v) */ list_del_init(&req->todo); __wb_fulfill_request(req); /* Only the last @holder in queue which - does not have any non-buffered-writes following it - has not yet filled its capacity does not get its 'go' set, in anticipation of the arrival of consecutive smaller writes. */ } /* but if trickling writes are enabled, then do not hold back writes if there are no outstanding requests */ if (conf->trickling_writes && !wb_inode->transit && holder) holder->ordering.go = 1; if (wb_inode->dontsync > 0) wb_inode->dontsync--; return; } int __wb_handle_failed_conflict(wb_request_t *req, wb_request_t *conflict, list_head_t *tasks) { wb_conf_t *conf = NULL; char gfid[64] = { 0, }; conf = req->wb_inode->this->private; uuid_utoa_r(req->gfid, gfid); if ((req->stub->fop != GF_FOP_FLUSH) && ((req->stub->fop != GF_FOP_FSYNC) || conf->resync_after_fsync)) { if (!req->ordering.lied && list_empty(&conflict->wip)) { /* If request itself is in liability queue, * 1. We cannot unwind as the response has already been * sent. * 2. We cannot wind till conflict clears up. * 3. So, skip the request for now. * 4. Otherwise, resume (unwind) it with error. */ req->op_ret = -1; req->op_errno = conflict->op_errno; list_del_init(&req->todo); list_add_tail(&req->winds, tasks); gf_msg_debug(req->wb_inode->this->name, 0, "(unique=%" PRIu64 ", fop=%s, gfid=%s, " "gen=%" PRIu64 "): A conflicting write " "request in liability queue has failed " "to sync (error = \"%s\"), " "unwinding this request as a failure", req->unique, gf_fop_list[req->fop], gfid, req->gen, strerror(req->op_errno)); if (req->ordering.tempted) { /* make sure that it won't be unwound in * wb_do_unwinds too. Otherwise there'll be * a double unwind.
*/ list_del_init(&req->lie); gf_msg_debug(req->wb_inode->this->name, 0, "(unique=%" PRIu64 ", fop=%s, " "gfid=%s, gen=%" PRIu64 "): " "removed from liability queue", req->unique, gf_fop_list[req->fop], gfid, req->gen); __wb_fulfill_request(req); } } } else { gf_msg_debug(req->wb_inode->this->name, 0, "(unique=%" PRIu64 ", fop=%s, gfid=%s, " "gen=%" PRIu64 "): A conflicting write request " "in liability queue has failed to sync " "(error = \"%s\"). This is an " "FSYNC/FLUSH and we need to maintain ordering " "guarantees with other writes in TODO queue. " "Hence doing nothing now", req->unique, gf_fop_list[req->fop], gfid, req->gen, strerror(conflict->op_errno)); /* flush and fsync (without conf->resync_after_fsync) act as barriers. We cannot unwind them out of order, when there are earlier generation writes just because there is a conflicting liability with an error. So, wait for our turn till there are no conflicting liabilities. This situation can arise when there are liabilities spread across multiple generations. For example, consider two writes with the following characteristics: 1. they belong to different generations gen1, gen2 and (gen1 < gen2). 2. they overlap. 3. both are liabilities. 4. gen1 write was attempted to sync, but the attempt failed. 5. there was no attempt to sync gen2 write yet. 6. A flush (as part of close) is issued and gets a generation number gen3. In the above scenario, if flush is unwound without waiting for gen1 and gen2 writes either to be successfully synced or purged, we end up with these two writes in wb_inode->todo list forever as there will be no attempt to process the queue as flush is the last operation. */ } return 0; } int __wb_pick_winds(wb_inode_t *wb_inode, list_head_t *tasks, list_head_t *liabilities) { wb_request_t *req = NULL; wb_request_t *tmp = NULL; wb_request_t *conflict = NULL; char req_gfid[64] = { 0, }, conflict_gfid[64] = { 0, }; list_for_each_entry_safe(req, tmp, &wb_inode->todo, todo) { uuid_utoa_r(req->gfid, req_gfid); conflict = wb_liability_has_conflict(wb_inode, req); if (conflict) { uuid_utoa_r(conflict->gfid, conflict_gfid); gf_msg_debug(wb_inode->this->name, 0, "Not winding request due to a " "conflicting write in liability queue. " "REQ: unique=%" PRIu64 ", fop=%s, " "gen=%" PRIu64 ", gfid=%s. " "CONFLICT: unique=%" PRIu64 ", fop=%s, " "gen=%" PRIu64 ", gfid=%s, " "conflicts-sync-failed?=%s, " "conflicts-error=%s", req->unique, gf_fop_list[req->fop], req->gen, req_gfid, conflict->unique, gf_fop_list[conflict->fop], conflict->gen, conflict_gfid, (conflict->op_ret == -1) ? "yes" : "no", strerror(conflict->op_errno)); if (conflict->op_ret == -1) { /* There is a conflicting liability which failed * to sync in previous attempts, resume the req * and fail, unless it's an fsync/flush. */ __wb_handle_failed_conflict(req, conflict, tasks); } else { /* There is a conflicting liability which was * not attempted to sync even once. Wait till * at least one attempt to sync is made. */ } continue; } if (req->ordering.tempted && !req->ordering.go) { /* wait some more */ gf_msg_debug(wb_inode->this->name, 0, "(unique=%" PRIu64 ", fop=%s, gen=%" PRIu64 ", gfid=%s): ordering.go is not set, " "hence not winding", req->unique, gf_fop_list[req->fop], req->gen, req_gfid); continue; } if (req->stub->fop == GF_FOP_WRITE) { conflict = wb_wip_has_conflict(wb_inode, req); if (conflict) { uuid_utoa_r(conflict->gfid, conflict_gfid); gf_msg_debug(wb_inode->this->name, 0, "Not winding write request as " "a conflicting write is being " "synced to backend. 
" "REQ: unique=%" PRIu64 " fop=%s," " gen=%" PRIu64 ", gfid=%s. " "CONFLICT: unique=%" PRIu64 " " "fop=%s, gen=%" PRIu64 ", " "gfid=%s", req->unique, gf_fop_list[req->fop], req->gen, req_gfid, conflict->unique, gf_fop_list[conflict->fop], conflict->gen, conflict_gfid); continue; } list_add_tail(&req->wip, &wb_inode->wip); req->wind_count++; if (!req->ordering.tempted) /* unrefed in wb_writev_cbk */ req->stub->frame->local = __wb_request_ref(req); } gf_msg_debug(wb_inode->this->name, 0, "(unique=%" PRIu64 ", fop=%s, gfid=%s, " "gen=%" PRIu64 "): picking the request for " "winding", req->unique, gf_fop_list[req->fop], req_gfid, req->gen); list_del_init(&req->todo); if (req->ordering.tempted) { list_add_tail(&req->winds, liabilities); } else { list_add_tail(&req->winds, tasks); } } return 0; } void wb_do_winds(wb_inode_t *wb_inode, list_head_t *tasks) { wb_request_t *req = NULL; wb_request_t *tmp = NULL; list_for_each_entry_safe(req, tmp, tasks, winds) { list_del_init(&req->winds); if (req->op_ret == -1) { call_unwind_error_keep_stub(req->stub, req->op_ret, req->op_errno); } else { call_resume_keep_stub(req->stub); } wb_request_unref(req); } } void wb_process_queue(wb_inode_t *wb_inode) { list_head_t tasks; list_head_t lies; list_head_t liabilities; int wind_failure = 0; INIT_LIST_HEAD(&tasks); INIT_LIST_HEAD(&lies); INIT_LIST_HEAD(&liabilities); do { gf_log_callingfn(wb_inode->this->name, GF_LOG_DEBUG, "processing queues"); LOCK(&wb_inode->lock); { __wb_preprocess_winds(wb_inode); __wb_pick_winds(wb_inode, &tasks, &liabilities); __wb_pick_unwinds(wb_inode, &lies); } UNLOCK(&wb_inode->lock); if (!list_empty(&lies)) wb_do_unwinds(wb_inode, &lies); if (!list_empty(&tasks)) wb_do_winds(wb_inode, &tasks); /* If there is an error in wb_fulfill before winding write * requests, we would miss invocation of wb_process_queue * from wb_fulfill_cbk. So, retry processing again. 
*/ if (!list_empty(&liabilities)) wind_failure = wb_fulfill(wb_inode, &liabilities); } while (wind_failure); return; } void wb_set_inode_size(wb_inode_t *wb_inode, struct iatt *postbuf) { GF_ASSERT(wb_inode); GF_ASSERT(postbuf); LOCK(&wb_inode->lock); { wb_inode->size = postbuf->ia_size; } UNLOCK(&wb_inode->lock); } int wb_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *prebuf, struct iatt *postbuf, dict_t *xdata) { wb_request_t *req = NULL; wb_inode_t *wb_inode; req = frame->local; frame->local = NULL; wb_inode = req->wb_inode; wb_request_unref(req); /* requests could be pending while this was in progress */ wb_process_queue(wb_inode); STACK_UNWIND_STRICT(writev, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; } int wb_writev_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, int32_t count, off_t offset, uint32_t flags, struct iobref *iobref, dict_t *xdata) { STACK_WIND(frame, wb_writev_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev, fd, vector, count, offset, flags, iobref, xdata); return 0; } int wb_writev(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, int32_t count, off_t offset, uint32_t flags, struct iobref *iobref, dict_t *xdata) { wb_inode_t *wb_inode = NULL; wb_conf_t *conf = NULL; gf_boolean_t wb_disabled = 0; call_stub_t *stub = NULL; int ret = -1; int32_t op_errno = EINVAL; int o_direct = O_DIRECT; conf = this->private; wb_inode = wb_inode_create(this, fd->inode); if (!wb_inode) { op_errno = ENOMEM; goto unwind; } if (!conf->strict_O_DIRECT) o_direct = 0; if (fd->flags & (O_SYNC | O_DSYNC | o_direct)) wb_disabled = 1; if (flags & (O_SYNC | O_DSYNC | o_direct)) wb_disabled = 1; if (wb_disabled) stub = fop_writev_stub(frame, wb_writev_helper, fd, vector, count, offset, flags, iobref, xdata); else stub = fop_writev_stub(frame, NULL, fd, vector, count, offset, flags, iobref, xdata); if (!stub) { op_errno = ENOMEM; goto unwind; } if (wb_disabled) ret = wb_enqueue(wb_inode, stub); else ret = wb_enqueue_tempted(wb_inode, stub); if (!ret) { op_errno = ENOMEM; goto unwind; } wb_process_queue(wb_inode); return 0; unwind: STACK_UNWIND_STRICT(writev, frame, -1, op_errno, NULL, NULL, NULL); if (stub) call_stub_destroy(stub); return 0; } int wb_readv_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, off_t offset, uint32_t flags, dict_t *xdata) { STACK_WIND(frame, default_readv_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, xdata); return 0; } int wb_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, off_t offset, uint32_t flags, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; wb_inode = wb_inode_ctx_get(this, fd->inode); if (!wb_inode) goto noqueue; stub = fop_readv_stub(frame, wb_readv_helper, fd, size, offset, flags, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: STACK_UNWIND_STRICT(readv, frame, -1, ENOMEM, NULL, 0, NULL, NULL, NULL); return 0; noqueue: STACK_WIND(frame, default_readv_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, xdata); return 0; } int wb_flush_bg_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { STACK_DESTROY(frame->root); return 0; } int wb_flush_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) { wb_conf_t *conf = NULL; wb_inode_t *wb_inode = NULL; call_frame_t 
*bg_frame = NULL; int32_t op_errno = 0; int op_ret = 0; conf = this->private; wb_inode = wb_inode_ctx_get(this, fd->inode); if (!wb_inode) { op_ret = -1; op_errno = EINVAL; goto unwind; } if (conf->flush_behind) goto flushbehind; STACK_WIND(frame, default_flush_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->flush, fd, xdata); return 0; flushbehind: bg_frame = copy_frame(frame); if (!bg_frame) { op_ret = -1; op_errno = ENOMEM; goto unwind; } STACK_WIND(bg_frame, wb_flush_bg_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->flush, fd, xdata); /* fall through */ unwind: STACK_UNWIND_STRICT(flush, frame, op_ret, op_errno, NULL); return 0; } int wb_flush(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; wb_inode = wb_inode_ctx_get(this, fd->inode); if (!wb_inode) goto noqueue; stub = fop_flush_stub(frame, wb_flush_helper, fd, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: STACK_UNWIND_STRICT(flush, frame, -1, ENOMEM, NULL); return 0; noqueue: STACK_WIND(frame, default_flush_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->flush, fd, xdata); return 0; } int wb_fsync_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync, dict_t *xdata) { STACK_WIND(frame, default_fsync_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync, fd, datasync, xdata); return 0; } int wb_fsync(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; int32_t op_errno = EINVAL; wb_inode = wb_inode_ctx_get(this, fd->inode); if (!wb_inode) goto noqueue; stub = fop_fsync_stub(frame, wb_fsync_helper, fd, datasync, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: STACK_UNWIND_STRICT(fsync, frame, -1, op_errno, NULL, NULL, NULL); return 0; noqueue: STACK_WIND(frame, default_fsync_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync, fd, datasync, xdata); return 0; } int wb_stat_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) { STACK_WIND(frame, default_stat_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat, loc, xdata); return 0; } int wb_stat(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; wb_inode = wb_inode_ctx_get(this, loc->inode); if (!wb_inode) goto noqueue; stub = fop_stat_stub(frame, wb_stat_helper, loc, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: STACK_UNWIND_STRICT(stat, frame, -1, ENOMEM, NULL, NULL); if (stub) call_stub_destroy(stub); return 0; noqueue: STACK_WIND(frame, default_stat_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat, loc, xdata); return 0; } int wb_fstat_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) { STACK_WIND(frame, default_fstat_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat, fd, xdata); return 0; } int wb_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; wb_inode = wb_inode_ctx_get(this, fd->inode); if (!wb_inode) goto noqueue; stub = fop_fstat_stub(frame, wb_fstat_helper, fd, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: STACK_UNWIND_STRICT(fstat, frame, -1, ENOMEM, NULL, NULL); if (stub) call_stub_destroy(stub); 
return 0; noqueue: STACK_WIND(frame, default_fstat_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat, fd, xdata); return 0; } int32_t wb_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *prebuf, struct iatt *postbuf, dict_t *xdata) { GF_ASSERT(frame->local); if (op_ret == 0) wb_set_inode_size(frame->local, postbuf); frame->local = NULL; STACK_UNWIND_STRICT(truncate, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; } int wb_truncate_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, dict_t *xdata) { STACK_WIND(frame, wb_truncate_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); return 0; } int wb_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; wb_inode = wb_inode_create(this, loc->inode); if (!wb_inode) goto unwind; frame->local = wb_inode; stub = fop_truncate_stub(frame, wb_truncate_helper, loc, offset, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: frame->local = NULL; STACK_UNWIND_STRICT(truncate, frame, -1, ENOMEM, NULL, NULL, NULL); if (stub) call_stub_destroy(stub); return 0; } int32_t wb_ftruncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *prebuf, struct iatt *postbuf, dict_t *xdata) { GF_ASSERT(frame->local); if (op_ret == 0) wb_set_inode_size(frame->local, postbuf); frame->local = NULL; STACK_UNWIND_STRICT(ftruncate, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; } int wb_ftruncate_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, dict_t *xdata) { STACK_WIND(frame, wb_ftruncate_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); return 0; } int wb_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; int32_t op_errno = 0; wb_inode = wb_inode_create(this, fd->inode); if (!wb_inode) { op_errno = ENOMEM; goto unwind; } frame->local = wb_inode; stub = fop_ftruncate_stub(frame, wb_ftruncate_helper, fd, offset, xdata); if (!stub) { op_errno = ENOMEM; goto unwind; } if (!wb_enqueue(wb_inode, stub)) { op_errno = ENOMEM; goto unwind; } wb_process_queue(wb_inode); return 0; unwind: frame->local = NULL; STACK_UNWIND_STRICT(ftruncate, frame, -1, op_errno, NULL, NULL, NULL); if (stub) call_stub_destroy(stub); return 0; } int wb_setattr_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, struct iatt *stbuf, int32_t valid, dict_t *xdata) { STACK_WIND(frame, default_setattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setattr, loc, stbuf, valid, xdata); return 0; } int wb_setattr(call_frame_t *frame, xlator_t *this, loc_t *loc, struct iatt *stbuf, int32_t valid, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; wb_inode = wb_inode_ctx_get(this, loc->inode); if (!wb_inode) goto noqueue; stub = fop_setattr_stub(frame, wb_setattr_helper, loc, stbuf, valid, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: STACK_UNWIND_STRICT(setattr, frame, -1, ENOMEM, NULL, NULL, NULL); if (stub) call_stub_destroy(stub); return 0; noqueue: STACK_WIND(frame, default_setattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setattr, loc, stbuf, valid, xdata); return 0; } int wb_fsetattr_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, 
struct iatt *stbuf, int32_t valid, dict_t *xdata) { STACK_WIND(frame, default_fsetattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetattr, fd, stbuf, valid, xdata); return 0; } int wb_fsetattr(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iatt *stbuf, int32_t valid, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; wb_inode = wb_inode_ctx_get(this, fd->inode); if (!wb_inode) goto noqueue; stub = fop_fsetattr_stub(frame, wb_fsetattr_helper, fd, stbuf, valid, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: STACK_UNWIND_STRICT(fsetattr, frame, -1, ENOMEM, NULL, NULL, NULL); if (stub) call_stub_destroy(stub); return 0; noqueue: STACK_WIND(frame, default_fsetattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetattr, fd, stbuf, valid, xdata); return 0; } int32_t wb_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata) { wb_inode_t *wb_inode = NULL; wb_inode = wb_inode_create(this, fd->inode); if (!wb_inode) goto unwind; if (((flags & O_RDWR) || (flags & O_WRONLY)) && (flags & O_TRUNC)) wb_inode->size = 0; STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd, xdata); return 0; unwind: STACK_UNWIND_STRICT(create, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, NULL, NULL); return 0; } int32_t wb_open(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, fd_t *fd, dict_t *xdata) { wb_inode_t *wb_inode = NULL; wb_inode = wb_inode_create(this, fd->inode); if (!wb_inode) goto unwind; if (((flags & O_RDWR) || (flags & O_WRONLY)) && (flags & O_TRUNC)) wb_inode->size = 0; STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->open, loc, flags, fd, xdata); return 0; unwind: STACK_UNWIND_STRICT(open, frame, -1, ENOMEM, NULL, NULL); return 0; } int32_t wb_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t *inode, struct iatt *buf, dict_t *xdata, struct iatt *postparent) { if (op_ret == 0) { wb_inode_t *wb_inode = wb_inode_ctx_get(this, inode); if (wb_inode) wb_set_inode_size(wb_inode, buf); } STACK_UNWIND_STRICT(lookup, frame, op_ret, op_errno, inode, buf, xdata, postparent); return 0; } int wb_lookup_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) { STACK_WIND(frame, wb_lookup_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup, loc, xdata); return 0; } int32_t wb_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; wb_inode = wb_inode_ctx_get(this, loc->inode); if (!wb_inode) goto noqueue; stub = fop_lookup_stub(frame, wb_lookup_helper, loc, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: if (stub) call_stub_destroy(stub); STACK_UNWIND_STRICT(lookup, frame, -1, ENOMEM, NULL, NULL, NULL, NULL); return 0; noqueue: STACK_WIND(frame, wb_lookup_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup, loc, xdata); return 0; } static void wb_mark_readdirp_start(xlator_t *this, inode_t *directory) { wb_inode_t *wb_directory_inode = NULL; wb_directory_inode = wb_inode_create(this, directory); if (!wb_directory_inode || !wb_directory_inode->lock.spinlock) return; LOCK(&wb_directory_inode->lock); { GF_ATOMIC_INC(wb_directory_inode->readdirps); } UNLOCK(&wb_directory_inode->lock); return; } static void wb_mark_readdirp_end(xlator_t *this, 
inode_t *directory) { wb_inode_t *wb_directory_inode = NULL, *wb_inode = NULL, *tmp = NULL; int readdirps = 0; wb_directory_inode = wb_inode_ctx_get(this, directory); if (!wb_directory_inode || !wb_directory_inode->lock.spinlock) return; LOCK(&wb_directory_inode->lock); { readdirps = GF_ATOMIC_DEC(wb_directory_inode->readdirps); if (readdirps) goto unlock; list_for_each_entry_safe(wb_inode, tmp, &wb_directory_inode->invalidate_list, invalidate_list) { list_del_init(&wb_inode->invalidate_list); GF_ATOMIC_SWAP(wb_inode->invalidate, 0); } } unlock: UNLOCK(&wb_directory_inode->lock); return; } int32_t wb_readdirp_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, gf_dirent_t *entries, dict_t *xdata) { wb_inode_t *wb_inode = NULL; gf_dirent_t *entry = NULL; inode_t *inode = NULL; fd_t *fd = NULL; fd = frame->local; frame->local = NULL; if (op_ret <= 0) goto unwind; list_for_each_entry(entry, &entries->list, list) { if (!entry->inode || !IA_ISREG(entry->d_stat.ia_type)) continue; wb_inode = wb_inode_ctx_get(this, entry->inode); if (!wb_inode) continue; LOCK(&wb_inode->lock); { if (!list_empty(&wb_inode->liability) || GF_ATOMIC_GET(wb_inode->invalidate)) { inode = entry->inode; entry->inode = NULL; memset(&entry->d_stat, 0, sizeof(entry->d_stat)); inode_unref(inode); } } UNLOCK(&wb_inode->lock); } unwind: wb_mark_readdirp_end(this, fd->inode); frame->local = NULL; STACK_UNWIND_STRICT(readdirp, frame, op_ret, op_errno, entries, xdata); return 0; } int32_t wb_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, off_t off, dict_t *xdata) { wb_mark_readdirp_start(this, fd->inode); frame->local = fd; STACK_WIND(frame, wb_readdirp_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, fd, size, off, xdata); return 0; } int32_t wb_link_helper(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata); return 0; } int32_t wb_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; wb_inode = wb_inode_ctx_get(this, oldloc->inode); if (!wb_inode) goto noqueue; stub = fop_link_stub(frame, wb_link_helper, oldloc, newloc, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: STACK_UNWIND_STRICT(link, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, NULL); if (stub) call_stub_destroy(stub); return 0; noqueue: STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata); return 0; } int32_t wb_fallocate_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t keep_size, off_t offset, size_t len, dict_t *xdata) { STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fallocate, fd, keep_size, offset, len, xdata); return 0; } int32_t wb_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t keep_size, off_t offset, size_t len, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; wb_inode = wb_inode_ctx_get(this, fd->inode); if (!wb_inode) goto noqueue; stub = fop_fallocate_stub(frame, wb_fallocate_helper, fd, keep_size, offset, len, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: STACK_UNWIND_STRICT(fallocate, frame, -1, ENOMEM, NULL, NULL, NULL); if (stub) call_stub_destroy(stub); return 0; noqueue: STACK_WIND_TAIL(frame, 
FIRST_CHILD(this), FIRST_CHILD(this)->fops->fallocate, fd, keep_size, offset, len, xdata); return 0; } int32_t wb_discard_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, size_t len, dict_t *xdata) { STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->discard, fd, offset, len, xdata); return 0; } int32_t wb_discard(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, size_t len, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; wb_inode = wb_inode_ctx_get(this, fd->inode); if (!wb_inode) goto noqueue; stub = fop_discard_stub(frame, wb_discard_helper, fd, offset, len, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: STACK_UNWIND_STRICT(discard, frame, -1, ENOMEM, NULL, NULL, NULL); if (stub) call_stub_destroy(stub); return 0; noqueue: STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->discard, fd, offset, len, xdata); return 0; } int32_t wb_zerofill_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, off_t len, dict_t *xdata) { STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->zerofill, fd, offset, len, xdata); return 0; } int32_t wb_zerofill(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, off_t len, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; wb_inode = wb_inode_ctx_get(this, fd->inode); if (!wb_inode) goto noqueue; stub = fop_zerofill_stub(frame, wb_zerofill_helper, fd, offset, len, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: STACK_UNWIND_STRICT(zerofill, frame, -1, ENOMEM, NULL, NULL, NULL); if (stub) call_stub_destroy(stub); return 0; noqueue: STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->zerofill, fd, offset, len, xdata); return 0; } int32_t wb_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; wb_inode = wb_inode_ctx_get(this, oldloc->inode); if (!wb_inode) goto noqueue; stub = fop_rename_stub(frame, default_rename_resume, oldloc, newloc, xdata); if (!stub) goto unwind; if (!wb_enqueue(wb_inode, stub)) goto unwind; wb_process_queue(wb_inode); return 0; unwind: if (stub) call_stub_destroy(stub); STACK_UNWIND_STRICT(rename, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, NULL, NULL); return 0; noqueue: STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata); return 0; } int wb_forget(xlator_t *this, inode_t *inode) { uint64_t tmp = 0; wb_inode_t *wb_inode = NULL; inode_ctx_del(inode, this, &tmp); wb_inode = (wb_inode_t *)(long)tmp; if (!wb_inode) return 0; GF_ASSERT(list_empty(&wb_inode->todo)); GF_ASSERT(list_empty(&wb_inode->liability)); GF_ASSERT(list_empty(&wb_inode->temptation)); GF_FREE(wb_inode); return 0; } int wb_release(xlator_t *this, fd_t *fd) { uint64_t tmp = 0; (void)fd_ctx_del(fd, this, &tmp); return 0; } int wb_priv_dump(xlator_t *this) { wb_conf_t *conf = NULL; char key_prefix[GF_DUMP_MAX_BUF_LEN] = { 0, }; int ret = -1; GF_VALIDATE_OR_GOTO("write-behind", this, out); conf = this->private; GF_VALIDATE_OR_GOTO(this->name, conf, out); gf_proc_dump_build_key(key_prefix, "xlator.performance.write-behind", "priv"); gf_proc_dump_add_section("%s", key_prefix); gf_proc_dump_write("aggregate_size", "%" PRIu64, conf->aggregate_size); gf_proc_dump_write("window_size", "%" PRIu64, conf->window_size); gf_proc_dump_write("flush_behind", 
"%d", conf->flush_behind); gf_proc_dump_write("trickling_writes", "%d", conf->trickling_writes); ret = 0; out: return ret; } void __wb_dump_requests(struct list_head *head, char *prefix) { char key[GF_DUMP_MAX_BUF_LEN] = { 0, }; char key_prefix[GF_DUMP_MAX_BUF_LEN] = { 0, }, flag = 0; wb_request_t *req = NULL; list_for_each_entry(req, head, all) { gf_proc_dump_build_key(key_prefix, key, "%s", (char *)gf_fop_list[req->fop]); gf_proc_dump_add_section("%s", key_prefix); gf_proc_dump_write("unique", "%" PRIu64, req->unique); gf_proc_dump_write("refcount", "%d", req->refcount); if (list_empty(&req->todo)) gf_proc_dump_write("wound", "yes"); else gf_proc_dump_write("wound", "no"); gf_proc_dump_write("generation-number", "%" PRIu64, req->gen); gf_proc_dump_write("req->op_ret", "%d", req->op_ret); gf_proc_dump_write("req->op_errno", "%d", req->op_errno); gf_proc_dump_write("sync-attempts", "%d", req->wind_count); if (req->fop == GF_FOP_WRITE) { if (list_empty(&req->wip)) gf_proc_dump_write("sync-in-progress", "no"); else gf_proc_dump_write("sync-in-progress", "yes"); gf_proc_dump_write("size", "%" GF_PRI_SIZET, req->write_size); if (req->stub) gf_proc_dump_write("offset", "%" PRId64, req->stub->args.offset); flag = req->ordering.lied; gf_proc_dump_write("lied", "%d", flag); flag = req->ordering.append; gf_proc_dump_write("append", "%d", flag); flag = req->ordering.fulfilled; gf_proc_dump_write("fulfilled", "%d", flag); flag = req->ordering.go; gf_proc_dump_write("go", "%d", flag); } } } int wb_inode_dump(xlator_t *this, inode_t *inode) { wb_inode_t *wb_inode = NULL; int32_t ret = -1; char *path = NULL; char key_prefix[GF_DUMP_MAX_BUF_LEN] = { 0, }; char uuid_str[64] = { 0, }; if ((inode == NULL) || (this == NULL)) { ret = 0; goto out; } wb_inode = wb_inode_ctx_get(this, inode); if (wb_inode == NULL) { ret = 0; goto out; } uuid_utoa_r(inode->gfid, uuid_str); gf_proc_dump_build_key(key_prefix, "xlator.performance.write-behind", "wb_inode"); gf_proc_dump_add_section("%s", key_prefix); __inode_path(inode, NULL, &path); if (path != NULL) { gf_proc_dump_write("path", "%s", path); GF_FREE(path); } gf_proc_dump_write("inode", "%p", inode); gf_proc_dump_write("gfid", "%s", uuid_str); gf_proc_dump_write("window_conf", "%" GF_PRI_SIZET, wb_inode->window_conf); gf_proc_dump_write("window_current", "%" GF_PRI_SIZET, wb_inode->window_current); gf_proc_dump_write("transit-size", "%" GF_PRI_SIZET, wb_inode->transit); gf_proc_dump_write("dontsync", "%d", wb_inode->dontsync); ret = TRY_LOCK(&wb_inode->lock); if (!ret) { if (!list_empty(&wb_inode->all)) { __wb_dump_requests(&wb_inode->all, key_prefix); } UNLOCK(&wb_inode->lock); } if (ret && wb_inode) gf_proc_dump_write("Unable to dump the inode information", "(Lock acquisition failed) %p (gfid: %s)", wb_inode, uuid_str); ret = 0; out: return ret; } int mem_acct_init(xlator_t *this) { int ret = -1; if (!this) { goto out; } ret = xlator_mem_acct_init(this, gf_wb_mt_end + 1); if (ret != 0) { gf_msg(this->name, GF_LOG_ERROR, ENOMEM, WRITE_BEHIND_MSG_NO_MEMORY, "Memory accounting init" "failed"); } out: return ret; } int reconfigure(xlator_t *this, dict_t *options) { wb_conf_t *conf = NULL; int ret = -1; conf = this->private; GF_OPTION_RECONF("cache-size", conf->window_size, options, size_uint64, out); GF_OPTION_RECONF("flush-behind", conf->flush_behind, options, bool, out); GF_OPTION_RECONF("trickling-writes", conf->trickling_writes, options, bool, out); GF_OPTION_RECONF("strict-O_DIRECT", conf->strict_O_DIRECT, options, bool, out); 
int
reconfigure(xlator_t *this, dict_t *options)
{
    wb_conf_t *conf = NULL;
    int ret = -1;

    conf = this->private;

    GF_OPTION_RECONF("cache-size", conf->window_size, options, size_uint64,
                     out);

    GF_OPTION_RECONF("flush-behind", conf->flush_behind, options, bool, out);

    GF_OPTION_RECONF("trickling-writes", conf->trickling_writes, options, bool,
                     out);

    GF_OPTION_RECONF("strict-O_DIRECT", conf->strict_O_DIRECT, options, bool,
                     out);

    GF_OPTION_RECONF("strict-write-ordering", conf->strict_write_ordering,
                     options, bool, out);

    GF_OPTION_RECONF("resync-failed-syncs-after-fsync",
                     conf->resync_after_fsync, options, bool, out);

    ret = 0;
out:
    return ret;
}

int32_t
init(xlator_t *this)
{
    wb_conf_t *conf = NULL;
    int32_t ret = -1;

    if ((this->children == NULL) || this->children->next) {
        gf_msg(this->name, GF_LOG_ERROR, 0, WRITE_BEHIND_MSG_INIT_FAILED,
               "FATAL: write-behind (%s) not configured with exactly "
               "one child",
               this->name);
        goto out;
    }

    if (this->parents == NULL) {
        gf_msg(this->name, GF_LOG_WARNING, 0,
               WRITE_BEHIND_MSG_VOL_MISCONFIGURED,
               "dangling volume. check volfile");
    }

    conf = GF_CALLOC(1, sizeof(*conf), gf_wb_mt_wb_conf_t);
    if (conf == NULL) {
        goto out;
    }

    /* configure 'option aggregate-size <size>' */
    GF_OPTION_INIT("aggregate-size", conf->aggregate_size, size_uint64, out);
    conf->page_size = conf->aggregate_size;

    /* configure 'option window-size <size>' */
    GF_OPTION_INIT("cache-size", conf->window_size, size_uint64, out);

    if (!conf->window_size && conf->aggregate_size) {
        gf_msg(this->name, GF_LOG_WARNING, 0, WRITE_BEHIND_MSG_SIZE_NOT_SET,
               "setting window-size to be equal to "
               "aggregate-size(%" PRIu64 ")",
               conf->aggregate_size);
        conf->window_size = conf->aggregate_size;
    }

    if (conf->window_size < conf->aggregate_size) {
        gf_msg(this->name, GF_LOG_ERROR, 0,
               WRITE_BEHIND_MSG_EXCEEDED_MAX_SIZE,
               "aggregate-size(%" PRIu64 ") cannot be more than "
               "window-size(%" PRIu64 ")",
               conf->aggregate_size, conf->window_size);
        goto out;
    }

    /* configure 'option flush-behind <on|off>' */
    GF_OPTION_INIT("flush-behind", conf->flush_behind, bool, out);

    GF_OPTION_INIT("trickling-writes", conf->trickling_writes, bool, out);

    GF_OPTION_INIT("strict-O_DIRECT", conf->strict_O_DIRECT, bool, out);

    GF_OPTION_INIT("strict-write-ordering", conf->strict_write_ordering, bool,
                   out);

    GF_OPTION_INIT("resync-failed-syncs-after-fsync", conf->resync_after_fsync,
                   bool, out);

    this->private = conf;
    ret = 0;

out:
    if (ret) {
        GF_FREE(conf);
    }
    return ret;
}

void
fini(xlator_t *this)
{
    wb_conf_t *conf = NULL;

    GF_VALIDATE_OR_GOTO("write-behind", this, out);

    conf = this->private;
    if (!conf) {
        goto out;
    }

    this->private = NULL;
    GF_FREE(conf);

out:
    return;
}

struct xlator_fops fops = {
    .writev = wb_writev,
    .readv = wb_readv,
    .flush = wb_flush,
    .fsync = wb_fsync,
    .stat = wb_stat,
    .fstat = wb_fstat,
    .truncate = wb_truncate,
    .ftruncate = wb_ftruncate,
    .setattr = wb_setattr,
    .fsetattr = wb_fsetattr,
    .lookup = wb_lookup,
    .readdirp = wb_readdirp,
    .link = wb_link,
    .fallocate = wb_fallocate,
    .discard = wb_discard,
    .zerofill = wb_zerofill,
    .rename = wb_rename,
};

struct xlator_cbks cbks = {
    .forget = wb_forget,
    .release = wb_release,
};

struct xlator_dumpops dumpops = {
    .priv = wb_priv_dump,
    .inodectx = wb_inode_dump,
};
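/* A minimal volfile stanza loading this translator might look like the
 * following sketch (volume and subvolume names are illustrative; the option
 * keys are those declared in options[] below):
 *
 *   volume testvol-write-behind
 *       type performance/write-behind
 *       option cache-size 1MB
 *       option aggregate-size 128KB
 *       option flush-behind on
 *       subvolumes testvol-client-0
 *   end-volume
 */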
"}, {.key = {"cache-size", "window-size"}, .type = GF_OPTION_TYPE_SIZET, .min = 512 * GF_UNIT_KB, .max = 1 * GF_UNIT_GB, .default_value = "1MB", .op_version = {1}, .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, .tags = {"write-behind"}, .description = "Size of the write-behind buffer for a single file " "(inode)."}, { .key = {"trickling-writes"}, .type = GF_OPTION_TYPE_BOOL, .op_version = {GD_OP_VERSION_3_13_1}, .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, .tags = {"write-behind"}, .default_value = "on", }, {.key = {"strict-O_DIRECT"}, .type = GF_OPTION_TYPE_BOOL, .default_value = "off", .op_version = {2}, .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, .tags = {"write-behind"}, .description = "This option when set to off, ignores the " "O_DIRECT flag."}, { .key = {"strict-write-ordering"}, .type = GF_OPTION_TYPE_BOOL, .default_value = "off", .op_version = {2}, .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, .tags = {"write-behind"}, .description = "Do not let later writes overtake earlier writes even " "if they do not overlap", }, { .key = {"resync-failed-syncs-after-fsync"}, .type = GF_OPTION_TYPE_BOOL, .default_value = "off", .op_version = {GD_OP_VERSION_3_7_7}, .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, .tags = {"write-behind"}, .description = "If sync of \"cached-writes issued before fsync\" " "(to backend) fails, this option configures whether " "to retry syncing them after fsync or forget them. " "If set to on, cached-writes are retried " "till a \"flush\" fop (or a successful sync) on sync " "failures. " "fsync itself is failed irrespective of the value of " "this option. ", }, { .key = {"aggregate-size"}, .type = GF_OPTION_TYPE_SIZET, .default_value = "128KB", .op_version = {GD_OP_VERSION_4_1_0}, .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, .description = "Will aggregate writes until data of specified " "size is fully filled for a single file provided " "there are no dependent fops on cached writes. This " "option just sets the aggregate size. Note that " "aggregation won't happen if " "performance.write-behind-trickling-writes" " is turned on. Hence turn off " "performance.write-behind.trickling-writes" " so that writes are aggregated till a max of " "\"aggregate-size\" bytes", }, {.key = {NULL}}, }; xlator_api_t xlator_api = { .init = init, .fini = fini, .reconfigure = reconfigure, .mem_acct_init = mem_acct_init, .op_version = {1}, /* Present from the initial version */ .dumpops = &dumpops, .fops = &fops, .cbks = &cbks, .options = options, .identifier = "write-behind", .category = GF_MAINTAINED, };