summaryrefslogtreecommitdiffstats
path: root/xlators/features
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features')
-rw-r--r--xlators/features/Makefile.am2
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog.c4
-rw-r--r--xlators/features/changelog/src/changelog-encoders.c20
-rw-r--r--xlators/features/changelog/src/changelog-encoders.h2
-rw-r--r--xlators/features/changelog/src/changelog-helpers.c4
-rw-r--r--xlators/features/changelog/src/changelog.c33
-rw-r--r--xlators/features/compress/Makefile.am3
-rw-r--r--xlators/features/compress/src/Makefile.am17
-rw-r--r--xlators/features/compress/src/cdc-helper.c547
-rw-r--r--xlators/features/compress/src/cdc-mem-types.h22
-rw-r--r--xlators/features/compress/src/cdc.c342
-rw-r--r--xlators/features/compress/src/cdc.h107
-rw-r--r--xlators/features/gfid-access/src/gfid-access.c70
-rw-r--r--xlators/features/index/src/index.c317
-rw-r--r--xlators/features/index/src/index.h14
-rw-r--r--xlators/features/locks/src/Makefile.am1
-rw-r--r--xlators/features/locks/src/clear.c5
-rw-r--r--xlators/features/locks/src/common.c152
-rw-r--r--xlators/features/locks/src/common.h51
-rw-r--r--xlators/features/locks/src/entrylk.c120
-rw-r--r--xlators/features/locks/src/inodelk.c108
-rw-r--r--xlators/features/locks/src/locks.h30
-rw-r--r--xlators/features/locks/src/posix.c182
-rw-r--r--xlators/features/marker/src/marker.c95
-rw-r--r--xlators/features/marker/src/marker.h5
-rw-r--r--xlators/features/qemu-block/Makefile.am1
-rw-r--r--xlators/features/qemu-block/src/Makefile.am155
-rw-r--r--xlators/features/qemu-block/src/bdrv-xlator.c397
-rw-r--r--xlators/features/qemu-block/src/bh-syncop.c48
-rw-r--r--xlators/features/qemu-block/src/clock-timer.c60
-rw-r--r--xlators/features/qemu-block/src/coroutine-synctask.c116
-rw-r--r--xlators/features/qemu-block/src/monitor-logging.c50
-rw-r--r--xlators/features/qemu-block/src/qb-coroutines.c662
-rw-r--r--xlators/features/qemu-block/src/qb-coroutines.h30
-rw-r--r--xlators/features/qemu-block/src/qemu-block-memory-types.h25
-rw-r--r--xlators/features/qemu-block/src/qemu-block.c1140
-rw-r--r--xlators/features/qemu-block/src/qemu-block.h109
-rw-r--r--xlators/features/quiesce/src/quiesce.c8
38 files changed, 4839 insertions, 215 deletions
diff --git a/xlators/features/Makefile.am b/xlators/features/Makefile.am
index f4e0430f4..d2f5ef192 100644
--- a/xlators/features/Makefile.am
+++ b/xlators/features/Makefile.am
@@ -1,4 +1,4 @@
SUBDIRS = locks quota read-only mac-compat quiesce marker index \
- protect changelog gfid-access $(GLUPY_SUBDIR) # trash path-converter # filter
+ protect compress changelog gfid-access $(GLUPY_SUBDIR) qemu-block # trash path-converter # filter
CLEANFILES =
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c
index 15c6e8414..ca8e373e7 100644
--- a/xlators/features/changelog/lib/src/gf-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-changelog.c
@@ -483,8 +483,8 @@ gf_changelog_register (char *brick_path, char *scratch_dir,
goto cleanup;
}
- ret = pthread_create (&gfc->gfc_changelog_processor,
- NULL, gf_changelog_process, gfc);
+ ret = gf_thread_create (&gfc->gfc_changelog_processor,
+ NULL, gf_changelog_process, gfc);
if (ret) {
errn = errno;
gf_log (this->name, GF_LOG_ERROR,
diff --git a/xlators/features/changelog/src/changelog-encoders.c b/xlators/features/changelog/src/changelog-encoders.c
index c71ea9270..553eec85c 100644
--- a/xlators/features/changelog/src/changelog-encoders.c
+++ b/xlators/features/changelog/src/changelog-encoders.c
@@ -154,3 +154,23 @@ changelog_encode_binary (xlator_t *this, changelog_log_data_t *cld)
return changelog_write_change (priv, buffer, off);
}
+
+static struct changelog_encoder
+cb_encoder[] = {
+ [CHANGELOG_ENCODE_BINARY] =
+ {
+ .encoder = CHANGELOG_ENCODE_BINARY,
+ .encode = changelog_encode_binary,
+ },
+ [CHANGELOG_ENCODE_ASCII] =
+ {
+ .encoder = CHANGELOG_ENCODE_ASCII,
+ .encode = changelog_encode_ascii,
+ },
+};
+
+void
+changelog_encode_change( changelog_priv_t * priv)
+{
+ priv->ce = &cb_encoder[priv->encode_mode];
+}
diff --git a/xlators/features/changelog/src/changelog-encoders.h b/xlators/features/changelog/src/changelog-encoders.h
index 43deb1307..a3efbee05 100644
--- a/xlators/features/changelog/src/changelog-encoders.h
+++ b/xlators/features/changelog/src/changelog-encoders.h
@@ -40,5 +40,7 @@ int
changelog_encode_binary (xlator_t *, changelog_log_data_t *);
int
changelog_encode_ascii (xlator_t *, changelog_log_data_t *);
+void
+changelog_encode_change(changelog_priv_t *);
#endif /* _CHANGELOG_ENCODERS_H */
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c
index c1bb6e5fe..7ab0091b5 100644
--- a/xlators/features/changelog/src/changelog-helpers.c
+++ b/xlators/features/changelog/src/changelog-helpers.c
@@ -21,6 +21,7 @@
#include "changelog-helpers.h"
#include "changelog-mem-types.h"
+#include "changelog-encoders.h"
#include <pthread.h>
void
@@ -201,7 +202,7 @@ changelog_open (xlator_t *this,
(void) snprintf (buffer, 1024, CHANGELOG_HEADER,
CHANGELOG_VERSION_MAJOR,
CHANGELOG_VERSION_MINOR,
- priv->encode_mode);
+ priv->ce->encoder);
ret = changelog_write_change (priv, buffer, strlen (buffer));
if (ret) {
close (priv->changelog_fd);
@@ -267,6 +268,7 @@ changelog_handle_change (xlator_t *this,
int ret = 0;
if (CHANGELOG_TYPE_IS_ROLLOVER (cld->cld_type)) {
+ changelog_encode_change(priv);
ret = changelog_start_next_change (this, priv,
cld->cld_roll_time,
cld->cld_finale);
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
index fe0643429..cea0e8c70 100644
--- a/xlators/features/changelog/src/changelog.c
+++ b/xlators/features/changelog/src/changelog.c
@@ -36,20 +36,6 @@ cb_bootstrap[] = {
},
};
-static struct changelog_encoder
-cb_encoder[] = {
- [CHANGELOG_ENCODE_BINARY] =
- {
- .encoder = CHANGELOG_ENCODE_BINARY,
- .encode = changelog_encode_binary,
- },
- [CHANGELOG_ENCODE_ASCII] =
- {
- .encoder = CHANGELOG_ENCODE_ASCII,
- .encode = changelog_encode_ascii,
- },
-};
-
/* Entry operations - TYPE III */
/**
@@ -203,7 +189,6 @@ changelog_rename (call_frame_t *frame, xlator_t *this,
loc_t *oldloc, loc_t *newloc, dict_t *xdata)
{
size_t xtra_len = 0;
- uuid_t null_uuid = {0,};
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
@@ -211,7 +196,8 @@ changelog_rename (call_frame_t *frame, xlator_t *this,
CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind);
/* 3 == fop + oldloc + newloc */
- CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, null_uuid, 3);
+ CHANGELOG_INIT_NOCHECK (this, frame->local,
+ NULL, oldloc->inode->gfid, 3);
co = changelog_get_usable_buffer (frame->local);
if (!co)
@@ -1030,15 +1016,15 @@ changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv)
int ret = 0;
priv->cr.this = this;
- ret = pthread_create (&priv->cr.rollover_th,
- NULL, changelog_rollover, priv);
+ ret = gf_thread_create (&priv->cr.rollover_th,
+ NULL, changelog_rollover, priv);
if (ret)
goto out;
if (priv->fsync_interval) {
priv->cf.this = this;
- ret = pthread_create (&priv->cf.fsync_th,
- NULL, changelog_fsync_thread, priv);
+ ret = gf_thread_create (&priv->cf.fsync_th,
+ NULL, changelog_fsync_thread, priv);
}
if (ret)
@@ -1102,8 +1088,8 @@ changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv)
priv->cn.this = this;
priv->cn.rfd = pipe_fd[0];
- ret = pthread_create (&priv->cn.notify_th,
- NULL, changelog_notifier, priv);
+ ret = gf_thread_create (&priv->cn.notify_th,
+ NULL, changelog_notifier, priv);
out:
return ret;
@@ -1363,8 +1349,7 @@ init (xlator_t *this)
GF_OPTION_INIT ("fsync-interval", priv->fsync_interval, int32, out);
- GF_ASSERT (cb_encoder[priv->encode_mode].encoder == priv->encode_mode);
- priv->ce = &cb_encoder[priv->encode_mode];
+ changelog_encode_change(priv);
GF_ASSERT (cb_bootstrap[priv->op_mode].mode == priv->op_mode);
priv->cb = &cb_bootstrap[priv->op_mode];
diff --git a/xlators/features/compress/Makefile.am b/xlators/features/compress/Makefile.am
new file mode 100644
index 000000000..a985f42a8
--- /dev/null
+++ b/xlators/features/compress/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/features/compress/src/Makefile.am b/xlators/features/compress/src/Makefile.am
new file mode 100644
index 000000000..4a64b52a9
--- /dev/null
+++ b/xlators/features/compress/src/Makefile.am
@@ -0,0 +1,17 @@
+xlator_LTLIBRARIES = cdc.la
+
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features
+
+noinst_HEADERS = cdc.h cdc-mem-types.h
+
+cdc_la_LDFLAGS = -module -avoidversion $(LIBZ_LIBS)
+
+cdc_la_SOURCES = cdc.c cdc-helper.c
+cdc_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -D$(GF_HOST_OS) \
+-shared -nostartfiles $(LIBZ_CFLAGS)
+
+AM_CFLAGS = -Wall $(GF_CFLAGS)
+
+CLEANFILES =
diff --git a/xlators/features/compress/src/cdc-helper.c b/xlators/features/compress/src/cdc-helper.c
new file mode 100644
index 000000000..54432ff45
--- /dev/null
+++ b/xlators/features/compress/src/cdc-helper.c
@@ -0,0 +1,547 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "glusterfs.h"
+#include "logging.h"
+
+#include "cdc.h"
+#include "cdc-mem-types.h"
+
+#ifdef HAVE_LIB_Z
+#include "zlib.h"
+#endif
+
+#ifdef HAVE_LIB_Z
+/* gzip header looks something like this
+ * (RFC 1950)
+ *
+ * +---+---+---+---+---+---+---+---+---+---+
+ * |ID1|ID2|CM |FLG| MTIME |XFL|OS |
+ * +---+---+---+---+---+---+---+---+---+---+
+ *
+ * Data is usually sent without this header i.e
+ * Data sent = <compressed-data> + trailer(8)
+ * The trailer contains the checksum.
+ *
+ * gzip_header is added only during debugging.
+ * Refer to the function cdc_dump_iovec_to_disk
+ */
+static const char gzip_header[10] =
+ {
+ '\037', '\213', Z_DEFLATED, 0,
+ 0, 0, 0, 0,
+ 0, GF_CDC_OS_ID
+ };
+
+static int32_t
+cdc_next_iovec (xlator_t *this, cdc_info_t *ci)
+{
+ int ret = -1;
+
+ ci->ncount++;
+ /* check for iovec overflow -- should not happen */
+ if (ci->ncount == MAX_IOVEC) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Zlib output buffer overflow"
+ " ->ncount (%d) | ->MAX_IOVEC (%d)",
+ ci->ncount, MAX_IOVEC);
+ goto out;
+ }
+
+ ret = 0;
+
+ out:
+ return ret;
+}
+
+static void
+cdc_put_long (unsigned char *string, unsigned long x)
+{
+ string[0] = (unsigned char) (x & 0xff);
+ string[1] = (unsigned char) ((x & 0xff00) >> 8);
+ string[2] = (unsigned char) ((x & 0xff0000) >> 16);
+ string[3] = (unsigned char) ((x & 0xff000000) >> 24);
+}
+
+static unsigned long
+cdc_get_long (unsigned char *buf)
+{
+ return ((unsigned long) buf[0])
+ | (((unsigned long) buf[1]) << 8)
+ | (((unsigned long) buf[2]) << 16)
+ | (((unsigned long) buf[3]) << 24);
+}
+
+static int32_t
+cdc_init_gzip_trailer (xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci)
+{
+ int ret = -1;
+ char *buf = NULL;
+
+ ret = cdc_next_iovec (this, ci);
+ if (ret)
+ goto out;
+
+ buf = CURR_VEC(ci).iov_base =
+ (char *) GF_CALLOC (1, GF_CDC_VALIDATION_SIZE,
+ gf_cdc_mt_gzip_trailer_t);
+
+ if (!CURR_VEC(ci).iov_base)
+ goto out;
+
+ CURR_VEC(ci).iov_len = GF_CDC_VALIDATION_SIZE;
+
+ cdc_put_long ((unsigned char *)&buf[0], ci->crc);
+ cdc_put_long ((unsigned char *)&buf[4], ci->stream.total_in);
+
+ ret = 0;
+
+ out:
+ return ret;
+}
+
+static int32_t
+cdc_alloc_iobuf_and_init_vec (xlator_t *this,
+ cdc_priv_t *priv, cdc_info_t *ci,
+ int size)
+{
+ int ret = -1;
+ int alloc_len = 0;
+ struct iobuf *iobuf = NULL;
+
+ ret = cdc_next_iovec (this, ci);
+ if (ret)
+ goto out;
+
+ alloc_len = size ? size : ci->buffer_size;
+
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool, alloc_len);
+ if (!iobuf)
+ goto out;
+
+ ret = iobref_add (ci->iobref, iobuf);
+ if (ret)
+ goto out;
+
+ /* Initialize this iovec */
+ CURR_VEC(ci).iov_base = iobuf->ptr;
+ CURR_VEC(ci).iov_len = alloc_len;
+
+ ret = 0;
+
+ out:
+ return ret;
+}
+
+static void
+cdc_init_zlib_output_stream (cdc_priv_t *priv, cdc_info_t *ci, int size)
+{
+ ci->stream.next_out = (unsigned char *) CURR_VEC(ci).iov_base;
+ ci->stream.avail_out = size ? size : ci->buffer_size;
+}
+
+/* This routine is for testing and debugging only.
+ * Data written = header(10) + <compressed-data> + trailer(8)
+ * So each gzip dump file is at least 18 bytes in size.
+ */
+void
+cdc_dump_iovec_to_disk (xlator_t *this, cdc_info_t *ci, const char *file)
+{
+ int i = 0;
+ int fd = 0;
+ size_t writen = 0;
+ size_t total_writen = 0;
+
+ fd = open (file, O_WRONLY|O_CREAT|O_TRUNC, 0777 );
+ if (fd < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Cannot open file: %s", file);
+ return;
+ }
+
+ writen = write (fd, (char *) gzip_header, 10);
+ total_writen += writen;
+ for (i = 0; i < ci->ncount; i++) {
+ writen = write (fd, (char *) ci->vec[i].iov_base, ci->vec[i].iov_len);
+ total_writen += writen;
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "dump'd %zu bytes to %s", total_writen, GF_CDC_DEBUG_DUMP_FILE );
+
+ close (fd);
+}
+
+static int32_t
+cdc_flush_libz_buffer (cdc_priv_t *priv, xlator_t *this, cdc_info_t *ci,
+ int (*libz_func)(z_streamp, int),
+ int flush)
+{
+ int32_t ret = Z_OK;
+ int done = 0;
+ unsigned int deflate_len = 0;
+
+ for (;;) {
+ deflate_len = ci->buffer_size - ci->stream.avail_out;
+
+ if (deflate_len != 0) {
+ CURR_VEC(ci).iov_len = deflate_len;
+
+ ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0);
+ if (ret) {
+ ret = Z_MEM_ERROR;
+ break;
+ }
+
+ /* Re-position Zlib output buffer */
+ cdc_init_zlib_output_stream (priv, ci, 0);
+ }
+
+ if (done) {
+ ci->ncount--;
+ break;
+ }
+
+ ret = libz_func (&ci->stream, flush);
+
+ if (ret == Z_BUF_ERROR) {
+ ret = Z_OK;
+ ci->ncount--;
+ break;
+ }
+
+ done = (ci->stream.avail_out != 0 || ret == Z_STREAM_END);
+
+ if (ret != Z_OK && ret != Z_STREAM_END)
+ break;
+ }
+
+ return ret;
+}
+
+static int32_t
+do_cdc_compress (struct iovec *vec, xlator_t *this, cdc_priv_t *priv,
+ cdc_info_t *ci)
+{
+ int ret = -1;
+
+ /* Initialize defalte */
+ ret = deflateInit2 (&ci->stream, priv->cdc_level, Z_DEFLATED,
+ priv->window_size, priv->mem_level,
+ Z_DEFAULT_STRATEGY);
+
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "unable to init Zlib (retval: %d)", ret);
+ goto out;
+ }
+
+ ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0);
+ if (ret)
+ goto out;
+
+ /* setup output buffer */
+ cdc_init_zlib_output_stream (priv, ci, 0);
+
+ /* setup input buffer */
+ ci->stream.next_in = (unsigned char *) vec->iov_base;
+ ci->stream.avail_in = vec->iov_len;
+
+ ci->crc = crc32 (ci->crc, (const Bytef *) vec->iov_base, vec->iov_len);
+
+ gf_log (this->name, GF_LOG_DEBUG, "crc=%lu len=%d buffer_size=%d",
+ ci->crc, ci->stream.avail_in, ci->buffer_size);
+
+ /* compress !! */
+ while (ci->stream.avail_in != 0) {
+ if (ci->stream.avail_out == 0) {
+
+ CURR_VEC(ci).iov_len = ci->buffer_size;
+
+ ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0);
+ if (ret)
+ break;
+
+ /* Re-position Zlib output buffer */
+ cdc_init_zlib_output_stream (priv, ci, 0);
+ }
+
+ ret = deflate (&ci->stream, Z_NO_FLUSH);
+ if (ret != Z_OK)
+ break;
+ }
+
+ out:
+ return ret;
+}
+
+int32_t
+cdc_compress (xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci,
+ dict_t **xdata)
+{
+ int ret = -1;
+ int i = 0;
+
+ ci->iobref = iobref_new ();
+ if (!ci->iobref)
+ goto out;
+
+ if (!*xdata) {
+ *xdata = dict_new ();
+ if (!*xdata) {
+ gf_log (this->name, GF_LOG_ERROR, "Cannot allocate xdata"
+ " dict");
+ goto out;
+ }
+ }
+
+ /* data */
+ for (i = 0; i < ci->count; i++) {
+ ret = do_cdc_compress (&ci->vector[i], this, priv, ci);
+ if (ret != Z_OK)
+ goto deflate_cleanup_out;
+ }
+
+ /* flush zlib buffer */
+ ret = cdc_flush_libz_buffer (priv, this, ci, deflate, Z_FINISH);
+ if (!(ret == Z_OK || ret == Z_STREAM_END)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Compression Error: ret (%d)", ret);
+ ret = -1;
+ goto deflate_cleanup_out;
+ }
+
+ /* trailer */
+ ret = cdc_init_gzip_trailer (this, priv, ci);
+ if (ret)
+ goto deflate_cleanup_out;
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Compressed %ld to %ld bytes",
+ ci->stream.total_in, ci->stream.total_out);
+
+ ci->nbytes = ci->stream.total_out + GF_CDC_VALIDATION_SIZE;
+
+ /* set deflated canary value for identification */
+ ret = dict_set_int32 (*xdata, GF_CDC_DEFLATE_CANARY_VAL, 1);
+ if (ret) {
+ /* Send uncompressed data if we can't _tell_ the client
+ * that deflated data is on it's way. So, we just log
+ * the faliure and continue as usual.
+ */
+ gf_log (this->name, GF_LOG_ERROR,
+ "Data deflated, but could not set canary"
+ " value in dict for identification");
+ }
+
+ /* This is to be used in testing */
+ if ( priv->debug ) {
+ cdc_dump_iovec_to_disk (this, ci, GF_CDC_DEBUG_DUMP_FILE );
+ }
+
+ deflate_cleanup_out:
+ (void) deflateEnd(&ci->stream);
+
+ out:
+ return ret;
+}
+
+
+/* deflate content is checked by the presence of a canary
+ * value in the dict as the key
+ */
+static int32_t
+cdc_check_content_for_deflate (dict_t *xdata)
+{
+ return dict_get (xdata, GF_CDC_DEFLATE_CANARY_VAL) ? -1 : 0;
+}
+
+static unsigned long
+cdc_extract_crc (char *trailer)
+{
+ return cdc_get_long ((unsigned char *) &trailer[0]);
+}
+
+static unsigned long
+cdc_extract_size (char *trailer)
+{
+ return cdc_get_long ((unsigned char *) &trailer[4]);
+}
+
+static int32_t
+cdc_validate_inflate (cdc_info_t *ci, unsigned long crc,
+ unsigned long len)
+{
+ return !((crc == ci->crc)
+ /* inflated length is hidden inside
+ * Zlib stream struct */
+ && (len == ci->stream.total_out));
+}
+
+static int32_t
+do_cdc_decompress (xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci)
+{
+ int ret = -1;
+ int i = 0;
+ int len = 0;
+ char *inflte = NULL;
+ char *trailer = NULL;
+ struct iovec vec = {0,};
+ unsigned long computed_crc = 0;
+ unsigned long computed_len = 0;
+
+ ret = inflateInit2 (&ci->stream, priv->window_size);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Zlib: Unable to initialize inflate");
+ goto out;
+ }
+
+ vec = THIS_VEC(ci, 0);
+
+ trailer = (char *) (((char *) vec.iov_base) + vec.iov_len
+ - GF_CDC_VALIDATION_SIZE);
+
+ /* CRC of uncompressed data */
+ computed_crc = cdc_extract_crc (trailer);
+
+ /* size of uncomrpessed data */
+ computed_len = cdc_extract_size (trailer);
+
+ gf_log (this->name, GF_LOG_DEBUG, "crc=%lu len=%lu buffer_size=%d",
+ computed_crc, computed_len, ci->buffer_size);
+
+ inflte = vec.iov_base ;
+ len = vec.iov_len - GF_CDC_VALIDATION_SIZE;
+
+ /* allocate buffer of the original length of the data */
+ ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0);
+ if (ret)
+ goto out;
+
+ /* setup output buffer */
+ cdc_init_zlib_output_stream (priv, ci, 0);
+
+ /* setup input buffer */
+ ci->stream.next_in = (unsigned char *) inflte;
+ ci->stream.avail_in = len;
+
+ while (ci->stream.avail_in != 0) {
+ if (ci->stream.avail_out == 0) {
+ CURR_VEC(ci).iov_len = ci->buffer_size;
+
+ ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0);
+ if (ret)
+ break;
+
+ /* Re-position Zlib output buffer */
+ cdc_init_zlib_output_stream (priv, ci, 0);
+ }
+
+ ret = inflate (&ci->stream, Z_NO_FLUSH);
+ if (ret == Z_STREAM_ERROR)
+ break;
+ }
+
+ /* flush zlib buffer */
+ ret = cdc_flush_libz_buffer (priv, this, ci, inflate, Z_SYNC_FLUSH);
+ if (!(ret == Z_OK || ret == Z_STREAM_END)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Decompression Error: ret (%d)", ret);
+ ret = -1;
+ goto out;
+ }
+
+ /* compute CRC of the uncompresses data to check for
+ * correctness */
+
+ for (i = 0; i < ci->ncount; i++) {
+ ci->crc = crc32 (ci->crc,
+ (const Bytef *) ci->vec[i].iov_base,
+ ci->vec[i].iov_len);
+ }
+
+ /* validate inflated data */
+ ret = cdc_validate_inflate (ci, computed_crc, computed_len);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Checksum or length mismatched in inflated data");
+ }
+
+ out:
+ return ret;
+}
+
+int32_t
+cdc_decompress (xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci,
+ dict_t *xdata)
+{
+ int32_t ret = -1;
+
+ /* check for deflate content */
+ if (!cdc_check_content_for_deflate (xdata)) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Content not deflated, passing through ...");
+ goto passthrough_out;
+ }
+
+ ci->iobref = iobref_new ();
+ if (!ci->iobref)
+ goto passthrough_out;
+
+ /* do we need to do this? can we assume that one iovec
+ * will hold per request data everytime?
+ *
+ * server/client protocol seems to deal with a single
+ * iovec even if op_ret > 1M. So, it looks ok to
+ * assume that a single iovec will contain all the
+ * data (This saves us a lot from finding the trailer
+ * and the data since it could have been split-up onto
+ * two adjacent iovec's.
+ *
+ * But, in case this translator is loaded above quick-read
+ * for some reason, then it's entirely possible that we get
+ * multiple iovec's...
+ *
+ * This case (handled below) is not tested. (by loading the
+ * xlator below quick-read)
+ */
+
+ /* @@ I_HOPE_THIS_IS_NEVER_HIT */
+ if (ci->count > 1) {
+ gf_log (this->name, GF_LOG_WARNING, "unable to handle"
+ " multiple iovecs (%d in number)", ci->count);
+ goto inflate_cleanup_out;
+ /* TODO: coallate all iovecs in one */
+ }
+
+ ret = do_cdc_decompress (this, priv, ci);
+ if (ret)
+ goto inflate_cleanup_out;
+
+ ci->nbytes = ci->stream.total_out;
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Inflated %ld to %ld bytes",
+ ci->stream.total_in, ci->stream.total_out);
+
+ inflate_cleanup_out:
+ (void) inflateEnd (&ci->stream);
+
+ passthrough_out:
+ return ret;
+}
+
+#endif
diff --git a/xlators/features/compress/src/cdc-mem-types.h b/xlators/features/compress/src/cdc-mem-types.h
new file mode 100644
index 000000000..efa008059
--- /dev/null
+++ b/xlators/features/compress/src/cdc-mem-types.h
@@ -0,0 +1,22 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __CDC_MEM_TYPES_H
+#define __CDC_MEM_TYPES_H
+
+#include "mem-types.h"
+
+enum gf_cdc_mem_types {
+ gf_cdc_mt_priv_t = gf_common_mt_end + 1,
+ gf_cdc_mt_vec_t = gf_common_mt_end + 2,
+ gf_cdc_mt_gzip_trailer_t = gf_common_mt_end + 3,
+};
+
+#endif
diff --git a/xlators/features/compress/src/cdc.c b/xlators/features/compress/src/cdc.c
new file mode 100644
index 000000000..eb7d87c56
--- /dev/null
+++ b/xlators/features/compress/src/cdc.c
@@ -0,0 +1,342 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <sys/uio.h>
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "xlator.h"
+#include "defaults.h"
+#include "logging.h"
+
+#include "cdc.h"
+#include "cdc-mem-types.h"
+
+static void
+cdc_cleanup_iobref (cdc_info_t *ci)
+{
+ assert(ci->iobref != NULL);
+ iobref_clear (ci->iobref);
+}
+
+int32_t
+cdc_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iovec *vector, int32_t count,
+ struct iatt *stbuf, struct iobref *iobref,
+ dict_t *xdata)
+{
+ int ret = -1;
+ cdc_priv_t *priv = NULL;
+ cdc_info_t ci = {0,};
+
+ GF_VALIDATE_OR_GOTO ("cdc", this, default_out);
+ GF_VALIDATE_OR_GOTO (this->name, frame, default_out);
+
+ priv = this->private;
+
+ if (op_ret <= 0)
+ goto default_out;
+
+ if ( (priv->min_file_size != 0)
+ && (op_ret < priv->min_file_size) )
+ goto default_out;
+
+ ci.count = count;
+ ci.ibytes = op_ret;
+ ci.vector = vector;
+ ci.buf = NULL;
+ ci.iobref = NULL;
+ ci.ncount = 0;
+ ci.crc = 0;
+ ci.buffer_size = GF_CDC_DEF_BUFFERSIZE;
+
+/* A readv compresses on the server side and decompresses on the client side
+ */
+ if (priv->op_mode == GF_CDC_MODE_SERVER) {
+ ret = cdc_compress (this, priv, &ci, &xdata);
+ } else if (priv->op_mode == GF_CDC_MODE_CLIENT) {
+ ret = cdc_decompress (this, priv, &ci, xdata);
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Invalid operation mode (%d)", priv->op_mode);
+ }
+
+ if (ret)
+ goto default_out;
+
+ STACK_UNWIND_STRICT (readv, frame, ci.nbytes, op_errno,
+ ci.vec, ci.ncount, stbuf, iobref,
+ xdata);
+ cdc_cleanup_iobref (&ci);
+ return 0;
+
+ default_out:
+ STACK_UNWIND_STRICT (readv, frame, op_ret, op_errno,
+ vector, count, stbuf, iobref, xdata);
+ return 0;
+}
+
+int32_t
+cdc_readv (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, size_t size, off_t offset, uint32_t flags,
+ dict_t *xdata)
+{
+ fop_readv_cbk_t cbk = NULL;
+
+#ifdef HAVE_LIB_Z
+ cbk = cdc_readv_cbk;
+#else
+ cbk = default_readv_cbk;
+#endif
+ STACK_WIND (frame, cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readv,
+ fd, size, offset, flags, xdata);
+ return 0;
+}
+
+int32_t
+cdc_writev_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+
+ STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, prebuf, postbuf, xdata);
+ return 0;
+}
+
+int32_t
+cdc_writev (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ struct iovec *vector,
+ int32_t count,
+ off_t offset,
+ uint32_t flags,
+ struct iobref *iobref, dict_t *xdata)
+{
+ int ret = -1;
+ cdc_priv_t *priv = NULL;
+ cdc_info_t ci = {0,};
+ size_t isize = 0;
+
+ GF_VALIDATE_OR_GOTO ("cdc", this, default_out);
+ GF_VALIDATE_OR_GOTO (this->name, frame, default_out);
+
+ priv = this->private;
+
+ isize = iov_length(vector, count);
+
+ if (isize <= 0)
+ goto default_out;
+
+ if ( (priv->min_file_size != 0)
+ && (isize < priv->min_file_size) )
+ goto default_out;
+
+ ci.count = count;
+ ci.ibytes = isize;
+ ci.vector = vector;
+ ci.buf = NULL;
+ ci.iobref = NULL;
+ ci.ncount = 0;
+ ci.crc = 0;
+ ci.buffer_size = GF_CDC_DEF_BUFFERSIZE;
+
+/* A writev compresses on the client side and decompresses on the server side
+ */
+ if (priv->op_mode == GF_CDC_MODE_CLIENT) {
+ ret = cdc_compress (this, priv, &ci, &xdata);
+ } else if (priv->op_mode == GF_CDC_MODE_SERVER) {
+ ret = cdc_decompress (this, priv, &ci, xdata);
+ } else {
+ gf_log (this->name, GF_LOG_ERROR, "Invalid operation mode (%d) ", priv->op_mode);
+ }
+
+ if (ret)
+ goto default_out;
+
+ STACK_WIND (frame,
+ cdc_writev_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD (this)->fops->writev,
+ fd, ci.vec, ci.ncount, offset, flags,
+ iobref, xdata);
+
+ cdc_cleanup_iobref (&ci);
+ return 0;
+
+ default_out:
+ STACK_WIND (frame,
+ cdc_writev_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD (this)->fops->writev,
+ fd, vector, count, offset, flags,
+ iobref, xdata);
+ return 0;
+}
+
+int32_t
+init (xlator_t *this)
+{
+ int ret = -1;
+ char *temp_str = NULL;
+ cdc_priv_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO ("cdc", this, err);
+
+ if (!this->children || this->children->next) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Need subvolume == 1");
+ goto err;
+ }
+
+ if (!this->parents) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Dangling volume. Check volfile");
+ }
+
+ priv = GF_CALLOC (1, sizeof (*priv), gf_cdc_mt_priv_t);
+ if (!priv) {
+ goto err;
+ }
+
+ /* Check if debug mode is turned on */
+ GF_OPTION_INIT ("debug", priv->debug, bool, err);
+ if( priv->debug ) {
+ gf_log (this->name, GF_LOG_DEBUG, "CDC debug option turned on");
+ }
+
+ /* Set Gzip Window Size */
+ GF_OPTION_INIT ("window-size", priv->window_size, int32, err);
+ if ( (priv->window_size > GF_CDC_MAX_WINDOWSIZE)
+ || (priv->window_size < GF_CDC_DEF_WINDOWSIZE) ) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Invalid gzip window size (%d), using default",
+ priv->window_size);
+ priv->window_size = GF_CDC_DEF_WINDOWSIZE;
+ }
+
+ /* Set Gzip (De)Compression Level */
+ GF_OPTION_INIT ("compression-level", priv->cdc_level, int32, err);
+ if ( ((priv->cdc_level < 1) || (priv->cdc_level > 9))
+ && (priv->cdc_level != GF_CDC_DEF_COMPRESSION) ) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Invalid gzip (de)compression level (%d),"
+ " using default", priv->cdc_level);
+ priv->cdc_level = GF_CDC_DEF_COMPRESSION;
+ }
+
+ /* Set Gzip Memory Level */
+ GF_OPTION_INIT ("mem-level", priv->mem_level, int32, err);
+ if ( (priv->mem_level < 1) || (priv->mem_level > 9) ) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Invalid gzip memory level, using the default");
+ priv->mem_level = GF_CDC_DEF_MEMLEVEL;
+ }
+
+ /* Set min file size to enable compression */
+ GF_OPTION_INIT ("min-size", priv->min_file_size, int32, err);
+
+ /* Mode of operation - Server/Client */
+ ret = dict_get_str (this->options, "mode", &temp_str);
+ if (ret) {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "Operation mode not specified !!");
+ goto err;
+ }
+
+ if (GF_CDC_MODE_IS_CLIENT (temp_str)) {
+ priv->op_mode = GF_CDC_MODE_CLIENT;
+ } else if (GF_CDC_MODE_IS_SERVER (temp_str)) {
+ priv->op_mode = GF_CDC_MODE_SERVER;
+ } else {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "Bogus operation mode (%s) specified", temp_str);
+ goto err;
+ }
+
+ this->private = priv;
+ gf_log (this->name, GF_LOG_DEBUG, "CDC xlator loaded in (%s) mode",temp_str);
+ return 0;
+
+ err:
+ if (priv)
+ GF_FREE (priv);
+
+ return -1;
+}
+
+void
+fini (xlator_t *this)
+{
+ cdc_priv_t *priv = this->private;
+
+ if (priv)
+ GF_FREE (priv);
+ this->private = NULL;
+ return;
+}
+
+struct xlator_fops fops = {
+ .readv = cdc_readv,
+ .writev = cdc_writev,
+};
+
+struct xlator_cbks cbks = {
+};
+
+struct volume_options options[] = {
+ { .key = {"window-size"},
+ .default_value = "-15",
+ .type = GF_OPTION_TYPE_INT,
+ .description = "Size of the zlib history buffer."
+ },
+ { .key = {"mem-level"},
+ .default_value = "8",
+ .type = GF_OPTION_TYPE_INT,
+ .description = "Memory allocated for internal compression state.\
+ 1 uses minimum memory but is slow and reduces \
+ compression ratio; memLevel=9 uses maximum memory \
+ for optimal speed. The default value is 8."
+ },
+ { .key = {"compression-level"},
+ .default_value = "-1",
+ .type = GF_OPTION_TYPE_INT,
+ .description = "Compression levels \
+ 0 : no compression, 1 : best speed, \
+ 9 : best compression, -1 : default compression "
+ },
+ { .key = {"min-size"},
+ .default_value = "0",
+ .type = GF_OPTION_TYPE_INT,
+ .description = "Data is compressed only when its size exceeds this."
+ },
+ { .key = {"mode"},
+ .value = {"server", "client"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "Set on the basis of where the xlator is loaded."
+ },
+ { .key = {"debug"},
+ .default_value = "false",
+ .type = GF_OPTION_TYPE_BOOL,
+ .description = "This is used in testing. Will dump compressed data \
+ to disk as a gzip file."
+ },
+ { .key = {NULL}
+ },
+};
diff --git a/xlators/features/compress/src/cdc.h b/xlators/features/compress/src/cdc.h
new file mode 100644
index 000000000..71f4d2317
--- /dev/null
+++ b/xlators/features/compress/src/cdc.h
@@ -0,0 +1,107 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __CDC_H
+#define __CDC_H
+
+#ifdef HAVE_LIB_Z
+#include "zlib.h"
+#endif
+
+#include "xlator.h"
+
+#ifndef MAX_IOVEC
+#define MAX_IOVEC 16
+#endif
+
+typedef struct cdc_priv {
+ int window_size;
+ int mem_level;
+ int cdc_level;
+ int min_file_size;
+ int op_mode;
+ gf_boolean_t debug;
+ gf_lock_t lock;
+} cdc_priv_t;
+
+typedef struct cdc_info {
+ /* input bits */
+ int count;
+ int32_t ibytes;
+ struct iovec *vector;
+ struct iatt *buf;
+
+ /* output bits */
+ int ncount;
+ int nbytes;
+ int buffer_size;
+ struct iovec vec[MAX_IOVEC];
+ struct iobref *iobref;
+
+ /* zlib bits */
+#ifdef HAVE_LIB_Z
+ z_stream stream;
+#endif
+ unsigned long crc;
+} cdc_info_t;
+
+#define NVEC(ci) (ci->ncount - 1)
+#define CURR_VEC(ci) ci->vec[ci->ncount - 1]
+#define THIS_VEC(ci, i) ci->vector[i]
+
+/* Gzip defaults */
+#define GF_CDC_DEF_WINDOWSIZE -15 /* default value */
+#define GF_CDC_MAX_WINDOWSIZE -8 /* max value */
+
+#ifdef HAVE_LIB_Z
+#define GF_CDC_DEF_COMPRESSION Z_DEFAULT_COMPRESSION
+#else
+#define GF_CDC_DEF_COMPRESSION -1
+#endif
+
+#define GF_CDC_DEF_MEMLEVEL 8
+#define GF_CDC_DEF_BUFFERSIZE 262144 // 256K - default compression buffer size
+
+/* Operation mode
+ * If xlator is loaded on client, readv decompresses and writev compresses
+ * If xlator is loaded on server, readv compresses and writev decompresses
+ */
+#define GF_CDC_MODE_CLIENT 0
+#define GF_CDC_MODE_SERVER 1
+
+/* min size of data to do cmpression
+ * 0 == compress even 1byte
+ */
+#define GF_CDC_MIN_CHUNK_SIZE 0
+
+#define GF_CDC_VALIDATION_SIZE 8
+
+#define GF_CDC_OS_ID 0xFF
+#define GF_CDC_DEFLATE_CANARY_VAL "deflate"
+#define GF_CDC_DEBUG_DUMP_FILE "/tmp/cdcdump.gz"
+
+#define GF_CDC_MODE_IS_CLIENT(m) \
+ (strcmp (m, "client") == 0)
+
+#define GF_CDC_MODE_IS_SERVER(m) \
+ (strcmp (m, "server") == 0)
+
+int32_t
+cdc_compress (xlator_t *this,
+ cdc_priv_t *priv,
+ cdc_info_t *ci,
+ dict_t **xdata);
+int32_t
+cdc_decompress (xlator_t *this,
+ cdc_priv_t *priv,
+ cdc_info_t *ci,
+ dict_t *xdata);
+
+#endif
diff --git a/xlators/features/gfid-access/src/gfid-access.c b/xlators/features/gfid-access/src/gfid-access.c
index 755add522..da0ba7e50 100644
--- a/xlators/features/gfid-access/src/gfid-access.c
+++ b/xlators/features/gfid-access/src/gfid-access.c
@@ -68,8 +68,12 @@ ga_newfile_parse_args (xlator_t *this, data_t *data)
min_len = sizeof (args->uid) + sizeof (args->gid) + sizeof (args->gfid)
+ sizeof (args->st_mode) + 2 + 2;
- if (blob_len < min_len)
+ if (blob_len < min_len) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Invalid length: Total length is less "
+ "than minimum length.");
goto err;
+ }
args = mem_get0 (priv->newfile_args_pool);
if (args == NULL)
@@ -93,7 +97,12 @@ ga_newfile_parse_args (xlator_t *this, data_t *data)
len = strnlen (blob, blob_len);
if (len == blob_len)
+ if (len == blob_len) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "gfid: %s. No null byte present.",
+ args->gfid);
goto err;
+ }
args->bname = GF_CALLOC (1, (len + 1), gf_common_mt_char);
if (args->bname == NULL)
@@ -104,25 +113,39 @@ ga_newfile_parse_args (xlator_t *this, data_t *data)
blob_len -= (len + 1);
if (S_ISDIR (args->st_mode)) {
- if (blob_len < sizeof (uint32_t))
+ if (blob_len < sizeof (uint32_t)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "gfid: %s. Invalid length",
+ args->gfid);
goto err;
+ }
args->args.mkdir.mode = ntoh32 (*(uint32_t *)blob);
blob += sizeof (uint32_t);
blob_len -= sizeof (uint32_t);
- if (blob_len < sizeof (uint32_t))
+ if (blob_len < sizeof (uint32_t)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "gfid: %s. Invalid length",
+ args->gfid);
goto err;
+ }
args->args.mkdir.umask = ntoh32 (*(uint32_t *)blob);
blob += sizeof (uint32_t);
blob_len -= sizeof (uint32_t);
- if (blob_len < 0)
+ if (blob_len < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "gfid: %s. Invalid length",
+ args->gfid);
goto err;
-
+ }
} else if (S_ISLNK (args->st_mode)) {
len = strnlen (blob, blob_len);
- if (len == blob_len)
+ if (len == blob_len) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "gfid: %s. Invalid length",
+ args->gfid);
goto err;
-
+ }
args->args.symlink.linkpath = GF_CALLOC (1, len + 1,
gf_common_mt_char);
if (args->args.symlink.linkpath == NULL)
@@ -132,27 +155,43 @@ ga_newfile_parse_args (xlator_t *this, data_t *data)
blob += (len + 1);
blob_len -= (len + 1);
} else {
- if (blob_len < sizeof (uint32_t))
+ if (blob_len < sizeof (uint32_t)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "gfid: %s. Invalid length",
+ args->gfid);
goto err;
+ }
args->args.mknod.mode = ntoh32 (*(uint32_t *)blob);
blob += sizeof (uint32_t);
blob_len -= sizeof (uint32_t);
- if (blob_len < sizeof (uint32_t))
+ if (blob_len < sizeof (uint32_t)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "gfid: %s. Invalid length",
+ args->gfid);
goto err;
+ }
args->args.mknod.rdev = ntoh32 (*(uint32_t *)blob);
blob += sizeof (uint32_t);
blob_len -= sizeof (uint32_t);
- if (blob_len < sizeof (uint32_t))
+ if (blob_len < sizeof (uint32_t)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "gfid: %s. Invalid length",
+ args->gfid);
goto err;
+ }
args->args.mknod.umask = ntoh32 (*(uint32_t *)blob);
blob += sizeof (uint32_t);
blob_len -= sizeof (uint32_t);
}
- if (blob_len)
+ if (blob_len) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "gfid: %s. Invalid length",
+ args->gfid);
goto err;
+ }
return args;
@@ -469,13 +508,8 @@ ga_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
return 0;
}
- /* now, check if the setxattr() is on gfid-path */
- if (!((loc->parent &&
- __is_gfid_access_dir (loc->parent->gfid)) ||
- __is_gfid_access_dir (loc->pargfid))) {
- goto wind;
- }
-
+ //If the inode is a virtual inode change the inode otherwise perform
+ //the operation on same inode
GFID_ACCESS_GET_VALID_DIR_INODE (this, loc, unref, wind);
wind:
diff --git a/xlators/features/index/src/index.c b/xlators/features/index/src/index.c
index 3245076ec..9253120f3 100644
--- a/xlators/features/index/src/index.c
+++ b/xlators/features/index/src/index.c
@@ -15,8 +15,10 @@
#include "index.h"
#include "options.h"
#include "glusterfs3-xdr.h"
+#include "syncop.h"
#define XATTROP_SUBDIR "xattrop"
+#define BASE_INDICES_HOLDER_SUBDIR "base_indices_holder"
call_stub_t *
__index_dequeue (struct list_head *callstubs)
@@ -242,20 +244,40 @@ check_delete_stale_index_file (xlator_t *this, char *filename)
{
int ret = 0;
struct stat st = {0};
+ struct stat base_index_st = {0};
char filepath[PATH_MAX] = {0};
+ char filepath_under_base_indices_holder[PATH_MAX] = {0};
index_priv_t *priv = NULL;
priv = this->private;
+ if (priv->to_be_healed_states != synced_state)
+ return;
+
make_file_path (priv->index_basepath, XATTROP_SUBDIR,
filename, filepath, sizeof (filepath));
+
+ make_file_path (priv->index_basepath, BASE_INDICES_HOLDER_SUBDIR,
+ filename, filepath_under_base_indices_holder,
+ sizeof (filepath_under_base_indices_holder));
+
+
+ ret = stat (filepath_under_base_indices_holder, &base_index_st);
+ if (ret) {
+ gf_log (THIS->name, GF_LOG_ERROR, "Base index is not created"
+ "under index/base_indices_holder");
+ return;
+ }
+
ret = stat (filepath, &st);
- if (!ret && st.st_nlink == 1)
+ if (!ret && st.st_nlink == 2) {
unlink (filepath);
+ unlink (filepath_under_base_indices_holder);
+ }
}
static int
index_fill_readdir (fd_t *fd, DIR *dir, off_t off,
- size_t size, gf_dirent_t *entries)
+ size_t size, gf_dirent_t *entries, readdir_directory type)
{
off_t in_case = -1;
size_t filled = 0;
@@ -298,7 +320,8 @@ index_fill_readdir (fd_t *fd, DIR *dir, off_t off,
}
if (!strncmp (entry->d_name, XATTROP_SUBDIR"-",
- strlen (XATTROP_SUBDIR"-"))) {
+ strlen (XATTROP_SUBDIR"-")) &&
+ (type == INDEX_XATTROP)) {
check_delete_stale_index_file (this, entry->d_name);
continue;
}
@@ -337,16 +360,175 @@ out:
}
int
+sync_base_indices (void *index_priv)
+{
+ index_priv_t *priv = NULL;
+ DIR *dir_base_holder = NULL;
+ DIR *xattrop_dir = NULL;
+ struct dirent *entry = NULL;
+ char base_indices_holder[PATH_MAX] = {0};
+ char xattrop_directory[PATH_MAX] = {0};
+ char base_index_path[PATH_MAX] = {0};
+ char xattrop_index_path[PATH_MAX] = {0};
+ int ret = 0;
+
+ priv = index_priv;
+
+ snprintf (base_indices_holder, PATH_MAX, "%s/%s", priv->index_basepath,
+ BASE_INDICES_HOLDER_SUBDIR);
+ snprintf (xattrop_directory, PATH_MAX, "%s/%s", priv->index_basepath,
+ XATTROP_SUBDIR);
+
+ if ((dir_base_holder = opendir(base_indices_holder)) == NULL) {
+ ret = -1;
+ goto out;
+ }
+ if ((xattrop_dir = opendir (xattrop_directory)) == NULL) {
+ ret = -1;
+ goto out;
+ }
+
+ priv->to_be_healed_states = sync_started;
+ while ((entry = readdir(xattrop_dir)) != NULL) {
+ if (!strcmp (entry->d_name, ".") ||
+ !strcmp (entry->d_name, "..")) {
+ continue;
+ }
+ if (strncmp (entry->d_name, XATTROP_SUBDIR"-",
+ strlen (XATTROP_SUBDIR"-"))) {
+ continue;
+ }
+ if (!strncmp (entry->d_name, XATTROP_SUBDIR"-",
+ strlen (XATTROP_SUBDIR"-"))) {
+
+ snprintf (xattrop_index_path, PATH_MAX, "%s/%s",
+ xattrop_directory, entry->d_name);
+
+ snprintf (base_index_path, PATH_MAX, "%s/%s",
+ base_indices_holder, entry->d_name);
+
+ ret = link (xattrop_index_path, base_index_path);
+ if (ret && errno != EEXIST)
+ goto out;
+
+ }
+ }
+ ret = closedir (xattrop_dir);
+ if (ret)
+ goto out;
+ ret = closedir (dir_base_holder);
+ if (ret)
+ goto out;
+
+ ret = 0;
+out:
+ return ret;
+
+}
+
+int
+base_indices_syncing_done (int ret, call_frame_t *frame, void *data)
+{
+ index_priv_t *priv = NULL;
+ priv = data;
+
+ if (!priv)
+ goto out;
+
+ if (ret) {
+ priv->to_be_healed_states = sync_not_started;
+ } else {
+ priv->to_be_healed_states = synced_state;
+ }
+
+ STACK_DESTROY (frame->root);
+
+out:
+ return 0;
+}
+
+int
+sync_base_indices_from_xattrop (xlator_t *this)
+{
+
+ index_priv_t *priv = NULL;
+ char base_indices_holder[PATH_MAX] = {0};
+ int ret = 0;
+ struct stat st = {0};
+ DIR *dir = NULL;
+ struct dirent *entry = NULL;
+ call_frame_t *frame = NULL;
+
+ priv = this->private;
+
+ if (priv->to_be_healed_states != sync_not_started) {
+ ret = -1;
+ goto out;
+ }
+
+ snprintf (base_indices_holder, PATH_MAX, "%s/%s", priv->index_basepath,
+ BASE_INDICES_HOLDER_SUBDIR);
+
+ ret = stat (base_indices_holder, &st);
+
+ if (ret && (errno != ENOENT)) {
+ goto out;
+ } else if (errno == ENOENT) {
+ ret = index_dir_create (this, BASE_INDICES_HOLDER_SUBDIR);
+ if (ret)
+ goto out;
+ } else {
+ if ((dir = opendir (base_indices_holder)) == NULL) {
+ ret = -1;
+ goto out;
+ }
+ while ((entry = readdir (dir)) != NULL) {
+ if (!strcmp (entry->d_name, ".") ||
+ !strcmp (entry->d_name,"..")) {
+ continue;
+ }
+ ret = unlink (entry->d_name);
+ if (ret)
+ goto out;
+ }
+ closedir (dir);
+ }
+
+ /*At this point of time we have index/base_indicies_holder directory
+ *is with no entries*/
+
+ frame = create_frame (this, this->ctx->pool);
+ if (!frame) {
+ ret = -1;
+ goto out;
+ }
+ set_lk_owner_from_ptr (&frame->root->lk_owner, frame->root);
+
+ frame->root->pid = LOW_PRIO_PROC_PID;
+
+ ret = synctask_new (this->ctx->env, sync_base_indices,
+ base_indices_syncing_done,frame, priv);
+
+
+
+out:
+ return ret;
+
+}
+
+int
index_add (xlator_t *this, uuid_t gfid, const char *subdir)
{
int32_t op_errno = 0;
char gfid_path[PATH_MAX] = {0};
char index_path[PATH_MAX] = {0};
+ char base_path[PATH_MAX] = {0};
int ret = 0;
uuid_t index = {0};
index_priv_t *priv = NULL;
struct stat st = {0};
int fd = 0;
+ int index_created = 0;
priv = this->private;
GF_ASSERT_AND_GOTO_WITH_ERROR (this->name, !uuid_is_null (gfid),
@@ -364,9 +546,11 @@ index_add (xlator_t *this, uuid_t gfid, const char *subdir)
ret = link (index_path, gfid_path);
if (!ret || (errno == EEXIST)) {
ret = 0;
+ index_created = 1;
goto out;
}
+
op_errno = errno;
if (op_errno == ENOENT) {
ret = index_dir_create (this, subdir);
@@ -398,10 +582,36 @@ index_add (xlator_t *this, uuid_t gfid, const char *subdir)
"add to index (%s)", uuid_utoa (gfid),
strerror (errno));
goto out;
+ } else {
+ index_created = 1;
+ }
+
+ if (priv->to_be_healed_states != sync_not_started) {
+ make_index_path (priv->index_basepath,
+ GF_BASE_INDICES_HOLDER_GFID,
+ index, base_path, sizeof (base_path));
+ ret = link (index_path, base_path);
+ if (ret)
+ goto out;
}
ret = 0;
out:
+ /*If base_indices_holder is not created: create and sync
+ *If directory is present: delete contents and start syncing
+ *If syncing is in progress :No need to do any thing
+ *If syncing is done: No need to do anything*/
+ if (!ret) {
+ switch (priv->to_be_healed_states) {
+ case sync_not_started:
+ ret = sync_base_indices_from_xattrop (this);
+ break;
+ case sync_started:
+ case synced_state:
+ /*No need to do anything*/
+ break;
+ }
+ }
return ret;
}
@@ -737,8 +947,18 @@ index_getxattr_wrapper (call_frame_t *frame, xlator_t *this,
goto done;
}
- ret = dict_set_static_bin (xattr, (char*)name, priv->xattrop_vgfid,
- sizeof (priv->xattrop_vgfid));
+ if (!strcmp (name, GF_XATTROP_INDEX_GFID)) {
+
+ ret = dict_set_static_bin (xattr, (char*)name,
+ priv->xattrop_vgfid,
+ sizeof (priv->xattrop_vgfid));
+
+ } else if (!strcmp (name, GF_BASE_INDICES_HOLDER_GFID)) {
+
+ ret = dict_set_static_bin (xattr, (char*)name,
+ priv->base_indices_holder_vgfid,
+ sizeof (priv->base_indices_holder_vgfid));
+ }
if (ret) {
ret = -ENOMEM;
gf_log (THIS->name, GF_LOG_ERROR, "xattrop index "
@@ -782,6 +1002,15 @@ index_lookup_wrapper (call_frame_t *frame, xlator_t *this,
} else if (!uuid_compare (loc->pargfid, priv->xattrop_vgfid)) {
make_file_path (priv->index_basepath, XATTROP_SUBDIR,
loc->name, path, sizeof (path));
+ } else if (!uuid_compare (loc->gfid,priv->base_indices_holder_vgfid)){
+ make_index_dir_path (priv->index_basepath,
+ BASE_INDICES_HOLDER_SUBDIR, path,
+ sizeof (path));
+ is_dir = _gf_true;
+ } else if (!uuid_compare (loc->pargfid, priv->base_indices_holder_vgfid)) {
+ make_file_path (priv->index_basepath,
+ BASE_INDICES_HOLDER_SUBDIR,loc->name, path,
+ sizeof (path));
}
ret = lstat (path, &lstatbuf);
@@ -803,10 +1032,14 @@ index_lookup_wrapper (call_frame_t *frame, xlator_t *this,
}
iatt_from_stat (&stbuf, &lstatbuf);
- if (is_dir)
+ if (is_dir && !uuid_compare (loc->gfid, priv->xattrop_vgfid)) {
uuid_copy (stbuf.ia_gfid, priv->xattrop_vgfid);
- else
+ } else if (is_dir &&
+ !uuid_compare (loc->gfid, priv->base_indices_holder_vgfid)) {
+ uuid_copy (stbuf.ia_gfid, priv->base_indices_holder_vgfid);
+ } else {
uuid_generate (stbuf.ia_gfid);
+ }
stbuf.ia_ino = -1;
op_ret = 0;
done:
@@ -818,6 +1051,44 @@ done:
}
int32_t
+base_indices_readdir_wrapper (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, size_t size, off_t off, dict_t *xdata)
+{
+ index_priv_t *priv = NULL;
+ char base_indices_holder[PATH_MAX] = {0};
+ DIR *dir = NULL;
+ int32_t op_ret = -1;
+ int32_t op_errno = 0;
+ int count = 0;
+ gf_dirent_t entries;
+
+ priv = this->private;
+
+ make_index_dir_path (priv->index_basepath, BASE_INDICES_HOLDER_SUBDIR,
+ base_indices_holder, sizeof (base_indices_holder));
+
+ dir = opendir (base_indices_holder);
+ if (!dir) {
+ op_errno = EINVAL;
+ goto done;
+ }
+
+
+ INIT_LIST_HEAD (&entries.list);
+
+ count = index_fill_readdir (fd, dir, off, size, &entries,
+ BASE_INDICES_HOLDER);
+ /* pick ENOENT to indicate EOF */
+ op_errno = errno;
+ op_ret = count;
+ closedir (dir);
+done:
+ STACK_UNWIND_STRICT (readdir, frame, op_ret, op_errno, &entries, xdata);
+ gf_dirent_free (&entries);
+ return 0;
+}
+
+int32_t
index_readdir_wrapper (call_frame_t *frame, xlator_t *this,
fd_t *fd, size_t size, off_t off, dict_t *xdata)
{
@@ -848,7 +1119,8 @@ index_readdir_wrapper (call_frame_t *frame, xlator_t *this,
goto done;
}
- count = index_fill_readdir (fd, dir, off, size, &entries);
+ count = index_fill_readdir (fd, dir, off, size, &entries,
+ INDEX_XATTROP);
/* pick ENOENT to indicate EOF */
op_errno = errno;
@@ -915,7 +1187,10 @@ index_getxattr (call_frame_t *frame, xlator_t *this,
{
call_stub_t *stub = NULL;
- if (!name || strcmp (GF_XATTROP_INDEX_GFID, name))
+ if (!name)
+ goto out;
+ if (strcmp (GF_XATTROP_INDEX_GFID, name) &&
+ strcmp (GF_BASE_INDICES_HOLDER_GFID, name))
goto out;
stub = fop_getxattr_stub (frame, index_getxattr_wrapper, loc, name,
@@ -942,7 +1217,9 @@ index_lookup (call_frame_t *frame, xlator_t *this,
priv = this->private;
if (uuid_compare (loc->gfid, priv->xattrop_vgfid) &&
- uuid_compare (loc->pargfid, priv->xattrop_vgfid))
+ uuid_compare (loc->pargfid, priv->xattrop_vgfid) &&
+ uuid_compare (loc->gfid, priv->base_indices_holder_vgfid) &&
+ uuid_compare (loc->pargfid, priv->base_indices_holder_vgfid))
goto normal;
stub = fop_lookup_stub (frame, index_lookup_wrapper, loc, xattr_req);
@@ -968,10 +1245,19 @@ index_readdir (call_frame_t *frame, xlator_t *this,
index_priv_t *priv = NULL;
priv = this->private;
- if (uuid_compare (fd->inode->gfid, priv->xattrop_vgfid))
+ if (uuid_compare (fd->inode->gfid, priv->xattrop_vgfid) &&
+ uuid_compare (fd->inode->gfid, priv->base_indices_holder_vgfid))
goto out;
- stub = fop_readdir_stub (frame, index_readdir_wrapper, fd, size, off,
- xdata);
+
+ if (!uuid_compare (fd->inode->gfid, priv->xattrop_vgfid)) {
+ stub = fop_readdir_stub (frame, index_readdir_wrapper, fd, size,
+ off, xdata);
+ } else if (!uuid_compare (fd->inode->gfid,
+ priv->base_indices_holder_vgfid)) {
+ stub = fop_readdir_stub (frame, base_indices_readdir_wrapper,
+ fd, size, off, xdata);
+ }
+
if (!stub) {
STACK_UNWIND_STRICT (readdir, frame, -1, ENOMEM, NULL, NULL);
return 0;
@@ -1075,10 +1361,13 @@ init (xlator_t *this)
GF_OPTION_INIT ("index-base", priv->index_basepath, path, out);
uuid_generate (priv->index);
uuid_generate (priv->xattrop_vgfid);
+ /*base_indices_holder is a directory which contains hard links to
+ * all base indices inside indices/xattrop directory*/
+ uuid_generate (priv->base_indices_holder_vgfid);
INIT_LIST_HEAD (&priv->callstubs);
this->private = priv;
- ret = pthread_create (&thread, &w_attr, index_worker, this);
+ ret = gf_thread_create (&thread, &w_attr, index_worker, this);
if (ret) {
gf_log (this->name, GF_LOG_WARNING, "Failed to create "
"worker thread, aborting");
diff --git a/xlators/features/index/src/index.h b/xlators/features/index/src/index.h
index 661dcdbc4..d6dcb1c23 100644
--- a/xlators/features/index/src/index.h
+++ b/xlators/features/index/src/index.h
@@ -36,14 +36,28 @@ typedef struct index_fd_ctx {
DIR *dir;
} index_fd_ctx_t;
+typedef enum {
+ sync_not_started,
+ sync_started,
+ synced_state,
+} to_be_healed_states_t;
+
+typedef enum {
+ INDEX_XATTROP,
+ BASE_INDICES_HOLDER,
+} readdir_directory;
+
typedef struct index_priv {
char *index_basepath;
uuid_t index;
gf_lock_t lock;
uuid_t xattrop_vgfid;//virtual gfid of the xattrop index dir
+ uuid_t base_indices_holder_vgfid; //virtual gfid of the
+ //to_be_healed_xattrop directory
struct list_head callstubs;
pthread_mutex_t mutex;
pthread_cond_t cond;
+ to_be_healed_states_t to_be_healed_states;
} index_priv_t;
#define INDEX_STACK_UNWIND(fop, frame, params ...) \
diff --git a/xlators/features/locks/src/Makefile.am b/xlators/features/locks/src/Makefile.am
index 8908c1f52..0f79731b4 100644
--- a/xlators/features/locks/src/Makefile.am
+++ b/xlators/features/locks/src/Makefile.am
@@ -11,6 +11,7 @@ noinst_HEADERS = locks.h common.h locks-mem-types.h clear.h
AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src
+
AM_CFLAGS = -Wall -fno-strict-aliasing $(GF_CFLAGS)
CLEANFILES =
diff --git a/xlators/features/locks/src/clear.c b/xlators/features/locks/src/clear.c
index 5790a99ce..124b9ad0f 100644
--- a/xlators/features/locks/src/clear.c
+++ b/xlators/features/locks/src/clear.c
@@ -339,6 +339,7 @@ blkd:
-1, EAGAIN);
STACK_UNWIND_STRICT (entrylk, elock->frame, -1, EAGAIN, NULL);
GF_FREE ((char *) elock->basename);
+ GF_FREE (elock->connection_id);
GF_FREE (elock);
}
@@ -379,8 +380,8 @@ out:
int
clrlk_clear_lks_in_all_domains (xlator_t *this, pl_inode_t *pl_inode,
- clrlk_args *args, int *blkd, int *granted,
- int *op_errno)
+ clrlk_args *args, int *blkd, int *granted,
+ int *op_errno)
{
pl_dom_list_t *dom = NULL;
int ret = -1;
diff --git a/xlators/features/locks/src/common.c b/xlators/features/locks/src/common.c
index bc4ea53f9..b3309580d 100644
--- a/xlators/features/locks/src/common.c
+++ b/xlators/features/locks/src/common.c
@@ -35,6 +35,7 @@ __insert_and_merge (pl_inode_t *pl_inode, posix_lock_t *lock);
static int
pl_send_prelock_unlock (xlator_t *this, pl_inode_t *pl_inode,
posix_lock_t *old_lock);
+
static pl_dom_list_t *
__allocate_domain (const char *volume)
{
@@ -75,8 +76,8 @@ get_domain (pl_inode_t *pl_inode, const char *volume)
{
pl_dom_list_t *dom = NULL;
- GF_VALIDATE_OR_GOTO (POSIX_LOCKS, pl_inode, out);
- GF_VALIDATE_OR_GOTO (POSIX_LOCKS, volume, out);
+ GF_VALIDATE_OR_GOTO ("posix-locks", pl_inode, out);
+ GF_VALIDATE_OR_GOTO ("posix-locks", volume, out);
pthread_mutex_lock (&pl_inode->mutex);
{
@@ -92,9 +93,9 @@ get_domain (pl_inode_t *pl_inode, const char *volume)
unlock:
pthread_mutex_unlock (&pl_inode->mutex);
if (dom) {
- gf_log (POSIX_LOCKS, GF_LOG_TRACE, "Domain %s found", volume);
+ gf_log ("posix-locks", GF_LOG_TRACE, "Domain %s found", volume);
} else {
- gf_log (POSIX_LOCKS, GF_LOG_TRACE, "Domain %s not found", volume);
+ gf_log ("posix-locks", GF_LOG_TRACE, "Domain %s not found", volume);
}
out:
return dom;
@@ -138,7 +139,7 @@ pl_print_locker (char *str, int size, xlator_t *this, call_frame_t *frame)
snprintf (str, size, "Pid=%llu, lk-owner=%s, Client=%p, Frame=%llu",
(unsigned long long) frame->root->pid,
lkowner_utoa (&frame->root->lk_owner),
- frame->root->trans,
+ frame->root->client,
(unsigned long long) frame->root->unique);
}
@@ -462,14 +463,14 @@ unlock:
/* Create a new posix_lock_t */
posix_lock_t *
-new_posix_lock (struct gf_flock *flock, void *transport, pid_t client_pid,
+new_posix_lock (struct gf_flock *flock, client_t *client, pid_t client_pid,
gf_lkowner_t *owner, fd_t *fd)
{
posix_lock_t *lock = NULL;
- GF_VALIDATE_OR_GOTO (POSIX_LOCKS, flock, out);
- GF_VALIDATE_OR_GOTO (POSIX_LOCKS, transport, out);
- GF_VALIDATE_OR_GOTO (POSIX_LOCKS, fd, out);
+ GF_VALIDATE_OR_GOTO ("posix-locks", flock, out);
+ GF_VALIDATE_OR_GOTO ("posix-locks", client, out);
+ GF_VALIDATE_OR_GOTO ("posix-locks", fd, out);
lock = GF_CALLOC (1, sizeof (posix_lock_t),
gf_locks_mt_posix_lock_t);
@@ -485,7 +486,7 @@ new_posix_lock (struct gf_flock *flock, void *transport, pid_t client_pid,
else
lock->fl_end = flock->l_start + flock->l_len - 1;
- lock->transport = transport;
+ lock->client = client;
lock->fd_num = fd_to_fdnum (fd);
lock->fd = fd;
lock->client_pid = client_pid;
@@ -565,7 +566,7 @@ same_owner (posix_lock_t *l1, posix_lock_t *l2)
{
return (is_same_lkowner (&l1->owner, &l2->owner) &&
- (l1->transport == l2->transport));
+ (l1->client == l2->client));
}
@@ -694,7 +695,7 @@ subtract_locks (posix_lock_t *big, posix_lock_t *small)
}
GF_ASSERT (0);
- gf_log (POSIX_LOCKS, GF_LOG_ERROR, "Unexpected case in subtract_locks");
+ gf_log ("posix-locks", GF_LOG_ERROR, "Unexpected case in subtract_locks");
out:
if (v.locks[0]) {
@@ -812,7 +813,7 @@ __insert_and_merge (pl_inode_t *pl_inode, posix_lock_t *lock)
sum = add_locks (lock, conf);
sum->fl_type = lock->fl_type;
- sum->transport = lock->transport;
+ sum->client = lock->client;
sum->fd_num = lock->fd_num;
sum->client_pid = lock->client_pid;
sum->owner = lock->owner;
@@ -830,7 +831,7 @@ __insert_and_merge (pl_inode_t *pl_inode, posix_lock_t *lock)
sum = add_locks (lock, conf);
sum->fl_type = conf->fl_type;
- sum->transport = conf->transport;
+ sum->client = conf->client;
sum->fd_num = conf->fd_num;
sum->client_pid = conf->client_pid;
sum->owner = conf->owner;
@@ -988,7 +989,7 @@ pl_send_prelock_unlock (xlator_t *this, pl_inode_t *pl_inode,
flock.l_len = old_lock->user_flock.l_len;
- unlock_lock = new_posix_lock (&flock, old_lock->transport,
+ unlock_lock = new_posix_lock (&flock, old_lock->client,
old_lock->client_pid, &old_lock->owner,
old_lock->fd);
GF_VALIDATE_OR_GOTO (this->name, unlock_lock, out);
@@ -1097,3 +1098,124 @@ pl_getlk (pl_inode_t *pl_inode, posix_lock_t *lock)
return conf;
}
+
+
+struct _lock_table *
+pl_lock_table_new (void)
+{
+ struct _lock_table *new = NULL;
+
+ new = GF_CALLOC (1, sizeof (struct _lock_table), gf_common_mt_lock_table);
+ if (new == NULL) {
+ goto out;
+ }
+ INIT_LIST_HEAD (&new->entrylk_lockers);
+ INIT_LIST_HEAD (&new->inodelk_lockers);
+ LOCK_INIT (&new->lock);
+out:
+ return new;
+}
+
+
+int
+pl_add_locker (struct _lock_table *table, const char *volume,
+ loc_t *loc, fd_t *fd, pid_t pid, gf_lkowner_t *owner,
+ glusterfs_fop_t type)
+{
+ int32_t ret = -1;
+ struct _locker *new = NULL;
+
+ GF_VALIDATE_OR_GOTO ("lock-table", table, out);
+ GF_VALIDATE_OR_GOTO ("lock-table", volume, out);
+
+ new = GF_CALLOC (1, sizeof (struct _locker), gf_common_mt_locker);
+ if (new == NULL) {
+ goto out;
+ }
+ INIT_LIST_HEAD (&new->lockers);
+
+ new->volume = gf_strdup (volume);
+
+ if (fd == NULL) {
+ loc_copy (&new->loc, loc);
+ } else {
+ new->fd = fd_ref (fd);
+ }
+
+ new->pid = pid;
+ new->owner = *owner;
+
+ LOCK (&table->lock);
+ {
+ if (type == GF_FOP_ENTRYLK)
+ list_add_tail (&new->lockers, &table->entrylk_lockers);
+ else
+ list_add_tail (&new->lockers, &table->inodelk_lockers);
+ }
+ UNLOCK (&table->lock);
+out:
+ return ret;
+}
+
+int
+pl_del_locker (struct _lock_table *table, const char *volume,
+ loc_t *loc, fd_t *fd, gf_lkowner_t *owner, glusterfs_fop_t type)
+{
+ struct _locker *locker = NULL;
+ struct _locker *tmp = NULL;
+ int32_t ret = -1;
+ struct list_head *head = NULL;
+ struct list_head del;
+
+ GF_VALIDATE_OR_GOTO ("lock-table", table, out);
+ GF_VALIDATE_OR_GOTO ("lock-table", volume, out);
+
+ INIT_LIST_HEAD (&del);
+
+ LOCK (&table->lock);
+ {
+ if (type == GF_FOP_ENTRYLK) {
+ head = &table->entrylk_lockers;
+ } else {
+ head = &table->inodelk_lockers;
+ }
+
+ list_for_each_entry_safe (locker, tmp, head, lockers) {
+ if (!is_same_lkowner (&locker->owner, owner) ||
+ strcmp (locker->volume, volume))
+ continue;
+
+ /*
+ * It is possible for inodelk lock to come on anon-fd
+ * and inodelk unlock to come on normal fd in case of
+ * client re-opens. So don't check for fds to be equal.
+ */
+ if (locker->fd && fd)
+ list_move_tail (&locker->lockers, &del);
+ else if (locker->loc.inode && loc &&
+ (locker->loc.inode == loc->inode))
+ list_move_tail (&locker->lockers, &del);
+ }
+ }
+ UNLOCK (&table->lock);
+
+ tmp = NULL;
+ locker = NULL;
+
+ list_for_each_entry_safe (locker, tmp, &del, lockers) {
+ list_del_init (&locker->lockers);
+ if (locker->fd)
+ fd_unref (locker->fd);
+ else
+ loc_wipe (&locker->loc);
+
+ GF_FREE (locker->volume);
+ GF_FREE (locker);
+ }
+
+ ret = 0;
+out:
+ return ret;
+
+}
+
diff --git a/xlators/features/locks/src/common.h b/xlators/features/locks/src/common.h
index 2e1cd7ac1..db19ec978 100644
--- a/xlators/features/locks/src/common.h
+++ b/xlators/features/locks/src/common.h
@@ -14,12 +14,14 @@
/*dump locks format strings */
#define RANGE_FMT "type=%s, whence=%hd, start=%llu, len=%llu"
#define ENTRY_FMT "type=%s on basename=%s"
-#define DUMP_GEN_FMT "pid = %llu, owner=%s, transport=%p, "
+#define DUMP_GEN_FMT "pid = %llu, owner=%s, client=%p"
#define GRNTD_AT "granted at %s"
#define BLKD_AT "blocked at %s"
-#define DUMP_BLKD_FMT DUMP_GEN_FMT", "BLKD_AT
-#define DUMP_GRNTD_FMT DUMP_GEN_FMT", "GRNTD_AT
-#define DUMP_BLKD_GRNTD_FMT DUMP_GEN_FMT", "BLKD_AT", "GRNTD_AT
+#define CONN_ID "connection-id=%s"
+#define DUMP_BLKD_FMT DUMP_GEN_FMT", "CONN_ID", "BLKD_AT
+#define DUMP_GRNTD_FMT DUMP_GEN_FMT", "CONN_ID", "GRNTD_AT
+#define DUMP_BLKD_GRNTD_FMT DUMP_GEN_FMT", "CONN_ID", "BLKD_AT", "GRNTD_AT
+
#define ENTRY_BLKD_FMT ENTRY_FMT", "DUMP_BLKD_FMT
#define ENTRY_GRNTD_FMT ENTRY_FMT", "DUMP_GRNTD_FMT
#define ENTRY_BLKD_GRNTD_FMT ENTRY_FMT", "DUMP_BLKD_GRNTD_FMT
@@ -29,8 +31,24 @@
#define RANGE_BLKD_GRNTD_FMT RANGE_FMT", "DUMP_BLKD_GRNTD_FMT
#define SET_FLOCK_PID(flock, lock) ((flock)->l_pid = lock->client_pid)
+
+struct _locker {
+ struct list_head lockers;
+ char *volume;
+ loc_t loc;
+ fd_t *fd;
+ gf_lkowner_t owner;
+ pid_t pid;
+};
+
+struct _lock_table {
+ struct list_head inodelk_lockers;
+ struct list_head entrylk_lockers;
+ gf_lock_t lock;
+};
+
posix_lock_t *
-new_posix_lock (struct gf_flock *flock, void *transport, pid_t client_pid,
+new_posix_lock (struct gf_flock *flock, client_t *client, pid_t client_pid,
gf_lkowner_t *owner, fd_t *fd);
pl_inode_t *
@@ -63,7 +81,8 @@ pl_dom_list_t *
get_domain (pl_inode_t *pl_inode, const char *volume);
void
-grant_blocked_inode_locks (xlator_t *this, pl_inode_t *pl_inode, pl_dom_list_t *dom);
+grant_blocked_inode_locks (xlator_t *this, pl_inode_t *pl_inode,
+ pl_dom_list_t *dom);
void
__delete_inode_lock (pl_inode_lock_t *lock);
@@ -143,6 +162,26 @@ pl_verify_reservelk (xlator_t *this, pl_inode_t *pl_inode,
posix_lock_t *lock, int can_block);
int
pl_reserve_unlock (xlator_t *this, pl_inode_t *pl_inode, posix_lock_t *reqlock);
+
uint32_t
check_entrylk_on_basename (xlator_t *this, inode_t *parent, char *basename);
+
+int32_t
+pl_add_locker (struct _lock_table *table, const char *volume,
+ loc_t *loc,
+ fd_t *fd,
+ pid_t pid,
+ gf_lkowner_t *owner,
+ glusterfs_fop_t type);
+
+int32_t
+pl_del_locker (struct _lock_table *table, const char *volume,
+ loc_t *loc,
+ fd_t *fd,
+ gf_lkowner_t *owner,
+ glusterfs_fop_t type);
+
+struct _lock_table *
+pl_lock_table_new (void);
+
#endif /* __COMMON_H__ */
diff --git a/xlators/features/locks/src/entrylk.c b/xlators/features/locks/src/entrylk.c
index d934a8b94..0785dc547 100644
--- a/xlators/features/locks/src/entrylk.c
+++ b/xlators/features/locks/src/entrylk.c
@@ -25,7 +25,7 @@
static pl_entry_lock_t *
new_entrylk_lock (pl_inode_t *pinode, const char *basename, entrylk_type type,
- void *trans, pid_t client_pid, gf_lkowner_t *owner,
+ client_t *client, pid_t client_pid, gf_lkowner_t *owner,
const char *volume)
{
@@ -39,7 +39,7 @@ new_entrylk_lock (pl_inode_t *pinode, const char *basename, entrylk_type type,
newlock->basename = basename ? gf_strdup (basename) : NULL;
newlock->type = type;
- newlock->trans = trans;
+ newlock->trans = client;
newlock->volume = volume;
newlock->client_pid = client_pid;
newlock->owner = *owner;
@@ -305,18 +305,15 @@ __find_most_matching_lock (pl_dom_list_t *dom, const char *basename)
int
__lock_name (pl_inode_t *pinode, const char *basename, entrylk_type type,
- call_frame_t *frame, pl_dom_list_t *dom, xlator_t *this, int nonblock)
+ call_frame_t *frame, pl_dom_list_t *dom, xlator_t *this,
+ int nonblock, char *conn_id)
{
pl_entry_lock_t *lock = NULL;
pl_entry_lock_t *conf = NULL;
- void *trans = NULL;
- pid_t client_pid = 0;
int ret = -EINVAL;
- trans = frame->root->trans;
- client_pid = frame->root->pid;
-
- lock = new_entrylk_lock (pinode, basename, type, trans, client_pid,
+ lock = new_entrylk_lock (pinode, basename, type,
+ frame->root->client, frame->root->pid,
&frame->root->lk_owner, dom->domain);
if (!lock) {
ret = -ENOMEM;
@@ -325,12 +322,17 @@ __lock_name (pl_inode_t *pinode, const char *basename, entrylk_type type,
lock->frame = frame;
lock->this = this;
- lock->trans = trans;
+ lock->trans = frame->root->client;
+
+ if (conn_id) {
+ lock->connection_id = gf_strdup (conn_id);
+ }
conf = __lock_grantable (dom, basename, type);
if (conf) {
ret = -EAGAIN;
if (nonblock){
+ GF_FREE (lock->connection_id);
GF_FREE ((char *)lock->basename);
GF_FREE (lock);
goto out;
@@ -350,6 +352,7 @@ __lock_name (pl_inode_t *pinode, const char *basename, entrylk_type type,
if ( __blocked_lock_conflict (dom, basename, type) && !(__owner_has_lock (dom, lock))) {
ret = -EAGAIN;
if (nonblock) {
+ GF_FREE (lock->connection_id);
GF_FREE ((char *) lock->basename);
GF_FREE (lock);
goto out;
@@ -480,13 +483,15 @@ __grant_blocked_entry_locks (xlator_t *this, pl_inode_t *pl_inode,
pl_inode, bl->basename);
bl_ret = __lock_name (pl_inode, bl->basename, bl->type,
- bl->frame, dom, bl->this, 0);
+ bl->frame, dom, bl->this, 0,
+ bl->connection_id);
if (bl_ret == 0) {
list_add (&bl->blocked_locks, granted);
} else {
gf_log (this->name, GF_LOG_DEBUG,
"should never happen");
+ GF_FREE (bl->connection_id);
GF_FREE ((char *)bl->basename);
GF_FREE (bl);
}
@@ -507,7 +512,8 @@ grant_blocked_entry_locks (xlator_t *this, pl_inode_t *pl_inode,
pthread_mutex_lock (&pl_inode->mutex);
{
- __grant_blocked_entry_locks (this, pl_inode, dom, &granted_list);
+ __grant_blocked_entry_locks (this, pl_inode, dom,
+ &granted_list);
}
pthread_mutex_unlock (&pl_inode->mutex);
@@ -520,24 +526,26 @@ grant_blocked_entry_locks (xlator_t *this, pl_inode_t *pl_inode,
STACK_UNWIND_STRICT (entrylk, lock->frame, 0, 0, NULL);
- GF_FREE ((char *)lock->basename);
- GF_FREE (lock);
+ GF_FREE (lock->connection_id);
+ GF_FREE ((char *)lock->basename);
+ GF_FREE (lock);
}
GF_FREE ((char *)unlocked->basename);
+ GF_FREE (unlocked->connection_id);
GF_FREE (unlocked);
return;
}
/**
- * release_entry_locks_for_transport: release all entry locks from this
- * transport for this loc_t
+ * release_entry_locks_for_client: release all entry locks from this
+ * client for this loc_t
*/
static int
-release_entry_locks_for_transport (xlator_t *this, pl_inode_t *pinode,
- pl_dom_list_t *dom, void *trans)
+release_entry_locks_for_client (xlator_t *this, pl_inode_t *pinode,
+ pl_dom_list_t *dom, client_t *client)
{
pl_entry_lock_t *lock = NULL;
pl_entry_lock_t *tmp = NULL;
@@ -551,14 +559,14 @@ release_entry_locks_for_transport (xlator_t *this, pl_inode_t *pinode,
{
list_for_each_entry_safe (lock, tmp, &dom->blocked_entrylks,
blocked_locks) {
- if (lock->trans != trans)
+ if (lock->trans != client)
continue;
list_del_init (&lock->blocked_locks);
gf_log (this->name, GF_LOG_TRACE,
"releasing lock on held by "
- "{transport=%p}",trans);
+ "{client=%p}", client);
list_add (&lock->blocked_locks, &released);
@@ -566,16 +574,17 @@ release_entry_locks_for_transport (xlator_t *this, pl_inode_t *pinode,
list_for_each_entry_safe (lock, tmp, &dom->entrylk_list,
domain_list) {
- if (lock->trans != trans)
+ if (lock->trans != client)
continue;
list_del_init (&lock->domain_list);
gf_log (this->name, GF_LOG_TRACE,
"releasing lock on held by "
- "{transport=%p}",trans);
+ "{client=%p}", client);
GF_FREE ((char *)lock->basename);
+ GF_FREE (lock->connection_id);
GF_FREE (lock);
}
@@ -591,6 +600,7 @@ release_entry_locks_for_transport (xlator_t *this, pl_inode_t *pinode,
STACK_UNWIND_STRICT (entrylk, lock->frame, -1, EAGAIN, NULL);
GF_FREE ((char *)lock->basename);
+ GF_FREE (lock->connection_id);
GF_FREE (lock);
}
@@ -601,6 +611,7 @@ release_entry_locks_for_transport (xlator_t *this, pl_inode_t *pinode,
STACK_UNWIND_STRICT (entrylk, lock->frame, 0, 0, NULL);
GF_FREE ((char *)lock->basename);
+ GF_FREE (lock->connection_id);
GF_FREE (lock);
}
@@ -611,19 +622,23 @@ release_entry_locks_for_transport (xlator_t *this, pl_inode_t *pinode,
int
pl_common_entrylk (call_frame_t *frame, xlator_t *this,
const char *volume, inode_t *inode, const char *basename,
- entrylk_cmd cmd, entrylk_type type, loc_t *loc, fd_t *fd)
-{
- int32_t op_ret = -1;
- int32_t op_errno = 0;
+ entrylk_cmd cmd, entrylk_type type, loc_t *loc, fd_t *fd,
+ dict_t *xdata)
- void * transport = NULL;
-
- pl_inode_t * pinode = NULL;
+{
+ int32_t op_ret = -1;
+ int32_t op_errno = 0;
int ret = -1;
- pl_entry_lock_t *unlocked = NULL;
char unwind = 1;
+ GF_UNUSED int dict_ret = -1;
+ pl_inode_t *pinode = NULL;
+ pl_entry_lock_t *unlocked = NULL;
+ pl_dom_list_t *dom = NULL;
+ char *conn_id = NULL;
+ pl_ctx_t *ctx = NULL;
- pl_dom_list_t *dom = NULL;
+ if (xdata)
+ dict_ret = dict_get_str (xdata, "connection-id", &conn_id);
pinode = pl_inode_get (this, inode);
if (!pinode) {
@@ -639,18 +654,17 @@ pl_common_entrylk (call_frame_t *frame, xlator_t *this,
entrylk_trace_in (this, frame, volume, fd, loc, basename, cmd, type);
- transport = frame->root->trans;
-
if (frame->root->lk_owner.len == 0) {
/*
this is a special case that means release
- all locks from this transport
+ all locks from this client
*/
gf_log (this->name, GF_LOG_TRACE,
- "Releasing locks for transport %p", transport);
+ "Releasing locks for client %p", frame->root->client);
- release_entry_locks_for_transport (this, pinode, dom, transport);
+ release_entry_locks_for_client (this, pinode, dom,
+ frame->root->client);
op_ret = 0;
goto out;
@@ -661,7 +675,7 @@ pl_common_entrylk (call_frame_t *frame, xlator_t *this,
pthread_mutex_lock (&pinode->mutex);
{
ret = __lock_name (pinode, basename, type,
- frame, dom, this, 0);
+ frame, dom, this, 0, conn_id);
}
pthread_mutex_unlock (&pinode->mutex);
@@ -686,7 +700,7 @@ pl_common_entrylk (call_frame_t *frame, xlator_t *this,
pthread_mutex_lock (&pinode->mutex);
{
ret = __lock_name (pinode, basename, type,
- frame, dom, this, 1);
+ frame, dom, this, 1, conn_id);
}
pthread_mutex_unlock (&pinode->mutex);
@@ -723,6 +737,24 @@ out:
entrylk_trace_out (this, frame, volume, fd, loc, basename,
cmd, type, op_ret, op_errno);
+ ctx = pl_ctx_get (frame->root->client, this);
+
+ if (ctx == NULL) {
+ gf_log (this->name, GF_LOG_INFO, "pl_ctx_get() failed");
+ goto unwind;
+ }
+
+ if (cmd == ENTRYLK_UNLOCK)
+ pl_del_locker (ctx->ltable, volume, loc, fd,
+ &frame->root->lk_owner,
+ GF_FOP_ENTRYLK);
+ else
+ pl_add_locker (ctx->ltable, volume, loc, fd,
+ frame->root->pid,
+ &frame->root->lk_owner,
+ GF_FOP_ENTRYLK);
+
+unwind:
STACK_UNWIND_STRICT (entrylk, frame, op_ret, op_errno, NULL);
} else {
entrylk_trace_block (this, frame, volume, fd, loc, basename,
@@ -742,10 +774,10 @@ out:
int
pl_entrylk (call_frame_t *frame, xlator_t *this,
const char *volume, loc_t *loc, const char *basename,
- entrylk_cmd cmd, entrylk_type type)
+ entrylk_cmd cmd, entrylk_type type, dict_t *xdata)
{
-
- pl_common_entrylk (frame, this, volume, loc->inode, basename, cmd, type, loc, NULL);
+ pl_common_entrylk (frame, this, volume, loc->inode, basename, cmd,
+ type, loc, NULL, xdata);
return 0;
}
@@ -760,10 +792,10 @@ pl_entrylk (call_frame_t *frame, xlator_t *this,
int
pl_fentrylk (call_frame_t *frame, xlator_t *this,
const char *volume, fd_t *fd, const char *basename,
- entrylk_cmd cmd, entrylk_type type)
+ entrylk_cmd cmd, entrylk_type type, dict_t *xdata)
{
-
- pl_common_entrylk (frame, this, volume, fd->inode, basename, cmd, type, NULL, fd);
+ pl_common_entrylk (frame, this, volume, fd->inode, basename, cmd,
+ type, NULL, fd, xdata);
return 0;
}
diff --git a/xlators/features/locks/src/inodelk.c b/xlators/features/locks/src/inodelk.c
index 2fd8b6c29..508523e11 100644
--- a/xlators/features/locks/src/inodelk.c
+++ b/xlators/features/locks/src/inodelk.c
@@ -39,8 +39,10 @@ inline void
__pl_inodelk_unref (pl_inode_lock_t *lock)
{
lock->ref--;
- if (!lock->ref)
+ if (!lock->ref) {
+ GF_FREE (lock->connection_id);
GF_FREE (lock);
+ }
}
/* Check if 2 inodelks are conflicting on type. Only 2 shared locks don't conflict */
@@ -122,7 +124,7 @@ static inline int
same_inodelk_owner (pl_inode_lock_t *l1, pl_inode_lock_t *l2)
{
return (is_same_lkowner (&l1->owner, &l2->owner) &&
- (l1->transport == l2->transport));
+ (l1->client == l2->client));
}
/* Returns true if the 2 inodelks conflict with each other */
@@ -292,7 +294,7 @@ __inode_unlock_lock (xlator_t *this, pl_inode_lock_t *lock, pl_dom_list_t *dom)
" Matching lock not found for unlock %llu-%llu, by %s "
"on %p", (unsigned long long)lock->fl_start,
(unsigned long long)lock->fl_end,
- lkowner_utoa (&lock->owner), lock->transport);
+ lkowner_utoa (&lock->owner), lock->client);
goto out;
}
__delete_inode_lock (conf);
@@ -300,7 +302,7 @@ __inode_unlock_lock (xlator_t *this, pl_inode_lock_t *lock, pl_dom_list_t *dom)
" Matching lock found for unlock %llu-%llu, by %s on %p",
(unsigned long long)lock->fl_start,
(unsigned long long)lock->fl_end, lkowner_utoa (&lock->owner),
- lock->transport);
+ lock->client);
out:
return conf;
@@ -333,7 +335,8 @@ __grant_blocked_inode_locks (xlator_t *this, pl_inode_t *pl_inode,
/* Grant all inodelks blocked on a lock */
void
-grant_blocked_inode_locks (xlator_t *this, pl_inode_t *pl_inode, pl_dom_list_t *dom)
+grant_blocked_inode_locks (xlator_t *this, pl_inode_t *pl_inode,
+ pl_dom_list_t *dom)
{
struct list_head granted;
pl_inode_lock_t *lock;
@@ -372,10 +375,10 @@ grant_blocked_inode_locks (xlator_t *this, pl_inode_t *pl_inode, pl_dom_list_t *
pthread_mutex_unlock (&pl_inode->mutex);
}
-/* Release all inodelks from this transport */
+/* Release all inodelks from this client */
static int
-release_inode_locks_of_transport (xlator_t *this, pl_dom_list_t *dom,
- inode_t *inode, void *trans)
+release_inode_locks_of_client (xlator_t *this, pl_dom_list_t *dom,
+ inode_t *inode, client_t *client)
{
pl_inode_lock_t *tmp = NULL;
pl_inode_lock_t *l = NULL;
@@ -395,7 +398,7 @@ release_inode_locks_of_transport (xlator_t *this, pl_dom_list_t *dom,
{
list_for_each_entry_safe (l, tmp, &dom->blocked_inodelks, blocked_locks) {
- if (l->transport != trans)
+ if (l->client != client)
continue;
list_del_init (&l->blocked_locks);
@@ -408,8 +411,8 @@ release_inode_locks_of_transport (xlator_t *this, pl_dom_list_t *dom,
gf_log (this->name, GF_LOG_DEBUG,
"releasing blocking lock on %s held by "
- "{transport=%p, pid=%"PRId64" lk-owner=%s}",
- file, trans, (uint64_t) l->client_pid,
+ "{client=%p, pid=%"PRId64" lk-owner=%s}",
+ file, client, (uint64_t) l->client_pid,
lkowner_utoa (&l->owner));
list_add (&l->blocked_locks, &released);
@@ -420,7 +423,7 @@ release_inode_locks_of_transport (xlator_t *this, pl_dom_list_t *dom,
}
list_for_each_entry_safe (l, tmp, &dom->inodelk_list, list) {
- if (l->transport != trans)
+ if (l->client != client)
continue;
inode_path (inode, NULL, &path);
@@ -431,8 +434,8 @@ release_inode_locks_of_transport (xlator_t *this, pl_dom_list_t *dom,
gf_log (this->name, GF_LOG_DEBUG,
"releasing granted lock on %s held by "
- "{transport=%p, pid=%"PRId64" lk-owner=%s}",
- file, trans, (uint64_t) l->client_pid,
+ "{client=%p, pid=%"PRId64" lk-owner=%s}",
+ file, client, (uint64_t) l->client_pid,
lkowner_utoa (&l->owner));
if (path) {
@@ -515,8 +518,9 @@ out:
/* Create a new inode_lock_t */
pl_inode_lock_t *
-new_inode_lock (struct gf_flock *flock, void *transport, pid_t client_pid,
- gf_lkowner_t *owner, const char *volume)
+new_inode_lock (struct gf_flock *flock, client_t *client, pid_t client_pid,
+ call_frame_t *frame, xlator_t *this, const char *volume,
+ char *conn_id)
{
pl_inode_lock_t *lock = NULL;
@@ -535,10 +539,16 @@ new_inode_lock (struct gf_flock *flock, void *transport, pid_t client_pid,
else
lock->fl_end = flock->l_start + flock->l_len - 1;
- lock->transport = transport;
+ lock->client = client;
lock->client_pid = client_pid;
lock->volume = volume;
- lock->owner = *owner;
+ lock->owner = frame->root->lk_owner;
+ lock->frame = frame;
+ lock->this = this;
+
+ if (conn_id) {
+ lock->connection_id = gf_strdup (conn_id);
+ }
INIT_LIST_HEAD (&lock->list);
INIT_LIST_HEAD (&lock->blocked_locks);
@@ -582,19 +592,23 @@ _pl_convert_volume_for_special_range (struct gf_flock *flock,
int
pl_common_inodelk (call_frame_t *frame, xlator_t *this,
const char *volume, inode_t *inode, int32_t cmd,
- struct gf_flock *flock, loc_t *loc, fd_t *fd)
+ struct gf_flock *flock, loc_t *loc, fd_t *fd, dict_t *xdata)
{
int32_t op_ret = -1;
int32_t op_errno = 0;
int ret = -1;
+ GF_UNUSED int dict_ret = -1;
int can_block = 0;
- pid_t client_pid = -1;
- void * transport = NULL;
pl_inode_t * pinode = NULL;
pl_inode_lock_t * reqlock = NULL;
pl_dom_list_t * dom = NULL;
char *res = NULL;
char *res1 = NULL;
+ char *conn_id = NULL;
+ pl_ctx_t *ctx = NULL;
+
+ if (xdata)
+ dict_ret = dict_get_str (xdata, "connection-id", &conn_id);
VALIDATE_OR_GOTO (frame, out);
VALIDATE_OR_GOTO (inode, unwind);
@@ -613,9 +627,6 @@ pl_common_inodelk (call_frame_t *frame, xlator_t *this,
pl_trace_in (this, frame, fd, loc, cmd, flock, volume);
- transport = frame->root->trans;
- client_pid = frame->root->pid;
-
pinode = pl_inode_get (this, inode);
if (!pinode) {
op_errno = ENOMEM;
@@ -631,26 +642,26 @@ pl_common_inodelk (call_frame_t *frame, xlator_t *this,
if (frame->root->lk_owner.len == 0) {
/*
special case: this means release all locks
- from this transport
+ from this client
*/
gf_log (this->name, GF_LOG_TRACE,
- "Releasing all locks from transport %p", transport);
+ "Releasing all locks from client %p", frame->root->client);
- release_inode_locks_of_transport (this, dom, inode, transport);
+ release_inode_locks_of_client (this, dom, inode, frame->root->client);
_pl_convert_volume (volume, &res1);
if (res1) {
dom = get_domain (pinode, res1);
if (dom)
- release_inode_locks_of_transport (this, dom,
- inode, transport);
+ release_inode_locks_of_client (this, dom,
+ inode, frame->root->client);
}
op_ret = 0;
goto unwind;
}
- reqlock = new_inode_lock (flock, transport, client_pid,
- &frame->root->lk_owner, volume);
+ reqlock = new_inode_lock (flock, frame->root->client, frame->root->pid,
+ frame, this, volume, conn_id);
if (!reqlock) {
op_ret = -1;
@@ -658,14 +669,10 @@ pl_common_inodelk (call_frame_t *frame, xlator_t *this,
goto unwind;
}
- reqlock->frame = frame;
- reqlock->this = this;
switch (cmd) {
case F_SETLKW:
can_block = 1;
- reqlock->frame = frame;
- reqlock->this = this;
/* fall through */
@@ -697,6 +704,23 @@ pl_common_inodelk (call_frame_t *frame, xlator_t *this,
op_ret = 0;
+ ctx = pl_ctx_get (frame->root->client, this);
+
+ if (ctx == NULL) {
+ gf_log (this->name, GF_LOG_INFO, "pl_ctx_get() failed");
+ goto unwind;
+ }
+
+ if (flock->l_type == F_UNLCK)
+ pl_del_locker (ctx->ltable, volume, loc, fd,
+ &frame->root->lk_owner,
+ GF_FOP_INODELK);
+ else
+ pl_add_locker (ctx->ltable, volume, loc, fd,
+ frame->root->pid,
+ &frame->root->lk_owner,
+ GF_FOP_INODELK);
+
unwind:
if ((inode != NULL) && (flock !=NULL)) {
pl_update_refkeeper (this, inode);
@@ -712,20 +736,22 @@ out:
int
pl_inodelk (call_frame_t *frame, xlator_t *this,
- const char *volume, loc_t *loc, int32_t cmd, struct gf_flock *flock)
+ const char *volume, loc_t *loc, int32_t cmd, struct gf_flock *flock,
+ dict_t *xdata)
{
-
- pl_common_inodelk (frame, this, volume, loc->inode, cmd, flock, loc, NULL);
+ pl_common_inodelk (frame, this, volume, loc->inode, cmd, flock,
+ loc, NULL, xdata);
return 0;
}
int
pl_finodelk (call_frame_t *frame, xlator_t *this,
- const char *volume, fd_t *fd, int32_t cmd, struct gf_flock *flock)
+ const char *volume, fd_t *fd, int32_t cmd, struct gf_flock *flock,
+ dict_t *xdata)
{
-
- pl_common_inodelk (frame, this, volume, fd->inode, cmd, flock, NULL, fd);
+ pl_common_inodelk (frame, this, volume, fd->inode, cmd, flock,
+ NULL, fd, xdata);
return 0;
diff --git a/xlators/features/locks/src/locks.h b/xlators/features/locks/src/locks.h
index 43b674760..76fc941d7 100644
--- a/xlators/features/locks/src/locks.h
+++ b/xlators/features/locks/src/locks.h
@@ -19,10 +19,10 @@
#include "stack.h"
#include "call-stub.h"
#include "locks-mem-types.h"
+#include "client_t.h"
#include "lkowner.h"
-#define POSIX_LOCKS "posix-locks"
struct __pl_fd;
struct __posix_lock {
@@ -33,7 +33,7 @@ struct __posix_lock {
off_t fl_end;
short blocked; /* waiting to acquire */
- struct gf_flock user_flock; /* the flock supplied by the user */
+ struct gf_flock user_flock; /* the flock supplied by the user */
xlator_t *this; /* required for blocked locks */
unsigned long fd_num;
@@ -46,7 +46,7 @@ struct __posix_lock {
/* These two together serve to uniquely identify each process
across nodes */
- void *transport; /* to identify client node */
+ void *client; /* to identify client node */
gf_lkowner_t owner;
pid_t client_pid; /* pid of client process */
};
@@ -63,7 +63,7 @@ struct __pl_inode_lock {
const char *volume;
- struct gf_flock user_flock; /* the flock supplied by the user */
+ struct gf_flock user_flock; /* the flock supplied by the user */
xlator_t *this; /* required for blocked locks */
fd_t *fd;
@@ -75,9 +75,11 @@ struct __pl_inode_lock {
/* These two together serve to uniquely identify each process
across nodes */
- void *transport; /* to identify client node */
+ void *client; /* to identify client node */
gf_lkowner_t owner;
pid_t client_pid; /* pid of client process */
+
+ char *connection_id; /* stores the client connection id */
};
typedef struct __pl_inode_lock pl_inode_lock_t;
@@ -115,7 +117,9 @@ struct __entry_lock {
void *trans;
gf_lkowner_t owner;
- pid_t client_pid; /* pid of client process */
+ pid_t client_pid; /* pid of client process */
+
+ char *connection_id; /* stores the client connection id */
};
typedef struct __entry_lock pl_entry_lock_t;
@@ -152,6 +156,7 @@ typedef struct {
char *brickname;
} posix_locks_private_t;
+
typedef struct {
gf_boolean_t entrylk_count_req;
gf_boolean_t inodelk_count_req;
@@ -167,8 +172,21 @@ typedef struct {
enum {TRUNCATE, FTRUNCATE} op;
} pl_local_t;
+
typedef struct {
struct list_head locks_list;
} pl_fdctx_t;
+
+typedef struct _locks_ctx {
+ gf_lock_t ltable_lock; /* only for replace,
+ ltable has its own internal
+ lock for operations */
+ struct _lock_table *ltable;
+} pl_ctx_t;
+
+
+pl_ctx_t *
+pl_ctx_get (client_t *client, xlator_t *xlator);
+
#endif /* __POSIX_LOCKS_H__ */
diff --git a/xlators/features/locks/src/posix.c b/xlators/features/locks/src/posix.c
index ec670cda8..7bfb38a51 100644
--- a/xlators/features/locks/src/posix.c
+++ b/xlators/features/locks/src/posix.c
@@ -51,7 +51,7 @@ pl_new_fdctx ()
fdctx = GF_CALLOC (1, sizeof (*fdctx),
gf_locks_mt_pl_fdctx_t);
- GF_VALIDATE_OR_GOTO (POSIX_LOCKS, fdctx, out);
+ GF_VALIDATE_OR_GOTO ("posix-locks", fdctx, out);
INIT_LIST_HEAD (&fdctx->locks_list);
@@ -66,7 +66,7 @@ pl_check_n_create_fdctx (xlator_t *this, fd_t *fd)
uint64_t tmp = 0;
pl_fdctx_t *fdctx = NULL;
- GF_VALIDATE_OR_GOTO (POSIX_LOCKS, this, out);
+ GF_VALIDATE_OR_GOTO ("posix-locks", this, out);
GF_VALIDATE_OR_GOTO (this->name, fd, out);
LOCK (&fd->lock);
@@ -119,7 +119,7 @@ pl_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
static int
truncate_allowed (pl_inode_t *pl_inode,
- void *transport, pid_t client_pid,
+ client_t *client, pid_t client_pid,
gf_lkowner_t *owner, off_t offset)
{
posix_lock_t *l = NULL;
@@ -128,7 +128,7 @@ truncate_allowed (pl_inode_t *pl_inode,
region.fl_start = offset;
region.fl_end = LLONG_MAX;
- region.transport = transport;
+ region.client = client;
region.client_pid = client_pid;
region.owner = *owner;
@@ -139,7 +139,7 @@ truncate_allowed (pl_inode_t *pl_inode,
&& locks_overlap (&region, l)
&& !same_owner (&region, l)) {
ret = 0;
- gf_log (POSIX_LOCKS, GF_LOG_TRACE, "Truncate "
+ gf_log ("posix-locks", GF_LOG_TRACE, "Truncate "
"allowed");
break;
}
@@ -186,7 +186,7 @@ truncate_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
if (priv->mandatory
&& pl_inode->mandatory
- && !truncate_allowed (pl_inode, frame->root->trans,
+ && !truncate_allowed (pl_inode, frame->root->client,
frame->root->pid, &frame->root->lk_owner,
local->offset)) {
op_ret = -1;
@@ -347,7 +347,7 @@ delete_locks_of_fd (xlator_t *this, pl_inode_t *pl_inode, fd_t *fd)
static void
__delete_locks_of_owner (pl_inode_t *pl_inode,
- void *transport, gf_lkowner_t *owner)
+ client_t *client, gf_lkowner_t *owner)
{
posix_lock_t *tmp = NULL;
posix_lock_t *l = NULL;
@@ -357,7 +357,7 @@ __delete_locks_of_owner (pl_inode_t *pl_inode,
list_for_each_entry_safe (l, tmp, &pl_inode->ext_list, list) {
if (l->blocked)
continue;
- if ((l->transport == transport) &&
+ if ((l->client == client) &&
is_same_lkowner (&l->owner, owner)) {
gf_log ("posix-locks", GF_LOG_TRACE,
" Flushing lock"
@@ -810,7 +810,7 @@ pl_migrate_locks (call_frame_t *frame, fd_t *newfd, uint64_t oldfd_num,
list_for_each_entry (l, &pl_inode->ext_list, list) {
if (l->fd_num == oldfd_num) {
l->fd_num = newfd_num;
- l->transport = frame->root->trans;
+ l->client = frame->root->client;
}
}
}
@@ -983,7 +983,7 @@ pl_flush (call_frame_t *frame, xlator_t *this,
}
pthread_mutex_lock (&pl_inode->mutex);
{
- __delete_locks_of_owner (pl_inode, frame->root->trans,
+ __delete_locks_of_owner (pl_inode, frame->root->client,
&frame->root->lk_owner);
}
pthread_mutex_unlock (&pl_inode->mutex);
@@ -1178,7 +1178,7 @@ pl_readv (call_frame_t *frame, xlator_t *this,
if (priv->mandatory && pl_inode->mandatory) {
region.fl_start = offset;
region.fl_end = offset + size - 1;
- region.transport = frame->root->trans;
+ region.client = frame->root->client;
region.fd_num = fd_to_fdnum(fd);
region.client_pid = frame->root->pid;
region.owner = frame->root->lk_owner;
@@ -1272,7 +1272,7 @@ pl_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
if (priv->mandatory && pl_inode->mandatory) {
region.fl_start = offset;
region.fl_end = offset + iov_length (vector, count) - 1;
- region.transport = frame->root->trans;
+ region.client = frame->root->client;
region.fd_num = fd_to_fdnum(fd);
region.client_pid = frame->root->pid;
region.owner = frame->root->lk_owner;
@@ -1353,7 +1353,7 @@ lock_dup (posix_lock_t *lock)
{
posix_lock_t *new_lock = NULL;
- new_lock = new_posix_lock (&lock->user_flock, lock->transport,
+ new_lock = new_posix_lock (&lock->user_flock, lock->client,
lock->client_pid, &lock->owner,
(fd_t *)lock->fd_num);
return new_lock;
@@ -1513,8 +1513,6 @@ int
pl_lk (call_frame_t *frame, xlator_t *this,
fd_t *fd, int32_t cmd, struct gf_flock *flock, dict_t *xdata)
{
- void *transport = NULL;
- pid_t client_pid = 0;
pl_inode_t *pl_inode = NULL;
int op_ret = 0;
int op_errno = 0;
@@ -1523,9 +1521,6 @@ pl_lk (call_frame_t *frame, xlator_t *this,
posix_lock_t *conf = NULL;
int ret = 0;
- transport = frame->root->trans;
- client_pid = frame->root->pid;
-
if ((flock->l_start < 0) || (flock->l_len < 0)) {
op_ret = -1;
op_errno = EINVAL;
@@ -1539,7 +1534,7 @@ pl_lk (call_frame_t *frame, xlator_t *this,
goto unwind;
}
- reqlock = new_posix_lock (flock, transport, client_pid,
+ reqlock = new_posix_lock (flock, frame->root->client, frame->root->pid,
&frame->root->lk_owner, fd);
if (!reqlock) {
@@ -1764,6 +1759,7 @@ pl_forget (xlator_t *this,
list_del_init (&entry_l->domain_list);
GF_FREE ((char *)entry_l->basename);
+ GF_FREE (entry_l->connection_id);
GF_FREE (entry_l);
}
@@ -1797,6 +1793,7 @@ pl_forget (xlator_t *this,
STACK_UNWIND_STRICT (entrylk, entry_l->frame, -1, 0, NULL);
GF_FREE ((char *)entry_l->basename);
+ GF_FREE (entry_l->connection_id);
GF_FREE (entry_l);
}
@@ -2161,8 +2158,8 @@ out:
void
pl_dump_lock (char *str, int size, struct gf_flock *flock,
- gf_lkowner_t *owner, void *trans, time_t *granted_time,
- time_t *blkd_time, gf_boolean_t active)
+ gf_lkowner_t *owner, void *trans, char *conn_id,
+ time_t *granted_time, time_t *blkd_time, gf_boolean_t active)
{
char *type_str = NULL;
char granted[32] = {0,};
@@ -2190,7 +2187,7 @@ pl_dump_lock (char *str, int size, struct gf_flock *flock,
(unsigned long long) flock->l_start,
(unsigned long long) flock->l_len,
(unsigned long long) flock->l_pid,
- lkowner_utoa (owner), trans,
+ lkowner_utoa (owner), trans, conn_id,
ctime_r (granted_time, granted));
} else {
snprintf (str, size, RANGE_BLKD_GRNTD_FMT,
@@ -2198,7 +2195,7 @@ pl_dump_lock (char *str, int size, struct gf_flock *flock,
(unsigned long long) flock->l_start,
(unsigned long long) flock->l_len,
(unsigned long long) flock->l_pid,
- lkowner_utoa (owner), trans,
+ lkowner_utoa (owner), trans, conn_id,
ctime_r (blkd_time, blocked),
ctime_r (granted_time, granted));
}
@@ -2209,7 +2206,7 @@ pl_dump_lock (char *str, int size, struct gf_flock *flock,
(unsigned long long) flock->l_start,
(unsigned long long) flock->l_len,
(unsigned long long) flock->l_pid,
- lkowner_utoa (owner), trans,
+ lkowner_utoa (owner), trans, conn_id,
ctime_r (blkd_time, blocked));
}
@@ -2247,6 +2244,7 @@ __dump_entrylks (pl_inode_t *pl_inode)
"ENTRYLK_WRLCK", lock->basename,
(unsigned long long) lock->client_pid,
lkowner_utoa (&lock->owner), lock->trans,
+ lock->connection_id,
ctime_r (&lock->granted_time.tv_sec, granted));
} else {
snprintf (tmp, 256, ENTRY_BLKD_GRNTD_FMT,
@@ -2254,6 +2252,7 @@ __dump_entrylks (pl_inode_t *pl_inode)
"ENTRYLK_WRLCK", lock->basename,
(unsigned long long) lock->client_pid,
lkowner_utoa (&lock->owner), lock->trans,
+ lock->connection_id,
ctime_r (&lock->blkd_time.tv_sec, blocked),
ctime_r (&lock->granted_time.tv_sec, granted));
}
@@ -2273,6 +2272,7 @@ __dump_entrylks (pl_inode_t *pl_inode)
"ENTRYLK_WRLCK", lock->basename,
(unsigned long long) lock->client_pid,
lkowner_utoa (&lock->owner), lock->trans,
+ lock->connection_id,
ctime_r (&lock->blkd_time.tv_sec, blocked));
gf_proc_dump_write(key, tmp);
@@ -2323,7 +2323,7 @@ __dump_inodelks (pl_inode_t *pl_inode)
SET_FLOCK_PID (&lock->user_flock, lock);
pl_dump_lock (tmp, 256, &lock->user_flock,
&lock->owner,
- lock->transport,
+ lock->client, lock->connection_id,
&lock->granted_time.tv_sec,
&lock->blkd_time.tv_sec,
_gf_true);
@@ -2340,7 +2340,7 @@ __dump_inodelks (pl_inode_t *pl_inode)
SET_FLOCK_PID (&lock->user_flock, lock);
pl_dump_lock (tmp, 256, &lock->user_flock,
&lock->owner,
- lock->transport,
+ lock->client, lock->connection_id,
0, &lock->blkd_time.tv_sec,
_gf_false);
gf_proc_dump_write(key, tmp);
@@ -2381,7 +2381,7 @@ __dump_posixlks (pl_inode_t *pl_inode)
count,
lock->blocked ? "BLOCKED" : "ACTIVE");
pl_dump_lock (tmp, 256, &lock->user_flock,
- &lock->owner, lock->transport,
+ &lock->owner, lock->client, NULL,
&lock->granted_time.tv_sec, &lock->blkd_time.tv_sec,
(lock->blocked)? _gf_false: _gf_true);
gf_proc_dump_write(key, tmp);
@@ -2505,6 +2505,124 @@ mem_acct_init (xlator_t *this)
return ret;
}
+
+pl_ctx_t*
+pl_ctx_get (client_t *client, xlator_t *xlator)
+{
+ void *tmp = NULL;
+ pl_ctx_t *ctx = NULL;
+
+ client_ctx_get (client, xlator, &tmp);
+
+ ctx = tmp;
+
+ if (ctx != NULL)
+ goto out;
+
+ ctx = GF_CALLOC (1, sizeof (pl_ctx_t), gf_locks_mt_posix_lock_t);
+
+ if (ctx == NULL)
+ goto out;
+
+ ctx->ltable = pl_lock_table_new();
+
+ if (ctx->ltable == NULL) {
+ GF_FREE (ctx);
+ ctx = NULL;
+ goto out;
+ }
+
+ LOCK_INIT (&ctx->ltable_lock);
+
+ if (client_ctx_set (client, xlator, ctx) != 0) {
+ LOCK_DESTROY (&ctx->ltable_lock);
+ GF_FREE (ctx->ltable);
+ GF_FREE (ctx);
+ ctx = NULL;
+ }
+out:
+ return ctx;
+}
+
+static void
+ltable_delete_locks (struct _lock_table *ltable)
+{
+ struct _locker *locker = NULL;
+ struct _locker *tmp = NULL;
+
+ list_for_each_entry_safe (locker, tmp, &ltable->inodelk_lockers, lockers) {
+ if (locker->fd)
+ pl_del_locker (ltable, locker->volume, &locker->loc,
+ locker->fd, &locker->owner,
+ GF_FOP_INODELK);
+ GF_FREE (locker->volume);
+ GF_FREE (locker);
+ }
+
+ list_for_each_entry_safe (locker, tmp, &ltable->entrylk_lockers, lockers) {
+ if (locker->fd)
+ pl_del_locker (ltable, locker->volume, &locker->loc,
+ locker->fd, &locker->owner,
+ GF_FOP_ENTRYLK);
+ GF_FREE (locker->volume);
+ GF_FREE (locker);
+ }
+ GF_FREE (ltable);
+}
+
+
+static int32_t
+destroy_cbk (xlator_t *this, client_t *client)
+{
+ void *tmp = NULL;
+ pl_ctx_t *locks_ctx = NULL;
+
+ client_ctx_del (client, this, &tmp);
+
+ if (tmp == NULL)
+ return 0
+;
+ locks_ctx = tmp;
+ if (locks_ctx->ltable)
+ ltable_delete_locks (locks_ctx->ltable);
+
+ LOCK_DESTROY (&locks_ctx->ltable_lock);
+ GF_FREE (locks_ctx);
+
+ return 0;
+}
+
+
+static int32_t
+disconnect_cbk (xlator_t *this, client_t *client)
+{
+ int32_t ret = 0;
+ pl_ctx_t *locks_ctx = NULL;
+ struct _lock_table *ltable = NULL;
+
+ locks_ctx = pl_ctx_get (client, this);
+ if (locks_ctx == NULL) {
+ gf_log (this->name, GF_LOG_INFO, "pl_ctx_get() failed");
+ goto out;
+ }
+
+ LOCK (&locks_ctx->ltable_lock);
+ {
+ if (locks_ctx->ltable) {
+ ltable = locks_ctx->ltable;
+ locks_ctx->ltable = pl_lock_table_new ();
+ }
+ }
+ UNLOCK (&locks_ctx->ltable_lock);
+
+ if (ltable)
+ ltable_delete_locks (ltable);
+
+out:
+ return ret;
+}
+
+
int
init (xlator_t *this)
{
@@ -2533,7 +2651,7 @@ init (xlator_t *this)
gf_log (this->name, GF_LOG_CRITICAL,
"'locks' translator is not loaded over a storage "
"translator");
- goto out;;
+ goto out;
}
priv = GF_CALLOC (1, sizeof (*priv),
@@ -2635,9 +2753,11 @@ struct xlator_dumpops dumpops = {
};
struct xlator_cbks cbks = {
- .forget = pl_forget,
- .release = pl_release,
- .releasedir = pl_releasedir,
+ .forget = pl_forget,
+ .release = pl_release,
+ .releasedir = pl_releasedir,
+ .client_destroy = destroy_cbk,
+ .client_disconnect = disconnect_cbk,
};
diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c
index 3325d8af3..6a2c85691 100644
--- a/xlators/features/marker/src/marker.c
+++ b/xlators/features/marker/src/marker.c
@@ -468,11 +468,16 @@ marker_create_frame (xlator_t *this, marker_local_t *local)
int32_t
marker_xtime_update_marks (xlator_t *this, marker_local_t *local)
{
+ marker_conf_t *priv = NULL;
+
GF_VALIDATE_OR_GOTO ("marker", this, out);
GF_VALIDATE_OR_GOTO (this->name, local, out);
- if ((local->pid == GF_CLIENT_PID_GSYNCD) ||
- (local->pid == GF_CLIENT_PID_DEFRAG))
+ priv = this->private;
+
+ if ((local->pid == GF_CLIENT_PID_GSYNCD
+ && !(priv->feature_enabled & GF_XTIME_GSYNC_FORCE))
+ || (local->pid == GF_CLIENT_PID_DEFRAG))
goto out;
marker_gettimeofday (local);
@@ -1948,6 +1953,73 @@ err:
return 0;
}
+int32_t
+marker_zerofill_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ marker_local_t *local = NULL;
+ marker_conf_t *priv = NULL;
+
+ if (op_ret == -1) {
+ gf_log (this->name, GF_LOG_TRACE, "%s occurred during zerofill",
+ strerror (op_errno));
+ }
+
+ local = (marker_local_t *) frame->local;
+
+ frame->local = NULL;
+
+ STACK_UNWIND_STRICT (zerofill, frame, op_ret, op_errno, prebuf,
+ postbuf, xdata);
+
+ if (op_ret == -1 || local == NULL)
+ goto out;
+
+ priv = this->private;
+
+ if (priv->feature_enabled & GF_QUOTA)
+ mq_initiate_quota_txn (this, &local->loc);
+
+ if (priv->feature_enabled & GF_XTIME)
+ marker_xtime_update_marks (this, local);
+out:
+ marker_local_unref (local);
+
+ return 0;
+}
+
+int32_t
+marker_zerofill(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
+ size_t len, dict_t *xdata)
+{
+ int32_t ret = 0;
+ marker_local_t *local = NULL;
+ marker_conf_t *priv = NULL;
+
+ priv = this->private;
+
+ if (priv->feature_enabled == 0)
+ goto wind;
+
+ local = mem_get0 (this->local_pool);
+
+ MARKER_INIT_LOCAL (frame, local);
+
+ ret = marker_inode_loc_fill (fd->inode, &local->loc);
+
+ if (ret == -1)
+ goto err;
+wind:
+ STACK_WIND (frame, marker_zerofill_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->zerofill, fd, offset, len, xdata);
+ return 0;
+err:
+ STACK_UNWIND_STRICT (zerofill, frame, -1, ENOMEM, NULL, NULL, NULL);
+
+ return 0;
+}
+
/* when a call from the special client is received on
* key trusted.glusterfs.volume-mark with value "RESET"
@@ -2593,7 +2665,7 @@ out:
int32_t
reconfigure (xlator_t *this, dict_t *options)
{
- int32_t ret = -1;
+ int32_t ret = 0;
data_t *data = NULL;
gf_boolean_t flag = _gf_false;
marker_conf_t *priv = NULL;
@@ -2634,11 +2706,17 @@ reconfigure (xlator_t *this, dict_t *options)
"xtime updation will fail");
} else {
priv->feature_enabled |= GF_XTIME;
+ data = dict_get (options, "gsync-force-xtime");
+ if (!data)
+ goto out;
+ ret = gf_string2boolean (data->data, &flag);
+ if (ret == 0 && flag)
+ priv->feature_enabled |= GF_XTIME_GSYNC_FORCE;
}
}
}
out:
- return 0;
+ return ret;
}
@@ -2694,9 +2772,16 @@ init (xlator_t *this)
goto err;
priv->feature_enabled |= GF_XTIME;
+ data = dict_get (options, "gsync-force-xtime");
+ if (!data)
+ goto cont;
+ ret = gf_string2boolean (data->data, &flag);
+ if (ret == 0 && flag)
+ priv->feature_enabled |= GF_XTIME_GSYNC_FORCE;
}
}
+ cont:
this->local_pool = mem_pool_new (marker_local_t, 128);
if (!this->local_pool) {
gf_log (this->name, GF_LOG_ERROR,
@@ -2760,6 +2845,7 @@ struct xlator_fops fops = {
.readdirp = marker_readdirp,
.fallocate = marker_fallocate,
.discard = marker_discard,
+ .zerofill = marker_zerofill,
};
struct xlator_cbks cbks = {
@@ -2771,5 +2857,6 @@ struct volume_options options[] = {
{.key = {"timestamp-file"}},
{.key = {"quota"}},
{.key = {"xtime"}},
+ {.key = {"gsync-force-xtime"}},
{.key = {NULL}}
};
diff --git a/xlators/features/marker/src/marker.h b/xlators/features/marker/src/marker.h
index 2f1feeee6..1a58f8cfc 100644
--- a/xlators/features/marker/src/marker.h
+++ b/xlators/features/marker/src/marker.h
@@ -28,8 +28,9 @@
#define TIMESTAMP_FILE "timestamp-file"
enum {
- GF_QUOTA=1,
- GF_XTIME=2
+ GF_QUOTA = 1,
+ GF_XTIME = 2,
+ GF_XTIME_GSYNC_FORCE = 4,
};
/*initialize the local variable*/
diff --git a/xlators/features/qemu-block/Makefile.am b/xlators/features/qemu-block/Makefile.am
new file mode 100644
index 000000000..af437a64d
--- /dev/null
+++ b/xlators/features/qemu-block/Makefile.am
@@ -0,0 +1 @@
+SUBDIRS = src
diff --git a/xlators/features/qemu-block/src/Makefile.am b/xlators/features/qemu-block/src/Makefile.am
new file mode 100644
index 000000000..08a7b62a0
--- /dev/null
+++ b/xlators/features/qemu-block/src/Makefile.am
@@ -0,0 +1,155 @@
+if ENABLE_QEMU_BLOCK
+xlator_LTLIBRARIES = qemu-block.la
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features
+
+qemu_block_la_LDFLAGS = -module -avoid-version
+qemu_block_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la $(GLIB_LIBS) -lz -lrt
+
+qemu_block_la_SOURCES_qemu = \
+ $(CONTRIBDIR)/qemu/qemu-coroutine.c \
+ $(CONTRIBDIR)/qemu/qemu-coroutine-lock.c \
+ $(CONTRIBDIR)/qemu/qemu-coroutine-sleep.c \
+ $(CONTRIBDIR)/qemu/coroutine-ucontext.c \
+ $(CONTRIBDIR)/qemu/block.c \
+ $(CONTRIBDIR)/qemu/nop-symbols.c
+
+qemu_block_la_SOURCES_qemu_util = \
+ $(CONTRIBDIR)/qemu/util/aes.c \
+ $(CONTRIBDIR)/qemu/util/bitmap.c \
+ $(CONTRIBDIR)/qemu/util/bitops.c \
+ $(CONTRIBDIR)/qemu/util/cutils.c \
+ $(CONTRIBDIR)/qemu/util/error.c \
+ $(CONTRIBDIR)/qemu/util/hbitmap.c \
+ $(CONTRIBDIR)/qemu/util/iov.c \
+ $(CONTRIBDIR)/qemu/util/module.c \
+ $(CONTRIBDIR)/qemu/util/oslib-posix.c \
+ $(CONTRIBDIR)/qemu/util/qemu-option.c \
+ $(CONTRIBDIR)/qemu/util/qemu-error.c \
+ $(CONTRIBDIR)/qemu/util/qemu-thread-posix.c \
+ $(CONTRIBDIR)/qemu/util/unicode.c \
+ $(CONTRIBDIR)/qemu/util/hexdump.c
+
+qemu_block_la_SOURCES_qemu_block = \
+ $(CONTRIBDIR)/qemu/block/snapshot.c \
+ $(CONTRIBDIR)/qemu/block/qcow2-cache.c \
+ $(CONTRIBDIR)/qemu/block/qcow2-cluster.c \
+ $(CONTRIBDIR)/qemu/block/qcow2-refcount.c \
+ $(CONTRIBDIR)/qemu/block/qcow2-snapshot.c \
+ $(CONTRIBDIR)/qemu/block/qcow2.c \
+ $(CONTRIBDIR)/qemu/block/qed-check.c \
+ $(CONTRIBDIR)/qemu/block/qed-cluster.c \
+ $(CONTRIBDIR)/qemu/block/qed-gencb.c \
+ $(CONTRIBDIR)/qemu/block/qed-l2-cache.c \
+ $(CONTRIBDIR)/qemu/block/qed-table.c \
+ $(CONTRIBDIR)/qemu/block/qed.c
+
+qemu_block_la_SOURCES_qemu_qobject = \
+ $(CONTRIBDIR)/qemu/qobject/json-lexer.c \
+ $(CONTRIBDIR)/qemu/qobject/json-parser.c \
+ $(CONTRIBDIR)/qemu/qobject/json-streamer.c \
+ $(CONTRIBDIR)/qemu/qobject/qbool.c \
+ $(CONTRIBDIR)/qemu/qobject/qdict.c \
+ $(CONTRIBDIR)/qemu/qobject/qerror.c \
+ $(CONTRIBDIR)/qemu/qobject/qfloat.c \
+ $(CONTRIBDIR)/qemu/qobject/qint.c \
+ $(CONTRIBDIR)/qemu/qobject/qjson.c \
+ $(CONTRIBDIR)/qemu/qobject/qlist.c \
+ $(CONTRIBDIR)/qemu/qobject/qstring.c
+
+qemu_block_la_SOURCES = \
+ $(qemu_block_la_SOURCES_qemu) \
+ $(qemu_block_la_SOURCES_qemu_util) \
+ $(qemu_block_la_SOURCES_qemu_block) \
+ $(qemu_block_la_SOURCES_qemu_qobject) \
+ bdrv-xlator.c \
+ coroutine-synctask.c \
+ bh-syncop.c \
+ monitor-logging.c \
+ clock-timer.c \
+ qemu-block.c \
+ qb-coroutines.c
+
+noinst_HEADERS_qemu = \
+ $(CONTRIBDIR)/qemu/config-host.h \
+ $(CONTRIBDIR)/qemu/qapi-types.h \
+ $(CONTRIBDIR)/qemu/qmp-commands.h \
+ $(CONTRIBDIR)/qemu/trace/generated-tracers.h \
+ $(CONTRIBDIR)/qemu/include/config.h \
+ $(CONTRIBDIR)/qemu/include/glib-compat.h \
+ $(CONTRIBDIR)/qemu/include/qemu-common.h \
+ $(CONTRIBDIR)/qemu/include/trace.h \
+ $(CONTRIBDIR)/qemu/include/block/coroutine.h \
+ $(CONTRIBDIR)/qemu/include/block/aio.h \
+ $(CONTRIBDIR)/qemu/include/block/block.h \
+ $(CONTRIBDIR)/qemu/include/block/block_int.h \
+ $(CONTRIBDIR)/qemu/include/block/blockjob.h \
+ $(CONTRIBDIR)/qemu/include/block/coroutine.h \
+ $(CONTRIBDIR)/qemu/include/block/coroutine_int.h \
+ $(CONTRIBDIR)/qemu/include/block/snapshot.h \
+ $(CONTRIBDIR)/qemu/include/exec/cpu-common.h \
+ $(CONTRIBDIR)/qemu/include/exec/hwaddr.h \
+ $(CONTRIBDIR)/qemu/include/exec/poison.h \
+ $(CONTRIBDIR)/qemu/include/fpu/softfloat.h \
+ $(CONTRIBDIR)/qemu/include/migration/migration.h \
+ $(CONTRIBDIR)/qemu/include/migration/qemu-file.h \
+ $(CONTRIBDIR)/qemu/include/migration/vmstate.h \
+ $(CONTRIBDIR)/qemu/include/monitor/monitor.h \
+ $(CONTRIBDIR)/qemu/include/monitor/readline.h \
+ $(CONTRIBDIR)/qemu/include/qapi/error.h \
+ $(CONTRIBDIR)/qemu/include/qapi/qmp/json-lexer.h \
+ $(CONTRIBDIR)/qemu/include/qapi/qmp/json-parser.h \
+ $(CONTRIBDIR)/qemu/include/qapi/qmp/json-streamer.h \
+ $(CONTRIBDIR)/qemu/include/qapi/qmp/qbool.h \
+ $(CONTRIBDIR)/qemu/include/qapi/qmp/qdict.h \
+ $(CONTRIBDIR)/qemu/include/qapi/qmp/qerror.h \
+ $(CONTRIBDIR)/qemu/include/qapi/qmp/qfloat.h \
+ $(CONTRIBDIR)/qemu/include/qapi/qmp/qint.h \
+ $(CONTRIBDIR)/qemu/include/qapi/qmp/qjson.h \
+ $(CONTRIBDIR)/qemu/include/qapi/qmp/qlist.h \
+ $(CONTRIBDIR)/qemu/include/qapi/qmp/qobject.h \
+ $(CONTRIBDIR)/qemu/include/qapi/qmp/qstring.h \
+ $(CONTRIBDIR)/qemu/include/qapi/qmp/types.h \
+ $(CONTRIBDIR)/qemu/include/qemu/aes.h \
+ $(CONTRIBDIR)/qemu/include/qemu/atomic.h \
+ $(CONTRIBDIR)/qemu/include/qemu/bitmap.h \
+ $(CONTRIBDIR)/qemu/include/qemu/bitops.h \
+ $(CONTRIBDIR)/qemu/include/qemu/bswap.h \
+ $(CONTRIBDIR)/qemu/include/qemu/compiler.h \
+ $(CONTRIBDIR)/qemu/include/qemu/error-report.h \
+ $(CONTRIBDIR)/qemu/include/qemu/event_notifier.h \
+ $(CONTRIBDIR)/qemu/include/qemu/hbitmap.h \
+ $(CONTRIBDIR)/qemu/include/qemu/host-utils.h \
+ $(CONTRIBDIR)/qemu/include/qemu/iov.h \
+ $(CONTRIBDIR)/qemu/include/qemu/main-loop.h \
+ $(CONTRIBDIR)/qemu/include/qemu/module.h \
+ $(CONTRIBDIR)/qemu/include/qemu/notify.h \
+ $(CONTRIBDIR)/qemu/include/qemu/option.h \
+ $(CONTRIBDIR)/qemu/include/qemu/option_int.h \
+ $(CONTRIBDIR)/qemu/include/qemu/osdep.h \
+ $(CONTRIBDIR)/qemu/include/qemu/queue.h \
+ $(CONTRIBDIR)/qemu/include/qemu/sockets.h \
+ $(CONTRIBDIR)/qemu/include/qemu/thread-posix.h \
+ $(CONTRIBDIR)/qemu/include/qemu/thread.h \
+ $(CONTRIBDIR)/qemu/include/qemu/timer.h \
+ $(CONTRIBDIR)/qemu/include/qemu/typedefs.h \
+ $(CONTRIBDIR)/qemu/include/sysemu/sysemu.h \
+ $(CONTRIBDIR)/qemu/include/sysemu/os-posix.h \
+ $(CONTRIBDIR)/qemu/block/qcow2.h \
+ $(CONTRIBDIR)/qemu/block/qed.h
+
+noinst_HEADERS = \
+ $(noinst_HEADERS_qemu) \
+ qemu-block.h \
+ qemu-block-memory-types.h \
+ qb-coroutines.h
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
+ -I$(CONTRIBDIR)/qemu \
+ -I$(CONTRIBDIR)/qemu/include \
+ -DGLUSTER_XLATOR
+
+AM_CFLAGS = -fno-strict-aliasing -Wall $(GF_CFLAGS) $(GLIB_CFLAGS)
+
+CLEANFILES =
+
+endif
diff --git a/xlators/features/qemu-block/src/bdrv-xlator.c b/xlators/features/qemu-block/src/bdrv-xlator.c
new file mode 100644
index 000000000..106c59775
--- /dev/null
+++ b/xlators/features/qemu-block/src/bdrv-xlator.c
@@ -0,0 +1,397 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "inode.h"
+#include "syncop.h"
+#include "qemu-block.h"
+#include "block/block_int.h"
+
+typedef struct BDRVGlusterState {
+ inode_t *inode;
+} BDRVGlusterState;
+
+static QemuOptsList runtime_opts = {
+ .name = "gluster",
+ .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
+ .desc = {
+ {
+ .name = "filename",
+ .type = QEMU_OPT_STRING,
+ .help = "GFID of file",
+ },
+ { /* end of list */ }
+ },
+};
+
+inode_t *
+qb_inode_from_filename (const char *filename)
+{
+ const char *iptr = NULL;
+ inode_t *inode = NULL;
+
+ iptr = filename + 17;
+ sscanf (iptr, "%p", &inode);
+
+ return inode;
+}
+
+
+int
+qb_inode_to_filename (inode_t *inode, char *filename, int size)
+{
+ return snprintf (filename, size, "gluster://inodep:%p", inode);
+}
+
+
+static fd_t *
+fd_from_bs (BlockDriverState *bs)
+{
+ BDRVGlusterState *s = bs->opaque;
+
+ return fd_anonymous (s->inode);
+}
+
+
+static int
+qemu_gluster_open (BlockDriverState *bs, QDict *options, int bdrv_flags)
+{
+ inode_t *inode = NULL;
+ BDRVGlusterState *s = bs->opaque;
+ QemuOpts *opts = NULL;
+ Error *local_err = NULL;
+ const char *filename = NULL;
+ char gfid_str[128];
+ int ret;
+ qb_conf_t *conf = THIS->private;
+
+ opts = qemu_opts_create_nofail(&runtime_opts);
+ qemu_opts_absorb_qdict(opts, options, &local_err);
+ if (error_is_set(&local_err)) {
+ qerror_report_err(local_err);
+ error_free(local_err);
+ return -EINVAL;
+ }
+
+ filename = qemu_opt_get(opts, "filename");
+
+ /*
+ * gfid:<gfid> format means we're opening a backing image.
+ */
+ ret = sscanf(filename, "gluster://gfid:%s", gfid_str);
+ if (ret) {
+ loc_t loc = {0,};
+ struct iatt buf = {0,};
+ uuid_t gfid;
+
+ uuid_parse(gfid_str, gfid);
+
+ loc.inode = inode_find(conf->root_inode->table, gfid);
+ if (!loc.inode) {
+ loc.inode = inode_new(conf->root_inode->table);
+ uuid_copy(loc.inode->gfid, gfid);
+ }
+
+ uuid_copy(loc.gfid, loc.inode->gfid);
+ ret = syncop_lookup(FIRST_CHILD(THIS), &loc, NULL, &buf, NULL,
+ NULL);
+ if (ret) {
+ loc_wipe(&loc);
+ return -errno;
+ }
+
+ s->inode = inode_ref(loc.inode);
+ loc_wipe(&loc);
+ } else {
+ inode = qb_inode_from_filename (filename);
+ if (!inode)
+ return -EINVAL;
+
+ s->inode = inode_ref(inode);
+ }
+
+ return 0;
+}
+
+
+static int
+qemu_gluster_create (const char *filename, QEMUOptionParameter *options)
+{
+ uint64_t total_size = 0;
+ inode_t *inode = NULL;
+ fd_t *fd = NULL;
+ struct iatt stat = {0, };
+ int ret = 0;
+
+ inode = qb_inode_from_filename (filename);
+ if (!inode)
+ return -EINVAL;
+
+ while (options && options->name) {
+ if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
+ total_size = options->value.n / BDRV_SECTOR_SIZE;
+ }
+ options++;
+ }
+
+ fd = fd_anonymous (inode);
+ if (!fd)
+ return -ENOMEM;
+
+ ret = syncop_fstat (FIRST_CHILD(THIS), fd, &stat);
+ if (ret) {
+ fd_unref (fd);
+ return -errno;
+ }
+
+ if (stat.ia_size) {
+ /* format ONLY if the filesize is 0 bytes */
+ fd_unref (fd);
+ return -EFBIG;
+ }
+
+ if (total_size) {
+ ret = syncop_ftruncate (FIRST_CHILD(THIS), fd, total_size);
+ if (ret) {
+ fd_unref (fd);
+ return -errno;
+ }
+ }
+
+ fd_unref (fd);
+ return 0;
+}
+
+
+static int
+qemu_gluster_co_readv (BlockDriverState *bs, int64_t sector_num, int nb_sectors,
+ QEMUIOVector *qiov)
+{
+ fd_t *fd = NULL;
+ off_t offset = 0;
+ size_t size = 0;
+ struct iovec *iov = NULL;
+ int count = 0;
+ struct iobref *iobref = NULL;
+ int ret = 0;
+
+ fd = fd_from_bs (bs);
+ if (!fd)
+ return -EIO;
+
+ offset = sector_num * BDRV_SECTOR_SIZE;
+ size = nb_sectors * BDRV_SECTOR_SIZE;
+
+ ret = syncop_readv (FIRST_CHILD(THIS), fd, size, offset, 0,
+ &iov, &count, &iobref);
+ if (ret < 0) {
+ ret = -errno;
+ goto out;
+ }
+
+ iov_copy (qiov->iov, qiov->niov, iov, count); /* *choke!* */
+
+out:
+ GF_FREE (iov);
+ if (iobref)
+ iobref_unref (iobref);
+ fd_unref (fd);
+ return ret;
+}
+
+
+static int
+qemu_gluster_co_writev (BlockDriverState *bs, int64_t sector_num, int nb_sectors,
+ QEMUIOVector *qiov)
+{
+ fd_t *fd = NULL;
+ off_t offset = 0;
+ size_t size = 0;
+ struct iobref *iobref = NULL;
+ struct iobuf *iobuf = NULL;
+ struct iovec iov = {0, };
+ int ret = -ENOMEM;
+
+ fd = fd_from_bs (bs);
+ if (!fd)
+ return -EIO;
+
+ offset = sector_num * BDRV_SECTOR_SIZE;
+ size = nb_sectors * BDRV_SECTOR_SIZE;
+
+ iobuf = iobuf_get2 (THIS->ctx->iobuf_pool, size);
+ if (!iobuf)
+ goto out;
+
+ iobref = iobref_new ();
+ if (!iobref) {
+ iobuf_unref (iobuf);
+ goto out;
+ }
+
+ iobref_add (iobref, iobuf);
+
+ iov_unload (iobuf_ptr (iobuf), qiov->iov, qiov->niov); /* *choke!* */
+
+ iov.iov_base = iobuf_ptr (iobuf);
+ iov.iov_len = size;
+
+ ret = syncop_writev (FIRST_CHILD(THIS), fd, &iov, 1, offset, iobref, 0);
+ if (ret < 0)
+ ret = -errno;
+
+out:
+ if (iobuf)
+ iobuf_unref (iobuf);
+ if (iobref)
+ iobref_unref (iobref);
+ fd_unref (fd);
+ return ret;
+}
+
+
+static int
+qemu_gluster_co_flush (BlockDriverState *bs)
+{
+ fd_t *fd = NULL;
+ int ret = 0;
+
+ fd = fd_from_bs (bs);
+
+ ret = syncop_flush (FIRST_CHILD(THIS), fd);
+
+ fd_unref (fd);
+
+ return ret;
+}
+
+
+static int
+qemu_gluster_co_fsync (BlockDriverState *bs)
+{
+ fd_t *fd = NULL;
+ int ret = 0;
+
+ fd = fd_from_bs (bs);
+
+ ret = syncop_fsync (FIRST_CHILD(THIS), fd, 0);
+
+ fd_unref (fd);
+
+ return ret;
+}
+
+
+static int
+qemu_gluster_truncate (BlockDriverState *bs, int64_t offset)
+{
+ fd_t *fd = NULL;
+ int ret = 0;
+
+ fd = fd_from_bs (bs);
+
+ ret = syncop_ftruncate (FIRST_CHILD(THIS), fd, offset);
+
+ fd_unref (fd);
+
+ if (ret < 0)
+ return ret;
+
+ return ret;
+}
+
+
+static int64_t
+qemu_gluster_getlength (BlockDriverState *bs)
+{
+ fd_t *fd = NULL;
+ int ret = 0;
+ struct iatt iatt = {0, };
+
+ fd = fd_from_bs (bs);
+
+ ret = syncop_fstat (FIRST_CHILD(THIS), fd, &iatt);
+ if (ret < 0)
+ return -1;
+
+ return iatt.ia_size;
+}
+
+
+static int64_t
+qemu_gluster_allocated_file_size (BlockDriverState *bs)
+{
+ fd_t *fd = NULL;
+ int ret = 0;
+ struct iatt iatt = {0, };
+
+ fd = fd_from_bs (bs);
+
+ ret = syncop_fstat (FIRST_CHILD(THIS), fd, &iatt);
+ if (ret < 0)
+ return -1;
+
+ return iatt.ia_blocks * 512;
+}
+
+
+static void
+qemu_gluster_close (BlockDriverState *bs)
+{
+ BDRVGlusterState *s = NULL;
+
+ s = bs->opaque;
+
+ inode_unref (s->inode);
+
+ return;
+}
+
+
+static QEMUOptionParameter qemu_gluster_create_options[] = {
+ {
+ .name = BLOCK_OPT_SIZE,
+ .type = OPT_SIZE,
+ .help = "Virtual disk size"
+ },
+ { NULL }
+};
+
+
+static BlockDriver bdrv_gluster = {
+ .format_name = "gluster",
+ .protocol_name = "gluster",
+ .instance_size = sizeof(BDRVGlusterState),
+ .bdrv_file_open = qemu_gluster_open,
+ .bdrv_close = qemu_gluster_close,
+ .bdrv_create = qemu_gluster_create,
+ .bdrv_getlength = qemu_gluster_getlength,
+ .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
+ .bdrv_co_readv = qemu_gluster_co_readv,
+ .bdrv_co_writev = qemu_gluster_co_writev,
+ .bdrv_co_flush_to_os = qemu_gluster_co_flush,
+ .bdrv_co_flush_to_disk = qemu_gluster_co_fsync,
+ .bdrv_truncate = qemu_gluster_truncate,
+ .create_options = qemu_gluster_create_options,
+};
+
+
+static void bdrv_gluster_init(void)
+{
+ bdrv_register(&bdrv_gluster);
+}
+
+
+block_init(bdrv_gluster_init);
diff --git a/xlators/features/qemu-block/src/bh-syncop.c b/xlators/features/qemu-block/src/bh-syncop.c
new file mode 100644
index 000000000..e8686f6d4
--- /dev/null
+++ b/xlators/features/qemu-block/src/bh-syncop.c
@@ -0,0 +1,48 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "glusterfs.h"
+#include "logging.h"
+#include "dict.h"
+#include "xlator.h"
+#include "syncop.h"
+#include "qemu-block-memory-types.h"
+
+#include "block/aio.h"
+
+void
+qemu_bh_schedule (QEMUBH *bh)
+{
+ return;
+}
+
+void
+qemu_bh_cancel (QEMUBH *bh)
+{
+ return;
+}
+
+void
+qemu_bh_delete (QEMUBH *bh)
+{
+
+}
+
+QEMUBH *
+qemu_bh_new (QEMUBHFunc *cb, void *opaque)
+{
+ return NULL;
+}
diff --git a/xlators/features/qemu-block/src/clock-timer.c b/xlators/features/qemu-block/src/clock-timer.c
new file mode 100644
index 000000000..fcbec6ad1
--- /dev/null
+++ b/xlators/features/qemu-block/src/clock-timer.c
@@ -0,0 +1,60 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "glusterfs.h"
+#include "logging.h"
+#include "dict.h"
+#include "xlator.h"
+#include "syncop.h"
+#include "qemu-block-memory-types.h"
+
+#include "qemu/timer.h"
+
+QEMUClock *vm_clock;
+int use_rt_clock = 0;
+
+QEMUTimer *qemu_new_timer (QEMUClock *clock, int scale,
+ QEMUTimerCB *cb, void *opaque)
+{
+ return NULL;
+}
+
+int64_t qemu_get_clock_ns (QEMUClock *clock)
+{
+ return 0;
+}
+
+void qemu_mod_timer (QEMUTimer *ts, int64_t expire_time)
+{
+ return;
+}
+
+void qemu_free_timer (QEMUTimer *ts)
+{
+
+}
+
+void qemu_del_timer (QEMUTimer *ts)
+{
+
+}
+
+bool qemu_aio_wait()
+{
+ synctask_wake (synctask_get());
+ synctask_yield (synctask_get());
+ return 0;
+}
diff --git a/xlators/features/qemu-block/src/coroutine-synctask.c b/xlators/features/qemu-block/src/coroutine-synctask.c
new file mode 100644
index 000000000..e43988a95
--- /dev/null
+++ b/xlators/features/qemu-block/src/coroutine-synctask.c
@@ -0,0 +1,116 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "glusterfs.h"
+#include "logging.h"
+#include "dict.h"
+#include "xlator.h"
+#include "syncop.h"
+#include "qemu-block-memory-types.h"
+
+#include "qemu-block.h"
+
+/*
+ * This code serves as the bridge from the main glusterfs context to the qemu
+ * coroutine context via synctask. We create a single threaded syncenv with a
+ * single synctask responsible for processing a queue of coroutines. The qemu
+ * code invoked from within the synctask function handlers uses the ucontext
+ * coroutine implementation and scheduling logic internal to qemu. This
+ * effectively donates a thread of execution to qemu and its internal coroutine
+ * management.
+ *
+ * NOTE: The existence of concurrent synctasks has proven quite racy with regard
+ * to qemu coroutine management, particularly related to the lifecycle
+ * differences with top-level synctasks and internally created coroutines and
+ * interactions with qemu-internal queues (and locks, in turn). We explicitly
+ * disallow this scenario, via the queue, until it is more well supported.
+ */
+
+static struct {
+ struct list_head queue;
+ gf_lock_t lock;
+ struct synctask *task;
+} qb_co;
+
+static void
+init_qbco()
+{
+ INIT_LIST_HEAD(&qb_co.queue);
+ LOCK_INIT(&qb_co.lock);
+}
+
+static int
+synctask_nop_cbk (int ret, call_frame_t *frame, void *opaque)
+{
+ return 0;
+}
+
+static int
+qb_synctask_wrap (void *opaque)
+{
+ qb_local_t *qb_local, *tmp;
+
+ LOCK(&qb_co.lock);
+
+ while (!list_empty(&qb_co.queue)) {
+ list_for_each_entry_safe(qb_local, tmp, &qb_co.queue, list) {
+ list_del_init(&qb_local->list);
+ break;
+ }
+
+ UNLOCK(&qb_co.lock);
+
+ qb_local->synctask_fn(qb_local);
+ /* qb_local is now unwound and gone! */
+
+ LOCK(&qb_co.lock);
+ }
+
+ qb_co.task = NULL;
+
+ UNLOCK(&qb_co.lock);
+
+ return 0;
+}
+
+int
+qb_coroutine (call_frame_t *frame, synctask_fn_t fn)
+{
+ qb_local_t *qb_local = NULL;
+ qb_conf_t *qb_conf = NULL;
+ static int init = 0;
+
+ qb_local = frame->local;
+ qb_local->synctask_fn = fn;
+ qb_conf = frame->this->private;
+
+ if (!init) {
+ init = 1;
+ init_qbco();
+ }
+
+ LOCK(&qb_co.lock);
+
+ if (!qb_co.task)
+ qb_co.task = synctask_create(qb_conf->env, qb_synctask_wrap,
+ synctask_nop_cbk, frame, NULL);
+
+ list_add_tail(&qb_local->list, &qb_co.queue);
+
+ UNLOCK(&qb_co.lock);
+
+ return 0;
+}
diff --git a/xlators/features/qemu-block/src/monitor-logging.c b/xlators/features/qemu-block/src/monitor-logging.c
new file mode 100644
index 000000000..d37c37f0f
--- /dev/null
+++ b/xlators/features/qemu-block/src/monitor-logging.c
@@ -0,0 +1,50 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "glusterfs.h"
+#include "logging.h"
+#include "dict.h"
+#include "xlator.h"
+#include "qemu-block-memory-types.h"
+
+#include "block/block_int.h"
+
+Monitor *cur_mon;
+
+int
+monitor_cur_is_qmp()
+{
+ /* No QMP support here */
+ return 0;
+}
+
+void
+monitor_set_error (Monitor *mon, QError *qerror)
+{
+ /* NOP here */
+ return;
+}
+
+
+void
+monitor_vprintf(Monitor *mon, const char *fmt, va_list ap)
+{
+ char buf[4096];
+
+ vsnprintf(buf, sizeof(buf), fmt, ap);
+
+ gf_log (THIS->name, GF_LOG_ERROR, "%s", buf);
+}
diff --git a/xlators/features/qemu-block/src/qb-coroutines.c b/xlators/features/qemu-block/src/qb-coroutines.c
new file mode 100644
index 000000000..7c52adb21
--- /dev/null
+++ b/xlators/features/qemu-block/src/qb-coroutines.c
@@ -0,0 +1,662 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "glusterfs.h"
+#include "logging.h"
+#include "dict.h"
+#include "xlator.h"
+#include "inode.h"
+#include "call-stub.h"
+#include "defaults.h"
+#include "qemu-block-memory-types.h"
+#include "qemu-block.h"
+#include "qb-coroutines.h"
+
+
+int
+qb_format_and_resume (void *opaque)
+{
+ qb_local_t *local = NULL;
+ call_frame_t *frame = NULL;
+ call_stub_t *stub = NULL;
+ inode_t *inode = NULL;
+ char filename[64];
+ char base_filename[128];
+ int use_base = 0;
+ qb_inode_t *qb_inode = NULL;
+ Error *local_err = NULL;
+ fd_t *fd = NULL;
+ dict_t *xattr = NULL;
+ qb_conf_t *qb_conf = NULL;
+ int ret = -1;
+
+ local = opaque;
+ frame = local->frame;
+ stub = local->stub;
+ inode = local->inode;
+ qb_conf = frame->this->private;
+
+ qb_inode_to_filename (inode, filename, 64);
+
+ qb_inode = qb_inode_ctx_get (frame->this, inode);
+
+ /*
+ * See if the caller specified a backing image.
+ */
+ if (!uuid_is_null(qb_inode->backing_gfid) || qb_inode->backing_fname) {
+ loc_t loc = {0,};
+ char gfid_str[64];
+ struct iatt buf;
+
+ if (!uuid_is_null(qb_inode->backing_gfid)) {
+ loc.inode = inode_find(qb_conf->root_inode->table,
+ qb_inode->backing_gfid);
+ if (!loc.inode) {
+ loc.inode = inode_new(qb_conf->root_inode->table);
+ uuid_copy(loc.inode->gfid,
+ qb_inode->backing_gfid);
+ }
+ uuid_copy(loc.gfid, loc.inode->gfid);
+ } else if (qb_inode->backing_fname) {
+ loc.inode = inode_new(qb_conf->root_inode->table);
+ loc.name = qb_inode->backing_fname;
+ loc.parent = inode_parent(inode, NULL, NULL);
+ loc_path(&loc, loc.name);
+ }
+
+ /*
+ * Lookup the backing image. Verify existence and/or get the
+ * gfid if we don't already have it.
+ */
+ ret = syncop_lookup(FIRST_CHILD(frame->this), &loc, NULL, &buf,
+ NULL, NULL);
+ GF_FREE(qb_inode->backing_fname);
+ if (ret) {
+ loc_wipe(&loc);
+ ret = errno;
+ goto err;
+ }
+
+ uuid_copy(qb_inode->backing_gfid, buf.ia_gfid);
+ loc_wipe(&loc);
+
+ /*
+ * We pass the filename of the backing image into the qemu block
+ * subsystem as the associated gfid. This is embedded into the
+ * clone image and passed along to the gluster bdrv backend when
+ * the block subsystem needs to operate on the backing image on
+ * behalf of the clone.
+ */
+ uuid_unparse(qb_inode->backing_gfid, gfid_str);
+ snprintf(base_filename, sizeof(base_filename),
+ "gluster://gfid:%s", gfid_str);
+ use_base = 1;
+ }
+
+ bdrv_img_create (filename, qb_inode->fmt,
+ use_base ? base_filename : NULL, 0, 0, qb_inode->size,
+ 0, &local_err, true);
+
+ if (error_is_set (&local_err)) {
+ gf_log (frame->this->name, GF_LOG_ERROR, "%s",
+ error_get_pretty (local_err));
+ error_free (local_err);
+ QB_STUB_UNWIND (stub, -1, EIO);
+ return 0;
+ }
+
+ fd = fd_anonymous (inode);
+ if (!fd) {
+ gf_log (frame->this->name, GF_LOG_ERROR,
+ "could not create anonymous fd for %s",
+ uuid_utoa (inode->gfid));
+ QB_STUB_UNWIND (stub, -1, ENOMEM);
+ return 0;
+ }
+
+ xattr = dict_new ();
+ if (!xattr) {
+ gf_log (frame->this->name, GF_LOG_ERROR,
+ "could not allocate xattr dict for %s",
+ uuid_utoa (inode->gfid));
+ QB_STUB_UNWIND (stub, -1, ENOMEM);
+ fd_unref (fd);
+ return 0;
+ }
+
+ ret = dict_set_str (xattr, qb_conf->qb_xattr_key, local->fmt);
+ if (ret) {
+ gf_log (frame->this->name, GF_LOG_ERROR,
+ "could not dict_set for %s",
+ uuid_utoa (inode->gfid));
+ QB_STUB_UNWIND (stub, -1, ENOMEM);
+ fd_unref (fd);
+ dict_unref (xattr);
+ return 0;
+ }
+
+ ret = syncop_fsetxattr (FIRST_CHILD(THIS), fd, xattr, 0);
+ if (ret) {
+ ret = errno;
+ gf_log (frame->this->name, GF_LOG_ERROR,
+ "failed to setxattr for %s",
+ uuid_utoa (inode->gfid));
+ QB_STUB_UNWIND (stub, -1, ret);
+ fd_unref (fd);
+ dict_unref (xattr);
+ return 0;
+ }
+
+ fd_unref (fd);
+ dict_unref (xattr);
+
+ QB_STUB_UNWIND (stub, 0, 0);
+
+ return 0;
+
+err:
+ QB_STUB_UNWIND(stub, -1, ret);
+ return 0;
+}
+
+
+static BlockDriverState *
+qb_bs_create (inode_t *inode, const char *fmt)
+{
+ char filename[64];
+ BlockDriverState *bs = NULL;
+ BlockDriver *drv = NULL;
+ int op_errno = 0;
+ int ret = 0;
+
+ bs = bdrv_new (uuid_utoa (inode->gfid));
+ if (!bs) {
+ op_errno = ENOMEM;
+ gf_log (THIS->name, GF_LOG_ERROR,
+ "could not allocate @bdrv for gfid:%s",
+ uuid_utoa (inode->gfid));
+ goto err;
+ }
+
+ drv = bdrv_find_format (fmt);
+ if (!drv) {
+ op_errno = EINVAL;
+ gf_log (THIS->name, GF_LOG_ERROR,
+ "Unknown file format: %s for gfid:%s",
+ fmt, uuid_utoa (inode->gfid));
+ goto err;
+ }
+
+ qb_inode_to_filename (inode, filename, 64);
+
+ ret = bdrv_open (bs, filename, NULL, BDRV_O_RDWR, drv);
+ if (ret < 0) {
+ op_errno = -ret;
+ gf_log (THIS->name, GF_LOG_ERROR,
+ "Unable to bdrv_open() gfid:%s (%s)",
+ uuid_utoa (inode->gfid), strerror (op_errno));
+ goto err;
+ }
+
+ return bs;
+err:
+ errno = op_errno;
+ return NULL;
+}
+
+
+int
+qb_co_open (void *opaque)
+{
+ qb_local_t *local = NULL;
+ call_frame_t *frame = NULL;
+ call_stub_t *stub = NULL;
+ inode_t *inode = NULL;
+ qb_inode_t *qb_inode = NULL;
+
+ local = opaque;
+ frame = local->frame;
+ stub = local->stub;
+ inode = local->inode;
+
+ qb_inode = qb_inode_ctx_get (frame->this, inode);
+ if (!qb_inode->bs) {
+ /* FIXME: we need locks around this when
+ enabling multithreaded syncop/coroutine
+ for qemu-block
+ */
+
+ qb_inode->bs = qb_bs_create (inode, qb_inode->fmt);
+ if (!qb_inode->bs) {
+ QB_STUB_UNWIND (stub, -1, errno);
+ return 0;
+ }
+ }
+ qb_inode->refcnt++;
+
+ QB_STUB_RESUME (stub);
+
+ return 0;
+}
+
+
+int
+qb_co_writev (void *opaque)
+{
+ qb_local_t *local = NULL;
+ call_frame_t *frame = NULL;
+ call_stub_t *stub = NULL;
+ inode_t *inode = NULL;
+ qb_inode_t *qb_inode = NULL;
+ QEMUIOVector qiov = {0, };
+ int ret = 0;
+
+ local = opaque;
+ frame = local->frame;
+ stub = local->stub;
+ inode = local->inode;
+
+ qb_inode = qb_inode_ctx_get (frame->this, inode);
+ if (!qb_inode->bs) {
+ /* FIXME: we need locks around this when
+ enabling multithreaded syncop/coroutine
+ for qemu-block
+ */
+
+ qb_inode->bs = qb_bs_create (inode, qb_inode->fmt);
+ if (!qb_inode->bs) {
+ QB_STUB_UNWIND (stub, -1, errno);
+ return 0;
+ }
+ }
+
+ qemu_iovec_init_external (&qiov, stub->args.vector, stub->args.count);
+
+ ret = bdrv_pwritev (qb_inode->bs, stub->args.offset, &qiov);
+
+ if (ret < 0) {
+ QB_STUB_UNWIND (stub, -1, -ret);
+ } else {
+ QB_STUB_UNWIND (stub, ret, 0);
+ }
+
+ return 0;
+}
+
+
+int
+qb_co_readv (void *opaque)
+{
+ qb_local_t *local = NULL;
+ call_frame_t *frame = NULL;
+ call_stub_t *stub = NULL;
+ inode_t *inode = NULL;
+ qb_inode_t *qb_inode = NULL;
+ struct iobuf *iobuf = NULL;
+ struct iobref *iobref = NULL;
+ struct iovec iov = {0, };
+ int ret = 0;
+
+ local = opaque;
+ frame = local->frame;
+ stub = local->stub;
+ inode = local->inode;
+
+ qb_inode = qb_inode_ctx_get (frame->this, inode);
+ if (!qb_inode->bs) {
+ /* FIXME: we need locks around this when
+ enabling multithreaded syncop/coroutine
+ for qemu-block
+ */
+
+ qb_inode->bs = qb_bs_create (inode, qb_inode->fmt);
+ if (!qb_inode->bs) {
+ QB_STUB_UNWIND (stub, -1, errno);
+ return 0;
+ }
+ }
+
+ if (stub->args.offset >= qb_inode->size) {
+ QB_STUB_UNWIND (stub, 0, 0);
+ return 0;
+ }
+
+ iobuf = iobuf_get2 (frame->this->ctx->iobuf_pool, stub->args.size);
+ if (!iobuf) {
+ QB_STUB_UNWIND (stub, -1, ENOMEM);
+ return 0;
+ }
+
+ iobref = iobref_new ();
+ if (!iobref) {
+ QB_STUB_UNWIND (stub, -1, ENOMEM);
+ iobuf_unref (iobuf);
+ return 0;
+ }
+
+ if (iobref_add (iobref, iobuf) < 0) {
+ iobuf_unref (iobuf);
+ iobref_unref (iobref);
+ QB_STUB_UNWIND (stub, -1, ENOMEM);
+ return 0;
+ }
+
+ ret = bdrv_pread (qb_inode->bs, stub->args.offset, iobuf_ptr (iobuf),
+ stub->args.size);
+
+ if (ret < 0) {
+ QB_STUB_UNWIND (stub, -1, -ret);
+ iobref_unref (iobref);
+ return 0;
+ }
+
+ iov.iov_base = iobuf_ptr (iobuf);
+ iov.iov_len = ret;
+
+ stub->args_cbk.vector = iov_dup (&iov, 1);
+ stub->args_cbk.count = 1;
+ stub->args_cbk.iobref = iobref;
+
+ QB_STUB_UNWIND (stub, ret, 0);
+
+ return 0;
+}
+
+
+int
+qb_co_fsync (void *opaque)
+{
+ qb_local_t *local = NULL;
+ call_frame_t *frame = NULL;
+ call_stub_t *stub = NULL;
+ inode_t *inode = NULL;
+ qb_inode_t *qb_inode = NULL;
+ int ret = 0;
+
+ local = opaque;
+ frame = local->frame;
+ stub = local->stub;
+ inode = local->inode;
+
+ qb_inode = qb_inode_ctx_get (frame->this, inode);
+ if (!qb_inode->bs) {
+ /* FIXME: we need locks around this when
+ enabling multithreaded syncop/coroutine
+ for qemu-block
+ */
+
+ qb_inode->bs = qb_bs_create (inode, qb_inode->fmt);
+ if (!qb_inode->bs) {
+ QB_STUB_UNWIND (stub, -1, errno);
+ return 0;
+ }
+ }
+
+ ret = bdrv_flush (qb_inode->bs);
+
+ if (ret < 0) {
+ QB_STUB_UNWIND (stub, -1, -ret);
+ } else {
+ QB_STUB_UNWIND (stub, ret, 0);
+ }
+
+ return 0;
+}
+
+
+static void
+qb_update_size_xattr (xlator_t *this, fd_t *fd, const char *fmt, off_t offset)
+{
+ char val[QB_XATTR_VAL_MAX];
+ qb_conf_t *qb_conf = NULL;
+ dict_t *xattr = NULL;
+
+ qb_conf = this->private;
+
+ snprintf (val, QB_XATTR_VAL_MAX, "%s:%llu",
+ fmt, (long long unsigned) offset);
+
+ xattr = dict_new ();
+ if (!xattr)
+ return;
+
+ if (dict_set_str (xattr, qb_conf->qb_xattr_key, val) != 0) {
+ dict_unref (xattr);
+ return;
+ }
+
+ syncop_fsetxattr (FIRST_CHILD(this), fd, xattr, 0);
+ dict_unref (xattr);
+}
+
+
+int
+qb_co_truncate (void *opaque)
+{
+ qb_local_t *local = NULL;
+ call_frame_t *frame = NULL;
+ call_stub_t *stub = NULL;
+ inode_t *inode = NULL;
+ qb_inode_t *qb_inode = NULL;
+ int ret = 0;
+ off_t offset = 0;
+ xlator_t *this = NULL;
+
+ this = THIS;
+
+ local = opaque;
+ frame = local->frame;
+ stub = local->stub;
+ inode = local->inode;
+
+ qb_inode = qb_inode_ctx_get (frame->this, inode);
+ if (!qb_inode->bs) {
+ /* FIXME: we need locks around this when
+ enabling multithreaded syncop/coroutine
+ for qemu-block
+ */
+
+ qb_inode->bs = qb_bs_create (inode, qb_inode->fmt);
+ if (!qb_inode->bs) {
+ QB_STUB_UNWIND (stub, -1, errno);
+ return 0;
+ }
+ }
+
+ syncop_fstat (FIRST_CHILD(this), local->fd, &stub->args_cbk.prestat);
+ stub->args_cbk.prestat.ia_size = qb_inode->size;
+
+ ret = bdrv_truncate (qb_inode->bs, stub->args.offset);
+ if (ret < 0)
+ goto out;
+
+ offset = bdrv_getlength (qb_inode->bs);
+
+ qb_inode->size = offset;
+
+ syncop_fstat (FIRST_CHILD(this), local->fd, &stub->args_cbk.poststat);
+ stub->args_cbk.poststat.ia_size = qb_inode->size;
+
+ qb_update_size_xattr (this, local->fd, qb_inode->fmt, qb_inode->size);
+
+out:
+ if (ret < 0) {
+ QB_STUB_UNWIND (stub, -1, -ret);
+ } else {
+ QB_STUB_UNWIND (stub, ret, 0);
+ }
+
+ return 0;
+}
+
+
+int
+qb_co_close (void *opaque)
+{
+ qb_local_t *local = NULL;
+ call_frame_t *frame = NULL;
+ inode_t *inode = NULL;
+ qb_inode_t *qb_inode = NULL;
+ BlockDriverState *bs = NULL;
+
+ local = opaque;
+ inode = local->inode;
+
+ qb_inode = qb_inode_ctx_get (THIS, inode);
+
+ if (!--qb_inode->refcnt) {
+ bs = qb_inode->bs;
+ qb_inode->bs = NULL;
+ bdrv_delete (bs);
+ }
+
+ frame = local->frame;
+ frame->local = NULL;
+ qb_local_free (THIS, local);
+ STACK_DESTROY (frame->root);
+
+ return 0;
+}
+
+
+int
+qb_snapshot_create (void *opaque)
+{
+ qb_local_t *local = NULL;
+ call_frame_t *frame = NULL;
+ call_stub_t *stub = NULL;
+ inode_t *inode = NULL;
+ qb_inode_t *qb_inode = NULL;
+ QEMUSnapshotInfo sn;
+ struct timeval tv = {0, };
+ int ret = 0;
+
+ local = opaque;
+ frame = local->frame;
+ stub = local->stub;
+ inode = local->inode;
+
+ qb_inode = qb_inode_ctx_get (frame->this, inode);
+ if (!qb_inode->bs) {
+ /* FIXME: we need locks around this when
+ enabling multithreaded syncop/coroutine
+ for qemu-block
+ */
+
+ qb_inode->bs = qb_bs_create (inode, qb_inode->fmt);
+ if (!qb_inode->bs) {
+ QB_STUB_UNWIND (stub, -1, errno);
+ return 0;
+ }
+ }
+
+ memset (&sn, 0, sizeof (sn));
+ pstrcpy (sn.name, sizeof(sn.name), local->name);
+ gettimeofday (&tv, NULL);
+ sn.date_sec = tv.tv_sec;
+ sn.date_nsec = tv.tv_usec * 1000;
+
+ ret = bdrv_snapshot_create (qb_inode->bs, &sn);
+ if (ret < 0) {
+ QB_STUB_UNWIND (stub, -1, -ret);
+ } else {
+ QB_STUB_UNWIND (stub, ret, 0);
+ }
+
+ return 0;
+}
+
+
+int
+qb_snapshot_delete (void *opaque)
+{
+ qb_local_t *local = NULL;
+ call_frame_t *frame = NULL;
+ call_stub_t *stub = NULL;
+ inode_t *inode = NULL;
+ qb_inode_t *qb_inode = NULL;
+ int ret = 0;
+
+ local = opaque;
+ frame = local->frame;
+ stub = local->stub;
+ inode = local->inode;
+
+ qb_inode = qb_inode_ctx_get (frame->this, inode);
+ if (!qb_inode->bs) {
+ /* FIXME: we need locks around this when
+ enabling multithreaded syncop/coroutine
+ for qemu-block
+ */
+
+ qb_inode->bs = qb_bs_create (inode, qb_inode->fmt);
+ if (!qb_inode->bs) {
+ QB_STUB_UNWIND (stub, -1, errno);
+ return 0;
+ }
+ }
+
+ ret = bdrv_snapshot_delete (qb_inode->bs, local->name);
+
+ if (ret < 0) {
+ QB_STUB_UNWIND (stub, -1, -ret);
+ } else {
+ QB_STUB_UNWIND (stub, ret, 0);
+ }
+
+ return 0;
+}
+
+
+int
+qb_snapshot_goto (void *opaque)
+{
+ qb_local_t *local = NULL;
+ call_frame_t *frame = NULL;
+ call_stub_t *stub = NULL;
+ inode_t *inode = NULL;
+ qb_inode_t *qb_inode = NULL;
+ int ret = 0;
+
+ local = opaque;
+ frame = local->frame;
+ stub = local->stub;
+ inode = local->inode;
+
+ qb_inode = qb_inode_ctx_get (frame->this, inode);
+ if (!qb_inode->bs) {
+ /* FIXME: we need locks around this when
+ enabling multithreaded syncop/coroutine
+ for qemu-block
+ */
+
+ qb_inode->bs = qb_bs_create (inode, qb_inode->fmt);
+ if (!qb_inode->bs) {
+ QB_STUB_UNWIND (stub, -1, errno);
+ return 0;
+ }
+ }
+
+ ret = bdrv_snapshot_goto (qb_inode->bs, local->name);
+
+ if (ret < 0) {
+ QB_STUB_UNWIND (stub, -1, -ret);
+ } else {
+ QB_STUB_UNWIND (stub, ret, 0);
+ }
+
+ return 0;
+}
diff --git a/xlators/features/qemu-block/src/qb-coroutines.h b/xlators/features/qemu-block/src/qb-coroutines.h
new file mode 100644
index 000000000..583319f3b
--- /dev/null
+++ b/xlators/features/qemu-block/src/qb-coroutines.h
@@ -0,0 +1,30 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __QB_COROUTINES_H
+#define __QB_COROUTINES_H
+
+#include "syncop.h"
+#include "call-stub.h"
+#include "block/block_int.h"
+#include "monitor/monitor.h"
+
+int qb_format_and_resume (void *opaque);
+int qb_snapshot_create (void *opaque);
+int qb_snapshot_delete (void *opaque);
+int qb_snapshot_goto (void *opaque);
+int qb_co_open (void *opaque);
+int qb_co_close (void *opaque);
+int qb_co_writev (void *opaque);
+int qb_co_readv (void *opaque);
+int qb_co_fsync (void *opaque);
+int qb_co_truncate (void *opaque);
+
+#endif /* __QB_COROUTINES_H */
diff --git a/xlators/features/qemu-block/src/qemu-block-memory-types.h b/xlators/features/qemu-block/src/qemu-block-memory-types.h
new file mode 100644
index 000000000..267b3893f
--- /dev/null
+++ b/xlators/features/qemu-block/src/qemu-block-memory-types.h
@@ -0,0 +1,25 @@
+/*
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+
+#ifndef __QB_MEM_TYPES_H__
+#define __QB_MEM_TYPES_H__
+
+#include "mem-types.h"
+
+enum gf_qb_mem_types_ {
+ gf_qb_mt_qb_conf_t = gf_common_mt_end + 1,
+ gf_qb_mt_qb_inode_t,
+ gf_qb_mt_qb_local_t,
+ gf_qb_mt_coroutinesynctask_t,
+ gf_qb_mt_end
+};
+#endif
+
diff --git a/xlators/features/qemu-block/src/qemu-block.c b/xlators/features/qemu-block/src/qemu-block.c
new file mode 100644
index 000000000..48bbf3140
--- /dev/null
+++ b/xlators/features/qemu-block/src/qemu-block.c
@@ -0,0 +1,1140 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "glusterfs.h"
+#include "logging.h"
+#include "dict.h"
+#include "xlator.h"
+#include "inode.h"
+#include "call-stub.h"
+#include "defaults.h"
+#include "qemu-block-memory-types.h"
+#include "qemu-block.h"
+#include "qb-coroutines.h"
+
+
+qb_inode_t *
+__qb_inode_ctx_get (xlator_t *this, inode_t *inode)
+{
+ uint64_t value = 0;
+ qb_inode_t *qb_inode = NULL;
+
+ __inode_ctx_get (inode, this, &value);
+ qb_inode = (qb_inode_t *)(unsigned long) value;
+
+ return qb_inode;
+}
+
+
+qb_inode_t *
+qb_inode_ctx_get (xlator_t *this, inode_t *inode)
+{
+ qb_inode_t *qb_inode = NULL;
+
+ LOCK (&inode->lock);
+ {
+ qb_inode = __qb_inode_ctx_get (this, inode);
+ }
+ UNLOCK (&inode->lock);
+
+ return qb_inode;
+}
+
+
+qb_inode_t *
+qb_inode_ctx_del (xlator_t *this, inode_t *inode)
+{
+ uint64_t value = 0;
+ qb_inode_t *qb_inode = NULL;
+
+ inode_ctx_del (inode, this, &value);
+ qb_inode = (qb_inode_t *)(unsigned long) value;
+
+ return qb_inode;
+}
+
+
+int
+qb_inode_cleanup (xlator_t *this, inode_t *inode, int warn)
+{
+ qb_inode_t *qb_inode = NULL;
+
+ qb_inode = qb_inode_ctx_del (this, inode);
+
+ if (!qb_inode)
+ return 0;
+
+ if (warn)
+ gf_log (this->name, GF_LOG_WARNING,
+ "inode %s no longer block formatted",
+ uuid_utoa (inode->gfid));
+
+ /* free (qb_inode->bs); */
+
+ GF_FREE (qb_inode);
+
+ return 0;
+}
+
+
+int
+qb_iatt_fixup (xlator_t *this, inode_t *inode, struct iatt *iatt)
+{
+ qb_inode_t *qb_inode = NULL;
+
+ qb_inode = qb_inode_ctx_get (this, inode);
+ if (!qb_inode)
+ return 0;
+
+ iatt->ia_size = qb_inode->size;
+
+ return 0;
+}
+
+
+int
+qb_format_extract (xlator_t *this, char *format, inode_t *inode)
+{
+ char *s, *save;
+ uint64_t size = 0;
+ char fmt[QB_XATTR_VAL_MAX+1] = {0, };
+ qb_inode_t *qb_inode = NULL;
+ char *formatstr = NULL;
+ uuid_t gfid = {0,};
+ char gfid_str[64] = {0,};
+ int ret;
+
+ strncpy(fmt, format, QB_XATTR_VAL_MAX);
+
+ s = strtok_r(fmt, ":", &save);
+ if (!s)
+ goto invalid;
+ formatstr = gf_strdup(s);
+
+ s = strtok_r(NULL, ":", &save);
+ if (!s)
+ goto invalid;
+ if (gf_string2bytesize (s, &size))
+ goto invalid;
+ if (!size)
+ goto invalid;
+
+ s = strtok_r(NULL, "\0", &save);
+ if (s && !strncmp(s, "<gfid:", strlen("<gfid:"))) {
+ /*
+ * Check for valid gfid backing image specifier.
+ */
+ if (strlen(s) + 1 > sizeof(gfid_str))
+ goto invalid;
+ ret = sscanf(s, "<gfid:%[^>]s", gfid_str);
+ if (ret == 1) {
+ ret = uuid_parse(gfid_str, gfid);
+ if (ret < 0)
+ goto invalid;
+ }
+ }
+
+ qb_inode = qb_inode_ctx_get (this, inode);
+ if (!qb_inode)
+ qb_inode = GF_CALLOC (1, sizeof (*qb_inode),
+ gf_qb_mt_qb_inode_t);
+ if (!qb_inode) {
+ GF_FREE(formatstr);
+ return ENOMEM;
+ }
+
+ strncpy(qb_inode->fmt, formatstr, QB_XATTR_VAL_MAX);
+ qb_inode->size = size;
+
+ /*
+ * If a backing gfid was not specified, interpret any remaining bytes
+ * associated with a backing image as a filename local to the parent
+ * directory. The format processing will validate further.
+ */
+ if (!uuid_is_null(gfid))
+ uuid_copy(qb_inode->backing_gfid, gfid);
+ else if (s)
+ qb_inode->backing_fname = gf_strdup(s);
+
+ inode_ctx_set (inode, this, (void *)&qb_inode);
+
+ GF_FREE(formatstr);
+
+ return 0;
+
+invalid:
+ GF_FREE(formatstr);
+
+ gf_log (this->name, GF_LOG_WARNING,
+ "invalid format '%s' in inode %s", format,
+ uuid_utoa (inode->gfid));
+ return EINVAL;
+}
+
+
+void
+qb_local_free (xlator_t *this, qb_local_t *local)
+{
+ if (local->inode)
+ inode_unref (local->inode);
+ if (local->fd)
+ fd_unref (local->fd);
+ GF_FREE (local);
+}
+
+
+int
+qb_local_init (call_frame_t *frame)
+{
+ qb_local_t *qb_local = NULL;
+
+ qb_local = GF_CALLOC (1, sizeof (*qb_local), gf_qb_mt_qb_local_t);
+ if (!qb_local)
+ return -1;
+ INIT_LIST_HEAD(&qb_local->list);
+
+ qb_local->frame = frame;
+ frame->local = qb_local;
+
+ return 0;
+}
+
+
+int
+qb_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, inode_t *inode, struct iatt *buf,
+ dict_t *xdata, struct iatt *postparent)
+{
+ char *format = NULL;
+ qb_conf_t *conf = NULL;
+
+ conf = this->private;
+
+ if (op_ret == -1)
+ goto out;
+
+ /*
+ * Cache the root inode for dealing with backing images. The format
+ * coroutine and the gluster qemu backend driver both use the root inode
+ * table to verify and/or redirect I/O to the backing image via
+ * anonymous fd's.
+ */
+ if (!conf->root_inode && __is_root_gfid(inode->gfid))
+ conf->root_inode = inode_ref(inode);
+
+ if (!xdata)
+ goto out;
+
+ if (dict_get_str (xdata, conf->qb_xattr_key, &format))
+ goto out;
+
+ if (!format) {
+ qb_inode_cleanup (this, inode, 1);
+ goto out;
+ }
+
+ op_errno = qb_format_extract (this, format, inode);
+ if (op_errno)
+ op_ret = -1;
+
+ qb_iatt_fixup (this, inode, buf);
+out:
+ QB_STACK_UNWIND (lookup, frame, op_ret, op_errno, inode, buf,
+ xdata, postparent);
+ return 0;
+}
+
+
+int
+qb_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
+{
+ qb_conf_t *conf = NULL;
+
+ conf = this->private;
+
+ xdata = xdata ? dict_ref (xdata) : dict_new ();
+
+ if (!xdata)
+ goto enomem;
+
+ if (dict_set_int32 (xdata, conf->qb_xattr_key, 0))
+ goto enomem;
+
+ STACK_WIND (frame, qb_lookup_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->lookup, loc, xdata);
+ dict_unref (xdata);
+ return 0;
+enomem:
+ QB_STACK_UNWIND (lookup, frame, -1, ENOMEM, 0, 0, 0, 0);
+ if (xdata)
+ dict_unref (xdata);
+ return 0;
+}
+
+
+int
+qb_setxattr_format (call_frame_t *frame, xlator_t *this, call_stub_t *stub,
+ dict_t *xattr, inode_t *inode)
+{
+ char *format = NULL;
+ int op_errno = 0;
+ qb_local_t *qb_local = NULL;
+ data_t *data = NULL;
+ qb_inode_t *qb_inode;
+
+ if (!(data = dict_get (xattr, "trusted.glusterfs.block-format"))) {
+ QB_STUB_RESUME (stub);
+ return 0;
+ }
+
+ format = alloca (data->len + 1);
+ memcpy (format, data->data, data->len);
+ format[data->len] = 0;
+
+ op_errno = qb_format_extract (this, format, inode);
+ if (op_errno) {
+ QB_STUB_UNWIND (stub, -1, op_errno);
+ return 0;
+ }
+ qb_inode = qb_inode_ctx_get(this, inode);
+
+ qb_local = frame->local;
+
+ qb_local->stub = stub;
+ qb_local->inode = inode_ref (inode);
+
+ snprintf(qb_local->fmt, QB_XATTR_VAL_MAX, "%s:%lu", qb_inode->fmt,
+ qb_inode->size);
+
+ qb_coroutine (frame, qb_format_and_resume);
+
+ return 0;
+}
+
+
+int
+qb_setxattr_snapshot_create (call_frame_t *frame, xlator_t *this,
+ call_stub_t *stub, dict_t *xattr, inode_t *inode)
+{
+ qb_local_t *qb_local = NULL;
+ char *name = NULL;
+ data_t *data = NULL;
+
+ if (!(data = dict_get (xattr, "trusted.glusterfs.block-snapshot-create"))) {
+ QB_STUB_RESUME (stub);
+ return 0;
+ }
+
+ name = alloca (data->len + 1);
+ memcpy (name, data->data, data->len);
+ name[data->len] = 0;
+
+ qb_local = frame->local;
+
+ qb_local->stub = stub;
+ qb_local->inode = inode_ref (inode);
+ strncpy (qb_local->name, name, 128);
+
+ qb_coroutine (frame, qb_snapshot_create);
+
+ return 0;
+}
+
+
+int
+qb_setxattr_snapshot_delete (call_frame_t *frame, xlator_t *this,
+ call_stub_t *stub, dict_t *xattr, inode_t *inode)
+{
+ qb_local_t *qb_local = NULL;
+ char *name = NULL;
+ data_t *data = NULL;
+
+ if (!(data = dict_get (xattr, "trusted.glusterfs.block-snapshot-delete"))) {
+ QB_STUB_RESUME (stub);
+ return 0;
+ }
+
+ name = alloca (data->len + 1);
+ memcpy (name, data->data, data->len);
+ name[data->len] = 0;
+
+ qb_local = frame->local;
+
+ qb_local->stub = stub;
+ qb_local->inode = inode_ref (inode);
+ strncpy (qb_local->name, name, 128);
+
+ qb_coroutine (frame, qb_snapshot_delete);
+
+ return 0;
+}
+
+int
+qb_setxattr_snapshot_goto (call_frame_t *frame, xlator_t *this,
+ call_stub_t *stub, dict_t *xattr, inode_t *inode)
+{
+ qb_local_t *qb_local = NULL;
+ char *name = NULL;
+ data_t *data = NULL;
+
+ if (!(data = dict_get (xattr, "trusted.glusterfs.block-snapshot-goto"))) {
+ QB_STUB_RESUME (stub);
+ return 0;
+ }
+
+ name = alloca (data->len + 1);
+ memcpy (name, data->data, data->len);
+ name[data->len] = 0;
+
+ qb_local = frame->local;
+
+ qb_local->stub = stub;
+ qb_local->inode = inode_ref (inode);
+ strncpy (qb_local->name, name, 128);
+
+ qb_coroutine (frame, qb_snapshot_goto);
+
+ return 0;
+}
+
+
+int
+qb_setxattr_common (call_frame_t *frame, xlator_t *this, call_stub_t *stub,
+ dict_t *xattr, inode_t *inode)
+{
+ data_t *data = NULL;
+
+ if ((data = dict_get (xattr, "trusted.glusterfs.block-format"))) {
+ qb_setxattr_format (frame, this, stub, xattr, inode);
+ return 0;
+ }
+
+ if ((data = dict_get (xattr, "trusted.glusterfs.block-snapshot-create"))) {
+ qb_setxattr_snapshot_create (frame, this, stub, xattr, inode);
+ return 0;
+ }
+
+ if ((data = dict_get (xattr, "trusted.glusterfs.block-snapshot-delete"))) {
+ qb_setxattr_snapshot_delete (frame, this, stub, xattr, inode);
+ return 0;
+ }
+
+ if ((data = dict_get (xattr, "trusted.glusterfs.block-snapshot-goto"))) {
+ qb_setxattr_snapshot_goto (frame, this, stub, xattr, inode);
+ return 0;
+ }
+
+ QB_STUB_RESUME (stub);
+
+ return 0;
+}
+
+
+int
+qb_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xattr,
+ int flags, dict_t *xdata)
+{
+ call_stub_t *stub = NULL;
+
+ if (qb_local_init (frame) != 0)
+ goto enomem;
+
+ stub = fop_setxattr_stub (frame, default_setxattr_resume, loc, xattr,
+ flags, xdata);
+ if (!stub)
+ goto enomem;
+
+ qb_setxattr_common (frame, this, stub, xattr, loc->inode);
+
+ return 0;
+enomem:
+ QB_STACK_UNWIND (setxattr, frame, -1, ENOMEM, 0);
+ return 0;
+}
+
+
+int
+qb_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xattr,
+ int flags, dict_t *xdata)
+{
+ call_stub_t *stub = NULL;
+
+ if (qb_local_init (frame) != 0)
+ goto enomem;
+
+ stub = fop_fsetxattr_stub (frame, default_fsetxattr_resume, fd, xattr,
+ flags, xdata);
+ if (!stub)
+ goto enomem;
+
+ qb_setxattr_common (frame, this, stub, xattr, fd->inode);
+
+ return 0;
+enomem:
+ QB_STACK_UNWIND (fsetxattr, frame, -1, ENOMEM, 0);
+ return 0;
+}
+
+
+int
+qb_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, fd_t *fd, dict_t *xdata)
+{
+ call_stub_t *stub = NULL;
+ qb_local_t *qb_local = NULL;
+
+ qb_local = frame->local;
+
+ if (op_ret < 0)
+ goto unwind;
+
+ if (!qb_inode_ctx_get (this, qb_local->inode))
+ goto unwind;
+
+ stub = fop_open_cbk_stub (frame, NULL, op_ret, op_errno, fd, xdata);
+ if (!stub) {
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto unwind;
+ }
+
+ qb_local->stub = stub;
+
+ qb_coroutine (frame, qb_co_open);
+
+ return 0;
+unwind:
+ QB_STACK_UNWIND (open, frame, op_ret, op_errno, fd, xdata);
+ return 0;
+}
+
+
+int
+qb_open (call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
+ fd_t *fd, dict_t *xdata)
+{
+ qb_local_t *qb_local = NULL;
+ qb_inode_t *qb_inode = NULL;
+
+ qb_inode = qb_inode_ctx_get (this, loc->inode);
+ if (!qb_inode) {
+ STACK_WIND (frame, default_open_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->open, loc, flags, fd,
+ xdata);
+ return 0;
+ }
+
+ if (qb_local_init (frame) != 0)
+ goto enomem;
+
+ qb_local = frame->local;
+
+ qb_local->inode = inode_ref (loc->inode);
+ qb_local->fd = fd_ref (fd);
+
+ STACK_WIND (frame, qb_open_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->open, loc, flags, fd, xdata);
+ return 0;
+enomem:
+ QB_STACK_UNWIND (open, frame, -1, ENOMEM, 0, 0);
+ return 0;
+}
+
+
+int
+qb_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector,
+ int count, off_t offset, uint32_t flags, struct iobref *iobref,
+ dict_t *xdata)
+{
+ qb_local_t *qb_local = NULL;
+ qb_inode_t *qb_inode = NULL;
+
+ qb_inode = qb_inode_ctx_get (this, fd->inode);
+ if (!qb_inode) {
+ STACK_WIND (frame, default_writev_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->writev, fd, vector, count,
+ offset, flags, iobref, xdata);
+ return 0;
+ }
+
+ if (qb_local_init (frame) != 0)
+ goto enomem;
+
+ qb_local = frame->local;
+
+ qb_local->inode = inode_ref (fd->inode);
+ qb_local->fd = fd_ref (fd);
+
+ qb_local->stub = fop_writev_stub (frame, NULL, fd, vector, count,
+ offset, flags, iobref, xdata);
+ if (!qb_local->stub)
+ goto enomem;
+
+ qb_coroutine (frame, qb_co_writev);
+
+ return 0;
+enomem:
+ QB_STACK_UNWIND (writev, frame, -1, ENOMEM, 0, 0, 0);
+ return 0;
+}
+
+
+int
+qb_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
+ off_t offset, uint32_t flags, dict_t *xdata)
+{
+ qb_local_t *qb_local = NULL;
+ qb_inode_t *qb_inode = NULL;
+
+ qb_inode = qb_inode_ctx_get (this, fd->inode);
+ if (!qb_inode) {
+ STACK_WIND (frame, default_readv_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readv, fd, size, offset,
+ flags, xdata);
+ return 0;
+ }
+
+ if (qb_local_init (frame) != 0)
+ goto enomem;
+
+ qb_local = frame->local;
+
+ qb_local->inode = inode_ref (fd->inode);
+ qb_local->fd = fd_ref (fd);
+
+ qb_local->stub = fop_readv_stub (frame, NULL, fd, size, offset,
+ flags, xdata);
+ if (!qb_local->stub)
+ goto enomem;
+
+ qb_coroutine (frame, qb_co_readv);
+
+ return 0;
+enomem:
+ QB_STACK_UNWIND (readv, frame, -1, ENOMEM, 0, 0, 0, 0, 0);
+ return 0;
+}
+
+
+int
+qb_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int dsync,
+ dict_t *xdata)
+{
+ qb_local_t *qb_local = NULL;
+ qb_inode_t *qb_inode = NULL;
+
+ qb_inode = qb_inode_ctx_get (this, fd->inode);
+ if (!qb_inode) {
+ STACK_WIND (frame, default_fsync_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fsync, fd, dsync, xdata);
+ return 0;
+ }
+
+ if (qb_local_init (frame) != 0)
+ goto enomem;
+
+ qb_local = frame->local;
+
+ qb_local->inode = inode_ref (fd->inode);
+ qb_local->fd = fd_ref (fd);
+
+ qb_local->stub = fop_fsync_stub (frame, NULL, fd, dsync, xdata);
+
+ if (!qb_local->stub)
+ goto enomem;
+
+ qb_coroutine (frame, qb_co_fsync);
+
+ return 0;
+enomem:
+ QB_STACK_UNWIND (fsync, frame, -1, ENOMEM, 0, 0, 0);
+ return 0;
+}
+
+
+int
+qb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)
+{
+ qb_local_t *qb_local = NULL;
+ qb_inode_t *qb_inode = NULL;
+
+ qb_inode = qb_inode_ctx_get (this, fd->inode);
+ if (!qb_inode) {
+ STACK_WIND (frame, default_flush_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->flush, fd, xdata);
+ return 0;
+ }
+
+ if (qb_local_init (frame) != 0)
+ goto enomem;
+
+ qb_local = frame->local;
+
+ qb_local->inode = inode_ref (fd->inode);
+ qb_local->fd = fd_ref (fd);
+
+ qb_local->stub = fop_flush_stub (frame, NULL, fd, xdata);
+
+ if (!qb_local->stub)
+ goto enomem;
+
+ qb_coroutine (frame, qb_co_fsync);
+
+ return 0;
+enomem:
+ QB_STACK_UNWIND (flush, frame, -1, ENOMEM, 0);
+ return 0;
+}
+
+static int32_t
+qb_readdirp_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, gf_dirent_t *entries,
+ dict_t *xdata)
+{
+ qb_conf_t *conf = this->private;
+ gf_dirent_t *entry;
+ char *format;
+
+ list_for_each_entry(entry, &entries->list, list) {
+ if (!entry->inode || !entry->dict)
+ continue;
+
+ format = NULL;
+ if (dict_get_str(entry->dict, conf->qb_xattr_key, &format))
+ continue;
+
+ if (!format) {
+ qb_inode_cleanup(this, entry->inode, 1);
+ continue;
+ }
+
+ if (qb_format_extract(this, format, entry->inode))
+ continue;
+
+ qb_iatt_fixup(this, entry->inode, &entry->d_stat);
+ }
+
+ STACK_UNWIND_STRICT(readdirp, frame, op_ret, op_errno, entries, xdata);
+ return 0;
+}
+
+static int32_t
+qb_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
+ off_t off, dict_t *xdata)
+{
+ qb_conf_t *conf = this->private;
+
+ xdata = xdata ? dict_ref(xdata) : dict_new();
+ if (!xdata)
+ goto enomem;
+
+ if (dict_set_int32 (xdata, conf->qb_xattr_key, 0))
+ goto enomem;
+
+ STACK_WIND(frame, qb_readdirp_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readdirp, fd, size, off, xdata);
+
+ dict_unref(xdata);
+ return 0;
+
+enomem:
+ QB_STACK_UNWIND(readdirp, frame, -1, ENOMEM, NULL, NULL);
+ if (xdata)
+ dict_unref(xdata);
+ return 0;
+}
+
+int
+qb_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,
+ dict_t *xdata)
+{
+ qb_local_t *qb_local = NULL;
+ qb_inode_t *qb_inode = NULL;
+
+ qb_inode = qb_inode_ctx_get (this, loc->inode);
+ if (!qb_inode) {
+ STACK_WIND (frame, default_truncate_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->truncate, loc, offset,
+ xdata);
+ return 0;
+ }
+
+ if (qb_local_init (frame) != 0)
+ goto enomem;
+
+ qb_local = frame->local;
+
+ qb_local->inode = inode_ref (loc->inode);
+ qb_local->fd = fd_anonymous (loc->inode);
+
+ qb_local->stub = fop_truncate_stub (frame, NULL, loc, offset, xdata);
+
+ if (!qb_local->stub)
+ goto enomem;
+
+ qb_coroutine (frame, qb_co_truncate);
+
+ return 0;
+enomem:
+ QB_STACK_UNWIND (truncate, frame, -1, ENOMEM, 0, 0, 0);
+ return 0;
+}
+
+
+int
+qb_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
+ dict_t *xdata)
+{
+ qb_local_t *qb_local = NULL;
+ qb_inode_t *qb_inode = NULL;
+
+ qb_inode = qb_inode_ctx_get (this, fd->inode);
+ if (!qb_inode) {
+ STACK_WIND (frame, default_ftruncate_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->ftruncate, fd, offset,
+ xdata);
+ return 0;
+ }
+
+ if (qb_local_init (frame) != 0)
+ goto enomem;
+
+ qb_local = frame->local;
+
+ qb_local->inode = inode_ref (fd->inode);
+ qb_local->fd = fd_ref (fd);
+
+ qb_local->stub = fop_ftruncate_stub (frame, NULL, fd, offset, xdata);
+
+ if (!qb_local->stub)
+ goto enomem;
+
+ qb_coroutine (frame, qb_co_truncate);
+
+ return 0;
+enomem:
+ QB_STACK_UNWIND (ftruncate, frame, -1, ENOMEM, 0, 0, 0);
+ return 0;
+}
+
+
+int
+qb_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, struct iatt *iatt, dict_t *xdata)
+{
+ inode_t *inode = NULL;
+
+ inode = frame->local;
+ frame->local = NULL;
+
+ if (inode) {
+ qb_iatt_fixup (this, inode, iatt);
+ inode_unref (inode);
+ }
+
+ QB_STACK_UNWIND (stat, frame, op_ret, op_errno, iatt, xdata);
+
+ return 0;
+}
+
+int
+qb_stat (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
+{
+ if (qb_inode_ctx_get (this, loc->inode))
+ frame->local = inode_ref (loc->inode);
+
+ STACK_WIND (frame, qb_stat_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->stat, loc, xdata);
+ return 0;
+}
+
+
+int
+qb_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, struct iatt *iatt, dict_t *xdata)
+{
+ inode_t *inode = NULL;
+
+ inode = frame->local;
+ frame->local = NULL;
+
+ if (inode) {
+ qb_iatt_fixup (this, inode, iatt);
+ inode_unref (inode);
+ }
+
+ QB_STACK_UNWIND (fstat, frame, op_ret, op_errno, iatt, xdata);
+
+ return 0;
+}
+
+
+int
+qb_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)
+{
+ if (qb_inode_ctx_get (this, fd->inode))
+ frame->local = inode_ref (fd->inode);
+
+ STACK_WIND (frame, qb_fstat_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fstat, fd, xdata);
+ return 0;
+}
+
+
+int
+qb_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, struct iatt *pre, struct iatt *post,
+ dict_t *xdata)
+{
+ inode_t *inode = NULL;
+
+ inode = frame->local;
+ frame->local = NULL;
+
+ if (inode) {
+ qb_iatt_fixup (this, inode, pre);
+ qb_iatt_fixup (this, inode, post);
+ inode_unref (inode);
+ }
+
+ QB_STACK_UNWIND (setattr, frame, op_ret, op_errno, pre, post, xdata);
+
+ return 0;
+}
+
+
+int
+qb_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, struct iatt *buf,
+ int valid, dict_t *xdata)
+{
+ if (qb_inode_ctx_get (this, loc->inode))
+ frame->local = inode_ref (loc->inode);
+
+ STACK_WIND (frame, qb_setattr_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->setattr, loc, buf, valid, xdata);
+ return 0;
+}
+
+
+int
+qb_fsetattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, struct iatt *pre, struct iatt *post,
+ dict_t *xdata)
+{
+ inode_t *inode = NULL;
+
+ inode = frame->local;
+ frame->local = NULL;
+
+ if (inode) {
+ qb_iatt_fixup (this, inode, pre);
+ qb_iatt_fixup (this, inode, post);
+ inode_unref (inode);
+ }
+
+ QB_STACK_UNWIND (fsetattr, frame, op_ret, op_errno, pre, post, xdata);
+
+ return 0;
+}
+
+
+int
+qb_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iatt *buf,
+ int valid, dict_t *xdata)
+{
+ if (qb_inode_ctx_get (this, fd->inode))
+ frame->local = inode_ref (fd->inode);
+
+ STACK_WIND (frame, qb_setattr_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fsetattr, fd, buf, valid, xdata);
+ return 0;
+}
+
+
+int
+qb_forget (xlator_t *this, inode_t *inode)
+{
+ return qb_inode_cleanup (this, inode, 0);
+}
+
+
+int
+qb_release (xlator_t *this, fd_t *fd)
+{
+ call_frame_t *frame = NULL;
+
+ frame = create_frame (this, this->ctx->pool);
+ if (!frame) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not allocate frame. "
+ "Leaking QEMU BlockDriverState");
+ return -1;
+ }
+
+ if (qb_local_init (frame) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not allocate local. "
+ "Leaking QEMU BlockDriverState");
+ STACK_DESTROY (frame->root);
+ return -1;
+ }
+
+ if (qb_coroutine (frame, qb_co_close) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not allocate coroutine. "
+ "Leaking QEMU BlockDriverState");
+ qb_local_free (this, frame->local);
+ frame->local = NULL;
+ STACK_DESTROY (frame->root);
+ }
+
+ return 0;
+}
+
+int
+mem_acct_init (xlator_t *this)
+{
+ int ret = -1;
+
+ ret = xlator_mem_acct_init (this, gf_qb_mt_end + 1);
+
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR, "Memory accounting init "
+ "failed");
+ return ret;
+}
+
+
+int
+reconfigure (xlator_t *this, dict_t *options)
+{
+ return 0;
+}
+
+
+int
+init (xlator_t *this)
+{
+ qb_conf_t *conf = NULL;
+ int32_t ret = -1;
+ static int bdrv_inited = 0;
+
+ if (!this->children || this->children->next) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "FATAL: qemu-block (%s) not configured with exactly "
+ "one child", this->name);
+ goto out;
+ }
+
+ conf = GF_CALLOC (1, sizeof (*conf), gf_qb_mt_qb_conf_t);
+ if (!conf)
+ goto out;
+
+ /* configure 'option window-size <size>' */
+ GF_OPTION_INIT ("default-password", conf->default_password, str, out);
+
+ /* qemu coroutines use "co_mutex" for synchronizing among themselves.
+ However "co_mutex" itself is not threadsafe if the coroutine framework
+ is multithreaded (which usually is not). However synctasks are
+ fundamentally multithreaded, so for now create a syncenv which has
+ scaling limits set to max 1 thread so that the qemu coroutines can
+ execute "safely".
+
+ Future work: provide an implementation of "co_mutex" which is
+ threadsafe and use the global multithreaded ctx->env syncenv.
+ */
+ conf->env = syncenv_new (0, 1, 1);
+
+ this->private = conf;
+
+ ret = 0;
+
+ snprintf (conf->qb_xattr_key, QB_XATTR_KEY_MAX, QB_XATTR_KEY_FMT,
+ this->name);
+
+ cur_mon = (void *) 1;
+
+ if (!bdrv_inited) {
+ bdrv_init ();
+ bdrv_inited = 1;
+ }
+
+out:
+ if (ret)
+ GF_FREE (conf);
+
+ return ret;
+}
+
+
+void
+fini (xlator_t *this)
+{
+ qb_conf_t *conf = NULL;
+
+ conf = this->private;
+
+ this->private = NULL;
+
+ if (conf->root_inode)
+ inode_unref(conf->root_inode);
+ GF_FREE (conf);
+
+ return;
+}
+
+
+struct xlator_fops fops = {
+ .lookup = qb_lookup,
+ .fsetxattr = qb_fsetxattr,
+ .setxattr = qb_setxattr,
+ .open = qb_open,
+ .writev = qb_writev,
+ .readv = qb_readv,
+ .fsync = qb_fsync,
+ .truncate = qb_truncate,
+ .ftruncate = qb_ftruncate,
+ .stat = qb_stat,
+ .fstat = qb_fstat,
+ .setattr = qb_setattr,
+ .fsetattr = qb_fsetattr,
+ .flush = qb_flush,
+/*
+ .getxattr = qb_getxattr,
+ .fgetxattr = qb_fgetxattr
+*/
+ .readdirp = qb_readdirp,
+};
+
+
+struct xlator_cbks cbks = {
+ .forget = qb_forget,
+ .release = qb_release,
+};
+
+
+struct xlator_dumpops dumpops = {
+};
+
+
+struct volume_options options[] = {
+ { .key = {"default-password"},
+ .type = GF_OPTION_TYPE_STR,
+ .default_value = "",
+ .description = "Default password for the AES encrypted block images."
+ },
+ { .key = {NULL} },
+};
diff --git a/xlators/features/qemu-block/src/qemu-block.h b/xlators/features/qemu-block/src/qemu-block.h
new file mode 100644
index 000000000..c95f2799a
--- /dev/null
+++ b/xlators/features/qemu-block/src/qemu-block.h
@@ -0,0 +1,109 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __QEMU_BLOCK_H
+#define __QEMU_BLOCK_H
+
+#include "syncop.h"
+#include "call-stub.h"
+#include "block/block_int.h"
+#include "monitor/monitor.h"
+
+/* QB_XATTR_KEY_FMT is the on-disk xattr stored in the inode which
+ indicates that the file must be "interpreted" by the block format
+ logic. The value of the key is of the pattern:
+
+ "format:virtual_size"
+
+ e.g
+
+ "qcow2:20GB" or "qed:100GB"
+
+ The format and virtual size are colon separated. The format is
+ a case sensitive string which qemu recognizes. virtual_size is
+ specified as a size which glusterfs recognizes as size (i.e.,
+ value accepted by gf_string2bytesize())
+*/
+#define QB_XATTR_KEY_FMT "trusted.glusterfs.%s.format"
+
+#define QB_XATTR_KEY_MAX 64
+
+#define QB_XATTR_VAL_MAX 64
+
+
+typedef struct qb_inode {
+ char fmt[QB_XATTR_VAL_MAX]; /* this is only the format, not "format:size" */
+ size_t size; /* virtual size in bytes */
+ BlockDriverState *bs;
+ int refcnt;
+ uuid_t backing_gfid;
+ char *backing_fname;
+} qb_inode_t;
+
+
+typedef struct qb_conf {
+ Monitor *mon;
+ struct syncenv *env;
+ char qb_xattr_key[QB_XATTR_KEY_MAX];
+ char *default_password;
+ inode_t *root_inode;
+} qb_conf_t;
+
+
+typedef struct qb_local {
+ call_frame_t *frame; /* backpointer */
+ call_stub_t *stub;
+ inode_t *inode;
+ fd_t *fd;
+ char fmt[QB_XATTR_VAL_MAX+1];
+ char name[256];
+ synctask_fn_t synctask_fn;
+ struct list_head list;
+} qb_local_t;
+
+void qb_local_free (xlator_t *this, qb_local_t *local);
+int qb_coroutine (call_frame_t *frame, synctask_fn_t fn);
+inode_t *qb_inode_from_filename (const char *filename);
+int qb_inode_to_filename (inode_t *inode, char *filename, int size);
+int qb_format_extract (xlator_t *this, char *format, inode_t *inode);
+
+qb_inode_t *qb_inode_ctx_get (xlator_t *this, inode_t *inode);
+
+#define QB_STACK_UNWIND(typ, frame, args ...) do { \
+ qb_local_t *__local = frame->local; \
+ xlator_t *__this = frame->this; \
+ \
+ frame->local = NULL; \
+ STACK_UNWIND_STRICT (typ, frame, args); \
+ if (__local) \
+ qb_local_free (__this, __local); \
+ } while (0)
+
+#define QB_STUB_UNWIND(stub, op_ret, op_errno) do { \
+ qb_local_t *__local = stub->frame->local; \
+ xlator_t *__this = stub->frame->this; \
+ \
+ stub->frame->local = NULL; \
+ call_unwind_error (stub, op_ret, op_errno); \
+ if (__local) \
+ qb_local_free (__this, __local); \
+ } while (0)
+
+#define QB_STUB_RESUME(stub_errno) do { \
+ qb_local_t *__local = stub->frame->local; \
+ xlator_t *__this = stub->frame->this; \
+ \
+ stub->frame->local = NULL; \
+ call_resume (stub); \
+ if (__local) \
+ qb_local_free (__this, __local); \
+ } while (0)
+
+#endif /* !__QEMU_BLOCK_H */
diff --git a/xlators/features/quiesce/src/quiesce.c b/xlators/features/quiesce/src/quiesce.c
index 73eb91947..24c7dc6ed 100644
--- a/xlators/features/quiesce/src/quiesce.c
+++ b/xlators/features/quiesce/src/quiesce.c
@@ -111,7 +111,7 @@ void
gf_quiesce_enqueue (xlator_t *this, call_stub_t *stub)
{
quiesce_priv_t *priv = NULL;
- struct timeval timeout = {0,};
+ struct timespec timeout = {0,};
priv = this->private;
if (!priv) {
@@ -129,7 +129,7 @@ gf_quiesce_enqueue (xlator_t *this, call_stub_t *stub)
if (!priv->timer) {
timeout.tv_sec = 20;
- timeout.tv_usec = 0;
+ timeout.tv_nsec = 0;
priv->timer = gf_timer_call_after (this->ctx,
timeout,
@@ -2492,7 +2492,7 @@ notify (xlator_t *this, int event, void *data, ...)
{
int ret = 0;
quiesce_priv_t *priv = NULL;
- struct timeval timeout = {0,};
+ struct timespec timeout = {0,};
priv = this->private;
if (!priv)
@@ -2525,7 +2525,7 @@ notify (xlator_t *this, int event, void *data, ...)
if (priv->timer)
break;
timeout.tv_sec = 20;
- timeout.tv_usec = 0;
+ timeout.tv_nsec = 0;
priv->timer = gf_timer_call_after (this->ctx,
timeout,