summaryrefslogtreecommitdiffstats
path: root/xlators/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster')
-rw-r--r--xlators/cluster/stripe/src/stripe-mem-types.h2
-rw-r--r--xlators/cluster/stripe/src/stripe.c136
-rw-r--r--xlators/cluster/stripe/src/stripe.h6
3 files changed, 106 insertions, 38 deletions
diff --git a/xlators/cluster/stripe/src/stripe-mem-types.h b/xlators/cluster/stripe/src/stripe-mem-types.h
index c8781d7d761..e05ba0c29c3 100644
--- a/xlators/cluster/stripe/src/stripe-mem-types.h
+++ b/xlators/cluster/stripe/src/stripe-mem-types.h
@@ -16,7 +16,7 @@
enum gf_stripe_mem_types_ {
gf_stripe_mt_iovec = gf_common_mt_end + 1,
- gf_stripe_mt_readv_replies,
+ gf_stripe_mt_stripe_replies,
gf_stripe_mt_stripe_fd_ctx_t,
gf_stripe_mt_char,
gf_stripe_mt_int8_t,
diff --git a/xlators/cluster/stripe/src/stripe.c b/xlators/cluster/stripe/src/stripe.c
index e2176eeae66..6588a44996c 100644
--- a/xlators/cluster/stripe/src/stripe.c
+++ b/xlators/cluster/stripe/src/stripe.c
@@ -3482,8 +3482,8 @@ stripe_readv (call_frame_t *frame, xlator_t *this, fd_t *fd,
frame->local = local;
/* This is where all the vectors should be copied. */
- local->replies = GF_CALLOC (num_stripe, sizeof (struct readv_replies),
- gf_stripe_mt_readv_replies);
+ local->replies = GF_CALLOC (num_stripe, sizeof (struct stripe_replies),
+ gf_stripe_mt_stripe_replies);
if (!local->replies) {
op_errno = ENOMEM;
goto err;
@@ -3543,7 +3543,11 @@ stripe_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
{
int32_t callcnt = 0;
stripe_local_t *local = NULL;
+ stripe_local_t *mlocal = NULL;
call_frame_t *prev = NULL;
+ call_frame_t *mframe = NULL;
+ struct stripe_replies *reply = NULL;
+ int32_t i = 0;
if (!this || !frame || !frame->local || !cookie) {
gf_log ("stripe", GF_LOG_DEBUG, "possible NULL deref");
@@ -3552,48 +3556,75 @@ stripe_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
prev = cookie;
local = frame->local;
+ mframe = local->orig_frame;
+ mlocal = mframe->local;
LOCK(&frame->lock);
{
- callcnt = ++local->call_count;
+ callcnt = ++mlocal->call_count;
+
+ mlocal->replies[local->node_index].op_ret = op_ret;
+ mlocal->replies[local->node_index].op_errno = op_errno;
- if (op_ret == -1) {
- gf_log (this->name, GF_LOG_DEBUG,
- "%s returned error %s",
- prev->this->name, strerror (op_errno));
- local->op_errno = op_errno;
- local->op_ret = -1;
- }
if (op_ret >= 0) {
- local->op_ret += op_ret;
- local->post_buf = *postbuf;
- local->pre_buf = *prebuf;
+ mlocal->post_buf = *postbuf;
+ mlocal->pre_buf = *prebuf;
- local->prebuf_blocks += prebuf->ia_blocks;
- local->postbuf_blocks += postbuf->ia_blocks;
+ mlocal->prebuf_blocks += prebuf->ia_blocks;
+ mlocal->postbuf_blocks += postbuf->ia_blocks;
- correct_file_size(prebuf, local->fctx, prev);
- correct_file_size(postbuf, local->fctx, prev);
+ correct_file_size(prebuf, mlocal->fctx, prev);
+ correct_file_size(postbuf, mlocal->fctx, prev);
- if (local->prebuf_size < prebuf->ia_size)
- local->prebuf_size = prebuf->ia_size;
- if (local->postbuf_size < postbuf->ia_size)
- local->postbuf_size = postbuf->ia_size;
+ if (mlocal->prebuf_size < prebuf->ia_size)
+ mlocal->prebuf_size = prebuf->ia_size;
+ if (mlocal->postbuf_size < postbuf->ia_size)
+ mlocal->postbuf_size = postbuf->ia_size;
}
}
UNLOCK (&frame->lock);
- if ((callcnt == local->wind_count) && local->unwind) {
- local->pre_buf.ia_size = local->prebuf_size;
- local->pre_buf.ia_blocks = local->prebuf_blocks;
- local->post_buf.ia_size = local->postbuf_size;
- local->post_buf.ia_blocks = local->postbuf_blocks;
+ if ((callcnt == mlocal->wind_count) && mlocal->unwind) {
+ mlocal->pre_buf.ia_size = mlocal->prebuf_size;
+ mlocal->pre_buf.ia_blocks = mlocal->prebuf_blocks;
+ mlocal->post_buf.ia_size = mlocal->postbuf_size;
+ mlocal->post_buf.ia_blocks = mlocal->postbuf_blocks;
+
+ /*
+ * Only return the number of consecutively written bytes up until
+ * the first error. Only return an error if it occurs first.
+ *
+ * When a short write occurs, the application should retry at the
+ * appropriate offset, at which point we'll potentially pass back
+ * the error.
+ */
+ for (i = 0, reply = mlocal->replies; i < mlocal->wind_count;
+ i++, reply++) {
+ if (reply->op_ret == -1) {
+ gf_log(this->name, GF_LOG_DEBUG, "reply %d "
+ "returned error %s", i,
+ strerror(reply->op_errno));
+ if (!mlocal->op_ret) {
+ mlocal->op_ret = -1;
+ mlocal->op_errno = reply->op_errno;
+ }
+ break;
+ }
- STRIPE_STACK_UNWIND (writev, frame, local->op_ret,
- local->op_errno, &local->pre_buf,
- &local->post_buf, NULL);
+ mlocal->op_ret += reply->op_ret;
+
+ if (reply->op_ret < reply->requested_size)
+ break;
+ }
+
+ GF_FREE(mlocal->replies);
+
+ STRIPE_STACK_UNWIND (writev, mframe, mlocal->op_ret,
+ mlocal->op_errno, &mlocal->pre_buf,
+ &mlocal->post_buf, NULL);
}
out:
+ STRIPE_STACK_DESTROY(frame);
return 0;
}
@@ -3615,6 +3646,11 @@ stripe_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
uint64_t stripe_size = 0;
uint64_t tmp_fctx = 0;
off_t dest_offset = 0;
+ off_t rounded_start = 0;
+ off_t rounded_end = 0;
+ int32_t total_chunks = 0;
+ call_frame_t *wframe = NULL;
+ stripe_local_t *wlocal = NULL;
VALIDATE_OR_GOTO (frame, err);
VALIDATE_OR_GOTO (this, err);
@@ -3653,7 +3689,27 @@ stripe_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
goto err;
}
+ rounded_start = floor(offset, stripe_size);
+ rounded_end = roof(offset + total_size, stripe_size);
+ total_chunks = (rounded_end - rounded_start) / stripe_size;
+ local->replies = GF_CALLOC(total_chunks, sizeof(struct stripe_replies),
+ gf_stripe_mt_stripe_replies);
+ if (!local->replies) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ total_chunks = 0;
while (1) {
+ wframe = copy_frame(frame);
+ wlocal = mem_get0(this->local_pool);
+ if (!wlocal) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+ wlocal->orig_frame = frame;
+ wframe->local = wlocal;
+
/* Send striped chunk of the vector to child
nodes appropriately. */
idx = (((offset + offset_offset) /
@@ -3681,25 +3737,37 @@ stripe_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
if (remaining_size == 0)
local->unwind = 1;
+ /*
+ * Store off the request index (with respect to the chunk of the
+ * initial offset) and the size of the request. This is required
+ * in the callback to calculate an appropriate return value in
+ * the event of a write failure in one or more requests.
+ */
+ wlocal->node_index = total_chunks;
+ local->replies[total_chunks].requested_size = fill_size;
+
+ dest_offset = offset + offset_offset;
if (fctx->stripe_coalesce)
- dest_offset = coalesced_offset(offset + offset_offset,
- local->stripe_size, fctx->stripe_count);
- else
- dest_offset = offset + offset_offset;
+ dest_offset = coalesced_offset(dest_offset,
+ local->stripe_size, fctx->stripe_count);
- STACK_WIND (frame, stripe_writev_cbk, fctx->xl_array[idx],
+ STACK_WIND (wframe, stripe_writev_cbk, fctx->xl_array[idx],
fctx->xl_array[idx]->fops->writev, fd, tmp_vec,
tmp_count, dest_offset, flags, iobref,
xdata);
GF_FREE (tmp_vec);
offset_offset += fill_size;
+ total_chunks++;
if (remaining_size == 0)
break;
}
return 0;
err:
+ if (wframe)
+ STRIPE_STACK_DESTROY(wframe);
+
STRIPE_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL);
return 0;
}
diff --git a/xlators/cluster/stripe/src/stripe.h b/xlators/cluster/stripe/src/stripe.h
index 1b9e660c126..a440f87ba1f 100644
--- a/xlators/cluster/stripe/src/stripe.h
+++ b/xlators/cluster/stripe/src/stripe.h
@@ -106,9 +106,9 @@ struct stripe_private {
};
/**
- * Used to keep info about the replies received from fops->readv calls
+ * Used to keep info about the replies received from readv/writev calls
*/
-struct readv_replies {
+struct stripe_replies {
struct iovec *vector;
int32_t count; //count of vector
int32_t op_ret; //op_ret of readv
@@ -156,7 +156,7 @@ struct stripe_local {
blkcnt_t preparent_blocks;
blkcnt_t postparent_blocks;
- struct readv_replies *replies;
+ struct stripe_replies *replies;
struct statvfs statvfs_buf;
dir_entry_t *entry;