diff options
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog-reborp.c')
-rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog-reborp.c | 280 |
1 files changed, 182 insertions, 98 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c index d7e60fb9634..457d829e69b 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-reborp.c +++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c @@ -21,19 +21,135 @@ struct rpcsvc_program *gf_changelog_reborp_programs[]; +void * +gf_changelog_connection_janitor (void *arg) +{ + int32_t ret = 0; + xlator_t *this = NULL; + gf_private_t *priv = NULL; + gf_changelog_t *entry = NULL; + struct gf_event *event = NULL; + struct gf_event_list *ev = NULL; + unsigned long drained = 0; + + this = arg; + THIS = this; + + priv = this->private; + + while (1) { + pthread_mutex_lock (&priv->lock); + { + while (list_empty (&priv->cleanups)) + pthread_cond_wait (&priv->cond, &priv->lock); + + entry = list_first_entry (&priv->cleanups, + gf_changelog_t, list); + list_del_init (&entry->list); + } + pthread_mutex_unlock (&priv->lock); + + drained = 0; + ev = &entry->event; + + gf_log (this->name, GF_LOG_INFO, + "Cleaning brick entry for brick %s", entry->brick); + + /* 0x0: disbale rpc-clnt */ + rpc_clnt_disable (RPC_PROBER (entry)); + + /* 0x1: cleanup callback invoker thread */ + ret = gf_cleanup_event (this, ev); + if (ret) + continue; + + /* 0x2: drain pending events */ + while (!list_empty (&ev->events)) { + event = list_first_entry (&ev->events, + struct gf_event, list); + gf_log (this->name, GF_LOG_INFO, + "Draining event [Seq: %lu, Payload: %d]", + event->seq, event->count); + + GF_FREE (event); + drained++; + } + + gf_log (this->name, GF_LOG_INFO, + "Drained %lu events", drained); + + /* 0x3: freeup brick entry */ + gf_log (this->name, GF_LOG_INFO, "freeing entry %p", entry); + LOCK_DESTROY (&entry->statelock); + GF_FREE (entry); + } + + return NULL; +} + +static inline void +__gf_changelog_set_conn_state (gf_changelog_t *entry, + gf_changelog_conn_state_t state) +{ + entry->connstate = state; +} + /** - * On a reverse connection, unlink the socket file. + * state check login to gaurd access object after free */ +static inline void +gf_changelog_check_event (gf_private_t *priv, + gf_changelog_t *entry, rpcsvc_event_t event) +{ + gf_boolean_t needfree = _gf_false; + gf_changelog_conn_state_t laststate; + /** + * need to handle couple of connection states to gaurd correct + * freeing of object. + */ + LOCK (&entry->statelock); + { + laststate = entry->connstate; + if (event == RPCSVC_EVENT_ACCEPT) { + __gf_changelog_set_conn_state + (entry, GF_CHANGELOG_CONN_STATE_ACCEPTED); + + if (laststate == GF_CHANGELOG_CONN_STATE_DISCONNECTED) + needfree = _gf_true; + } + + if (event == RPCSVC_EVENT_DISCONNECT) { + __gf_changelog_set_conn_state + (entry, GF_CHANGELOG_CONN_STATE_DISCONNECTED); + + if (laststate == GF_CHANGELOG_CONN_STATE_ACCEPTED) + needfree = _gf_true; + } + } + UNLOCK (&entry->statelock); + + /** + * TODO: + * Handle the race between ACCEPT and DISCONNECT in the + * reconnect code. So purging of entry is deliberately + * avoided here. It will be handled in the reconnect code. + */ +} + int gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata, rpcsvc_event_t event, void *data) { - int ret = 0; + int ret = 0; xlator_t *this = NULL; gf_private_t *priv = NULL; gf_changelog_t *entry = NULL; char sock[UNIX_PATH_MAX] = {0,}; + if (!(event == RPCSVC_EVENT_ACCEPT || + event == RPCSVC_EVENT_DISCONNECT)) + return 0; + entry = mydata; this = entry->this; priv = this->private; @@ -42,29 +158,23 @@ gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata, case RPCSVC_EVENT_ACCEPT: ret = unlink (RPC_SOCK(entry)); if (ret != 0) - gf_log (this->name, GF_LOG_WARNING, "failed to unlink" - " reverse socket file %s", RPC_SOCK (entry)); + gf_log (this->name, GF_LOG_WARNING, "failed to unlink " + "reverse socket %s", RPC_SOCK (entry)); if (entry->connected) GF_CHANGELOG_INVOKE_CBK (this, entry->connected, entry->brick, entry->ptr); break; case RPCSVC_EVENT_DISCONNECT: - LOCK (&priv->lock); - { - list_del (&entry->list); - } - UNLOCK (&priv->lock); - if (entry->disconnected) GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected, entry->brick, entry->ptr); - - GF_FREE (entry); - break; + /* passthrough */ default: break; } + /* gf_changelog_check_event (priv, entry, event); */ + return 0; } @@ -140,12 +250,29 @@ __can_process_event (struct gf_event_list *ev, struct gf_event **event) return 0; } -inline void -__process_event_list (struct gf_event_list *ev, struct gf_event **event) +void +pick_event_ordered (struct gf_event_list *ev, struct gf_event **event) { - while (list_empty (&ev->events) - || !__can_process_event (ev, event)) - pthread_cond_wait (&ev->cond, &ev->lock); + pthread_mutex_lock (&ev->lock); + { + while (list_empty (&ev->events) + || !__can_process_event (ev, event)) + pthread_cond_wait (&ev->cond, &ev->lock); + } + pthread_mutex_unlock (&ev->lock); +} + +void +pick_event_unordered (struct gf_event_list *ev, struct gf_event **event) +{ + pthread_mutex_lock (&ev->lock); + { + while (list_empty (&ev->events)) + pthread_cond_wait (&ev->cond, &ev->lock); + *event = list_first_entry (&ev->events, struct gf_event, list); + list_del (&(*event)->list); + } + pthread_mutex_unlock (&ev->lock); } void * @@ -160,14 +287,10 @@ gf_changelog_callback_invoker (void *arg) ev = arg; entry = ev->entry; - this = entry->this; + THIS = this = entry->this; while (1) { - pthread_mutex_lock (&ev->lock); - { - __process_event_list (ev, &event); - } - pthread_mutex_unlock (&ev->lock); + entry->pickevent (ev, &event); vec = (struct iovec *) &event->iov; gf_changelog_invoke_callback (entry, &vec, event->count); @@ -192,9 +315,36 @@ orderfn (struct list_head *pos1, struct list_head *pos2) return -1; } +void +queue_ordered_event (struct gf_event_list *ev, struct gf_event *event) +{ + /* add event to the ordered event list and wake up listner(s) */ + pthread_mutex_lock (&ev->lock); + { + list_add_order (&event->list, &ev->events, orderfn); + if (!ev->next_seq) + ev->next_seq = event->seq; + if (ev->next_seq == event->seq) + pthread_cond_signal (&ev->cond); + } + pthread_mutex_unlock (&ev->lock); +} + +void +queue_unordered_event (struct gf_event_list *ev, struct gf_event *event) +{ + /* add event to the tail of the queue and wake up listener(s) */ + pthread_mutex_lock (&ev->lock); + { + list_add_tail (&event->list, &ev->events); + pthread_cond_signal (&ev->cond); + } + pthread_mutex_unlock (&ev->lock); +} + int -gf_changelog_ordered_event_handler (rpcsvc_request_t *req, - xlator_t *this, gf_changelog_t *entry) +gf_changelog_event_handler (rpcsvc_request_t *req, + xlator_t *this, gf_changelog_t *entry) { int i = 0; size_t payloadlen = 0; @@ -255,16 +405,8 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req, rpc_req.seq, entry->brick, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt, payloadlen); - /* add it to the ordered event list and wake up listner(s) */ - pthread_mutex_lock (&ev->lock); - { - list_add_order (&event->list, &ev->events, orderfn); - if (!ev->next_seq) - ev->next_seq = event->seq; - if (ev->next_seq == event->seq) - pthread_cond_signal (&ev->cond); - } - pthread_mutex_unlock (&ev->lock); + /* dispatch event */ + entry->queueevent (ev, event); /* ack sequence number */ rpc_rsp.op_ret = 0; @@ -281,76 +423,18 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req, } int -gf_changelog_unordered_event_handler (rpcsvc_request_t *req, - xlator_t *this, gf_changelog_t *entry) -{ - int i = 0; - int ret = 0; - ssize_t len = 0; - int payloadcnt = 0; - struct iovec vector[MAX_IOVEC] = {{0,}, }; - changelog_event_req rpc_req = {0,}; - changelog_event_rsp rpc_rsp = {0,}; - - len = xdr_to_generic (req->msg[0], - &rpc_req, (xdrproc_t)xdr_changelog_event_req); - if (len < 0) { - gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed"); - req->rpc_err = GARBAGE_ARGS; - goto handle_xdr_error; - } - - /* prepare payload */ - if (len < req->msg[0].iov_len) { - payloadcnt = 1; - vector[0].iov_base = (req->msg[0].iov_base + len); - vector[0].iov_len = (req->msg[0].iov_len - len); - } - - for (i = 1; i < req->count; i++) { - vector[payloadcnt++] = req->msg[i]; - } - - gf_log (this->name, GF_LOG_DEBUG, - "seq: %lu (time: %lu.%lu), (vec: %d)", - rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt); - - /* invoke callback */ - struct iovec *vec = (struct iovec *) &vector; - gf_changelog_invoke_callback (entry, &vec, payloadcnt); - - /* ack sequence number */ - rpc_rsp.op_ret = 0; - rpc_rsp.seq = rpc_req.seq; - - goto submit_rpc; - - handle_xdr_error: - rpc_rsp.op_ret = -1; - rpc_rsp.seq = 0; /* invalid */ - submit_rpc: - return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL, - (xdrproc_t)xdr_changelog_event_rsp); -} - -int gf_changelog_reborp_handle_event (rpcsvc_request_t *req) { - int ret = 0; - xlator_t *this = NULL; - rpcsvc_t *svc = NULL; - gf_changelog_t *entry = NULL; + xlator_t *this = NULL; + rpcsvc_t *svc = NULL; + gf_changelog_t *entry = NULL; svc = rpcsvc_request_service (req); entry = svc->mydata; this = THIS = entry->this; - ret = GF_NEED_ORDERED_EVENTS (entry) - ? gf_changelog_ordered_event_handler (req, this, entry) - : gf_changelog_unordered_event_handler (req, this, entry); - - return ret; + return gf_changelog_event_handler (req, this, entry); } rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = { |