summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog-rpc.c
diff options
context:
space:
mode:
authorGluster Ant <bugzilla-bot@gluster.org>2018-09-12 17:52:45 +0530
committerNigel Babu <nigelb@redhat.com>2018-09-12 17:52:45 +0530
commite16868dede6455cab644805af6fe1ac312775e13 (patch)
tree15aebdb4fff2d87cf8a72f836816b3aa634da58d /xlators/features/changelog/src/changelog-rpc.c
parent45a71c0548b6fd2c757aa2e7b7671a1411948894 (diff)
Land part 2 of clang-format changes
Change-Id: Ia84cc24c8924e6d22d02ac15f611c10e26db99b4 Signed-off-by: Nigel Babu <nigelb@redhat.com>
Diffstat (limited to 'xlators/features/changelog/src/changelog-rpc.c')
-rw-r--r--xlators/features/changelog/src/changelog-rpc.c548
1 files changed, 269 insertions, 279 deletions
diff --git a/xlators/features/changelog/src/changelog-rpc.c b/xlators/features/changelog/src/changelog-rpc.c
index 9027fe10a2a..828f85e8e45 100644
--- a/xlators/features/changelog/src/changelog-rpc.c
+++ b/xlators/features/changelog/src/changelog-rpc.c
@@ -16,256 +16,246 @@
struct rpcsvc_program *changelog_programs[];
static void
-changelog_cleanup_dispatchers (xlator_t *this,
- changelog_priv_t *priv, int count)
+changelog_cleanup_dispatchers(xlator_t *this, changelog_priv_t *priv, int count)
{
- for (count--; count >= 0; count--) {
- (void) changelog_thread_cleanup
- (this, priv->ev_dispatcher[count]);
- priv->ev_dispatcher[count] = 0;
- }
+ for (count--; count >= 0; count--) {
+ (void)changelog_thread_cleanup(this, priv->ev_dispatcher[count]);
+ priv->ev_dispatcher[count] = 0;
+ }
}
int
-changelog_cleanup_rpc_threads (xlator_t *this, changelog_priv_t *priv)
+changelog_cleanup_rpc_threads(xlator_t *this, changelog_priv_t *priv)
{
- int ret = 0;
- changelog_clnt_t *conn = NULL;
-
- conn = &priv->connections;
- if (!conn)
- return 0;
-
- /** terminate RPC thread(s) */
- ret = changelog_thread_cleanup (this, priv->connector);
- if (ret != 0)
- goto error_return;
- priv->connector = 0;
-
- /** terminate dispatcher thread(s) */
- changelog_cleanup_dispatchers (this, priv, priv->nr_dispatchers);
-
- /* TODO: what about pending and waiting connections? */
- changelog_ev_cleanup_connections (this, conn);
-
- /* destroy locks */
- ret = pthread_mutex_destroy (&conn->pending_lock);
- if (ret != 0)
- goto error_return;
- ret = pthread_cond_destroy (&conn->pending_cond);
- if (ret != 0)
- goto error_return;
- ret = LOCK_DESTROY (&conn->active_lock);
- if (ret != 0)
- goto error_return;
- ret = LOCK_DESTROY (&conn->wait_lock);
- if (ret != 0)
- goto error_return;
+ int ret = 0;
+ changelog_clnt_t *conn = NULL;
+
+ conn = &priv->connections;
+ if (!conn)
return 0;
- error_return:
- return -1;
+ /** terminate RPC thread(s) */
+ ret = changelog_thread_cleanup(this, priv->connector);
+ if (ret != 0)
+ goto error_return;
+ priv->connector = 0;
+
+ /** terminate dispatcher thread(s) */
+ changelog_cleanup_dispatchers(this, priv, priv->nr_dispatchers);
+
+ /* TODO: what about pending and waiting connections? */
+ changelog_ev_cleanup_connections(this, conn);
+
+ /* destroy locks */
+ ret = pthread_mutex_destroy(&conn->pending_lock);
+ if (ret != 0)
+ goto error_return;
+ ret = pthread_cond_destroy(&conn->pending_cond);
+ if (ret != 0)
+ goto error_return;
+ ret = LOCK_DESTROY(&conn->active_lock);
+ if (ret != 0)
+ goto error_return;
+ ret = LOCK_DESTROY(&conn->wait_lock);
+ if (ret != 0)
+ goto error_return;
+ return 0;
+
+error_return:
+ return -1;
}
static int
-changelog_init_rpc_threads (xlator_t *this, changelog_priv_t *priv,
- rbuf_t *rbuf, int nr_dispatchers)
+changelog_init_rpc_threads(xlator_t *this, changelog_priv_t *priv, rbuf_t *rbuf,
+ int nr_dispatchers)
{
- int j = 0;
- int ret = 0;
- changelog_clnt_t *conn = NULL;
- char thread_name[GF_THREAD_NAMEMAX] = {0,};
-
-
- conn = &priv->connections;
-
- conn->this = this;
- conn->rbuf = rbuf;
- conn->sequence = 1; /* start with sequence number one */
-
- INIT_LIST_HEAD (&conn->pending);
- INIT_LIST_HEAD (&conn->active);
- INIT_LIST_HEAD (&conn->waitq);
-
- ret = pthread_mutex_init (&conn->pending_lock, NULL);
- if (ret)
- goto error_return;
- ret = pthread_cond_init (&conn->pending_cond, NULL);
- if (ret)
- goto cleanup_pending_lock;
-
- ret = LOCK_INIT (&conn->active_lock);
- if (ret)
- goto cleanup_pending_cond;
- ret = LOCK_INIT (&conn->wait_lock);
- if (ret)
- goto cleanup_active_lock;
-
- /* spawn reverse connection thread */
- ret = gf_thread_create (&priv->connector,
- NULL, changelog_ev_connector, conn, "clogecon");
- if (ret != 0)
- goto cleanup_wait_lock;
-
- /* spawn dispatcher thread(s) */
- priv->ev_dispatcher = GF_CALLOC (nr_dispatchers, sizeof(pthread_t),
- gf_changelog_mt_ev_dispatcher_t);
- if (!priv->ev_dispatcher)
- goto cleanup_connector;
-
- /* spawn dispatcher threads */
- for (; j < nr_dispatchers; j++) {
- snprintf (thread_name, sizeof(thread_name),
- "clogd%03hx", (j & 0x3ff));
- ret = gf_thread_create (&priv->ev_dispatcher[j],
- NULL, changelog_ev_dispatch, conn,
- thread_name);
- if (ret != 0) {
- changelog_cleanup_dispatchers (this, priv, j);
- break;
- }
+ int j = 0;
+ int ret = 0;
+ changelog_clnt_t *conn = NULL;
+ char thread_name[GF_THREAD_NAMEMAX] = {
+ 0,
+ };
+
+ conn = &priv->connections;
+
+ conn->this = this;
+ conn->rbuf = rbuf;
+ conn->sequence = 1; /* start with sequence number one */
+
+ INIT_LIST_HEAD(&conn->pending);
+ INIT_LIST_HEAD(&conn->active);
+ INIT_LIST_HEAD(&conn->waitq);
+
+ ret = pthread_mutex_init(&conn->pending_lock, NULL);
+ if (ret)
+ goto error_return;
+ ret = pthread_cond_init(&conn->pending_cond, NULL);
+ if (ret)
+ goto cleanup_pending_lock;
+
+ ret = LOCK_INIT(&conn->active_lock);
+ if (ret)
+ goto cleanup_pending_cond;
+ ret = LOCK_INIT(&conn->wait_lock);
+ if (ret)
+ goto cleanup_active_lock;
+
+ /* spawn reverse connection thread */
+ ret = gf_thread_create(&priv->connector, NULL, changelog_ev_connector, conn,
+ "clogecon");
+ if (ret != 0)
+ goto cleanup_wait_lock;
+
+ /* spawn dispatcher thread(s) */
+ priv->ev_dispatcher = GF_CALLOC(nr_dispatchers, sizeof(pthread_t),
+ gf_changelog_mt_ev_dispatcher_t);
+ if (!priv->ev_dispatcher)
+ goto cleanup_connector;
+
+ /* spawn dispatcher threads */
+ for (; j < nr_dispatchers; j++) {
+ snprintf(thread_name, sizeof(thread_name), "clogd%03hx", (j & 0x3ff));
+ ret = gf_thread_create(&priv->ev_dispatcher[j], NULL,
+ changelog_ev_dispatch, conn, thread_name);
+ if (ret != 0) {
+ changelog_cleanup_dispatchers(this, priv, j);
+ break;
}
-
- if (ret != 0)
- goto cleanup_connector;
-
- priv->nr_dispatchers = nr_dispatchers;
- return 0;
-
- cleanup_connector:
- (void) pthread_cancel (priv->connector);
- cleanup_wait_lock:
- LOCK_DESTROY (&conn->wait_lock);
- cleanup_active_lock:
- LOCK_DESTROY (&conn->active_lock);
- cleanup_pending_cond:
- (void) pthread_cond_destroy (&conn->pending_cond);
- cleanup_pending_lock:
- (void) pthread_mutex_destroy (&conn->pending_lock);
- error_return:
- return -1;
+ }
+
+ if (ret != 0)
+ goto cleanup_connector;
+
+ priv->nr_dispatchers = nr_dispatchers;
+ return 0;
+
+cleanup_connector:
+ (void)pthread_cancel(priv->connector);
+cleanup_wait_lock:
+ LOCK_DESTROY(&conn->wait_lock);
+cleanup_active_lock:
+ LOCK_DESTROY(&conn->active_lock);
+cleanup_pending_cond:
+ (void)pthread_cond_destroy(&conn->pending_cond);
+cleanup_pending_lock:
+ (void)pthread_mutex_destroy(&conn->pending_lock);
+error_return:
+ return -1;
}
int
-changelog_rpcsvc_notify (rpcsvc_t *rpc,
- void *xl, rpcsvc_event_t event, void *data)
+changelog_rpcsvc_notify(rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,
+ void *data)
{
- return 0;
+ return 0;
}
void
-changelog_destroy_rpc_listner (xlator_t *this, changelog_priv_t *priv)
+changelog_destroy_rpc_listner(xlator_t *this, changelog_priv_t *priv)
{
- char sockfile[UNIX_PATH_MAX] = {0,};
- changelog_clnt_t *c_clnt = &priv->connections;
- changelog_rpc_clnt_t *crpc = NULL;
- int nofconn = 0;
-
- /* sockfile path could have been saved to avoid this */
- CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick,
- sockfile, UNIX_PATH_MAX);
- changelog_rpc_server_destroy (this,
- priv->rpc, sockfile,
- changelog_rpcsvc_notify,
- changelog_programs);
-
- /* TODO Below approach is not perfect to wait for cleanup
- all active connections without this code brick process
- can be crash in case of brick multiplexing if any in-progress
- request process on rpc by changelog xlator after
- cleanup resources
- */
-
- if (c_clnt) {
- do {
- nofconn = 0;
- LOCK (&c_clnt->active_lock);
- list_for_each_entry (crpc, &c_clnt->active, list) {
- nofconn++;
- }
- UNLOCK (&c_clnt->active_lock);
- LOCK (&c_clnt->wait_lock);
- list_for_each_entry (crpc, &c_clnt->waitq, list) {
- nofconn++;
- }
- UNLOCK (&c_clnt->wait_lock);
- pthread_mutex_lock (&c_clnt->pending_lock);
- list_for_each_entry (crpc, &c_clnt->pending, list) {
- nofconn++;
- }
- pthread_mutex_unlock (&c_clnt->pending_lock);
-
- } while (nofconn); /* Wait for all connection cleanup */
- }
-
- (void) changelog_cleanup_rpc_threads (this, priv);
+ char sockfile[UNIX_PATH_MAX] = {
+ 0,
+ };
+ changelog_clnt_t *c_clnt = &priv->connections;
+ changelog_rpc_clnt_t *crpc = NULL;
+ int nofconn = 0;
+
+ /* sockfile path could have been saved to avoid this */
+ CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile, UNIX_PATH_MAX);
+ changelog_rpc_server_destroy(this, priv->rpc, sockfile,
+ changelog_rpcsvc_notify, changelog_programs);
+
+ /* TODO Below approach is not perfect to wait for cleanup
+ all active connections without this code brick process
+ can be crash in case of brick multiplexing if any in-progress
+ request process on rpc by changelog xlator after
+ cleanup resources
+ */
+
+ if (c_clnt) {
+ do {
+ nofconn = 0;
+ LOCK(&c_clnt->active_lock);
+ list_for_each_entry(crpc, &c_clnt->active, list) { nofconn++; }
+ UNLOCK(&c_clnt->active_lock);
+ LOCK(&c_clnt->wait_lock);
+ list_for_each_entry(crpc, &c_clnt->waitq, list) { nofconn++; }
+ UNLOCK(&c_clnt->wait_lock);
+ pthread_mutex_lock(&c_clnt->pending_lock);
+ list_for_each_entry(crpc, &c_clnt->pending, list) { nofconn++; }
+ pthread_mutex_unlock(&c_clnt->pending_lock);
+
+ } while (nofconn); /* Wait for all connection cleanup */
+ }
+
+ (void)changelog_cleanup_rpc_threads(this, priv);
}
rpcsvc_t *
-changelog_init_rpc_listener (xlator_t *this, changelog_priv_t *priv,
+changelog_init_rpc_listener(xlator_t *this, changelog_priv_t *priv,
rbuf_t *rbuf, int nr_dispatchers)
{
- int ret = 0;
- char sockfile[UNIX_PATH_MAX] = {0,};
- rpcsvc_t *svcp;
-
- ret = changelog_init_rpc_threads (this, priv, rbuf, nr_dispatchers);
- if (ret)
- return NULL;
-
- CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick,
- sockfile, UNIX_PATH_MAX);
- (void) sys_unlink (sockfile);
- svcp = changelog_rpc_server_init (this, sockfile, NULL,
- changelog_rpcsvc_notify,
- changelog_programs);
- return svcp;
+ int ret = 0;
+ char sockfile[UNIX_PATH_MAX] = {
+ 0,
+ };
+ rpcsvc_t *svcp;
+
+ ret = changelog_init_rpc_threads(this, priv, rbuf, nr_dispatchers);
+ if (ret)
+ return NULL;
+
+ CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile, UNIX_PATH_MAX);
+ (void)sys_unlink(sockfile);
+ svcp = changelog_rpc_server_init(
+ this, sockfile, NULL, changelog_rpcsvc_notify, changelog_programs);
+ return svcp;
}
void
-changelog_rpc_clnt_cleanup (changelog_rpc_clnt_t *crpc)
+changelog_rpc_clnt_cleanup(changelog_rpc_clnt_t *crpc)
{
- if (!crpc)
- return;
- crpc->c_clnt = NULL;
- LOCK_DESTROY (&crpc->lock);
- GF_FREE (crpc);
+ if (!crpc)
+ return;
+ crpc->c_clnt = NULL;
+ LOCK_DESTROY(&crpc->lock);
+ GF_FREE(crpc);
}
static changelog_rpc_clnt_t *
-changelog_rpc_clnt_init (xlator_t *this,
- changelog_probe_req *rpc_req, changelog_clnt_t *c_clnt)
+changelog_rpc_clnt_init(xlator_t *this, changelog_probe_req *rpc_req,
+ changelog_clnt_t *c_clnt)
{
- int ret = 0;
- changelog_rpc_clnt_t *crpc = NULL;
-
- crpc = GF_CALLOC (1, sizeof (*crpc), gf_changelog_mt_rpc_clnt_t);
- if (!crpc)
- goto error_return;
- INIT_LIST_HEAD (&crpc->list);
-
- /* Take a ref, the last unref will be on RPC_CLNT_DESTROY
- * which comes as a result of last rpc_clnt_unref.
- */
- GF_ATOMIC_INIT (crpc->ref, 1);
- changelog_set_disconnect_flag (crpc, _gf_false);
-
- crpc->filter = rpc_req->filter;
- (void) memcpy (crpc->sock, rpc_req->sock, strlen (rpc_req->sock));
-
- crpc->this = this;
- crpc->c_clnt = c_clnt;
- crpc->cleanup = changelog_rpc_clnt_cleanup;
-
- ret = LOCK_INIT (&crpc->lock);
- if (ret != 0)
- goto dealloc_crpc;
- return crpc;
-
- dealloc_crpc:
- GF_FREE (crpc);
- error_return:
- return NULL;
+ int ret = 0;
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ crpc = GF_CALLOC(1, sizeof(*crpc), gf_changelog_mt_rpc_clnt_t);
+ if (!crpc)
+ goto error_return;
+ INIT_LIST_HEAD(&crpc->list);
+
+ /* Take a ref, the last unref will be on RPC_CLNT_DESTROY
+ * which comes as a result of last rpc_clnt_unref.
+ */
+ GF_ATOMIC_INIT(crpc->ref, 1);
+ changelog_set_disconnect_flag(crpc, _gf_false);
+
+ crpc->filter = rpc_req->filter;
+ (void)memcpy(crpc->sock, rpc_req->sock, strlen(rpc_req->sock));
+
+ crpc->this = this;
+ crpc->c_clnt = c_clnt;
+ crpc->cleanup = changelog_rpc_clnt_cleanup;
+
+ ret = LOCK_INIT(&crpc->lock);
+ if (ret != 0)
+ goto dealloc_crpc;
+ return crpc;
+
+dealloc_crpc:
+ GF_FREE(crpc);
+error_return:
+ return NULL;
}
/**
@@ -279,58 +269,59 @@ changelog_rpc_clnt_init (xlator_t *this,
*/
int
-changelog_handle_probe (rpcsvc_request_t *req)
+changelog_handle_probe(rpcsvc_request_t *req)
{
- int ret = 0;
- xlator_t *this = NULL;
- rpcsvc_t *svc = NULL;
- changelog_priv_t *priv = NULL;
- changelog_clnt_t *c_clnt = NULL;
- changelog_rpc_clnt_t *crpc = NULL;
-
- changelog_probe_req rpc_req = {0,};
- changelog_probe_rsp rpc_rsp = {0,};
-
-
- this = req->trans->xl;
- if (this->cleanup_starting) {
- gf_msg (this->name, GF_LOG_DEBUG, 0,
- CHANGELOG_MSG_HANDLE_PROBE_ERROR,
- "cleanup_starting flag is already set for xl");
- return 0;
- }
-
- ret = xdr_to_generic (req->msg[0],
- &rpc_req, (xdrproc_t)xdr_changelog_probe_req);
- if (ret < 0) {
- gf_msg ("", GF_LOG_ERROR, 0,
- CHANGELOG_MSG_HANDLE_PROBE_ERROR,
- "xdr decoding error");
- req->rpc_err = GARBAGE_ARGS;
- goto handle_xdr_error;
- }
-
- /* ->xl hidden in rpcsvc */
- svc = rpcsvc_request_service (req);
- this = svc->xl;
- priv = this->private;
- c_clnt = &priv->connections;
-
- crpc = changelog_rpc_clnt_init (this, &rpc_req, c_clnt);
- if (!crpc)
- goto handle_xdr_error;
-
- changelog_ev_queue_connection (c_clnt, crpc);
- rpc_rsp.op_ret = 0;
-
- goto submit_rpc;
-
- handle_xdr_error:
- rpc_rsp.op_ret = -1;
- submit_rpc:
- (void) changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
- (xdrproc_t)xdr_changelog_probe_rsp);
+ int ret = 0;
+ xlator_t *this = NULL;
+ rpcsvc_t *svc = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ changelog_probe_req rpc_req = {
+ 0,
+ };
+ changelog_probe_rsp rpc_rsp = {
+ 0,
+ };
+
+ this = req->trans->xl;
+ if (this->cleanup_starting) {
+ gf_msg(this->name, GF_LOG_DEBUG, 0, CHANGELOG_MSG_HANDLE_PROBE_ERROR,
+ "cleanup_starting flag is already set for xl");
return 0;
+ }
+
+ ret = xdr_to_generic(req->msg[0], &rpc_req,
+ (xdrproc_t)xdr_changelog_probe_req);
+ if (ret < 0) {
+ gf_msg("", GF_LOG_ERROR, 0, CHANGELOG_MSG_HANDLE_PROBE_ERROR,
+ "xdr decoding error");
+ req->rpc_err = GARBAGE_ARGS;
+ goto handle_xdr_error;
+ }
+
+ /* ->xl hidden in rpcsvc */
+ svc = rpcsvc_request_service(req);
+ this = svc->xl;
+ priv = this->private;
+ c_clnt = &priv->connections;
+
+ crpc = changelog_rpc_clnt_init(this, &rpc_req, c_clnt);
+ if (!crpc)
+ goto handle_xdr_error;
+
+ changelog_ev_queue_connection(c_clnt, crpc);
+ rpc_rsp.op_ret = 0;
+
+ goto submit_rpc;
+
+handle_xdr_error:
+ rpc_rsp.op_ret = -1;
+submit_rpc:
+ (void)changelog_rpc_sumbit_reply(req, &rpc_rsp, NULL, 0, NULL,
+ (xdrproc_t)xdr_changelog_probe_rsp);
+ return 0;
}
/**
@@ -338,22 +329,21 @@ changelog_handle_probe (rpcsvc_request_t *req)
*/
rpcsvc_actor_t changelog_svc_actors[CHANGELOG_RPC_PROC_MAX] = {
- [CHANGELOG_RPC_PROBE_FILTER] = {
- "CHANGELOG PROBE FILTER", CHANGELOG_RPC_PROBE_FILTER,
- changelog_handle_probe, NULL, 0, DRC_NA
- },
+ [CHANGELOG_RPC_PROBE_FILTER] = {"CHANGELOG PROBE FILTER",
+ CHANGELOG_RPC_PROBE_FILTER,
+ changelog_handle_probe, NULL, 0, DRC_NA},
};
struct rpcsvc_program changelog_svc_prog = {
- .progname = CHANGELOG_RPC_PROGNAME,
- .prognum = CHANGELOG_RPC_PROGNUM,
- .progver = CHANGELOG_RPC_PROGVER,
- .numactors = CHANGELOG_RPC_PROC_MAX,
- .actors = changelog_svc_actors,
- .synctask = _gf_true,
+ .progname = CHANGELOG_RPC_PROGNAME,
+ .prognum = CHANGELOG_RPC_PROGNUM,
+ .progver = CHANGELOG_RPC_PROGVER,
+ .numactors = CHANGELOG_RPC_PROC_MAX,
+ .actors = changelog_svc_actors,
+ .synctask = _gf_true,
};
struct rpcsvc_program *changelog_programs[] = {
- &changelog_svc_prog,
- NULL,
+ &changelog_svc_prog,
+ NULL,
};