summaryrefslogtreecommitdiffstats
path: root/xlators/features
diff options
context:
space:
mode:
authorPrashanth Pai <ppai@redhat.com>2012-03-26 14:33:41 +0530
committerVijay Bellur <vbellur@redhat.com>2013-11-11 19:35:01 -0800
commitb18257183d137c7ea8a76b4d57812fe26d9ea850 (patch)
tree2f50aa44455255afb3f43b18b9a0066331e25cc1 /xlators/features
parent27935ee84c484fff3d8d29218c197812701957ed (diff)
features/compress: Compression/DeCompression translator
* When a writev call occurs, the client compresses the data before sending it to server. On the server, compressed data is decompressed. Similarly, when a readv call occurs, the server compresses the data before sending it to client. On the client, the compressed data is decompressed. Thus the amount of data sent over the wire is minimized. * Compression/Decompression is done using Zlib library. * During normal operation, this is the format of data sent over wire : <compressed-data> + trailer(8) The trailer contains the CRC32 checksum and length of original uncompressed data. This is used for validation. HOW TO USE ---------- Turning on compression xlator: gluster volume set <vol_name> compress on Configurable options: gluster volume set <vol_name> compress.compression-level 8 gluster volume set <vol_name> compress.min-size 50 Change-Id: Ib7a66b6f1f70fe002b7c513588cdf75c69370805 BUG: 923540 Original-author : Venky Shankar <vshankar@redhat.com> Signed-off-by: Venky Shankar <vshankar@redhat.com> Signed-off-by: Prashanth Pai <nullpai@gmail.com> Signed-off-by: Prashanth Pai <ppai@redhat.com> Reviewed-on: http://review.gluster.org/3251 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features')
-rw-r--r--xlators/features/Makefile.am2
-rw-r--r--xlators/features/compress/Makefile.am3
-rw-r--r--xlators/features/compress/src/Makefile.am17
-rw-r--r--xlators/features/compress/src/cdc-helper.c547
-rw-r--r--xlators/features/compress/src/cdc-mem-types.h22
-rw-r--r--xlators/features/compress/src/cdc.c342
-rw-r--r--xlators/features/compress/src/cdc.h107
7 files changed, 1039 insertions, 1 deletions
diff --git a/xlators/features/Makefile.am b/xlators/features/Makefile.am
index 0e37fc0b7d6..d2f5ef19290 100644
--- a/xlators/features/Makefile.am
+++ b/xlators/features/Makefile.am
@@ -1,4 +1,4 @@
SUBDIRS = locks quota read-only mac-compat quiesce marker index \
- protect changelog gfid-access $(GLUPY_SUBDIR) qemu-block # trash path-converter # filter
+ protect compress changelog gfid-access $(GLUPY_SUBDIR) qemu-block # trash path-converter # filter
CLEANFILES =
diff --git a/xlators/features/compress/Makefile.am b/xlators/features/compress/Makefile.am
new file mode 100644
index 00000000000..a985f42a877
--- /dev/null
+++ b/xlators/features/compress/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/features/compress/src/Makefile.am b/xlators/features/compress/src/Makefile.am
new file mode 100644
index 00000000000..4a64b52a9a1
--- /dev/null
+++ b/xlators/features/compress/src/Makefile.am
@@ -0,0 +1,17 @@
+xlator_LTLIBRARIES = cdc.la
+
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features
+
+noinst_HEADERS = cdc.h cdc-mem-types.h
+
+cdc_la_LDFLAGS = -module -avoidversion $(LIBZ_LIBS)
+
+cdc_la_SOURCES = cdc.c cdc-helper.c
+cdc_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -D$(GF_HOST_OS) \
+-shared -nostartfiles $(LIBZ_CFLAGS)
+
+AM_CFLAGS = -Wall $(GF_CFLAGS)
+
+CLEANFILES =
diff --git a/xlators/features/compress/src/cdc-helper.c b/xlators/features/compress/src/cdc-helper.c
new file mode 100644
index 00000000000..54432ff455c
--- /dev/null
+++ b/xlators/features/compress/src/cdc-helper.c
@@ -0,0 +1,547 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "glusterfs.h"
+#include "logging.h"
+
+#include "cdc.h"
+#include "cdc-mem-types.h"
+
+#ifdef HAVE_LIB_Z
+#include "zlib.h"
+#endif
+
+#ifdef HAVE_LIB_Z
+/* gzip header looks something like this
+ * (RFC 1950)
+ *
+ * +---+---+---+---+---+---+---+---+---+---+
+ * |ID1|ID2|CM |FLG| MTIME |XFL|OS |
+ * +---+---+---+---+---+---+---+---+---+---+
+ *
+ * Data is usually sent without this header i.e
+ * Data sent = <compressed-data> + trailer(8)
+ * The trailer contains the checksum.
+ *
+ * gzip_header is added only during debugging.
+ * Refer to the function cdc_dump_iovec_to_disk
+ */
+static const char gzip_header[10] =
+ {
+ '\037', '\213', Z_DEFLATED, 0,
+ 0, 0, 0, 0,
+ 0, GF_CDC_OS_ID
+ };
+
+static int32_t
+cdc_next_iovec (xlator_t *this, cdc_info_t *ci)
+{
+ int ret = -1;
+
+ ci->ncount++;
+ /* check for iovec overflow -- should not happen */
+ if (ci->ncount == MAX_IOVEC) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Zlib output buffer overflow"
+ " ->ncount (%d) | ->MAX_IOVEC (%d)",
+ ci->ncount, MAX_IOVEC);
+ goto out;
+ }
+
+ ret = 0;
+
+ out:
+ return ret;
+}
+
+static void
+cdc_put_long (unsigned char *string, unsigned long x)
+{
+ string[0] = (unsigned char) (x & 0xff);
+ string[1] = (unsigned char) ((x & 0xff00) >> 8);
+ string[2] = (unsigned char) ((x & 0xff0000) >> 16);
+ string[3] = (unsigned char) ((x & 0xff000000) >> 24);
+}
+
+static unsigned long
+cdc_get_long (unsigned char *buf)
+{
+ return ((unsigned long) buf[0])
+ | (((unsigned long) buf[1]) << 8)
+ | (((unsigned long) buf[2]) << 16)
+ | (((unsigned long) buf[3]) << 24);
+}
+
+static int32_t
+cdc_init_gzip_trailer (xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci)
+{
+ int ret = -1;
+ char *buf = NULL;
+
+ ret = cdc_next_iovec (this, ci);
+ if (ret)
+ goto out;
+
+ buf = CURR_VEC(ci).iov_base =
+ (char *) GF_CALLOC (1, GF_CDC_VALIDATION_SIZE,
+ gf_cdc_mt_gzip_trailer_t);
+
+ if (!CURR_VEC(ci).iov_base)
+ goto out;
+
+ CURR_VEC(ci).iov_len = GF_CDC_VALIDATION_SIZE;
+
+ cdc_put_long ((unsigned char *)&buf[0], ci->crc);
+ cdc_put_long ((unsigned char *)&buf[4], ci->stream.total_in);
+
+ ret = 0;
+
+ out:
+ return ret;
+}
+
+static int32_t
+cdc_alloc_iobuf_and_init_vec (xlator_t *this,
+ cdc_priv_t *priv, cdc_info_t *ci,
+ int size)
+{
+ int ret = -1;
+ int alloc_len = 0;
+ struct iobuf *iobuf = NULL;
+
+ ret = cdc_next_iovec (this, ci);
+ if (ret)
+ goto out;
+
+ alloc_len = size ? size : ci->buffer_size;
+
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool, alloc_len);
+ if (!iobuf)
+ goto out;
+
+ ret = iobref_add (ci->iobref, iobuf);
+ if (ret)
+ goto out;
+
+ /* Initialize this iovec */
+ CURR_VEC(ci).iov_base = iobuf->ptr;
+ CURR_VEC(ci).iov_len = alloc_len;
+
+ ret = 0;
+
+ out:
+ return ret;
+}
+
+static void
+cdc_init_zlib_output_stream (cdc_priv_t *priv, cdc_info_t *ci, int size)
+{
+ ci->stream.next_out = (unsigned char *) CURR_VEC(ci).iov_base;
+ ci->stream.avail_out = size ? size : ci->buffer_size;
+}
+
+/* This routine is for testing and debugging only.
+ * Data written = header(10) + <compressed-data> + trailer(8)
+ * So each gzip dump file is at least 18 bytes in size.
+ */
+void
+cdc_dump_iovec_to_disk (xlator_t *this, cdc_info_t *ci, const char *file)
+{
+ int i = 0;
+ int fd = 0;
+ size_t writen = 0;
+ size_t total_writen = 0;
+
+ fd = open (file, O_WRONLY|O_CREAT|O_TRUNC, 0777 );
+ if (fd < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Cannot open file: %s", file);
+ return;
+ }
+
+ writen = write (fd, (char *) gzip_header, 10);
+ total_writen += writen;
+ for (i = 0; i < ci->ncount; i++) {
+ writen = write (fd, (char *) ci->vec[i].iov_base, ci->vec[i].iov_len);
+ total_writen += writen;
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "dump'd %zu bytes to %s", total_writen, GF_CDC_DEBUG_DUMP_FILE );
+
+ close (fd);
+}
+
+static int32_t
+cdc_flush_libz_buffer (cdc_priv_t *priv, xlator_t *this, cdc_info_t *ci,
+ int (*libz_func)(z_streamp, int),
+ int flush)
+{
+ int32_t ret = Z_OK;
+ int done = 0;
+ unsigned int deflate_len = 0;
+
+ for (;;) {
+ deflate_len = ci->buffer_size - ci->stream.avail_out;
+
+ if (deflate_len != 0) {
+ CURR_VEC(ci).iov_len = deflate_len;
+
+ ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0);
+ if (ret) {
+ ret = Z_MEM_ERROR;
+ break;
+ }
+
+ /* Re-position Zlib output buffer */
+ cdc_init_zlib_output_stream (priv, ci, 0);
+ }
+
+ if (done) {
+ ci->ncount--;
+ break;
+ }
+
+ ret = libz_func (&ci->stream, flush);
+
+ if (ret == Z_BUF_ERROR) {
+ ret = Z_OK;
+ ci->ncount--;
+ break;
+ }
+
+ done = (ci->stream.avail_out != 0 || ret == Z_STREAM_END);
+
+ if (ret != Z_OK && ret != Z_STREAM_END)
+ break;
+ }
+
+ return ret;
+}
+
+static int32_t
+do_cdc_compress (struct iovec *vec, xlator_t *this, cdc_priv_t *priv,
+ cdc_info_t *ci)
+{
+ int ret = -1;
+
+ /* Initialize defalte */
+ ret = deflateInit2 (&ci->stream, priv->cdc_level, Z_DEFLATED,
+ priv->window_size, priv->mem_level,
+ Z_DEFAULT_STRATEGY);
+
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "unable to init Zlib (retval: %d)", ret);
+ goto out;
+ }
+
+ ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0);
+ if (ret)
+ goto out;
+
+ /* setup output buffer */
+ cdc_init_zlib_output_stream (priv, ci, 0);
+
+ /* setup input buffer */
+ ci->stream.next_in = (unsigned char *) vec->iov_base;
+ ci->stream.avail_in = vec->iov_len;
+
+ ci->crc = crc32 (ci->crc, (const Bytef *) vec->iov_base, vec->iov_len);
+
+ gf_log (this->name, GF_LOG_DEBUG, "crc=%lu len=%d buffer_size=%d",
+ ci->crc, ci->stream.avail_in, ci->buffer_size);
+
+ /* compress !! */
+ while (ci->stream.avail_in != 0) {
+ if (ci->stream.avail_out == 0) {
+
+ CURR_VEC(ci).iov_len = ci->buffer_size;
+
+ ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0);
+ if (ret)
+ break;
+
+ /* Re-position Zlib output buffer */
+ cdc_init_zlib_output_stream (priv, ci, 0);
+ }
+
+ ret = deflate (&ci->stream, Z_NO_FLUSH);
+ if (ret != Z_OK)
+ break;
+ }
+
+ out:
+ return ret;
+}
+
+int32_t
+cdc_compress (xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci,
+ dict_t **xdata)
+{
+ int ret = -1;
+ int i = 0;
+
+ ci->iobref = iobref_new ();
+ if (!ci->iobref)
+ goto out;
+
+ if (!*xdata) {
+ *xdata = dict_new ();
+ if (!*xdata) {
+ gf_log (this->name, GF_LOG_ERROR, "Cannot allocate xdata"
+ " dict");
+ goto out;
+ }
+ }
+
+ /* data */
+ for (i = 0; i < ci->count; i++) {
+ ret = do_cdc_compress (&ci->vector[i], this, priv, ci);
+ if (ret != Z_OK)
+ goto deflate_cleanup_out;
+ }
+
+ /* flush zlib buffer */
+ ret = cdc_flush_libz_buffer (priv, this, ci, deflate, Z_FINISH);
+ if (!(ret == Z_OK || ret == Z_STREAM_END)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Compression Error: ret (%d)", ret);
+ ret = -1;
+ goto deflate_cleanup_out;
+ }
+
+ /* trailer */
+ ret = cdc_init_gzip_trailer (this, priv, ci);
+ if (ret)
+ goto deflate_cleanup_out;
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Compressed %ld to %ld bytes",
+ ci->stream.total_in, ci->stream.total_out);
+
+ ci->nbytes = ci->stream.total_out + GF_CDC_VALIDATION_SIZE;
+
+ /* set deflated canary value for identification */
+ ret = dict_set_int32 (*xdata, GF_CDC_DEFLATE_CANARY_VAL, 1);
+ if (ret) {
+ /* Send uncompressed data if we can't _tell_ the client
+ * that deflated data is on it's way. So, we just log
+ * the faliure and continue as usual.
+ */
+ gf_log (this->name, GF_LOG_ERROR,
+ "Data deflated, but could not set canary"
+ " value in dict for identification");
+ }
+
+ /* This is to be used in testing */
+ if ( priv->debug ) {
+ cdc_dump_iovec_to_disk (this, ci, GF_CDC_DEBUG_DUMP_FILE );
+ }
+
+ deflate_cleanup_out:
+ (void) deflateEnd(&ci->stream);
+
+ out:
+ return ret;
+}
+
+
+/* deflate content is checked by the presence of a canary
+ * value in the dict as the key
+ */
+static int32_t
+cdc_check_content_for_deflate (dict_t *xdata)
+{
+ return dict_get (xdata, GF_CDC_DEFLATE_CANARY_VAL) ? -1 : 0;
+}
+
+static unsigned long
+cdc_extract_crc (char *trailer)
+{
+ return cdc_get_long ((unsigned char *) &trailer[0]);
+}
+
+static unsigned long
+cdc_extract_size (char *trailer)
+{
+ return cdc_get_long ((unsigned char *) &trailer[4]);
+}
+
+static int32_t
+cdc_validate_inflate (cdc_info_t *ci, unsigned long crc,
+ unsigned long len)
+{
+ return !((crc == ci->crc)
+ /* inflated length is hidden inside
+ * Zlib stream struct */
+ && (len == ci->stream.total_out));
+}
+
+static int32_t
+do_cdc_decompress (xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci)
+{
+ int ret = -1;
+ int i = 0;
+ int len = 0;
+ char *inflte = NULL;
+ char *trailer = NULL;
+ struct iovec vec = {0,};
+ unsigned long computed_crc = 0;
+ unsigned long computed_len = 0;
+
+ ret = inflateInit2 (&ci->stream, priv->window_size);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Zlib: Unable to initialize inflate");
+ goto out;
+ }
+
+ vec = THIS_VEC(ci, 0);
+
+ trailer = (char *) (((char *) vec.iov_base) + vec.iov_len
+ - GF_CDC_VALIDATION_SIZE);
+
+ /* CRC of uncompressed data */
+ computed_crc = cdc_extract_crc (trailer);
+
+ /* size of uncomrpessed data */
+ computed_len = cdc_extract_size (trailer);
+
+ gf_log (this->name, GF_LOG_DEBUG, "crc=%lu len=%lu buffer_size=%d",
+ computed_crc, computed_len, ci->buffer_size);
+
+ inflte = vec.iov_base ;
+ len = vec.iov_len - GF_CDC_VALIDATION_SIZE;
+
+ /* allocate buffer of the original length of the data */
+ ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0);
+ if (ret)
+ goto out;
+
+ /* setup output buffer */
+ cdc_init_zlib_output_stream (priv, ci, 0);
+
+ /* setup input buffer */
+ ci->stream.next_in = (unsigned char *) inflte;
+ ci->stream.avail_in = len;
+
+ while (ci->stream.avail_in != 0) {
+ if (ci->stream.avail_out == 0) {
+ CURR_VEC(ci).iov_len = ci->buffer_size;
+
+ ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0);
+ if (ret)
+ break;
+
+ /* Re-position Zlib output buffer */
+ cdc_init_zlib_output_stream (priv, ci, 0);
+ }
+
+ ret = inflate (&ci->stream, Z_NO_FLUSH);
+ if (ret == Z_STREAM_ERROR)
+ break;
+ }
+
+ /* flush zlib buffer */
+ ret = cdc_flush_libz_buffer (priv, this, ci, inflate, Z_SYNC_FLUSH);
+ if (!(ret == Z_OK || ret == Z_STREAM_END)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Decompression Error: ret (%d)", ret);
+ ret = -1;
+ goto out;
+ }
+
+ /* compute CRC of the uncompresses data to check for
+ * correctness */
+
+ for (i = 0; i < ci->ncount; i++) {
+ ci->crc = crc32 (ci->crc,
+ (const Bytef *) ci->vec[i].iov_base,
+ ci->vec[i].iov_len);
+ }
+
+ /* validate inflated data */
+ ret = cdc_validate_inflate (ci, computed_crc, computed_len);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Checksum or length mismatched in inflated data");
+ }
+
+ out:
+ return ret;
+}
+
+int32_t
+cdc_decompress (xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci,
+ dict_t *xdata)
+{
+ int32_t ret = -1;
+
+ /* check for deflate content */
+ if (!cdc_check_content_for_deflate (xdata)) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Content not deflated, passing through ...");
+ goto passthrough_out;
+ }
+
+ ci->iobref = iobref_new ();
+ if (!ci->iobref)
+ goto passthrough_out;
+
+ /* do we need to do this? can we assume that one iovec
+ * will hold per request data everytime?
+ *
+ * server/client protocol seems to deal with a single
+ * iovec even if op_ret > 1M. So, it looks ok to
+ * assume that a single iovec will contain all the
+ * data (This saves us a lot from finding the trailer
+ * and the data since it could have been split-up onto
+ * two adjacent iovec's.
+ *
+ * But, in case this translator is loaded above quick-read
+ * for some reason, then it's entirely possible that we get
+ * multiple iovec's...
+ *
+ * This case (handled below) is not tested. (by loading the
+ * xlator below quick-read)
+ */
+
+ /* @@ I_HOPE_THIS_IS_NEVER_HIT */
+ if (ci->count > 1) {
+ gf_log (this->name, GF_LOG_WARNING, "unable to handle"
+ " multiple iovecs (%d in number)", ci->count);
+ goto inflate_cleanup_out;
+ /* TODO: coallate all iovecs in one */
+ }
+
+ ret = do_cdc_decompress (this, priv, ci);
+ if (ret)
+ goto inflate_cleanup_out;
+
+ ci->nbytes = ci->stream.total_out;
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Inflated %ld to %ld bytes",
+ ci->stream.total_in, ci->stream.total_out);
+
+ inflate_cleanup_out:
+ (void) inflateEnd (&ci->stream);
+
+ passthrough_out:
+ return ret;
+}
+
+#endif
diff --git a/xlators/features/compress/src/cdc-mem-types.h b/xlators/features/compress/src/cdc-mem-types.h
new file mode 100644
index 00000000000..efa00805987
--- /dev/null
+++ b/xlators/features/compress/src/cdc-mem-types.h
@@ -0,0 +1,22 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __CDC_MEM_TYPES_H
+#define __CDC_MEM_TYPES_H
+
+#include "mem-types.h"
+
+enum gf_cdc_mem_types {
+ gf_cdc_mt_priv_t = gf_common_mt_end + 1,
+ gf_cdc_mt_vec_t = gf_common_mt_end + 2,
+ gf_cdc_mt_gzip_trailer_t = gf_common_mt_end + 3,
+};
+
+#endif
diff --git a/xlators/features/compress/src/cdc.c b/xlators/features/compress/src/cdc.c
new file mode 100644
index 00000000000..eb7d87c5698
--- /dev/null
+++ b/xlators/features/compress/src/cdc.c
@@ -0,0 +1,342 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <sys/uio.h>
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "xlator.h"
+#include "defaults.h"
+#include "logging.h"
+
+#include "cdc.h"
+#include "cdc-mem-types.h"
+
+static void
+cdc_cleanup_iobref (cdc_info_t *ci)
+{
+ assert(ci->iobref != NULL);
+ iobref_clear (ci->iobref);
+}
+
+int32_t
+cdc_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iovec *vector, int32_t count,
+ struct iatt *stbuf, struct iobref *iobref,
+ dict_t *xdata)
+{
+ int ret = -1;
+ cdc_priv_t *priv = NULL;
+ cdc_info_t ci = {0,};
+
+ GF_VALIDATE_OR_GOTO ("cdc", this, default_out);
+ GF_VALIDATE_OR_GOTO (this->name, frame, default_out);
+
+ priv = this->private;
+
+ if (op_ret <= 0)
+ goto default_out;
+
+ if ( (priv->min_file_size != 0)
+ && (op_ret < priv->min_file_size) )
+ goto default_out;
+
+ ci.count = count;
+ ci.ibytes = op_ret;
+ ci.vector = vector;
+ ci.buf = NULL;
+ ci.iobref = NULL;
+ ci.ncount = 0;
+ ci.crc = 0;
+ ci.buffer_size = GF_CDC_DEF_BUFFERSIZE;
+
+/* A readv compresses on the server side and decompresses on the client side
+ */
+ if (priv->op_mode == GF_CDC_MODE_SERVER) {
+ ret = cdc_compress (this, priv, &ci, &xdata);
+ } else if (priv->op_mode == GF_CDC_MODE_CLIENT) {
+ ret = cdc_decompress (this, priv, &ci, xdata);
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Invalid operation mode (%d)", priv->op_mode);
+ }
+
+ if (ret)
+ goto default_out;
+
+ STACK_UNWIND_STRICT (readv, frame, ci.nbytes, op_errno,
+ ci.vec, ci.ncount, stbuf, iobref,
+ xdata);
+ cdc_cleanup_iobref (&ci);
+ return 0;
+
+ default_out:
+ STACK_UNWIND_STRICT (readv, frame, op_ret, op_errno,
+ vector, count, stbuf, iobref, xdata);
+ return 0;
+}
+
+int32_t
+cdc_readv (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, size_t size, off_t offset, uint32_t flags,
+ dict_t *xdata)
+{
+ fop_readv_cbk_t cbk = NULL;
+
+#ifdef HAVE_LIB_Z
+ cbk = cdc_readv_cbk;
+#else
+ cbk = default_readv_cbk;
+#endif
+ STACK_WIND (frame, cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readv,
+ fd, size, offset, flags, xdata);
+ return 0;
+}
+
+int32_t
+cdc_writev_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+
+ STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, prebuf, postbuf, xdata);
+ return 0;
+}
+
+int32_t
+cdc_writev (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ struct iovec *vector,
+ int32_t count,
+ off_t offset,
+ uint32_t flags,
+ struct iobref *iobref, dict_t *xdata)
+{
+ int ret = -1;
+ cdc_priv_t *priv = NULL;
+ cdc_info_t ci = {0,};
+ size_t isize = 0;
+
+ GF_VALIDATE_OR_GOTO ("cdc", this, default_out);
+ GF_VALIDATE_OR_GOTO (this->name, frame, default_out);
+
+ priv = this->private;
+
+ isize = iov_length(vector, count);
+
+ if (isize <= 0)
+ goto default_out;
+
+ if ( (priv->min_file_size != 0)
+ && (isize < priv->min_file_size) )
+ goto default_out;
+
+ ci.count = count;
+ ci.ibytes = isize;
+ ci.vector = vector;
+ ci.buf = NULL;
+ ci.iobref = NULL;
+ ci.ncount = 0;
+ ci.crc = 0;
+ ci.buffer_size = GF_CDC_DEF_BUFFERSIZE;
+
+/* A writev compresses on the client side and decompresses on the server side
+ */
+ if (priv->op_mode == GF_CDC_MODE_CLIENT) {
+ ret = cdc_compress (this, priv, &ci, &xdata);
+ } else if (priv->op_mode == GF_CDC_MODE_SERVER) {
+ ret = cdc_decompress (this, priv, &ci, xdata);
+ } else {
+ gf_log (this->name, GF_LOG_ERROR, "Invalid operation mode (%d) ", priv->op_mode);
+ }
+
+ if (ret)
+ goto default_out;
+
+ STACK_WIND (frame,
+ cdc_writev_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD (this)->fops->writev,
+ fd, ci.vec, ci.ncount, offset, flags,
+ iobref, xdata);
+
+ cdc_cleanup_iobref (&ci);
+ return 0;
+
+ default_out:
+ STACK_WIND (frame,
+ cdc_writev_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD (this)->fops->writev,
+ fd, vector, count, offset, flags,
+ iobref, xdata);
+ return 0;
+}
+
+int32_t
+init (xlator_t *this)
+{
+ int ret = -1;
+ char *temp_str = NULL;
+ cdc_priv_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO ("cdc", this, err);
+
+ if (!this->children || this->children->next) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Need subvolume == 1");
+ goto err;
+ }
+
+ if (!this->parents) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Dangling volume. Check volfile");
+ }
+
+ priv = GF_CALLOC (1, sizeof (*priv), gf_cdc_mt_priv_t);
+ if (!priv) {
+ goto err;
+ }
+
+ /* Check if debug mode is turned on */
+ GF_OPTION_INIT ("debug", priv->debug, bool, err);
+ if( priv->debug ) {
+ gf_log (this->name, GF_LOG_DEBUG, "CDC debug option turned on");
+ }
+
+ /* Set Gzip Window Size */
+ GF_OPTION_INIT ("window-size", priv->window_size, int32, err);
+ if ( (priv->window_size > GF_CDC_MAX_WINDOWSIZE)
+ || (priv->window_size < GF_CDC_DEF_WINDOWSIZE) ) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Invalid gzip window size (%d), using default",
+ priv->window_size);
+ priv->window_size = GF_CDC_DEF_WINDOWSIZE;
+ }
+
+ /* Set Gzip (De)Compression Level */
+ GF_OPTION_INIT ("compression-level", priv->cdc_level, int32, err);
+ if ( ((priv->cdc_level < 1) || (priv->cdc_level > 9))
+ && (priv->cdc_level != GF_CDC_DEF_COMPRESSION) ) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Invalid gzip (de)compression level (%d),"
+ " using default", priv->cdc_level);
+ priv->cdc_level = GF_CDC_DEF_COMPRESSION;
+ }
+
+ /* Set Gzip Memory Level */
+ GF_OPTION_INIT ("mem-level", priv->mem_level, int32, err);
+ if ( (priv->mem_level < 1) || (priv->mem_level > 9) ) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Invalid gzip memory level, using the default");
+ priv->mem_level = GF_CDC_DEF_MEMLEVEL;
+ }
+
+ /* Set min file size to enable compression */
+ GF_OPTION_INIT ("min-size", priv->min_file_size, int32, err);
+
+ /* Mode of operation - Server/Client */
+ ret = dict_get_str (this->options, "mode", &temp_str);
+ if (ret) {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "Operation mode not specified !!");
+ goto err;
+ }
+
+ if (GF_CDC_MODE_IS_CLIENT (temp_str)) {
+ priv->op_mode = GF_CDC_MODE_CLIENT;
+ } else if (GF_CDC_MODE_IS_SERVER (temp_str)) {
+ priv->op_mode = GF_CDC_MODE_SERVER;
+ } else {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "Bogus operation mode (%s) specified", temp_str);
+ goto err;
+ }
+
+ this->private = priv;
+ gf_log (this->name, GF_LOG_DEBUG, "CDC xlator loaded in (%s) mode",temp_str);
+ return 0;
+
+ err:
+ if (priv)
+ GF_FREE (priv);
+
+ return -1;
+}
+
+void
+fini (xlator_t *this)
+{
+ cdc_priv_t *priv = this->private;
+
+ if (priv)
+ GF_FREE (priv);
+ this->private = NULL;
+ return;
+}
+
+struct xlator_fops fops = {
+ .readv = cdc_readv,
+ .writev = cdc_writev,
+};
+
+struct xlator_cbks cbks = {
+};
+
+struct volume_options options[] = {
+ { .key = {"window-size"},
+ .default_value = "-15",
+ .type = GF_OPTION_TYPE_INT,
+ .description = "Size of the zlib history buffer."
+ },
+ { .key = {"mem-level"},
+ .default_value = "8",
+ .type = GF_OPTION_TYPE_INT,
+ .description = "Memory allocated for internal compression state.\
+ 1 uses minimum memory but is slow and reduces \
+ compression ratio; memLevel=9 uses maximum memory \
+ for optimal speed. The default value is 8."
+ },
+ { .key = {"compression-level"},
+ .default_value = "-1",
+ .type = GF_OPTION_TYPE_INT,
+ .description = "Compression levels \
+ 0 : no compression, 1 : best speed, \
+ 9 : best compression, -1 : default compression "
+ },
+ { .key = {"min-size"},
+ .default_value = "0",
+ .type = GF_OPTION_TYPE_INT,
+ .description = "Data is compressed only when its size exceeds this."
+ },
+ { .key = {"mode"},
+ .value = {"server", "client"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "Set on the basis of where the xlator is loaded."
+ },
+ { .key = {"debug"},
+ .default_value = "false",
+ .type = GF_OPTION_TYPE_BOOL,
+ .description = "This is used in testing. Will dump compressed data \
+ to disk as a gzip file."
+ },
+ { .key = {NULL}
+ },
+};
diff --git a/xlators/features/compress/src/cdc.h b/xlators/features/compress/src/cdc.h
new file mode 100644
index 00000000000..71f4d2317bb
--- /dev/null
+++ b/xlators/features/compress/src/cdc.h
@@ -0,0 +1,107 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __CDC_H
+#define __CDC_H
+
+#ifdef HAVE_LIB_Z
+#include "zlib.h"
+#endif
+
+#include "xlator.h"
+
+#ifndef MAX_IOVEC
+#define MAX_IOVEC 16
+#endif
+
+typedef struct cdc_priv {
+ int window_size;
+ int mem_level;
+ int cdc_level;
+ int min_file_size;
+ int op_mode;
+ gf_boolean_t debug;
+ gf_lock_t lock;
+} cdc_priv_t;
+
+typedef struct cdc_info {
+ /* input bits */
+ int count;
+ int32_t ibytes;
+ struct iovec *vector;
+ struct iatt *buf;
+
+ /* output bits */
+ int ncount;
+ int nbytes;
+ int buffer_size;
+ struct iovec vec[MAX_IOVEC];
+ struct iobref *iobref;
+
+ /* zlib bits */
+#ifdef HAVE_LIB_Z
+ z_stream stream;
+#endif
+ unsigned long crc;
+} cdc_info_t;
+
+#define NVEC(ci) (ci->ncount - 1)
+#define CURR_VEC(ci) ci->vec[ci->ncount - 1]
+#define THIS_VEC(ci, i) ci->vector[i]
+
+/* Gzip defaults */
+#define GF_CDC_DEF_WINDOWSIZE -15 /* default value */
+#define GF_CDC_MAX_WINDOWSIZE -8 /* max value */
+
+#ifdef HAVE_LIB_Z
+#define GF_CDC_DEF_COMPRESSION Z_DEFAULT_COMPRESSION
+#else
+#define GF_CDC_DEF_COMPRESSION -1
+#endif
+
+#define GF_CDC_DEF_MEMLEVEL 8
+#define GF_CDC_DEF_BUFFERSIZE 262144 // 256K - default compression buffer size
+
+/* Operation mode
+ * If xlator is loaded on client, readv decompresses and writev compresses
+ * If xlator is loaded on server, readv compresses and writev decompresses
+ */
+#define GF_CDC_MODE_CLIENT 0
+#define GF_CDC_MODE_SERVER 1
+
+/* min size of data to do cmpression
+ * 0 == compress even 1byte
+ */
+#define GF_CDC_MIN_CHUNK_SIZE 0
+
+#define GF_CDC_VALIDATION_SIZE 8
+
+#define GF_CDC_OS_ID 0xFF
+#define GF_CDC_DEFLATE_CANARY_VAL "deflate"
+#define GF_CDC_DEBUG_DUMP_FILE "/tmp/cdcdump.gz"
+
+#define GF_CDC_MODE_IS_CLIENT(m) \
+ (strcmp (m, "client") == 0)
+
+#define GF_CDC_MODE_IS_SERVER(m) \
+ (strcmp (m, "server") == 0)
+
+int32_t
+cdc_compress (xlator_t *this,
+ cdc_priv_t *priv,
+ cdc_info_t *ci,
+ dict_t **xdata);
+int32_t
+cdc_decompress (xlator_t *this,
+ cdc_priv_t *priv,
+ cdc_info_t *ci,
+ dict_t *xdata);
+
+#endif