summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog
diff options
context:
space:
mode:
authorVenky Shankar <vshankar@redhat.com>2015-03-21 19:59:45 +0530
committerVijay Bellur <vbellur@redhat.com>2015-03-24 06:32:39 -0700
commit470d5c4af7f85809997a77af187c8f430b0354c8 (patch)
tree38ff73b3ceb39d55b883def3a241f7755d939d53 /xlators/features/changelog
parent9c9f3f368c693b1cf5f67f3d8d4e599d4ba61383 (diff)
libgfchangelog: cleanups on disconnection
[WIP patch as of now, just needs a little tweak] A pending TODO in the code caused regressions to fail as bitrot daemons are spawned during volume start (equivalent to enabling bitrot by default). The problematic part that casued such failures is during brick disconnections with unsafe handling of event data structured in the code. With this patch, data structures are properly cleaned up with care taken to cleanup all accessors first. This also fixes potential memory leaks which was bluntly ignored before. Change-Id: I70ed82cb1a0fb56c85ef390007e321a97a35c5ce BUG: 1170075 Signed-off-by: Venky Shankar <vshankar@redhat.com> original-author: Venky Shankar <vshankar@redhat.com> Reviewed-on: http://review.gluster.org/9959 Reviewed-by: Vijay Bellur <vbellur@redhat.com> Tested-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/changelog')
-rw-r--r--xlators/features/changelog/lib/examples/c/get-changes-multi.c4
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.h44
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-reborp.c280
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-rpc.c6
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog.c86
5 files changed, 280 insertions, 140 deletions
diff --git a/xlators/features/changelog/lib/examples/c/get-changes-multi.c b/xlators/features/changelog/lib/examples/c/get-changes-multi.c
index ae404bc..3741bdf 100644
--- a/xlators/features/changelog/lib/examples/c/get-changes-multi.c
+++ b/xlators/features/changelog/lib/examples/c/get-changes-multi.c
@@ -44,7 +44,7 @@ void brick_callback (void *xl, char *brick,
void fill_brick_spec (struct gf_brick_spec *brick, char *path)
{
brick->brick_path = strdup (path);
- brick->filter = CHANGELOG_OP_TYPE_RELEASE;
+ brick->filter = CHANGELOG_OP_TYPE_BR_RELEASE;
brick->init = brick_init;
brick->fini = brick_fini;
@@ -75,7 +75,7 @@ main (int argc, char **argv)
goto error_return;
ret = gf_changelog_register_generic ((struct gf_brick_spec *)bricks, 2,
- 1, "/tmp/multi-changes.log", 9,
+ 0, "/tmp/multi-changes.log", 9,
NULL);
if (ret)
goto error_return;
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
index 4247cb4..adde1e5 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
@@ -47,6 +47,7 @@ typedef struct read_line {
} read_line_t;
struct gf_changelog;
+struct gf_event;
/**
* Event list for ordered event notification
@@ -64,7 +65,7 @@ struct gf_event_list {
struct gf_changelog *entry; /* backpointer to it's brick
encapsulator (entry) */
- struct list_head events; /* list of events (ordered) */
+ struct list_head events; /* list of events */
};
/**
@@ -84,7 +85,7 @@ struct gf_event {
/**
* assign the base address of the IO vector to the correct memory
- * area and set it's addressable length.
+o * area and set it's addressable length.
*/
#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos) \
do { \
@@ -95,11 +96,20 @@ struct gf_event {
pos += len; \
} while (0)
+typedef enum gf_changelog_conn_state {
+ GF_CHANGELOG_CONN_STATE_PENDING = 0,
+ GF_CHANGELOG_CONN_STATE_ACCEPTED,
+ GF_CHANGELOG_CONN_STATE_DISCONNECTED,
+} gf_changelog_conn_state_t;
+
/**
* An instance of this structure is allocated for each brick for which
* notifications are streamed.
*/
typedef struct gf_changelog {
+ gf_lock_t statelock;
+ gf_changelog_conn_state_t connstate;
+
xlator_t *this;
struct list_head list; /* list of instances */
@@ -125,6 +135,9 @@ typedef struct gf_changelog {
gf_boolean_t ordered;
+ void (*queueevent) (struct gf_event_list *, struct gf_event *);
+ void (*pickevent) (struct gf_event_list *, struct gf_event **);
+
struct gf_event_list event;
} gf_changelog_t;
@@ -140,13 +153,17 @@ gf_changelog_filter_check (gf_changelog_t *entry, changelog_event_t *event)
/** private structure */
typedef struct gf_private {
- gf_lock_t lock; /* protects ->connections */
+ pthread_mutex_t lock; /* protects ->connections, cleanups */
+ pthread_cond_t cond;
void *api; /* pointer for API access */
pthread_t poller; /* event poller thread */
+ pthread_t connectionjanitor; /* connection cleaner */
struct list_head connections; /* list of connections */
+ struct list_head cleanups; /* list of connection to be
+ cleaned up */
} gf_private_t;
#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *) this->private)->api)
@@ -218,4 +235,25 @@ gf_thread_cleanup (xlator_t *this, pthread_t thread);
void *
gf_changelog_callback_invoker (void *arg);
+int
+gf_cleanup_event (xlator_t *, struct gf_event_list *);
+
+/* (un)ordered event queueing */
+void
+queue_ordered_event (struct gf_event_list *, struct gf_event *);
+
+void
+queue_unordered_event (struct gf_event_list *, struct gf_event *);
+
+/* (un)ordered event picking */
+void
+pick_event_ordered (struct gf_event_list *, struct gf_event **);
+
+void
+pick_event_unordered (struct gf_event_list *, struct gf_event **);
+
+/* connection janitor thread */
+void *
+gf_changelog_connection_janitor (void *);
+
#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
index d7e60fb..457d829 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-reborp.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
@@ -21,19 +21,135 @@
struct rpcsvc_program *gf_changelog_reborp_programs[];
+void *
+gf_changelog_connection_janitor (void *arg)
+{
+ int32_t ret = 0;
+ xlator_t *this = NULL;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+ unsigned long drained = 0;
+
+ this = arg;
+ THIS = this;
+
+ priv = this->private;
+
+ while (1) {
+ pthread_mutex_lock (&priv->lock);
+ {
+ while (list_empty (&priv->cleanups))
+ pthread_cond_wait (&priv->cond, &priv->lock);
+
+ entry = list_first_entry (&priv->cleanups,
+ gf_changelog_t, list);
+ list_del_init (&entry->list);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ drained = 0;
+ ev = &entry->event;
+
+ gf_log (this->name, GF_LOG_INFO,
+ "Cleaning brick entry for brick %s", entry->brick);
+
+ /* 0x0: disbale rpc-clnt */
+ rpc_clnt_disable (RPC_PROBER (entry));
+
+ /* 0x1: cleanup callback invoker thread */
+ ret = gf_cleanup_event (this, ev);
+ if (ret)
+ continue;
+
+ /* 0x2: drain pending events */
+ while (!list_empty (&ev->events)) {
+ event = list_first_entry (&ev->events,
+ struct gf_event, list);
+ gf_log (this->name, GF_LOG_INFO,
+ "Draining event [Seq: %lu, Payload: %d]",
+ event->seq, event->count);
+
+ GF_FREE (event);
+ drained++;
+ }
+
+ gf_log (this->name, GF_LOG_INFO,
+ "Drained %lu events", drained);
+
+ /* 0x3: freeup brick entry */
+ gf_log (this->name, GF_LOG_INFO, "freeing entry %p", entry);
+ LOCK_DESTROY (&entry->statelock);
+ GF_FREE (entry);
+ }
+
+ return NULL;
+}
+
+static inline void
+__gf_changelog_set_conn_state (gf_changelog_t *entry,
+ gf_changelog_conn_state_t state)
+{
+ entry->connstate = state;
+}
+
/**
- * On a reverse connection, unlink the socket file.
+ * state check login to gaurd access object after free
*/
+static inline void
+gf_changelog_check_event (gf_private_t *priv,
+ gf_changelog_t *entry, rpcsvc_event_t event)
+{
+ gf_boolean_t needfree = _gf_false;
+ gf_changelog_conn_state_t laststate;
+ /**
+ * need to handle couple of connection states to gaurd correct
+ * freeing of object.
+ */
+ LOCK (&entry->statelock);
+ {
+ laststate = entry->connstate;
+ if (event == RPCSVC_EVENT_ACCEPT) {
+ __gf_changelog_set_conn_state
+ (entry, GF_CHANGELOG_CONN_STATE_ACCEPTED);
+
+ if (laststate == GF_CHANGELOG_CONN_STATE_DISCONNECTED)
+ needfree = _gf_true;
+ }
+
+ if (event == RPCSVC_EVENT_DISCONNECT) {
+ __gf_changelog_set_conn_state
+ (entry, GF_CHANGELOG_CONN_STATE_DISCONNECTED);
+
+ if (laststate == GF_CHANGELOG_CONN_STATE_ACCEPTED)
+ needfree = _gf_true;
+ }
+ }
+ UNLOCK (&entry->statelock);
+
+ /**
+ * TODO:
+ * Handle the race between ACCEPT and DISCONNECT in the
+ * reconnect code. So purging of entry is deliberately
+ * avoided here. It will be handled in the reconnect code.
+ */
+}
+
int
gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata,
rpcsvc_event_t event, void *data)
{
- int ret = 0;
+ int ret = 0;
xlator_t *this = NULL;
gf_private_t *priv = NULL;
gf_changelog_t *entry = NULL;
char sock[UNIX_PATH_MAX] = {0,};
+ if (!(event == RPCSVC_EVENT_ACCEPT ||
+ event == RPCSVC_EVENT_DISCONNECT))
+ return 0;
+
entry = mydata;
this = entry->this;
priv = this->private;
@@ -42,29 +158,23 @@ gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata,
case RPCSVC_EVENT_ACCEPT:
ret = unlink (RPC_SOCK(entry));
if (ret != 0)
- gf_log (this->name, GF_LOG_WARNING, "failed to unlink"
- " reverse socket file %s", RPC_SOCK (entry));
+ gf_log (this->name, GF_LOG_WARNING, "failed to unlink "
+ "reverse socket %s", RPC_SOCK (entry));
if (entry->connected)
GF_CHANGELOG_INVOKE_CBK (this, entry->connected,
entry->brick, entry->ptr);
break;
case RPCSVC_EVENT_DISCONNECT:
- LOCK (&priv->lock);
- {
- list_del (&entry->list);
- }
- UNLOCK (&priv->lock);
-
if (entry->disconnected)
GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected,
entry->brick, entry->ptr);
-
- GF_FREE (entry);
- break;
+ /* passthrough */
default:
break;
}
+ /* gf_changelog_check_event (priv, entry, event); */
+
return 0;
}
@@ -140,12 +250,29 @@ __can_process_event (struct gf_event_list *ev, struct gf_event **event)
return 0;
}
-inline void
-__process_event_list (struct gf_event_list *ev, struct gf_event **event)
+void
+pick_event_ordered (struct gf_event_list *ev, struct gf_event **event)
{
- while (list_empty (&ev->events)
- || !__can_process_event (ev, event))
- pthread_cond_wait (&ev->cond, &ev->lock);
+ pthread_mutex_lock (&ev->lock);
+ {
+ while (list_empty (&ev->events)
+ || !__can_process_event (ev, event))
+ pthread_cond_wait (&ev->cond, &ev->lock);
+ }
+ pthread_mutex_unlock (&ev->lock);
+}
+
+void
+pick_event_unordered (struct gf_event_list *ev, struct gf_event **event)
+{
+ pthread_mutex_lock (&ev->lock);
+ {
+ while (list_empty (&ev->events))
+ pthread_cond_wait (&ev->cond, &ev->lock);
+ *event = list_first_entry (&ev->events, struct gf_event, list);
+ list_del (&(*event)->list);
+ }
+ pthread_mutex_unlock (&ev->lock);
}
void *
@@ -160,14 +287,10 @@ gf_changelog_callback_invoker (void *arg)
ev = arg;
entry = ev->entry;
- this = entry->this;
+ THIS = this = entry->this;
while (1) {
- pthread_mutex_lock (&ev->lock);
- {
- __process_event_list (ev, &event);
- }
- pthread_mutex_unlock (&ev->lock);
+ entry->pickevent (ev, &event);
vec = (struct iovec *) &event->iov;
gf_changelog_invoke_callback (entry, &vec, event->count);
@@ -192,9 +315,36 @@ orderfn (struct list_head *pos1, struct list_head *pos2)
return -1;
}
+void
+queue_ordered_event (struct gf_event_list *ev, struct gf_event *event)
+{
+ /* add event to the ordered event list and wake up listner(s) */
+ pthread_mutex_lock (&ev->lock);
+ {
+ list_add_order (&event->list, &ev->events, orderfn);
+ if (!ev->next_seq)
+ ev->next_seq = event->seq;
+ if (ev->next_seq == event->seq)
+ pthread_cond_signal (&ev->cond);
+ }
+ pthread_mutex_unlock (&ev->lock);
+}
+
+void
+queue_unordered_event (struct gf_event_list *ev, struct gf_event *event)
+{
+ /* add event to the tail of the queue and wake up listener(s) */
+ pthread_mutex_lock (&ev->lock);
+ {
+ list_add_tail (&event->list, &ev->events);
+ pthread_cond_signal (&ev->cond);
+ }
+ pthread_mutex_unlock (&ev->lock);
+}
+
int
-gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
- xlator_t *this, gf_changelog_t *entry)
+gf_changelog_event_handler (rpcsvc_request_t *req,
+ xlator_t *this, gf_changelog_t *entry)
{
int i = 0;
size_t payloadlen = 0;
@@ -255,16 +405,8 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
rpc_req.seq, entry->brick, rpc_req.tv_sec,
rpc_req.tv_usec, payloadcnt, payloadlen);
- /* add it to the ordered event list and wake up listner(s) */
- pthread_mutex_lock (&ev->lock);
- {
- list_add_order (&event->list, &ev->events, orderfn);
- if (!ev->next_seq)
- ev->next_seq = event->seq;
- if (ev->next_seq == event->seq)
- pthread_cond_signal (&ev->cond);
- }
- pthread_mutex_unlock (&ev->lock);
+ /* dispatch event */
+ entry->queueevent (ev, event);
/* ack sequence number */
rpc_rsp.op_ret = 0;
@@ -281,76 +423,18 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
}
int
-gf_changelog_unordered_event_handler (rpcsvc_request_t *req,
- xlator_t *this, gf_changelog_t *entry)
-{
- int i = 0;
- int ret = 0;
- ssize_t len = 0;
- int payloadcnt = 0;
- struct iovec vector[MAX_IOVEC] = {{0,}, };
- changelog_event_req rpc_req = {0,};
- changelog_event_rsp rpc_rsp = {0,};
-
- len = xdr_to_generic (req->msg[0],
- &rpc_req, (xdrproc_t)xdr_changelog_event_req);
- if (len < 0) {
- gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
- req->rpc_err = GARBAGE_ARGS;
- goto handle_xdr_error;
- }
-
- /* prepare payload */
- if (len < req->msg[0].iov_len) {
- payloadcnt = 1;
- vector[0].iov_base = (req->msg[0].iov_base + len);
- vector[0].iov_len = (req->msg[0].iov_len - len);
- }
-
- for (i = 1; i < req->count; i++) {
- vector[payloadcnt++] = req->msg[i];
- }
-
- gf_log (this->name, GF_LOG_DEBUG,
- "seq: %lu (time: %lu.%lu), (vec: %d)",
- rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt);
-
- /* invoke callback */
- struct iovec *vec = (struct iovec *) &vector;
- gf_changelog_invoke_callback (entry, &vec, payloadcnt);
-
- /* ack sequence number */
- rpc_rsp.op_ret = 0;
- rpc_rsp.seq = rpc_req.seq;
-
- goto submit_rpc;
-
- handle_xdr_error:
- rpc_rsp.op_ret = -1;
- rpc_rsp.seq = 0; /* invalid */
- submit_rpc:
- return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
- (xdrproc_t)xdr_changelog_event_rsp);
-}
-
-int
gf_changelog_reborp_handle_event (rpcsvc_request_t *req)
{
- int ret = 0;
- xlator_t *this = NULL;
- rpcsvc_t *svc = NULL;
- gf_changelog_t *entry = NULL;
+ xlator_t *this = NULL;
+ rpcsvc_t *svc = NULL;
+ gf_changelog_t *entry = NULL;
svc = rpcsvc_request_service (req);
entry = svc->mydata;
this = THIS = entry->this;
- ret = GF_NEED_ORDERED_EVENTS (entry)
- ? gf_changelog_ordered_event_handler (req, this, entry)
- : gf_changelog_unordered_event_handler (req, this, entry);
-
- return ret;
+ return gf_changelog_event_handler (req, this, entry);
}
rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = {
diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.c b/xlators/features/changelog/lib/src/gf-changelog-rpc.c
index c2a4c04..270632b 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-rpc.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.c
@@ -19,17 +19,11 @@ int
gf_changelog_rpc_notify (struct rpc_clnt *rpc,
void *mydata, rpc_clnt_event_t event, void *data)
{
- xlator_t *this = NULL;
-
- this = mydata;
-
switch (event) {
case RPC_CLNT_CONNECT:
rpc_clnt_set_connected (&rpc->conn);
break;
case RPC_CLNT_DISCONNECT:
- rpc_clnt_unset_connected (&rpc->conn);
- break;
case RPC_CLNT_MSG:
case RPC_CLNT_DESTROY:
break;
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c
index e1cfdb0..103a7b0 100644
--- a/xlators/features/changelog/lib/src/gf-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-changelog.c
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
@@ -54,14 +54,20 @@ gf_private_t *gf_changelog_alloc_priv ()
if (!priv)
goto error_return;
INIT_LIST_HEAD (&priv->connections);
+ INIT_LIST_HEAD (&priv->cleanups);
- ret = LOCK_INIT (&priv->lock);
+ ret = pthread_mutex_init (&priv->lock, NULL);
if (ret != 0)
goto free_priv;
- priv->api = NULL;
+ ret = pthread_cond_init (&priv->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+ priv->api = NULL;
return priv;
+ cleanup_mutex:
+ (void) pthread_mutex_destroy (&priv->lock);
free_priv:
GF_FREE (priv);
error_return:
@@ -248,21 +254,23 @@ gf_changelog_setup_rpc (xlator_t *this,
return -1;
}
-static void
-gf_cleanup_event (gf_changelog_t *entry)
+int
+gf_cleanup_event (xlator_t *this, struct gf_event_list *ev)
{
- xlator_t *this = NULL;
- struct gf_event_list *ev = NULL;
-
- this = entry->this;
- ev = &entry->event;
-
- (void) gf_thread_cleanup (this, ev->invoker);
+ int ret = 0;
- (void) pthread_mutex_destroy (&ev->lock);
- (void) pthread_cond_destroy (&ev->cond);
+ ret = gf_thread_cleanup (this, ev->invoker);
+ if (ret) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "cannot cleanup callback invoker thread "
+ " [reason: %s]. Not freeing resources",
+ strerror (-ret));
+ return -1;
+ }
ev->entry = NULL;
+
+ return 0;
}
static int
@@ -284,11 +292,17 @@ gf_init_event (gf_changelog_t *entry)
ev->next_seq = 0; /* bootstrap sequencing */
- if (entry->ordered) {
- ret = pthread_create (&ev->invoker, NULL,
- gf_changelog_callback_invoker, ev);
- if (ret != 0)
- goto cleanup_cond;
+ ret = gf_thread_create (&ev->invoker, NULL,
+ gf_changelog_callback_invoker, ev);
+ if (ret != 0)
+ goto cleanup_cond;
+
+ if (GF_NEED_ORDERED_EVENTS (entry)) {
+ entry->pickevent = pick_event_ordered;
+ entry->queueevent = queue_ordered_event;
+ } else {
+ entry->pickevent = pick_event_unordered;
+ entry->queueevent = queue_unordered_event;
}
return 0;
@@ -303,7 +317,7 @@ gf_init_event (gf_changelog_t *entry)
/**
* TODO:
- * - cleanup invoker thread (if ordered mode)
+ * - cleanup invoker thread
* - cleanup event list
* - destroy rpc{-clnt, svc}
*/
@@ -339,6 +353,9 @@ gf_setup_brick_connection (xlator_t *this,
goto error_return;
INIT_LIST_HEAD (&entry->list);
+ LOCK_INIT (&entry->statelock);
+ entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING;
+
entry->notify = brick->filter;
(void) strncpy (entry->brick, brick->brick_path, PATH_MAX);
@@ -346,11 +363,9 @@ gf_setup_brick_connection (xlator_t *this,
entry->invokerxl = xl;
entry->ordered = ordered;
- if (ordered) {
- ret = gf_init_event (entry);
- if (ret)
- goto free_entry;
- }
+ ret = gf_init_event (entry);
+ if (ret)
+ goto free_entry;
entry->fini = brick->fini;
entry->callback = brick->callback;
@@ -362,11 +377,11 @@ gf_setup_brick_connection (xlator_t *this,
goto cleanup_event;
priv->api = entry->ptr; /* pointer to API, if required */
- LOCK (&priv->lock);
+ pthread_mutex_lock (&priv->lock);
{
list_add_tail (&entry->list, &priv->connections);
}
- UNLOCK (&priv->lock);
+ pthread_mutex_unlock (&priv->lock);
ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER);
if (ret)
@@ -374,9 +389,9 @@ gf_setup_brick_connection (xlator_t *this,
return 0;
cleanup_event:
- if (ordered)
- gf_cleanup_event (entry);
+ (void) gf_cleanup_event (this, &entry->event);
free_entry:
+ gf_log (this->name, GF_LOG_DEBUG, "freeing entry %p", entry);
list_del (&entry->list); /* FIXME: kludge for now */
GF_FREE (entry);
error_return:
@@ -436,8 +451,8 @@ gf_changelog_set_master (xlator_t *master, void *xl)
if (!xl) {
/* poller thread */
- ret = pthread_create (&priv->poller,
- NULL, changelog_rpc_poller, THIS);
+ ret = gf_thread_create (&priv->poller,
+ NULL, changelog_rpc_poller, THIS);
if (ret != 0) {
GF_FREE (priv);
gf_log (master->name, GF_LOG_ERROR,
@@ -458,6 +473,7 @@ int
gf_changelog_init (void *xl)
{
int ret = 0;
+ gf_private_t *priv = NULL;
if (master)
return 0;
@@ -474,6 +490,14 @@ gf_changelog_init (void *xl)
if (ret)
goto dealloc_name;
+ priv = master->private;
+ ret = gf_thread_create (&priv->connectionjanitor, NULL,
+ gf_changelog_connection_janitor, master);
+ if (ret != 0) {
+ /* TODO: cleanup priv, mutex (poller thread for !xl) */
+ goto dealloc_name;
+ }
+
return 0;
dealloc_name: