summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cli/src/cli-cmd-volume.c37
-rw-r--r--cli/src/cli-cmd.c45
-rw-r--r--cli/src/cli-cmd.h2
-rw-r--r--cli/src/cli.c50
-rw-r--r--cli/src/cli.h10
-rw-r--r--cli/src/cli3_1-cops.c21
-rw-r--r--cli/src/input.c1
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c6
8 files changed, 137 insertions, 35 deletions
diff --git a/cli/src/cli-cmd-volume.c b/cli/src/cli-cmd-volume.c
index 445432ecd03..cc8200bd20f 100644
--- a/cli/src/cli-cmd-volume.c
+++ b/cli/src/cli-cmd-volume.c
@@ -144,7 +144,6 @@ cli_cmd_volume_start_cbk (struct cli_state *state, struct cli_cmd_word *word,
call_frame_t *frame = NULL;
char *volname = NULL;
- proc = &cli_rpc_prog->proctable[GF1_CLI_START_VOLUME];
frame = create_frame (THIS, THIS->ctx->pool);
if (!frame)
@@ -159,12 +158,14 @@ cli_cmd_volume_start_cbk (struct cli_state *state, struct cli_cmd_word *word,
volname = (char *)words[2];
GF_ASSERT (volname);
+ proc = &cli_rpc_prog->proctable[GF1_CLI_START_VOLUME];
+
if (proc->fn) {
ret = proc->fn (frame, THIS, volname);
}
out:
- if (ret && volname)
+ if (!proc && ret && volname)
cli_out ("Starting Volume %s failed", volname);
return ret;
@@ -180,7 +181,6 @@ cli_cmd_volume_stop_cbk (struct cli_state *state, struct cli_cmd_word *word,
call_frame_t *frame = NULL;
char *volname = NULL;
- proc = &cli_rpc_prog->proctable[GF1_CLI_STOP_VOLUME];
frame = create_frame (THIS, THIS->ctx->pool);
if (!frame)
@@ -190,12 +190,14 @@ cli_cmd_volume_stop_cbk (struct cli_state *state, struct cli_cmd_word *word,
volname = (char *)words[2];
GF_ASSERT (volname);
+ proc = &cli_rpc_prog->proctable[GF1_CLI_STOP_VOLUME];
+
if (proc->fn) {
ret = proc->fn (frame, THIS, volname);
}
out:
- if (ret)
+ if (!proc && ret)
cli_out ("Stopping Volume %s failed", volname);
return ret;
@@ -211,7 +213,6 @@ cli_cmd_volume_rename_cbk (struct cli_state *state, struct cli_cmd_word *word,
call_frame_t *frame = NULL;
dict_t *dict = NULL;
- proc = &cli_rpc_prog->proctable[GF1_CLI_RENAME_VOLUME];
frame = create_frame (THIS, THIS->ctx->pool);
if (!frame)
@@ -236,12 +237,14 @@ cli_cmd_volume_rename_cbk (struct cli_state *state, struct cli_cmd_word *word,
if (ret)
goto out;
+ proc = &cli_rpc_prog->proctable[GF1_CLI_RENAME_VOLUME];
+
if (proc->fn) {
ret = proc->fn (frame, THIS, dict);
}
out:
- if (ret) {
+ if (!proc && ret) {
char *volname = (char *) words[2];
if (dict)
dict_destroy (dict);
@@ -261,7 +264,6 @@ cli_cmd_volume_defrag_cbk (struct cli_state *state, struct cli_cmd_word *word,
call_frame_t *frame = NULL;
char *volname = NULL;
- proc = &cli_rpc_prog->proctable[GF1_CLI_DEFRAG_VOLUME];
frame = create_frame (THIS, THIS->ctx->pool);
if (!frame)
@@ -271,12 +273,14 @@ cli_cmd_volume_defrag_cbk (struct cli_state *state, struct cli_cmd_word *word,
volname = (char *)words[2];
GF_ASSERT (volname);
+ proc = &cli_rpc_prog->proctable[GF1_CLI_DEFRAG_VOLUME];
+
if (proc->fn) {
ret = proc->fn (frame, THIS, volname);
}
out:
- if (ret)
+ if (!proc && ret)
cli_out ("Defrag of Volume %s failed", volname);
return 0;
@@ -293,7 +297,6 @@ cli_cmd_volume_set_cbk (struct cli_state *state, struct cli_cmd_word *word,
char *volname = NULL;
dict_t *dict = NULL;
- proc = &cli_rpc_prog->proctable[GF1_CLI_SET_VOLUME];
frame = create_frame (THIS, THIS->ctx->pool);
if (!frame)
@@ -310,12 +313,14 @@ cli_cmd_volume_set_cbk (struct cli_state *state, struct cli_cmd_word *word,
goto out;
//TODO: Build validation here
+ proc = &cli_rpc_prog->proctable[GF1_CLI_SET_VOLUME];
+
if (proc->fn) {
ret = proc->fn (frame, THIS, dict);
}
out:
- if (ret) {
+ if (!proc && ret) {
if (dict)
dict_destroy (dict);
cli_out ("Changing option on Volume %s failed", volname);
@@ -335,8 +340,6 @@ cli_cmd_volume_add_brick_cbk (struct cli_state *state,
call_frame_t *frame = NULL;
dict_t *options = NULL;
- proc = &cli_rpc_prog->proctable[GF1_CLI_ADD_BRICK];
-
frame = create_frame (THIS, THIS->ctx->pool);
if (!frame)
goto out;
@@ -346,12 +349,14 @@ cli_cmd_volume_add_brick_cbk (struct cli_state *state,
if (ret)
goto out;
+ proc = &cli_rpc_prog->proctable[GF1_CLI_ADD_BRICK];
+
if (proc->fn) {
ret = proc->fn (frame, THIS, options);
}
out:
- if (ret) {
+ if (!proc && ret) {
char *volname = (char *) words[2];
cli_out ("Adding brick to Volume %s failed",volname );
}
@@ -369,8 +374,6 @@ cli_cmd_volume_remove_brick_cbk (struct cli_state *state,
call_frame_t *frame = NULL;
dict_t *options = NULL;
- proc = &cli_rpc_prog->proctable[GF1_CLI_REMOVE_BRICK];
-
frame = create_frame (THIS, THIS->ctx->pool);
if (!frame)
goto out;
@@ -380,12 +383,14 @@ cli_cmd_volume_remove_brick_cbk (struct cli_state *state,
if (ret)
goto out;
+ proc = &cli_rpc_prog->proctable[GF1_CLI_REMOVE_BRICK];
+
if (proc->fn) {
ret = proc->fn (frame, THIS, options);
}
out:
- if (ret) {
+ if (!proc && ret) {
char *volname = (char *) words[2];
cli_out ("Removing brick from Volume %s failed",volname );
}
diff --git a/cli/src/cli-cmd.c b/cli/src/cli-cmd.c
index 4ef8e86cd2a..043cb91d7e4 100644
--- a/cli/src/cli-cmd.c
+++ b/cli/src/cli-cmd.c
@@ -37,6 +37,11 @@
static int cmd_done;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t cond_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t conn = PTHREAD_COND_INITIALIZER;
+static pthread_mutex_t conn_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+int cli_op_ret = 0;
+int connected = 1;
int
cli_cmd_process (struct cli_state *state, int argc, char **argv)
@@ -178,6 +183,7 @@ cli_cmd_await_response ()
pthread_mutex_lock (&cond_mutex);
{
+ cli_op_ret = 0;
while (!cmd_done) {
pthread_cond_wait (&cond, &cond_mutex);
}
@@ -187,15 +193,16 @@ cli_cmd_await_response ()
pthread_mutex_destroy (&cond_mutex);
pthread_cond_destroy (&cond);
- return 0;
+ return cli_op_ret;
}
int
-cli_cmd_broadcast_response ()
+cli_cmd_broadcast_response (int32_t status)
{
pthread_mutex_lock (&cond_mutex);
{
cmd_done = 1;
+ cli_op_ret = status;
pthread_cond_broadcast (&cond);
}
@@ -204,3 +211,37 @@ cli_cmd_broadcast_response ()
return 0;
}
+int32_t
+cli_cmd_await_connected ()
+{
+ pthread_mutex_init (&conn_mutex, NULL);
+ pthread_cond_init (&conn, NULL);
+
+ pthread_mutex_lock (&conn_mutex);
+ {
+ while (!connected) {
+ pthread_cond_wait (&conn, &conn_mutex);
+ }
+ }
+ pthread_mutex_unlock (&conn_mutex);
+
+ pthread_mutex_destroy (&conn_mutex);
+ pthread_cond_destroy (&conn);
+
+ return 0;
+}
+
+int32_t
+cli_cmd_broadcast_connected ()
+{
+ connected = 1;
+ gf_log ("", GF_LOG_NORMAL, "Connected");
+ pthread_mutex_lock (&conn_mutex);
+ {
+ pthread_cond_broadcast (&conn);
+ }
+
+ pthread_mutex_unlock (&conn_mutex);
+
+ return 0;
+}
diff --git a/cli/src/cli-cmd.h b/cli/src/cli-cmd.h
index 4a838151049..b08f5b3d5f2 100644
--- a/cli/src/cli-cmd.h
+++ b/cli/src/cli-cmd.h
@@ -44,5 +44,5 @@ void cli_cmd_tokens_destroy (char **tokens);
int cli_cmd_await_response ();
-int cli_cmd_broadcast_response ();
+int cli_cmd_broadcast_response (int32_t status);
#endif /* __CLI_CMD_H__ */
diff --git a/cli/src/cli.c b/cli/src/cli.c
index d70c67f8535..5f734d8e8f2 100644
--- a/cli/src/cli.c
+++ b/cli/src/cli.c
@@ -74,6 +74,7 @@
#include <fnmatch.h>
+extern int connected;
/* using argp for command line parsing */
static char gf_doc[] = "";
@@ -319,6 +320,41 @@ out:
}
int
+cli_rpc_notify (struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
+ void *data)
+{
+ xlator_t *this = NULL;
+ int ret = 0;
+
+ this = data;
+
+ switch (event) {
+ case RPC_CLNT_CONNECT:
+ {
+
+ cli_cmd_broadcast_connected ();
+ gf_log (this->name, GF_LOG_TRACE, "got RPC_CLNT_CONNECT");
+ break;
+ }
+
+ case RPC_CLNT_DISCONNECT:
+ {
+ gf_log (this->name, GF_LOG_TRACE, "got RPC_CLNT_DISCONNECT");
+ connected = 0;
+ break;
+ }
+
+ default:
+ gf_log (this->name, GF_LOG_TRACE,
+ "got some other RPC event %d", event);
+ ret = 0;
+ break;
+ }
+
+ return ret;
+}
+
+int
parse_cmdline (int argc, char *argv[], struct cli_state *state)
{
int ret = 0;
@@ -393,8 +429,10 @@ cli_rpc_init (struct cli_state *state)
dict_t *options = NULL;
int ret = -1;
int port = CLI_GLUSTERD_PORT;
+ xlator_t *this = NULL;
+ this = THIS;
cli_rpc_prog = &cli3_1_prog;
options = dict_new ();
if (!options)
@@ -418,8 +456,11 @@ cli_rpc_init (struct cli_state *state)
if (ret)
goto out;
- rpc = rpc_clnt_init (&rpc_cfg, options, THIS->ctx, THIS->name);
+ rpc = rpc_clnt_init (&rpc_cfg, options, this->ctx, this->name);
+ if (rpc) {
+ ret = rpc_clnt_register_notify (rpc, cli_rpc_notify, this);
+ }
out:
return rpc;
}
@@ -459,6 +500,10 @@ main (int argc, char *argv[])
if (ret)
goto out;
+ global_rpc = cli_rpc_init (&state);
+ if (!global_rpc)
+ goto out;
+
state.ctx = ctx;
global_state = &state;
@@ -474,9 +519,6 @@ main (int argc, char *argv[])
if (ret)
goto out;
- global_rpc = cli_rpc_init (&state);
- if (!global_rpc)
- goto out;
ret = cli_input_init (&state);
if (ret)
diff --git a/cli/src/cli.h b/cli/src/cli.h
index 140a1af13ac..c31b4631ab8 100644
--- a/cli/src/cli.h
+++ b/cli/src/cli.h
@@ -167,4 +167,14 @@ cli_cmd_volume_replace_brick_parse (const char **words, int wordcount,
dict_t **options);
cli_local_t * cli_local_get ();
+
+int32_t
+cli_cmd_await_connected ();
+
+int32_t
+cli_cmd_broadcast_connected ();
+
+int
+cli_rpc_notify (struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
+ void *data);
#endif /* __CLI_H__ */
diff --git a/cli/src/cli3_1-cops.c b/cli/src/cli3_1-cops.c
index 6ebe6ddbe40..8e6c76ae688 100644
--- a/cli/src/cli3_1-cops.c
+++ b/cli/src/cli3_1-cops.c
@@ -33,6 +33,7 @@
#include "protocol-common.h"
extern rpc_clnt_prog_t *cli_rpc_prog;
+extern int cli_op_ret;
int
gf_cli3_1_probe_cbk (struct rpc_req *req, struct iovec *iov,
@@ -60,7 +61,7 @@ gf_cli3_1_probe_cbk (struct rpc_req *req, struct iovec *iov,
ret = rsp.op_ret;
out:
- cli_cmd_broadcast_response ();
+ cli_cmd_broadcast_response (ret);
return ret;
}
@@ -90,7 +91,7 @@ gf_cli3_1_deprobe_cbk (struct rpc_req *req, struct iovec *iov,
ret = rsp.op_ret;
out:
- cli_cmd_broadcast_response ();
+ cli_cmd_broadcast_response (ret);
return ret;
}
@@ -191,7 +192,7 @@ gf_cli3_1_list_friends_cbk (struct rpc_req *req, struct iovec *iov,
ret = 0;
out:
- cli_cmd_broadcast_response ();
+ cli_cmd_broadcast_response (ret);
if (ret)
cli_out ("Command Execution Failed");
@@ -299,7 +300,7 @@ gf_cli3_1_get_volume_cbk (struct rpc_req *req, struct iovec *iov,
ret = 0;
out:
- cli_cmd_broadcast_response ();
+ cli_cmd_broadcast_response (ret);
if (ret)
cli_out ("Command Execution Failed");
@@ -341,7 +342,7 @@ gf_cli3_1_create_volume_cbk (struct rpc_req *req, struct iovec *iov,
ret = rsp.op_ret;
out:
- cli_cmd_broadcast_response ();
+ cli_cmd_broadcast_response (ret);
return ret;
}
@@ -379,7 +380,7 @@ gf_cli3_1_delete_volume_cbk (struct rpc_req *req, struct iovec *iov,
ret = rsp.op_ret;
out:
- cli_cmd_broadcast_response ();
+ cli_cmd_broadcast_response (ret);
gf_log ("", GF_LOG_NORMAL, "Returning with %d", ret);
return ret;
}
@@ -419,7 +420,7 @@ gf_cli3_1_start_volume_cbk (struct rpc_req *req, struct iovec *iov,
ret = rsp.op_ret;
out:
- cli_cmd_broadcast_response ();
+ cli_cmd_broadcast_response (ret);
return ret;
}
@@ -458,7 +459,7 @@ gf_cli3_1_stop_volume_cbk (struct rpc_req *req, struct iovec *iov,
ret = rsp.op_ret;
out:
- cli_cmd_broadcast_response ();
+ cli_cmd_broadcast_response (ret);
return ret;
}
@@ -497,7 +498,7 @@ gf_cli3_1_defrag_volume_cbk (struct rpc_req *req, struct iovec *iov,
ret = rsp.op_ret;
out:
- cli_cmd_broadcast_response ();
+ cli_cmd_broadcast_response (ret);
return ret;
}
@@ -582,7 +583,7 @@ gf_cli3_1_add_brick_cbk (struct rpc_req *req, struct iovec *iov,
ret = rsp.op_ret;
out:
- cli_cmd_broadcast_response ();
+ cli_cmd_broadcast_response (ret);
return ret;
}
diff --git a/cli/src/input.c b/cli/src/input.c
index a577a0f4c13..25a7cb62dd9 100644
--- a/cli/src/input.c
+++ b/cli/src/input.c
@@ -41,6 +41,7 @@ cli_batch (void *d)
state = d;
+ cli_cmd_await_connected ();
ret = cli_cmd_process (state, state->argc, state->argv);
gf_log ("", GF_LOG_NORMAL, "Exiting with: %d", ret);
exit (ret);
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 15173d0501d..f2c2736e05e 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -643,6 +643,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
rpc_request_info_t *request_info = NULL;
int ret = -1;
struct rpc_req req = {0, };
+ int cbk_ret = -1;
conn = &clnt->conn;
@@ -666,7 +667,8 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
"failed");
}
- saved_frame->cbkfn (&req, req.rsp, req.rspcnt, saved_frame->frame);
+ cbk_ret = saved_frame->cbkfn (&req, req.rsp, req.rspcnt,
+ saved_frame->frame);
if (ret == 0) {
rpc_clnt_reply_deinit (&req);
@@ -679,7 +681,7 @@ out:
GF_FREE (saved_frame);
}
- return ret;
+ return cbk_ret;
}