summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKrishnan Parthasarathi <kparthas@redhat.com>2013-02-20 14:44:23 +0530
committerAnand Avati <avati@redhat.com>2013-03-06 22:06:30 -0800
commit8224bc6111b3bf5a710b6e5315b39b85904f3fe1 (patch)
tree02f2ca47808d8249ed3cdbf76b1cd46e5e2e2d4a
parente020c861579790cc0e1c183d9d5d518f43951712 (diff)
glusterd: Increasing throughput of synctask based mgmt ops.
Change-Id: Ibd963f78707b157fc4c9729aa87206cfd5ecfe81 BUG: 913662 Signed-off-by: Krishnan Parthasarathi <kparthas@redhat.com> Reviewed-on: http://review.gluster.org/4638 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
-rw-r--r--libglusterfs/src/syncop.h1
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-mem-types.h3
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-syncop.c963
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-syncop.h22
4 files changed, 564 insertions, 425 deletions
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h
index d4086291a47..764f2960545 100644
--- a/libglusterfs/src/syncop.h
+++ b/libglusterfs/src/syncop.h
@@ -114,6 +114,7 @@ struct syncargs {
uuid_t uuid;
char *errstr;
dict_t *dict;
+ pthread_mutex_t lock_dict;
/* do not touch */
struct synctask *task;
diff --git a/xlators/mgmt/glusterd/src/glusterd-mem-types.h b/xlators/mgmt/glusterd/src/glusterd-mem-types.h
index a13236da1fe..98216e28ab0 100644
--- a/xlators/mgmt/glusterd/src/glusterd-mem-types.h
+++ b/xlators/mgmt/glusterd/src/glusterd-mem-types.h
@@ -65,7 +65,8 @@ typedef enum gf_gld_mem_types_ {
gf_gld_mt_charptr = gf_common_mt_end + 49,
gf_gld_mt_hooks_stub_t = gf_common_mt_end + 50,
gf_gld_mt_hooks_priv_t = gf_common_mt_end + 51,
- gf_gld_mt_end = gf_common_mt_end + 52,
+ gf_gld_mt_mop_commit_req_t = gf_common_mt_end + 52,
+ gf_gld_mt_end = gf_common_mt_end + 53,
} gf_gld_mem_types_t;
#endif
diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.c b/xlators/mgmt/glusterd/src/glusterd-syncop.c
index 370f454df5c..1deb15d5e2f 100644
--- a/xlators/mgmt/glusterd/src/glusterd-syncop.c
+++ b/xlators/mgmt/glusterd/src/glusterd-syncop.c
@@ -8,7 +8,6 @@
cases as published by the Free Software Foundation.
*/
/* rpc related syncops */
-#include "syncop.h"
#include "rpc-clnt.h"
#include "protocol-common.h"
#include "xdr-generic.h"
@@ -19,6 +18,57 @@
#include "glusterd-op-sm.h"
#include "glusterd-utils.h"
+static void
+gd_collate_errors (struct syncargs *args, int op_ret, int op_errno,
+ char *op_errstr)
+{
+ if (args->op_ret)
+ return;
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+ if (op_ret && op_errstr && strcmp (op_errstr, ""))
+ args->errstr = gf_strdup (op_errstr);
+}
+
+static void
+gd_syncargs_init (struct syncargs *args, dict_t *op_ctx)
+{
+ args->dict = op_ctx;
+ pthread_mutex_init (&args->lock_dict, NULL);
+}
+
+static void
+gd_stage_op_req_free (gd1_mgmt_stage_op_req *req)
+{
+ if (!req)
+ return;
+
+ GF_FREE (req->buf.buf_val);
+ GF_FREE (req);
+}
+
+static void
+gd_commit_op_req_free (gd1_mgmt_commit_op_req *req)
+{
+ if (!req)
+ return;
+
+ GF_FREE (req->buf.buf_val);
+ GF_FREE (req);
+}
+
+static void
+gd_brick_op_req_free (gd1_mgmt_brick_op_req *req)
+{
+ if (!req)
+ return;
+
+ if (strcmp (req->name, "") != 0)
+ GF_FREE (req->name);
+ GF_FREE (req->input.input_val);
+ GF_FREE (req);
+}
+
int
gd_syncop_submit_request (struct rpc_clnt *rpc, void *req,
void *cookie, rpc_clnt_prog_t *prog,
@@ -82,23 +132,83 @@ out:
extern struct rpc_clnt_program gd_mgmt_prog;
extern struct rpc_clnt_program gd_brick_prog;
+static int
+glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp)
+{
+ int ret = 0;
+
+ switch (op) {
+ case GD_OP_REPLACE_BRICK:
+ ret = glusterd_rb_use_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
+ break;
+
+ case GD_OP_SYNC_VOLUME:
+ ret = glusterd_sync_use_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
+ break;
+
+ case GD_OP_PROFILE_VOLUME:
+ ret = glusterd_profile_volume_use_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
+ break;
+
+ case GD_OP_GSYNC_SET:
+ ret = glusterd_gsync_use_rsp_dict (aggr, rsp, NULL);
+ if (ret)
+ goto out;
+ break;
+
+ case GD_OP_STATUS_VOLUME:
+ ret = glusterd_volume_status_copy_to_op_ctx_dict (aggr, rsp);
+ if (ret)
+ goto out;
+ break;
+
+ case GD_OP_REBALANCE:
+ case GD_OP_DEFRAG_BRICK_VOLUME:
+ ret = glusterd_volume_rebalance_use_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
+ break;
+
+ case GD_OP_HEAL_VOLUME:
+ ret = glusterd_volume_heal_use_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
+
+ break;
+
+ case GD_OP_CLEARLOCKS_VOLUME:
+ ret = glusterd_volume_clearlocks_use_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
+
+ break;
+
+ default:
+ break;
+ }
+out:
+ return ret;
+}
+
int32_t
gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov,
int count, void *myframe)
{
- struct syncargs *args = NULL;
- gd1_mgmt_cluster_lock_rsp rsp = {{0},};
- int ret = -1;
- call_frame_t *frame = NULL;
+ int ret = -1;
+ struct syncargs *args = NULL;
+ gd1_mgmt_cluster_lock_rsp rsp = {{0},};
+ call_frame_t *frame = NULL;
- frame = myframe;
- args = frame->local;
+ frame = myframe;
+ args = frame->local;
frame->local = NULL;
- /* initialize */
- args->op_ret = -1;
- args->op_errno = EINVAL;
-
if (-1 == req->rpc_status) {
args->op_errno = ENOTCONN;
goto out;
@@ -106,64 +216,49 @@ gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov,
ret = xdr_to_generic (*iov, &rsp,
(xdrproc_t)xdr_gd1_mgmt_cluster_lock_rsp);
- if (ret < 0) {
+ if (ret < 0)
goto out;
- }
-
- args->op_ret = rsp.op_ret;
- args->op_errno = rsp.op_errno;
+ gd_collate_errors (args, rsp.op_ret, rsp.op_errno, NULL);
uuid_copy (args->uuid, rsp.uuid);
out:
STACK_DESTROY (frame->root);
-
- __wake (args);
-
+ synctask_barrier_wake(args);
return 0;
}
int
-gd_syncop_mgmt_lock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid)
+gd_syncop_mgmt_lock (struct rpc_clnt *rpc, struct syncargs *args,
+ uuid_t my_uuid, uuid_t recv_uuid)
{
- struct syncargs args = {0, };
+ int ret = -1;
gd1_mgmt_cluster_lock_req req = {{0},};
uuid_copy (req.uuid, my_uuid);
-
- args.op_ret = -1;
- args.op_errno = ENOTCONN;
-
- GD_SYNCOP (rpc, (&args), gd_syncop_mgmt_lock_cbk,
- &req, &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_LOCK,
- xdr_gd1_mgmt_cluster_lock_req);
-
- if (!args.op_ret)
- uuid_copy (recv_uuid, args.uuid);
-
- errno = args.op_errno;
- return args.op_ret;
-
+ ret = gd_syncop_submit_request (rpc, &req, args, &gd_mgmt_prog,
+ GLUSTERD_MGMT_CLUSTER_LOCK,
+ gd_syncop_mgmt_lock_cbk,
+ (xdrproc_t) xdr_gd1_mgmt_cluster_lock_req);
+ if (ret)
+ synctask_barrier_wake(args);
+ return ret;
}
int32_t
gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov,
int count, void *myframe)
{
- struct syncargs *args = NULL;
- gd1_mgmt_cluster_unlock_rsp rsp = {{0},};
- int ret = -1;
- call_frame_t *frame = NULL;
+ int ret = -1;
+ struct syncargs *args = NULL;
+ gd1_mgmt_cluster_unlock_rsp rsp = {{0},};
+ call_frame_t *frame = NULL;
frame = myframe;
- args = frame->local;
+ args = frame->local;
frame->local = NULL;
- /* initialize */
- args->op_ret = -1;
- args->op_errno = EINVAL;
-
if (-1 == req->rpc_status) {
args->op_errno = ENOTCONN;
goto out;
@@ -171,154 +266,143 @@ gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov,
ret = xdr_to_generic (*iov, &rsp,
(xdrproc_t)xdr_gd1_mgmt_cluster_unlock_rsp);
- if (ret < 0) {
+ if (ret < 0)
goto out;
- }
-
- args->op_ret = rsp.op_ret;
- args->op_errno = rsp.op_errno;
+ gd_collate_errors (args, rsp.op_ret, rsp.op_errno, NULL);
uuid_copy (args->uuid, rsp.uuid);
out:
STACK_DESTROY (frame->root);
-
- __wake (args);
-
+ synctask_barrier_wake(args);
return 0;
}
int
-gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid)
+gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, struct syncargs *args,
+ uuid_t my_uuid, uuid_t recv_uuid)
{
- struct syncargs args = {0, };
- gd1_mgmt_cluster_unlock_req req = {{0},};
+ int ret = -1;
+ gd1_mgmt_cluster_unlock_req req = {{0},};
uuid_copy (req.uuid, my_uuid);
-
- args.op_ret = -1;
- args.op_errno = ENOTCONN;
-
- GD_SYNCOP (rpc, (&args), gd_syncop_mgmt_unlock_cbk,
- &req, &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_UNLOCK,
- xdr_gd1_mgmt_cluster_unlock_req);
-
- if (!args.op_ret)
- uuid_copy (recv_uuid, args.uuid);
-
- errno = args.op_errno;
- return args.op_ret;
-
+ ret = gd_syncop_submit_request (rpc, &req, args, &gd_mgmt_prog,
+ GLUSTERD_MGMT_CLUSTER_UNLOCK,
+ gd_syncop_mgmt_unlock_cbk,
+ (xdrproc_t) xdr_gd1_mgmt_cluster_lock_req);
+ if (ret)
+ synctask_barrier_wake(args);
+ return ret;
}
int32_t
gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov,
int count, void *myframe)
{
- struct syncargs *args = NULL;
- gd1_mgmt_stage_op_rsp rsp = {{0},};
- int ret = -1;
- call_frame_t *frame = NULL;
-
+ int ret = -1;
+ gd1_mgmt_stage_op_rsp rsp = {{0},};
+ struct syncargs *args = NULL;
+ xlator_t *this = NULL;
+ dict_t *rsp_dict = NULL;
+ call_frame_t *frame = NULL;
+
+ this = THIS;
frame = myframe;
- args = frame->local;
+ args = frame->local;
frame->local = NULL;
- /* initialize */
- args->op_ret = -1;
- args->op_errno = EINVAL;
-
if (-1 == req->rpc_status) {
+ args->op_ret = -1;
args->op_errno = ENOTCONN;
goto out;
}
ret = xdr_to_generic (*iov, &rsp,
(xdrproc_t)xdr_gd1_mgmt_stage_op_rsp);
- if (ret < 0) {
+ if (ret < 0)
goto out;
- }
if (rsp.dict.dict_len) {
/* Unserialize the dictionary */
- args->dict = dict_new ();
+ rsp_dict = dict_new ();
ret = dict_unserialize (rsp.dict.dict_val,
rsp.dict.dict_len,
- &args->dict);
+ &rsp_dict);
if (ret < 0) {
GF_FREE (rsp.dict.dict_val);
goto out;
} else {
- args->dict->extra_stdfree = rsp.dict.dict_val;
+ rsp_dict->extra_stdfree = rsp.dict.dict_val;
}
}
- args->op_ret = rsp.op_ret;
- args->op_errno = rsp.op_errno;
-
+ gd_collate_errors (args, rsp.op_ret, rsp.op_errno, rsp.op_errstr);
uuid_copy (args->uuid, rsp.uuid);
-
- args->errstr = gf_strdup (rsp.op_errstr);
+ if (rsp.op == GD_OP_REPLACE_BRICK) {
+ pthread_mutex_lock (&args->lock_dict);
+ {
+ ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict,
+ rsp_dict);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ "Failed to aggregate response from "
+ " node/brick");
+ }
+ pthread_mutex_unlock (&args->lock_dict);
+ }
out:
- STACK_DESTROY (frame->root);
-
- __wake (args);
+ if (rsp_dict)
+ dict_unref (rsp_dict);
+ STACK_DESTROY (frame->root);
+ synctask_barrier_wake(args);
return 0;
}
int
-gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid,
- int op, dict_t *dict_out, dict_t **dict_in,
- char **errstr)
+gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, struct syncargs *args,
+ uuid_t my_uuid, uuid_t recv_uuid, int op,
+ dict_t *dict_out, dict_t *op_ctx)
{
- struct syncargs args = {0, };
- gd1_mgmt_stage_op_req req = {{0},};
- int ret = 0;
+ gd1_mgmt_stage_op_req *req = NULL;
+ int ret = -1;
- uuid_copy (req.uuid, my_uuid);
- req.op = op;
+ req = GF_CALLOC (1, sizeof (*req), gf_gld_mt_mop_stage_req_t);
+ if (!req)
+ goto out;
- args.op_ret = -1;
- args.op_errno = ENOTCONN;
+ uuid_copy (req->uuid, my_uuid);
+ req->op = op;
ret = dict_allocate_and_serialize (dict_out,
- &req.buf.buf_val, &req.buf.buf_len);
+ &req->buf.buf_val, &req->buf.buf_len);
if (ret)
goto out;
- GD_SYNCOP (rpc, (&args), gd_syncop_stage_op_cbk,
- &req, &gd_mgmt_prog, GLUSTERD_MGMT_STAGE_OP,
- xdr_gd1_mgmt_stage_op_req);
-
- if (args.errstr && errstr)
- *errstr = args.errstr;
- else GF_FREE (args.errstr);
-
- if (args.dict && dict_in)
- *dict_in = args.dict;
- else if (args.dict)
- dict_unref (args.dict);
-
- uuid_copy (recv_uuid, args.uuid);
+ ret = gd_syncop_submit_request (rpc, req, args, &gd_mgmt_prog,
+ GLUSTERD_MGMT_STAGE_OP,
+ gd_syncop_stage_op_cbk,
+ (xdrproc_t) xdr_gd1_mgmt_stage_op_req);
out:
- errno = args.op_errno;
- return args.op_ret;
+ gd_stage_op_req_free (req);
+ if (ret)
+ synctask_barrier_wake(args);
+
+ return ret;
}
-/*TODO: Need to add syncop for brick ops*/
int32_t
gd_syncop_brick_op_cbk (struct rpc_req *req, struct iovec *iov,
int count, void *myframe)
{
struct syncargs *args = NULL;
gd1_mgmt_brick_op_rsp rsp = {0,};
- int ret = -1;
+ int ret = -1;
call_frame_t *frame = NULL;
frame = myframe;
@@ -362,25 +446,13 @@ out:
if (strcmp (rsp.op_errstr, "") != 0)
free (rsp.op_errstr);
free (rsp.output.output_val);
- STACK_DESTROY (frame->root);
+ STACK_DESTROY (frame->root);
__wake (args);
return 0;
}
-static void
-gd_brick_op_req_free (gd1_mgmt_brick_op_req *req)
-{
- if (!req)
- return;
-
- if (strcmp (req->name, "") != 0)
- GF_FREE (req->name);
- GF_FREE (req->input.input_val);
- GF_FREE (req);
-}
-
int
gd_syncop_mgmt_brick_op (struct rpc_clnt *rpc, glusterd_pending_node_t *pnode,
int op, dict_t *dict_out, dict_t *op_ctx,
@@ -447,19 +519,18 @@ int32_t
gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov,
int count, void *myframe)
{
- struct syncargs *args = NULL;
- gd1_mgmt_commit_op_rsp rsp = {{0},};
- int ret = -1;
- call_frame_t *frame = NULL;
-
+ int ret = -1;
+ gd1_mgmt_commit_op_rsp rsp = {{0},};
+ struct syncargs *args = NULL;
+ xlator_t *this = NULL;
+ dict_t *rsp_dict = NULL;
+ call_frame_t *frame = NULL;
+
+ this = THIS;
frame = myframe;
- args = frame->local;
+ args = frame->local;
frame->local = NULL;
- /* initialize */
- args->op_ret = -1;
- args->op_errno = EINVAL;
-
if (-1 == req->rpc_status) {
args->op_errno = ENOTCONN;
goto out;
@@ -473,295 +544,361 @@ gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov,
if (rsp.dict.dict_len) {
/* Unserialize the dictionary */
- args->dict = dict_new ();
+ rsp_dict = dict_new ();
ret = dict_unserialize (rsp.dict.dict_val,
rsp.dict.dict_len,
- &args->dict);
+ &rsp_dict);
if (ret < 0) {
GF_FREE (rsp.dict.dict_val);
goto out;
} else {
- args->dict->extra_stdfree = rsp.dict.dict_val;
+ rsp_dict->extra_stdfree = rsp.dict.dict_val;
}
}
- args->op_ret = rsp.op_ret;
- args->op_errno = rsp.op_errno;
-
+ gd_collate_errors (args, rsp.op_ret, rsp.op_errno, rsp.op_errstr);
uuid_copy (args->uuid, rsp.uuid);
-
- args->errstr = gf_strdup (rsp.op_errstr);
-
+ pthread_mutex_lock (&args->lock_dict);
+ {
+ ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict,
+ rsp_dict);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ "Failed to aggregate response from "
+ " node/brick");
+ }
+ pthread_mutex_unlock (&args->lock_dict);
out:
- STACK_DESTROY (frame->root);
+ if (rsp_dict)
+ dict_unref (rsp_dict);
- __wake (args);
+ STACK_DESTROY (frame->root);
+ synctask_barrier_wake(args);
return 0;
}
int
-gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid,
- int op, dict_t *dict_out, dict_t **dict_in,
- char **errstr)
+gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, struct syncargs *args,
+ uuid_t my_uuid, uuid_t recv_uuid,
+ int op, dict_t *dict_out, dict_t *op_ctx)
{
- struct syncargs args = {0, };
- gd1_mgmt_commit_op_req req = {{0},};
- int ret = 0;
+ gd1_mgmt_commit_op_req *req = NULL;
+ int ret = -1;
- uuid_copy (req.uuid, my_uuid);
- req.op = op;
+ req = GF_CALLOC (1, sizeof (*req), gf_gld_mt_mop_commit_req_t);
+ if (!req)
+ goto out;
- args.op_ret = -1;
- args.op_errno = ENOTCONN;
+ uuid_copy (req->uuid, my_uuid);
+ req->op = op;
ret = dict_allocate_and_serialize (dict_out,
- &req.buf.buf_val, &req.buf.buf_len);
+ &req->buf.buf_val, &req->buf.buf_len);
if (ret)
goto out;
- GD_SYNCOP (rpc, (&args), gd_syncop_commit_op_cbk,
- &req, &gd_mgmt_prog, GLUSTERD_MGMT_COMMIT_OP,
- xdr_gd1_mgmt_commit_op_req);
-
- if (args.errstr && errstr)
- *errstr = args.errstr;
- else GF_FREE (args.errstr);
-
- if (args.dict && dict_in)
- *dict_in = args.dict;
- else if (args.dict)
- dict_unref (args.dict);
-
- uuid_copy (recv_uuid, args.uuid);
-
+ ret = gd_syncop_submit_request (rpc, req, args, &gd_mgmt_prog,
+ GLUSTERD_MGMT_COMMIT_OP ,
+ gd_syncop_commit_op_cbk,
+ (xdrproc_t) xdr_gd1_mgmt_commit_op_req);
out:
- errno = args.op_errno;
- return args.op_ret;
+ gd_commit_op_req_free (req);
+ if (ret)
+ synctask_barrier_wake(args);
+ return ret;
}
-
-static int
-glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp,
- char *op_errstr)
+int
+gd_build_peers_list (struct list_head *peers, struct list_head *xact_peers)
{
- int ret = 0;
+ glusterd_peerinfo_t *peerinfo = NULL;
+ int npeers = 0;
- switch (op) {
- case GD_OP_REPLACE_BRICK:
- ret = glusterd_rb_use_rsp_dict (aggr, rsp);
- if (ret)
- goto out;
- break;
-
- case GD_OP_SYNC_VOLUME:
- ret = glusterd_sync_use_rsp_dict (aggr, rsp);
- if (ret)
- goto out;
- break;
+ list_for_each_entry (peerinfo, peers, uuid_list) {
+ if (!peerinfo->connected)
+ continue;
+ if (peerinfo->state.state != GD_FRIEND_STATE_BEFRIENDED)
+ continue;
+ list_add_tail (&peerinfo->op_peers_list, xact_peers);
+ npeers++;
+ }
+ return npeers;
+}
- case GD_OP_PROFILE_VOLUME:
- ret = glusterd_profile_volume_use_rsp_dict (aggr, rsp);
- if (ret)
- goto out;
- break;
+int
+gd_lock_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
+ char **op_errstr, int npeers)
+{
+ int ret = -1;
+ int peer_cnt = 0;
+ uuid_t peer_uuid = {0};
+ xlator_t *this = NULL;
+ glusterd_peerinfo_t *peerinfo = NULL;
+ struct syncargs args = {0};
+
+ if (!npeers) {
+ ret = 0;
+ goto out;
+ }
- case GD_OP_GSYNC_SET:
- ret = glusterd_gsync_use_rsp_dict (aggr, rsp, op_errstr);
- if (ret)
- goto out;
- break;
+ this = THIS;
+ synctask_barrier_init((&args));
+ peer_cnt = 0;
+ list_for_each_entry (peerinfo, peers, op_peers_list) {
+ gd_syncop_mgmt_lock (peerinfo->rpc, &args, MY_UUID, peer_uuid);
+ peer_cnt++;
+ }
+ synctask_barrier_wait((&args), peer_cnt);
+ ret = args.op_ret;
+ if (ret) {
+ gf_asprintf (op_errstr, "Another transaction could be "
+ "in progress. Please try again after "
+ "sometime.");
+ gf_log (this->name, GF_LOG_ERROR, "Failed to acquire lock");
+ goto out;
+ }
- case GD_OP_STATUS_VOLUME:
- ret = glusterd_volume_status_copy_to_op_ctx_dict (aggr, rsp);
- if (ret)
- goto out;
- break;
+ ret = 0;
+out:
+ return ret;
+}
- case GD_OP_REBALANCE:
- case GD_OP_DEFRAG_BRICK_VOLUME:
- ret = glusterd_volume_rebalance_use_rsp_dict (aggr, rsp);
- if (ret)
- goto out;
- break;
+int
+gd_stage_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
+ dict_t *req_dict, char **op_errstr, int npeers)
+{
+ int ret = -1;
+ int peer_cnt = 0;
+ dict_t *rsp_dict = NULL;
+ char *hostname = NULL;
+ xlator_t *this = NULL;
+ glusterd_peerinfo_t *peerinfo = NULL;
+ uuid_t tmp_uuid = {0};
+ char *errstr = NULL;
+ struct syncargs args = {0};
- case GD_OP_HEAL_VOLUME:
- ret = glusterd_volume_heal_use_rsp_dict (aggr, rsp);
- if (ret)
- goto out;
+ this = THIS;
+ ret = -1;
+ rsp_dict = dict_new ();
+ if (!rsp_dict)
+ goto out;
- break;
+ ret = glusterd_op_stage_validate (op, req_dict, op_errstr, rsp_dict);
+ if (ret) {
+ hostname = "localhost";
+ goto stage_done;
+ }
- case GD_OP_CLEARLOCKS_VOLUME:
- ret = glusterd_volume_clearlocks_use_rsp_dict (aggr, rsp);
- if (ret)
+ if ((op == GD_OP_REPLACE_BRICK) ||
+ (op == GD_OP_CLEARLOCKS_VOLUME)) {
+ ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ (*op_errstr)? *op_errstr: "Failed to aggregate "
+ "response from node/brick");
goto out;
+ }
+ }
+ dict_unref (rsp_dict);
+ rsp_dict = NULL;
- break;
+stage_done:
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, LOGSTR_STAGE_FAIL,
+ gd_op_list[op], hostname, (*op_errstr) ? ":" : " ",
+ (*op_errstr) ? *op_errstr : " ");
+ if (op_errstr == NULL)
+ gf_asprintf (op_errstr, OPERRSTR_STAGE_FAIL, hostname);
+ goto out;
+ }
- default:
- break;
+ if (!npeers) {
+ ret = 0;
+ goto out;
+ }
+
+ gd_syncargs_init (&args, op_ctx);
+ synctask_barrier_init((&args));
+ peer_cnt = 0;
+ list_for_each_entry (peerinfo, peers, op_peers_list) {
+ ret = gd_syncop_mgmt_stage_op (peerinfo->rpc, &args,
+ MY_UUID, tmp_uuid,
+ op, req_dict, op_ctx);
+ peer_cnt++;
}
+ synctask_barrier_wait((&args), peer_cnt);
+ ret = args.op_ret;
+ if (dict_get_str (op_ctx, "errstr", &errstr) == 0)
+ *op_errstr = gf_strdup (errstr);
+
out:
+ if (rsp_dict)
+ dict_unref (rsp_dict);
return ret;
}
-void
-gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
+int
+gd_commit_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
+ dict_t *req_dict, char **op_errstr, int npeers)
{
- int ret = -1;
- dict_t *req_dict = NULL;
dict_t *rsp_dict = NULL;
+ int peer_cnt = -1;
+ int ret = -1;
+ char *hostname = NULL;
glusterd_peerinfo_t *peerinfo = NULL;
- glusterd_peerinfo_t *tmp = NULL;
- glusterd_conf_t *conf = NULL;
- uuid_t tmp_uuid = {0,};
- glusterd_op_t op = 0;
- int32_t tmp_op = 0;
- gf_boolean_t local_locked = _gf_false;
- char *op_errstr = NULL;
- glusterd_pending_node_t *pending_node = NULL;
- rpc_clnt_t *rpc = NULL;
- int brick_count = 0;
- struct list_head selected = {0};
xlator_t *this = NULL;
- char *hostname = NULL;
+ uuid_t tmp_uuid = {0};
+ char *errstr = NULL;
+ struct syncargs args = {0};
this = THIS;
- GF_ASSERT (this);
- conf = this->private;
- GF_ASSERT (conf);
-
- ret = dict_get_int32 (op_ctx, GD_SYNC_OPCODE_KEY, &tmp_op);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "Failed to get volume "
- "operation");
+ rsp_dict = dict_new ();
+ if (!rsp_dict) {
+ ret = -1;
goto out;
}
- op = tmp_op;
-
- /* Lock everything */
- ret = glusterd_lock (MY_UUID);
+ ret = glusterd_op_commit_perform (op, req_dict, op_errstr, rsp_dict);
if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "Unable to acquire lock");
- gf_asprintf (&op_errstr, "Another transaction is in progress. "
- "Please try again after sometime.");
- goto out;
+ hostname = "localhost";
+ goto commit_done;
}
- /* successful lock in local node */
- local_locked = _gf_true;
-
- /* storing op globally to access in synctask code paths
- * This is still acceptable, as we are performing this under
- * the 'cluster' lock*/
-
- glusterd_op_set_op (op);
- INIT_LIST_HEAD (&conf->xaction_peers);
- list_for_each_entry (peerinfo, &conf->peers, uuid_list) {
- if (!peerinfo->connected)
- continue;
- if (peerinfo->state.state != GD_FRIEND_STATE_BEFRIENDED)
- continue;
-
- ret = gd_syncop_mgmt_lock (peerinfo->rpc,
- MY_UUID, tmp_uuid);
- if (ret) {
- gf_asprintf (&op_errstr, "Another transaction could be "
- "in progress. Please try again after "
- "sometime.");
- gf_log (this->name, GF_LOG_ERROR, "Failed to acquire "
- "lock on peer %s", peerinfo->hostname);
- goto out;
- } else {
- list_add_tail (&peerinfo->op_peers_list,
- &conf->xaction_peers);
- }
+ ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ (*op_errstr)? *op_errstr: "Failed to aggregate response "
+ "from node/brick");
+ goto out;
}
+ dict_unref (rsp_dict);
+ rsp_dict = NULL;
- ret = glusterd_op_build_payload (&req_dict, &op_errstr, op_ctx);
+commit_done:
if (ret) {
- gf_log (this->name, GF_LOG_ERROR, LOGSTR_BUILD_PAYLOAD,
- gd_op_list[op]);
- if (op_errstr == NULL)
- gf_asprintf (&op_errstr, OPERRSTR_BUILD_PAYLOAD);
+ gf_log (this->name, GF_LOG_ERROR, LOGSTR_COMMIT_FAIL,
+ gd_op_list[op], hostname, (*op_errstr) ? ":" : " ",
+ (*op_errstr) ? *op_errstr : " ");
+ if (*op_errstr == NULL)
+ gf_asprintf (op_errstr, OPERRSTR_COMMIT_FAIL,
+ hostname);
+ goto out;
+ } else {
+ glusterd_op_modify_op_ctx (op, op_ctx);
+ }
+
+ if (!npeers) {
+ ret = 0;
goto out;
}
+ gd_syncargs_init (&args, op_ctx);
+ synctask_barrier_init((&args));
+ peer_cnt = 0;
+ list_for_each_entry (peerinfo, peers, op_peers_list) {
+ ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, &args,
+ MY_UUID, tmp_uuid,
+ op, req_dict, op_ctx);
+ peer_cnt++;
+ }
+ synctask_barrier_wait((&args), peer_cnt);
+ ret = args.op_ret;
+ if (dict_get_str (op_ctx, "errstr", &errstr) == 0)
+ *op_errstr = gf_strdup (errstr);
- /* stage op */
- ret = -1;
- rsp_dict = dict_new ();
- if (!rsp_dict)
+out:
+ if (rsp_dict)
+ dict_unref (rsp_dict);
+ return ret;
+}
+
+int
+gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int op_ret,
+ rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr,
+ int npeers)
+{
+ glusterd_peerinfo_t *peerinfo = NULL;
+ glusterd_peerinfo_t *tmp = NULL;
+ uuid_t tmp_uuid = {0};
+ int peer_cnt = 0;
+ int ret = -1;
+ xlator_t *this = NULL;
+ struct syncargs args = {0};
+
+ if (!npeers) {
+ ret = 0;
goto out;
+ }
- ret = glusterd_op_stage_validate (op, req_dict, &op_errstr, rsp_dict);
+ this = THIS;
+ synctask_barrier_init((&args));
+ peer_cnt = 0;
+ list_for_each_entry_safe (peerinfo, tmp, peers, op_peers_list) {
+ gd_syncop_mgmt_unlock (peerinfo->rpc, &args, MY_UUID, tmp_uuid);
+ list_del_init (&peerinfo->op_peers_list);
+ peer_cnt++;
+ }
+ synctask_barrier_wait((&args), peer_cnt);
+ ret = args.op_ret;
if (ret) {
- hostname = "localhost";
- goto stage_done;
+ gf_log (this->name, GF_LOG_ERROR, "Failed to unlock "
+ "on some peer(s)");
}
- if ((op == GD_OP_REPLACE_BRICK) ||
- (op == GD_OP_CLEARLOCKS_VOLUME)) {
- ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict,
- op_errstr);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "%s",
- (op_errstr)? op_errstr: "Failed to aggregate "
- "response from node/brick");
- goto out;
- }
- }
- dict_unref (rsp_dict);
- rsp_dict = NULL;
+out:
+ glusterd_op_send_cli_response (op, op_ret, 0, req, op_ctx, op_errstr);
+ glusterd_op_clear_op (op);
+ glusterd_unlock (MY_UUID);
- list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) {
- ret = gd_syncop_mgmt_stage_op (peerinfo->rpc,
- MY_UUID, tmp_uuid,
- op, req_dict, &rsp_dict,
- &op_errstr);
- if (ret) {
- hostname = peerinfo->hostname;
- goto stage_done;
- }
+ return 0;
+}
- if (op == GD_OP_REPLACE_BRICK) {
- ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx,
- rsp_dict,
- op_errstr);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "%s",
- (op_errstr)? op_errstr: "Failed to "
- "aggregate response from node/brick");
- goto out;
- }
- }
- dict_unref (rsp_dict);
- rsp_dict = NULL;
+int
+gd_get_brick_count (struct list_head *bricks)
+{
+ glusterd_pending_node_t *pending_node = NULL;
+ int npeers = 0;
+ list_for_each_entry (pending_node, bricks, list) {
+ npeers++;
}
+ return npeers;
+}
-stage_done:
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, LOGSTR_STAGE_FAIL,
- gd_op_list[op], hostname, (op_errstr) ? ":" : " ",
- (op_errstr) ? op_errstr : " ");
- if (op_errstr == NULL)
- gf_asprintf (&op_errstr, OPERRSTR_STAGE_FAIL, hostname);
+int
+gd_brick_op_phase (glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict, char **op_errstr)
+{
+ glusterd_pending_node_t *pending_node = NULL;
+ struct list_head selected = {0,};
+ xlator_t *this = NULL;
+ int brick_count = 0;
+ int ret = -1;
+ rpc_clnt_t *rpc = NULL;
+ dict_t *rsp_dict = NULL;
+
+ this = THIS;
+ rsp_dict = dict_new ();
+ if (!rsp_dict) {
+ ret = -1;
goto out;
}
- /*brick op */
INIT_LIST_HEAD (&selected);
- ret = glusterd_op_bricks_select (op, req_dict, &op_errstr, &selected);
+ ret = glusterd_op_bricks_select (op, req_dict, op_errstr, &selected);
if (ret) {
gf_log (this->name, GF_LOG_ERROR, "%s",
- (op_errstr)? op_errstr: "Brick op failed. Check "
+ (*op_errstr)? *op_errstr: "Brick op failed. Check "
"glusterd log file for more details.");
goto out;
}
+ ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict);
+ if (ret)
+ goto out;
+ dict_unref (rsp_dict);
+ rsp_dict = NULL;
+
brick_count = 0;
list_for_each_entry (pending_node, &selected, list) {
rpc = glusterd_pending_node_get_rpc (pending_node);
@@ -779,95 +916,97 @@ stage_done:
goto out;
}
ret = gd_syncop_mgmt_brick_op (rpc, pending_node, op, req_dict,
- op_ctx, &op_errstr);
+ op_ctx, op_errstr);
if (ret)
goto out;
brick_count++;
}
+ ret = 0;
+out:
+ if (rsp_dict)
+ dict_unref (rsp_dict);
gf_log (this->name, GF_LOG_DEBUG, "Sent op req to %d bricks",
brick_count);
+ return ret;
+}
- /* commit op */
- rsp_dict = dict_new ();
- if (!rsp_dict) {
- ret = -1;
- goto out;
- }
+void
+gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
+{
+ int ret = -1;
+ int npeers = 0;
+ dict_t *req_dict = NULL;
+ glusterd_conf_t *conf = NULL;
+ glusterd_op_t op = 0;
+ int32_t tmp_op = 0;
+ char *op_errstr = NULL;
+ xlator_t *this = NULL;
- ret = glusterd_op_commit_perform (op, req_dict, &op_errstr, rsp_dict);
+ this = THIS;
+ GF_ASSERT (this);
+ conf = this->private;
+ GF_ASSERT (conf);
+
+ ret = dict_get_int32 (op_ctx, GD_SYNC_OPCODE_KEY, &tmp_op);
if (ret) {
- hostname = "localhost";
- goto commit_done;
+ gf_log (this->name, GF_LOG_ERROR, "Failed to get volume "
+ "operation");
+ goto out;
}
- ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, op_errstr);
+
+ op = tmp_op;
+ ret = glusterd_lock (MY_UUID);
if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "%s",
- (op_errstr)? op_errstr: "Failed to aggregate response "
- "from node/brick");
+ gf_log (this->name, GF_LOG_ERROR, "Unable to acquire lock");
+ gf_asprintf (&op_errstr, "Another transaction is in progress. "
+ "Please try again after sometime.");
goto out;
}
- dict_unref (rsp_dict);
- rsp_dict = NULL;
- list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) {
- ret = gd_syncop_mgmt_commit_op (peerinfo->rpc,
- MY_UUID, tmp_uuid,
- op, req_dict, &rsp_dict,
- &op_errstr);
- if (ret) {
- hostname = peerinfo->hostname;
- goto commit_done;
- }
- ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict,
- op_errstr);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "%s",
- (op_errstr)? op_errstr: "Failed to aggregate "
- "response from node/brick");
- goto out;
- }
- dict_unref (rsp_dict);
- rsp_dict = NULL;
- }
-commit_done:
+ /* storing op globally to access in synctask code paths
+ * This is still acceptable, as we are performing this under
+ * the 'cluster' lock*/
+ glusterd_op_set_op (op);
+ INIT_LIST_HEAD (&conf->xaction_peers);
+ npeers = gd_build_peers_list (&conf->peers, &conf->xaction_peers);
+
+ ret = gd_lock_op_phase (&conf->xaction_peers, op, op_ctx, &op_errstr, npeers);
+ if (ret)
+ goto out;
+
+ ret = glusterd_op_build_payload (&req_dict, &op_errstr, op_ctx);
if (ret) {
- gf_log (this->name, GF_LOG_ERROR, LOGSTR_COMMIT_FAIL,
- gd_op_list[op], hostname, (op_errstr) ? ":" : " ",
- (op_errstr) ? op_errstr : " ");
+ gf_log (this->name, GF_LOG_ERROR, LOGSTR_BUILD_PAYLOAD,
+ gd_op_list[op]);
if (op_errstr == NULL)
- gf_asprintf (&op_errstr, OPERRSTR_COMMIT_FAIL,
- hostname);
+ gf_asprintf (&op_errstr, OPERRSTR_BUILD_PAYLOAD);
goto out;
- } else {
- glusterd_op_modify_op_ctx (op, op_ctx);
- }
+ }
- ret = 0;
-out:
- if (local_locked) {
- list_for_each_entry_safe (peerinfo, tmp, &conf->xaction_peers,
- op_peers_list) {
- gd_syncop_mgmt_unlock (peerinfo->rpc,
- MY_UUID, tmp_uuid);
- list_del_init (&peerinfo->op_peers_list);
- }
+ ret = gd_stage_op_phase (&conf->xaction_peers, op, op_ctx, req_dict,
+ &op_errstr, npeers);
+ if (ret)
+ goto out;
- /* Local node should be the one to be locked first,
- unlocked last to prevent races */
- glusterd_op_clear_op (op);
- glusterd_unlock (MY_UUID);
- }
+ ret = gd_brick_op_phase (op, op_ctx, req_dict, &op_errstr);
+ if (ret)
+ goto out;
+
+ ret = gd_commit_op_phase (&conf->xaction_peers, op, op_ctx, req_dict,
+ &op_errstr, npeers);
+ if (ret)
+ goto out;
- glusterd_op_send_cli_response (op, ret, 0, req, op_ctx, op_errstr);
+ ret = 0;
+out:
+ (void) gd_unlock_op_phase (&conf->xaction_peers, op, ret, req,
+ op_ctx, op_errstr, npeers);
if (req_dict)
dict_unref (req_dict);
- if (ret && rsp_dict)
- dict_unref (rsp_dict);
-
if (op_errstr)
GF_FREE (op_errstr);
diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.h b/xlators/mgmt/glusterd/src/glusterd-syncop.h
index 268f3bf2357..658ed4e2a28 100644
--- a/xlators/mgmt/glusterd/src/glusterd-syncop.h
+++ b/xlators/mgmt/glusterd/src/glusterd-syncop.h
@@ -12,7 +12,6 @@
#include "syncop.h"
-
#define GD_SYNC_OPCODE_KEY "sync-mgmt-operation"
/* gd_syncop_* */
@@ -36,15 +35,14 @@ int gd_syncop_submit_request (struct rpc_clnt *rpc, void *req,
xdrproc_t xdrproc);
-int gd_syncop_mgmt_lock (struct rpc_clnt *rpc, uuid_t my_uuid,
- uuid_t recv_uuid);
-int gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, uuid_t my_uuid,
- uuid_t recv_uuid);
-int gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, uuid_t my_uuid,
- uuid_t recv_uuid, int op, dict_t *dict_out,
- dict_t **dict_in, char **errstr);
-int gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, uuid_t my_uuid,
- uuid_t recv_uuid, int op, dict_t *dict_out,
- dict_t **dict_in, char **errstr);
-
+int gd_syncop_mgmt_lock (struct rpc_clnt *rpc, struct syncargs *arg,
+ uuid_t my_uuid, uuid_t recv_uuid);
+int gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, struct syncargs *arg,
+ uuid_t my_uuid, uuid_t recv_uuid);
+int gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, struct syncargs *arg,
+ uuid_t my_uuid, uuid_t recv_uuid, int op,
+ dict_t *dict_out, dict_t *op_ctx);
+int gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, struct syncargs *arg,
+ uuid_t my_uuid, uuid_t recv_uuid, int op,
+ dict_t *dict_out, dict_t *op_ctx);
#endif /* __RPC_SYNCOP_H */