summaryrefslogtreecommitdiffstats
path: root/xlators
diff options
context:
space:
mode:
authorRaghavendra G <rgowdapp@redhat.com>2014-08-11 10:14:18 +0530
committerVijay Bellur <vbellur@redhat.com>2014-08-26 04:56:29 -0700
commita1fe3d72e373bf0deaed152842d12d94bb9129dc (patch)
treea7c50b39c25885f5a378fbe5b07e585c4ee2be8c /xlators
parent27daeba0f7e02669919a7d89a04ea8fe39e06898 (diff)
cluster/dht: introduce locking api.
Change-Id: I41389ba91951d3e63e617aa32cd0bee848261c72 BUG: 1130888 Signed-off-by: Raghavendra G <rgowdapp@redhat.com> Reviewed-on: http://review.gluster.org/8521 Reviewed-by: Pranith Kumar Karampuri <pkarampu@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators')
-rw-r--r--xlators/cluster/dht/src/Makefile.am2
-rw-r--r--xlators/cluster/dht/src/dht-common.h56
-rw-r--r--xlators/cluster/dht/src/dht-helper.c568
-rw-r--r--xlators/cluster/dht/src/dht-helper.h19
-rw-r--r--xlators/cluster/dht/src/dht-shared.c14
5 files changed, 658 insertions, 1 deletions
diff --git a/xlators/cluster/dht/src/Makefile.am b/xlators/cluster/dht/src/Makefile.am
index 46a2bcd06b8..ab58affe2b4 100644
--- a/xlators/cluster/dht/src/Makefile.am
+++ b/xlators/cluster/dht/src/Makefile.am
@@ -20,7 +20,7 @@ nufa_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
switch_la_LDFLAGS = -module -avoid-version
switch_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
-noinst_HEADERS = dht-common.h dht-mem-types.h dht-messages.h \
+noinst_HEADERS = dht-common.h dht-mem-types.h dht-messages.h dht-helper.h \
$(top_builddir)/xlators/lib/src/libxlator.h
AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
index ab0605e9317..c310b6c543b 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -120,6 +120,15 @@ struct dht_skip_linkto_unlink {
uuid_t hashed_gfid;
};
+typedef struct {
+ xlator_t *xl;
+ loc_t loc; /* contains/points to inode to lock on. */
+ short type; /* read/write lock. */
+ char *domain; /* Only locks within a single domain
+ * contend with each other
+ */
+ gf_boolean_t locked;
+} dht_lock_t;
struct dht_local {
int call_cnt;
@@ -214,6 +223,15 @@ struct dht_local {
struct dht_skip_linkto_unlink skip_unlink;
+ struct {
+ fop_inodelk_cbk_t inodelk_cbk;
+ dht_lock_t **locks;
+ int lk_count;
+
+ /* whether locking failed on _any_ of the "locks" above */
+ int op_ret;
+ int op_errno;
+ } lock;
};
typedef struct dht_local dht_local_t;
@@ -335,6 +353,8 @@ struct dht_conf {
/* Support size-weighted rebalancing (heterogeneous bricks). */
gf_boolean_t do_weighting;
gf_boolean_t randomize_by_gfid;
+
+ struct mem_pool *lock_pool;
};
typedef struct dht_conf dht_conf_t;
@@ -834,4 +854,40 @@ dht_lookup_everywhere_done (call_frame_t *frame, xlator_t *this);
int
dht_fill_dict_to_avoid_unlink_of_migrating_file (dict_t *dict);
+
+
+/* Acquire non-blocking inodelk on a list of xlators.
+ *
+ * @lk_array: array of lock requests lock on.
+ *
+ * @lk_count: number of locks in @lk_array
+ *
+ * @inodelk_cbk: will be called after inodelk replies are received
+ *
+ * @retval: -1 if stack_winding inodelk fails. 0 otherwise.
+ * inodelk_cbk is called with appropriate error on errors.
+ * On failure to acquire lock on all members of list, successful
+ * locks are unlocked before invoking cbk.
+ */
+
+int
+dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
+ int lk_count, fop_inodelk_cbk_t inodelk_cbk);
+
+/* same as dht_nonblocking_inodelk, but issues sequential blocking locks on
+ * @lk_array directly. locks are issued on some order which remains same
+ * for a list of xlators (irrespective of order of xlators within list).
+ */
+int
+dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
+ int lk_count, fop_inodelk_cbk_t inodelk_cbk);
+
+int32_t
+dht_unlock_inodelk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count,
+ fop_inodelk_cbk_t inodelk_cbk);
+
+dht_lock_t *
+dht_lock_new (xlator_t *this, xlator_t *xl, loc_t *loc, short type,
+ const char *domain);
+
#endif/* _DHT_H */
diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c
index 7c24947787d..2655f716d9a 100644
--- a/xlators/cluster/dht/src/dht-helper.c
+++ b/xlators/cluster/dht/src/dht-helper.c
@@ -17,6 +17,7 @@
#include "glusterfs.h"
#include "xlator.h"
#include "dht-common.h"
+#include "dht-helper.h"
static inline int
dht_inode_ctx_set1 (xlator_t *this, inode_t *inode, xlator_t *subvol)
@@ -256,6 +257,164 @@ out:
return 0;
}
+char *
+dht_lock_asprintf (dht_lock_t *lock)
+{
+ char *lk_buf = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0, };
+
+ if (lock == NULL)
+ goto out;
+
+ uuid_utoa_r (lock->loc.gfid, gfid);
+
+ gf_asprintf (&lk_buf, "%s:%s", lock->xl->name, gfid);
+
+out:
+ return lk_buf;
+}
+
+void
+dht_log_lk_array (char *name, gf_loglevel_t log_level, dht_lock_t **lk_array,
+ int count)
+{
+ int i = 0;
+ char *lk_buf = NULL;
+
+ if ((lk_array == NULL) || (count == 0))
+ goto out;
+
+ for (i = 0; i < count; i++) {
+ lk_buf = dht_lock_asprintf (lk_array[i]);
+ gf_log (name, log_level, "%d. %s", i, lk_buf);
+ GF_FREE (lk_buf);
+ }
+
+out:
+ return;
+}
+
+void
+dht_lock_stack_destroy (call_frame_t *lock_frame)
+{
+ dht_local_t *local = NULL;
+
+ local = lock_frame->local;
+
+ local->lock.locks = NULL;
+ local->lock.lk_count = 0;
+
+ DHT_STACK_DESTROY (lock_frame);
+ return;
+}
+
+void
+dht_lock_free (dht_lock_t *lock)
+{
+ if (lock == NULL)
+ goto out;
+
+ loc_wipe (&lock->loc);
+ GF_FREE (lock->domain);
+ mem_put (lock);
+
+out:
+ return;
+}
+
+void
+dht_lock_array_free (dht_lock_t **lk_array, int count)
+{
+ int i = 0;
+ dht_lock_t *lock = NULL;
+
+ if (lk_array == NULL)
+ goto out;
+
+ for (i = 0; i < count; i++) {
+ lock = lk_array[i];
+ lk_array[i] = NULL;
+ dht_lock_free (lock);
+ }
+
+out:
+ return;
+}
+
+dht_lock_t *
+dht_lock_new (xlator_t *this, xlator_t *xl, loc_t *loc, short type,
+ const char *domain)
+{
+ dht_conf_t *conf = NULL;
+ dht_lock_t *lock = NULL;
+
+ conf = this->private;
+
+ lock = mem_get0 (conf->lock_pool);
+ if (lock == NULL)
+ goto out;
+
+ lock->xl = xl;
+ lock->type = type;
+ lock->domain = gf_strdup (domain);
+ if (lock->domain == NULL) {
+ dht_lock_free (lock);
+ lock = NULL;
+ goto out;
+ }
+
+ /* Fill only inode and gfid.
+ posix and protocol/server give preference to pargfid/basename over
+ gfid/inode for resolution if all the three parameters of loc_t are
+ present. I want to avoid the following hypothetical situation:
+
+ 1. rebalance did a lookup on a dentry and got a gfid.
+ 2. rebalance acquires lock on loc_t which was filled with gfid and
+ path (pargfid/bname) from step 1.
+ 3. somebody deleted and recreated the same file
+ 4. rename on the same path acquires lock on loc_t which now points
+ to a different inode (and hence gets the lock).
+ 5. rebalance continues to migrate file (note that not all fops done
+ by rebalance during migration are inode/gfid based Eg., unlink)
+ 6. rename continues.
+ */
+ lock->loc.inode = inode_ref (loc->inode);
+ loc_gfid (loc, lock->loc.gfid);
+
+out:
+ return lock;
+}
+
+int
+dht_local_lock_init (call_frame_t *frame, dht_lock_t **lk_array,
+ int lk_count, fop_inodelk_cbk_t inodelk_cbk)
+{
+ int ret = -1;
+ dht_local_t *local = NULL;
+
+ local = frame->local;
+
+ if (local == NULL) {
+ local = dht_local_init (frame, NULL, NULL, 0);
+ }
+
+ if (local == NULL) {
+ goto out;
+ }
+
+ local->lock.inodelk_cbk = inodelk_cbk;
+ local->lock.locks = lk_array;
+ local->lock.lk_count = lk_count;
+
+ ret = dht_lock_order_requests (local->lock.locks,
+ local->lock.lk_count);
+ if (ret < 0)
+ goto out;
+
+ ret = 0;
+out:
+ return ret;
+}
void
dht_local_wipe (xlator_t *this, dht_local_t *local)
@@ -303,6 +462,9 @@ dht_local_wipe (xlator_t *this, dht_local_t *local)
local->selfheal.layout = NULL;
}
+ dht_lock_array_free (local->lock.locks, local->lock.lk_count);
+ GF_FREE (local->lock.locks);
+
GF_FREE (local->newpath);
GF_FREE (local->key);
@@ -1220,3 +1382,409 @@ dht_subvol_status (dht_conf_t *conf, xlator_t *subvol)
}
return 0;
}
+
+void
+dht_inodelk_done (call_frame_t *lock_frame)
+{
+ fop_inodelk_cbk_t inodelk_cbk = NULL;
+ call_frame_t *main_frame = NULL;
+ dht_local_t *local = NULL;
+
+ local = lock_frame->local;
+ main_frame = local->main_frame;
+
+ local->lock.locks = NULL;
+ local->lock.lk_count = 0;
+
+ inodelk_cbk = local->lock.inodelk_cbk;
+ local->lock.inodelk_cbk = NULL;
+
+ inodelk_cbk (main_frame, NULL, main_frame->this, local->lock.op_ret,
+ local->lock.op_errno, NULL);
+
+ dht_lock_stack_destroy (lock_frame);
+ return;
+}
+
+int
+dht_inodelk_cleanup_cbk (call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret, int32_t op_errno,
+ dict_t *xdata)
+{
+ dht_inodelk_done (frame);
+ return 0;
+}
+
+inline int32_t
+dht_lock_count (dht_lock_t **lk_array, int lk_count)
+{
+ int i = 0, locked = 0;
+
+ if ((lk_array == NULL) || (lk_count == 0))
+ goto out;
+
+ for (i = 0; i < lk_count; i++) {
+ if (lk_array[i]->locked)
+ locked++;
+ }
+out:
+ return locked;
+}
+
+void
+dht_inodelk_cleanup (call_frame_t *lock_frame)
+{
+ dht_lock_t **lk_array = NULL;
+ int lk_count = 0, lk_acquired = 0;
+ dht_local_t *local = NULL;
+
+ local = lock_frame->local;
+
+ lk_array = local->lock.locks;
+ lk_count = local->lock.lk_count;
+
+ lk_acquired = dht_lock_count (lk_array, lk_count);
+ if (lk_acquired != 0) {
+ dht_unlock_inodelk (lock_frame, lk_array, lk_count,
+ dht_inodelk_cleanup_cbk);
+ } else {
+ dht_inodelk_done (lock_frame);
+ }
+
+ return;
+}
+
+int32_t
+dht_unlock_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ dht_local_t *local = NULL;
+ int lk_index = 0, call_cnt = 0;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ lk_index = (long) cookie;
+
+ local = frame->local;
+ if (op_ret < 0) {
+ uuid_utoa_r (local->lock.locks[lk_index]->loc.gfid,
+ gfid);
+
+ gf_log (this->name, GF_LOG_WARNING,
+ "unlocking failed on %s:%s (%s)",
+ local->lock.locks[lk_index]->xl->name,
+ gfid, strerror (op_errno));
+ } else {
+ local->lock.locks[lk_index]->locked = 0;
+ }
+
+ call_cnt = dht_frame_return (frame);
+ if (is_last_call (call_cnt)) {
+ dht_inodelk_done (frame);
+ }
+
+ return 0;
+}
+
+call_frame_t *
+dht_lock_frame (call_frame_t *parent_frame)
+{
+ call_frame_t *lock_frame = NULL;
+
+ lock_frame = copy_frame (parent_frame);
+ if (lock_frame == NULL)
+ goto out;
+
+ set_lk_owner_from_ptr (&lock_frame->root->lk_owner, parent_frame->root);
+
+out:
+ return lock_frame;
+}
+
+int32_t
+dht_unlock_inodelk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count,
+ fop_inodelk_cbk_t inodelk_cbk)
+{
+ dht_local_t *local = NULL;
+ struct gf_flock flock = {0,};
+ int ret = -1 , i = 0;
+ call_frame_t *lock_frame = NULL;
+ int call_cnt = 0;
+
+ GF_VALIDATE_OR_GOTO ("dht-locks", frame, done);
+ GF_VALIDATE_OR_GOTO (frame->this->name, lk_array, done);
+ GF_VALIDATE_OR_GOTO (frame->this->name, inodelk_cbk, done);
+
+ call_cnt = dht_lock_count (lk_array, lk_count);
+ if (call_cnt == 0) {
+ ret = 0;
+ goto done;
+ }
+
+ lock_frame = dht_lock_frame (frame);
+ if (lock_frame == NULL) {
+ gf_log (frame->this->name, GF_LOG_WARNING,
+ "cannot allocate a frame, not unlocking following "
+ "locks:");
+
+ dht_log_lk_array (frame->this->name, GF_LOG_WARNING, lk_array,
+ lk_count);
+ goto done;
+ }
+
+ ret = dht_local_lock_init (lock_frame, lk_array, lk_count, inodelk_cbk);
+ if (ret < 0) {
+ gf_log (frame->this->name, GF_LOG_WARNING,
+ "storing locks in local failed, not unlocking "
+ "following locks:");
+
+ dht_log_lk_array (frame->this->name, GF_LOG_WARNING, lk_array,
+ lk_count);
+
+ goto done;
+ }
+
+ local = lock_frame->local;
+ local->main_frame = frame;
+ local->call_cnt = call_cnt;
+
+ flock.l_type = F_UNLCK;
+
+ for (i = 0; i < local->lock.lk_count; i++) {
+ if (!local->lock.locks[i]->locked)
+ continue;
+
+ STACK_WIND_COOKIE (lock_frame, dht_unlock_inodelk_cbk,
+ (void *)(long)i,
+ local->lock.locks[i]->xl,
+ local->lock.locks[i]->xl->fops->inodelk,
+ local->lock.locks[i]->domain,
+ &local->lock.locks[i]->loc, F_SETLK,
+ &flock, NULL);
+ }
+
+ return 0;
+
+done:
+ if (lock_frame)
+ dht_lock_stack_destroy (lock_frame);
+
+ return ret;
+}
+
+int32_t
+dht_nonblocking_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ dht_local_t *local = NULL;
+ int lk_index = 0, call_cnt = 0;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+ lk_index = (long) cookie;
+
+ if (op_ret == -1) {
+ local->lock.op_ret = -1;
+ local->lock.op_errno = op_errno;
+
+ if (local && local->lock.locks[lk_index]) {
+ uuid_utoa_r (local->lock.locks[lk_index]->loc.inode->gfid,
+ gfid);
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "inodelk failed on gfid: %s "
+ "subvolume: %s (%s)", gfid,
+ local->lock.locks[lk_index]->xl->name,
+ strerror (op_errno));
+ }
+
+ goto out;
+ }
+
+ local->lock.locks[lk_index]->locked = _gf_true;
+
+out:
+ call_cnt = dht_frame_return (frame);
+ if (is_last_call (call_cnt)) {
+ if (local->lock.op_ret < 0) {
+ dht_inodelk_cleanup (frame);
+ return 0;
+ }
+
+ dht_inodelk_done (frame);
+ }
+
+ return 0;
+}
+
+int
+dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
+ int lk_count, fop_inodelk_cbk_t inodelk_cbk)
+{
+ struct gf_flock flock = {0,};
+ int i = 0, ret = 0;
+ dht_local_t *local = NULL;
+ call_frame_t *lock_frame = NULL;
+
+ GF_VALIDATE_OR_GOTO ("dht-locks", frame, out);
+ GF_VALIDATE_OR_GOTO (frame->this->name, lk_array, out);
+ GF_VALIDATE_OR_GOTO (frame->this->name, inodelk_cbk, out);
+
+ lock_frame = dht_lock_frame (frame);
+ if (lock_frame == NULL)
+ goto out;
+
+ ret = dht_local_lock_init (lock_frame, lk_array, lk_count, inodelk_cbk);
+ if (ret < 0) {
+ goto out;
+ }
+
+ local = lock_frame->local;
+ local->main_frame = frame;
+
+ local->call_cnt = lk_count;
+
+ for (i = 0; i < local->lock.lk_count; i++) {
+ flock.l_type = local->lock.locks[i]->type;
+
+ STACK_WIND_COOKIE (lock_frame, dht_nonblocking_inodelk_cbk,
+ (void *) (long) i,
+ local->lock.locks[i]->xl,
+ local->lock.locks[i]->xl->fops->inodelk,
+ local->lock.locks[i]->domain,
+ &local->lock.locks[i]->loc, F_SETLK,
+ &flock, NULL);
+ }
+
+ return 0;
+
+out:
+ if (lock_frame)
+ dht_lock_stack_destroy (lock_frame);
+
+ return -1;
+}
+
+int32_t
+dht_blocking_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ int lk_index = 0;
+ dht_local_t *local = NULL;
+
+ lk_index = (long) cookie;
+
+ local = frame->local;
+
+ if (op_ret == 0) {
+ local->lock.locks[lk_index]->locked = _gf_true;
+ } else {
+ local->lock.op_ret = -1;
+ local->lock.op_errno = op_errno;
+ goto cleanup;
+ }
+
+ if (lk_index == (local->lock.lk_count - 1)) {
+ dht_inodelk_done (frame);
+ } else {
+ dht_blocking_inodelk_rec (frame, ++lk_index);
+ }
+
+ return 0;
+
+cleanup:
+ dht_inodelk_cleanup (frame);
+
+ return 0;
+}
+
+void
+dht_blocking_inodelk_rec (call_frame_t *frame, int i)
+{
+ dht_local_t *local = NULL;
+ struct gf_flock flock = {0,};
+
+ local = frame->local;
+
+ flock.l_type = local->lock.locks[i]->type;
+
+ STACK_WIND_COOKIE (frame, dht_blocking_inodelk_cbk,
+ (void *) (long) i,
+ local->lock.locks[i]->xl,
+ local->lock.locks[i]->xl->fops->inodelk,
+ local->lock.locks[i]->domain,
+ &local->lock.locks[i]->loc, F_SETLKW, &flock, NULL);
+
+ return;
+}
+
+int
+dht_lock_request_cmp (const void *val1, const void *val2)
+{
+ dht_lock_t *lock1 = NULL;
+ dht_lock_t *lock2 = NULL;
+ int ret = 0;
+
+ lock1 = (dht_lock_t *)val1;
+ lock2 = (dht_lock_t *)val2;
+
+ GF_VALIDATE_OR_GOTO ("dht-locks", lock1, out);
+ GF_VALIDATE_OR_GOTO ("dht-locks", lock2, out);
+
+ ret = strcmp (lock1->xl->name, lock2->xl->name);
+
+ if (ret == 0) {
+ ret = uuid_compare (lock1->loc.gfid, lock2->loc.gfid);
+ }
+
+out:
+ return ret;
+}
+
+int
+dht_lock_order_requests (dht_lock_t **locks, int count)
+{
+ int ret = -1;
+
+ if (!locks || !count)
+ goto out;
+
+ qsort (locks, count, sizeof (*locks), dht_lock_request_cmp);
+ ret = 0;
+
+out:
+ return ret;
+}
+
+int
+dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
+ int lk_count, fop_inodelk_cbk_t inodelk_cbk)
+{
+ int ret = -1;
+ call_frame_t *lock_frame = NULL;
+ dht_local_t *local = NULL;
+
+ GF_VALIDATE_OR_GOTO ("dht-locks", frame, out);
+ GF_VALIDATE_OR_GOTO (frame->this->name, lk_array, out);
+ GF_VALIDATE_OR_GOTO (frame->this->name, inodelk_cbk, out);
+
+ lock_frame = dht_lock_frame (frame);
+ if (lock_frame == NULL)
+ goto out;
+
+ ret = dht_local_lock_init (lock_frame, lk_array, lk_count, inodelk_cbk);
+ if (ret < 0) {
+ goto out;
+ }
+
+ local = lock_frame->local;
+ local->main_frame = frame;
+
+ dht_blocking_inodelk_rec (lock_frame, 0);
+
+ return 0;
+out:
+ if (lock_frame)
+ dht_lock_stack_destroy (lock_frame);
+
+ return -1;
+}
diff --git a/xlators/cluster/dht/src/dht-helper.h b/xlators/cluster/dht/src/dht-helper.h
new file mode 100644
index 00000000000..e3ab9c4d93b
--- /dev/null
+++ b/xlators/cluster/dht/src/dht-helper.h
@@ -0,0 +1,19 @@
+/*
+ Copyright (c) 2008-2014 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+#ifndef _DHT_HELPER_H
+#define _DHT_HELPER_H
+
+int
+dht_lock_order_requests (dht_lock_t **lk_array, int count);
+
+void
+dht_blocking_inodelk_rec (call_frame_t *frame, int i);
+
+#endif
diff --git a/xlators/cluster/dht/src/dht-shared.c b/xlators/cluster/dht/src/dht-shared.c
index f8faecf6870..22a7260f829 100644
--- a/xlators/cluster/dht/src/dht-shared.c
+++ b/xlators/cluster/dht/src/dht-shared.c
@@ -218,6 +218,9 @@ dht_fini (xlator_t *this)
GF_FREE (conf->subvolume_status);
+ if (conf->lock_pool)
+ mem_pool_destroy (conf->lock_pool);
+
GF_FREE (conf);
}
out:
@@ -663,6 +666,14 @@ dht_init (xlator_t *this)
GF_OPTION_INIT ("weighted-rebalance", conf->do_weighting, bool, err);
+ conf->lock_pool = mem_pool_new (dht_lock_t, 512);
+ if (!conf->lock_pool) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_INIT_FAILED,
+ "failed to create lock mem_pool, failing "
+ "initialization");
+ goto err;
+ }
+
this->private = conf;
return 0;
@@ -688,6 +699,9 @@ err:
GF_FREE (conf->link_xattr_name);
GF_FREE (conf->wild_xattr_name);
+ if (conf->lock_pool)
+ mem_pool_destroy (conf->lock_pool);
+
GF_FREE (conf);
}