/*
   Copyright (c) 2015 Red Hat, Inc.
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/

#include "changelog-ev-handle.h"
#include "changelog-rpc-common.h"
#include "changelog-helpers.h"

/* Forward declaration; the definition is at the bottom of this file. */
struct rpc_clnt_program changelog_ev_program;

/* Maximum number of IO vectors carried by a single event-dispatch RPC. */
#define NR_IOVEC (MAX_IOVEC - 3)

/* One batch of IO vectors sent in a single dispatch RPC. */
struct ev_rpc_vec {
    /* number of valid entries in @vector */
    int count;
    struct iovec vector[NR_IOVEC];

    /* sequence number */
    unsigned long seq;
};

/* Per-invocation dispatch context handed to changelog_event_dispatch_rpc(). */
struct ev_rpc {
    rbuf_list_t *rlist;    /* buffer list whose entries are dispatched */
    struct rpc_clnt *rpc;  /* client connection to dispatch on */
    struct ev_rpc_vec vec; /* scratch batch, reused for every RPC */
};

/**
 * Reply callback for an event-dispatch RPC.
 *
 * As of now this is a no-op that just reports success. Going further,
 * unacknowledged sequence numbers can be retransmitted and other
 * intelligence can be built into the server.
 */
int
changelog_event_dispatch_cbk(struct rpc_req *req, struct iovec *iov, int count,
                             void *myframe)
{
    return 0;
}

/**
 * Dispatcher RPC: submit one batch of IO vectors on @rpc.
 *
 * The request header is filled with the batch's sequence number and the
 * current wall-clock time, then submitted with
 * changelog_event_dispatch_cbk() as the reply callback.
 *
 * Returns whatever changelog_rpc_sumbit_req() returns.
 */
int
changelog_dispatch_vec(call_frame_t *frame, xlator_t *this,
                       struct rpc_clnt *rpc, struct ev_rpc_vec *vec)
{
    struct timeval tv = {
        0,
    };
    changelog_event_req req = {
        0,
    };

    (void)gettimeofday(&tv, NULL);

    /**
     * Event dispatch RPC header contains a sequence number for each
     * dispatch. This allows the receiver to order the request before
     * processing.
     */
    req.seq = vec->seq;
    req.tv_sec = tv.tv_sec;
    req.tv_usec = tv.tv_usec;

    return changelog_rpc_sumbit_req(
        rpc, (void *)&req, frame, &changelog_ev_program,
        CHANGELOG_REV_PROC_EVENT, vec->vector, vec->count, NULL, this,
        changelog_event_dispatch_cbk, (xdrproc_t)xdr_changelog_event_req);
}

/**
 * Procedure handler for CHANGELOG_REV_PROC_EVENT (see changelog_ev_procs).
 *
 * @data is a struct ev_rpc. The entries of its rlist are walked and sent
 * in batches of NR_IOVEC IO vectors; batches are numbered consecutively
 * starting from rlist->seq[0]. A trailing partial batch, if any, is sent
 * after the walk.
 *
 * Returns 0 on success, or the first non-zero value returned by
 * changelog_dispatch_vec(); no further batches are sent after a failure.
 */
int
changelog_event_dispatch_rpc(call_frame_t *frame, xlator_t *this, void *data)
{
    int count = 0;
    int ret = 0;
    unsigned long sequence = 0;
    rbuf_iovec_t *rvec = NULL;
    struct ev_rpc *erpc = NULL;
    struct rlist_iter riter = {
        {
            0,
        },
    };

    /* dispatch NR_IOVEC IO vectors at a time. */

    erpc = data;
    sequence = erpc->rlist->seq[0];

    rlist_iter_init(&riter, erpc->rlist);

    rvec_for_each_entry(rvec, &riter)
    {
        /* count is always in [0, NR_IOVEC) here, so it is the slot index */
        erpc->vec.vector[count++] = rvec->iov;
        if (count < NR_IOVEC)
            continue;

        /* batch is full: send it and start a new one */
        erpc->vec.seq = sequence++;
        erpc->vec.count = NR_IOVEC;

        ret = changelog_dispatch_vec(frame, this, erpc->rpc, &erpc->vec);
        if (ret)
            break;
        count = 0;
    }

    /* flush the trailing partial batch unless a dispatch already failed */
    if (!ret && count) {
        erpc->vec.seq = sequence;
        erpc->vec.count = count;

        ret = changelog_dispatch_vec(frame, this, erpc->rpc, &erpc->vec);
    }

    return ret;
}

/**
 * RPC client event notification handler.
 *
 * @mydata is the changelog_rpc_clnt_t the connection belongs to; @rpc and
 * @data are not used.
 *
 *  - CONNECT:     bump the client count, select the client's events and
 *                 move it onto the active list.
 *  - DISCONNECT:  disable and unref the rpc, deselect the client's events,
 *                 flag it as disconnected and unlink it from its list.
 *  - MSG/DESTROY: drop the reference on @mydata, decrement the client count
 *                 and, if cleanup has started and no clients or transports
 *                 remain, trigger the cleanup event.
 *  - PING:        nothing to do.
 *
 * Always returns 0.
 */
int
changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
                     void *data)
{
    xlator_t *this = NULL;
    changelog_rpc_clnt_t *crpc = NULL;
    changelog_clnt_t *c_clnt = NULL;
    changelog_priv_t *priv = NULL;
    changelog_ev_selector_t *selection = NULL;
    uint64_t clntcnt = 0;
    uint64_t xprtcnt = 0;

    crpc = mydata;
    this = crpc->this;
    c_clnt = crpc->c_clnt;

    priv = this->private;

    switch (event) {
        case RPC_CLNT_CONNECT:
            selection = &priv->ev_selection;
            GF_ATOMIC_INC(priv->clntcnt);

            /* lock order: wait_lock, then active_lock */
            LOCK(&c_clnt->wait_lock);
            {
                LOCK(&c_clnt->active_lock);
                {
                    changelog_select_event(this, selection, crpc->filter);
                    list_move_tail(&crpc->list, &c_clnt->active);
                }
                UNLOCK(&c_clnt->active_lock);
            }
            UNLOCK(&c_clnt->wait_lock);

            break;
        case RPC_CLNT_DISCONNECT:
            rpc_clnt_disable(crpc->rpc);

            /* rpc_clnt_disable doesn't unref the rpc. It just marks
             * the rpc as disabled and cancels reconnection timer.
             * Hence unref the rpc object to free it.
             */
            rpc_clnt_unref(crpc->rpc);

            if (priv)
                selection = &priv->ev_selection;

            LOCK(&crpc->lock);
            {
                if (selection)
                    changelog_deselect_event(this, selection, crpc->filter);
                changelog_set_disconnect_flag(crpc, _gf_true);
            }
            UNLOCK(&crpc->lock);
            LOCK(&c_clnt->active_lock);
            {
                list_del_init(&crpc->list);
            }
            UNLOCK(&c_clnt->active_lock);

            break;
        /* NOTE(review): RPC_CLNT_MSG shares the DESTROY handling, so a MSG
         * event would also drop the crpc reference and the client count.
         * Confirm this is intended. */
        case RPC_CLNT_MSG:
        case RPC_CLNT_DESTROY:
            /* Free up mydata */
            changelog_rpc_clnt_unref(crpc);

            /* The DISCONNECT branch already treats priv as possibly NULL;
             * apply the same guard here instead of dereferencing it
             * unconditionally. Without private data there are no counters
             * to update. */
            if (!priv)
                break;

            clntcnt = GF_ATOMIC_DEC(priv->clntcnt);
            xprtcnt = GF_ATOMIC_GET(priv->xprtcnt);
            if (this->cleanup_starting) {
                if (!clntcnt && !xprtcnt)
                    changelog_process_cleanup_event(this);
            }
            break;
        case RPC_CLNT_PING:
            break;
    }

    return 0;
}

/**
 * Connector thread body.
 *
 * @data is the changelog_clnt_t. Loops forever: waits on pending_cond until
 * the pending list is non-empty, initializes an RPC client for the first
 * pending entry, and on success moves the entry to the wait queue.
 */
void *
changelog_ev_connector(void *data)
{
    xlator_t *this = NULL;
    changelog_clnt_t *c_clnt = NULL;
    changelog_rpc_clnt_t *crpc = NULL;

    c_clnt = data;
    this = c_clnt->this;

    while (1) {
        pthread_mutex_lock(&c_clnt->pending_lock);
        {
            while (list_empty(&c_clnt->pending))
                pthread_cond_wait(&c_clnt->pending_cond, &c_clnt->pending_lock);
            crpc = list_first_entry(&c_clnt->pending, changelog_rpc_clnt_t,
                                    list);
            crpc->rpc = changelog_rpc_client_init(this, crpc, crpc->sock,
                                                  changelog_rpc_notify);
            if (!crpc->rpc) {
                gf_smsg(this->name, GF_LOG_ERROR, 0,
                        CHANGELOG_MSG_RPC_CONNECT_ERROR, "path=%s", crpc->sock,
                        NULL);
                /* NOTE(review): crpc is still linked on the pending list
                 * at this point; confirm that cleanup() unlinks it. */
                crpc->cleanup(crpc);
                goto mutex_unlock;
            }

            LOCK(&c_clnt->wait_lock);
            {
                list_move_tail(&crpc->list, &c_clnt->waitq);
            }
            UNLOCK(&c_clnt->wait_lock);
        }
    mutex_unlock:
        pthread_mutex_unlock(&c_clnt->pending_lock);
    }

    return NULL;
}

/**
 * Disable the rpc of every client on the active list.
 *
 * Runs under active_lock. @this is not used.
 */
void
changelog_ev_cleanup_connections(xlator_t *this, changelog_clnt_t *c_clnt)
{
    changelog_rpc_clnt_t *crpc = NULL;

    /* cleanup active connections */
    LOCK(&c_clnt->active_lock);
    {
        list_for_each_entry(crpc, &c_clnt->active, list)
        {
            rpc_clnt_disable(crpc->rpc);
        }
    }
    UNLOCK(&c_clnt->active_lock);
}

/**
 * TODO: granularize lock
 *
 * If we have multiple threads dispatching events, doing it this way is
 * a performance bottleneck.
 */

/**
 * Fetch the active client at cursor *@next and advance the cursor.
 *
 * Under active_lock: returns NULL when the cursor has reached the list
 * head; otherwise takes a reference on both the client and its rpc and
 * moves *@next to the following node. Release with put_client().
 */
static changelog_rpc_clnt_t *
get_client(changelog_clnt_t *c_clnt, struct list_head **next)
{
    changelog_rpc_clnt_t *crpc = NULL;

    LOCK(&c_clnt->active_lock);
    {
        if (*next == &c_clnt->active)
            goto unblock;
        crpc = list_entry(*next, changelog_rpc_clnt_t, list);
        /* ref rpc as DISCONNECT might unref the rpc asynchronously */
        changelog_rpc_clnt_ref(crpc);
        rpc_clnt_ref(crpc->rpc);
        *next = (*next)->next;
    }
unblock:
    UNLOCK(&c_clnt->active_lock);

    return crpc;
}

/**
 * Drop the rpc and client references taken by get_client(), under
 * active_lock.
 */
static void
put_client(changelog_clnt_t *c_clnt, changelog_rpc_clnt_t *crpc)
{
    LOCK(&c_clnt->active_lock);
    {
        rpc_clnt_unref(crpc->rpc);
        changelog_rpc_clnt_unref(crpc);
    }
    UNLOCK(&c_clnt->active_lock);
}

/**
 * Dispatch the events in @rlist to every active client.
 *
 * @arg is the changelog_clnt_t. Clients are visited one at a time via
 * get_client()/put_client(). The return value of changelog_invoke_rpc() is
 * deliberately ignored, so a failure on one client does not stop dispatch
 * to the others.
 */
void
_dispatcher(rbuf_list_t *rlist, void *arg)
{
    xlator_t *this = NULL;
    changelog_clnt_t *c_clnt = NULL;
    changelog_rpc_clnt_t *crpc = NULL;
    struct ev_rpc erpc = {
        0,
    };
    struct list_head *next = NULL;

    c_clnt = arg;
    this = c_clnt->this;

    erpc.rlist = rlist;
    next = c_clnt->active.next;

    while (1) {
        crpc = get_client(c_clnt, &next);
        if (!crpc)
            break;
        erpc.rpc = crpc->rpc;
        (void)changelog_invoke_rpc(this, crpc->rpc, &changelog_ev_program,
                                   CHANGELOG_REV_PROC_EVENT, &erpc);
        put_client(c_clnt, crpc);
    }
}

/**
 * Reserve sequence numbers for @rlist.
 *
 * The number of RPCs needed is the entry count divided by NR_IOVEC, rounded
 * up. That range is stored on the list starting at the client's current
 * sequence, which is then advanced past it. @mydata is the
 * changelog_clnt_t.
 *
 * this is called under rotbuff's lock
 */
void
sequencer(rbuf_list_t *rlist, void *mydata)
{
    unsigned long range = 0;
    changelog_clnt_t *c_clnt = NULL;

    c_clnt = mydata;

    range = (RLIST_ENTRY_COUNT(rlist)) / NR_IOVEC;
    if ((RLIST_ENTRY_COUNT(rlist)) % NR_IOVEC)
        range++;
    RLIST_STORE_SEQ(rlist, c_clnt->sequence, range);

    c_clnt->sequence += range;
}

/**
 * Dispatcher thread body.
 *
 * @data is the changelog_clnt_t. Loops forever: sleeps for one second,
 * asks the rotational buffer for a consumable buffer (running sequencer()
 * on it), and hands that buffer to _dispatcher() through
 * rbuf_wait_for_completion(). Anything other than RBUF_CONSUMABLE or
 * RBUF_EMPTY from rbuf_get_buffer() is logged as a warning.
 */
void *
changelog_ev_dispatch(void *data)
{
    int ret = 0;
    void *opaque = NULL;
    xlator_t *this = NULL;
    changelog_clnt_t *c_clnt = NULL;
    struct timeval tv = {
        0,
    };

    c_clnt = data;
    this = c_clnt->this;

    while (1) {
        /* TODO: change this to be pthread cond based.. later */

        /* select() with no descriptor sets is used purely as a sleep */
        tv.tv_sec = 1;
        tv.tv_usec = 0;
        select(0, NULL, NULL, NULL, &tv);

        ret = rbuf_get_buffer(c_clnt->rbuf, &opaque, sequencer, c_clnt);
        if (ret != RBUF_CONSUMABLE) {
            if (ret != RBUF_EMPTY)
                gf_smsg(this->name, GF_LOG_WARNING, 0,
                        CHANGELOG_MSG_BUFFER_STARVATION_ERROR,
                        "Failed to get buffer for RPC dispatch",
                        "rbuf_retval=%d", ret, NULL);
            continue;
        }

        ret = rbuf_wait_for_completion(c_clnt->rbuf, opaque, _dispatcher,
                                       c_clnt);
        if (ret)
            gf_smsg(this->name, GF_LOG_WARNING, 0,
                    CHANGELOG_MSG_PUT_BUFFER_FAILED, NULL);
    }

    return NULL;
}

/**
 * Queue @crpc on the pending list and wake the connector thread
 * (changelog_ev_connector() waits on pending_cond).
 */
void
changelog_ev_queue_connection(changelog_clnt_t *c_clnt,
                              changelog_rpc_clnt_t *crpc)
{
    pthread_mutex_lock(&c_clnt->pending_lock);
    {
        list_add_tail(&crpc->list, &c_clnt->pending);
        pthread_cond_signal(&c_clnt->pending_cond);
    }
    pthread_mutex_unlock(&c_clnt->pending_lock);
}

/* Procedure table for the event dispatcher program. */
struct rpc_clnt_procedure changelog_ev_procs[CHANGELOG_REV_PROC_MAX] = {
    [CHANGELOG_REV_PROC_NULL] = {"NULL", NULL},
    [CHANGELOG_REV_PROC_EVENT] = {"EVENT DISPATCH",
                                  changelog_event_dispatch_rpc},
};

/* RPC client program descriptor for event dispatch. */
struct rpc_clnt_program changelog_ev_program = {
    .progname = "CHANGELOG EVENT DISPATCHER",
    .prognum = CHANGELOG_REV_RPC_PROCNUM,
    .progver = CHANGELOG_REV_RPC_PROCVER,
    .numproc = CHANGELOG_REV_PROC_MAX,
    .proctable = changelog_ev_procs,
};