diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-rpc-common.c')
| -rw-r--r-- | xlators/features/changelog/src/changelog-rpc-common.c | 359 |
1 files changed, 359 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/changelog-rpc-common.c b/xlators/features/changelog/src/changelog-rpc-common.c new file mode 100644 index 00000000000..125246a17e1 --- /dev/null +++ b/xlators/features/changelog/src/changelog-rpc-common.c @@ -0,0 +1,359 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "changelog-rpc-common.h" +#include "changelog-messages.h" + +#include <glusterfs/syscall.h> +/** +***************************************************** + Client Interface +***************************************************** +*/ + +/** + * Initialize and return an RPC client object for a given unix + * domain socket. + */ + +void * +changelog_rpc_poller(void *arg) +{ + xlator_t *this = arg; + + (void)gf_event_dispatch(this->ctx->event_pool); + return NULL; +} + +struct rpc_clnt * +changelog_rpc_client_init(xlator_t *this, void *cbkdata, char *sockfile, + rpc_clnt_notify_t fn) +{ + int ret = 0; + struct rpc_clnt *rpc = NULL; + dict_t *options = NULL; + + if (!cbkdata) + cbkdata = this; + + options = dict_new(); + if (!options) + goto error_return; + + ret = rpc_transport_unix_options_build(options, sockfile, 0); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_BUILD_ERROR, + NULL); + goto dealloc_dict; + } + + rpc = rpc_clnt_new(options, this, this->name, 16); + if (!rpc) + goto dealloc_dict; + + ret = rpc_clnt_register_notify(rpc, fn, cbkdata); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, NULL); + goto dealloc_rpc_clnt; + } + + ret = rpc_clnt_start(rpc); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR, + NULL); + goto dealloc_rpc_clnt; + } + + dict_unref(options); + return rpc; + +dealloc_rpc_clnt: + rpc_clnt_unref(rpc); +dealloc_dict: + dict_unref(options); +error_return: + return NULL; +} + +/** + * Generic RPC client routine to dispatch a request to an + * RPC server. + */ +int +changelog_rpc_sumbit_req(struct rpc_clnt *rpc, void *req, call_frame_t *frame, + rpc_clnt_prog_t *prog, int procnum, + struct iovec *payload, int payloadcnt, + struct iobref *iobref, xlator_t *this, + fop_cbk_fn_t cbkfn, xdrproc_t xdrproc) +{ + int ret = 0; + int count = 0; + struct iovec iov = { + 0, + }; + struct iobuf *iobuf = NULL; + char new_iobref = 0; + ssize_t xdr_size = 0; + + GF_ASSERT(this); + + if (req) { + xdr_size = xdr_sizeof(xdrproc, req); + + iobuf = iobuf_get2(this->ctx->iobuf_pool, xdr_size); + if (!iobuf) { + goto out; + }; + + if (!iobref) { + iobref = iobref_new(); + if (!iobref) { + goto out; + } + + new_iobref = 1; + } + + iobref_add(iobref, iobuf); + + iov.iov_base = iobuf->ptr; + iov.iov_len = iobuf_size(iobuf); + + /* Create the xdr payload */ + ret = xdr_serialize_generic(iov, req, xdrproc); + if (ret == -1) { + goto out; + } + + iov.iov_len = ret; + count = 1; + } + + ret = rpc_clnt_submit(rpc, prog, procnum, cbkfn, &iov, count, payload, + payloadcnt, iobref, frame, NULL, 0, NULL, 0, NULL); + +out: + if (new_iobref) + iobref_unref(iobref); + if (iobuf) + iobuf_unref(iobuf); + return ret; +} + +/** + * Entry point to perform a remote procedure call + */ +int +changelog_invoke_rpc(xlator_t *this, struct rpc_clnt *rpc, + rpc_clnt_prog_t *prog, int procidx, void *arg) +{ + int ret = 0; + call_frame_t *frame = NULL; + rpc_clnt_procedure_t *proc = NULL; + + if (!this || !prog) + goto error_return; + + frame = create_frame(this, this->ctx->pool); + if (!frame) { + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CREATE_FRAME_FAILED, + NULL); + goto error_return; + } + + proc = &prog->proctable[procidx]; + if (proc->fn) + ret = proc->fn(frame, this, arg); + + STACK_DESTROY(frame->root); + return ret; + +error_return: + return -1; +} + +/** +***************************************************** + Server Interface +***************************************************** +*/ + +struct iobuf * +__changelog_rpc_serialize_reply(rpcsvc_request_t *req, void *arg, + struct iovec *outmsg, xdrproc_t xdrproc) +{ + struct iobuf *iob = NULL; + ssize_t retlen = 0; + ssize_t rsp_size = 0; + + rsp_size = xdr_sizeof(xdrproc, arg); + iob = iobuf_get2(req->svc->ctx->iobuf_pool, rsp_size); + if (!iob) + goto error_return; + + iobuf_to_iovec(iob, outmsg); + + retlen = xdr_serialize_generic(*outmsg, arg, xdrproc); + if (retlen == -1) + goto unref_iob; + + outmsg->iov_len = retlen; + return iob; + +unref_iob: + iobuf_unref(iob); +error_return: + return NULL; +} + +int +changelog_rpc_sumbit_reply(rpcsvc_request_t *req, void *arg, + struct iovec *payload, int payloadcount, + struct iobref *iobref, xdrproc_t xdrproc) +{ + int ret = -1; + struct iobuf *iob = NULL; + struct iovec iov = { + 0, + }; + char new_iobref = 0; + + if (!req) + goto return_ret; + + if (!iobref) { + iobref = iobref_new(); + if (!iobref) + goto return_ret; + new_iobref = 1; + } + + iob = __changelog_rpc_serialize_reply(req, arg, &iov, xdrproc); + if (!iob) + gf_smsg("", GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED, + NULL); + else + iobref_add(iobref, iob); + + ret = rpcsvc_submit_generic(req, &iov, 1, payload, payloadcount, iobref); + + if (new_iobref) + iobref_unref(iobref); + if (iob) + iobuf_unref(iob); +return_ret: + return ret; +} + +void +changelog_rpc_server_destroy(xlator_t *this, rpcsvc_t *rpc, char *sockfile, + rpcsvc_notify_t fn, struct rpcsvc_program **progs) +{ + rpcsvc_listener_t *listener = NULL; + rpcsvc_listener_t *next = NULL; + struct rpcsvc_program *prog = NULL; + rpc_transport_t *trans = NULL; + + if (!rpc) + return; + + while (*progs) { + prog = *progs; + (void)rpcsvc_program_unregister(rpc, prog); + progs++; + } + + list_for_each_entry_safe(listener, next, &rpc->listeners, list) + { + if (listener->trans) { + trans = listener->trans; + rpc_transport_disconnect(trans, _gf_false); + } + } + + (void)rpcsvc_unregister_notify(rpc, fn, this); + + /* TODO Avoid freeing rpc object in case of brick multiplex + after freeing rpc object svc->rpclock corrupted and it takes + more time to detach a brick + */ + if (!this->cleanup_starting) { + if (rpc->rxpool) { + mem_pool_destroy(rpc->rxpool); + rpc->rxpool = NULL; + } + GF_FREE(rpc); + } +} + +rpcsvc_t * +changelog_rpc_server_init(xlator_t *this, char *sockfile, void *cbkdata, + rpcsvc_notify_t fn, struct rpcsvc_program **progs) +{ + int ret = 0; + rpcsvc_t *rpc = NULL; + dict_t *options = NULL; + struct rpcsvc_program *prog = NULL; + + if (!cbkdata) + cbkdata = this; + + options = dict_new(); + if (!options) + return NULL; + + ret = rpcsvc_transport_unix_options_build(options, sockfile); + if (ret) + goto dealloc_dict; + + rpc = rpcsvc_init(this, this->ctx, options, 8); + if (rpc == NULL) { + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR, + NULL); + goto dealloc_dict; + } + + ret = rpcsvc_register_notify(rpc, fn, cbkdata); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, NULL); + goto dealloc_rpc; + } + + ret = rpcsvc_create_listeners(rpc, options, this->name); + if (ret != 1) { + gf_msg_debug(this->name, 0, "failed to create listeners"); + goto dealloc_rpc; + } + + while (*progs) { + prog = *progs; + ret = rpcsvc_program_register(rpc, prog, _gf_false); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_PROGRAM_NAME_REG_FAILED, "name%s", + prog->progname, "prognum=%d", prog->prognum, "pogver=%d", + prog->progver, NULL); + goto dealloc_rpc; + } + + progs++; + } + + dict_unref(options); + return rpc; + +dealloc_rpc: + GF_FREE(rpc); +dealloc_dict: + dict_unref(options); + return NULL; +} |
