diff options
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog.c')
-rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog.c | 86 |
1 files changed, 55 insertions, 31 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c index e1cfdb038fa..103a7b01eb0 100644 --- a/xlators/features/changelog/lib/src/gf-changelog.c +++ b/xlators/features/changelog/lib/src/gf-changelog.c @@ -1,5 +1,5 @@ /* - Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser @@ -54,14 +54,20 @@ gf_private_t *gf_changelog_alloc_priv () if (!priv) goto error_return; INIT_LIST_HEAD (&priv->connections); + INIT_LIST_HEAD (&priv->cleanups); - ret = LOCK_INIT (&priv->lock); + ret = pthread_mutex_init (&priv->lock, NULL); if (ret != 0) goto free_priv; - priv->api = NULL; + ret = pthread_cond_init (&priv->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + priv->api = NULL; return priv; + cleanup_mutex: + (void) pthread_mutex_destroy (&priv->lock); free_priv: GF_FREE (priv); error_return: @@ -248,21 +254,23 @@ gf_changelog_setup_rpc (xlator_t *this, return -1; } -static void -gf_cleanup_event (gf_changelog_t *entry) +int +gf_cleanup_event (xlator_t *this, struct gf_event_list *ev) { - xlator_t *this = NULL; - struct gf_event_list *ev = NULL; - - this = entry->this; - ev = &entry->event; - - (void) gf_thread_cleanup (this, ev->invoker); + int ret = 0; - (void) pthread_mutex_destroy (&ev->lock); - (void) pthread_cond_destroy (&ev->cond); + ret = gf_thread_cleanup (this, ev->invoker); + if (ret) { + gf_log (this->name, GF_LOG_WARNING, + "cannot cleanup callback invoker thread " + " [reason: %s]. Not freeing resources", + strerror (-ret)); + return -1; + } ev->entry = NULL; + + return 0; } static int @@ -284,11 +292,17 @@ gf_init_event (gf_changelog_t *entry) ev->next_seq = 0; /* bootstrap sequencing */ - if (entry->ordered) { - ret = pthread_create (&ev->invoker, NULL, - gf_changelog_callback_invoker, ev); - if (ret != 0) - goto cleanup_cond; + ret = gf_thread_create (&ev->invoker, NULL, + gf_changelog_callback_invoker, ev); + if (ret != 0) + goto cleanup_cond; + + if (GF_NEED_ORDERED_EVENTS (entry)) { + entry->pickevent = pick_event_ordered; + entry->queueevent = queue_ordered_event; + } else { + entry->pickevent = pick_event_unordered; + entry->queueevent = queue_unordered_event; } return 0; @@ -303,7 +317,7 @@ gf_init_event (gf_changelog_t *entry) /** * TODO: - * - cleanup invoker thread (if ordered mode) + * - cleanup invoker thread * - cleanup event list * - destroy rpc{-clnt, svc} */ @@ -339,6 +353,9 @@ gf_setup_brick_connection (xlator_t *this, goto error_return; INIT_LIST_HEAD (&entry->list); + LOCK_INIT (&entry->statelock); + entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING; + entry->notify = brick->filter; (void) strncpy (entry->brick, brick->brick_path, PATH_MAX); @@ -346,11 +363,9 @@ gf_setup_brick_connection (xlator_t *this, entry->invokerxl = xl; entry->ordered = ordered; - if (ordered) { - ret = gf_init_event (entry); - if (ret) - goto free_entry; - } + ret = gf_init_event (entry); + if (ret) + goto free_entry; entry->fini = brick->fini; entry->callback = brick->callback; @@ -362,11 +377,11 @@ gf_setup_brick_connection (xlator_t *this, goto cleanup_event; priv->api = entry->ptr; /* pointer to API, if required */ - LOCK (&priv->lock); + pthread_mutex_lock (&priv->lock); { list_add_tail (&entry->list, &priv->connections); } - UNLOCK (&priv->lock); + pthread_mutex_unlock (&priv->lock); ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER); if (ret) @@ -374,9 +389,9 @@ gf_setup_brick_connection (xlator_t *this, return 0; cleanup_event: - if (ordered) - gf_cleanup_event (entry); + (void) gf_cleanup_event (this, &entry->event); free_entry: + gf_log (this->name, GF_LOG_DEBUG, "freeing entry %p", entry); list_del (&entry->list); /* FIXME: kludge for now */ GF_FREE (entry); error_return: @@ -436,8 +451,8 @@ gf_changelog_set_master (xlator_t *master, void *xl) if (!xl) { /* poller thread */ - ret = pthread_create (&priv->poller, - NULL, changelog_rpc_poller, THIS); + ret = gf_thread_create (&priv->poller, + NULL, changelog_rpc_poller, THIS); if (ret != 0) { GF_FREE (priv); gf_log (master->name, GF_LOG_ERROR, @@ -458,6 +473,7 @@ int gf_changelog_init (void *xl) { int ret = 0; + gf_private_t *priv = NULL; if (master) return 0; @@ -474,6 +490,14 @@ gf_changelog_init (void *xl) if (ret) goto dealloc_name; + priv = master->private; + ret = gf_thread_create (&priv->connectionjanitor, NULL, + gf_changelog_connection_janitor, master); + if (ret != 0) { + /* TODO: cleanup priv, mutex (poller thread for !xl) */ + goto dealloc_name; + } + return 0; dealloc_name: |