summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog-rpc-common.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/src/changelog-rpc-common.c')
-rw-r--r--xlators/features/changelog/src/changelog-rpc-common.c525
1 files changed, 261 insertions, 264 deletions
diff --git a/xlators/features/changelog/src/changelog-rpc-common.c b/xlators/features/changelog/src/changelog-rpc-common.c
index 056519232cf..ce01bf7a133 100644
--- a/xlators/features/changelog/src/changelog-rpc-common.c
+++ b/xlators/features/changelog/src/changelog-rpc-common.c
@@ -24,65 +24,63 @@
*/
void *
-changelog_rpc_poller (void *arg)
+changelog_rpc_poller(void *arg)
{
- xlator_t *this = arg;
+ xlator_t *this = arg;
- (void) event_dispatch (this->ctx->event_pool);
- return NULL;
+ (void)event_dispatch(this->ctx->event_pool);
+ return NULL;
}
struct rpc_clnt *
-changelog_rpc_client_init (xlator_t *this, void *cbkdata,
- char *sockfile, rpc_clnt_notify_t fn)
+changelog_rpc_client_init(xlator_t *this, void *cbkdata, char *sockfile,
+ rpc_clnt_notify_t fn)
{
- int ret = 0;
- struct rpc_clnt *rpc = NULL;
- dict_t *options = NULL;
-
- if (!cbkdata)
- cbkdata = this;
-
- options = dict_new ();
- if (!options)
- goto error_return;
-
- ret = rpc_transport_unix_options_build (&options, sockfile, 0);
- if (ret) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_RPC_BUILD_ERROR,
- "failed to build rpc options");
- goto dealloc_dict;
- }
-
- rpc = rpc_clnt_new (options, this, this->name, 16);
- if (!rpc)
- goto dealloc_dict;
-
- ret = rpc_clnt_register_notify (rpc, fn, cbkdata);
- if (ret) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_NOTIFY_REGISTER_FAILED,
- "failed to register notify");
- goto dealloc_rpc_clnt;
- }
-
- ret = rpc_clnt_start (rpc);
- if (ret) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_RPC_START_ERROR,
- "failed to start rpc");
- goto dealloc_rpc_clnt;
- }
-
- return rpc;
-
- dealloc_rpc_clnt:
- rpc_clnt_unref (rpc);
- dealloc_dict:
- dict_unref (options);
- error_return:
- return NULL;
+ int ret = 0;
+ struct rpc_clnt *rpc = NULL;
+ dict_t *options = NULL;
+
+ if (!cbkdata)
+ cbkdata = this;
+
+ options = dict_new();
+ if (!options)
+ goto error_return;
+
+ ret = rpc_transport_unix_options_build(&options, sockfile, 0);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_BUILD_ERROR,
+ "failed to build rpc options");
+ goto dealloc_dict;
+ }
+
+ rpc = rpc_clnt_new(options, this, this->name, 16);
+ if (!rpc)
+ goto dealloc_dict;
+
+ ret = rpc_clnt_register_notify(rpc, fn, cbkdata);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_MSG_NOTIFY_REGISTER_FAILED,
+ "failed to register notify");
+ goto dealloc_rpc_clnt;
+ }
+
+ ret = rpc_clnt_start(rpc);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR,
+ "failed to start rpc");
+ goto dealloc_rpc_clnt;
+ }
+
+ return rpc;
+
+dealloc_rpc_clnt:
+ rpc_clnt_unref(rpc);
+dealloc_dict:
+ dict_unref(options);
+error_return:
+ return NULL;
}
/**
@@ -90,96 +88,96 @@ changelog_rpc_client_init (xlator_t *this, void *cbkdata,
* RPC server.
*/
int
-changelog_rpc_sumbit_req (struct rpc_clnt *rpc, void *req,
- call_frame_t *frame, rpc_clnt_prog_t *prog,
- int procnum, struct iovec *payload, int payloadcnt,
- struct iobref *iobref, xlator_t *this,
- fop_cbk_fn_t cbkfn, xdrproc_t xdrproc)
+changelog_rpc_sumbit_req(struct rpc_clnt *rpc, void *req, call_frame_t *frame,
+ rpc_clnt_prog_t *prog, int procnum,
+ struct iovec *payload, int payloadcnt,
+ struct iobref *iobref, xlator_t *this,
+ fop_cbk_fn_t cbkfn, xdrproc_t xdrproc)
{
- int ret = 0;
- int count = 0;
- struct iovec iov = {0, };
- struct iobuf *iobuf = NULL;
- char new_iobref = 0;
- ssize_t xdr_size = 0;
-
- GF_ASSERT (this);
+ int ret = 0;
+ int count = 0;
+ struct iovec iov = {
+ 0,
+ };
+ struct iobuf *iobuf = NULL;
+ char new_iobref = 0;
+ ssize_t xdr_size = 0;
- if (req) {
- xdr_size = xdr_sizeof (xdrproc, req);
+ GF_ASSERT(this);
- iobuf = iobuf_get2 (this->ctx->iobuf_pool, xdr_size);
- if (!iobuf) {
- goto out;
- };
+ if (req) {
+ xdr_size = xdr_sizeof(xdrproc, req);
- if (!iobref) {
- iobref = iobref_new ();
- if (!iobref) {
- goto out;
- }
+ iobuf = iobuf_get2(this->ctx->iobuf_pool, xdr_size);
+ if (!iobuf) {
+ goto out;
+ };
- new_iobref = 1;
- }
+ if (!iobref) {
+ iobref = iobref_new();
+ if (!iobref) {
+ goto out;
+ }
- iobref_add (iobref, iobuf);
+ new_iobref = 1;
+ }
- iov.iov_base = iobuf->ptr;
- iov.iov_len = iobuf_size (iobuf);
+ iobref_add(iobref, iobuf);
- /* Create the xdr payload */
- ret = xdr_serialize_generic (iov, req, xdrproc);
- if (ret == -1) {
- goto out;
- }
+ iov.iov_base = iobuf->ptr;
+ iov.iov_len = iobuf_size(iobuf);
- iov.iov_len = ret;
- count = 1;
+ /* Create the xdr payload */
+ ret = xdr_serialize_generic(iov, req, xdrproc);
+ if (ret == -1) {
+ goto out;
}
- ret = rpc_clnt_submit (rpc, prog, procnum, cbkfn, &iov, count,
- payload, payloadcnt, iobref, frame, NULL,
- 0, NULL, 0, NULL);
+ iov.iov_len = ret;
+ count = 1;
+ }
- out:
- if (new_iobref)
- iobref_unref (iobref);
- if (iobuf)
- iobuf_unref (iobuf);
- return ret;
+ ret = rpc_clnt_submit(rpc, prog, procnum, cbkfn, &iov, count, payload,
+ payloadcnt, iobref, frame, NULL, 0, NULL, 0, NULL);
+
+out:
+ if (new_iobref)
+ iobref_unref(iobref);
+ if (iobuf)
+ iobuf_unref(iobuf);
+ return ret;
}
/**
* Entry point to perform a remote procedure call
*/
int
-changelog_invoke_rpc (xlator_t *this, struct rpc_clnt *rpc,
- rpc_clnt_prog_t *prog, int procidx, void *arg)
+changelog_invoke_rpc(xlator_t *this, struct rpc_clnt *rpc,
+ rpc_clnt_prog_t *prog, int procidx, void *arg)
{
- int ret = 0;
- call_frame_t *frame = NULL;
- rpc_clnt_procedure_t *proc = NULL;
-
- if (!this || !prog)
- goto error_return;
-
- frame = create_frame (this, this->ctx->pool);
- if (!frame) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_CREATE_FRAME_FAILED,
- "failed to create frame");
- goto error_return;
- }
+ int ret = 0;
+ call_frame_t *frame = NULL;
+ rpc_clnt_procedure_t *proc = NULL;
- proc = &prog->proctable[procidx];
- if (proc->fn)
- ret = proc->fn (frame, this, arg);
+ if (!this || !prog)
+ goto error_return;
- STACK_DESTROY (frame->root);
- return ret;
+ frame = create_frame(this, this->ctx->pool);
+ if (!frame) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CREATE_FRAME_FAILED,
+ "failed to create frame");
+ goto error_return;
+ }
- error_return:
- return -1;
+ proc = &prog->proctable[procidx];
+ if (proc->fn)
+ ret = proc->fn(frame, this, arg);
+
+ STACK_DESTROY(frame->root);
+ return ret;
+
+error_return:
+ return -1;
}
/**
@@ -189,170 +187,169 @@ changelog_invoke_rpc (xlator_t *this, struct rpc_clnt *rpc,
*/
struct iobuf *
-__changelog_rpc_serialize_reply (rpcsvc_request_t *req, void *arg,
- struct iovec *outmsg, xdrproc_t xdrproc)
+__changelog_rpc_serialize_reply(rpcsvc_request_t *req, void *arg,
+ struct iovec *outmsg, xdrproc_t xdrproc)
{
- struct iobuf *iob = NULL;
- ssize_t retlen = 0;
- ssize_t rsp_size = 0;
+ struct iobuf *iob = NULL;
+ ssize_t retlen = 0;
+ ssize_t rsp_size = 0;
- rsp_size = xdr_sizeof (xdrproc, arg);
- iob = iobuf_get2 (req->svc->ctx->iobuf_pool, rsp_size);
- if (!iob)
- goto error_return;
+ rsp_size = xdr_sizeof(xdrproc, arg);
+ iob = iobuf_get2(req->svc->ctx->iobuf_pool, rsp_size);
+ if (!iob)
+ goto error_return;
- iobuf_to_iovec (iob, outmsg);
+ iobuf_to_iovec(iob, outmsg);
- retlen = xdr_serialize_generic (*outmsg, arg, xdrproc);
- if (retlen == -1)
- goto unref_iob;
+ retlen = xdr_serialize_generic(*outmsg, arg, xdrproc);
+ if (retlen == -1)
+ goto unref_iob;
- outmsg->iov_len = retlen;
- return iob;
+ outmsg->iov_len = retlen;
+ return iob;
- unref_iob:
- iobuf_unref (iob);
- error_return:
- return NULL;
+unref_iob:
+ iobuf_unref(iob);
+error_return:
+ return NULL;
}
int
-changelog_rpc_sumbit_reply (rpcsvc_request_t *req,
- void *arg, struct iovec *payload, int payloadcount,
- struct iobref *iobref, xdrproc_t xdrproc)
+changelog_rpc_sumbit_reply(rpcsvc_request_t *req, void *arg,
+ struct iovec *payload, int payloadcount,
+ struct iobref *iobref, xdrproc_t xdrproc)
{
- int ret = -1;
- struct iobuf *iob = NULL;
- struct iovec iov = {0,};
- char new_iobref = 0;
-
- if (!req)
- goto return_ret;
-
- if (!iobref) {
- iobref = iobref_new ();
- if (!iobref)
- goto return_ret;
- new_iobref = 1;
- }
-
- iob = __changelog_rpc_serialize_reply (req, arg, &iov, xdrproc);
- if (!iob)
- gf_msg ("", GF_LOG_ERROR, 0,
- CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED,
- "failed to serialize reply");
- else
- iobref_add (iobref, iob);
-
- ret = rpcsvc_submit_generic (req, &iov,
- 1, payload, payloadcount, iobref);
-
- if (new_iobref)
- iobref_unref (iobref);
- if (iob)
- iobuf_unref (iob);
- return_ret:
- return ret;
+ int ret = -1;
+ struct iobuf *iob = NULL;
+ struct iovec iov = {
+ 0,
+ };
+ char new_iobref = 0;
+
+ if (!req)
+ goto return_ret;
+
+ if (!iobref) {
+ iobref = iobref_new();
+ if (!iobref)
+ goto return_ret;
+ new_iobref = 1;
+ }
+
+ iob = __changelog_rpc_serialize_reply(req, arg, &iov, xdrproc);
+ if (!iob)
+ gf_msg("", GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED,
+ "failed to serialize reply");
+ else
+ iobref_add(iobref, iob);
+
+ ret = rpcsvc_submit_generic(req, &iov, 1, payload, payloadcount, iobref);
+
+ if (new_iobref)
+ iobref_unref(iobref);
+ if (iob)
+ iobuf_unref(iob);
+return_ret:
+ return ret;
}
void
-changelog_rpc_server_destroy (xlator_t *this, rpcsvc_t *rpc, char *sockfile,
- rpcsvc_notify_t fn, struct rpcsvc_program **progs)
+changelog_rpc_server_destroy(xlator_t *this, rpcsvc_t *rpc, char *sockfile,
+ rpcsvc_notify_t fn, struct rpcsvc_program **progs)
{
- rpcsvc_listener_t *listener = NULL;
- rpcsvc_listener_t *next = NULL;
- struct rpcsvc_program *prog = NULL;
-
- while (*progs) {
- prog = *progs;
- (void) rpcsvc_program_unregister (rpc, prog);
- progs++;
- }
-
- list_for_each_entry_safe (listener, next, &rpc->listeners, list) {
- rpcsvc_listener_destroy (listener);
- }
-
- (void) rpcsvc_unregister_notify (rpc, fn, this);
- sys_unlink (sockfile);
- if (rpc->rxpool) {
- mem_pool_destroy (rpc->rxpool);
- rpc->rxpool = NULL;
- }
-
- /* TODO Avoid freeing rpc object in case of brick multiplex
- after freeing rpc object svc->rpclock corrupted and it takes
- more time to detach a brick
- */
- if (!this->cleanup_starting)
- GF_FREE (rpc);
+ rpcsvc_listener_t *listener = NULL;
+ rpcsvc_listener_t *next = NULL;
+ struct rpcsvc_program *prog = NULL;
+
+ while (*progs) {
+ prog = *progs;
+ (void)rpcsvc_program_unregister(rpc, prog);
+ progs++;
+ }
+
+ list_for_each_entry_safe(listener, next, &rpc->listeners, list)
+ {
+ rpcsvc_listener_destroy(listener);
+ }
+
+ (void)rpcsvc_unregister_notify(rpc, fn, this);
+ sys_unlink(sockfile);
+ if (rpc->rxpool) {
+ mem_pool_destroy(rpc->rxpool);
+ rpc->rxpool = NULL;
+ }
+
+ /* TODO Avoid freeing rpc object in case of brick multiplex
+ after freeing rpc object svc->rpclock corrupted and it takes
+ more time to detach a brick
+ */
+ if (!this->cleanup_starting)
+ GF_FREE(rpc);
}
rpcsvc_t *
-changelog_rpc_server_init (xlator_t *this, char *sockfile, void *cbkdata,
- rpcsvc_notify_t fn, struct rpcsvc_program **progs)
+changelog_rpc_server_init(xlator_t *this, char *sockfile, void *cbkdata,
+ rpcsvc_notify_t fn, struct rpcsvc_program **progs)
{
- int ret = 0;
- rpcsvc_t *rpc = NULL;
- dict_t *options = NULL;
- struct rpcsvc_program *prog = NULL;
-
- if (!cbkdata)
- cbkdata = this;
-
- options = dict_new ();
- if (!options)
- goto error_return;
-
- ret = rpcsvc_transport_unix_options_build (&options, sockfile);
- if (ret)
- goto dealloc_dict;
-
- rpc = rpcsvc_init (this, this->ctx, options, 8);
- if (rpc == NULL) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_RPC_START_ERROR,
- "failed to init rpc");
- goto dealloc_dict;
- }
-
- ret = rpcsvc_register_notify (rpc, fn, cbkdata);
+ int ret = 0;
+ rpcsvc_t *rpc = NULL;
+ dict_t *options = NULL;
+ struct rpcsvc_program *prog = NULL;
+
+ if (!cbkdata)
+ cbkdata = this;
+
+ options = dict_new();
+ if (!options)
+ goto error_return;
+
+ ret = rpcsvc_transport_unix_options_build(&options, sockfile);
+ if (ret)
+ goto dealloc_dict;
+
+ rpc = rpcsvc_init(this, this->ctx, options, 8);
+ if (rpc == NULL) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR,
+ "failed to init rpc");
+ goto dealloc_dict;
+ }
+
+ ret = rpcsvc_register_notify(rpc, fn, cbkdata);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_MSG_NOTIFY_REGISTER_FAILED,
+ "failed to register notify function");
+ goto dealloc_rpc;
+ }
+
+ ret = rpcsvc_create_listeners(rpc, options, this->name);
+ if (ret != 1) {
+ gf_msg_debug(this->name, 0, "failed to create listeners");
+ goto dealloc_rpc;
+ }
+
+ while (*progs) {
+ prog = *progs;
+ ret = rpcsvc_program_register(rpc, prog, _gf_false);
if (ret) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_NOTIFY_REGISTER_FAILED,
- "failed to register notify function");
- goto dealloc_rpc;
- }
-
- ret = rpcsvc_create_listeners (rpc, options, this->name);
- if (ret != 1) {
- gf_msg_debug (this->name,
- 0, "failed to create listeners");
- goto dealloc_rpc;
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_MSG_PROGRAM_NAME_REG_FAILED,
+ "cannot register program "
+ "(name: %s, prognum: %d, pogver: %d)",
+ prog->progname, prog->prognum, prog->progver);
+ goto dealloc_rpc;
}
- while (*progs) {
- prog = *progs;
- ret = rpcsvc_program_register (rpc, prog, _gf_false);
- if (ret) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_PROGRAM_NAME_REG_FAILED,
- "cannot register program "
- "(name: %s, prognum: %d, pogver: %d)",
- prog->progname, prog->prognum, prog->progver);
- goto dealloc_rpc;
- }
-
- progs++;
- }
+ progs++;
+ }
- dict_unref (options);
- return rpc;
+ dict_unref(options);
+ return rpc;
- dealloc_rpc:
- GF_FREE (rpc);
- dealloc_dict:
- dict_unref (options);
- error_return:
- return NULL;
+dealloc_rpc:
+ GF_FREE(rpc);
+dealloc_dict:
+ dict_unref(options);
+error_return:
+ return NULL;
}