summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/lib/src/gf-changelog-reborp.c
diff options
context:
space:
mode:
authorVenky Shankar <vshankar@redhat.com>2015-03-21 19:59:45 +0530
committerVijay Bellur <vbellur@redhat.com>2015-03-24 06:32:39 -0700
commit470d5c4af7f85809997a77af187c8f430b0354c8 (patch)
tree38ff73b3ceb39d55b883def3a241f7755d939d53 /xlators/features/changelog/lib/src/gf-changelog-reborp.c
parent9c9f3f368c693b1cf5f67f3d8d4e599d4ba61383 (diff)
libgfchangelog: cleanups on disconnection
[WIP patch as of now, just needs a little tweak] A pending TODO in the code caused regressions to fail as bitrot daemons are spawned during volume start (equivalent to enabling bitrot by default). The problematic part that casued such failures is during brick disconnections with unsafe handling of event data structured in the code. With this patch, data structures are properly cleaned up with care taken to cleanup all accessors first. This also fixes potential memory leaks which was bluntly ignored before. Change-Id: I70ed82cb1a0fb56c85ef390007e321a97a35c5ce BUG: 1170075 Signed-off-by: Venky Shankar <vshankar@redhat.com> original-author: Venky Shankar <vshankar@redhat.com> Reviewed-on: http://review.gluster.org/9959 Reviewed-by: Vijay Bellur <vbellur@redhat.com> Tested-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog-reborp.c')
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-reborp.c280
1 files changed, 182 insertions, 98 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
index d7e60fb9634..457d829e69b 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-reborp.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
@@ -21,19 +21,135 @@
struct rpcsvc_program *gf_changelog_reborp_programs[];
+void *
+gf_changelog_connection_janitor (void *arg)
+{
+ int32_t ret = 0;
+ xlator_t *this = NULL;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+ unsigned long drained = 0;
+
+ this = arg;
+ THIS = this;
+
+ priv = this->private;
+
+ while (1) {
+ pthread_mutex_lock (&priv->lock);
+ {
+ while (list_empty (&priv->cleanups))
+ pthread_cond_wait (&priv->cond, &priv->lock);
+
+ entry = list_first_entry (&priv->cleanups,
+ gf_changelog_t, list);
+ list_del_init (&entry->list);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ drained = 0;
+ ev = &entry->event;
+
+ gf_log (this->name, GF_LOG_INFO,
+ "Cleaning brick entry for brick %s", entry->brick);
+
+ /* 0x0: disbale rpc-clnt */
+ rpc_clnt_disable (RPC_PROBER (entry));
+
+ /* 0x1: cleanup callback invoker thread */
+ ret = gf_cleanup_event (this, ev);
+ if (ret)
+ continue;
+
+ /* 0x2: drain pending events */
+ while (!list_empty (&ev->events)) {
+ event = list_first_entry (&ev->events,
+ struct gf_event, list);
+ gf_log (this->name, GF_LOG_INFO,
+ "Draining event [Seq: %lu, Payload: %d]",
+ event->seq, event->count);
+
+ GF_FREE (event);
+ drained++;
+ }
+
+ gf_log (this->name, GF_LOG_INFO,
+ "Drained %lu events", drained);
+
+ /* 0x3: freeup brick entry */
+ gf_log (this->name, GF_LOG_INFO, "freeing entry %p", entry);
+ LOCK_DESTROY (&entry->statelock);
+ GF_FREE (entry);
+ }
+
+ return NULL;
+}
+
+static inline void
+__gf_changelog_set_conn_state (gf_changelog_t *entry,
+ gf_changelog_conn_state_t state)
+{
+ entry->connstate = state;
+}
+
/**
- * On a reverse connection, unlink the socket file.
+ * state check login to gaurd access object after free
*/
+static inline void
+gf_changelog_check_event (gf_private_t *priv,
+ gf_changelog_t *entry, rpcsvc_event_t event)
+{
+ gf_boolean_t needfree = _gf_false;
+ gf_changelog_conn_state_t laststate;
+ /**
+ * need to handle couple of connection states to gaurd correct
+ * freeing of object.
+ */
+ LOCK (&entry->statelock);
+ {
+ laststate = entry->connstate;
+ if (event == RPCSVC_EVENT_ACCEPT) {
+ __gf_changelog_set_conn_state
+ (entry, GF_CHANGELOG_CONN_STATE_ACCEPTED);
+
+ if (laststate == GF_CHANGELOG_CONN_STATE_DISCONNECTED)
+ needfree = _gf_true;
+ }
+
+ if (event == RPCSVC_EVENT_DISCONNECT) {
+ __gf_changelog_set_conn_state
+ (entry, GF_CHANGELOG_CONN_STATE_DISCONNECTED);
+
+ if (laststate == GF_CHANGELOG_CONN_STATE_ACCEPTED)
+ needfree = _gf_true;
+ }
+ }
+ UNLOCK (&entry->statelock);
+
+ /**
+ * TODO:
+ * Handle the race between ACCEPT and DISCONNECT in the
+ * reconnect code. So purging of entry is deliberately
+ * avoided here. It will be handled in the reconnect code.
+ */
+}
+
int
gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata,
rpcsvc_event_t event, void *data)
{
- int ret = 0;
+ int ret = 0;
xlator_t *this = NULL;
gf_private_t *priv = NULL;
gf_changelog_t *entry = NULL;
char sock[UNIX_PATH_MAX] = {0,};
+ if (!(event == RPCSVC_EVENT_ACCEPT ||
+ event == RPCSVC_EVENT_DISCONNECT))
+ return 0;
+
entry = mydata;
this = entry->this;
priv = this->private;
@@ -42,29 +158,23 @@ gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata,
case RPCSVC_EVENT_ACCEPT:
ret = unlink (RPC_SOCK(entry));
if (ret != 0)
- gf_log (this->name, GF_LOG_WARNING, "failed to unlink"
- " reverse socket file %s", RPC_SOCK (entry));
+ gf_log (this->name, GF_LOG_WARNING, "failed to unlink "
+ "reverse socket %s", RPC_SOCK (entry));
if (entry->connected)
GF_CHANGELOG_INVOKE_CBK (this, entry->connected,
entry->brick, entry->ptr);
break;
case RPCSVC_EVENT_DISCONNECT:
- LOCK (&priv->lock);
- {
- list_del (&entry->list);
- }
- UNLOCK (&priv->lock);
-
if (entry->disconnected)
GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected,
entry->brick, entry->ptr);
-
- GF_FREE (entry);
- break;
+ /* passthrough */
default:
break;
}
+ /* gf_changelog_check_event (priv, entry, event); */
+
return 0;
}
@@ -140,12 +250,29 @@ __can_process_event (struct gf_event_list *ev, struct gf_event **event)
return 0;
}
-inline void
-__process_event_list (struct gf_event_list *ev, struct gf_event **event)
+void
+pick_event_ordered (struct gf_event_list *ev, struct gf_event **event)
{
- while (list_empty (&ev->events)
- || !__can_process_event (ev, event))
- pthread_cond_wait (&ev->cond, &ev->lock);
+ pthread_mutex_lock (&ev->lock);
+ {
+ while (list_empty (&ev->events)
+ || !__can_process_event (ev, event))
+ pthread_cond_wait (&ev->cond, &ev->lock);
+ }
+ pthread_mutex_unlock (&ev->lock);
+}
+
+void
+pick_event_unordered (struct gf_event_list *ev, struct gf_event **event)
+{
+ pthread_mutex_lock (&ev->lock);
+ {
+ while (list_empty (&ev->events))
+ pthread_cond_wait (&ev->cond, &ev->lock);
+ *event = list_first_entry (&ev->events, struct gf_event, list);
+ list_del (&(*event)->list);
+ }
+ pthread_mutex_unlock (&ev->lock);
}
void *
@@ -160,14 +287,10 @@ gf_changelog_callback_invoker (void *arg)
ev = arg;
entry = ev->entry;
- this = entry->this;
+ THIS = this = entry->this;
while (1) {
- pthread_mutex_lock (&ev->lock);
- {
- __process_event_list (ev, &event);
- }
- pthread_mutex_unlock (&ev->lock);
+ entry->pickevent (ev, &event);
vec = (struct iovec *) &event->iov;
gf_changelog_invoke_callback (entry, &vec, event->count);
@@ -192,9 +315,36 @@ orderfn (struct list_head *pos1, struct list_head *pos2)
return -1;
}
+void
+queue_ordered_event (struct gf_event_list *ev, struct gf_event *event)
+{
+ /* add event to the ordered event list and wake up listner(s) */
+ pthread_mutex_lock (&ev->lock);
+ {
+ list_add_order (&event->list, &ev->events, orderfn);
+ if (!ev->next_seq)
+ ev->next_seq = event->seq;
+ if (ev->next_seq == event->seq)
+ pthread_cond_signal (&ev->cond);
+ }
+ pthread_mutex_unlock (&ev->lock);
+}
+
+void
+queue_unordered_event (struct gf_event_list *ev, struct gf_event *event)
+{
+ /* add event to the tail of the queue and wake up listener(s) */
+ pthread_mutex_lock (&ev->lock);
+ {
+ list_add_tail (&event->list, &ev->events);
+ pthread_cond_signal (&ev->cond);
+ }
+ pthread_mutex_unlock (&ev->lock);
+}
+
int
-gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
- xlator_t *this, gf_changelog_t *entry)
+gf_changelog_event_handler (rpcsvc_request_t *req,
+ xlator_t *this, gf_changelog_t *entry)
{
int i = 0;
size_t payloadlen = 0;
@@ -255,16 +405,8 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
rpc_req.seq, entry->brick, rpc_req.tv_sec,
rpc_req.tv_usec, payloadcnt, payloadlen);
- /* add it to the ordered event list and wake up listner(s) */
- pthread_mutex_lock (&ev->lock);
- {
- list_add_order (&event->list, &ev->events, orderfn);
- if (!ev->next_seq)
- ev->next_seq = event->seq;
- if (ev->next_seq == event->seq)
- pthread_cond_signal (&ev->cond);
- }
- pthread_mutex_unlock (&ev->lock);
+ /* dispatch event */
+ entry->queueevent (ev, event);
/* ack sequence number */
rpc_rsp.op_ret = 0;
@@ -281,76 +423,18 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
}
int
-gf_changelog_unordered_event_handler (rpcsvc_request_t *req,
- xlator_t *this, gf_changelog_t *entry)
-{
- int i = 0;
- int ret = 0;
- ssize_t len = 0;
- int payloadcnt = 0;
- struct iovec vector[MAX_IOVEC] = {{0,}, };
- changelog_event_req rpc_req = {0,};
- changelog_event_rsp rpc_rsp = {0,};
-
- len = xdr_to_generic (req->msg[0],
- &rpc_req, (xdrproc_t)xdr_changelog_event_req);
- if (len < 0) {
- gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
- req->rpc_err = GARBAGE_ARGS;
- goto handle_xdr_error;
- }
-
- /* prepare payload */
- if (len < req->msg[0].iov_len) {
- payloadcnt = 1;
- vector[0].iov_base = (req->msg[0].iov_base + len);
- vector[0].iov_len = (req->msg[0].iov_len - len);
- }
-
- for (i = 1; i < req->count; i++) {
- vector[payloadcnt++] = req->msg[i];
- }
-
- gf_log (this->name, GF_LOG_DEBUG,
- "seq: %lu (time: %lu.%lu), (vec: %d)",
- rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt);
-
- /* invoke callback */
- struct iovec *vec = (struct iovec *) &vector;
- gf_changelog_invoke_callback (entry, &vec, payloadcnt);
-
- /* ack sequence number */
- rpc_rsp.op_ret = 0;
- rpc_rsp.seq = rpc_req.seq;
-
- goto submit_rpc;
-
- handle_xdr_error:
- rpc_rsp.op_ret = -1;
- rpc_rsp.seq = 0; /* invalid */
- submit_rpc:
- return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
- (xdrproc_t)xdr_changelog_event_rsp);
-}
-
-int
gf_changelog_reborp_handle_event (rpcsvc_request_t *req)
{
- int ret = 0;
- xlator_t *this = NULL;
- rpcsvc_t *svc = NULL;
- gf_changelog_t *entry = NULL;
+ xlator_t *this = NULL;
+ rpcsvc_t *svc = NULL;
+ gf_changelog_t *entry = NULL;
svc = rpcsvc_request_service (req);
entry = svc->mydata;
this = THIS = entry->this;
- ret = GF_NEED_ORDERED_EVENTS (entry)
- ? gf_changelog_ordered_event_handler (req, this, entry)
- : gf_changelog_unordered_event_handler (req, this, entry);
-
- return ret;
+ return gf_changelog_event_handler (req, this, entry);
}
rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = {