diff options
| author | Raghavendra G <raghavendra@zresearch.com> | 2009-03-12 04:41:23 -0700 | 
|---|---|---|
| committer | Anand V. Avati <avati@amp.gluster.com> | 2009-03-13 20:36:08 +0530 | 
| commit | 7d61f9d69309ccb0f9aa787caacfef77bc4e32d2 (patch) | |
| tree | 39c6c60d93af32187d3141513a6908e70eece013 | |
| parent | 473d02d1698259b4a0a6c22fdf70071e69c6e987 (diff) | |
write behind preserves order of fops with respect to writes
- the execution order of fops like read, stat, fsync, truncate, etc., whose results
 are affected by writes, is preserved.
Signed-off-by: Anand V. Avati <avati@amp.gluster.com>
| -rw-r--r-- | xlators/performance/write-behind/src/write-behind.c | 1351 | 
1 files changed, 947 insertions, 404 deletions
diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c index 86752cc94..c41bf3d58 100644 --- a/xlators/performance/write-behind/src/write-behind.c +++ b/xlators/performance/write-behind/src/write-behind.c @@ -33,6 +33,7 @@  #include "compat.h"  #include "compat-errno.h"  #include "common-utils.h" +#include "call-stub.h"  #define MAX_VECTOR_COUNT 8 @@ -42,75 +43,207 @@ struct wb_page;  struct wb_file; +typedef struct wb_file { +        int          disabled; +        uint64_t     disable_till; +        size_t       window_size; +        int32_t      refcount; +        int32_t      op_ret; +        int32_t      op_errno; +        list_head_t  request; +        fd_t        *fd; +        gf_lock_t    lock; +        xlator_t    *this; +}wb_file_t; + + +typedef struct wb_request { +        list_head_t  list; +        list_head_t  winds; +        list_head_t  unwinds; +        list_head_t  other_requests; +        call_stub_t *stub; +        int32_t      refcount; +        wb_file_t   *file; +        union { +                struct  { +                        char write_behind; +                        char stack_wound; +                        char got_reply; +                }write_request; +                 +                struct { +                        char marked_for_resume; +                }other_requests; +        }flags; +} wb_request_t; + +  struct wb_conf { -        uint64_t aggregate_size; -        uint64_t window_size; -        uint64_t disable_till; +        uint64_t     aggregate_size; +        uint64_t     window_size; +        uint64_t     disable_till;          gf_boolean_t enable_O_SYNC;          gf_boolean_t flush_behind;  };  typedef struct wb_local { -        list_head_t winds; +        list_head_t     winds;          struct wb_file *file; -        list_head_t unwind_frames; -        int op_ret; -        int op_errno; -        call_frame_t *frame; +        wb_request_t   *request; +  
      int             op_ret; +        int             op_errno; +        call_frame_t   *frame; +        int32_t         reply_count;  } wb_local_t; -typedef struct write_request { -        call_frame_t *frame; -        off_t offset; -        /*  int32_t op_ret; -            int32_t op_errno; */ -        struct iovec *vector; -        int32_t count; -        dict_t *refs; -        char write_behind; -        char stack_wound; -        char got_reply; -        list_head_t list; -        list_head_t winds; -        /*  list_head_t unwinds;*/ -} wb_write_request_t; - - -struct wb_file { -        int disabled; -        uint64_t disable_till; -        off_t offset; -        size_t window_size; -        int32_t refcount; -        int32_t op_ret; -        int32_t op_errno; -        list_head_t request; -        fd_t *fd; -        gf_lock_t lock; -        xlator_t *this; -}; - -  typedef struct wb_conf wb_conf_t;  typedef struct wb_page wb_page_t; -typedef struct wb_file wb_file_t;  int32_t   wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all); -int32_t +size_t  wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds); -int32_t -wb_sync_all (call_frame_t *frame, wb_file_t *file); - -int32_t  +size_t  __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_size); +static void +__wb_request_unref (wb_request_t *this) +{ +        if (this->refcount <= 0) { +                gf_log ("wb-request", GF_LOG_ERROR, +                        "refcount(%d) is <= 0", this->refcount); +                return; +        } + +        this->refcount--; +        if (this->refcount == 0) { +                list_del_init (&this->list); +                if (this->stub && this->stub->fop == GF_FOP_WRITE) { +                        call_stub_destroy (this->stub); +                } + +                FREE (this); +        } +} + + +static void +wb_request_unref (wb_request_t *this) +{ +        wb_file_t *file = NULL; +        if (this == NULL) { +    
            gf_log ("wb-request", GF_LOG_DEBUG, +                        "request is NULL"); +                return; +        } +         +        file = this->file; +        LOCK (&file->lock); +        { +                __wb_request_unref (this); +        } +        UNLOCK (&file->lock); +} + + +static wb_request_t * +__wb_request_ref (wb_request_t *this) +{ +        if (this->refcount < 0) { +                gf_log ("wb-request", GF_LOG_DEBUG, +                        "refcount(%d) is < 0", this->refcount); +                return NULL; +        } + +        this->refcount++; +        return this; +} + + +wb_request_t * +wb_request_ref (wb_request_t *this) +{ +        wb_file_t *file = NULL; +        if (this == NULL) { +                gf_log ("wb-request", GF_LOG_DEBUG, +                        "request is NULL"); +                return NULL; +        } + +        file = this->file; +        LOCK (&file->lock); +        { +                this = __wb_request_ref (this); +        } +        UNLOCK (&file->lock); + +        return this; +} + + +wb_request_t * +wb_enqueue (wb_file_t *file,  +            call_stub_t *stub) +{ +        wb_request_t *request = NULL; +        call_frame_t *frame = NULL; +        wb_local_t   *local = NULL; +        struct iovec *vector = NULL; +        int32_t       count = 0; +                         +        request = CALLOC (1, sizeof (*request)); + +        INIT_LIST_HEAD (&request->list); +        INIT_LIST_HEAD (&request->winds); +        INIT_LIST_HEAD (&request->unwinds); +        INIT_LIST_HEAD (&request->other_requests); + +        request->stub = stub; +        request->file = file; + +        frame = stub->frame; +        local = frame->local; +        if (local) { +                local->request = request; +        } + +        if (stub->fop == GF_FOP_WRITE) { +                vector = stub->args.writev.vector; +                count = stub->args.writev.count; + +                frame = stub->frame; +                
local = frame->local; +                local->op_ret = iov_length (vector, count); +                local->op_errno = 0; +        } + +        LOCK (&file->lock); +        { +                list_add_tail (&request->list, &file->request); +                if (stub->fop == GF_FOP_WRITE) { +                        /* reference for stack winding */ +                        __wb_request_ref (request); + +                        /* reference for stack unwinding */ +                        __wb_request_ref (request); +                } else { +                        /*reference for resuming */ +                        __wb_request_ref (request); +                } +        } +        UNLOCK (&file->lock); + +        return request; +} + +  wb_file_t *  wb_file_create (xlator_t *this,                  fd_t *fd) @@ -161,10 +294,12 @@ wb_sync_cbk (call_frame_t *frame,               int32_t op_errno,               struct stat *stbuf)  { -        wb_local_t *local = NULL; -        list_head_t *winds = NULL; -        wb_file_t *file = NULL; -        wb_write_request_t *request = NULL, *dummy = NULL; +        wb_local_t   *local = NULL; +        list_head_t  *winds = NULL; +        wb_file_t    *file = NULL; +        wb_request_t *request = NULL, *dummy = NULL; +        wb_local_t   *per_request_local = NULL; +          local = frame->local;          winds = &local->winds; @@ -173,27 +308,25 @@ wb_sync_cbk (call_frame_t *frame,          LOCK (&file->lock);          {                  list_for_each_entry_safe (request, dummy, winds, winds) { -                        request->got_reply = 1; -                        if (!request->write_behind && (op_ret == -1)) { -                                wb_local_t *per_request_local = request->frame->local; +                        request->flags.write_request.got_reply = 1; + +                        if (!request->flags.write_request.write_behind +                            && (op_ret == -1)) { +                                
per_request_local = request->stub->frame->local;                                  per_request_local->op_ret = op_ret;                                  per_request_local->op_errno = op_errno;                          } -                        /* -                          request->op_ret = op_ret; -                          request->op_errno = op_errno;  -                        */ +                        __wb_request_unref (request); +                } +                 +                if (op_ret == -1) { +                        file->op_ret = op_ret; +                        file->op_errno = op_errno;                  }          }          UNLOCK (&file->lock); -        if (op_ret == -1) -        { -                file->op_ret = op_ret; -                file->op_errno = op_errno; -        } -          wb_process_queue (frame, file, 0);            /* safe place to do fd_unref */ @@ -204,43 +337,25 @@ wb_sync_cbk (call_frame_t *frame,          return 0;  } -int32_t -wb_sync_all (call_frame_t *frame, wb_file_t *file)  -{ -        list_head_t winds; -        int32_t bytes = 0; - -        INIT_LIST_HEAD (&winds); - -        LOCK (&file->lock); -        { -                bytes = __wb_mark_winds (&file->request, &winds, 0); -        } -        UNLOCK (&file->lock); - -        wb_sync (frame, file, &winds); - -        return bytes; -} - -int32_t +size_t  wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)  { -        wb_write_request_t *dummy = NULL, *request = NULL, *first_request = NULL, *next = NULL; -        size_t total_count = 0, count = 0; -        size_t copied = 0; -        call_frame_t *sync_frame = NULL; -        dict_t *refs = NULL; -        wb_local_t *local = NULL; -        struct iovec *vector = NULL; -        int32_t bytes = 0; -        size_t bytecount = 0; - -        list_for_each_entry (request, winds, winds) -        { -                total_count += request->count; -                bytes += iov_length (request->vector, 
request->count); +        wb_request_t   *dummy = NULL, *request = NULL; +        wb_request_t   *first_request = NULL, *next = NULL; +        size_t          total_count = 0, count = 0; +        size_t          copied = 0; +        call_frame_t   *sync_frame = NULL; +        dict_t         *refs = NULL; +        wb_local_t     *local = NULL; +        struct iovec   *vector = NULL; +        size_t          bytes = 0; +        size_t          bytecount = 0; + +        list_for_each_entry (request, winds, winds) { +                total_count += request->stub->args.writev.count; +                bytes += iov_length (request->stub->args.writev.vector, +                                     request->stub->args.writev.count);          }          if (!total_count) { @@ -258,26 +373,29 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)                          first_request = request;                  } -                count += request->count; -                bytecount = VECTORSIZE (request->count); +                count += request->stub->args.writev.count; +                bytecount = VECTORSIZE (request->stub->args.writev.count);                  memcpy (((char *)vector)+copied, -                        request->vector, +                        request->stub->args.writev.vector,                          bytecount);                  copied += bytecount; -                if (request->refs) { -                        dict_copy (request->refs, refs); +                if (request->stub->args.writev.req_refs) { +                        dict_copy (request->stub->args.writev.req_refs, refs);                  }                  next = NULL;                  if (request->winds.next != winds) {     -                        next = list_entry (request->winds.next, struct write_request, winds); +                        next = list_entry (request->winds.next, +                                           wb_request_t, winds);                  }                  
list_del_init (&request->winds);                  list_add_tail (&request->winds, &local->winds); -                if (!next || ((count + next->count) > MAX_VECTOR_COUNT)) { +                if (!next +                    || ((count + next->stub->args.writev.count) > MAX_VECTOR_COUNT)) +                {                          sync_frame = copy_frame (frame);                            sync_frame->local = local;                          local->file = file; @@ -288,7 +406,8 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)                                      FIRST_CHILD(sync_frame->this),                                      FIRST_CHILD(sync_frame->this)->fops->writev,                                      file->fd, vector, -                                    count, first_request->offset); +                                    count, +                                    first_request->stub->args.writev.off);                          dict_unref (refs);                          FREE (vector); @@ -311,15 +430,44 @@ wb_stat_cbk (call_frame_t *frame,               int32_t op_errno,               struct stat *buf)  { -        wb_local_t *local = NULL; +        wb_local_t   *local = NULL; +        wb_request_t *request = NULL;  +        call_frame_t *process_frame = NULL; +        wb_file_t    *file = NULL;          local = frame->local; -   -        if (local->file) -                fd_unref (local->file->fd); +        file = local->file; + +        request = local->request; +        if (request) { +                process_frame = copy_frame (frame); +        }          STACK_UNWIND (frame, op_ret, op_errno, buf); +        if (request) { +                wb_request_unref (request); +                wb_process_queue (process_frame, file, 0); +                STACK_DESTROY (process_frame->root); +        } + +        if (file) { +                fd_unref (file->fd); +        } + +        return 0; +} + + +static int32_t +wb_stat_helper (call_frame_t 
*frame, +                xlator_t *this, +                loc_t *loc) +{ +        STACK_WIND (frame, wb_stat_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->stat, +                    loc);          return 0;  } @@ -329,13 +477,14 @@ wb_stat (call_frame_t *frame,           xlator_t *this,           loc_t *loc)  { -        wb_file_t *file = NULL; -        fd_t *iter_fd = NULL; -        wb_local_t *local = NULL; -	uint64_t tmp_file = 0; +        wb_file_t   *file = NULL; +        fd_t        *iter_fd = NULL; +        wb_local_t  *local = NULL; +	uint64_t     tmp_file = 0; +        call_stub_t *stub = NULL; -        if (loc->inode) -        { +        if (loc->inode) { +                /* FIXME: fd_lookup extends life of fd till stat returns */                  iter_fd = fd_lookup (loc->inode, frame->root->pid);                  if (iter_fd) {                          if (!fd_ctx_get (iter_fd, this, &tmp_file)) { @@ -344,9 +493,6 @@ wb_stat (call_frame_t *frame,                                  fd_unref (iter_fd);                          }                  } -                if (file) { -                        wb_sync_all (frame, file); -                }          }          local = CALLOC (1, sizeof (*local)); @@ -354,10 +500,64 @@ wb_stat (call_frame_t *frame,          frame->local = local; -        STACK_WIND (frame, wb_stat_cbk, +        if (file) { +                stub = fop_stat_stub (frame, wb_stat_helper, loc); +                if (stub == NULL) { +                        STACK_UNWIND (frame, -1, ENOMEM, NULL); +                        return 0; +                } +                 +                wb_enqueue (file, stub); + +                wb_process_queue (frame, file, 1); +        } else { +                STACK_WIND (frame, wb_stat_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->stat, +                            loc); +        } + +        return 0; 
+} + + +int32_t  +wb_fstat_cbk (call_frame_t *frame, +              void *cookie, +              xlator_t *this, +              int32_t op_ret, +              int32_t op_errno, +              struct stat *buf) +{ +        wb_local_t   *local = NULL; +        wb_request_t *request = NULL;  +        wb_file_t    *file = NULL; +   +        local = frame->local; +        file = local->file; + +        request = local->request; +        if (request) { +                wb_request_unref (request); +                wb_process_queue (frame, file, 0); +        } + +        STACK_UNWIND (frame, op_ret, op_errno, buf); + +        return 0; +} + + +int32_t  +wb_fstat_helper (call_frame_t *frame, +                 xlator_t *this, +                 fd_t *fd) +{ +        STACK_WIND (frame, +                    wb_fstat_cbk,                      FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->stat, -                    loc); +                    FIRST_CHILD(this)->fops->fstat, +                    fd);          return 0;  } @@ -367,9 +567,10 @@ wb_fstat (call_frame_t *frame,            xlator_t *this,            fd_t *fd)  { -        wb_file_t *file = NULL; -        wb_local_t *local = NULL; -  	uint64_t tmp_file = 0; +        wb_file_t   *file = NULL; +        wb_local_t  *local = NULL; +  	uint64_t     tmp_file = 0; +        call_stub_t *stub = NULL;          if (fd_ctx_get (fd, this, &tmp_file)) {                  gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); @@ -378,21 +579,29 @@ wb_fstat (call_frame_t *frame,          }  	file = (wb_file_t *)(long)tmp_file; -        if (file) { -                fd_ref (file->fd); -                wb_sync_all (frame, file); -        } -          local = CALLOC (1, sizeof (*local));          local->file = file;          frame->local = local; -   -        STACK_WIND (frame, -                    wb_stat_cbk, -                    FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->fstat, -                    
fd); + +        if (file) { +                stub = fop_fstat_stub (frame, wb_fstat_helper, fd); +                if (stub == NULL) { +                        STACK_UNWIND (frame, -1, ENOMEM, NULL); +                        return 0; +                } +                 +                wb_enqueue (file, stub); + +                wb_process_queue (frame, file, 1); +        } else { +                STACK_WIND (frame, +                            wb_fstat_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->fstat, +                            fd); +        } +          return 0;  } @@ -405,13 +614,48 @@ wb_truncate_cbk (call_frame_t *frame,                   int32_t op_errno,                   struct stat *buf)  { -        wb_local_t *local = NULL;  -   +        wb_local_t   *local = NULL;  +        wb_request_t *request = NULL; +        wb_file_t    *file = NULL; +        call_frame_t *process_frame = NULL; +          local = frame->local; -        if (local->file) -                fd_unref (local->file->fd); +        file = local->file; +        request = local->request; + +        if (request) { +                process_frame = copy_frame (frame); +        }          STACK_UNWIND (frame, op_ret, op_errno, buf); + +        if (request) { +                wb_request_unref (request); +                wb_process_queue (process_frame, file, 0); +                STACK_DESTROY (process_frame->root); +        } + +        if (file) { +                fd_unref (file->fd); +        } + +        return 0; +} + + +static int32_t  +wb_truncate_helper (call_frame_t *frame, +                    xlator_t *this, +                    loc_t *loc, +                    off_t offset) +{ +        STACK_WIND (frame, +                    wb_truncate_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->truncate, +                    loc, +                    offset); +          return 0;  } @@ 
-422,13 +666,16 @@ wb_truncate (call_frame_t *frame,               loc_t *loc,               off_t offset)  { -        wb_file_t *file = NULL; -        fd_t *iter_fd = NULL; -        wb_local_t *local = NULL; -	uint64_t tmp_file = 0; +        wb_file_t   *file = NULL; +        fd_t        *iter_fd = NULL; +        wb_local_t  *local = NULL; +	uint64_t     tmp_file = 0; +        call_stub_t *stub = NULL;          if (loc->inode)          { +                /* FIXME: fd_lookup extends life of fd till the execution of +                   truncate_cbk */                  iter_fd = fd_lookup (loc->inode, frame->root->pid);                  if (iter_fd) {                          if (!fd_ctx_get (iter_fd, this, &tmp_file)){ @@ -437,37 +684,90 @@ wb_truncate (call_frame_t *frame,                                  fd_unref (iter_fd);                          }                  } -     -                if (file) -                { -                        wb_sync_all (frame, file); -                }          }          local = CALLOC (1, sizeof (*local));          local->file = file; - +                  frame->local = local; +        if (file) { +                stub = fop_truncate_stub (frame, wb_truncate_helper, loc, +                                          offset); +                if (stub == NULL) { +                        STACK_UNWIND (frame, -1, ENOMEM, NULL); +                        return 0; +                } + +                wb_enqueue (file, stub); +                 +                wb_process_queue (frame, file, 1); + +        } else { +                STACK_WIND (frame, +                            wb_truncate_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->truncate, +                            loc, +                            offset); +        } +        return 0; +} + + +int32_t +wb_ftruncate_cbk (call_frame_t *frame, +                  void *cookie, +                  xlator_t *this, +      
            int32_t op_ret, +                  int32_t op_errno, +                  struct stat *buf) +{ +        wb_local_t   *local = NULL;  +        wb_request_t *request = NULL; +        wb_file_t    *file = NULL; + +        local = frame->local; +        file = local->file; +        request = local->request; + +        if (request) { +                wb_request_unref (request); +                wb_process_queue (frame, file, 0); +        } + +        STACK_UNWIND (frame, op_ret, op_errno, buf); + +        return 0; +} + + +static int32_t +wb_ftruncate_helper (call_frame_t *frame, +                     xlator_t *this, +                     fd_t *fd, +                     off_t offset) +{          STACK_WIND (frame, -                    wb_truncate_cbk, +                    wb_ftruncate_cbk,                      FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->truncate, -                    loc, +                    FIRST_CHILD(this)->fops->ftruncate, +                    fd,                      offset);          return 0;  } - +          int32_t  wb_ftruncate (call_frame_t *frame,                xlator_t *this,                fd_t *fd,                off_t offset)  { -        wb_file_t *file = NULL; -        wb_local_t *local = NULL; -	uint64_t tmp_file = 0; +        wb_file_t   *file = NULL; +        wb_local_t  *local = NULL; +	uint64_t     tmp_file = 0; +        call_stub_t *stub = NULL;          if (fd_ctx_get (fd, this, &tmp_file)) {                  gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); @@ -476,23 +776,32 @@ wb_ftruncate (call_frame_t *frame,          }  	file = (wb_file_t *)(long)tmp_file; -        if (file) -                wb_sync_all (frame, file);          local = CALLOC (1, sizeof (*local));          local->file = file; -        if (file) -                fd_ref (file->fd); -          frame->local = local; -        STACK_WIND (frame, -                    wb_truncate_cbk, -                    FIRST_CHILD(this), -     
               FIRST_CHILD(this)->fops->ftruncate, -                    fd, -                    offset); +        if (file) { +                stub = fop_ftruncate_stub (frame, wb_ftruncate_helper, fd, +                                           offset); +                if (stub == NULL) { +                        STACK_UNWIND (frame, -1, ENOMEM, NULL); +                        return 0; +                } + +                wb_enqueue (file, stub); + +                wb_process_queue (frame, file, 1); +        } else { +                STACK_WIND (frame, +                            wb_ftruncate_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->ftruncate, +                            fd, +                            offset); +        } +          return 0;  } @@ -505,13 +814,48 @@ wb_utimens_cbk (call_frame_t *frame,                  int32_t op_errno,                  struct stat *buf)  { -        wb_local_t *local = NULL;        +        wb_local_t   *local = NULL;        +        wb_request_t *request = NULL; +        call_frame_t *process_frame = NULL;  +        wb_file_t    *file = NULL;          local = frame->local; -        if (local->file) -                fd_unref (local->file->fd); +        file = local->file; +        request = local->request; + +        if (request) { +                process_frame = copy_frame (frame); +        }          STACK_UNWIND (frame, op_ret, op_errno, buf); + +        if (request) { +                wb_request_unref (request); +                wb_process_queue (process_frame, file, 0); +                STACK_DESTROY (process_frame->root); +        } + +        if (file) { +                fd_unref (file->fd); +        } + +        return 0; +} + + +static int32_t  +wb_utimens_helper (call_frame_t *frame, +                   xlator_t *this, +                   loc_t *loc, +                   struct timespec tv[2]) +{ +        STACK_WIND (frame, +                    
wb_utimens_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->utimens, +                    loc, +                    tv); +          return 0;  } @@ -522,12 +866,15 @@ wb_utimens (call_frame_t *frame,              loc_t *loc,              struct timespec tv[2])  { -        wb_file_t *file = NULL; -        fd_t *iter_fd = NULL; -        wb_local_t *local = NULL; -	uint64_t tmp_file = 0; +        wb_file_t   *file = NULL; +        fd_t        *iter_fd = NULL; +        wb_local_t  *local = NULL; +	uint64_t     tmp_file = 0; +        call_stub_t *stub = NULL;          if (loc->inode) { +                /* FIXME: fd_lookup extends life of fd till the execution +                   of wb_utimens_cbk */                  iter_fd = fd_lookup (loc->inode, frame->root->pid);                  if (iter_fd) {                          if (!fd_ctx_get (iter_fd, this, &tmp_file)) { @@ -537,8 +884,6 @@ wb_utimens (call_frame_t *frame,                          }                  } -                if (file) -                        wb_sync_all (frame, file);          }          local = CALLOC (1, sizeof (*local)); @@ -546,12 +891,25 @@ wb_utimens (call_frame_t *frame,          frame->local = local; -        STACK_WIND (frame, -                    wb_utimens_cbk, -                    FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->utimens, -                    loc, -                    tv); +        if (file) { +                stub = fop_utimens_stub (frame, wb_utimens_helper, loc, tv); +                if (stub == NULL) { +                        STACK_UNWIND (frame, -1, ENOMEM, NULL); +                        return 0; +                } + +                wb_enqueue (file, stub); + +                wb_process_queue (frame, file, 1); +        } else { +                STACK_WIND (frame, +                            wb_utimens_cbk, +                            FIRST_CHILD(this), +                            
FIRST_CHILD(this)->fops->utimens, +                            loc, +                            tv); +        } +          return 0;  } @@ -563,7 +921,7 @@ wb_open_cbk (call_frame_t *frame,               int32_t op_errno,               fd_t *fd)  { -        int32_t flags = 0; +        int32_t    flags = 0;          wb_file_t *file = NULL;          wb_conf_t *conf = this->private; @@ -574,17 +932,18 @@ wb_open_cbk (call_frame_t *frame,                  /* If mandatory locking has been enabled on this file,                     we disable caching on it */ -                if ((fd->inode->st_mode & S_ISGID) && !(fd->inode->st_mode & S_IXGRP)) +                if ((fd->inode->st_mode & S_ISGID) +                    && !(fd->inode->st_mode & S_IXGRP))                          file->disabled = 1;                  /* If O_DIRECT then, we disable chaching */                  if (frame->local)                  {                          flags = *((int32_t *)frame->local); -                        if (((flags & O_DIRECT) == O_DIRECT) ||  -                            ((flags & O_RDONLY) == O_RDONLY) || -                            (((flags & O_SYNC) == O_SYNC) && -                             conf->enable_O_SYNC == _gf_true)) {  +                        if (((flags & O_DIRECT) == O_DIRECT) +                            || ((flags & O_RDONLY) == O_RDONLY) +                            || (((flags & O_SYNC) == O_SYNC) +                            && conf->enable_O_SYNC == _gf_true)) {                                   file->disabled = 1;                          }                  } @@ -635,8 +994,8 @@ wb_create_cbk (call_frame_t *frame,                   * If mandatory locking has been enabled on this file,                   * we disable caching on it                   */ -                if ((fd->inode->st_mode & S_ISGID) &&  -                    !(fd->inode->st_mode & S_IXGRP)) +                if ((fd->inode->st_mode & S_ISGID) +                    && !(fd->inode->st_mode & 
S_IXGRP))                  {                          file->disabled = 1;                  } @@ -666,44 +1025,43 @@ wb_create (call_frame_t *frame,  } -int32_t  -__wb_cleanup_queue (wb_file_t *file) +size_t +__wb_mark_wind_all (list_head_t *list, list_head_t *winds)  { -        wb_write_request_t *request = NULL, *dummy = NULL; -        int32_t bytes = 0; +        wb_request_t *request = NULL; +        size_t        size = 0; +        struct iovec *vector = NULL; +        int32_t       count = 0; +        char          first_request = 1; +        off_t         offset_expected = 0; +        size_t        length = 0; -        list_for_each_entry_safe (request, dummy, &file->request, list) +        list_for_each_entry (request, list, list)          { -                if (request->got_reply && request->write_behind) -                { -                        bytes += iov_length (request->vector, request->count); -                        list_del_init (&request->list); - -                        FREE (request->vector); -                        dict_unref (request->refs); -       -                        FREE (request); +                if ((request->stub == NULL) +                    || (request->stub->fop != GF_FOP_WRITE)) { +                        break;                  } -        } -        return bytes; -} +                vector = request->stub->args.writev.vector; +                count = request->stub->args.writev.count; +                if (!request->flags.write_request.stack_wound) { +                        if (first_request) { +                                first_request = 0; +                                offset_expected = request->stub->args.writev.off; +                        } +                         +                        if (request->stub->args.writev.off != offset_expected) { +                                break; +                        } +                        length = iov_length (vector, count); +                        size += 
length; +                        offset_expected += length; -int32_t  -__wb_mark_wind_all (list_head_t *list, list_head_t *winds) -{ -        wb_write_request_t *request = NULL; -        size_t size = 0; - -        list_for_each_entry (request, list, list) -        { -                if (!request->stack_wound) -                { -                        size += iov_length (request->vector, request->count); -                        request->stack_wound = 1; +                        request->flags.write_request.stack_wound = 1;                          list_add_tail (&request->winds, winds); -                } +                }           }          return size; @@ -711,32 +1069,66 @@ __wb_mark_wind_all (list_head_t *list, list_head_t *winds)  size_t  -__wb_get_aggregate_size (list_head_t *list) +__wb_get_aggregate_size (list_head_t *list, char *other_fop_in_queue, +                         char *non_contiguous_writes)  { -        wb_write_request_t *request = NULL; -        size_t size = 0; +        wb_request_t *request = NULL; +        size_t        size = 0, length = 0; +        struct iovec *vector = NULL; +        int32_t       count = 0; +        char          first_request = 1; +        off_t         offset_expected = 0;          list_for_each_entry (request, list, list)          { -                if (!request->stack_wound) -                { -                        size += iov_length (request->vector, request->count); +                if ((request->stub == NULL) +                    || (request->stub->fop != GF_FOP_WRITE)) { +                        if (request->stub && other_fop_in_queue) { +                                *other_fop_in_queue = 1; +                        } +                        break; +                } + +                vector = request->stub->args.writev.vector; +                count = request->stub->args.writev.count; +                if (!request->flags.write_request.stack_wound) { +                        if (first_request) { + 
                               first_request = 0; +                                offset_expected = request->stub->args.writev.off; +                        }  + +                        if (offset_expected != request->stub->args.writev.off) { +                                if (non_contiguous_writes) { +                                        *non_contiguous_writes = 1; +                                } +                                break; +                        } + +                        length = iov_length (vector, count); +                        size += length; +                        offset_expected += length;                  }          }          return size;  } +  uint32_t  __wb_get_incomplete_writes (list_head_t *list)  { -        wb_write_request_t *request = NULL; -        uint32_t count = 0; +        wb_request_t *request = NULL; +        uint32_t      count = 0;          list_for_each_entry (request, list, list)          { -                if (request->stack_wound && !request->got_reply) -                { +                if ((request->stub == NULL) +                    || (request->stub->fop != GF_FOP_WRITE)) { +                        break; +                } + +                if (request->flags.write_request.stack_wound +                    && !request->flags.write_request.got_reply) {                          count++;                  }          } @@ -744,18 +1136,22 @@ __wb_get_incomplete_writes (list_head_t *list)          return count;  } -int32_t + +size_t  __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf)  { -        size_t aggregate_current = 0; +        size_t   aggregate_current = 0;          uint32_t incomplete_writes = 0; +        char     other_fop_in_queue = 0; +        char     non_contiguous_writes = 0;          incomplete_writes = __wb_get_incomplete_writes (list);  -        aggregate_current = __wb_get_aggregate_size (list); +        aggregate_current = __wb_get_aggregate_size (list, 
&other_fop_in_queue, +                                                     &non_contiguous_writes); -        if ((incomplete_writes == 0) || (aggregate_current >= aggregate_conf)) -        { +        if ((incomplete_writes == 0) || (aggregate_current >= aggregate_conf) +            || other_fop_in_queue || non_contiguous_writes) {                  __wb_mark_wind_all (list, winds);          } @@ -766,14 +1162,25 @@ __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf)  size_t  __wb_get_window_size (list_head_t *list)  { -        wb_write_request_t *request = NULL; -        size_t size = 0; +        wb_request_t *request = NULL; +        size_t        size = 0; +        struct iovec *vector = NULL; +        int32_t       count = 0;          list_for_each_entry (request, list, list)          { -                if (request->write_behind && !request->got_reply) +                if ((request->stub == NULL) +                    || (request->stub->fop != GF_FOP_WRITE)) { +                        continue; +                } + +                vector = request->stub->args.writev.vector; +                count = request->stub->args.writev.count; + +                if (request->flags.write_request.write_behind +                    && !request->flags.write_request.got_reply)                  { -                        size += iov_length (request->vector, request->count); +                        size += iov_length (vector, count);                  }          } @@ -784,23 +1191,28 @@ __wb_get_window_size (list_head_t *list)  size_t   __wb_mark_unwind_till (list_head_t *list, list_head_t *unwinds, size_t size)  { -        size_t written_behind = 0; -        wb_write_request_t *request = NULL; +        size_t        written_behind = 0; +        wb_request_t *request = NULL; +        struct iovec *vector = NULL; +        int32_t       count = 0;          list_for_each_entry (request, list, list)          { -                if (written_behind <= size) -     
           { -                        if (!request->write_behind) -                        { -                                wb_local_t *local = request->frame->local; -                                written_behind += iov_length (request->vector, request->count); -                                request->write_behind = 1; -                                list_add_tail (&local->unwind_frames, unwinds); -                        } +                if ((request->stub == NULL) +                    || (request->stub->fop != GF_FOP_WRITE)) { +                        continue;                  } -                else -                { + +                vector = request->stub->args.writev.vector; +                count = request->stub->args.writev.count; + +                if (written_behind <= size) { +                        if (!request->flags.write_request.write_behind) { +                                written_behind += iov_length (vector, count); +                                request->flags.write_request.write_behind = 1; +                                list_add_tail (&request->unwinds, unwinds); +                        } +                } else {                          break;                  }          } @@ -825,16 +1237,48 @@ __wb_mark_unwinds (list_head_t *list, list_head_t *unwinds, size_t window_conf)  } +uint32_t +__wb_get_other_requests (list_head_t *list, list_head_t *other_requests) +{ +        wb_request_t *request = NULL; +        uint32_t      count = 0; +        list_for_each_entry (request, list, list) { +                if ((request->stub == NULL) +                    || (request->stub->fop == GF_FOP_WRITE)) { +                        break; +                } +                 +                if (!request->flags.other_requests.marked_for_resume) { +                        request->flags.other_requests.marked_for_resume = 1; +                        list_add_tail (&request->other_requests, +                                       
other_requests); +                        count++; + +                        /* lets handle one at a time */ +                        break; +                } +        } + +        return count; +} + +  int32_t  wb_stack_unwind (list_head_t *unwinds)  { -        struct stat buf = {0,}; -        wb_local_t *local = NULL, *dummy = NULL; +        struct stat   buf = {0,}; +        wb_request_t *request = NULL, *dummy = NULL; +        call_frame_t *frame = NULL; +        wb_local_t   *local = NULL; -        list_for_each_entry_safe (local, dummy, unwinds, unwind_frames) +        list_for_each_entry_safe (request, dummy, unwinds, unwinds)          { -                list_del_init (&local->unwind_frames); -                STACK_UNWIND (local->frame, local->op_ret, local->op_errno, &buf); +                frame = request->stub->frame; +                local = frame->local; + +                STACK_UNWIND (frame, local->op_ret, local->op_errno, &buf); + +                wb_request_unref (request);          }          return 0; @@ -842,13 +1286,54 @@ wb_stack_unwind (list_head_t *unwinds)  int32_t -wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds, list_head_t *unwinds) +wb_resume_other_requests (call_frame_t *frame, wb_file_t *file, +                          list_head_t *other_requests) +{ +        int32_t       ret = 0; +        wb_request_t *request = NULL, *dummy = NULL; +        int32_t       fops_removed = 0; +        char          wind = 0; +        call_stub_t  *stub = NULL; + +        if (list_empty (other_requests)) { +                goto out; +        } + +        list_for_each_entry_safe (request, dummy, other_requests, +                                  other_requests) { +                wind = request->stub->wind; +                stub = request->stub; +                 +                LOCK (&file->lock);  +                { +                        request->stub = NULL; +                } +                UNLOCK (&file->lock); + +       
         if (!wind) { +                        wb_request_unref (request); +                        fops_removed++; +                }  +                 +                call_resume (stub); +        } + +        if (fops_removed > 0) { +                wb_process_queue (frame, file, 0); +        } +         +out: +        return ret; +} + + +int32_t +wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds, +           list_head_t *unwinds, list_head_t *other_requests)  { -        /* copy the frame before calling wb_stack_unwind, since this request containing current frame might get unwound */ -        /*  call_frame_t *sync_frame = copy_frame (frame); */ -           wb_stack_unwind (unwinds);          wb_sync (frame, file, winds); +        wb_resume_other_requests (frame, file, other_requests);          return 0;  } @@ -857,67 +1342,35 @@ wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds, list_head_t  int32_t   wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all)   { -        list_head_t winds, unwinds; -        size_t size = 0; -        wb_conf_t *conf = file->this->private; +        list_head_t winds, unwinds, other_requests; +        size_t      size = 0; +        wb_conf_t  *conf = file->this->private; +        uint32_t    count = 0;          INIT_LIST_HEAD (&winds);          INIT_LIST_HEAD (&unwinds); - -        if (!file) -        { +        INIT_LIST_HEAD (&other_requests); +                 +        if (!file) {                  return -1;          }          size = flush_all ? 
0 : conf->aggregate_size;          LOCK (&file->lock);          { -                __wb_cleanup_queue (file); -                __wb_mark_winds (&file->request, &winds, size); -                __wb_mark_unwinds (&file->request, &unwinds, conf->window_size); -        } -        UNLOCK (&file->lock); - -        wb_do_ops (frame, file, &winds, &unwinds); -        return 0; -} - - -wb_write_request_t * -wb_enqueue (wb_file_t *file,  -            call_frame_t *frame, -            struct iovec *vector, -            int32_t count, -            off_t offset) -{ -        wb_write_request_t *request = NULL; -        wb_local_t *local = CALLOC (1, sizeof (*local)); +                count = __wb_get_other_requests (&file->request, +                                                 &other_requests); -        request = CALLOC (1, sizeof (*request)); - -        INIT_LIST_HEAD (&request->list); -        INIT_LIST_HEAD (&request->winds); - -        request->frame = frame; -        request->vector = iov_dup (vector, count); -        request->count = count; -        request->offset = offset; -        request->refs = dict_ref (frame->root->req_refs); - -        frame->local = local; -        local->frame = frame; -        local->op_ret = iov_length (vector, count); -        local->op_errno = 0; -        INIT_LIST_HEAD (&local->unwind_frames); +                if (count == 0) { +                        __wb_mark_winds (&file->request, &winds, size); +                } -        LOCK (&file->lock); -        { -                list_add_tail (&request->list, &file->request); -                file->offset = offset + iov_length (vector, count); +                __wb_mark_unwinds (&file->request, &unwinds, conf->window_size);          }          UNLOCK (&file->lock); -        return request; +        wb_do_ops (frame, file, &winds, &unwinds, &other_requests); +        return 0;  } @@ -942,11 +1395,13 @@ wb_writev (call_frame_t *frame,             int32_t count,             off_t offset)  { -    
    wb_file_t *file = NULL; -        char offset_expected = 1, wb_disabled = 0;  +        wb_file_t    *file = NULL; +        char          wb_disabled = 0;          call_frame_t *process_frame = NULL; -        size_t size = 0; -	uint64_t tmp_file = 0; +        size_t        size = 0; +	uint64_t      tmp_file = 0; +        call_stub_t  *stub = NULL; +        wb_local_t   *local = NULL;          if (vector != NULL)                   size = iov_length (vector, count); @@ -975,9 +1430,6 @@ wb_writev (call_frame_t *frame,                          }                          wb_disabled = 1;                  } - -                if (file->offset != offset) -                        offset_expected = 0;          }          UNLOCK (&file->lock); @@ -986,7 +1438,7 @@ wb_writev (call_frame_t *frame,                              wb_writev_cbk,                              FIRST_CHILD (frame->this),                              FIRST_CHILD (frame->this)->fops->writev, -                            file->fd, +                            fd,                              vector,                              count,                              offset); @@ -995,10 +1447,17 @@ wb_writev (call_frame_t *frame,          process_frame = copy_frame (frame); -        if (!offset_expected) -                wb_process_queue (process_frame, file, 1); +        local = CALLOC (1, sizeof (*local)); +        frame->local = local; +        local->file = file; + +        stub = fop_writev_stub (frame, NULL, fd, vector, count, offset); +        if (stub == NULL) { +                STACK_UNWIND (frame, -1, ENOMEM, NULL); +                return 0; +        } -        wb_enqueue (file, frame, vector, count, offset); +        wb_enqueue (file, stub);          wb_process_queue (process_frame, file, 0);          STACK_DESTROY (process_frame->root); @@ -1017,11 +1476,38 @@ wb_readv_cbk (call_frame_t *frame,                int32_t count,                struct stat *stbuf)  { -        wb_local_t *local = 
NULL; +        wb_local_t   *local = NULL; +        wb_file_t    *file = NULL; +        wb_request_t *request = NULL;          local = frame->local; +        file = local->file; +        request = local->request; + +        if (request) { +                wb_request_unref (request); +                wb_process_queue (frame, file, 0); +        }          STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf); + +        return 0; +} + + +static int32_t +wb_readv_helper (call_frame_t *frame, +                 xlator_t *this, +                 fd_t *fd, +                 size_t size, +                 off_t offset) +{ +        STACK_WIND (frame, +                    wb_readv_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->readv, +                    fd, size, offset); +                  return 0;  } @@ -1033,9 +1519,10 @@ wb_readv (call_frame_t *frame,            size_t size,            off_t offset)  { -        wb_file_t *file = NULL; -        wb_local_t *local = NULL; -	uint64_t tmp_file = 0; +        wb_file_t   *file = NULL; +        wb_local_t  *local = NULL; +	uint64_t     tmp_file = 0; +        call_stub_t *stub = NULL;          if (fd_ctx_get (fd, this, &tmp_file)) {                  gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); @@ -1044,19 +1531,29 @@ wb_readv (call_frame_t *frame,          }  	file = (wb_file_t *)(long)tmp_file; -        if (file) -                wb_sync_all (frame, file);          local = CALLOC (1, sizeof (*local));          local->file = file;          frame->local = local; +        if (file) { +                stub = fop_readv_stub (frame, wb_readv_helper, fd, size, +                                       offset); +                if (stub == NULL) { +                        STACK_UNWIND (frame, -1, ENOMEM, NULL); +                        return 0; +                } -        STACK_WIND (frame, -                    wb_readv_cbk, -                    FIRST_CHILD(this), -           
         FIRST_CHILD(this)->fops->readv, -                    fd, size, offset); +                wb_enqueue (file, stub); + +                wb_process_queue (frame, file, 1); +        } else { +                STACK_WIND (frame, +                            wb_readv_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->readv, +                            fd, size, offset); +        }          return 0;  } @@ -1069,24 +1566,6 @@ wb_ffr_bg_cbk (call_frame_t *frame,                 int32_t op_ret,                 int32_t op_errno)  { -        wb_local_t *local = NULL; -        wb_file_t *file = NULL; - -        local = frame->local; -        file = local->file; - -        if (file) { -                fd_unref (file->fd); -        } - -        if (file->op_ret == -1) -        { -                op_ret = file->op_ret; -                op_errno = file->op_errno; - -                file->op_ret = 0; -        } -            STACK_DESTROY (frame->root);          return 0;  } @@ -1100,24 +1579,39 @@ wb_ffr_cbk (call_frame_t *frame,              int32_t op_errno)  {          wb_local_t *local = NULL; -        wb_file_t *file = NULL; +        wb_file_t  *file = NULL; +        wb_conf_t  *conf = NULL; +        char        unwind = 0; +        conf = this->private;          local = frame->local;          file = local->file; -        if (file) { -                /* corresponds to the fd_ref() done during wb_file_create() */ -                fd_unref (file->fd); + +        if (conf->flush_behind +            && (!file->disabled) && (file->disable_till == 0)) { +                unwind = 1; +        } else { +                local->reply_count++; +                /* without flush-behind, unwind should wait for replies of +                   writes queued before and the flush */ +                if (local->reply_count == 2) { +                        unwind = 1; +                }          } -        if (file->op_ret == -1) -     
   { -                op_ret = file->op_ret; -                op_errno = file->op_errno; +        if (unwind) { +                if (file->op_ret == -1) { +                        op_ret = file->op_ret; +                        op_errno = file->op_errno; -                file->op_ret = 0; +                        file->op_ret = 0; +                } + +                wb_process_queue (frame, file, 0); +                 +                STACK_UNWIND (frame, op_ret, op_errno);          } -        STACK_UNWIND (frame, op_ret, op_errno);          return 0;  } @@ -1127,11 +1621,13 @@ wb_flush (call_frame_t *frame,            xlator_t *this,            fd_t *fd)  { -        wb_conf_t *conf = NULL; -        wb_file_t *file = NULL; -        call_frame_t *flush_frame = NULL; -        wb_local_t *local = NULL; -	uint64_t tmp_file = 0; +        wb_conf_t    *conf = NULL; +        wb_file_t    *file = NULL; +        wb_local_t   *local = NULL; +	uint64_t      tmp_file = 0; +        call_stub_t  *stub = NULL; +        call_frame_t *process_frame = NULL; +        wb_local_t   *tmp_local = NULL;          conf = this->private; @@ -1145,32 +1641,35 @@ wb_flush (call_frame_t *frame,          local = CALLOC (1, sizeof (*local));          local->file = file; -        if (file) -                fd_ref (file->fd); -        if (&file->request != file->request.next) { -                gf_log (this->name, GF_LOG_DEBUG, -                        "request queue is not empty, it has to be synced"); +        frame->local = local; +        stub = fop_flush_cbk_stub (frame, wb_ffr_cbk, 0, 0); +        if (stub == NULL) { +                STACK_UNWIND (frame, -1, ENOMEM); +                return 0;          } -        if (conf->flush_behind &&  -	    (!file->disabled) && (file->disable_till == 0)) { -                flush_frame = copy_frame (frame);      -                STACK_UNWIND (frame, file->op_ret,  -			      file->op_errno); // liar! liar! 
:O +        if (conf->flush_behind +            && (!file->disabled) && (file->disable_till == 0)) { +                tmp_local = CALLOC (1, sizeof (*local)); +                tmp_local->file = file; -                flush_frame->local = local; -                wb_sync_all (flush_frame, file); +                process_frame = copy_frame (frame); +                process_frame->local = tmp_local; +        } + +        wb_enqueue (file, stub); -                STACK_WIND (flush_frame, +        wb_process_queue (process_frame, file, 1);  +                 +        if (conf->flush_behind +            && (!file->disabled) && (file->disable_till == 0)) { +                STACK_WIND (process_frame,                              wb_ffr_bg_cbk,                              FIRST_CHILD(this),                              FIRST_CHILD(this)->fops->flush,                              fd);          } else { -                wb_sync_all (frame, file); - -                frame->local = local;                  STACK_WIND (frame,                              wb_ffr_cbk,                              FIRST_CHILD(this), @@ -1182,40 +1681,64 @@ wb_flush (call_frame_t *frame,  } -int32_t +static int32_t  wb_fsync_cbk (call_frame_t *frame,                void *cookie,                xlator_t *this,                int32_t op_ret,                int32_t op_errno)  { -        wb_local_t *local = NULL; -        wb_file_t *file = NULL; +        wb_local_t   *local = NULL; +        wb_file_t    *file = NULL; +        wb_request_t *request = NULL;          local = frame->local;          file = local->file; +        request = local->request; -        if (file->op_ret == -1) -        { +        if (file->op_ret == -1) {                  op_ret = file->op_ret;                  op_errno = file->op_errno;                  file->op_ret = 0;          } +        if (request) { +                wb_request_unref (request); +                wb_process_queue (frame, file, 0); +        } +          
STACK_UNWIND (frame, op_ret, op_errno); +                  return 0;  } + +static int32_t +wb_fsync_helper (call_frame_t *frame, +                 xlator_t *this, +                 fd_t *fd, +                 int32_t datasync) +{ +        STACK_WIND (frame, +                    wb_fsync_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->fsync, +                    fd, datasync); +        return 0; +} + +  int32_t  wb_fsync (call_frame_t *frame,            xlator_t *this,            fd_t *fd,            int32_t datasync)  { -        wb_file_t *file = NULL; -        wb_local_t *local = NULL; -	uint64_t tmp_file = 0; +        wb_file_t   *file = NULL; +        wb_local_t  *local = NULL; +	uint64_t     tmp_file = 0; +        call_stub_t *stub = NULL;          if (fd_ctx_get (fd, this, &tmp_file)) {                  gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); @@ -1224,19 +1747,30 @@ wb_fsync (call_frame_t *frame,          }  	file = (wb_file_t *)(long)tmp_file; -        if (file) -                wb_sync_all (frame, file);          local = CALLOC (1, sizeof (*local));          local->file = file;          frame->local = local; -        STACK_WIND (frame, -                    wb_fsync_cbk, -                    FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->fsync, -                    fd, datasync); +        if (file) { +                stub = fop_fsync_stub (frame, wb_fsync_helper, fd, datasync); +                if (stub == NULL) { +                        STACK_UNWIND (frame, -1, ENOMEM); +                        return 0; +                } +         +                wb_enqueue (file, stub); + +                wb_process_queue (frame, file, 1); +        } else { +                STACK_WIND (frame, +                            wb_fsync_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->fsync, +                            fd, datasync); +        } 
+          return 0;  } @@ -1245,10 +1779,19 @@ int32_t  wb_release (xlator_t *this,              fd_t *fd)  { -        uint64_t file = 0; +        uint64_t   file_ptr = 0; +        wb_file_t *file = NULL; + +	fd_ctx_get (fd, this, &file_ptr); +        file = (wb_file_t *) (long) file_ptr; + +        LOCK (&file->lock); +        { +                assert (list_empty (&file->request)); +        } +        UNLOCK (&file->lock); -	fd_ctx_get (fd, this, &file); -  	wb_file_destroy ((wb_file_t *)(long)file); +  	wb_file_destroy (file);          return 0;  } @@ -1257,14 +1800,14 @@ wb_release (xlator_t *this,  int32_t   init (xlator_t *this)  { -        dict_t *options = NULL; +        dict_t    *options = NULL;          wb_conf_t *conf = NULL; -        char *aggregate_size_string = NULL; -        char *window_size_string    = NULL; -        char *flush_behind_string   = NULL; -        char *disable_till_string = NULL; -        char *enable_O_SYNC_string = NULL; -        int32_t ret = -1; +        char      *aggregate_size_string = NULL; +        char      *window_size_string    = NULL; +        char      *flush_behind_string   = NULL; +        char      *disable_till_string = NULL; +        char      *enable_O_SYNC_string = NULL; +        int32_t    ret = -1;          if ((this->children == NULL)              || this->children->next) {  | 
