summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/lib/src/gf-changelog.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog.c')
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog.c86
1 files changed, 55 insertions, 31 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c
index e1cfdb038fa..103a7b01eb0 100644
--- a/xlators/features/changelog/lib/src/gf-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-changelog.c
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
@@ -54,14 +54,20 @@ gf_private_t *gf_changelog_alloc_priv ()
if (!priv)
goto error_return;
INIT_LIST_HEAD (&priv->connections);
+ INIT_LIST_HEAD (&priv->cleanups);
- ret = LOCK_INIT (&priv->lock);
+ ret = pthread_mutex_init (&priv->lock, NULL);
if (ret != 0)
goto free_priv;
- priv->api = NULL;
+ ret = pthread_cond_init (&priv->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+ priv->api = NULL;
return priv;
+ cleanup_mutex:
+ (void) pthread_mutex_destroy (&priv->lock);
free_priv:
GF_FREE (priv);
error_return:
@@ -248,21 +254,23 @@ gf_changelog_setup_rpc (xlator_t *this,
return -1;
}
-static void
-gf_cleanup_event (gf_changelog_t *entry)
+int
+gf_cleanup_event (xlator_t *this, struct gf_event_list *ev)
{
- xlator_t *this = NULL;
- struct gf_event_list *ev = NULL;
-
- this = entry->this;
- ev = &entry->event;
-
- (void) gf_thread_cleanup (this, ev->invoker);
+ int ret = 0;
- (void) pthread_mutex_destroy (&ev->lock);
- (void) pthread_cond_destroy (&ev->cond);
+ ret = gf_thread_cleanup (this, ev->invoker);
+ if (ret) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "cannot cleanup callback invoker thread "
+ " [reason: %s]. Not freeing resources",
+ strerror (-ret));
+ return -1;
+ }
ev->entry = NULL;
+
+ return 0;
}
static int
@@ -284,11 +292,17 @@ gf_init_event (gf_changelog_t *entry)
ev->next_seq = 0; /* bootstrap sequencing */
- if (entry->ordered) {
- ret = pthread_create (&ev->invoker, NULL,
- gf_changelog_callback_invoker, ev);
- if (ret != 0)
- goto cleanup_cond;
+ ret = gf_thread_create (&ev->invoker, NULL,
+ gf_changelog_callback_invoker, ev);
+ if (ret != 0)
+ goto cleanup_cond;
+
+ if (GF_NEED_ORDERED_EVENTS (entry)) {
+ entry->pickevent = pick_event_ordered;
+ entry->queueevent = queue_ordered_event;
+ } else {
+ entry->pickevent = pick_event_unordered;
+ entry->queueevent = queue_unordered_event;
}
return 0;
@@ -303,7 +317,7 @@ gf_init_event (gf_changelog_t *entry)
/**
* TODO:
- * - cleanup invoker thread (if ordered mode)
+ * - cleanup invoker thread
* - cleanup event list
* - destroy rpc{-clnt, svc}
*/
@@ -339,6 +353,9 @@ gf_setup_brick_connection (xlator_t *this,
goto error_return;
INIT_LIST_HEAD (&entry->list);
+ LOCK_INIT (&entry->statelock);
+ entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING;
+
entry->notify = brick->filter;
(void) strncpy (entry->brick, brick->brick_path, PATH_MAX);
@@ -346,11 +363,9 @@ gf_setup_brick_connection (xlator_t *this,
entry->invokerxl = xl;
entry->ordered = ordered;
- if (ordered) {
- ret = gf_init_event (entry);
- if (ret)
- goto free_entry;
- }
+ ret = gf_init_event (entry);
+ if (ret)
+ goto free_entry;
entry->fini = brick->fini;
entry->callback = brick->callback;
@@ -362,11 +377,11 @@ gf_setup_brick_connection (xlator_t *this,
goto cleanup_event;
priv->api = entry->ptr; /* pointer to API, if required */
- LOCK (&priv->lock);
+ pthread_mutex_lock (&priv->lock);
{
list_add_tail (&entry->list, &priv->connections);
}
- UNLOCK (&priv->lock);
+ pthread_mutex_unlock (&priv->lock);
ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER);
if (ret)
@@ -374,9 +389,9 @@ gf_setup_brick_connection (xlator_t *this,
return 0;
cleanup_event:
- if (ordered)
- gf_cleanup_event (entry);
+ (void) gf_cleanup_event (this, &entry->event);
free_entry:
+ gf_log (this->name, GF_LOG_DEBUG, "freeing entry %p", entry);
list_del (&entry->list); /* FIXME: kludge for now */
GF_FREE (entry);
error_return:
@@ -436,8 +451,8 @@ gf_changelog_set_master (xlator_t *master, void *xl)
if (!xl) {
/* poller thread */
- ret = pthread_create (&priv->poller,
- NULL, changelog_rpc_poller, THIS);
+ ret = gf_thread_create (&priv->poller,
+ NULL, changelog_rpc_poller, THIS);
if (ret != 0) {
GF_FREE (priv);
gf_log (master->name, GF_LOG_ERROR,
@@ -458,6 +473,7 @@ int
gf_changelog_init (void *xl)
{
int ret = 0;
+ gf_private_t *priv = NULL;
if (master)
return 0;
@@ -474,6 +490,14 @@ gf_changelog_init (void *xl)
if (ret)
goto dealloc_name;
+ priv = master->private;
+ ret = gf_thread_create (&priv->connectionjanitor, NULL,
+ gf_changelog_connection_janitor, master);
+ if (ret != 0) {
+ /* TODO: cleanup priv, mutex (poller thread for !xl) */
+ goto dealloc_name;
+ }
+
return 0;
dealloc_name: