summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/lib/src/gf-changelog-reborp.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog-reborp.c')
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-reborp.c280
1 files changed, 182 insertions, 98 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
index d7e60fb9634..457d829e69b 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-reborp.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
@@ -21,19 +21,135 @@
struct rpcsvc_program *gf_changelog_reborp_programs[];
+void *
+gf_changelog_connection_janitor (void *arg)
+{
+ int32_t ret = 0;
+ xlator_t *this = NULL;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+ unsigned long drained = 0;
+
+ this = arg;
+ THIS = this;
+
+ priv = this->private;
+
+ while (1) {
+ pthread_mutex_lock (&priv->lock);
+ {
+ while (list_empty (&priv->cleanups))
+ pthread_cond_wait (&priv->cond, &priv->lock);
+
+ entry = list_first_entry (&priv->cleanups,
+ gf_changelog_t, list);
+ list_del_init (&entry->list);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ drained = 0;
+ ev = &entry->event;
+
+ gf_log (this->name, GF_LOG_INFO,
+ "Cleaning brick entry for brick %s", entry->brick);
+
+ /* 0x0: disbale rpc-clnt */
+ rpc_clnt_disable (RPC_PROBER (entry));
+
+ /* 0x1: cleanup callback invoker thread */
+ ret = gf_cleanup_event (this, ev);
+ if (ret)
+ continue;
+
+ /* 0x2: drain pending events */
+ while (!list_empty (&ev->events)) {
+ event = list_first_entry (&ev->events,
+ struct gf_event, list);
+ gf_log (this->name, GF_LOG_INFO,
+ "Draining event [Seq: %lu, Payload: %d]",
+ event->seq, event->count);
+
+ GF_FREE (event);
+ drained++;
+ }
+
+ gf_log (this->name, GF_LOG_INFO,
+ "Drained %lu events", drained);
+
+ /* 0x3: freeup brick entry */
+ gf_log (this->name, GF_LOG_INFO, "freeing entry %p", entry);
+ LOCK_DESTROY (&entry->statelock);
+ GF_FREE (entry);
+ }
+
+ return NULL;
+}
+
+static inline void
+__gf_changelog_set_conn_state (gf_changelog_t *entry,
+ gf_changelog_conn_state_t state)
+{
+ entry->connstate = state;
+}
+
/**
- * On a reverse connection, unlink the socket file.
+ * state check login to gaurd access object after free
*/
+static inline void
+gf_changelog_check_event (gf_private_t *priv,
+ gf_changelog_t *entry, rpcsvc_event_t event)
+{
+ gf_boolean_t needfree = _gf_false;
+ gf_changelog_conn_state_t laststate;
+ /**
+ * need to handle couple of connection states to gaurd correct
+ * freeing of object.
+ */
+ LOCK (&entry->statelock);
+ {
+ laststate = entry->connstate;
+ if (event == RPCSVC_EVENT_ACCEPT) {
+ __gf_changelog_set_conn_state
+ (entry, GF_CHANGELOG_CONN_STATE_ACCEPTED);
+
+ if (laststate == GF_CHANGELOG_CONN_STATE_DISCONNECTED)
+ needfree = _gf_true;
+ }
+
+ if (event == RPCSVC_EVENT_DISCONNECT) {
+ __gf_changelog_set_conn_state
+ (entry, GF_CHANGELOG_CONN_STATE_DISCONNECTED);
+
+ if (laststate == GF_CHANGELOG_CONN_STATE_ACCEPTED)
+ needfree = _gf_true;
+ }
+ }
+ UNLOCK (&entry->statelock);
+
+ /**
+ * TODO:
+ * Handle the race between ACCEPT and DISCONNECT in the
+ * reconnect code. So purging of entry is deliberately
+ * avoided here. It will be handled in the reconnect code.
+ */
+}
+
int
gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata,
rpcsvc_event_t event, void *data)
{
- int ret = 0;
+ int ret = 0;
xlator_t *this = NULL;
gf_private_t *priv = NULL;
gf_changelog_t *entry = NULL;
char sock[UNIX_PATH_MAX] = {0,};
+ if (!(event == RPCSVC_EVENT_ACCEPT ||
+ event == RPCSVC_EVENT_DISCONNECT))
+ return 0;
+
entry = mydata;
this = entry->this;
priv = this->private;
@@ -42,29 +158,23 @@ gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata,
case RPCSVC_EVENT_ACCEPT:
ret = unlink (RPC_SOCK(entry));
if (ret != 0)
- gf_log (this->name, GF_LOG_WARNING, "failed to unlink"
- " reverse socket file %s", RPC_SOCK (entry));
+ gf_log (this->name, GF_LOG_WARNING, "failed to unlink "
+ "reverse socket %s", RPC_SOCK (entry));
if (entry->connected)
GF_CHANGELOG_INVOKE_CBK (this, entry->connected,
entry->brick, entry->ptr);
break;
case RPCSVC_EVENT_DISCONNECT:
- LOCK (&priv->lock);
- {
- list_del (&entry->list);
- }
- UNLOCK (&priv->lock);
-
if (entry->disconnected)
GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected,
entry->brick, entry->ptr);
-
- GF_FREE (entry);
- break;
+ /* passthrough */
default:
break;
}
+ /* gf_changelog_check_event (priv, entry, event); */
+
return 0;
}
@@ -140,12 +250,29 @@ __can_process_event (struct gf_event_list *ev, struct gf_event **event)
return 0;
}
-inline void
-__process_event_list (struct gf_event_list *ev, struct gf_event **event)
+void
+pick_event_ordered (struct gf_event_list *ev, struct gf_event **event)
{
- while (list_empty (&ev->events)
- || !__can_process_event (ev, event))
- pthread_cond_wait (&ev->cond, &ev->lock);
+ pthread_mutex_lock (&ev->lock);
+ {
+ while (list_empty (&ev->events)
+ || !__can_process_event (ev, event))
+ pthread_cond_wait (&ev->cond, &ev->lock);
+ }
+ pthread_mutex_unlock (&ev->lock);
+}
+
+void
+pick_event_unordered (struct gf_event_list *ev, struct gf_event **event)
+{
+ pthread_mutex_lock (&ev->lock);
+ {
+ while (list_empty (&ev->events))
+ pthread_cond_wait (&ev->cond, &ev->lock);
+ *event = list_first_entry (&ev->events, struct gf_event, list);
+ list_del (&(*event)->list);
+ }
+ pthread_mutex_unlock (&ev->lock);
}
void *
@@ -160,14 +287,10 @@ gf_changelog_callback_invoker (void *arg)
ev = arg;
entry = ev->entry;
- this = entry->this;
+ THIS = this = entry->this;
while (1) {
- pthread_mutex_lock (&ev->lock);
- {
- __process_event_list (ev, &event);
- }
- pthread_mutex_unlock (&ev->lock);
+ entry->pickevent (ev, &event);
vec = (struct iovec *) &event->iov;
gf_changelog_invoke_callback (entry, &vec, event->count);
@@ -192,9 +315,36 @@ orderfn (struct list_head *pos1, struct list_head *pos2)
return -1;
}
+void
+queue_ordered_event (struct gf_event_list *ev, struct gf_event *event)
+{
+ /* add event to the ordered event list and wake up listner(s) */
+ pthread_mutex_lock (&ev->lock);
+ {
+ list_add_order (&event->list, &ev->events, orderfn);
+ if (!ev->next_seq)
+ ev->next_seq = event->seq;
+ if (ev->next_seq == event->seq)
+ pthread_cond_signal (&ev->cond);
+ }
+ pthread_mutex_unlock (&ev->lock);
+}
+
+void
+queue_unordered_event (struct gf_event_list *ev, struct gf_event *event)
+{
+ /* add event to the tail of the queue and wake up listener(s) */
+ pthread_mutex_lock (&ev->lock);
+ {
+ list_add_tail (&event->list, &ev->events);
+ pthread_cond_signal (&ev->cond);
+ }
+ pthread_mutex_unlock (&ev->lock);
+}
+
int
-gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
- xlator_t *this, gf_changelog_t *entry)
+gf_changelog_event_handler (rpcsvc_request_t *req,
+ xlator_t *this, gf_changelog_t *entry)
{
int i = 0;
size_t payloadlen = 0;
@@ -255,16 +405,8 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
rpc_req.seq, entry->brick, rpc_req.tv_sec,
rpc_req.tv_usec, payloadcnt, payloadlen);
- /* add it to the ordered event list and wake up listner(s) */
- pthread_mutex_lock (&ev->lock);
- {
- list_add_order (&event->list, &ev->events, orderfn);
- if (!ev->next_seq)
- ev->next_seq = event->seq;
- if (ev->next_seq == event->seq)
- pthread_cond_signal (&ev->cond);
- }
- pthread_mutex_unlock (&ev->lock);
+ /* dispatch event */
+ entry->queueevent (ev, event);
/* ack sequence number */
rpc_rsp.op_ret = 0;
@@ -281,76 +423,18 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
}
int
-gf_changelog_unordered_event_handler (rpcsvc_request_t *req,
- xlator_t *this, gf_changelog_t *entry)
-{
- int i = 0;
- int ret = 0;
- ssize_t len = 0;
- int payloadcnt = 0;
- struct iovec vector[MAX_IOVEC] = {{0,}, };
- changelog_event_req rpc_req = {0,};
- changelog_event_rsp rpc_rsp = {0,};
-
- len = xdr_to_generic (req->msg[0],
- &rpc_req, (xdrproc_t)xdr_changelog_event_req);
- if (len < 0) {
- gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
- req->rpc_err = GARBAGE_ARGS;
- goto handle_xdr_error;
- }
-
- /* prepare payload */
- if (len < req->msg[0].iov_len) {
- payloadcnt = 1;
- vector[0].iov_base = (req->msg[0].iov_base + len);
- vector[0].iov_len = (req->msg[0].iov_len - len);
- }
-
- for (i = 1; i < req->count; i++) {
- vector[payloadcnt++] = req->msg[i];
- }
-
- gf_log (this->name, GF_LOG_DEBUG,
- "seq: %lu (time: %lu.%lu), (vec: %d)",
- rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt);
-
- /* invoke callback */
- struct iovec *vec = (struct iovec *) &vector;
- gf_changelog_invoke_callback (entry, &vec, payloadcnt);
-
- /* ack sequence number */
- rpc_rsp.op_ret = 0;
- rpc_rsp.seq = rpc_req.seq;
-
- goto submit_rpc;
-
- handle_xdr_error:
- rpc_rsp.op_ret = -1;
- rpc_rsp.seq = 0; /* invalid */
- submit_rpc:
- return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
- (xdrproc_t)xdr_changelog_event_rsp);
-}
-
-int
gf_changelog_reborp_handle_event (rpcsvc_request_t *req)
{
- int ret = 0;
- xlator_t *this = NULL;
- rpcsvc_t *svc = NULL;
- gf_changelog_t *entry = NULL;
+ xlator_t *this = NULL;
+ rpcsvc_t *svc = NULL;
+ gf_changelog_t *entry = NULL;
svc = rpcsvc_request_service (req);
entry = svc->mydata;
this = THIS = entry->this;
- ret = GF_NEED_ORDERED_EVENTS (entry)
- ? gf_changelog_ordered_event_handler (req, this, entry)
- : gf_changelog_unordered_event_handler (req, this, entry);
-
- return ret;
+ return gf_changelog_event_handler (req, this, entry);
}
rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = {