From 8071909e84b6a479a6b5be1eddd15e8b16fc1a80 Mon Sep 17 00:00:00 2001 From: Amar Tumballi Date: Fri, 27 Aug 2010 06:45:38 +0000 Subject: rpc: server to client callback mechanism Signed-off-by: Amar Tumballi Signed-off-by: Vijay Bellur --- glusterfsd/src/glusterfsd-mgmt.c | 52 +++++--- rpc/rpc-lib/src/protocol-common.h | 18 ++- rpc/rpc-lib/src/rpc-clnt.c | 82 +++++++++++- rpc/rpc-lib/src/rpc-clnt.h | 43 +++++- rpc/rpc-lib/src/rpc-transport.h | 3 + rpc/rpc-lib/src/rpcsvc.c | 185 ++++++++++++++++++++++++++ rpc/rpc-lib/src/rpcsvc.h | 11 +- rpc/rpc-transport/socket/src/socket.c | 2 + xlators/mgmt/glusterd/src/glusterd-op-sm.c | 91 +------------ xlators/mgmt/glusterd/src/glusterd-utils.c | 2 +- xlators/mgmt/glusterd/src/glusterd-utils.h | 2 - xlators/mgmt/glusterd/src/glusterd-volgen.c | 2 + xlators/mgmt/glusterd/src/glusterd.c | 48 ++++--- xlators/mgmt/glusterd/src/glusterd.h | 5 + xlators/protocol/client/src/Makefile.am | 3 +- xlators/protocol/client/src/client-callback.c | 60 +++++++++ xlators/protocol/client/src/client.c | 5 + xlators/protocol/server/src/server.c | 13 ++ xlators/protocol/server/src/server.h | 1 + 19 files changed, 496 insertions(+), 132 deletions(-) create mode 100644 xlators/protocol/client/src/client-callback.c diff --git a/glusterfsd/src/glusterfsd-mgmt.c b/glusterfsd/src/glusterfsd-mgmt.c index e6cfa6ac6d1..2602d5431bc 100644 --- a/glusterfsd/src/glusterfsd-mgmt.c +++ b/glusterfsd/src/glusterfsd-mgmt.c @@ -43,6 +43,36 @@ static char is_mgmt_rpc_reconnect; typedef ssize_t (*mgmt_serialize_t) (struct iovec outmsg, void *args); + +int glusterfs_mgmt_pmap_signin (glusterfs_ctx_t *ctx); +int glusterfs_volfile_fetch (glusterfs_ctx_t *ctx); +int glusterfs_process_volfp (glusterfs_ctx_t *ctx, FILE *fp); + +int +mgmt_cbk_spec (void *data) +{ + glusterfs_ctx_t *ctx = NULL; + + ctx = glusterfs_ctx_get (); + gf_log ("mgmt", GF_LOG_INFO, "Volume file changed"); + + glusterfs_volfile_fetch (ctx); + return 0; +} + +rpcclnt_cb_actor_t gluster_cbk_actors[] = { + [GF_CBK_FETCHSPEC] = {"FETCHSPEC", GF_CBK_FETCHSPEC, mgmt_cbk_spec }, +}; + + +struct rpcclnt_cb_program mgmt_cbk_prog = { + .progname = "GlusterFS Callback", + .prognum = GLUSTER_CBK_PROGRAM, + .progver = GLUSTER_CBK_VERSION, + .actors = gluster_cbk_actors, + .numactors = GF_CBK_MAXVALUE, +}; + char *clnt_pmap_procs[GF_PMAP_MAXVALUE] = { [GF_PMAP_NULL] = "NULL", [GF_PMAP_PORTBYBRICK] = "PORTBYBRICK", @@ -74,11 +104,6 @@ rpc_clnt_prog_t clnt_handshake_prog = { .procnames = clnt_handshake_procs, }; - -int glusterfs_mgmt_pmap_signin (glusterfs_ctx_t *ctx); -int glusterfs_volfile_fetch (glusterfs_ctx_t *ctx); -int glusterfs_process_volfp (glusterfs_ctx_t *ctx, FILE *fp); - int mgmt_submit_request (void *req, call_frame_t *frame, glusterfs_ctx_t *ctx, @@ -133,7 +158,6 @@ out: /* XXX: move these into @ctx */ static char oldvolfile[131072]; static int oldvollen = 0; -static void *timer; int mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count, @@ -145,7 +169,6 @@ mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count, int ret = 0; ssize_t size = 0; FILE *tmpfp = NULL; - struct timeval tv = {0, }; frame = myframe; ctx = frame->this->ctx; @@ -194,11 +217,6 @@ mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count, } out: - tv.tv_sec = 1; - timer = gf_timer_call_after (ctx, tv, - (gf_timer_cbk_t) glusterfs_volfile_fetch, - ctx); - STACK_DESTROY (frame->root); if (rsp.spec) @@ -216,12 +234,6 @@ glusterfs_volfile_fetch (glusterfs_ctx_t *ctx) int ret = 0; call_frame_t *frame = NULL; - { - if (timer) - gf_timer_call_cancel (ctx, timer); - timer = NULL; - } - cmd_args = &ctx->cmd_args; frame = create_frame (THIS, ctx->pool); @@ -318,6 +330,10 @@ glusterfs_mgmt_init (glusterfs_ctx_t *ctx) if (ret) goto out; + ret = rpcclnt_cbk_program_register (rpc, &mgmt_cbk_prog); + if (ret) + goto out; + out: return ret; } diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h index 4df5a554fec..fdb42dfe663 100644 --- a/rpc/rpc-lib/src/protocol-common.h +++ b/rpc/rpc-lib/src/protocol-common.h @@ -140,6 +140,13 @@ enum gf_probe_resp { GF_PROBE_FRIEND, }; +enum gf_cbk_procnum { + GF_CBK_NULL = 0, + GF_CBK_FETCHSPEC, + GF_CBK_INO_FLUSH, + GF_CBK_MAXVALUE, +}; + #define GLUSTER3_1_FOP_PROGRAM 1298437 /* Completely random */ #define GLUSTER3_1_FOP_VERSION 310 /* 3.1.0 */ #define GLUSTER3_1_FOP_PROCCNT GFS3_OP_MAXVALUE @@ -152,10 +159,13 @@ enum gf_probe_resp { #define GLUSTER3_1_CLI_VERSION 1 /* 0.0.1 */ #define GLUSTER3_1_CLI_PROCCNT GF1_CLI_MAXVALUE -#define GLUSTER_HNDSK_PROGRAM 14398633 /* Completely random */ -#define GLUSTER_HNDSK_VERSION 1 /* 0.0.1 */ +#define GLUSTER_HNDSK_PROGRAM 14398633 /* Completely random */ +#define GLUSTER_HNDSK_VERSION 1 /* 0.0.1 */ + +#define GLUSTER_PMAP_PROGRAM 34123456 +#define GLUSTER_PMAP_VERSION 1 -#define GLUSTER_PMAP_PROGRAM 34123456 -#define GLUSTER_PMAP_VERSION 1 +#define GLUSTER_CBK_PROGRAM 52743234 /* Completely random */ +#define GLUSTER_CBK_VERSION 1 /* 0.0.1 */ #endif /* !_PROTOCOL_COMMON_H */ diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index eac9f875066..8d923ed5f43 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -31,6 +31,7 @@ #include "rpc-transport.h" #include "protocol-common.h" #include "mem-pool.h" +#include "xdr-rpc.h" void rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool); @@ -653,6 +654,49 @@ out: return ret; } +int +rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg) +{ + char *msgbuf = NULL; + rpcclnt_cb_program_t *program = NULL; + struct rpc_msg rpcmsg; + struct iovec progmsg; /* RPC Program payload */ + size_t msglen = 0; + int found = 0; + int ret = -1; + int procnum = 0; + + msgbuf = msg->vector[0].iov_base; + msglen = msg->vector[0].iov_len; + + ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL); + if (ret == -1) { + gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC call decoding failed"); + goto out; + } + + gf_log ("rpc-clnt", GF_LOG_INFO, "RPC XID: %lx, Ver: %ld, Program: %ld," + " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg), + rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), + rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg)); + + procnum = rpc_call_progproc (&rpcmsg); + + list_for_each_entry (program, &clnt->programs, program) { + if ((program->prognum == rpc_call_program (&rpcmsg)) + && (program->progver == rpc_call_progver (&rpcmsg))) { + found = 1; + break; + } + } + if (found && (procnum < program->numactors) && + (program->actors[procnum].actor)) { + program->actors[procnum].actor (&progmsg); + } + +out: + return ret; +} int rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) @@ -796,7 +840,10 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, case RPC_TRANSPORT_MSG_RECEIVED: { pollin = data; - ret = rpc_clnt_handle_reply (clnt, pollin); + if (pollin->is_reply) + ret = rpc_clnt_handle_reply (clnt, pollin); + else + ret = rpc_clnt_handle_cbk (clnt, pollin); /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG, * data); */ @@ -943,6 +990,8 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, goto out; } + INIT_LIST_HEAD (&rpc->programs); + out: return rpc; } @@ -1168,6 +1217,37 @@ out: return request_iob; } +int +rpcclnt_cbk_program_register (struct rpc_clnt *clnt, + rpcclnt_cb_program_t *program) +{ + int ret = -1; + + if (!clnt) + goto out; + + if (program->actors == NULL) + goto out; + + INIT_LIST_HEAD (&program->program); + + list_add_tail (&program->program, &clnt->programs); + + ret = 0; + gf_log ("rpc-clnt", GF_LOG_DEBUG, "New program registered: %s, Num: %d," + " Ver: %d", program->progname, program->prognum, + program->progver); + +out: + if (ret == -1) { + gf_log ("rpc-clnt", GF_LOG_ERROR, "Program registration failed:" + " %s, Num: %d, Ver: %d", program->progname, + program->prognum, program->progver); + } + + return ret; +} + int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h index 2381aaa087c..a0251c7c551 100644 --- a/rpc/rpc-lib/src/rpc-clnt.h +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -83,6 +83,38 @@ typedef struct rpc_clnt_program { int numproc; } rpc_clnt_prog_t; +typedef int (*rpcclnt_cb_fn) (void *data); + +/* The descriptor for each procedure/actor that runs + * over the RPC service. + */ +typedef struct rpcclnt_actor_desc { + char procname[32]; + int procnum; + rpcclnt_cb_fn actor; +} rpcclnt_cb_actor_t; + +/* Describes a program and its version along with the function pointers + * required to handle the procedures/actors of each program/version. + * Never changed ever by any thread so no need for a lock. + */ +typedef struct rpcclnt_cb_program { + char progname[32]; + int prognum; + int progver; + rpcclnt_cb_actor_t *actors; /* All procedure handlers */ + int numactors; /* Num actors in actor array */ + + /* Program specific state handed to actors */ + void *private; + + + /* list member to link to list of registered services with rpc_clnt */ + struct list_head program; +} rpcclnt_cb_program_t; + + + #define RPC_MAX_AUTH_BYTES 400 typedef struct rpc_auth_data { int flavour; @@ -141,6 +173,9 @@ struct rpc_clnt { void *mydata; uint64_t xid; + /* list of cb programs registered with rpc-clnt */ + struct list_head programs; + /* Memory pool for rpc_req_t */ struct mem_pool *reqpool; @@ -171,7 +206,7 @@ int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn, * also be filled with pointer to buffer to hold header and length * of the header. */ - + int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum, fop_cbk_fn_t cbkfn, struct iovec *proghdr, int proghdrcount, @@ -190,4 +225,10 @@ void rpc_clnt_reconnect (void *trans_ptr); void rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config); +/* All users of RPC services should use this API to register their + * procedure handlers. + */ +int rpcclnt_cbk_program_register (struct rpc_clnt *svc, + rpcclnt_cb_program_t *program); + #endif /* !_RPC_CLNT_H */ diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index c5b6f382e7b..cccae5f261d 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -166,6 +166,7 @@ struct rpc_transport_pollin { char vectored; void *private; struct iobref *iobref; + char is_reply; }; typedef struct rpc_transport_pollin rpc_transport_pollin_t; @@ -196,6 +197,8 @@ struct rpc_transport { void *notify_data; peer_info_t peerinfo; peer_info_t myinfo; + + struct list_head list; }; struct rpc_transport_ops { diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 5b5c2998c5e..ee3d674c2eb 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -46,6 +46,8 @@ #include #include +#include "xdr-rpcclnt.h" + struct rpcsvc_program gluster_dump_prog; #define rpcsvc_alloc_request(svc, request) \ @@ -1119,6 +1121,189 @@ err: return txrecord; } +inline int +rpcsvc_get_callid (rpcsvc_t *rpc) +{ + return GF_UNIVERSAL_ANSWER; +} + +int +rpcsvc_fill_callback (int prognum, int progver, int procnum, int payload, + uint64_t xid, struct rpc_msg *request) +{ + int ret = -1; + + if (!request) { + goto out; + } + + memset (request, 0, sizeof (*request)); + + request->rm_xid = xid; + request->rm_direction = CALL; + + request->rm_call.cb_rpcvers = 2; + request->rm_call.cb_prog = prognum; + request->rm_call.cb_vers = progver; + request->rm_call.cb_proc = procnum; + + request->rm_call.cb_cred.oa_flavor = AUTH_NONE; + request->rm_call.cb_cred.oa_base = NULL; + request->rm_call.cb_cred.oa_length = 0; + + request->rm_call.cb_verf.oa_flavor = AUTH_NONE; + request->rm_call.cb_verf.oa_base = NULL; + request->rm_call.cb_verf.oa_length = 0; + + ret = 0; +out: + return ret; +} + + +struct iovec +rpcsvc_callback_build_header (char *recordstart, size_t rlen, + struct rpc_msg *request, size_t payload) +{ + struct iovec requesthdr = {0, }; + struct iovec txrecord = {0, 0}; + int ret = -1; + size_t fraglen = 0; + + ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr); + if (ret == -1) { + gf_log ("rpcsvc", GF_LOG_DEBUG, + "Failed to create RPC request"); + goto out; + } + + fraglen = payload + requesthdr.iov_len; + gf_log ("rpcsvc", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, " + "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len); + + txrecord.iov_base = recordstart; + + /* Remember, this is only the vec for the RPC header and does not + * include the payload above. We needed the payload only to calculate + * the size of the full fragment. This size is sent in the fragment + * header. + */ + txrecord.iov_len = requesthdr.iov_len; + +out: + return txrecord; +} + +struct iobuf * +rpcsvc_callback_build_record (rpcsvc_t *rpc, int prognum, int progver, + int procnum, size_t payload, uint64_t xid, + struct iovec *recbuf) +{ + struct rpc_msg request = {0, }; + struct iobuf *request_iob = NULL; + char *record = NULL; + struct iovec recordhdr = {0, }; + size_t pagesize = 0; + int ret = -1; + + if ((!rpc) || (!recbuf)) { + goto out; + } + + /* First, try to get a pointer into the buffer which the RPC + * layer can use. + */ + request_iob = iobuf_get (rpc->ctx->iobuf_pool); + if (!request_iob) { + gf_log ("rpcsvc", GF_LOG_ERROR, "Failed to get iobuf"); + goto out; + } + + pagesize = ((struct iobuf_pool *)rpc->ctx->iobuf_pool)->page_size; + + record = iobuf_ptr (request_iob); /* Now we have it. */ + + /* Fill the rpc structure and XDR it into the buffer got above. */ + ret = rpcsvc_fill_callback (prognum, progver, procnum, payload, xid, + &request); + if (ret == -1) { + gf_log ("rpcsvc", GF_LOG_DEBUG, "cannot build a rpc-request " + "xid (%"PRIu64")", xid); + goto out; + } + + recordhdr = rpcsvc_callback_build_header (record, pagesize, &request, + payload); + + if (!recordhdr.iov_base) { + gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record " + " header"); + iobuf_unref (request_iob); + request_iob = NULL; + recbuf->iov_base = NULL; + goto out; + } + + recbuf->iov_base = recordhdr.iov_base; + recbuf->iov_len = recordhdr.iov_len; + +out: + return request_iob; +} + +int +rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, + rpcsvc_cbk_program_t *prog, int procnum, + struct iovec *proghdr, int proghdrcount) +{ + struct iobuf *request_iob = NULL; + struct iovec rpchdr = {0,}; + rpc_transport_req_t req; + int ret = -1; + int proglen = 0; + uint64_t callid = 0; + + if (!rpc) { + goto out; + } + + memset (&req, 0, sizeof (req)); + + callid = rpcsvc_get_callid (rpc); + + if (proghdr) { + proglen += iov_length (proghdr, proghdrcount); + } + + request_iob = rpcsvc_callback_build_record (rpc, prog->prognum, + prog->progver, procnum, + proglen, callid, + &rpchdr); + if (!request_iob) { + gf_log ("rpcsvc", GF_LOG_DEBUG, + "cannot build rpc-record"); + goto out; + } + + req.msg.rpchdr = &rpchdr; + req.msg.rpchdrcount = 1; + req.msg.proghdr = proghdr; + req.msg.proghdrcount = proghdrcount; + + ret = rpc_transport_submit_request (trans, &req); + if (ret == -1) { + gf_log ("rpc-clnt", GF_LOG_DEBUG, + "transmission of rpc-request failed"); + goto out; + } + + ret = 0; + +out: + iobuf_unref (request_iob); + + return ret; +} inline int rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 10b20af0a88..fca7d047a7b 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -375,7 +375,11 @@ struct rpcsvc_program { struct list_head program; }; - +typedef struct rpcsvc_cbk_program { + char *progname; + int prognum; + int progver; +} rpcsvc_cbk_program_t; /* All users of RPC services should use this API to register their * procedure handlers. */ @@ -525,4 +529,9 @@ rpcsvc_combine_gen_spec_volume_checks (int gen, int spec); extern char * rpcsvc_volume_allowed (dict_t *options, char *volname); + +int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, + rpcsvc_cbk_program_t *prog, int procnum, + struct iovec *proghdr, int proghdrcount); + #endif diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 6d2d584d111..26b56fad577 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -1477,6 +1477,8 @@ __socket_proto_state_machine (rpc_transport_t *this, ret = -1; goto out; } + if (priv->incoming.msg_type == REPLY) + (*pollin)->is_reply = 1; priv->incoming.request_info = NULL; } diff --git a/xlators/mgmt/glusterd/src/glusterd-op-sm.c b/xlators/mgmt/glusterd/src/glusterd-op-sm.c index ecbe7b150ff..11fca774a9f 100644 --- a/xlators/mgmt/glusterd/src/glusterd-op-sm.c +++ b/xlators/mgmt/glusterd/src/glusterd-op-sm.c @@ -254,87 +254,6 @@ out: return ret; } -int -glusterd_volume_create_generate_volfiles (glusterd_volinfo_t *volinfo) -{ - int32_t ret = -1; - char cmd_str[8192] = {0,}; - char path[PATH_MAX] = {0,}; - glusterd_conf_t *priv = NULL; - xlator_t *this = NULL; - char bricks[8192] = {0,}; - glusterd_brickinfo_t *brickinfo = NULL; - int32_t len = 0; - - this = THIS; - GF_ASSERT (this); - priv = this->private; - - GF_ASSERT (priv); - GF_ASSERT (volinfo); - - GLUSTERD_GET_VOLUME_DIR(path, volinfo, priv); - if (!volinfo->port) { - //volinfo->port = ++glusterfs_port; - } - - list_for_each_entry (brickinfo, &volinfo->bricks, brick_list) { - snprintf (bricks + len, 8192 - len, "%s:%s ", - brickinfo->hostname, brickinfo->path); - len = strlen (bricks); - } - - gf_log ("", GF_LOG_DEBUG, "Brick string: %s", bricks); - - switch (volinfo->type) { - - case GF_CLUSTER_TYPE_REPLICATE: - { - snprintf (cmd_str, 8192, - "%s/bin/glusterfs-volgen --portmapper-mode -n %s " - " -c %s -r 1 %s -p %d --num-replica %d", - GFS_PREFIX, volinfo->volname, path, bricks, - volinfo->port, volinfo->sub_count); - ret = gf_system (cmd_str); - gf_log ("", 1, "%s", cmd_str); - break; - } - - case GF_CLUSTER_TYPE_STRIPE: - { - snprintf (cmd_str, 8192, - "%s/bin/glusterfs-volgen --portmapper-mode -n %s " - " -c %s -r 0 %s -p %d --num-stripe %d", - GFS_PREFIX, volinfo->volname, path, bricks, - volinfo->port, volinfo->sub_count); - ret = gf_system (cmd_str); - gf_log ("", 1, "%s", cmd_str); - break; - } - - case GF_CLUSTER_TYPE_NONE: - { - snprintf (cmd_str, 8192, - "%s/bin/glusterfs-volgen --portmapper-mode " - " -n %s -c %s %s -p %d", - GFS_PREFIX, volinfo->volname, path, bricks, - volinfo->port); - ret = gf_system (cmd_str); - gf_log ("", 1, "%s", cmd_str); - break; - } - - default: - gf_log ("", GF_LOG_ERROR, "Unkown type: %d", - volinfo->type); - ret = -1; - } -//out: - gf_log ("", GF_LOG_DEBUG, "Returning %d", ret); - return ret; -} - - static int glusterd_op_stage_create_volume (gd1_mgmt_stage_op_req *req) @@ -952,8 +871,7 @@ glusterd_op_add_brick (gd1_mgmt_stage_op_req *req) if (!ret && (!uuid_compare (brickinfo->uuid, priv->uuid)) && (GLUSTERD_STATUS_STARTED == volinfo->status)) { - ret = - glusterd_volume_create_generate_volfiles (volinfo); + ret = glusterd_create_volfiles (volinfo); if (ret) goto out; @@ -975,7 +893,7 @@ glusterd_op_add_brick (gd1_mgmt_stage_op_req *req) } if (!glfs_started) { - ret = glusterd_volume_create_generate_volfiles (volinfo); + ret = glusterd_create_volfiles (volinfo); if (ret) goto out; } @@ -1892,8 +1810,7 @@ glusterd_op_remove_brick (gd1_mgmt_stage_op_req *req) if ((!uuid_compare (brickinfo->uuid, priv->uuid)) && (GLUSTERD_STATUS_STARTED == volinfo->status)) { - ret = - glusterd_volume_create_generate_volfiles (volinfo); + ret = glusterd_create_volfiles (volinfo); if (ret) goto out; @@ -1918,7 +1835,7 @@ glusterd_op_remove_brick (gd1_mgmt_stage_op_req *req) } if (!glfs_stopped) { - ret = glusterd_volume_create_generate_volfiles (volinfo); + ret = glusterd_create_volfiles (volinfo); if (ret) goto out; } diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c index 770ee86d43c..cbafb81ec4b 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.c +++ b/xlators/mgmt/glusterd/src/glusterd-utils.c @@ -1356,7 +1356,7 @@ glusterd_import_friend_volume (dict_t *vols, int count) ret = glusterd_store_update_volume (volinfo); } - ret = glusterd_volume_create_generate_volfiles (volinfo); + ret = glusterd_create_volfiles (volinfo); if (ret) goto out; diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.h b/xlators/mgmt/glusterd/src/glusterd-utils.h index 1e9469ce0fd..6af21a490fe 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.h +++ b/xlators/mgmt/glusterd/src/glusterd-utils.h @@ -127,6 +127,4 @@ glusterd_compare_friend_data (dict_t *vols, int32_t *status); int glusterd_volume_compute_cksum (glusterd_volinfo_t *volinfo); -int -glusterd_volume_create_generate_volfiles (glusterd_volinfo_t *volinfo); #endif diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c index 10ee80e359c..ee7d6839197 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.c +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c @@ -1683,6 +1683,8 @@ glusterd_create_volfiles (glusterd_volinfo_t *volinfo) goto out; } + ret = glusterd_fetchspec_notify (THIS); + out: return ret; } diff --git a/xlators/mgmt/glusterd/src/glusterd.c b/xlators/mgmt/glusterd/src/glusterd.c index 1d19c7ed80a..d23f06190d7 100644 --- a/xlators/mgmt/glusterd/src/glusterd.c +++ b/xlators/mgmt/glusterd/src/glusterd.c @@ -52,6 +52,12 @@ extern struct rpc_clnt_program glusterd3_1_mgmt_prog; extern struct rpcsvc_program gluster_pmap_prog; extern glusterd_op_info_t opinfo; +rpcsvc_cbk_program_t glusterd_cbk_prog = { + .progname = "Gluster Callback", + .prognum = GLUSTER_CBK_PROGRAM, + .progver = GLUSTER_CBK_VERSION, +}; + static int glusterd_opinfo_init () @@ -100,7 +106,24 @@ glusterd_uuid_init () return 0; } +int +glusterd_fetchspec_notify (xlator_t *this) +{ + int ret = -1; + glusterd_conf_t *priv = NULL; + rpc_transport_t *trans = NULL; + + priv = this->private; + list_for_each_entry (trans, &priv->xprt_list, list) { + rpcsvc_callback_submit (priv->rpc, trans, &glusterd_cbk_prog, + GF_CBK_FETCHSPEC, NULL, 0); + } + + ret = 0; + + return ret; +} int glusterd_priv (xlator_t *this) @@ -135,6 +158,7 @@ glusterd_rpcsvc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, { xlator_t *this = NULL; rpc_transport_t *xprt = NULL; + glusterd_conf_t *priv = NULL; if (!xl || !data) { gf_log ("glusterd", GF_LOG_WARNING, @@ -145,13 +169,19 @@ glusterd_rpcsvc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, this = xl; xprt = data; + priv = this->private; + switch (event) { case RPCSVC_EVENT_ACCEPT: { + INIT_LIST_HEAD (&xprt->list); + + list_add_tail (&xprt->list, &priv->xprt_list); break; } case RPCSVC_EVENT_DISCONNECT: { + list_del (&xprt->list); pmap_registry_remove (this, 0, NULL, xprt); break; } @@ -328,11 +358,12 @@ init (xlator_t *this) conf->mgmt = &glusterd3_1_mgmt_prog; strncpy (conf->workdir, dirname, PATH_MAX); + INIT_LIST_HEAD (&conf->xprt_list); + this->private = conf; //this->ctx->top = this; ret = glusterd_uuid_init (); - if (ret < 0) goto out; @@ -390,22 +421,11 @@ int notify (xlator_t *this, int32_t event, void *data, ...) { int ret = 0; - //transport_t *trans = data; - //peer_info_t *peerinfo = NULL; - //peer_info_t *myinfo = NULL; -/* if (trans != NULL) { - peerinfo = &(trans->peerinfo); - myinfo = &(trans->myinfo); - } -*/ switch (event) { - case GF_EVENT_POLLIN: - // ret = glusterd_pollin (this, trans); break; - case GF_EVENT_POLLERR: break; @@ -422,10 +442,6 @@ notify (xlator_t *this, int32_t event, void *data, ...) } - -//struct xlator_mops mops = { -//}; - struct xlator_fops fops = { }; diff --git a/xlators/mgmt/glusterd/src/glusterd.h b/xlators/mgmt/glusterd/src/glusterd.h index 0f2004779c8..f3e864a5f6e 100644 --- a/xlators/mgmt/glusterd/src/glusterd.h +++ b/xlators/mgmt/glusterd/src/glusterd.h @@ -69,6 +69,7 @@ typedef struct { struct pmap_registry *pmap; struct list_head volumes; struct list_head hostnames; + struct list_head xprt_list; glusterd_store_handle_t *handle; } glusterd_conf_t; @@ -337,4 +338,8 @@ glusterd_remove_brick (rpcsvc_request_t *req, dict_t *dict); int glusterd_xfer_cli_deprobe_resp (rpcsvc_request_t *req, int32_t op_ret, int32_t op_errno, char *hostname); + +int +glusterd_fetchspec_notify (xlator_t *this); + #endif diff --git a/xlators/protocol/client/src/Makefile.am b/xlators/protocol/client/src/Makefile.am index 159faf268a7..007810e9901 100644 --- a/xlators/protocol/client/src/Makefile.am +++ b/xlators/protocol/client/src/Makefile.am @@ -8,7 +8,8 @@ client_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \ $(top_builddir)/rpc/rpc-lib/src/libgfrpc.la \ $(top_builddir)/rpc/xdr/src/libgfxdr.la -client_la_SOURCES = client.c client-helpers.c client3_1-fops.c client-handshake.c +client_la_SOURCES = client.c client-helpers.c client3_1-fops.c \ + client-handshake.c client-callback.c noinst_HEADERS = client.h client-mem-types.h AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS) \ diff --git a/xlators/protocol/client/src/client-callback.c b/xlators/protocol/client/src/client-callback.c new file mode 100644 index 00000000000..51bd3375845 --- /dev/null +++ b/xlators/protocol/client/src/client-callback.c @@ -0,0 +1,60 @@ +/* + Copyright (c) 2010 Gluster, Inc. + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + . +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "client.h" +#include "rpc-clnt.h" + +int +client_cbk_null (void *data) +{ + return 0; +} + +int +client_cbk_fetchspec (void *data) +{ + gf_log ("", 1, "here i am"); + return 0; +} + +int +client_cbk_ino_flush (void *data) +{ + return 0; +} + +rpcclnt_cb_actor_t gluster_cbk_actors[] = { + [GF_CBK_NULL] = {"NULL", GF_CBK_NULL, client_cbk_null }, + [GF_CBK_FETCHSPEC] = {"FETCHSPEC", GF_CBK_FETCHSPEC, client_cbk_fetchspec }, + [GF_CBK_INO_FLUSH] = {"INO_FLUSH", GF_CBK_INO_FLUSH, client_cbk_ino_flush }, +}; + + +struct rpcclnt_cb_program gluster_cbk_prog = { + .progname = "GlusterFS Callback", + .prognum = GLUSTER_CBK_PROGRAM, + .progver = GLUSTER_CBK_VERSION, + .actors = gluster_cbk_actors, + .numactors = GF_CBK_MAXVALUE, +}; diff --git a/xlators/protocol/client/src/client.c b/xlators/protocol/client/src/client.c index c840d96f54c..458ecfa8f88 100644 --- a/xlators/protocol/client/src/client.c +++ b/xlators/protocol/client/src/client.c @@ -34,6 +34,7 @@ extern rpc_clnt_prog_t clnt_handshake_prog; extern rpc_clnt_prog_t clnt_dump_prog; +extern struct rpcclnt_cb_program gluster_cbk_prog; int client_handshake (xlator_t *this, struct rpc_clnt *rpc); void client_start_ping (void *data); @@ -1730,6 +1731,10 @@ client_init_rpc (xlator_t *this) conf->handshake = &clnt_handshake_prog; conf->dump = &clnt_dump_prog; + ret = rpcclnt_cbk_program_register (conf->rpc, &gluster_cbk_prog); + if (ret) + goto out; + ret = 0; gf_log (this->name, GF_LOG_DEBUG, "client init successful"); diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c index 262da3e3158..8a7d9da0bbf 100644 --- a/xlators/protocol/server/src/server.c +++ b/xlators/protocol/server/src/server.c @@ -123,6 +123,10 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg, ret = rpcsvc_submit_generic (req, &rsp, 1, payload, payloadcount, iobref); + /* TODO: this is demo purpose only */ + /* ret = rpcsvc_callback_submit (req->svc, req->trans, req->prog, + GF_CBK_NULL, &rsp, 1); + */ /* Now that we've done our job of handing the message to the RPC layer * we can safely unref the iob in the hope that RPC layer must have * ref'ed the iob on receiving into the txlist. @@ -366,6 +370,7 @@ server_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, xlator_t *this = NULL; rpc_transport_t *xprt = NULL; server_connection_t *conn = NULL; + server_conf_t *conf = NULL; if (!xl || !data) { @@ -376,6 +381,7 @@ server_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, this = xl; xprt = data; + conf = this->private; switch (event) { case RPCSVC_EVENT_ACCEPT: @@ -388,6 +394,10 @@ server_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, xprt->protocol_private = conn; */ + INIT_LIST_HEAD (&xprt->list); + + list_add_tail (&xprt->list, &conf->xprt_list); + break; } case RPCSVC_EVENT_DISCONNECT: @@ -395,6 +405,8 @@ server_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, if (conn) server_connection_put (this, conn); + list_del (&xprt->list); + break; default: break; @@ -449,6 +461,7 @@ init (xlator_t *this) GF_VALIDATE_OR_GOTO(this->name, conf, out); INIT_LIST_HEAD (&conf->conns); + INIT_LIST_HEAD (&conf->xprt_list); pthread_mutex_init (&conf->mutex, NULL); this->private = conf; diff --git a/xlators/protocol/server/src/server.h b/xlators/protocol/server/src/server.h index 62e5ef886cf..b58e8a5fada 100644 --- a/xlators/protocol/server/src/server.h +++ b/xlators/protocol/server/src/server.h @@ -99,6 +99,7 @@ struct server_conf { dict_t *auth_modules; pthread_mutex_t mutex; struct list_head conns; + struct list_head xprt_list; }; typedef struct server_conf server_conf_t; -- cgit