diff options
Diffstat (limited to 'rpc/rpc-lib')
| -rw-r--r-- | rpc/rpc-lib/src/Makefile.am | 8 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-drc.c | 811 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-drc.h | 100 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 1 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc-common.h | 48 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 91 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 19 | 
7 files changed, 1053 insertions, 25 deletions
diff --git a/rpc/rpc-lib/src/Makefile.am b/rpc/rpc-lib/src/Makefile.am index ca62a27f964..f19c3c8a431 100644 --- a/rpc/rpc-lib/src/Makefile.am +++ b/rpc/rpc-lib/src/Makefile.am @@ -1,16 +1,18 @@  lib_LTLIBRARIES = libgfrpc.la  libgfrpc_la_SOURCES = auth-unix.c rpcsvc-auth.c rpcsvc.c auth-null.c \ -	rpc-transport.c xdr-rpc.c xdr-rpcclnt.c rpc-clnt.c auth-glusterfs.c +	rpc-transport.c xdr-rpc.c xdr-rpcclnt.c rpc-clnt.c auth-glusterfs.c \ +	rpc-drc.c  libgfrpc_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la  noinst_HEADERS = rpcsvc.h rpc-transport.h xdr-common.h xdr-rpc.h xdr-rpcclnt.h \ -	rpc-clnt.h rpcsvc-common.h protocol-common.h +	rpc-clnt.h rpcsvc-common.h protocol-common.h rpc-drc.h  AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \  	-I$(top_srcdir)/rpc/xdr/src \ -	-DRPC_TRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport\" +	-DRPC_TRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport\" \ +	-I$(top_srcdir)/contrib/rbtree  AM_CFLAGS = -Wall $(GF_CFLAGS) diff --git a/rpc/rpc-lib/src/rpc-drc.c b/rpc/rpc-lib/src/rpc-drc.c new file mode 100644 index 00000000000..66d07cfe6bd --- /dev/null +++ b/rpc/rpc-lib/src/rpc-drc.c @@ -0,0 +1,811 @@ +/* +  Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +  This file is part of GlusterFS. + +  This file is licensed to you under your choice of the GNU Lesser +  General Public License, version 3 or any later version (LGPLv3 or +  later), or the GNU General Public License, version 2 (GPLv2), in all +  cases as published by the Free Software Foundation. 
+*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "rpcsvc.h" +#ifndef RPC_DRC_H +#include "rpc-drc.h" +#endif +#include "locking.h" +#include "hashfn.h" +#include "common-utils.h" +#include "statedump.h" +#include "mem-pool.h" + +#include <netinet/in.h> +#include <unistd.h> + +/** + * rpcsvc_drc_op_destroy - Destroys the cached reply + * + * @param drc - the main drc structure + * @param reply - the cached reply to destroy + * @return NULL if reply is destroyed, reply otherwise + */ +static drc_cached_op_t * +rpcsvc_drc_op_destroy (rpcsvc_drc_globals_t *drc, drc_cached_op_t *reply) +{ +        GF_ASSERT (drc); +        GF_ASSERT (reply); + +        if (reply->state == DRC_OP_IN_TRANSIT) +                return reply; + +        iobref_unref (reply->msg.iobref); +        if (reply->msg.rpchdr) +                GF_FREE (reply->msg.rpchdr); +        if (reply->msg.proghdr) +                GF_FREE (reply->msg.proghdr); +        if (reply->msg.progpayload) +                GF_FREE (reply->msg.progpayload); + +        list_del (&reply->global_list); +        reply->client->op_count--; +        drc->op_count--; +        mem_put (reply); +        reply = NULL; + +        return reply; +} + +/** + * rpcsvc_drc_op_rb_unref - This function is used in rb tree cleanup only + * + * @param reply - the cached reply to unref + * @param drc - the main drc structure + * @return void + */ +static void +rpcsvc_drc_rb_op_destroy (void *reply, void *drc) +{ +        rpcsvc_drc_op_destroy (drc, (drc_cached_op_t *)reply); +} + +/** + * rpcsvc_remove_drc_client - Cleanup the drc client + * + * @param client - the drc client to be removed + * @return void + */ +static void +rpcsvc_remove_drc_client (drc_client_t *client) +{ +        rb_destroy (client->rbtree, rpcsvc_drc_rb_op_destroy); +        list_del (&client->client_list); +        GF_FREE (client); +} + +/** + * rpcsvc_client_lookup - Given a sockaddr_storage, find the client if it exists + * + * 
@param drc - the main drc structure + * @param sockaddr - the network address of the client to be looked up + * @return drc client if it exists, NULL otherwise + */ +static drc_client_t * +rpcsvc_client_lookup (rpcsvc_drc_globals_t *drc, +                      struct sockaddr_storage *sockaddr) +{ +        drc_client_t    *client = NULL; + +        GF_ASSERT (drc); +        GF_ASSERT (sockaddr); + +        if (list_empty (&drc->clients_head)) +            return NULL; + +        list_for_each_entry (client, &drc->clients_head, client_list) { +                if (gf_sock_union_equal_addr (&client->sock_union, +                                              (union gf_sock_union *)sockaddr)) +                        return client; +        } + +        return NULL; +} + +/** + * drc_compare_reqs - Used by rbtree to determine if incoming req matches with + *                    an existing node(cached reply) in rbtree + * + * @param item - pointer to the incoming req + * @param rb_node_data - pointer to an rbtree node (cached reply) + * @param param - drc pointer - unused here, but used in *op_destroy + * @return 0 if req matches reply, else (req->xid - reply->xid) + */ +int +drc_compare_reqs (const void *item, const void *rb_node_data, void *param) +{ +        int               ret      = -1; +        rpcsvc_request_t *req      = NULL; +        drc_cached_op_t  *reply    = NULL; + +        GF_ASSERT (item); +        GF_ASSERT (rb_node_data); +        GF_ASSERT (param); + +        req = (rpcsvc_request_t *)item; +        reply = (drc_cached_op_t *)rb_node_data; + +        ret = req->xid - reply->xid; +        if (ret != 0) +                return ret; + +        if (req->prognum == reply->prognum && +            req->procnum == reply->procnum && +            req->progver == reply->progversion) +                return 0; + +        return 1; +} + +/** + * drc_rb_calloc - used by rbtree api to allocate memory for nodes + * + * @param allocator - the libavl_allocator 
structure used by rbtree + * @param size - not needed by this function + * @return pointer to new cached reply (node in rbtree) + */ +static void * +drc_rb_calloc (struct libavl_allocator *allocator, size_t size) +{ +        rpcsvc_drc_globals_t *drc = NULL; + +        /* get the drc pointer by simple typecast, since allocator +         * is the first member of rpcsvc_drc_globals_t +         */ +        drc = (rpcsvc_drc_globals_t *)allocator; + +        return mem_get (drc->mempool); +} + +/** + * drc_rb_free - used by rbtree api to free a node + * + * @param a - the libavl_allocator structure used by rbtree api + * @param block - node that needs to be freed + * @return void + */ +static void +drc_rb_free (struct libavl_allocator *a, void *block) +{ +        mem_put (block); +} + +/** + * drc_init_client_cache - initialize a drc client and its rb tree + * + * @param drc - the main drc structure + * @param client - the drc client to be initialized + * @return 0 on success, -1 on failure + */ +static int +drc_init_client_cache (rpcsvc_drc_globals_t *drc, drc_client_t *client) +{ +        GF_ASSERT (drc); +        GF_ASSERT (client); + +        drc->allocator.libavl_malloc = drc_rb_calloc; +        drc->allocator.libavl_free = drc_rb_free; + +        client->rbtree = rb_create (drc_compare_reqs, drc, +                                    (struct libavl_allocator *)drc); +        if (!client->rbtree) { +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "rb tree creation failed"); +                return -1; +        } + +        return 0; +} + +/** + * rpcsvc_get_drc_client - find the drc client with given sockaddr, else + *                         allocate and initialize a new drc client + * + * @param drc - the main drc structure + * @param sockaddr - network address of client + * @return drc client on success, NULL on failure + */ +static drc_client_t * +rpcsvc_get_drc_client (rpcsvc_drc_globals_t *drc, +                       struct sockaddr_storage *sockaddr) +{ +  
      drc_client_t      *client      = NULL; + +        GF_ASSERT (drc); +        GF_ASSERT (sockaddr); + +        client = rpcsvc_client_lookup (drc, sockaddr); +        if (client) +                goto out; + +        /* if lookup fails, allocate cache for the new client */ +        client = GF_CALLOC (1, sizeof (drc_client_t), +                            gf_common_mt_drc_client_t); +        if (!client) +                goto out; + +        client->ref = 0; +        client->sock_union = (union gf_sock_union)*sockaddr; +        client->op_count = 0; + +        if (drc_init_client_cache (drc, client)) { +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, +                        "initialization of drc client failed"); +                GF_FREE (client); +                client = NULL; +                goto out; +        } +        drc->client_count++; + +        list_add (&client->client_list, &drc->clients_head); + + out: +        return client; +} + +/** + * rpcsvc_need_drc - Determine if a request needs DRC service + * + * @param req - incoming request + * @return 1 if DRC is needed for req, 0 otherwise + */ +int +rpcsvc_need_drc (rpcsvc_request_t *req) +{ +        rpcsvc_actor_t           *actor = NULL; +        rpcsvc_drc_globals_t     *drc   = NULL; + +        GF_ASSERT (req); +        GF_ASSERT (req->svc); + +        drc = req->svc->drc; + +        if (!drc || drc->status == DRC_UNINITIATED) +                return 0; + +        actor = rpcsvc_program_actor (req); +        if (!actor) +                return 0; + +        return (actor->op_type == DRC_NON_IDEMPOTENT +                && drc->type != DRC_TYPE_NONE); +} + +/** + * rpcsvc_drc_client_ref - ref the drc client + * + * @param client - the drc client to ref + * @return client + */ +static drc_client_t * +rpcsvc_drc_client_ref (drc_client_t *client) +{ +        GF_ASSERT (client); +        client->ref++; +        return client; +} + +/** + * rpcsvc_drc_client_unref - unref the drc client, and destroy 
+ *                           the client on last unref + * + * @param drc - the main drc structure + * @param client - the drc client to unref + * @return NULL if it is the last unref, client otherwise + */ +static drc_client_t * +rpcsvc_drc_client_unref (rpcsvc_drc_globals_t *drc, drc_client_t *client) +{ +        GF_ASSERT (drc); +        GF_ASSERT (client->ref); + +        client->ref--; +        if (!client->ref) { +                drc->client_count--; +                rpcsvc_remove_drc_client (client); +                client = NULL; +        } + +        return client; +} + +/** + * rpcsvc_drc_lookup - lookup a request to see if it is already cached + * + * @param req - incoming request + * @return cached reply of req if found, NULL otherwise + */ +drc_cached_op_t * +rpcsvc_drc_lookup (rpcsvc_request_t *req) +{ +        drc_client_t           *client = NULL; +        drc_cached_op_t        *reply  = NULL; + +        GF_ASSERT (req); + +        if (!req->trans->drc_client) { +                client = rpcsvc_get_drc_client (req->svc->drc, +                                                &req->trans->peerinfo.sockaddr); +                if (!client) +                        goto out; +                req->trans->drc_client = client; +        } + +        client = rpcsvc_drc_client_ref (req->trans->drc_client); + +        if (client->op_count == 0) +                goto out; + +        reply = rb_find (client->rbtree, req); + + out: +        if (client) +                rpcsvc_drc_client_unref (req->svc->drc, client); + +        return reply; +} + +/** + * rpcsvc_send_cached_reply - send the cached reply for the incoming request + * + * @param req - incoming request (which is a duplicate in this case) + * @param reply - the cached reply for req + * @return 0 on successful reply submission, -1 or other non-zero value otherwise + */ +int +rpcsvc_send_cached_reply (rpcsvc_request_t *req, drc_cached_op_t *reply) +{ +        int     ret = 0; + +        GF_ASSERT 
(req); +        GF_ASSERT (reply); + +        gf_log (GF_RPCSVC, GF_LOG_DEBUG, "sending cached reply: xid: %d, " +                "client: %s", req->xid, req->trans->peerinfo.identifier); + +        rpcsvc_drc_client_ref (reply->client); +        ret = rpcsvc_transport_submit (req->trans, +                     reply->msg.rpchdr, reply->msg.rpchdrcount, +                     reply->msg.proghdr, reply->msg.proghdrcount, +                     reply->msg.progpayload, reply->msg.progpayloadcount, +                     reply->msg.iobref, req->trans_private); +        rpcsvc_drc_client_unref (req->svc->drc, reply->client); + +        return ret; +} + +/** + * rpcsvc_cache_reply - cache the reply for the processed request 'req' + * + * @param req - processed request + * @param iobref - iobref structure of the reply + * @param rpchdr - rpc header of the reply + * @param rpchdrcount - size of rpchdr + * @param proghdr - program header of the reply + * @param proghdrcount - size of proghdr + * @param payload - payload of the reply if any + * @param payloadcount - size of payload + * @return 0 on success, -1 on failure + */ +int +rpcsvc_cache_reply (rpcsvc_request_t *req, struct iobref *iobref, +                    struct iovec *rpchdr, int rpchdrcount, +                    struct iovec *proghdr, int proghdrcount, +                    struct iovec *payload, int payloadcount) +{ +        int                       ret              = -1; +        drc_cached_op_t          *reply            = NULL; + +        GF_ASSERT (req); +        GF_ASSERT (req->reply); + +        reply = req->reply; + +        reply->state = DRC_OP_CACHED; + +        reply->msg.iobref = iobref_ref (iobref); + +        reply->msg.rpchdrcount = rpchdrcount; +        reply->msg.rpchdr = iov_dup (rpchdr, rpchdrcount); + +        reply->msg.proghdrcount = proghdrcount; +        reply->msg.proghdr = iov_dup (proghdr, proghdrcount); + +        reply->msg.progpayloadcount = payloadcount; +        if (payloadcount) +  
              reply->msg.progpayload = iov_dup (payload, payloadcount); + +        //        rpcsvc_drc_client_unref (req->svc->drc, req->trans->drc_client); +        //        rpcsvc_drc_op_unref (req->svc->drc, reply); +        ret = 0; + +        return ret; +} + +/** + * rpcsvc_vacate_drc_entries - free up some percentage of drc cache + *                             based on the lru factor + * + * @param drc - the main drc structure + * @return void + */ +static void +rpcsvc_vacate_drc_entries (rpcsvc_drc_globals_t *drc) +{ +        uint32_t            i           = 0; +        uint32_t            n           = 0; +        drc_cached_op_t    *reply       = NULL; +        drc_cached_op_t    *tmp         = NULL; +        drc_client_t       *client      = NULL; + +        GF_ASSERT (drc); + +        n = drc->global_cache_size / drc->lru_factor; + +        list_for_each_entry_safe_reverse (reply, tmp, &drc->cache_head, global_list) { +                /* Don't delete ops that are in transit */ +                if (reply->state == DRC_OP_IN_TRANSIT) +                        continue; + +                client = reply->client; + +                (void *)rb_delete (client->rbtree, reply); + +                rpcsvc_drc_op_destroy (drc, reply); +                rpcsvc_drc_client_unref (drc, client); +                i++; +                if (i >= n) +                        break; +        } +} + +/** + * rpcsvc_add_op_to_cache - insert the cached op into the client rbtree and drc list + * + * @param drc - the main drc structure + * @param reply - the op to be inserted + * @return 0 on success, -1 on failure + */ +static int +rpcsvc_add_op_to_cache (rpcsvc_drc_globals_t *drc, drc_cached_op_t *reply) +{ +        drc_client_t        *client         = NULL; +        drc_cached_op_t    **tmp_reply      = NULL; + +        GF_ASSERT (drc); +        GF_ASSERT (reply); + +        client = reply->client; + +        /* cache is full, free up some space */ +        if 
(drc->op_count >= drc->global_cache_size) +                rpcsvc_vacate_drc_entries (drc); + +        tmp_reply = (drc_cached_op_t **)rb_probe (client->rbtree, reply); +        if (*tmp_reply != reply) { +                /* should never happen */ +                gf_log (GF_RPCSVC, GF_LOG_ERROR, +                        "DRC failed to detect duplicates"); +                return -1; +        } else if (*tmp_reply == NULL) { +                /* mem alloc failed */ +                return -1; +        } + +        client->op_count++; +        list_add (&reply->global_list, &drc->cache_head); +        drc->op_count++; + +        return 0; +} + +/** + * rpcsvc_cache_request - cache the in-transition incoming request + * + * @param req - incoming request + * @return 0 on success, -1 on failure + */ +int +rpcsvc_cache_request (rpcsvc_request_t *req) +{ +        int                        ret            = -1; +        drc_client_t              *client         = NULL; +        drc_cached_op_t           *reply          = NULL; +        rpcsvc_drc_globals_t      *drc            = NULL; + +        GF_ASSERT (req); + +        drc = req->svc->drc; + +        client = req->trans->drc_client; +        if (!client) { +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc client is NULL"); +                goto out; +        } + +        reply = mem_get (drc->mempool); +        if (!reply) +                goto out; + +        reply->client = rpcsvc_drc_client_ref (client); +        reply->xid = req->xid; +        reply->prognum = req->prognum; +        reply->progversion = req->progver; +        reply->procnum = req->procnum; +        reply->state = DRC_OP_IN_TRANSIT; +        req->reply = reply; + +        ret = rpcsvc_add_op_to_cache (drc, reply); +        if (ret) { +                req->reply = NULL; +                rpcsvc_drc_op_destroy (drc, reply); +                rpcsvc_drc_client_unref (drc, client); +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Failed to add op 
to drc cache"); +        } + + out: +        return ret; +} + +/** + * + * rpcsvc_drc_priv - function which dumps the drc state + * + * @param drc - the main drc structure + * @return 0 on success, -1 on failure + */ +int32_t +rpcsvc_drc_priv (rpcsvc_drc_globals_t *drc) +{ +        int                      i                         = 0; +        char                     key[GF_DUMP_MAX_BUF_LEN]  = {0}; +        drc_client_t            *client                    = NULL; +        char                     ip[INET6_ADDRSTRLEN]      = {0}; + +        if (!drc || drc->status == DRC_UNINITIATED) { +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "DRC is " +                        "uninitialized, not dumping its state"); +                return 0; +        } + +        gf_proc_dump_add_section("rpc.drc"); + +        if (TRY_LOCK (&drc->lock)) +                return -1; + +        gf_proc_dump_build_key (key, "drc", "type"); +        gf_proc_dump_write (key, "%d", drc->type); + +        gf_proc_dump_build_key (key, "drc", "client_count"); +        gf_proc_dump_write (key, "%d", drc->client_count); + +        gf_proc_dump_build_key (key, "drc", "current_cache_size"); +        gf_proc_dump_write (key, "%d", drc->op_count); + +        gf_proc_dump_build_key (key, "drc", "max_cache_size"); +        gf_proc_dump_write (key, "%d", drc->global_cache_size); + +        gf_proc_dump_build_key (key, "drc", "lru_factor"); +        gf_proc_dump_write (key, "%d", drc->lru_factor); + +        gf_proc_dump_build_key (key, "drc", "duplicate_request_count"); +        gf_proc_dump_write (key, "%d", drc->cache_hits); + +        gf_proc_dump_build_key (key, "drc", "in_transit_duplicate_requests"); +        gf_proc_dump_write (key, "%d", drc->intransit_hits); + +        list_for_each_entry (client, &drc->clients_head, client_list) { +                gf_proc_dump_build_key (key, "client", "%d.ip-address", i); +                memset (ip, 0, INET6_ADDRSTRLEN); +                switch 
(client->sock_union.storage.ss_family) { +                case AF_INET: +                        gf_proc_dump_write (key, "%s", inet_ntop (AF_INET, +                                &client->sock_union.sin.sin_addr.s_addr, +                                ip, INET_ADDRSTRLEN)); +                        break; +                case AF_INET6: +                        gf_proc_dump_write (key, "%s", inet_ntop (AF_INET6, +                                &client->sock_union.sin6.sin6_addr, +                                ip, INET6_ADDRSTRLEN)); +                        break; +                default: +                        gf_proc_dump_write (key, "%s", "N/A"); +                } + +                gf_proc_dump_build_key (key, "client", "%d.ref_count", i); +                gf_proc_dump_write (key, "%d", client->ref); +                gf_proc_dump_build_key (key, "client", "%d.op_count", i); +                gf_proc_dump_write (key, "%d", client->op_count); +                i++; +        } + +        UNLOCK (&drc->lock); +        return 0; +} + +/** + * rpcsvc_drc_notify - function which is notified of RPC transport events + * + * @param svc - pointer to rpcsvc_t structure of the rpc + * @param xl - pointer to the xlator + * @param event - the event which triggered this notify + * @param data - the transport structure + * @return 0 on success, -1 on failure + */ +int +rpcsvc_drc_notify (rpcsvc_t *svc, void *xl, +                   rpcsvc_event_t event, void *data) +{ +        int                       ret          = -1; +        rpc_transport_t          *trans        = NULL; +        drc_client_t             *client       = NULL; +        rpcsvc_drc_globals_t     *drc          = NULL; + +        GF_ASSERT (svc); +        GF_ASSERT (svc->drc); +        GF_ASSERT (data); + +        drc = svc->drc; + +        if (drc->status == DRC_UNINITIATED || +            drc->type == DRC_TYPE_NONE) +                return 0; + +        LOCK (&drc->lock); + +        trans = 
(rpc_transport_t *)data; +        client = rpcsvc_get_drc_client (drc, &trans->peerinfo.sockaddr); +        if (!client) +                goto out; + +        switch (event) { +        case RPCSVC_EVENT_ACCEPT: +                trans->drc_client = rpcsvc_drc_client_ref (client); +                ret = 0; +                break; + +        case RPCSVC_EVENT_DISCONNECT: +                ret = 0; +                if (list_empty (&drc->clients_head)) +                        break; +                /* should be the last unref */ +                rpcsvc_drc_client_unref (drc, client); +                trans->drc_client = NULL; +                break; + +        default: +                break; +        } + + out: +        UNLOCK (&drc->lock); +        return ret; +} + +/** + * rpcsvc_drc_init - Initialize the duplicate request cache service + * + * @param svc - pointer to rpcsvc_t structure of the rpc + * @param options - the options dictionary which configures drc + * @return 0 on success, non-zero integer on failure + */ +int +rpcsvc_drc_init (rpcsvc_t *svc, dict_t *options) +{ +        int                         ret            = 0; +        uint32_t                    drc_type       = 0; +        uint32_t                    drc_size       = 0; +        uint32_t                    drc_factor     = 0; +        rpcsvc_drc_globals_t       *drc            = NULL; + +        GF_ASSERT (svc); +        GF_ASSERT (options); + +        if (!svc->drc) { +                drc = GF_CALLOC (1, sizeof (rpcsvc_drc_globals_t), +                                 gf_common_mt_drc_globals_t); +                if (!drc) +                        return -1; + +                svc->drc = drc; +                LOCK_INIT (&drc->lock); +        } else { +                drc = svc->drc; +        } + +        LOCK (&drc->lock); +        if (drc->type != DRC_TYPE_NONE) { +                ret = 0; +                goto out; +        } + +        /* Toggle DRC on/off, when more drc 
types(persistent/cluster) +           are added, we shouldn't treat this as boolean */ +        ret = dict_get_str_boolean (options, "nfs.drc", _gf_false); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_INFO, "drc user options need second look"); +                ret = _gf_true; +        } + +        if (ret == _gf_false) { +                /* drc off */ +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "DRC is off"); +                ret = 0; +                goto out; +        } + +        /* Specify type of DRC to be used */ +        ret = dict_get_uint32 (options, "nfs.drc-type", &drc_type); +        if (ret) { +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc type not set." +                        " Continuing with default"); +                drc_type = DRC_DEFAULT_TYPE; +        } + +        drc->type = drc_type; + +        /* Set the global cache size (no. of ops to cache) */ +        ret = dict_get_uint32 (options, "nfs.drc-size", &drc_size); +        if (ret) { +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc size not set." +                        " Continuing with default size"); +                drc_size = DRC_DEFAULT_CACHE_SIZE; +        } + +        drc->global_cache_size = drc_size; + +        /* Mempool for cached ops */ +        drc->mempool = mem_pool_new (drc_cached_op_t, drc->global_cache_size); +        if (!drc->mempool) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get mempool for" +                        " DRC, drc-size: %d", drc->global_cache_size); +                ret = -1; +                goto out; +        } + +        /* What percent of cache to be evicted whenever it fills up */ +        ret = dict_get_uint32 (options, "nfs.drc-lru-factor", &drc_factor); +        if (ret) { +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc lru factor not set." 
+                        " Continuing with policy default"); +                drc_factor = DRC_DEFAULT_LRU_FACTOR; +        } + +        drc->lru_factor = (drc_lru_factor_t) drc_factor; + +        INIT_LIST_HEAD (&drc->clients_head); +        INIT_LIST_HEAD (&drc->cache_head); + +        ret = rpcsvc_register_notify (svc, rpcsvc_drc_notify, THIS); +        if (ret) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, +                        "registration of drc_notify function failed"); +                goto out; +        } + +        gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc init successful"); +        drc->status = DRC_INITIATED; + + out: +        UNLOCK (&drc->lock); +        if (ret == -1) { +                if (drc->mempool) { +                        mem_pool_destroy (drc->mempool); +                        drc->mempool = NULL; +                } +                GF_FREE (drc); +                svc->drc = NULL; +        } +        return ret; +} diff --git a/rpc/rpc-lib/src/rpc-drc.h b/rpc/rpc-lib/src/rpc-drc.h new file mode 100644 index 00000000000..0a1688992d5 --- /dev/null +++ b/rpc/rpc-lib/src/rpc-drc.h @@ -0,0 +1,100 @@ +/* +  Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +  This file is part of GlusterFS. + +  This file is licensed to you under your choice of the GNU Lesser +  General Public License, version 3 or any later version (LGPLv3 or +  later), or the GNU General Public License, version 2 (GPLv2), in all +  cases as published by the Free Software Foundation. +*/ + +#ifndef RPC_DRC_H +#define RPC_DRC_H + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "rpcsvc-common.h" +#include "rpcsvc.h" +#include "locking.h" +#include "dict.h" +#include "rb.h" + +/* per-client cache structure */ +struct drc_client { +        uint32_t                   ref; +        union gf_sock_union        sock_union; +        /* pointers to the cache */ +        struct rb_table           *rbtree; +        /* no. 
of ops currently cached */ +        uint32_t                   op_count; +        struct list_head           client_list; +}; + +struct drc_cached_op { +        drc_op_state_t                 state; +        uint32_t                       xid; +        int                            prognum; +        int                            progversion; +        int                            procnum; +        rpc_transport_msg_t            msg; +        drc_client_t                  *client; +        struct list_head               client_list; +        struct list_head               global_list; +        int32_t                        ref; +}; + +/* global drc definitions */ +enum drc_status { +        DRC_UNINITIATED, +        DRC_INITIATED +}; +typedef enum drc_status drc_status_t; + +struct drc_globals { +        /* allocator must be the first member since +         * it is used so in gf_libavl_allocator +         */ +        struct libavl_allocator   allocator; +        drc_type_t                type; +        /* configurable size parameter */ +        uint32_t                  global_cache_size; +        drc_lru_factor_t          lru_factor; +        gf_lock_t                 lock; +        drc_status_t              status; +        uint32_t                  op_count; +        uint64_t                  cache_hits; +        uint64_t                  intransit_hits; +        struct mem_pool          *mempool; +        struct list_head          cache_head; +        uint32_t                  client_count; +        struct list_head          clients_head; +}; + +int +rpcsvc_need_drc (rpcsvc_request_t *req); + +drc_cached_op_t * +rpcsvc_drc_lookup (rpcsvc_request_t *req); + +int +rpcsvc_send_cached_reply (rpcsvc_request_t *req, drc_cached_op_t *reply); + +int +rpcsvc_cache_reply (rpcsvc_request_t *req, struct iobref *iobref, +                    struct iovec *rpchdr, int rpchdrcount, +                    struct iovec *proghdr, int proghdrcount, +                    struct 
iovec *payload, int payloadcount); + +int +rpcsvc_cache_request (rpcsvc_request_t *req); + +int32_t +rpcsvc_drc_priv (rpcsvc_drc_globals_t *drc); + +int +rpcsvc_drc_init (rpcsvc_t *svc, dict_t *options); + +#endif /* RPC_DRC_H */ diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index a8744d61810..4384f2abdc6 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -196,6 +196,7 @@ struct rpc_transport {          dict_t                    *options;          char                      *name;          void                      *dnscache; +        void                      *drc_client;          data_t                    *buf;          int32_t                  (*init)   (rpc_transport_t *this);          void                     (*fini)   (rpc_transport_t *this); diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h index 2c6f074886d..054e187c96d 100644 --- a/rpc/rpc-lib/src/rpcsvc-common.h +++ b/rpc/rpc-lib/src/rpcsvc-common.h @@ -30,6 +30,8 @@ struct rpcsvc_state;  typedef int (*rpcsvc_notify_t) (struct rpcsvc_state *, void *mydata,                                  rpcsvc_event_t, void *data); +struct drc_globals; +typedef struct drc_globals rpcsvc_drc_globals_t;  /* Contains global state required for all the RPC services.   
*/ @@ -68,7 +70,53 @@ typedef struct rpcsvc_state {          void                    *mydata; /* This is xlator */          rpcsvc_notify_t          notifyfn;          struct mem_pool         *rxpool; +        rpcsvc_drc_globals_t    *drc;  } rpcsvc_t; +/* DRC START */ +enum drc_op_type { +        DRC_NA              = 0, +        DRC_IDEMPOTENT      = 1, +        DRC_NON_IDEMPOTENT  = 2 +}; +typedef enum drc_op_type drc_op_type_t; + +enum drc_type { +        DRC_TYPE_NONE        = 0, +        DRC_TYPE_IN_MEMORY   = 1 +}; +typedef enum drc_type drc_type_t; + +enum drc_lru_factor { +        DRC_LRU_5_PC       = 20, +        DRC_LRU_10_PC      = 10, +        DRC_LRU_25_PC      = 4, +        DRC_LRU_50_PC      = 2 +}; +typedef enum drc_lru_factor drc_lru_factor_t; + +enum drc_xid_state { +        DRC_XID_MONOTONOUS  = 0, +        DRC_XID_WRAPPED     = 1 +}; +typedef enum drc_xid_state drc_xid_state_t; + +enum drc_op_state { +        DRC_OP_IN_TRANSIT    = 0, +        DRC_OP_CACHED        = 1 +}; +typedef enum drc_op_state drc_op_state_t; + +enum drc_policy { +        DRC_LRU              = 0 +}; +typedef enum drc_policy drc_policy_t; + +/* Default policies for DRC */ +#define DRC_DEFAULT_TYPE               DRC_TYPE_IN_MEMORY +#define DRC_DEFAULT_CACHE_SIZE         0x20000 +#define DRC_DEFAULT_LRU_FACTOR         DRC_LRU_25_PC + +/* DRC END */  #endif /* #ifndef _RPCSVC_COMMON_H */ diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index d69756cc004..7efb2e1fbb7 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -28,6 +28,7 @@  #include "xdr-generic.h"  #include "rpc-common-xdr.h"  #include "syncop.h" +#include "rpc-drc.h"  #include <errno.h>  #include <pthread.h> @@ -422,6 +423,7 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,           * since we are not handling authentication failures for now.           
*/          req->rpc_status = MSG_ACCEPTED; +        req->reply = NULL;          ret = 0;  err:          if (ret == -1) { @@ -461,13 +463,15 @@ int  rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,                          rpc_transport_pollin_t *msg)  { -        rpcsvc_actor_t          *actor = NULL; -        rpcsvc_actor            actor_fn = NULL; -        rpcsvc_request_t        *req = NULL; -        int                     ret = -1; -        uint16_t                port = 0; -        gf_boolean_t            is_unix = _gf_false; -        gf_boolean_t            unprivileged = _gf_false; +        rpcsvc_actor_t         *actor          = NULL; +        rpcsvc_actor            actor_fn       = NULL; +        rpcsvc_request_t       *req            = NULL; +        int                     ret            = -1; +        uint16_t                port           = 0; +        gf_boolean_t            is_unix        = _gf_false; +        gf_boolean_t            unprivileged   = _gf_false; +        drc_cached_op_t        *reply          = NULL; +        rpcsvc_drc_globals_t   *drc            = NULL;          if (!trans || !svc)                  return -1; @@ -503,7 +507,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,          req = rpcsvc_request_create (svc, trans, msg);          if (!req) -                goto err; +                goto out;          if (!rpcsvc_request_accepted (req))                  goto err_reply; @@ -521,6 +525,39 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,                          return -1;          } +        /* DRC */ +        if (rpcsvc_need_drc (req)) { +                drc = req->svc->drc; + +                LOCK (&drc->lock); +                reply = rpcsvc_drc_lookup (req); + +                /* retransmission of completed request, send cached reply */ +                if (reply && reply->state == DRC_OP_CACHED) { +                        gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate 
request:" +                                " XID: 0x%x", req->xid); +                        ret = rpcsvc_send_cached_reply (req, reply); +                        drc->cache_hits++; +                        UNLOCK (&drc->lock); +                        goto out; + +                } /* retransmitted request, original op in transit, drop it */ +                else if (reply && reply->state == DRC_OP_IN_TRANSIT) { +                        gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit," +                                " discarding. XID: 0x%x", req->xid); +                        ret = 0; +                        drc->intransit_hits++; +                        rpcsvc_request_destroy (req); +                        UNLOCK (&drc->lock); +                        goto out; + +                } /* fresh request, cache it as in-transit and proceed */ +                else { +                        ret = rpcsvc_cache_request (req); +                } +                UNLOCK (&drc->lock); +        } +          if (req->rpc_err == SUCCESS) {                  /* Before going to xlator code, set the THIS properly */                  THIS = svc->mydata; @@ -557,7 +594,7 @@ err_reply:           * has now been queued. 
*/          ret = 0; -err: +out:          return ret;  } @@ -904,21 +941,22 @@ out:          return ret;  } -static inline int -rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, -                         int hdrcount, struct iovec *proghdr, int proghdrcount, -                         struct iovec *progpayload, int progpayloadcount, -                         struct iobref *iobref, void *priv) +int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr, +                         int rpchdrcount, struct iovec *proghdr, +                         int proghdrcount, struct iovec *progpayload, +                         int progpayloadcount, struct iobref *iobref, +                         void *priv)  {          int                   ret   = -1;          rpc_transport_reply_t reply = {{0, }}; -        if ((!trans) || (!hdrvec) || (!hdrvec->iov_base)) { +        if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) {                  goto out;          } -        reply.msg.rpchdr = hdrvec; -        reply.msg.rpchdrcount = hdrcount; +        reply.msg.rpchdr = rpchdr; +        reply.msg.rpchdrcount = rpchdrcount;          reply.msg.proghdr = proghdr;          reply.msg.proghdrcount = proghdrcount;          reply.msg.progpayload = progpayload; @@ -1064,6 +1102,7 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,          size_t                  msglen     = 0;          size_t                  hdrlen     = 0;          char                    new_iobref = 0; +        rpcsvc_drc_globals_t   *drc        = NULL;          if ((!req) || (!req->trans))                  return -1; @@ -1098,6 +1137,17 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,          iobref_add (iobref, replyiob); +        /* cache the reply in the duplicate request cache for appropriate ops */ +        if (req->reply) { +                drc = req->svc->drc; + +                LOCK (&drc->lock); +                ret = 
rpcsvc_cache_reply (req, iobref, &recordhdr, 1, +                                          proghdr, hdrcount, +                                          payload, payloadcount); +                UNLOCK (&drc->lock); +        } +          ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount,                                         payload, payloadcount, iobref,                                         req->trans_private); @@ -1905,6 +1955,7 @@ rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options,                          "failed to register DUMP program");                  goto free_svc;          } +          ret = 0;  free_svc:          if (ret == -1) { @@ -2196,9 +2247,9 @@ out:  rpcsvc_actor_t gluster_dump_actors[] = { -        [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0}, -        [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0}, -        [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0}, +        [GF_DUMP_NULL]      = {"NULL",     GF_DUMP_NULL,     NULL,        NULL, 0, DRC_NA}, +        [GF_DUMP_DUMP]      = {"DUMP",     GF_DUMP_DUMP,     rpcsvc_dump, NULL, 0, DRC_NA}, +        [GF_DUMP_MAXVALUE]  = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL,        NULL, 0, DRC_NA},  }; diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index afa7c992634..67ff74be6bc 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -140,6 +140,9 @@ typedef struct rpcsvc_auth_data {  #define rpcsvc_auth_flavour(au)    ((au).flavour) +typedef struct drc_client drc_client_t; +typedef struct drc_cached_op drc_cached_op_t; +  /* The container for the RPC call handed up to an actor.   * Dynamically allocated. Lives till the call reply is completely   * transmitted. 
@@ -241,6 +244,9 @@ struct rpcsvc_request {          /* we need to ref the 'iobuf' in case of 'synctasking' it */          struct iobuf            *hdr_iobuf; + +        /* pointer to cached reply for use in DRC */ +        drc_cached_op_t         *reply;  };  #define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog)) @@ -314,7 +320,6 @@ typedef void *(*rpcsvc_encode_reply) (void *msg);   */  typedef void (*rpcsvc_deallocate_reply) (void *msg); -  #define RPCSVC_NAME_MAX            32  /* The descriptor for each procedure/actor that runs   * over the RPC service. @@ -336,6 +341,7 @@ typedef struct rpcsvc_actor_desc {          /* Can the actor be run on behalf of an unprivileged requestor? */          gf_boolean_t            unprivileged; +        drc_op_type_t           op_type;  } rpcsvc_actor_t;  /* Describes a program and its version along with the function pointers @@ -448,6 +454,13 @@ int  rpcsvc_unregister_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata);  int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr, +                         int rpchdrcount, struct iovec *proghdr, +                         int proghdrcount, struct iovec *progpayload, +                         int progpayloadcount, struct iobref *iobref, +                         void *priv); + +int  rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr,                         int hdrcount, struct iovec *payload, int payloadcount,                         struct iobref *iobref); @@ -558,6 +571,9 @@ int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans,                              rpcsvc_cbk_program_t *prog, int procnum,                              struct iovec *proghdr, int proghdrcount); +rpcsvc_actor_t * +rpcsvc_program_actor (rpcsvc_request_t *req); +  int  rpcsvc_transport_unix_options_build (dict_t **options, char *filepath);  int @@ -571,5 +587,4 @@ rpcsvc_volume_allowed (dict_t *options, char *volname);  rpcsvc_vector_sizer  
rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum,                                   uint32_t progver, uint32_t procnum); -  #endif  | 
