summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/nsr-server/src/nsr-cg.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/nsr-server/src/nsr-cg.c')
-rw-r--r--xlators/cluster/nsr-server/src/nsr-cg.c4444
1 files changed, 4444 insertions, 0 deletions
diff --git a/xlators/cluster/nsr-server/src/nsr-cg.c b/xlators/cluster/nsr-server/src/nsr-cg.c
new file mode 100644
index 000000000..54f370b75
--- /dev/null
+++ b/xlators/cluster/nsr-server/src/nsr-cg.c
@@ -0,0 +1,4444 @@
+/* No stub needed for access */
+
+/* No cbk needed for access */
+
+int32_t
+nsr_access (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, int32_t mask, dict_t * xdata)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ // allow reads during reconciliation
+ // TBD: allow "dirty" reads on non-leaders
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_access_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->access,
+ loc, mask, xdata);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (access, frame, -1, EREMOTE,
+ NULL);
+ return 0;
+}
+
+int32_t
+nsr_create_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ fd_t * fd, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (create, frame, op_ret, op_errno,
+ fd, inode, buf, preparent, postparent, xdata);
+ return 0;
+
+}
+int32_t
+nsr_create_continue (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, int32_t flags, mode_t mode, mode_t umask, fd_t * fd, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_create_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->create,
+ loc, flags, mode, umask, fd, xdata);
+ return 0;
+}
+
+int32_t
+nsr_create_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ fd_t * fd, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_create (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, int32_t flags, mode_t mode, mode_t umask, fd_t * fd, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_create_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->create,
+ loc, flags, mode, umask, fd, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_create_stub (frame,nsr_create_continue,
+ loc, flags, mode, umask, fd, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_create_fan_in,
+ trav->xlator, trav->xlator->fops->create,
+ loc, flags, mode, umask, fd, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (create, frame, -1, op_errno,
+ NULL, NULL, NULL, NULL, NULL, NULL);
+ return 0;
+}
+
+int32_t
+nsr_discard_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (discard, frame, op_ret, op_errno,
+ preop_stbuf, postop_stbuf, xdata);
+ return 0;
+
+}
+int32_t
+nsr_discard_continue (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, off_t offset, size_t len, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_discard_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->discard,
+ fd, offset, len, xdata);
+ return 0;
+}
+
+int32_t
+nsr_discard_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_discard (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, off_t offset, size_t len, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_discard_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->discard,
+ fd, offset, len, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_discard_stub (frame,nsr_discard_continue,
+ fd, offset, len, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_discard_fan_in,
+ trav->xlator, trav->xlator->fops->discard,
+ fd, offset, len, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (discard, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ return 0;
+}
+
+/* No code emitted for entrylk */
+
+int32_t
+nsr_fallocate_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (fallocate, frame, op_ret, op_errno,
+ preop_stbuf, postop_stbuf, xdata);
+ return 0;
+
+}
+int32_t
+nsr_fallocate_continue (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, int32_t keep_size, off_t offset, size_t len, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_fallocate_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fallocate,
+ fd, keep_size, offset, len, xdata);
+ return 0;
+}
+
+int32_t
+nsr_fallocate_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_fallocate (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, int32_t keep_size, off_t offset, size_t len, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_fallocate_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fallocate,
+ fd, keep_size, offset, len, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_fallocate_stub (frame,nsr_fallocate_continue,
+ fd, keep_size, offset, len, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_fallocate_fan_in,
+ trav->xlator, trav->xlator->fops->fallocate,
+ fd, keep_size, offset, len, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (fallocate, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ return 0;
+}
+
+/* No code emitted for fentrylk */
+
+/* No stub needed for fgetxattr */
+
+/* No cbk needed for fgetxattr */
+
+int32_t
+nsr_fgetxattr (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, const char * name, dict_t * xdata)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ // allow reads during reconciliation
+ // TBD: allow "dirty" reads on non-leaders
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_fgetxattr_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fgetxattr,
+ fd, name, xdata);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (fgetxattr, frame, -1, EREMOTE,
+ NULL, NULL);
+ return 0;
+}
+
+/* No code emitted for finodelk */
+
+/* No code emitted for flush */
+
+int32_t
+nsr_fremovexattr_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (fremovexattr, frame, op_ret, op_errno,
+ xdata);
+ return 0;
+
+}
+int32_t
+nsr_fremovexattr_continue (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, const char * name, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_fremovexattr_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fremovexattr,
+ fd, name, xdata);
+ return 0;
+}
+
+int32_t
+nsr_fremovexattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_fremovexattr (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, const char * name, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_fremovexattr_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fremovexattr,
+ fd, name, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_fremovexattr_stub (frame,nsr_fremovexattr_continue,
+ fd, name, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_fremovexattr_fan_in,
+ trav->xlator, trav->xlator->fops->fremovexattr,
+ fd, name, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (fremovexattr, frame, -1, op_errno,
+ NULL);
+ return 0;
+}
+
+int32_t
+nsr_fsetattr_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (fsetattr, frame, op_ret, op_errno,
+ preop_stbuf, postop_stbuf, xdata);
+ return 0;
+
+}
+int32_t
+nsr_fsetattr_continue (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, struct iatt * stbuf, int32_t valid, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_fsetattr_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetattr,
+ fd, stbuf, valid, xdata);
+ return 0;
+}
+
+int32_t
+nsr_fsetattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_fsetattr (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, struct iatt * stbuf, int32_t valid, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_fsetattr_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetattr,
+ fd, stbuf, valid, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_fsetattr_stub (frame,nsr_fsetattr_continue,
+ fd, stbuf, valid, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_fsetattr_fan_in,
+ trav->xlator, trav->xlator->fops->fsetattr,
+ fd, stbuf, valid, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (fsetattr, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ return 0;
+}
+
+int32_t
+nsr_fsetxattr_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno,
+ xdata);
+ return 0;
+
+}
+int32_t
+nsr_fsetxattr_continue (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, dict_t * dict, int32_t flags, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_fsetxattr_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetxattr,
+ fd, dict, flags, xdata);
+ return 0;
+}
+
+int32_t
+nsr_fsetxattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_fsetxattr (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, dict_t * dict, int32_t flags, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_fsetxattr_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetxattr,
+ fd, dict, flags, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_fsetxattr_stub (frame,nsr_fsetxattr_continue,
+ fd, dict, flags, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_fsetxattr_fan_in,
+ trav->xlator, trav->xlator->fops->fsetxattr,
+ fd, dict, flags, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (fsetxattr, frame, -1, op_errno,
+ NULL);
+ return 0;
+}
+
+/* No stub needed for fstat */
+
+/* No cbk needed for fstat */
+
+int32_t
+nsr_fstat (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, dict_t * xdata)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ // allow reads during reconciliation
+ // TBD: allow "dirty" reads on non-leaders
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_fstat_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat,
+ fd, xdata);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (fstat, frame, -1, EREMOTE,
+ NULL, NULL);
+ return 0;
+}
+
+/* No code emitted for fsync */
+
+/* No code emitted for fsyncdir */
+
+int32_t
+nsr_ftruncate_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (ftruncate, frame, op_ret, op_errno,
+ prebuf, postbuf, xdata);
+ return 0;
+
+}
+int32_t
+nsr_ftruncate_continue (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, off_t offset, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_ftruncate_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate,
+ fd, offset, xdata);
+ return 0;
+}
+
+int32_t
+nsr_ftruncate_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_ftruncate (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, off_t offset, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_ftruncate_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate,
+ fd, offset, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_ftruncate_stub (frame,nsr_ftruncate_continue,
+ fd, offset, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_ftruncate_fan_in,
+ trav->xlator, trav->xlator->fops->ftruncate,
+ fd, offset, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (ftruncate, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ return 0;
+}
+
+int32_t
+nsr_fxattrop_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ dict_t * xattr, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (fxattrop, frame, op_ret, op_errno,
+ xattr, xdata);
+ return 0;
+
+}
+int32_t
+nsr_fxattrop_continue (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, gf_xattrop_flags_t optype, dict_t * xattr, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_fxattrop_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fxattrop,
+ fd, optype, xattr, xdata);
+ return 0;
+}
+
+int32_t
+nsr_fxattrop_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ dict_t * xattr, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_fxattrop (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, gf_xattrop_flags_t optype, dict_t * xattr, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_fxattrop_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fxattrop,
+ fd, optype, xattr, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_fxattrop_stub (frame,nsr_fxattrop_continue,
+ fd, optype, xattr, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_fxattrop_fan_in,
+ trav->xlator, trav->xlator->fops->fxattrop,
+ fd, optype, xattr, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (fxattrop, frame, -1, op_errno,
+ NULL, NULL);
+ return 0;
+}
+
+/* No code emitted for getspec */
+
+/* No stub needed for getxattr */
+
+/* No cbk needed for getxattr */
+
+int32_t
+nsr_getxattr (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, const char * name, dict_t * xdata)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ // allow reads during reconciliation
+ // TBD: allow "dirty" reads on non-leaders
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_getxattr_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->getxattr,
+ loc, name, xdata);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (getxattr, frame, -1, EREMOTE,
+ NULL, NULL);
+ return 0;
+}
+
+/* No code emitted for inodelk */
+
+int32_t
+nsr_link_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (link, frame, op_ret, op_errno,
+ inode, buf, preparent, postparent, xdata);
+ return 0;
+
+}
+int32_t
+nsr_link_continue (call_frame_t *frame, xlator_t *this,
+ loc_t * oldloc, loc_t * newloc, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_link_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->link,
+ oldloc, newloc, xdata);
+ return 0;
+}
+
+int32_t
+nsr_link_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_link (call_frame_t *frame, xlator_t *this,
+ loc_t * oldloc, loc_t * newloc, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_link_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->link,
+ oldloc, newloc, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_link_stub (frame,nsr_link_continue,
+ oldloc, newloc, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_link_fan_in,
+ trav->xlator, trav->xlator->fops->link,
+ oldloc, newloc, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (link, frame, -1, op_errno,
+ NULL, NULL, NULL, NULL, NULL);
+ return 0;
+}
+
+/* No code emitted for lk */
+
+/* No code emitted for lookup */
+
+int32_t
+nsr_mkdir_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (mkdir, frame, op_ret, op_errno,
+ inode, buf, preparent, postparent, xdata);
+ return 0;
+
+}
+int32_t
+nsr_mkdir_continue (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, mode_t mode, mode_t umask, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_mkdir_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir,
+ loc, mode, umask, xdata);
+ return 0;
+}
+
+int32_t
+nsr_mkdir_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_mkdir (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, mode_t mode, mode_t umask, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_mkdir_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir,
+ loc, mode, umask, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_mkdir_stub (frame,nsr_mkdir_continue,
+ loc, mode, umask, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_mkdir_fan_in,
+ trav->xlator, trav->xlator->fops->mkdir,
+ loc, mode, umask, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (mkdir, frame, -1, op_errno,
+ NULL, NULL, NULL, NULL, NULL);
+ return 0;
+}
+
+int32_t
+nsr_mknod_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (mknod, frame, op_ret, op_errno,
+ inode, buf, preparent, postparent, xdata);
+ return 0;
+
+}
+int32_t
+nsr_mknod_continue (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, mode_t mode, dev_t rdev, mode_t umask, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_mknod_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod,
+ loc, mode, rdev, umask, xdata);
+ return 0;
+}
+
+int32_t
+nsr_mknod_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_mknod (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, mode_t mode, dev_t rdev, mode_t umask, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_mknod_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod,
+ loc, mode, rdev, umask, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_mknod_stub (frame,nsr_mknod_continue,
+ loc, mode, rdev, umask, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_mknod_fan_in,
+ trav->xlator, trav->xlator->fops->mknod,
+ loc, mode, rdev, umask, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (mknod, frame, -1, op_errno,
+ NULL, NULL, NULL, NULL, NULL);
+ return 0;
+}
+
+int32_t
+nsr_open_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ fd_t * fd, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (open, frame, op_ret, op_errno,
+ fd, xdata);
+ return 0;
+
+}
+int32_t
+nsr_open_continue (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, int32_t flags, fd_t * fd, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_open_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->open,
+ loc, flags, fd, xdata);
+ return 0;
+}
+
+int32_t
+nsr_open_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ fd_t * fd, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_open (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, int32_t flags, fd_t * fd, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_open_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->open,
+ loc, flags, fd, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_open_stub (frame,nsr_open_continue,
+ loc, flags, fd, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_open_fan_in,
+ trav->xlator, trav->xlator->fops->open,
+ loc, flags, fd, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (open, frame, -1, op_errno,
+ NULL, NULL);
+ return 0;
+}
+
+/* No stub needed for opendir */
+
+/* No cbk needed for opendir */
+
+int32_t
+nsr_opendir (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, fd_t * fd, dict_t * xdata)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ // allow reads during reconciliation
+ // TBD: allow "dirty" reads on non-leaders
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_opendir_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->opendir,
+ loc, fd, xdata);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (opendir, frame, -1, EREMOTE,
+ NULL, NULL);
+ return 0;
+}
+
+/* No stub needed for rchecksum */
+
+/* No cbk needed for rchecksum */
+
+int32_t
+nsr_rchecksum (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, off_t offset, int32_t len, dict_t * xdata)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ // allow reads during reconciliation
+ // TBD: allow "dirty" reads on non-leaders
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_rchecksum_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->rchecksum,
+ fd, offset, len, xdata);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (rchecksum, frame, -1, EREMOTE,
+ 0, NULL, NULL);
+ return 0;
+}
+
+/* No stub needed for readdir */
+
+/* No cbk needed for readdir */
+
+int32_t
+nsr_readdir (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, size_t size, off_t offset, dict_t * xdata)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ // allow reads during reconciliation
+ // TBD: allow "dirty" reads on non-leaders
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_readdir_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdir,
+ fd, size, offset, xdata);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (readdir, frame, -1, EREMOTE,
+ NULL, NULL);
+ return 0;
+}
+
+/* No stub needed for readdirp */
+
+/* No cbk needed for readdirp */
+
+int32_t
+nsr_readdirp (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, size_t size, off_t offset, dict_t * xdata)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ // allow reads during reconciliation
+ // TBD: allow "dirty" reads on non-leaders
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_readdirp_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp,
+ fd, size, offset, xdata);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (readdirp, frame, -1, EREMOTE,
+ NULL, NULL);
+ return 0;
+}
+
+/* No stub needed for readlink */
+
+/* No cbk needed for readlink */
+
+int32_t
+nsr_readlink (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, size_t size, dict_t * xdata)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ // allow reads during reconciliation
+ // TBD: allow "dirty" reads on non-leaders
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_readlink_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->readlink,
+ loc, size, xdata);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (readlink, frame, -1, EREMOTE,
+ NULL, NULL, NULL);
+ return 0;
+}
+
+/* No stub needed for readv */
+
+/* No cbk needed for readv */
+
+int32_t
+nsr_readv (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, size_t size, off_t offset, uint32_t flags, dict_t * xdata)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ // allow reads during reconciliation
+ // TBD: allow "dirty" reads on non-leaders
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_readv_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv,
+ fd, size, offset, flags, xdata);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (readv, frame, -1, EREMOTE,
+ NULL, 0, NULL, NULL, NULL);
+ return 0;
+}
+
+int32_t
+nsr_removexattr_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (removexattr, frame, op_ret, op_errno,
+ xdata);
+ return 0;
+
+}
+int32_t
+nsr_removexattr_continue (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, const char * name, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_removexattr_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr,
+ loc, name, xdata);
+ return 0;
+}
+
+int32_t
+nsr_removexattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_removexattr (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, const char * name, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_removexattr_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr,
+ loc, name, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_removexattr_stub (frame,nsr_removexattr_continue,
+ loc, name, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_removexattr_fan_in,
+ trav->xlator, trav->xlator->fops->removexattr,
+ loc, name, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (removexattr, frame, -1, op_errno,
+ NULL);
+ return 0;
+}
+
+int32_t
+nsr_rename_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * buf, struct iatt * preoldparent, struct iatt * postoldparent, struct iatt * prenewparent, struct iatt * postnewparent, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno,
+ buf, preoldparent, postoldparent, prenewparent, postnewparent, xdata);
+ return 0;
+
+}
+int32_t
+nsr_rename_continue (call_frame_t *frame, xlator_t *this,
+ loc_t * oldloc, loc_t * newloc, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_rename_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename,
+ oldloc, newloc, xdata);
+ return 0;
+}
+
+int32_t
+nsr_rename_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * buf, struct iatt * preoldparent, struct iatt * postoldparent, struct iatt * prenewparent, struct iatt * postnewparent, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_rename (call_frame_t *frame, xlator_t *this,
+ loc_t * oldloc, loc_t * newloc, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_rename_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename,
+ oldloc, newloc, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_rename_stub (frame,nsr_rename_continue,
+ oldloc, newloc, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_rename_fan_in,
+ trav->xlator, trav->xlator->fops->rename,
+ oldloc, newloc, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (rename, frame, -1, op_errno,
+ NULL, NULL, NULL, NULL, NULL, NULL);
+ return 0;
+}
+
+int32_t
+nsr_rmdir_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (rmdir, frame, op_ret, op_errno,
+ preparent, postparent, xdata);
+ return 0;
+
+}
+int32_t
+nsr_rmdir_continue (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, int xflags, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_rmdir_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->rmdir,
+ loc, xflags, xdata);
+ return 0;
+}
+
+int32_t
+nsr_rmdir_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_rmdir (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, int xflags, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_rmdir_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->rmdir,
+ loc, xflags, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_rmdir_stub (frame,nsr_rmdir_continue,
+ loc, xflags, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_rmdir_fan_in,
+ trav->xlator, trav->xlator->fops->rmdir,
+ loc, xflags, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (rmdir, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ return 0;
+}
+
+int32_t
+nsr_setattr_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno,
+ preop_stbuf, postop_stbuf, xdata);
+ return 0;
+
+}
+int32_t
+nsr_setattr_continue (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, struct iatt * stbuf, int32_t valid, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_setattr_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->setattr,
+ loc, stbuf, valid, xdata);
+ return 0;
+}
+
+int32_t
+nsr_setattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_setattr (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, struct iatt * stbuf, int32_t valid, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_setattr_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->setattr,
+ loc, stbuf, valid, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_setattr_stub (frame,nsr_setattr_continue,
+ loc, stbuf, valid, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_setattr_fan_in,
+ trav->xlator, trav->xlator->fops->setattr,
+ loc, stbuf, valid, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (setattr, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ return 0;
+}
+
+int32_t
+nsr_setxattr_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (setxattr, frame, op_ret, op_errno,
+ xdata);
+ return 0;
+
+}
+int32_t
+nsr_setxattr_continue (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, dict_t * dict, int32_t flags, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_setxattr_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr,
+ loc, dict, flags, xdata);
+ return 0;
+}
+
+int32_t
+nsr_setxattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_setxattr (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, dict_t * dict, int32_t flags, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_setxattr_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr,
+ loc, dict, flags, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_setxattr_stub (frame,nsr_setxattr_continue,
+ loc, dict, flags, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_setxattr_fan_in,
+ trav->xlator, trav->xlator->fops->setxattr,
+ loc, dict, flags, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (setxattr, frame, -1, op_errno,
+ NULL);
+ return 0;
+}
+
+/* No stub needed for stat */
+
+/* No cbk needed for stat */
+
+int32_t
+nsr_stat (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, dict_t * xdata)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ // allow reads during reconciliation
+ // TBD: allow "dirty" reads on non-leaders
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_stat_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat,
+ loc, xdata);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (stat, frame, -1, EREMOTE,
+ NULL, NULL);
+ return 0;
+}
+
+/* No stub needed for statfs */
+
+/* No cbk needed for statfs */
+
+int32_t
+nsr_statfs (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, dict_t * xdata)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ // allow reads during reconciliation
+ // TBD: allow "dirty" reads on non-leaders
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_statfs_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->statfs,
+ loc, xdata);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (statfs, frame, -1, EREMOTE,
+ NULL, NULL);
+ return 0;
+}
+
+int32_t
+nsr_symlink_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (symlink, frame, op_ret, op_errno,
+ inode, buf, preparent, postparent, xdata);
+ return 0;
+
+}
+int32_t
+nsr_symlink_continue (call_frame_t *frame, xlator_t *this,
+ const char * linkname, loc_t * loc, mode_t umask, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_symlink_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink,
+ linkname, loc, umask, xdata);
+ return 0;
+}
+
+int32_t
+nsr_symlink_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_symlink (call_frame_t *frame, xlator_t *this,
+ const char * linkname, loc_t * loc, mode_t umask, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_symlink_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink,
+ linkname, loc, umask, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_symlink_stub (frame,nsr_symlink_continue,
+ linkname, loc, umask, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_symlink_fan_in,
+ trav->xlator, trav->xlator->fops->symlink,
+ linkname, loc, umask, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (symlink, frame, -1, op_errno,
+ NULL, NULL, NULL, NULL, NULL);
+ return 0;
+}
+
+int32_t
+nsr_truncate_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (truncate, frame, op_ret, op_errno,
+ prebuf, postbuf, xdata);
+ return 0;
+
+}
+int32_t
+nsr_truncate_continue (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, off_t offset, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_truncate_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate,
+ loc, offset, xdata);
+ return 0;
+}
+
+int32_t
+nsr_truncate_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_truncate (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, off_t offset, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_truncate_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate,
+ loc, offset, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_truncate_stub (frame,nsr_truncate_continue,
+ loc, offset, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_truncate_fan_in,
+ trav->xlator, trav->xlator->fops->truncate,
+ loc, offset, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (truncate, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ return 0;
+}
+
+int32_t
+nsr_unlink_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (unlink, frame, op_ret, op_errno,
+ preparent, postparent, xdata);
+ return 0;
+
+}
+int32_t
+nsr_unlink_continue (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, int xflags, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_unlink_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink,
+ loc, xflags, xdata);
+ return 0;
+}
+
+int32_t
+nsr_unlink_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * preparent, struct iatt * postparent, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_unlink (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, int xflags, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_unlink_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink,
+ loc, xflags, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_unlink_stub (frame,nsr_unlink_continue,
+ loc, xflags, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_unlink_fan_in,
+ trav->xlator, trav->xlator->fops->unlink,
+ loc, xflags, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (unlink, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ return 0;
+}
+
+#define NSR_CG_FSYNC
+#define NSR_CG_QUEUE
+#define NSR_CG_NEED_FD
+int32_t
+nsr_writev_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno,
+ prebuf, postbuf, xdata);
+ return 0;
+
+}
+int32_t
+nsr_writev_continue (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, struct iovec * vector, int32_t count, off_t offset, uint32_t flags, struct iobref * iobref, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_writev_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev,
+ fd, vector, count, offset, flags, iobref, xdata);
+ return 0;
+}
+
+int32_t
+nsr_writev_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_writev (call_frame_t *frame, xlator_t *this,
+ fd_t * fd, struct iovec * vector, int32_t count, off_t offset, uint32_t flags, struct iobref * iobref, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_writev_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev,
+ fd, vector, count, offset, flags, iobref, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_writev_stub (frame,nsr_writev_continue,
+ fd, vector, count, offset, flags, iobref, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_writev_fan_in,
+ trav->xlator, trav->xlator->fops->writev,
+ fd, vector, count, offset, flags, iobref, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ return 0;
+}
+
+#undef NSR_CG_FSYNC
+#undef NSR_CG_QUEUE
+#undef NSR_CG_NEED_FD
+int32_t
+nsr_xattrop_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ dict_t * xattr, dict_t * xdata)
+{
+#if NSR_CG_NEED_FD
+ nsr_local_t *local = frame->local;
+#endif
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd);
+ if (ictx) {
+ /* TBD: LOCK */
+ if (ictx->pending) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "unblocking %u requests",
+ ictx->pending);
+ /* TBD: actually dequeue */
+ ictx->pending = 0;
+ }
+ /* TBD: UNLOCK */
+ }
+#endif
+
+#if NSR_CG_FSYNC
+ nsr_mark_fd_dirty(this,local);
+#endif
+
+#if NSR_CG_NEED_FD
+ fd_unref(local->fd);
+#endif
+
+ STACK_UNWIND_STRICT (xattrop, frame, op_ret, op_errno,
+ xattr, xdata);
+ return 0;
+
+}
+int32_t
+nsr_xattrop_continue (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, gf_xattrop_flags_t optype, dict_t * xattr, dict_t * xdata)
+{
+ STACK_WIND (frame, nsr_xattrop_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->xattrop,
+ loc, optype, xattr, xdata);
+ return 0;
+}
+
+int32_t
+nsr_xattrop_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ dict_t * xattr, dict_t * xdata)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "op_ret = %d, op_errno = %d\n", op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ UNLOCK(&frame->lock);
+
+ // TBD: variable Completion count
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_xattrop (call_frame_t *frame, xlator_t *this,
+ loc_t * loc, gf_xattrop_flags_t optype, dict_t * xattr, dict_t * xdata)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if NSR_CG_NEED_FD
+ local->fd = fd_ref(fd)
+#else
+ local->fd = NULL
+#endif
+ frame->local = local;
+
+ if (xdata) {
+ from_leader = !!dict_get(xdata,NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata,RECON_TERM_XATTR)
+ && !!dict_get(xdata,RECON_INDEX_XATTR);
+ }
+ else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ // follower/recon path
+ // just send it to local node
+ if (from_leader || from_recon) {
+ STACK_WIND (frame, nsr_xattrop_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->xattrop,
+ loc, optype, xattr, xdata);
+ return 0;
+ }
+
+ if (!priv->leader || priv->fence_io) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+
+#if NSR_CG_QUEUE
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd);
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ /* TBD: LOCK */
+ if (ictx->active) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "queuing request due to conflict");
+ ++(ictx->pending);
+ /* TBD: actually enqueue */
+ }
+ else {
+ ++(ictx->active);
+ }
+ /* TBD: UNLOCK */
+#endif
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set nsr-term");
+ goto err;
+ }
+
+ local->stub = fop_xattrop_stub (frame,nsr_xattrop_continue,
+ loc, optype, xattr, xdata);
+ if (!local->stub) {
+ goto err;
+ }
+
+ local->call_count = priv->n_children - 1;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_xattrop_fan_in,
+ trav->xlator, trav->xlator->fops->xattrop,
+ loc, optype, xattr, xdata);
+ }
+
+ // TBD: variable Issue count
+ return 0;
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (xattrop, frame, -1, op_errno,
+ NULL, NULL);
+ return 0;
+}
+
+/* No code emitted for zerofill */
+
+struct xlator_fops fops = {
+ .access = nsr_access,
+ .create = nsr_create,
+ .discard = nsr_discard,
+ .fallocate = nsr_fallocate,
+ .fgetxattr = nsr_fgetxattr,
+ .fremovexattr = nsr_fremovexattr,
+ .fsetattr = nsr_fsetattr,
+ .fsetxattr = nsr_fsetxattr,
+ .fstat = nsr_fstat,
+ .ftruncate = nsr_ftruncate,
+ .fxattrop = nsr_fxattrop,
+ .getxattr = nsr_getxattr,
+ .link = nsr_link,
+ .mkdir = nsr_mkdir,
+ .mknod = nsr_mknod,
+ .open = nsr_open,
+ .opendir = nsr_opendir,
+ .rchecksum = nsr_rchecksum,
+ .readdir = nsr_readdir,
+ .readdirp = nsr_readdirp,
+ .readlink = nsr_readlink,
+ .readv = nsr_readv,
+ .removexattr = nsr_removexattr,
+ .rename = nsr_rename,
+ .rmdir = nsr_rmdir,
+ .setattr = nsr_setattr,
+ .setxattr = nsr_setxattr,
+ .stat = nsr_stat,
+ .statfs = nsr_statfs,
+ .symlink = nsr_symlink,
+ .truncate = nsr_truncate,
+ .unlink = nsr_unlink,
+ .writev = nsr_writev,
+ .xattrop = nsr_xattrop,
+};