diff options
| author | Shehjar Tikoo <shehjart@gluster.com> | 2010-05-05 00:27:45 +0000 | 
|---|---|---|
| committer | Anand V. Avati <avati@dev.gluster.com> | 2010-05-05 04:36:05 -0700 | 
| commit | b0ed997cda2195e9178cbaa96d26976aa6dd2acf (patch) | |
| tree | 49df25d9ef9af923cb1ef18d62bf3bf7dccc3002 | |
| parent | 716f4dc26097811161cbd59c919df11593cb46af (diff) | |
posix: Support thread-safe vectored writes/reads
This is done by maintaining internal offsets and using pwrite/pread
instead of an lseek followed by writev/read.
The recent io-threads change causes concurrent writes to a file from
different io-threads. This results in a race between two threads that
both try to perform an lseek-writev sequence. The lseek-writev
sequence needs to be a critical section, and this is where the pwrite
syscall helps: it takes the target offset as an argument, so no
separate lseek is needed. Because pwrite requires an explicit offset,
to support pwrites of multiple iovecs we maintain an offset within
posix for the duration of the write fop and pass this internal offset
to each pwrite call.
The same logic also applies to read. The only difference is that we do
not need to read multiple iovecs; we only need an atomic lseek-read
sequence, which pread provides.
Signed-off-by: Shehjar Tikoo <shehjart@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 883 (Data corruption due to thread unsafe reads and writes)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=883
| -rw-r--r-- | xlators/storage/posix/src/posix.c | 174 | 
1 files changed, 90 insertions, 84 deletions
diff --git a/xlators/storage/posix/src/posix.c b/xlators/storage/posix/src/posix.c index 4f47ed027..677aa0270 100644 --- a/xlators/storage/posix/src/posix.c +++ b/xlators/storage/posix/src/posix.c @@ -2317,17 +2317,7 @@ posix_readv (call_frame_t *frame, xlator_t *this,          }          _fd = pfd->fd; - -        op_ret = lseek (_fd, offset, SEEK_SET); -        if (op_ret == -1) { -                op_errno = errno; -                gf_log (this->name, GF_LOG_ERROR, -			"lseek(%"PRId64") failed: %s", -                        offset, strerror (op_errno)); -                goto out; -        } - -        op_ret = read (_fd, iobuf->ptr, size); +        op_ret = pread (_fd, iobuf->ptr, size, offset);          if (op_ret == -1) {                  op_errno = errno;                  gf_log (this->name, GF_LOG_ERROR, @@ -2386,6 +2376,88 @@ out:  int32_t +__posix_pwritev (int fd, struct iovec *vector, int count, off_t offset) +{ +        int32_t         op_ret = 0; +        int             idx = 0; +        int             retval = 0; +        off_t           internal_off = 0; + +        if (!vector) +                return -EFAULT; + +        internal_off = offset; +        for (idx = 0; idx < count; idx++) { +                retval = pwrite (fd, vector[idx].iov_base, vector[idx].iov_len, +                                 internal_off); +                if (retval == -1) { +                        op_ret = -errno; +                        goto err; +                } +                op_ret += retval; +                internal_off += retval; +        } + +err: +        return op_ret; +} + + +int32_t +__posix_writev (int fd, struct iovec *vector, int count, off_t startoff, +                int odirect) +{ +        int32_t         op_ret = 0; +        int             idx = 0; +        int             align = 4096; +        int             max_buf_size = 0; +        int             retval = 0; +        char            *buf = NULL; +        char            *alloc_buf = NULL; + 
       off_t           internal_off = 0; + +        /* Check for the O_DIRECT flag during open() */ +        if (!odirect) +                return __posix_pwritev (fd, vector, count, startoff); + +        for (idx = 0; idx < count; idx++) { +                if (max_buf_size < vector[idx].iov_len) +                        max_buf_size = vector[idx].iov_len; +        } + +        alloc_buf = GF_MALLOC (1 * (max_buf_size + align), gf_posix_mt_char); +        if (!alloc_buf) { +                op_ret = -errno; +                goto err; +        } + +        internal_off = startoff; +        for (idx = 0; idx < count; idx++) { +                /* page aligned buffer */ +                buf = ALIGN_BUF (alloc_buf, align); + +                memcpy (buf, vector[idx].iov_base, vector[idx].iov_len); + +                /* not sure whether writev works on O_DIRECT'd fd */ +                retval = pwrite (fd, buf, vector[idx].iov_len, internal_off); +                if (retval == -1) { +                        op_ret = -errno; +                        goto err; +                } + +                op_ret += retval; +                internal_off += retval; +        } + +err: +        if (alloc_buf) +                GF_FREE (alloc_buf); + +        return op_ret; +} + + +int32_t  posix_writev (call_frame_t *frame, xlator_t *this,                fd_t *fd, struct iovec *vector, int32_t count, off_t offset,                struct iobref *iobref) @@ -2399,12 +2471,6 @@ posix_writev (call_frame_t *frame, xlator_t *this,          struct iatt            postop    = {0,};          int                      ret      = -1; -        int    idx          = 0; -        int    align        = 4096; -        int    max_buf_size = 0; -        int    retval       = 0; -        char * buf          = NULL; -        char * alloc_buf    = NULL;  	uint64_t  tmp_pfd   = 0;          VALIDATE_OR_GOTO (frame, out); @@ -2437,73 +2503,16 @@ posix_writev (call_frame_t *frame, xlator_t *this,                  
goto out;          } -        op_ret = lseek (_fd, offset, SEEK_SET); - -        if (op_ret == -1) { -                op_errno = errno; -                gf_log (this->name, GF_LOG_ERROR, -			"lseek(%"PRId64") on fd=%p failed: %s", -                        offset, fd, strerror (op_errno)); +        op_ret = __posix_writev (_fd, vector, count, offset, +                                 (pfd->flags & O_DIRECT)); +        if (op_ret < 0) { +                op_errno = -op_ret; +                op_ret = -1; +                gf_log (this->name, GF_LOG_ERROR, "write failed: offset %"PRIu64 +                        ", %s", offset, strerror (op_errno));                  goto out;          } -        /* Check for the O_DIRECT flag during open() */ -        if (pfd->flags & O_DIRECT) { -                /* This is O_DIRECT'd file */ -		op_ret = -1; -                for (idx = 0; idx < count; idx++) { -                        if (max_buf_size < vector[idx].iov_len) -                                max_buf_size = vector[idx].iov_len; -                } - -                alloc_buf = GF_MALLOC (1 * (max_buf_size + align), -                                    gf_posix_mt_char); -                if (!alloc_buf) { -                        op_errno = errno; -                        gf_log (this->name, GF_LOG_ERROR, -                                "Out of memory."); -                        goto out; -                } - -                for (idx = 0; idx < count; idx++) { -                        /* page aligned buffer */ -                        buf = ALIGN_BUF (alloc_buf, align); - -                        memcpy (buf, vector[idx].iov_base, -				vector[idx].iov_len); - -                        /* not sure whether writev works on O_DIRECT'd fd */ -                        retval = write (_fd, buf, vector[idx].iov_len); - -                        if (retval == -1) { -                                if (op_ret == -1) { -                                        op_errno = errno; -         
                               gf_log (this->name, GF_LOG_DEBUG, -                                                "O_DIRECT enabled on fd=%p: %s", -						fd, strerror (op_errno)); -                                        goto out; -                                } - -                                break; -                        } -			if (op_ret == -1) -				op_ret = 0; -                        op_ret += retval; -                } - -        } else /* if (O_DIRECT) */ { - -                /* This is not O_DIRECT'd fd */ -                op_ret = writev (_fd, vector, count); -                if (op_ret == -1) { -                        op_errno = errno; -                        gf_log (this->name, GF_LOG_ERROR, -				"writev failed on fd=%p: %s", -                                fd, strerror (op_errno)); -                        goto out; -                } -        } -          LOCK (&priv->lock);          {                  priv->write_value    += op_ret; @@ -2533,9 +2542,6 @@ posix_writev (call_frame_t *frame, xlator_t *this,          }   out: -        if (alloc_buf) { -                GF_FREE (alloc_buf); -        }          STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, &preop, &postop);  | 
