/* Copyright (c) 2015 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ #include #include #include #include #include #include #include #include #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #include #include #include #include #include #include #include "gf-changelog-rpc.h" #include "gf-changelog-helpers.h" /* from the changelog translator */ #include "changelog-misc.h" #include "changelog-mem-types.h" #include "changelog-lib-messages.h" /** * Global singleton xlator pointer for the library, initialized * during library load. This should probably be hidden inside * an initialized object which is an handle for the consumer. * * TODO: do away with the global.. */ xlator_t *master = NULL; static inline gf_private_t * gf_changelog_alloc_priv() { int ret = 0; gf_private_t *priv = NULL; priv = GF_CALLOC(1, sizeof(*priv), gf_changelog_mt_priv_t); if (!priv) goto error_return; INIT_LIST_HEAD(&priv->connections); INIT_LIST_HEAD(&priv->cleanups); ret = pthread_mutex_init(&priv->lock, NULL); if (ret != 0) goto free_priv; ret = pthread_cond_init(&priv->cond, NULL); if (ret != 0) goto cleanup_mutex; priv->api = NULL; return priv; cleanup_mutex: (void)pthread_mutex_destroy(&priv->lock); free_priv: GF_FREE(priv); error_return: return NULL; } #define GF_CHANGELOG_EVENT_POOL_SIZE 16384 #define GF_CHANGELOG_EVENT_THREAD_COUNT 4 static int gf_changelog_ctx_defaults_init(glusterfs_ctx_t *ctx) { cmd_args_t *cmd_args = NULL; struct rlimit lim = { 0, }; call_pool_t *pool = NULL; int ret = -1; ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); if (ret != 0) return -1; ctx->process_uuid = generate_glusterfs_ctx_id(); if (!ctx->process_uuid) return -1; ctx->page_size = 128 * GF_UNIT_KB; ctx->iobuf_pool = iobuf_pool_new(); if (!ctx->iobuf_pool) goto free_pool; ctx->event_pool = gf_event_pool_new(GF_CHANGELOG_EVENT_POOL_SIZE, GF_CHANGELOG_EVENT_THREAD_COUNT); if (!ctx->event_pool) goto free_pool; pool = GF_CALLOC(1, sizeof(call_pool_t), gf_changelog_mt_libgfchangelog_call_pool_t); if (!pool) goto free_pool; /* frame_mem_pool size 112 * 64 */ pool->frame_mem_pool = mem_pool_new(call_frame_t, 32); if (!pool->frame_mem_pool) goto free_pool; /* stack_mem_pool size 256 * 128 */ pool->stack_mem_pool = mem_pool_new(call_stack_t, 16); if (!pool->stack_mem_pool) goto free_pool; ctx->stub_mem_pool = mem_pool_new(call_stub_t, 16); if (!ctx->stub_mem_pool) goto free_pool; ctx->dict_pool = mem_pool_new(dict_t, 32); if (!ctx->dict_pool) goto free_pool; ctx->dict_pair_pool = mem_pool_new(data_pair_t, 512); if (!ctx->dict_pair_pool) goto free_pool; ctx->dict_data_pool = mem_pool_new(data_t, 512); if (!ctx->dict_data_pool) goto free_pool; ctx->logbuf_pool = mem_pool_new(log_buf_t, 256); if (!ctx->logbuf_pool) goto free_pool; INIT_LIST_HEAD(&pool->all_frames); LOCK_INIT(&pool->lock); ctx->pool = pool; LOCK_INIT(&ctx->lock); cmd_args = &ctx->cmd_args; INIT_LIST_HEAD(&cmd_args->xlator_options); lim.rlim_cur = RLIM_INFINITY; lim.rlim_max = RLIM_INFINITY; setrlimit(RLIMIT_CORE, &lim); return 0; free_pool: if (pool) { GF_FREE(pool->frame_mem_pool); GF_FREE(pool->stack_mem_pool); GF_FREE(pool); } GF_FREE(ctx->stub_mem_pool); GF_FREE(ctx->dict_pool); GF_FREE(ctx->dict_pair_pool); GF_FREE(ctx->dict_data_pool); GF_FREE(ctx->logbuf_pool); GF_FREE(ctx->iobuf_pool); GF_FREE(ctx->event_pool); return -1; } /* TODO: cleanup ctx defaults */ void gf_changelog_cleanup_this(xlator_t *this) { glusterfs_ctx_t *ctx = NULL; if (!this) return; ctx = this->ctx; syncenv_destroy(ctx->env); free(ctx); this->private = NULL; this->ctx = NULL; mem_pools_fini(); } static int gf_changelog_init_context() { glusterfs_ctx_t *ctx = NULL; ctx = glusterfs_ctx_new(); if (!ctx) goto error_return; if (glusterfs_globals_init(ctx)) goto free_ctx; THIS->ctx = ctx; if (gf_changelog_ctx_defaults_init(ctx)) goto free_ctx; ctx->env = syncenv_new(0, 0, 0); if (!ctx->env) goto free_ctx; return 0; free_ctx: free(ctx); THIS->ctx = NULL; error_return: return -1; } static int gf_changelog_init_master() { int ret = 0; ret = gf_changelog_init_context(); mem_pools_init(); return ret; } /* TODO: cleanup clnt/svc on failure */ int gf_changelog_setup_rpc(xlator_t *this, gf_changelog_t *entry, int proc) { int ret = 0; rpcsvc_t *svc = NULL; struct rpc_clnt *rpc = NULL; /** * Initialize a connect back socket. A probe() RPC call to the server * triggers a reverse connect. */ svc = gf_changelog_reborp_init_rpc_listner(this, entry->brick, RPC_SOCK(entry), entry); if (!svc) goto error_return; RPC_REBORP(entry) = svc; /* Initialize an RPC client */ rpc = gf_changelog_rpc_init(this, entry); if (!rpc) goto error_return; RPC_PROBER(entry) = rpc; /** * @FIXME * till we have connection state machine, let's delay the RPC call * for now.. */ sleep(2); /** * Probe changelog translator for reverse connection. After a successful * call, there's less use of the client and can be disconnected, but * let's leave the connection active for any future RPC calls. */ ret = gf_changelog_invoke_rpc(this, entry, proc); if (ret) { gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, "Could not initiate probe RPC, bailing out!!!"); goto error_return; } return 0; error_return: return -1; } int gf_cleanup_event(xlator_t *this, struct gf_event_list *ev) { int ret = 0; ret = gf_thread_cleanup(this, ev->invoker); if (ret) { gf_msg(this->name, GF_LOG_WARNING, -ret, CHANGELOG_LIB_MSG_CLEANUP_ERROR, "cannot cleanup callback invoker thread." " Not freeing resources"); return -1; } ev->entry = NULL; return 0; } static int gf_init_event(gf_changelog_t *entry) { int ret = 0; struct gf_event_list *ev = NULL; ev = &entry->event; ev->entry = entry; ret = pthread_mutex_init(&ev->lock, NULL); if (ret != 0) goto error_return; ret = pthread_cond_init(&ev->cond, NULL); if (ret != 0) goto cleanup_mutex; INIT_LIST_HEAD(&ev->events); ev->next_seq = 0; /* bootstrap sequencing */ if (GF_NEED_ORDERED_EVENTS(entry)) { entry->pickevent = pick_event_ordered; entry->queueevent = queue_ordered_event; } else { entry->pickevent = pick_event_unordered; entry->queueevent = queue_unordered_event; } ret = gf_thread_create(&ev->invoker, NULL, gf_changelog_callback_invoker, ev, "clogcbki"); if (ret != 0) { entry->pickevent = NULL; entry->queueevent = NULL; goto cleanup_cond; } return 0; cleanup_cond: (void)pthread_cond_destroy(&ev->cond); cleanup_mutex: (void)pthread_mutex_destroy(&ev->lock); error_return: return -1; } /** * TODO: * - cleanup invoker thread * - cleanup event list * - destroy rpc{-clnt, svc} */ int gf_cleanup_brick_connection(xlator_t *this, gf_changelog_t *entry) { return 0; } int gf_cleanup_connections(xlator_t *this) { return 0; } static int gf_setup_brick_connection(xlator_t *this, struct gf_brick_spec *brick, gf_boolean_t ordered, void *xl) { int ret = 0; gf_private_t *priv = NULL; gf_changelog_t *entry = NULL; priv = this->private; if (!brick->callback || !brick->init || !brick->fini) goto error_return; entry = GF_CALLOC(1, sizeof(*entry), gf_changelog_mt_libgfchangelog_t); if (!entry) goto error_return; INIT_LIST_HEAD(&entry->list); LOCK_INIT(&entry->statelock); entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING; entry->notify = brick->filter; if (snprintf(entry->brick, PATH_MAX, "%s", brick->brick_path) >= PATH_MAX) goto free_entry; entry->this = this; entry->invokerxl = xl; entry->ordered = ordered; ret = gf_init_event(entry); if (ret) goto free_entry; entry->fini = brick->fini; entry->callback = brick->callback; entry->connected = brick->connected; entry->disconnected = brick->disconnected; entry->ptr = brick->init(this, brick); if (!entry->ptr) goto cleanup_event; priv->api = entry->ptr; /* pointer to API, if required */ pthread_mutex_lock(&priv->lock); { list_add_tail(&entry->list, &priv->connections); } pthread_mutex_unlock(&priv->lock); ret = gf_changelog_setup_rpc(this, entry, CHANGELOG_RPC_PROBE_FILTER); if (ret) goto cleanup_event; return 0; cleanup_event: (void)gf_cleanup_event(this, &entry->event); free_entry: gf_msg_debug(this->name, 0, "freeing entry %p", entry); list_del(&entry->list); /* FIXME: kludge for now */ GF_FREE(entry); error_return: return -1; } int gf_changelog_register_brick(xlator_t *this, struct gf_brick_spec *brick, gf_boolean_t ordered, void *xl) { return gf_setup_brick_connection(this, brick, ordered, xl); } static int gf_changelog_setup_logging(xlator_t *this, char *logfile, int loglevel) { /* passing ident as NULL means to use default ident for syslog */ if (gf_log_init(this->ctx, logfile, NULL)) return -1; gf_log_set_loglevel(this->ctx, (loglevel == -1) ? GF_LOG_INFO : loglevel); return 0; } static int gf_changelog_set_master(xlator_t *master, void *xl) { int32_t ret = 0; xlator_t *this = NULL; xlator_t *old_this = NULL; gf_private_t *priv = NULL; this = xl; if (!this || !this->ctx) { ret = gf_changelog_init_master(); if (ret) return -1; this = THIS; } master->ctx = this->ctx; INIT_LIST_HEAD(&master->volume_options); SAVE_THIS(THIS); ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); if (ret != 0) goto restore_this; priv = gf_changelog_alloc_priv(); if (!priv) { ret = -1; goto restore_this; } if (!xl) { /* poller thread */ ret = gf_thread_create(&priv->poller, NULL, changelog_rpc_poller, THIS, "clogpoll"); if (ret != 0) { GF_FREE(priv); gf_msg(master->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, "failed to spawn poller thread"); goto restore_this; } } master->private = priv; restore_this: RESTORE_THIS(); return ret; } int gf_changelog_init(void *xl) { int ret = 0; gf_private_t *priv = NULL; if (master) return 0; master = calloc(1, sizeof(*master)); if (!master) goto error_return; master->name = strdup("gfchangelog"); if (!master->name) goto dealloc_master; ret = gf_changelog_set_master(master, xl); if (ret) goto dealloc_name; priv = master->private; ret = gf_thread_create(&priv->connectionjanitor, NULL, gf_changelog_connection_janitor, master, "clogjan"); if (ret != 0) { /* TODO: cleanup priv, mutex (poller thread for !xl) */ goto dealloc_name; } return 0; dealloc_name: free(master->name); dealloc_master: free(master); master = NULL; error_return: return -1; } int gf_changelog_register_generic(struct gf_brick_spec *bricks, int count, int ordered, char *logfile, int lvl, void *xl) { int ret = 0; xlator_t *this = NULL; xlator_t *old_this = NULL; struct gf_brick_spec *brick = NULL; gf_boolean_t need_order = _gf_false; SAVE_THIS(xl); this = THIS; if (!this) goto error_return; ret = gf_changelog_setup_logging(this, logfile, lvl); if (ret) goto error_return; need_order = (ordered) ? _gf_true : _gf_false; brick = bricks; while (count--) { gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, "Registering brick", "brick=%s", brick->brick_path, "notify_filter=%d", brick->filter, NULL); ret = gf_changelog_register_brick(this, brick, need_order, xl); if (ret != 0) { gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED, "Error registering with changelog xlator"); break; } brick++; } if (ret != 0) goto cleanup_inited_bricks; RESTORE_THIS(); return 0; cleanup_inited_bricks: gf_cleanup_connections(this); error_return: RESTORE_THIS(); return -1; } /** * @API * gf_changelog_register() * * This is _NOT_ a generic register API. It's a special API to handle * updates at a journal granulality. This is used by consumers wanting * to process persistent journal such as geo-replication via a set of * APIs. All of this is required to maintain backward compatibility. * Owner specific private data is stored in ->api (in gf_private_t), * which is used by APIs to access it's private data. This limits * the library access to a single brick, but that's how it used to * be anyway. Furthermore, this API solely _owns_ "this", therefore * callers already having a notion of "this" are expected to use the * newer API. * * Newer applications wanting to use this library need not face this * limitation and reply of the much more feature rich generic register * API, which is purely callback based. * * NOTE: @max_reconnects is not used but required for backward compat. * * For generic API, refer gf_changelog_register_generic(). */ int gf_changelog_register(char *brick_path, char *scratch_dir, char *log_file, int log_level, int max_reconnects) { struct gf_brick_spec brick = { 0, }; if (master) THIS = master; else return -1; brick.brick_path = brick_path; brick.filter = CHANGELOG_OP_TYPE_JOURNAL; brick.init = gf_changelog_journal_init; brick.fini = gf_changelog_journal_fini; brick.callback = gf_changelog_handle_journal; brick.connected = gf_changelog_journal_connect; brick.disconnected = gf_changelog_journal_disconnect; brick.ptr = scratch_dir; return gf_changelog_register_generic(&brick, 1, 1, log_file, log_level, NULL); }