diff options
| -rw-r--r-- | configure.ac | 13 | ||||
| -rwxr-xr-x | tests/basic/cdc.t | 135 | ||||
| -rw-r--r-- | xlators/features/Makefile.am | 2 | ||||
| -rw-r--r-- | xlators/features/compress/Makefile.am | 3 | ||||
| -rw-r--r-- | xlators/features/compress/src/Makefile.am | 17 | ||||
| -rw-r--r-- | xlators/features/compress/src/cdc-helper.c | 547 | ||||
| -rw-r--r-- | xlators/features/compress/src/cdc-mem-types.h | 22 | ||||
| -rw-r--r-- | xlators/features/compress/src/cdc.c | 342 | ||||
| -rw-r--r-- | xlators/features/compress/src/cdc.h | 107 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-volgen.c | 25 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-volume-set.c | 44 | 
11 files changed, 1256 insertions, 1 deletions
diff --git a/configure.ac b/configure.ac index b8cb855a01c..cf301b2901a 100644 --- a/configure.ac +++ b/configure.ac @@ -113,6 +113,8 @@ AC_CONFIG_FILES([Makefile                  xlators/features/marker/src/Makefile                  xlators/features/read-only/Makefile                  xlators/features/read-only/src/Makefile +                xlators/features/compress/Makefile +                xlators/features/compress/src/Makefile                  xlators/features/mac-compat/Makefile                  xlators/features/mac-compat/src/Makefile                  xlators/features/quiesce/Makefile @@ -484,6 +486,17 @@ AC_SUBST(SYNCDAEMON_COMPILE)  AC_SUBST(SYNCDAEMON_SUBDIR)  # end SYNCDAEMON section +# CDC xlator - check if libz is present if so enable HAVE_LIB_Z +echo -n "checking if libz is present... " + +PKG_CHECK_MODULES([ZLIB], [zlib >= 1.2.0], +                  [echo "yes (features requiring zlib enabled)"  AC_DEFINE(HAVE_LIB_Z, 1, [define if zlib is present])], +                  [echo "no"] ) + +AC_SUBST(LIBZ_CFLAGS) +AC_SUBST(LIBZ_LIBS) +# end CDC xlator secion +  # check for systemtap/dtrace  BUILD_SYSTEMTAP=no  AC_MSG_CHECKING([whether to include systemtap tracing support]) diff --git a/tests/basic/cdc.t b/tests/basic/cdc.t new file mode 100755 index 00000000000..4cd915aa9ca --- /dev/null +++ b/tests/basic/cdc.t @@ -0,0 +1,135 @@ +#!/bin/bash + +. $(dirname $0)/../include.rc +. $(dirname $0)/../volume.rc + +cleanup; + +TEST glusterd +TEST pidof glusterd + +## Create a volume with one brick +TEST $CLI volume create $V0 $H0:$B0/${V0}1; +EXPECT "$V0" volinfo_field $V0 'Volume Name'; +EXPECT 'Created' volinfo_field $V0 'Status'; +EXPECT '1' brick_count $V0 + +## Turn off performance translators +## This is required for testing readv calls +TEST $CLI volume set $V0 performance.io-cache off +EXPECT 'off' volinfo_field $V0 'performance.io-cache' +TEST $CLI volume set $V0 performance.quick-read off +EXPECT 'off' volinfo_field $V0 'performance.quick-read' + +TEST $CLI volume set $V0 strict-write-ordering on +EXPECT 'on' volinfo_field $V0 'performance.strict-write-ordering' + +## Turn on cdc xlator by setting features.compress to on +TEST $CLI volume set $V0 compress on +EXPECT 'on' volinfo_field $V0 'features.compress' +EXPECT 'server' volinfo_field $V0 'compress.mode' + +## Make sure that user cannot change compress.mode +## This would break the cdc xlator if allowed! +TEST $CLI volume set $V0 compress.mode client +EXPECT 'server' volinfo_field $V0 'compress.mode' + +## Turn on compress.debug option +## This will dump compressed data onto disk as gzip file +## This is used to check if compression actually happened +TEST $CLI volume set $V0 compress.debug on +EXPECT 'on' volinfo_field $V0 'compress.debug' + +## Start the volume +TEST $CLI volume start $V0; +EXPECT 'Started' volinfo_field $V0 'Status'; + +## Mount FUSE with caching disabled +TEST glusterfs --entry-timeout=0 --attribute-timeout=0 -s $H0 --volfile-id $V0 $M0; + +#################### +## Testing writev ## +#################### + +## Create a 1K file locally and find the md5sum +TEST dd if=/dev/zero of=/tmp/cdc-orig count=1 bs=1K 2>/dev/null +checksum[original-file]=`md5sum /tmp/cdc-orig | cut -d' ' -f1` + +## Copy the file to mountpoint and find its md5sum on brick +TEST dd if=/tmp/cdc-orig of=$M0/cdc-server count=1 bs=1K 2>/dev/null +checksum[brick-file]=`md5sum $B0/${V0}1/cdc-server | cut -d' ' -f1` + +## Uncompress the gzip dump file and find its md5sum +EXPECT '/tmp/cdcdump.gz: application/x-gzip; charset=binary' file -i /tmp/cdcdump.gz +TEST gunzip -f /tmp/cdcdump.gz +checksum[dump-file-writev]=`md5sum /tmp/cdcdump | cut -d' ' -f1` + +## Check if all 3 checksums are same +TEST test ${checksum[original-file]} = ${checksum[brick-file]} +TEST test ${checksum[brick-file]} = ${checksum[dump-file-writev]} + +## Cleanup files +TEST rm -f /tmp/cdcdump.gz + +################### +## Testing readv ## +################### + +## Copy file from mount point to client and find checksum +TEST dd if=$M0/cdc-server of=/tmp/cdc-client count=1 bs=1K 2>/dev/null +checksum[client-file]=`md5sum /tmp/cdc-client | cut -d' ' -f1` + +## Uncompress the gzip dump file and find its md5sum +EXPECT '/tmp/cdcdump.gz: application/x-gzip; charset=binary' file -i /tmp/cdcdump.gz +TEST gunzip -f /tmp/cdcdump.gz +checksum[dump-file-readv]=`md5sum /tmp/cdcdump | cut -d' ' -f1` + +## Check if all 3 checksums are same +TEST test ${checksum[brick-file]} = ${checksum[client-file]} +TEST test ${checksum[client-file]} = ${checksum[dump-file-readv]} + +## Cleanup files and unmount +TEST rm -f /tmp/cdc* $M0/cdc* +TEST umount $M0 + +## Stop the volume +TEST $CLI volume stop $V0; +EXPECT 'Stopped' volinfo_field $V0 'Status'; + +## Turn on compress.min-size and set it to 100 bytes +## Compression should not take place if file size +## is less than 100 bytes +TEST $CLI volume set $V0 compress.min-size 100 +EXPECT '100' volinfo_field $V0 'compress.min-size' + +## Start the volume +TEST $CLI volume start $V0; +EXPECT 'Started' volinfo_field $V0 'Status'; + +## Mount FUSE with caching disabled +TEST glusterfs --entry-timeout=0 --attribute-timeout=0 -s $H0 --volfile-id $V0 $M0; + +## Create a file of size 99 bytes on mountpoint +## This is should not be compressed +TEST dd if=/dev/zero of=$M0/cdc-small count=1 bs=99 2>/dev/null +TEST ! test -e /tmp/cdcdump.gz + +## Cleanup files and unmount +TEST rm -f /tmp/cdc* $M0/cdc* +TEST umount $M0 + +## Reset the compress options +TEST $CLI volume reset $V0 compress.debug +TEST $CLI volume reset $V0 compress.min-size +TEST $CLI volume reset $V0 compress.mode +TEST $CLI volume reset $V0 features.compress + +## Stop the volume +TEST $CLI volume stop $V0; +EXPECT 'Stopped' volinfo_field $V0 'Status'; + +## Delete the volume +TEST $CLI volume delete $V0; +TEST ! $CLI volume info $V0; + +cleanup; diff --git a/xlators/features/Makefile.am b/xlators/features/Makefile.am index 0e37fc0b7d6..d2f5ef19290 100644 --- a/xlators/features/Makefile.am +++ b/xlators/features/Makefile.am @@ -1,4 +1,4 @@  SUBDIRS = locks quota read-only mac-compat quiesce marker index \ -	  protect changelog gfid-access $(GLUPY_SUBDIR) qemu-block # trash path-converter # filter +	  protect compress changelog gfid-access $(GLUPY_SUBDIR) qemu-block # trash path-converter # filter  CLEANFILES = diff --git a/xlators/features/compress/Makefile.am b/xlators/features/compress/Makefile.am new file mode 100644 index 00000000000..a985f42a877 --- /dev/null +++ b/xlators/features/compress/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src + +CLEANFILES = diff --git a/xlators/features/compress/src/Makefile.am b/xlators/features/compress/src/Makefile.am new file mode 100644 index 00000000000..4a64b52a9a1 --- /dev/null +++ b/xlators/features/compress/src/Makefile.am @@ -0,0 +1,17 @@ +xlator_LTLIBRARIES = cdc.la + +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features + +noinst_HEADERS = cdc.h cdc-mem-types.h + +cdc_la_LDFLAGS = -module -avoidversion $(LIBZ_LIBS) + +cdc_la_SOURCES = cdc.c cdc-helper.c +cdc_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -D$(GF_HOST_OS) \ +-shared -nostartfiles  $(LIBZ_CFLAGS) + +AM_CFLAGS = -Wall $(GF_CFLAGS) + +CLEANFILES = diff --git a/xlators/features/compress/src/cdc-helper.c b/xlators/features/compress/src/cdc-helper.c new file mode 100644 index 00000000000..54432ff455c --- /dev/null +++ b/xlators/features/compress/src/cdc-helper.c @@ -0,0 +1,547 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "glusterfs.h" +#include "logging.h" + +#include "cdc.h" +#include "cdc-mem-types.h" + +#ifdef HAVE_LIB_Z +#include "zlib.h" +#endif + +#ifdef HAVE_LIB_Z +/* gzip header looks something like this + * (RFC 1950) + * + * +---+---+---+---+---+---+---+---+---+---+ + * |ID1|ID2|CM |FLG|     MTIME     |XFL|OS | + * +---+---+---+---+---+---+---+---+---+---+ + * + * Data is usually sent without this header i.e + * Data sent = <compressed-data> + trailer(8) + * The trailer contains the checksum. + * + * gzip_header is added only during debugging. + * Refer to the function cdc_dump_iovec_to_disk + */ +static const char gzip_header[10] = +        { +                '\037', '\213', Z_DEFLATED, 0, +                0,      0,      0,          0, +                0,      GF_CDC_OS_ID +        }; + +static int32_t +cdc_next_iovec (xlator_t *this, cdc_info_t *ci) +{ +        int ret = -1; + +        ci->ncount++; +        /* check for iovec overflow -- should not happen */ +        if (ci->ncount == MAX_IOVEC) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Zlib output buffer overflow" +                        " ->ncount (%d) | ->MAX_IOVEC (%d)", +                        ci->ncount, MAX_IOVEC); +                goto out; +        } + +        ret = 0; + + out: +        return ret; +} + +static void +cdc_put_long (unsigned char *string, unsigned long x) +{ +        string[0] = (unsigned char) (x & 0xff); +        string[1] = (unsigned char) ((x & 0xff00) >> 8); +        string[2] = (unsigned char) ((x & 0xff0000) >> 16); +        string[3] = (unsigned char) ((x & 0xff000000) >> 24); +} + +static unsigned long +cdc_get_long (unsigned char *buf) +{ +        return ((unsigned long) buf[0]) +                | (((unsigned long) buf[1]) << 8) +                | (((unsigned long) buf[2]) << 16) +                | (((unsigned long) buf[3]) << 24); +} + +static int32_t +cdc_init_gzip_trailer (xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci) +{ +        int   ret = -1; +        char *buf = NULL; + +        ret = cdc_next_iovec (this, ci); +        if (ret) +                goto out; + +        buf = CURR_VEC(ci).iov_base = +                (char *) GF_CALLOC (1, GF_CDC_VALIDATION_SIZE, +                                    gf_cdc_mt_gzip_trailer_t); + +        if (!CURR_VEC(ci).iov_base) +                goto out; + +        CURR_VEC(ci).iov_len = GF_CDC_VALIDATION_SIZE; + +        cdc_put_long ((unsigned char *)&buf[0], ci->crc); +        cdc_put_long ((unsigned char *)&buf[4], ci->stream.total_in); + +        ret = 0; + + out: +        return ret; +} + +static int32_t +cdc_alloc_iobuf_and_init_vec (xlator_t *this, +                              cdc_priv_t *priv, cdc_info_t *ci, +                              int size) +{ +        int           ret       = -1; +        int           alloc_len = 0; +        struct iobuf *iobuf     = NULL; + +        ret = cdc_next_iovec (this, ci); +        if (ret) +                goto out; + +        alloc_len = size ? size : ci->buffer_size; + +        iobuf = iobuf_get2 (this->ctx->iobuf_pool, alloc_len); +        if (!iobuf) +                goto out; + +        ret = iobref_add (ci->iobref, iobuf); +        if (ret) +                goto out; + +        /* Initialize this iovec */ +        CURR_VEC(ci).iov_base = iobuf->ptr; +        CURR_VEC(ci).iov_len  = alloc_len; + +        ret = 0; + + out: +        return ret; +} + +static void +cdc_init_zlib_output_stream (cdc_priv_t *priv, cdc_info_t *ci, int size) +{ +        ci->stream.next_out  = (unsigned char *) CURR_VEC(ci).iov_base; +        ci->stream.avail_out = size ? size : ci->buffer_size; +} + +/* This routine is for testing and debugging only. + * Data written = header(10) + <compressed-data> + trailer(8) + * So each gzip dump file is at least 18 bytes in size. + */ +void +cdc_dump_iovec_to_disk (xlator_t *this, cdc_info_t *ci, const char *file) +{ +        int    i      = 0; +        int    fd     = 0; +        size_t writen = 0; +        size_t total_writen = 0; + +        fd = open (file, O_WRONLY|O_CREAT|O_TRUNC, 0777 ); +        if (fd < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Cannot open file: %s", file); +                return; +        } + +        writen = write (fd, (char *) gzip_header, 10); +        total_writen += writen; +        for (i = 0; i < ci->ncount; i++) { +                writen = write (fd, (char *) ci->vec[i].iov_base, ci->vec[i].iov_len); +                total_writen += writen; +        } + +        gf_log (this->name, GF_LOG_DEBUG, +                        "dump'd %zu bytes to %s", total_writen, GF_CDC_DEBUG_DUMP_FILE ); + +        close (fd); +} + +static int32_t +cdc_flush_libz_buffer (cdc_priv_t *priv, xlator_t *this, cdc_info_t *ci, +                       int (*libz_func)(z_streamp, int), +                       int flush) +{ +        int32_t ret = Z_OK; +        int done = 0; +        unsigned int deflate_len = 0; + +        for (;;) { +                deflate_len = ci->buffer_size - ci->stream.avail_out; + +                if (deflate_len != 0) { +                        CURR_VEC(ci).iov_len = deflate_len; + +                        ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0); +                        if (ret) { +                                ret = Z_MEM_ERROR; +                                break; +                        } + +                        /* Re-position Zlib output buffer */ +                        cdc_init_zlib_output_stream (priv, ci, 0); +                } + +                if (done) { +                        ci->ncount--; +                        break; +                } + +                ret = libz_func (&ci->stream, flush); + +                if (ret == Z_BUF_ERROR) { +                        ret = Z_OK; +                        ci->ncount--; +                        break; +                } + +                done = (ci->stream.avail_out != 0 || ret == Z_STREAM_END); + +                if (ret != Z_OK && ret != Z_STREAM_END) +                        break; +        } + +        return ret; +} + +static int32_t +do_cdc_compress (struct iovec *vec, xlator_t *this, cdc_priv_t *priv, +                 cdc_info_t *ci) +{ +        int ret = -1; + +        /* Initialize defalte */ +        ret = deflateInit2 (&ci->stream, priv->cdc_level, Z_DEFLATED, +                            priv->window_size, priv->mem_level, +                            Z_DEFAULT_STRATEGY); + +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "unable to init Zlib (retval: %d)", ret); +                goto out; +        } + +        ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0); +        if (ret) +                goto out; + +        /* setup output buffer */ +        cdc_init_zlib_output_stream (priv, ci, 0); + +        /* setup input buffer */ +        ci->stream.next_in  = (unsigned char *) vec->iov_base; +        ci->stream.avail_in = vec->iov_len; + +        ci->crc = crc32 (ci->crc, (const Bytef *) vec->iov_base, vec->iov_len); + +        gf_log (this->name, GF_LOG_DEBUG, "crc=%lu len=%d buffer_size=%d", +                ci->crc, ci->stream.avail_in, ci->buffer_size); + +        /* compress !! */ +        while (ci->stream.avail_in != 0) { +                if (ci->stream.avail_out == 0) { + +                        CURR_VEC(ci).iov_len = ci->buffer_size; + +                        ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0); +                        if (ret) +                                break; + +                        /* Re-position Zlib output buffer */ +                        cdc_init_zlib_output_stream (priv, ci, 0); +                } + +                ret = deflate (&ci->stream, Z_NO_FLUSH); +                if (ret != Z_OK) +                        break; +        } + + out: +        return ret; +} + +int32_t +cdc_compress (xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci, +              dict_t **xdata) +{ +        int ret = -1; +        int i   = 0; + +        ci->iobref = iobref_new (); +        if (!ci->iobref) +                goto out; + +        if (!*xdata) { +                *xdata = dict_new (); +                if (!*xdata) { +                        gf_log (this->name, GF_LOG_ERROR, "Cannot allocate xdata" +                                " dict"); +                        goto out; +                } +        } + +        /* data */ +        for (i = 0; i < ci->count; i++) { +                ret = do_cdc_compress (&ci->vector[i], this, priv, ci); +                if (ret != Z_OK) +                        goto deflate_cleanup_out; +        } + +        /* flush zlib buffer */ +        ret = cdc_flush_libz_buffer (priv, this, ci, deflate, Z_FINISH); +        if (!(ret == Z_OK || ret == Z_STREAM_END)) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Compression Error: ret (%d)", ret); +                ret = -1; +                goto deflate_cleanup_out; +        } + +        /* trailer */ +        ret = cdc_init_gzip_trailer (this, priv, ci); +        if (ret) +                goto deflate_cleanup_out; + +        gf_log (this->name, GF_LOG_DEBUG, +                "Compressed %ld to %ld bytes", +                ci->stream.total_in, ci->stream.total_out); + +        ci->nbytes = ci->stream.total_out + GF_CDC_VALIDATION_SIZE; + +        /* set deflated canary value for identification */ +        ret = dict_set_int32 (*xdata, GF_CDC_DEFLATE_CANARY_VAL, 1); +        if (ret) { +                /* Send uncompressed data if we can't _tell_ the client +                 * that deflated data is on it's way. So, we just log +                 * the faliure and continue as usual. +                 */ +                 gf_log (this->name, GF_LOG_ERROR, +                 "Data deflated, but could not set canary" +                 " value in dict for identification"); +        } + +        /* This is to be used in testing */ +        if ( priv->debug ) { +                cdc_dump_iovec_to_disk (this, ci, GF_CDC_DEBUG_DUMP_FILE ); +        } + + deflate_cleanup_out: +        (void) deflateEnd(&ci->stream); + + out: +        return ret; +} + + +/* deflate content is checked by the presence of a canary + * value in the dict as the key + */ +static int32_t +cdc_check_content_for_deflate (dict_t *xdata) +{ +        return dict_get (xdata, GF_CDC_DEFLATE_CANARY_VAL) ? -1 : 0; +} + +static unsigned long +cdc_extract_crc (char *trailer) +{ +        return cdc_get_long ((unsigned char *) &trailer[0]); +} + +static unsigned long +cdc_extract_size (char *trailer) +{ +        return cdc_get_long ((unsigned char *) &trailer[4]); +} + +static int32_t +cdc_validate_inflate (cdc_info_t *ci, unsigned long crc, +                      unsigned long len) +{ +        return !((crc == ci->crc) +                 /* inflated length is hidden inside +                  * Zlib stream struct */ +                 && (len == ci->stream.total_out)); +} + +static int32_t +do_cdc_decompress (xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci) +{ +        int            ret          = -1; +        int            i            = 0; +        int            len          = 0; +        char          *inflte       = NULL; +        char          *trailer      = NULL; +        struct iovec   vec          = {0,}; +        unsigned long  computed_crc = 0; +        unsigned long  computed_len = 0; + +        ret = inflateInit2 (&ci->stream, priv->window_size); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Zlib: Unable to initialize inflate"); +                goto out; +        } + +        vec = THIS_VEC(ci, 0); + +        trailer = (char *) (((char *) vec.iov_base) + vec.iov_len +                - GF_CDC_VALIDATION_SIZE); + +        /* CRC of uncompressed data */ +        computed_crc = cdc_extract_crc (trailer); + +        /* size of uncomrpessed data */ +        computed_len = cdc_extract_size (trailer); + +        gf_log (this->name, GF_LOG_DEBUG, "crc=%lu len=%lu buffer_size=%d", +                computed_crc, computed_len, ci->buffer_size); + +        inflte = vec.iov_base ; +        len = vec.iov_len - GF_CDC_VALIDATION_SIZE; + +        /* allocate buffer of the original length of the data */ +        ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0); +        if (ret) +                goto out; + +        /* setup output buffer */ +        cdc_init_zlib_output_stream (priv, ci, 0); + +        /* setup input buffer */ +        ci->stream.next_in = (unsigned char *) inflte; +        ci->stream.avail_in = len; + +        while (ci->stream.avail_in != 0) { +                if (ci->stream.avail_out == 0) { +                        CURR_VEC(ci).iov_len = ci->buffer_size; + +                        ret = cdc_alloc_iobuf_and_init_vec (this, priv, ci, 0); +                        if (ret) +                                break; + +                        /* Re-position Zlib output buffer */ +                        cdc_init_zlib_output_stream (priv, ci, 0); +                } + +                ret = inflate (&ci->stream, Z_NO_FLUSH); +                if (ret == Z_STREAM_ERROR) +                        break; +        } + +        /* flush zlib buffer */ +        ret = cdc_flush_libz_buffer (priv, this, ci, inflate, Z_SYNC_FLUSH); +        if (!(ret == Z_OK || ret == Z_STREAM_END)) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Decompression Error: ret (%d)", ret); +                ret = -1; +                goto out; +        } + +        /* compute CRC of the uncompresses data to check for +         * correctness */ + +        for (i = 0; i < ci->ncount; i++) { +                ci->crc = crc32 (ci->crc, +                                 (const Bytef *) ci->vec[i].iov_base, +                                 ci->vec[i].iov_len); +        } + +        /* validate inflated data */ +        ret = cdc_validate_inflate (ci, computed_crc, computed_len); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Checksum or length mismatched in inflated data"); +        } + + out: +        return ret; +} + +int32_t +cdc_decompress (xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci, +                dict_t *xdata) +{ +        int32_t ret = -1; + +        /* check for deflate content */ +        if (!cdc_check_content_for_deflate (xdata)) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "Content not deflated, passing through ..."); +                goto passthrough_out; +        } + +        ci->iobref = iobref_new (); +        if (!ci->iobref) +                goto passthrough_out; + +        /* do we need to do this? can we assume that one iovec +         * will hold per request data everytime? +         * +         * server/client protocol seems to deal with a single +         * iovec even if op_ret > 1M. So, it looks ok to +         * assume that a single iovec will contain all the +         * data (This saves us a lot from finding the trailer +         * and the data since it could have been split-up onto +         * two adjacent iovec's. +         * +         * But, in case this translator is loaded above quick-read +         * for some reason, then it's entirely possible that we get +         * multiple iovec's... +         * +         * This case (handled below) is not tested. (by loading the +         * xlator below quick-read) +         */ + +        /* @@ I_HOPE_THIS_IS_NEVER_HIT */ +        if (ci->count > 1) { +                gf_log (this->name, GF_LOG_WARNING, "unable to handle" +                        " multiple iovecs (%d in number)", ci->count); +                goto inflate_cleanup_out; +                /* TODO: coallate all iovecs in one */ +        } + +        ret = do_cdc_decompress (this, priv, ci); +        if (ret) +                goto inflate_cleanup_out; + +        ci->nbytes = ci->stream.total_out; + +        gf_log (this->name, GF_LOG_DEBUG, +                "Inflated %ld to %ld bytes", +                ci->stream.total_in, ci->stream.total_out); + + inflate_cleanup_out: +        (void) inflateEnd (&ci->stream); + + passthrough_out: +        return ret; +} + +#endif diff --git a/xlators/features/compress/src/cdc-mem-types.h b/xlators/features/compress/src/cdc-mem-types.h new file mode 100644 index 00000000000..efa00805987 --- /dev/null +++ b/xlators/features/compress/src/cdc-mem-types.h @@ -0,0 +1,22 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef __CDC_MEM_TYPES_H +#define __CDC_MEM_TYPES_H + +#include "mem-types.h" + +enum gf_cdc_mem_types { +        gf_cdc_mt_priv_t         = gf_common_mt_end + 1, +        gf_cdc_mt_vec_t          = gf_common_mt_end + 2, +        gf_cdc_mt_gzip_trailer_t = gf_common_mt_end + 3, +}; + +#endif diff --git a/xlators/features/compress/src/cdc.c b/xlators/features/compress/src/cdc.c new file mode 100644 index 00000000000..eb7d87c5698 --- /dev/null +++ b/xlators/features/compress/src/cdc.c @@ -0,0 +1,342 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include <sys/uio.h> + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "xlator.h" +#include "defaults.h" +#include "logging.h" + +#include "cdc.h" +#include "cdc-mem-types.h" + +static void +cdc_cleanup_iobref (cdc_info_t *ci) +{ +        assert(ci->iobref != NULL); +        iobref_clear (ci->iobref); +} + +int32_t +cdc_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +               int32_t op_ret, int32_t op_errno, +               struct iovec *vector, int32_t count, +               struct iatt *stbuf, struct iobref *iobref, +               dict_t *xdata) +{ +        int         ret   = -1; +        cdc_priv_t *priv  = NULL; +        cdc_info_t  ci    = {0,}; + +        GF_VALIDATE_OR_GOTO ("cdc", this, default_out); +        GF_VALIDATE_OR_GOTO (this->name, frame, default_out); + +        priv = this->private; + +        if (op_ret <= 0) +                goto default_out; + +        if ( (priv->min_file_size != 0) +             && (op_ret < priv->min_file_size) ) +                goto default_out; + +        ci.count       = count; +        ci.ibytes      = op_ret; +        ci.vector      = vector; +        ci.buf         = NULL; +        ci.iobref      = NULL; +        ci.ncount      = 0; +        ci.crc         = 0; +        ci.buffer_size = GF_CDC_DEF_BUFFERSIZE; + +/* A readv compresses on the server side and decompresses on the client side + */ +        if (priv->op_mode == GF_CDC_MODE_SERVER) { +                ret = cdc_compress (this, priv, &ci, &xdata); +        } else if (priv->op_mode == GF_CDC_MODE_CLIENT) { +                ret = cdc_decompress (this, priv, &ci, xdata); +        } else { +                gf_log (this->name, GF_LOG_ERROR, +                        "Invalid operation mode (%d)", priv->op_mode); +        } + +        if (ret) +                goto default_out; + +        STACK_UNWIND_STRICT (readv, frame, ci.nbytes, op_errno, +                             ci.vec, ci.ncount, stbuf, iobref, +                             xdata); +        cdc_cleanup_iobref (&ci); +        return 0; + + default_out: +        STACK_UNWIND_STRICT (readv, frame, op_ret, op_errno, +                             vector, count, stbuf, iobref, xdata); +        return 0; +} + +int32_t +cdc_readv (call_frame_t *frame, xlator_t *this, +           fd_t *fd, size_t size, off_t offset, uint32_t flags, +           dict_t *xdata) +{ +        fop_readv_cbk_t cbk = NULL; + +#ifdef HAVE_LIB_Z +        cbk = cdc_readv_cbk; +#else +        cbk = default_readv_cbk; +#endif +        STACK_WIND (frame, cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->readv, +                    fd, size, offset, flags, xdata); +        return 0; +} + +int32_t +cdc_writev_cbk (call_frame_t *frame, +                void *cookie, +                xlator_t *this, +                int32_t op_ret, +                int32_t op_errno, +                struct iatt *prebuf, +                struct iatt *postbuf, dict_t *xdata) +{ + +	STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, prebuf, postbuf, xdata); +	return 0; +} + +int32_t +cdc_writev (call_frame_t *frame, +            xlator_t *this, +            fd_t *fd, +            struct iovec *vector, +            int32_t count, +            off_t offset, +            uint32_t flags, +            struct iobref *iobref, dict_t *xdata) +{ +	int	     ret   = -1; +	cdc_priv_t  *priv  = NULL; +	cdc_info_t   ci    = {0,}; +	size_t       isize = 0; + +	GF_VALIDATE_OR_GOTO ("cdc", this, default_out); +	GF_VALIDATE_OR_GOTO (this->name, frame, default_out); + +	priv = this->private; + +	isize =  iov_length(vector, count); + +	if (isize <= 0) +                goto default_out; + +        if ( (priv->min_file_size != 0) +             && (isize < priv->min_file_size) ) +                goto default_out; + +        ci.count       = count; +        ci.ibytes      = isize; +        ci.vector      = vector; +        ci.buf         = NULL; +        ci.iobref      = NULL; +        ci.ncount      = 0; +        ci.crc         = 0; +        ci.buffer_size = GF_CDC_DEF_BUFFERSIZE; + +/* A writev compresses on the client side and decompresses on the server side + */ +	if (priv->op_mode == GF_CDC_MODE_CLIENT) { +		ret = cdc_compress (this, priv, &ci, &xdata); +	} else if (priv->op_mode == GF_CDC_MODE_SERVER) { +		ret = cdc_decompress (this, priv, &ci, xdata); +	} else { +		gf_log (this->name, GF_LOG_ERROR, "Invalid operation mode (%d) ", priv->op_mode); +	} + +	if (ret) +		goto default_out; + +	STACK_WIND (frame, +                    cdc_writev_cbk, +                    FIRST_CHILD (this), +                    FIRST_CHILD (this)->fops->writev, +                    fd, ci.vec, ci.ncount, offset, flags, +                    iobref, xdata); + +        cdc_cleanup_iobref (&ci); +        return 0; + + default_out: +	STACK_WIND (frame, +                    cdc_writev_cbk, +                    FIRST_CHILD (this), +                    FIRST_CHILD (this)->fops->writev, +                    fd, vector, count, offset, flags, +                    iobref, xdata); +	return 0; +} + +int32_t +init (xlator_t *this) +{ +        int         ret      = -1; +        char       *temp_str = NULL; +        cdc_priv_t *priv     = NULL; + +        GF_VALIDATE_OR_GOTO ("cdc", this, err); + +        if (!this->children || this->children->next) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Need subvolume == 1"); +                goto err; +        } + +        if (!this->parents) { +                gf_log (this->name, GF_LOG_WARNING, +                        "Dangling volume. Check volfile"); +        } + +        priv = GF_CALLOC (1, sizeof (*priv), gf_cdc_mt_priv_t); +        if (!priv) { +                goto err; +        } + +        /* Check if debug mode is turned on */ +        GF_OPTION_INIT ("debug", priv->debug, bool, err); +        if( priv->debug ) { +                gf_log (this->name, GF_LOG_DEBUG, "CDC debug option turned on"); +        } + +        /* Set Gzip Window Size */ +        GF_OPTION_INIT ("window-size", priv->window_size, int32, err); +        if ( (priv->window_size > GF_CDC_MAX_WINDOWSIZE) +             || (priv->window_size < GF_CDC_DEF_WINDOWSIZE) ) { +                gf_log (this->name, GF_LOG_WARNING, +                        "Invalid gzip window size (%d), using default", +                        priv->window_size); +                priv->window_size = GF_CDC_DEF_WINDOWSIZE; +        } + +        /* Set Gzip (De)Compression Level */ +        GF_OPTION_INIT ("compression-level", priv->cdc_level, int32, err); +        if ( ((priv->cdc_level < 1) || (priv->cdc_level > 9)) +             && (priv->cdc_level != GF_CDC_DEF_COMPRESSION) ) { +                gf_log (this->name, GF_LOG_WARNING, +                        "Invalid gzip (de)compression level (%d)," +                        " using default", priv->cdc_level); +                priv->cdc_level = GF_CDC_DEF_COMPRESSION; +        } + +        /* Set Gzip Memory Level */ +        GF_OPTION_INIT ("mem-level", priv->mem_level, int32, err); +        if ( (priv->mem_level < 1) || (priv->mem_level > 9) ) { +                gf_log (this->name, GF_LOG_WARNING, +                        "Invalid gzip memory level, using the default"); +                priv->mem_level = GF_CDC_DEF_MEMLEVEL; +        } + +        /* Set min file size to enable compression */ +        GF_OPTION_INIT ("min-size", priv->min_file_size, int32, err); + +        /* Mode of operation - Server/Client */ +        ret = dict_get_str (this->options, "mode", &temp_str); +        if (ret) { +                gf_log (this->name, GF_LOG_CRITICAL, +                        "Operation mode not specified !!"); +                goto err; +        } + +        if (GF_CDC_MODE_IS_CLIENT (temp_str)) { +                priv->op_mode = GF_CDC_MODE_CLIENT; +        } else if (GF_CDC_MODE_IS_SERVER (temp_str)) { +                priv->op_mode = GF_CDC_MODE_SERVER; +        } else { +                gf_log (this->name, GF_LOG_CRITICAL, +                        "Bogus operation mode (%s) specified", temp_str); +                goto err; +        } + +        this->private = priv; +        gf_log (this->name, GF_LOG_DEBUG, "CDC xlator loaded in (%s) mode",temp_str); +        return 0; + + err: +        if (priv) +                GF_FREE (priv); + +        return -1; +} + +void +fini (xlator_t *this) +{ +        cdc_priv_t *priv = this->private; + +        if (priv) +                GF_FREE (priv); +        this->private = NULL; +        return; +} + +struct xlator_fops fops = { +        .readv  = cdc_readv, +        .writev = cdc_writev, +}; + +struct xlator_cbks cbks = { +}; + +struct volume_options options[] = { +        { .key  = {"window-size"}, +          .default_value = "-15", +          .type = GF_OPTION_TYPE_INT, +          .description = "Size of the zlib history buffer." +        }, +        { .key  = {"mem-level"}, +          .default_value = "8", +          .type = GF_OPTION_TYPE_INT, +          .description = "Memory allocated for internal compression state.\ +                          1 uses minimum memory but is slow and reduces \ +                          compression ratio; memLevel=9 uses maximum memory \ +                          for optimal speed. The default value is 8." +        }, +        { .key  = {"compression-level"}, +          .default_value = "-1", +          .type = GF_OPTION_TYPE_INT, +          .description = "Compression levels \ +                          0 : no compression, 1 : best speed, \ +                          9 : best compression, -1 : default compression " +        }, +        { .key  = {"min-size"}, +          .default_value = "0", +          .type = GF_OPTION_TYPE_INT, +          .description = "Data is compressed only when its size exceeds this." +        }, +        { .key  = {"mode"}, +          .value = {"server", "client"}, +          .type = GF_OPTION_TYPE_STR, +          .description = "Set on the basis of where the xlator is loaded." +        }, +        { .key = {"debug"}, +          .default_value = "false", +          .type = GF_OPTION_TYPE_BOOL, +          .description = "This is used in testing. Will dump compressed data \ +                          to disk as a gzip file." +        }, +        { .key  = {NULL} +        }, +}; diff --git a/xlators/features/compress/src/cdc.h b/xlators/features/compress/src/cdc.h new file mode 100644 index 00000000000..71f4d2317bb --- /dev/null +++ b/xlators/features/compress/src/cdc.h @@ -0,0 +1,107 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef __CDC_H +#define __CDC_H + +#ifdef HAVE_LIB_Z +#include "zlib.h" +#endif + +#include "xlator.h" + +#ifndef MAX_IOVEC +#define MAX_IOVEC 16 +#endif + +typedef struct cdc_priv { +        int window_size; +        int mem_level; +        int cdc_level; +        int min_file_size; +        int op_mode; +        gf_boolean_t debug; +        gf_lock_t lock; +} cdc_priv_t; + +typedef struct cdc_info { +        /* input bits */ +        int            count; +        int32_t        ibytes; +        struct iovec  *vector; +        struct iatt   *buf; + +        /* output bits */ +        int            ncount; +        int            nbytes; +        int            buffer_size; +        struct iovec   vec[MAX_IOVEC]; +        struct iobref *iobref; + +        /* zlib bits */ +#ifdef HAVE_LIB_Z +        z_stream      stream; +#endif +        unsigned long crc; +} cdc_info_t; + +#define NVEC(ci) (ci->ncount - 1) +#define CURR_VEC(ci) ci->vec[ci->ncount - 1] +#define THIS_VEC(ci, i) ci->vector[i] + +/* Gzip defaults */ +#define GF_CDC_DEF_WINDOWSIZE  -15 /* default value */ +#define GF_CDC_MAX_WINDOWSIZE  -8  /* max value     */ + +#ifdef HAVE_LIB_Z +#define GF_CDC_DEF_COMPRESSION Z_DEFAULT_COMPRESSION +#else +#define GF_CDC_DEF_COMPRESSION -1 +#endif + +#define GF_CDC_DEF_MEMLEVEL    8 +#define GF_CDC_DEF_BUFFERSIZE  262144 // 256K - default compression buffer size + +/* Operation mode + * If xlator is loaded on client, readv decompresses and writev compresses + * If xlator is loaded on server, readv compresses and writev decompresses + */ +#define GF_CDC_MODE_CLIENT   0 +#define GF_CDC_MODE_SERVER   1 + +/* min size of data to do cmpression + * 0 == compress even 1byte + */ +#define GF_CDC_MIN_CHUNK_SIZE 0 + +#define GF_CDC_VALIDATION_SIZE 8 + +#define GF_CDC_OS_ID 0xFF +#define GF_CDC_DEFLATE_CANARY_VAL "deflate" +#define GF_CDC_DEBUG_DUMP_FILE "/tmp/cdcdump.gz" + +#define GF_CDC_MODE_IS_CLIENT(m) \ +        (strcmp (m, "client") == 0) + +#define GF_CDC_MODE_IS_SERVER(m) \ +        (strcmp (m, "server") == 0) + +int32_t +cdc_compress (xlator_t *this, +              cdc_priv_t *priv, +              cdc_info_t *ci, +              dict_t **xdata); +int32_t +cdc_decompress (xlator_t *this, +                cdc_priv_t *priv, +                cdc_info_t *ci, +                dict_t *xdata); + +#endif diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c index fb91c3de457..33147105a80 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.c +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c @@ -1633,6 +1633,18 @@ server_graph_builder (volgen_graph_t *graph, glusterd_volinfo_t *volinfo,                  }          } +        /* Check for compress volume option, and add it to the graph on server side */ +        if (dict_get_str_boolean (set_dict, "features.compress", 0)) { +                xl = volgen_graph_add (graph, "features/cdc", volname); +                if (!xl) { +                        ret = -1; +                        goto out; +                } +                ret = dict_set_str (set_dict, "compress.mode", "server"); +                if (ret) +                        goto out; +        } +          xl = volgen_graph_add_as (graph, "debug/io-stats", path);          if (!xl)                  return -1; @@ -2462,6 +2474,19 @@ client_graph_builder (volgen_graph_t *graph, glusterd_volinfo_t *volinfo,          if (ret == -1)                  goto out; +        /* Check for compress volume option, and add it to the graph on client side */ +        if (dict_get_str_boolean (set_dict, "features.compress", 0)) { +                xl = volgen_graph_add (graph, "features/cdc", volname); +                if (!xl) { +                        ret = -1; +                        goto out; +                } +                ret = dict_set_str (set_dict, "compress.mode", "client"); +                if (ret) +                        goto out; + +        } +          ret = glusterd_volinfo_get_boolean (volinfo, VKEY_FEATURES_QUOTA);          if (ret == -1)                  goto out; diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c index adf0f2922ee..ae4a464adbd 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c +++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c @@ -970,6 +970,50 @@ struct volopt_map_entry glusterd_volopt_map[] = {            .flags       = OPT_FLAG_CLIENT_OPT | OPT_FLAG_XLATOR_OPT          }, +#ifdef HAVE_LIB_Z +        /* Compressor-decompressor xlator options +         * defaults used from xlator/feature/compress/src/cdc.h +         */ +        { .key         = "features.compress", +          .voltype     = "features/cdc", +          .option      = "!compress", +          .value       = "off", +          .type        = NO_DOC, +          .op_version  = 2, +          .description = "enable/disable compression translator" +        }, +        { .key         = "compress.mode", +          .voltype     = "features/cdc", +          .type        = NO_DOC, +          .op_version  = 2 +        }, +        { .key         = "compress.window-size", +          .voltype     = "features/cdc", +          .type        = NO_DOC, +          .op_version  = 2 +        }, +        { .key         = "compress.mem-level", +          .voltype     = "features/cdc", +          .type        = NO_DOC, +          .op_version  = 2 +        }, +        { .key         = "compress.min-size", +          .voltype     = "features/cdc", +          .type        = NO_DOC, +          .op_version  = 2 +        }, +        { .key         = "compress.compression-level", +          .voltype     = "features/cdc", +          .type        = NO_DOC, +          .op_version  = 2 +        }, +        { .key         = "compress.debug", +          .voltype     = "features/cdc", +          .type        = NO_DOC, +          .op_version  = 2 +        }, + #endif +          /* Quota xlator options */          { .key        = VKEY_FEATURES_LIMIT_USAGE,            .voltype    = "features/quota",  | 
