From 2d0da44826d4b4652169604785ec63cce805bddd Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Wed, 12 Dec 2012 15:07:38 +0530 Subject: glusterd: Added syncop version of BRICK_OP - Made rsp dict available to all glusterd's STAGE/BRICK/COMMIT OP. Change-Id: I5d825d0670d0f1aa8a0603f2307b3600ff6ccfe4 BUG: 852147 Signed-off-by: Krishnan Parthasarathi Reviewed-on: http://review.gluster.org/4296 Tested-by: Gluster Build System Reviewed-by: Amar Tumballi Reviewed-by: Anand Avati --- xlators/mgmt/glusterd/src/glusterd-syncop.c | 270 +++++++++++++++++++++++++--- 1 file changed, 242 insertions(+), 28 deletions(-) (limited to 'xlators') diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.c b/xlators/mgmt/glusterd/src/glusterd-syncop.c index 2b1c88c3cc3..a8f58e0255d 100644 --- a/xlators/mgmt/glusterd/src/glusterd-syncop.c +++ b/xlators/mgmt/glusterd/src/glusterd-syncop.c @@ -80,6 +80,7 @@ out: /* Defined in glusterd-rpc-ops.c */ extern struct rpc_clnt_program gd_mgmt_prog; +extern struct rpc_clnt_program gd_brick_prog; int32_t gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov, @@ -311,6 +312,137 @@ out: } /*TODO: Need to add syncop for brick ops*/ +int32_t +gd_syncop_brick_op_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + struct syncargs *args = NULL; + gd1_mgmt_brick_op_rsp rsp = {0,}; + int ret = -1; + call_frame_t *frame = NULL; + + frame = myframe; + args = frame->local; + frame->local = NULL; + + /* initialize */ + args->op_ret = -1; + args->op_errno = EINVAL; + + if (-1 == req->rpc_status) { + args->op_errno = ENOTCONN; + goto out; + } + + ret = xdr_to_generic (*iov, &rsp, + (xdrproc_t)xdr_gd1_mgmt_brick_op_rsp); + if (ret < 0) + goto out; + + if (rsp.output.output_len) { + args->dict = dict_new (); + if (!args->dict) { + ret = -1; + args->op_errno = ENOMEM; + goto out; + } + + ret = dict_unserialize (rsp.output.output_val, + rsp.output.output_len, + &args->dict); + if (ret < 0) + goto out; + } + + args->op_ret = rsp.op_ret; + args->op_errno = rsp.op_errno; + args->errstr = gf_strdup (rsp.op_errstr); + +out: + if (strcmp (rsp.op_errstr, "") != 0) + free (rsp.op_errstr); + free (rsp.output.output_val); + STACK_DESTROY (frame->root); + + __wake (args); + + return 0; +} + +static void +gd_brick_op_req_free (gd1_mgmt_brick_op_req *req) +{ + if (!req) + return; + + if (strcmp (req->name, "") != 0) + GF_FREE (req->name); + GF_FREE (req->input.input_val); + GF_FREE (req); +} + +int +gd_syncop_mgmt_brick_op (struct rpc_clnt *rpc, glusterd_pending_node_t *pnode, + int op, dict_t *dict_out, dict_t *op_ctx, + char **errstr) +{ + struct syncargs args = {0, }; + gd1_mgmt_brick_op_req *req = NULL; + int ret = 0; + xlator_t *this = NULL; + + this = THIS; + args.op_ret = -1; + args.op_errno = ENOTCONN; + + if ((pnode->type == GD_NODE_NFS) || + ((pnode->type == GD_NODE_SHD) && + (op == GD_OP_STATUS_VOLUME))) { + ret = glusterd_node_op_build_payload + (op, &req, dict_out); + + } else { + ret = glusterd_brick_op_build_payload + (op, pnode->node, &req, dict_out); + + } + + if (ret) + goto out; + + GD_SYNCOP (rpc, (&args), gd_syncop_brick_op_cbk, + req, &gd_brick_prog, req->op, + xdr_gd1_mgmt_brick_op_req); + + if (args.errstr && errstr) + *errstr = args.errstr; + else + GF_FREE (args.errstr); + + if (GD_OP_STATUS_VOLUME == op) { + ret = dict_set_int32 (args.dict, "index", pnode->index); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Error setting index on brick status" + " rsp dict"); + args.op_ret = -1; + goto out; + } + } + if (args.op_ret == 0) + glusterd_handle_node_rsp (dict_out, pnode->node, op, + args.dict, op_ctx, errstr, + pnode->type); + +out: + errno = args.op_errno; + if (args.dict) + dict_unref (args.dict); + gd_brick_op_req_free (req); + return args.op_ret; + +} + int32_t gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) @@ -416,7 +548,7 @@ static int glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp, char *op_errstr) { - int ret = -1; + int ret = 0; switch (op) { case GD_OP_REPLACE_BRICK: @@ -473,19 +605,23 @@ out: void gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) { - int ret = -1; - dict_t *req_dict = NULL; - dict_t *rsp_dict = NULL; - glusterd_peerinfo_t *peerinfo = NULL; - glusterd_peerinfo_t *tmp = NULL; - glusterd_conf_t *conf = NULL; - uuid_t tmp_uuid = {0,}; - glusterd_op_t op = 0; - int32_t tmp_op = 0; - gf_boolean_t local_locked = _gf_false; - char *op_errstr = NULL; - xlator_t *this = NULL; - char *hostname = NULL; + int ret = -1; + dict_t *req_dict = NULL; + dict_t *rsp_dict = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + glusterd_peerinfo_t *tmp = NULL; + glusterd_conf_t *conf = NULL; + uuid_t tmp_uuid = {0,}; + glusterd_op_t op = 0; + int32_t tmp_op = 0; + gf_boolean_t local_locked = _gf_false; + char *op_errstr = NULL; + glusterd_pending_node_t *pending_node = NULL; + rpc_clnt_t *rpc = NULL; + int brick_count = 0; + struct list_head selected = {0}; + xlator_t *this = NULL; + char *hostname = NULL; this = THIS; GF_ASSERT (this); @@ -501,11 +637,6 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) op = tmp_op; - ret = -1; - rsp_dict = dict_new (); - if (!rsp_dict) - goto out; - /* Lock everything */ ret = glusterd_lock (MY_UUID); if (ret) { @@ -554,12 +685,30 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) } /* stage op */ + ret = -1; + rsp_dict = dict_new (); + if (!rsp_dict) + goto out; + ret = glusterd_op_stage_validate (op, req_dict, &op_errstr, rsp_dict); if (ret) { hostname = "localhost"; goto stage_done; } + if (op == GD_OP_REPLACE_BRICK) { + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, + op_errstr); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "%s", + (op_errstr)? op_errstr: "Failed to aggregate " + "response from node/brick"); + goto out; + } + } + dict_unref (rsp_dict); + rsp_dict = NULL; + list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) { ret = gd_syncop_mgmt_stage_op (peerinfo->rpc, MY_UUID, tmp_uuid, @@ -570,13 +719,19 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) goto stage_done; } - if (op == GD_OP_REPLACE_BRICK) - (void) glusterd_syncop_aggr_rsp_dict (op, op_ctx, + if (op == GD_OP_REPLACE_BRICK) { + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, op_errstr); - - if (rsp_dict) - dict_unref (rsp_dict); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "%s", + (op_errstr)? op_errstr: "Failed to " + "aggregate response from node/brick"); + goto out; + } + } + dict_unref (rsp_dict); + rsp_dict = NULL; } stage_done: @@ -589,12 +744,64 @@ stage_done: goto out; } + /*brick op */ + INIT_LIST_HEAD (&selected); + ret = glusterd_op_bricks_select (op, req_dict, &op_errstr, &selected); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "%s", + (op_errstr)? op_errstr: "Brick op failed. Check " + "glusterd log file for more details."); + goto out; + } + + brick_count = 0; + list_for_each_entry (pending_node, &selected, list) { + rpc = glusterd_pending_node_get_rpc (pending_node); + if (!rpc) { + if (pending_node->type == GD_NODE_REBALANCE) { + ret = 0; + glusterd_defrag_volume_node_rsp (req_dict, + NULL, op_ctx); + goto out; + } + + ret = -1; + gf_log (this->name, GF_LOG_ERROR, "Brick Op failed " + "due to rpc failure."); + goto out; + } + ret = gd_syncop_mgmt_brick_op (rpc, pending_node, op, req_dict, + op_ctx, &op_errstr); + if (ret) + goto out; + + brick_count++; + } + + gf_log (this->name, GF_LOG_DEBUG, "Sent op req to %d bricks", + brick_count); + /* commit op */ + rsp_dict = dict_new (); + if (!rsp_dict) { + ret = -1; + goto out; + } + ret = glusterd_op_commit_perform (op, req_dict, &op_errstr, rsp_dict); if (ret) { hostname = "localhost"; goto commit_done; } + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, op_errstr); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "%s", + (op_errstr)? op_errstr: "Failed to aggregate response " + "from node/brick"); + goto out; + } + dict_unref (rsp_dict); + rsp_dict = NULL; list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) { ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, @@ -605,10 +812,16 @@ stage_done: hostname = peerinfo->hostname; goto commit_done; } - (void) glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, - op_errstr); - if (rsp_dict) - dict_unref (rsp_dict); + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, + op_errstr); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "%s", + (op_errstr)? op_errstr: "Failed to aggregate " + "response from node/brick"); + goto out; + } + dict_unref (rsp_dict); + rsp_dict = NULL; } commit_done: if (ret) { @@ -665,6 +878,7 @@ glusterd_op_begin_synctask (rpcsvc_request_t *req, glusterd_op_t op, } gd_sync_task_begin (dict, req); + ret = 0; out: if (dict) dict_unref (dict); -- cgit