diff options
author | Venky Shankar <vshankar@redhat.com> | 2015-03-21 19:59:45 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2015-03-24 06:32:39 -0700 |
commit | 470d5c4af7f85809997a77af187c8f430b0354c8 (patch) | |
tree | 38ff73b3ceb39d55b883def3a241f7755d939d53 /xlators/features/changelog/lib/src/gf-changelog-reborp.c | |
parent | 9c9f3f368c693b1cf5f67f3d8d4e599d4ba61383 (diff) |
libgfchangelog: cleanups on disconnection
[WIP patch as of now, just needs a little tweak]
A pending TODO in the code caused regressions to fail as
bitrot daemons are spawned during volume start (equivalent
to enabling bitrot by default). The problematic part that
casued such failures is during brick disconnections with
unsafe handling of event data structured in the code.
With this patch, data structures are properly cleaned up
with care taken to cleanup all accessors first. This also
fixes potential memory leaks which was bluntly ignored
before.
Change-Id: I70ed82cb1a0fb56c85ef390007e321a97a35c5ce
BUG: 1170075
Signed-off-by: Venky Shankar <vshankar@redhat.com>
original-author: Venky Shankar <vshankar@redhat.com>
Reviewed-on: http://review.gluster.org/9959
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Tested-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog-reborp.c')
-rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog-reborp.c | 280 |
1 files changed, 182 insertions, 98 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c index d7e60fb9634..457d829e69b 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-reborp.c +++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c @@ -21,19 +21,135 @@ struct rpcsvc_program *gf_changelog_reborp_programs[]; +void * +gf_changelog_connection_janitor (void *arg) +{ + int32_t ret = 0; + xlator_t *this = NULL; + gf_private_t *priv = NULL; + gf_changelog_t *entry = NULL; + struct gf_event *event = NULL; + struct gf_event_list *ev = NULL; + unsigned long drained = 0; + + this = arg; + THIS = this; + + priv = this->private; + + while (1) { + pthread_mutex_lock (&priv->lock); + { + while (list_empty (&priv->cleanups)) + pthread_cond_wait (&priv->cond, &priv->lock); + + entry = list_first_entry (&priv->cleanups, + gf_changelog_t, list); + list_del_init (&entry->list); + } + pthread_mutex_unlock (&priv->lock); + + drained = 0; + ev = &entry->event; + + gf_log (this->name, GF_LOG_INFO, + "Cleaning brick entry for brick %s", entry->brick); + + /* 0x0: disbale rpc-clnt */ + rpc_clnt_disable (RPC_PROBER (entry)); + + /* 0x1: cleanup callback invoker thread */ + ret = gf_cleanup_event (this, ev); + if (ret) + continue; + + /* 0x2: drain pending events */ + while (!list_empty (&ev->events)) { + event = list_first_entry (&ev->events, + struct gf_event, list); + gf_log (this->name, GF_LOG_INFO, + "Draining event [Seq: %lu, Payload: %d]", + event->seq, event->count); + + GF_FREE (event); + drained++; + } + + gf_log (this->name, GF_LOG_INFO, + "Drained %lu events", drained); + + /* 0x3: freeup brick entry */ + gf_log (this->name, GF_LOG_INFO, "freeing entry %p", entry); + LOCK_DESTROY (&entry->statelock); + GF_FREE (entry); + } + + return NULL; +} + +static inline void +__gf_changelog_set_conn_state (gf_changelog_t *entry, + gf_changelog_conn_state_t state) +{ + entry->connstate = state; +} + /** - * On a reverse connection, unlink the socket file. + * state check login to gaurd access object after free */ +static inline void +gf_changelog_check_event (gf_private_t *priv, + gf_changelog_t *entry, rpcsvc_event_t event) +{ + gf_boolean_t needfree = _gf_false; + gf_changelog_conn_state_t laststate; + /** + * need to handle couple of connection states to gaurd correct + * freeing of object. + */ + LOCK (&entry->statelock); + { + laststate = entry->connstate; + if (event == RPCSVC_EVENT_ACCEPT) { + __gf_changelog_set_conn_state + (entry, GF_CHANGELOG_CONN_STATE_ACCEPTED); + + if (laststate == GF_CHANGELOG_CONN_STATE_DISCONNECTED) + needfree = _gf_true; + } + + if (event == RPCSVC_EVENT_DISCONNECT) { + __gf_changelog_set_conn_state + (entry, GF_CHANGELOG_CONN_STATE_DISCONNECTED); + + if (laststate == GF_CHANGELOG_CONN_STATE_ACCEPTED) + needfree = _gf_true; + } + } + UNLOCK (&entry->statelock); + + /** + * TODO: + * Handle the race between ACCEPT and DISCONNECT in the + * reconnect code. So purging of entry is deliberately + * avoided here. It will be handled in the reconnect code. + */ +} + int gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata, rpcsvc_event_t event, void *data) { - int ret = 0; + int ret = 0; xlator_t *this = NULL; gf_private_t *priv = NULL; gf_changelog_t *entry = NULL; char sock[UNIX_PATH_MAX] = {0,}; + if (!(event == RPCSVC_EVENT_ACCEPT || + event == RPCSVC_EVENT_DISCONNECT)) + return 0; + entry = mydata; this = entry->this; priv = this->private; @@ -42,29 +158,23 @@ gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata, case RPCSVC_EVENT_ACCEPT: ret = unlink (RPC_SOCK(entry)); if (ret != 0) - gf_log (this->name, GF_LOG_WARNING, "failed to unlink" - " reverse socket file %s", RPC_SOCK (entry)); + gf_log (this->name, GF_LOG_WARNING, "failed to unlink " + "reverse socket %s", RPC_SOCK (entry)); if (entry->connected) GF_CHANGELOG_INVOKE_CBK (this, entry->connected, entry->brick, entry->ptr); break; case RPCSVC_EVENT_DISCONNECT: - LOCK (&priv->lock); - { - list_del (&entry->list); - } - UNLOCK (&priv->lock); - if (entry->disconnected) GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected, entry->brick, entry->ptr); - - GF_FREE (entry); - break; + /* passthrough */ default: break; } + /* gf_changelog_check_event (priv, entry, event); */ + return 0; } @@ -140,12 +250,29 @@ __can_process_event (struct gf_event_list *ev, struct gf_event **event) return 0; } -inline void -__process_event_list (struct gf_event_list *ev, struct gf_event **event) +void +pick_event_ordered (struct gf_event_list *ev, struct gf_event **event) { - while (list_empty (&ev->events) - || !__can_process_event (ev, event)) - pthread_cond_wait (&ev->cond, &ev->lock); + pthread_mutex_lock (&ev->lock); + { + while (list_empty (&ev->events) + || !__can_process_event (ev, event)) + pthread_cond_wait (&ev->cond, &ev->lock); + } + pthread_mutex_unlock (&ev->lock); +} + +void +pick_event_unordered (struct gf_event_list *ev, struct gf_event **event) +{ + pthread_mutex_lock (&ev->lock); + { + while (list_empty (&ev->events)) + pthread_cond_wait (&ev->cond, &ev->lock); + *event = list_first_entry (&ev->events, struct gf_event, list); + list_del (&(*event)->list); + } + pthread_mutex_unlock (&ev->lock); } void * @@ -160,14 +287,10 @@ gf_changelog_callback_invoker (void *arg) ev = arg; entry = ev->entry; - this = entry->this; + THIS = this = entry->this; while (1) { - pthread_mutex_lock (&ev->lock); - { - __process_event_list (ev, &event); - } - pthread_mutex_unlock (&ev->lock); + entry->pickevent (ev, &event); vec = (struct iovec *) &event->iov; gf_changelog_invoke_callback (entry, &vec, event->count); @@ -192,9 +315,36 @@ orderfn (struct list_head *pos1, struct list_head *pos2) return -1; } +void +queue_ordered_event (struct gf_event_list *ev, struct gf_event *event) +{ + /* add event to the ordered event list and wake up listner(s) */ + pthread_mutex_lock (&ev->lock); + { + list_add_order (&event->list, &ev->events, orderfn); + if (!ev->next_seq) + ev->next_seq = event->seq; + if (ev->next_seq == event->seq) + pthread_cond_signal (&ev->cond); + } + pthread_mutex_unlock (&ev->lock); +} + +void +queue_unordered_event (struct gf_event_list *ev, struct gf_event *event) +{ + /* add event to the tail of the queue and wake up listener(s) */ + pthread_mutex_lock (&ev->lock); + { + list_add_tail (&event->list, &ev->events); + pthread_cond_signal (&ev->cond); + } + pthread_mutex_unlock (&ev->lock); +} + int -gf_changelog_ordered_event_handler (rpcsvc_request_t *req, - xlator_t *this, gf_changelog_t *entry) +gf_changelog_event_handler (rpcsvc_request_t *req, + xlator_t *this, gf_changelog_t *entry) { int i = 0; size_t payloadlen = 0; @@ -255,16 +405,8 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req, rpc_req.seq, entry->brick, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt, payloadlen); - /* add it to the ordered event list and wake up listner(s) */ - pthread_mutex_lock (&ev->lock); - { - list_add_order (&event->list, &ev->events, orderfn); - if (!ev->next_seq) - ev->next_seq = event->seq; - if (ev->next_seq == event->seq) - pthread_cond_signal (&ev->cond); - } - pthread_mutex_unlock (&ev->lock); + /* dispatch event */ + entry->queueevent (ev, event); /* ack sequence number */ rpc_rsp.op_ret = 0; @@ -281,76 +423,18 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req, } int -gf_changelog_unordered_event_handler (rpcsvc_request_t *req, - xlator_t *this, gf_changelog_t *entry) -{ - int i = 0; - int ret = 0; - ssize_t len = 0; - int payloadcnt = 0; - struct iovec vector[MAX_IOVEC] = {{0,}, }; - changelog_event_req rpc_req = {0,}; - changelog_event_rsp rpc_rsp = {0,}; - - len = xdr_to_generic (req->msg[0], - &rpc_req, (xdrproc_t)xdr_changelog_event_req); - if (len < 0) { - gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed"); - req->rpc_err = GARBAGE_ARGS; - goto handle_xdr_error; - } - - /* prepare payload */ - if (len < req->msg[0].iov_len) { - payloadcnt = 1; - vector[0].iov_base = (req->msg[0].iov_base + len); - vector[0].iov_len = (req->msg[0].iov_len - len); - } - - for (i = 1; i < req->count; i++) { - vector[payloadcnt++] = req->msg[i]; - } - - gf_log (this->name, GF_LOG_DEBUG, - "seq: %lu (time: %lu.%lu), (vec: %d)", - rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt); - - /* invoke callback */ - struct iovec *vec = (struct iovec *) &vector; - gf_changelog_invoke_callback (entry, &vec, payloadcnt); - - /* ack sequence number */ - rpc_rsp.op_ret = 0; - rpc_rsp.seq = rpc_req.seq; - - goto submit_rpc; - - handle_xdr_error: - rpc_rsp.op_ret = -1; - rpc_rsp.seq = 0; /* invalid */ - submit_rpc: - return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL, - (xdrproc_t)xdr_changelog_event_rsp); -} - -int gf_changelog_reborp_handle_event (rpcsvc_request_t *req) { - int ret = 0; - xlator_t *this = NULL; - rpcsvc_t *svc = NULL; - gf_changelog_t *entry = NULL; + xlator_t *this = NULL; + rpcsvc_t *svc = NULL; + gf_changelog_t *entry = NULL; svc = rpcsvc_request_service (req); entry = svc->mydata; this = THIS = entry->this; - ret = GF_NEED_ORDERED_EVENTS (entry) - ? gf_changelog_ordered_event_handler (req, this, entry) - : gf_changelog_unordered_event_handler (req, this, entry); - - return ret; + return gf_changelog_event_handler (req, this, entry); } rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = { |