diff options
| author | Amar Tumballi <amar@gluster.com> | 2010-08-27 06:45:38 +0000 | 
|---|---|---|
| committer | Vijay Bellur <vijay@dev.gluster.com> | 2010-08-27 05:53:56 -0700 | 
| commit | 8071909e84b6a479a6b5be1eddd15e8b16fc1a80 (patch) | |
| tree | db3a56cef557a3c86fd983e778927c70a8665363 | |
| parent | fd282c4299a5354aa272345e312ac600d9dcc680 (diff) | |
rpc: server to client callback mechanism
Signed-off-by: Amar Tumballi <amar@gluster.com>
Signed-off-by: Vijay Bellur <vijay@dev.gluster.com>
| -rw-r--r-- | glusterfsd/src/glusterfsd-mgmt.c | 52 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/protocol-common.h | 18 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 82 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.h | 43 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 3 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 185 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 11 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 2 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-op-sm.c | 91 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-utils.c | 2 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-utils.h | 2 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-volgen.c | 2 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd.c | 48 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd.h | 5 | ||||
| -rw-r--r-- | xlators/protocol/client/src/Makefile.am | 3 | ||||
| -rw-r--r-- | xlators/protocol/client/src/client-callback.c | 60 | ||||
| -rw-r--r-- | xlators/protocol/client/src/client.c | 5 | ||||
| -rw-r--r-- | xlators/protocol/server/src/server.c | 13 | ||||
| -rw-r--r-- | xlators/protocol/server/src/server.h | 1 | 
19 files changed, 496 insertions, 132 deletions
diff --git a/glusterfsd/src/glusterfsd-mgmt.c b/glusterfsd/src/glusterfsd-mgmt.c index e6cfa6ac6d1..2602d5431bc 100644 --- a/glusterfsd/src/glusterfsd-mgmt.c +++ b/glusterfsd/src/glusterfsd-mgmt.c @@ -43,6 +43,36 @@ static char is_mgmt_rpc_reconnect;  typedef ssize_t (*mgmt_serialize_t) (struct iovec outmsg, void *args); + +int glusterfs_mgmt_pmap_signin (glusterfs_ctx_t *ctx); +int glusterfs_volfile_fetch (glusterfs_ctx_t *ctx); +int glusterfs_process_volfp (glusterfs_ctx_t *ctx, FILE *fp); + +int +mgmt_cbk_spec (void *data) +{ +        glusterfs_ctx_t *ctx = NULL; + +        ctx = glusterfs_ctx_get (); +        gf_log ("mgmt", GF_LOG_INFO, "Volume file changed"); + +        glusterfs_volfile_fetch (ctx); +        return 0; +} + +rpcclnt_cb_actor_t gluster_cbk_actors[] = { +        [GF_CBK_FETCHSPEC] = {"FETCHSPEC", GF_CBK_FETCHSPEC, mgmt_cbk_spec }, +}; + + +struct rpcclnt_cb_program mgmt_cbk_prog = { +        .progname  = "GlusterFS Callback", +        .prognum   = GLUSTER_CBK_PROGRAM, +        .progver   = GLUSTER_CBK_VERSION, +        .actors    = gluster_cbk_actors, +        .numactors = GF_CBK_MAXVALUE, +}; +  char *clnt_pmap_procs[GF_PMAP_MAXVALUE] = {          [GF_PMAP_NULL]        = "NULL",          [GF_PMAP_PORTBYBRICK] = "PORTBYBRICK", @@ -74,11 +104,6 @@ rpc_clnt_prog_t clnt_handshake_prog = {          .procnames = clnt_handshake_procs,  }; - -int glusterfs_mgmt_pmap_signin (glusterfs_ctx_t *ctx); -int glusterfs_volfile_fetch (glusterfs_ctx_t *ctx); -int glusterfs_process_volfp (glusterfs_ctx_t *ctx, FILE *fp); -  int  mgmt_submit_request (void *req, call_frame_t *frame,                       glusterfs_ctx_t *ctx, @@ -133,7 +158,6 @@ out:  /* XXX: move these into @ctx */  static char oldvolfile[131072];  static int oldvollen = 0; -static void *timer;  int  mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count, @@ -145,7 +169,6 @@ mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count,          int                      ret   = 0;          ssize_t                  size = 0;          FILE                    *tmpfp = NULL; -        struct timeval           tv = {0, };          frame = myframe;          ctx = frame->this->ctx; @@ -194,11 +217,6 @@ mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count,          }  out: -        tv.tv_sec = 1; -        timer = gf_timer_call_after (ctx, tv, -                                     (gf_timer_cbk_t) glusterfs_volfile_fetch, -                                     ctx); -          STACK_DESTROY (frame->root);          if (rsp.spec) @@ -216,12 +234,6 @@ glusterfs_volfile_fetch (glusterfs_ctx_t *ctx)          int               ret = 0;          call_frame_t     *frame = NULL; -        { -                if (timer) -                        gf_timer_call_cancel (ctx, timer); -                timer = NULL; -        } -          cmd_args = &ctx->cmd_args;          frame = create_frame (THIS, ctx->pool); @@ -318,6 +330,10 @@ glusterfs_mgmt_init (glusterfs_ctx_t *ctx)          if (ret)                  goto out; +        ret = rpcclnt_cbk_program_register (rpc, &mgmt_cbk_prog); +        if (ret) +                goto out; +  out:          return ret;  } diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h index 4df5a554fec..fdb42dfe663 100644 --- a/rpc/rpc-lib/src/protocol-common.h +++ b/rpc/rpc-lib/src/protocol-common.h @@ -140,6 +140,13 @@ enum gf_probe_resp {  	GF_PROBE_FRIEND,  }; +enum gf_cbk_procnum { +        GF_CBK_NULL = 0, +        GF_CBK_FETCHSPEC, +        GF_CBK_INO_FLUSH, +        GF_CBK_MAXVALUE, +}; +  #define GLUSTER3_1_FOP_PROGRAM   1298437 /* Completely random */  #define GLUSTER3_1_FOP_VERSION   310 /* 3.1.0 */  #define GLUSTER3_1_FOP_PROCCNT   GFS3_OP_MAXVALUE @@ -152,10 +159,13 @@ enum gf_probe_resp {  #define GLUSTER3_1_CLI_VERSION   1   /* 0.0.1 */  #define GLUSTER3_1_CLI_PROCCNT   GF1_CLI_MAXVALUE -#define GLUSTER_HNDSK_PROGRAM   14398633 /* Completely random */ -#define GLUSTER_HNDSK_VERSION   1   /* 0.0.1 */ +#define GLUSTER_HNDSK_PROGRAM    14398633 /* Completely random */ +#define GLUSTER_HNDSK_VERSION    1   /* 0.0.1 */ + +#define GLUSTER_PMAP_PROGRAM     34123456 +#define GLUSTER_PMAP_VERSION     1 -#define GLUSTER_PMAP_PROGRAM    34123456 -#define GLUSTER_PMAP_VERSION    1 +#define GLUSTER_CBK_PROGRAM      52743234 /* Completely random */ +#define GLUSTER_CBK_VERSION      1   /* 0.0.1 */  #endif /* !_PROTOCOL_COMMON_H */ diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index eac9f875066..8d923ed5f43 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -31,6 +31,7 @@  #include "rpc-transport.h"  #include "protocol-common.h"  #include "mem-pool.h" +#include "xdr-rpc.h"  void  rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool); @@ -653,6 +654,49 @@ out:          return ret;  } +int +rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg) +{ +        char                 *msgbuf = NULL; +        rpcclnt_cb_program_t *program = NULL; +        struct rpc_msg        rpcmsg; +        struct iovec          progmsg; /* RPC Program payload */ +        size_t                msglen = 0; +        int                   found  = 0; +        int                   ret    = -1; +        int                   procnum = 0; + +        msgbuf = msg->vector[0].iov_base; +        msglen = msg->vector[0].iov_len; + +        ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL); +        if (ret == -1) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC call decoding failed"); +                goto out; +        } + +        gf_log ("rpc-clnt", GF_LOG_INFO, "RPC XID: %lx, Ver: %ld, Program: %ld," +                " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg), +                rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), +                rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg)); + +        procnum = rpc_call_progproc (&rpcmsg); + +        list_for_each_entry (program, &clnt->programs, program) { +                if ((program->prognum == rpc_call_program (&rpcmsg)) +                    && (program->progver == rpc_call_progver (&rpcmsg))) { +                        found = 1; +                        break; +                } +        } +        if (found && (procnum < program->numactors) && +            (program->actors[procnum].actor)) { +                program->actors[procnum].actor (&progmsg); +        } + +out: +        return ret; +}  int  rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) @@ -796,7 +840,10 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,          case RPC_TRANSPORT_MSG_RECEIVED:          {                  pollin = data; -                ret = rpc_clnt_handle_reply (clnt, pollin); +                if (pollin->is_reply) +                        ret = rpc_clnt_handle_reply (clnt, pollin); +                else +                        ret = rpc_clnt_handle_cbk (clnt, pollin);                  /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG,                   * data);                   */ @@ -943,6 +990,8 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,                  goto out;          } +        INIT_LIST_HEAD (&rpc->programs); +  out:          return rpc;  } @@ -1168,6 +1217,37 @@ out:          return request_iob;  } +int +rpcclnt_cbk_program_register (struct rpc_clnt *clnt, +                              rpcclnt_cb_program_t *program) +{ +        int ret = -1; + +        if (!clnt) +                goto out; + +        if (program->actors == NULL) +                goto out; + +        INIT_LIST_HEAD (&program->program); + +        list_add_tail (&program->program, &clnt->programs); + +        ret = 0; +        gf_log ("rpc-clnt", GF_LOG_DEBUG, "New program registered: %s, Num: %d," +                " Ver: %d", program->progname, program->prognum, +                program->progver); + +out: +        if (ret == -1) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "Program registration failed:" +                        " %s, Num: %d, Ver: %d", program->progname, +                        program->prognum, program->progver); +        } + +        return ret; +} +  int  rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h index 2381aaa087c..a0251c7c551 100644 --- a/rpc/rpc-lib/src/rpc-clnt.h +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -83,6 +83,38 @@ typedef struct rpc_clnt_program {          int                   numproc;  } rpc_clnt_prog_t; +typedef int (*rpcclnt_cb_fn) (void *data); + +/* The descriptor for each procedure/actor that runs + * over the RPC service. + */ +typedef struct rpcclnt_actor_desc { +        char                 procname[32]; +        int                  procnum; +        rpcclnt_cb_fn        actor; +} rpcclnt_cb_actor_t; + +/* Describes a program and its version along with the function pointers + * required to handle the procedures/actors of each program/version. + * Never changed ever by any thread so no need for a lock. + */ +typedef struct rpcclnt_cb_program { +        char                      progname[32]; +        int                       prognum; +        int                       progver; +        rpcclnt_cb_actor_t       *actors;        /* All procedure handlers */ +        int                       numactors;     /* Num actors in actor array */ + +        /* Program specific state handed to actors */ +        void                    *private; + + +        /* list member to link to list of registered services with rpc_clnt */ +        struct list_head        program; +} rpcclnt_cb_program_t; + + +  #define RPC_MAX_AUTH_BYTES   400  typedef struct rpc_auth_data {          int             flavour; @@ -141,6 +173,9 @@ struct rpc_clnt {          void                  *mydata;          uint64_t               xid; +        /* list of cb programs registered with rpc-clnt */ +        struct list_head       programs; +          /* Memory pool for rpc_req_t */          struct mem_pool       *reqpool; @@ -171,7 +206,7 @@ int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn,   *    also be filled with pointer to buffer to hold header and length   *    of the header.   */ -  +  int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                       int procnum, fop_cbk_fn_t cbkfn,                       struct iovec *proghdr, int proghdrcount, @@ -190,4 +225,10 @@ void rpc_clnt_reconnect (void *trans_ptr);  void rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config); +/* All users of RPC services should use this API to register their + * procedure handlers. + */ +int rpcclnt_cbk_program_register (struct rpc_clnt *svc, +                                  rpcclnt_cb_program_t *program); +  #endif /* !_RPC_CLNT_H */ diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index c5b6f382e7b..cccae5f261d 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -166,6 +166,7 @@ struct rpc_transport_pollin {          char vectored;          void *private;          struct iobref *iobref; +        char is_reply;  };  typedef struct rpc_transport_pollin rpc_transport_pollin_t; @@ -196,6 +197,8 @@ struct rpc_transport {          void                      *notify_data;  	peer_info_t                peerinfo;  	peer_info_t                myinfo; + +        struct list_head           list;  };  struct rpc_transport_ops { diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 5b5c2998c5e..ee3d674c2eb 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -46,6 +46,8 @@  #include <stdarg.h>  #include <stdio.h> +#include "xdr-rpcclnt.h" +  struct rpcsvc_program gluster_dump_prog;  #define rpcsvc_alloc_request(svc, request)                              \ @@ -1119,6 +1121,189 @@ err:          return txrecord;  } +inline int +rpcsvc_get_callid (rpcsvc_t *rpc) +{ +        return GF_UNIVERSAL_ANSWER; +} + +int +rpcsvc_fill_callback (int prognum, int progver, int procnum, int payload, +                      uint64_t xid, struct rpc_msg *request) +{ +        int   ret          = -1; + +        if (!request) { +                goto out; +        } + +        memset (request, 0, sizeof (*request)); + +        request->rm_xid = xid; +        request->rm_direction = CALL; + +        request->rm_call.cb_rpcvers = 2; +        request->rm_call.cb_prog = prognum; +        request->rm_call.cb_vers = progver; +        request->rm_call.cb_proc = procnum; + +        request->rm_call.cb_cred.oa_flavor = AUTH_NONE; +        request->rm_call.cb_cred.oa_base   = NULL; +        request->rm_call.cb_cred.oa_length = 0; + +        request->rm_call.cb_verf.oa_flavor = AUTH_NONE; +        request->rm_call.cb_verf.oa_base = NULL; +        request->rm_call.cb_verf.oa_length = 0; + +        ret = 0; +out: +        return ret; +} + + +struct iovec +rpcsvc_callback_build_header (char *recordstart, size_t rlen, +                             struct rpc_msg *request, size_t payload) +{ +        struct iovec    requesthdr = {0, }; +        struct iovec    txrecord   = {0, 0}; +        int             ret        = -1; +        size_t          fraglen    = 0; + +        ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr); +        if (ret == -1) { +                gf_log ("rpcsvc", GF_LOG_DEBUG, +                        "Failed to create RPC request"); +                goto out; +        } + +        fraglen = payload + requesthdr.iov_len; +        gf_log ("rpcsvc", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, " +                "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len); + +        txrecord.iov_base = recordstart; + +        /* Remember, this is only the vec for the RPC header and does not +         * include the payload above. We needed the payload only to calculate +         * the size of the full fragment. This size is sent in the fragment +         * header. +         */ +        txrecord.iov_len = requesthdr.iov_len; + +out: +        return txrecord; +} + +struct iobuf * +rpcsvc_callback_build_record (rpcsvc_t *rpc, int prognum, int progver, +                              int procnum, size_t payload, uint64_t xid, +                              struct iovec *recbuf) +{ +        struct rpc_msg           request     = {0, }; +        struct iobuf            *request_iob = NULL; +        char                    *record      = NULL; +        struct iovec             recordhdr   = {0, }; +        size_t                   pagesize    = 0; +        int                      ret         = -1; + +        if ((!rpc) || (!recbuf)) { +                goto out; +        } + +        /* First, try to get a pointer into the buffer which the RPC +         * layer can use. +         */ +        request_iob = iobuf_get (rpc->ctx->iobuf_pool); +        if (!request_iob) { +                gf_log ("rpcsvc", GF_LOG_ERROR, "Failed to get iobuf"); +                goto out; +        } + +        pagesize = ((struct iobuf_pool *)rpc->ctx->iobuf_pool)->page_size; + +        record = iobuf_ptr (request_iob);  /* Now we have it. */ + +        /* Fill the rpc structure and XDR it into the buffer got above. */ +        ret = rpcsvc_fill_callback (prognum, progver, procnum, payload, xid, +                                    &request); +        if (ret == -1) { +                gf_log ("rpcsvc", GF_LOG_DEBUG, "cannot build a rpc-request " +                        "xid (%"PRIu64")", xid); +                goto out; +        } + +        recordhdr = rpcsvc_callback_build_header (record, pagesize, &request, +                                                  payload); + +        if (!recordhdr.iov_base) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record " +                        " header"); +                iobuf_unref (request_iob); +                request_iob = NULL; +                recbuf->iov_base = NULL; +                goto out; +        } + +        recbuf->iov_base = recordhdr.iov_base; +        recbuf->iov_len = recordhdr.iov_len; + +out: +        return request_iob; +} + +int +rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, +                        rpcsvc_cbk_program_t *prog, int procnum, +                        struct iovec *proghdr, int proghdrcount) +{ +        struct iobuf          *request_iob = NULL; +        struct iovec           rpchdr      = {0,}; +        rpc_transport_req_t    req; +        int                    ret         = -1; +        int                    proglen     = 0; +        uint64_t               callid      = 0; + +        if (!rpc) { +                goto out; +        } + +        memset (&req, 0, sizeof (req)); + +        callid = rpcsvc_get_callid (rpc); + +        if (proghdr) { +                proglen += iov_length (proghdr, proghdrcount); +        } + +        request_iob = rpcsvc_callback_build_record (rpc, prog->prognum, +                                                    prog->progver, procnum, +                                                    proglen, callid, +                                                    &rpchdr); +        if (!request_iob) { +                gf_log ("rpcsvc", GF_LOG_DEBUG, +                        "cannot build rpc-record"); +                goto out; +        } + +        req.msg.rpchdr = &rpchdr; +        req.msg.rpchdrcount = 1; +        req.msg.proghdr = proghdr; +        req.msg.proghdrcount = proghdrcount; + +        ret = rpc_transport_submit_request (trans, &req); +        if (ret == -1) { +                gf_log ("rpc-clnt", GF_LOG_DEBUG, +                        "transmission of rpc-request failed"); +                goto out; +        } + +        ret = 0; + +out: +        iobuf_unref (request_iob); + +        return ret; +}  inline int  rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 10b20af0a88..fca7d047a7b 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -375,7 +375,11 @@ struct rpcsvc_program {          struct list_head        program;  }; - +typedef struct rpcsvc_cbk_program { +        char                 *progname; +        int                   prognum; +        int                   progver; +} rpcsvc_cbk_program_t;  /* All users of RPC services should use this API to register their   * procedure handlers.   */ @@ -525,4 +529,9 @@ rpcsvc_combine_gen_spec_volume_checks (int gen, int spec);  extern char *  rpcsvc_volume_allowed (dict_t *options, char *volname); + +int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, +                            rpcsvc_cbk_program_t *prog, int procnum, +                            struct iovec *proghdr, int proghdrcount); +  #endif diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 6d2d584d111..26b56fad577 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -1477,6 +1477,8 @@ __socket_proto_state_machine (rpc_transport_t *this,                                          ret = -1;                                          goto out;                                  } +                                if (priv->incoming.msg_type == REPLY) +                                        (*pollin)->is_reply = 1;                                  priv->incoming.request_info = NULL;                          } diff --git a/xlators/mgmt/glusterd/src/glusterd-op-sm.c b/xlators/mgmt/glusterd/src/glusterd-op-sm.c index ecbe7b150ff..11fca774a9f 100644 --- a/xlators/mgmt/glusterd/src/glusterd-op-sm.c +++ b/xlators/mgmt/glusterd/src/glusterd-op-sm.c @@ -254,87 +254,6 @@ out:          return ret;  } -int -glusterd_volume_create_generate_volfiles (glusterd_volinfo_t *volinfo) -{ -        int32_t         ret = -1; -        char            cmd_str[8192] = {0,}; -        char            path[PATH_MAX] = {0,}; -        glusterd_conf_t *priv = NULL; -        xlator_t        *this = NULL; -        char            bricks[8192] = {0,}; -        glusterd_brickinfo_t    *brickinfo = NULL; -        int32_t         len = 0; - -        this = THIS; -        GF_ASSERT (this); -        priv = this->private; - -        GF_ASSERT (priv); -        GF_ASSERT (volinfo); - -        GLUSTERD_GET_VOLUME_DIR(path, volinfo, priv); -        if (!volinfo->port) { -                //volinfo->port = ++glusterfs_port; -        } - -        list_for_each_entry (brickinfo, &volinfo->bricks, brick_list) { -                snprintf (bricks + len, 8192 - len, "%s:%s ", -                          brickinfo->hostname, brickinfo->path); -                len = strlen (bricks); -        } - -        gf_log ("", GF_LOG_DEBUG, "Brick string: %s", bricks); - -        switch (volinfo->type) { - -                case GF_CLUSTER_TYPE_REPLICATE: -                { -                        snprintf (cmd_str, 8192, -                                  "%s/bin/glusterfs-volgen --portmapper-mode -n %s " -                                  " -c %s -r 1 %s -p %d --num-replica %d", -                                  GFS_PREFIX, volinfo->volname, path, bricks, -                                  volinfo->port, volinfo->sub_count); -                        ret = gf_system (cmd_str); -                        gf_log ("", 1, "%s", cmd_str); -                        break; -                } - -                case GF_CLUSTER_TYPE_STRIPE: -                { -                        snprintf (cmd_str, 8192, -                                  "%s/bin/glusterfs-volgen --portmapper-mode -n %s " -                                  " -c %s -r 0 %s -p %d --num-stripe %d", -                                  GFS_PREFIX, volinfo->volname, path, bricks, -                                  volinfo->port, volinfo->sub_count); -                        ret = gf_system (cmd_str); -                        gf_log ("", 1, "%s", cmd_str); -                        break; -                } - -                case GF_CLUSTER_TYPE_NONE: -                { -                        snprintf (cmd_str, 8192, -                                  "%s/bin/glusterfs-volgen --portmapper-mode " -                                  " -n %s -c %s %s -p %d", -                                  GFS_PREFIX, volinfo->volname, path, bricks, -                                  volinfo->port); -                        ret = gf_system (cmd_str); -                        gf_log ("", 1, "%s", cmd_str); -                        break; -                } - -                default: -                        gf_log ("", GF_LOG_ERROR, "Unkown type: %d", -                                volinfo->type); -                        ret = -1; -        } -//out: -        gf_log ("", GF_LOG_DEBUG, "Returning %d", ret); -        return ret; -} - -  static int  glusterd_op_stage_create_volume (gd1_mgmt_stage_op_req *req) @@ -952,8 +871,7 @@ glusterd_op_add_brick (gd1_mgmt_stage_op_req *req)                  if (!ret && (!uuid_compare (brickinfo->uuid, priv->uuid)) &&                                  (GLUSTERD_STATUS_STARTED == volinfo->status)) { -                        ret = -                          glusterd_volume_create_generate_volfiles (volinfo); +                        ret = glusterd_create_volfiles (volinfo);                          if (ret)                                  goto out; @@ -975,7 +893,7 @@ glusterd_op_add_brick (gd1_mgmt_stage_op_req *req)          }          if (!glfs_started) { -                ret = glusterd_volume_create_generate_volfiles (volinfo); +                ret = glusterd_create_volfiles (volinfo);                  if (ret)                          goto out;          } @@ -1892,8 +1810,7 @@ glusterd_op_remove_brick (gd1_mgmt_stage_op_req *req)                  if ((!uuid_compare (brickinfo->uuid, priv->uuid)) &&                      (GLUSTERD_STATUS_STARTED == volinfo->status)) { -                        ret = -                          glusterd_volume_create_generate_volfiles (volinfo); +                        ret = glusterd_create_volfiles (volinfo);                          if (ret)                                  goto out; @@ -1918,7 +1835,7 @@ glusterd_op_remove_brick (gd1_mgmt_stage_op_req *req)          }          if (!glfs_stopped) { -                ret = glusterd_volume_create_generate_volfiles (volinfo); +                ret = glusterd_create_volfiles (volinfo);                  if (ret)                          goto out;          } diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c index 770ee86d43c..cbafb81ec4b 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.c +++ b/xlators/mgmt/glusterd/src/glusterd-utils.c @@ -1356,7 +1356,7 @@ glusterd_import_friend_volume (dict_t *vols, int count)                  ret = glusterd_store_update_volume (volinfo);          } -        ret = glusterd_volume_create_generate_volfiles (volinfo); +        ret = glusterd_create_volfiles (volinfo);          if (ret)                  goto out; diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.h b/xlators/mgmt/glusterd/src/glusterd-utils.h index 1e9469ce0fd..6af21a490fe 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.h +++ b/xlators/mgmt/glusterd/src/glusterd-utils.h @@ -127,6 +127,4 @@ glusterd_compare_friend_data (dict_t  *vols, int32_t *status);  int  glusterd_volume_compute_cksum (glusterd_volinfo_t  *volinfo); -int -glusterd_volume_create_generate_volfiles (glusterd_volinfo_t *volinfo);  #endif diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c index 10ee80e359c..ee7d6839197 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.c +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c @@ -1683,6 +1683,8 @@ glusterd_create_volfiles (glusterd_volinfo_t *volinfo)                  goto out;          } +        ret = glusterd_fetchspec_notify (THIS); +  out:          return ret;  } diff --git a/xlators/mgmt/glusterd/src/glusterd.c b/xlators/mgmt/glusterd/src/glusterd.c index 1d19c7ed80a..d23f06190d7 100644 --- a/xlators/mgmt/glusterd/src/glusterd.c +++ b/xlators/mgmt/glusterd/src/glusterd.c @@ -52,6 +52,12 @@ extern struct rpc_clnt_program glusterd3_1_mgmt_prog;  extern struct rpcsvc_program gluster_pmap_prog;  extern glusterd_op_info_t opinfo; +rpcsvc_cbk_program_t glusterd_cbk_prog = { +        .progname  = "Gluster Callback", +        .prognum   = GLUSTER_CBK_PROGRAM, +        .progver   = GLUSTER_CBK_VERSION, +}; +  static int  glusterd_opinfo_init () @@ -100,7 +106,24 @@ glusterd_uuid_init ()          return 0;  } +int +glusterd_fetchspec_notify (xlator_t *this) +{ +        int              ret   = -1; +        glusterd_conf_t *priv  = NULL; +        rpc_transport_t *trans = NULL; + +        priv = this->private; +        list_for_each_entry (trans, &priv->xprt_list, list) { +                rpcsvc_callback_submit (priv->rpc, trans, &glusterd_cbk_prog, +                                        GF_CBK_FETCHSPEC, NULL, 0); +        } + +        ret = 0; + +        return ret; +}  int  glusterd_priv (xlator_t *this) @@ -135,6 +158,7 @@ glusterd_rpcsvc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,  {          xlator_t            *this = NULL;          rpc_transport_t     *xprt = NULL; +        glusterd_conf_t     *priv = NULL;          if (!xl || !data) {                  gf_log ("glusterd", GF_LOG_WARNING, @@ -145,13 +169,19 @@ glusterd_rpcsvc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,          this = xl;          xprt = data; +        priv = this->private; +          switch (event) {          case RPCSVC_EVENT_ACCEPT:          { +                INIT_LIST_HEAD (&xprt->list); + +                list_add_tail (&xprt->list, &priv->xprt_list);                  break;          }          case RPCSVC_EVENT_DISCONNECT:          { +                list_del (&xprt->list);                  pmap_registry_remove (this, 0, NULL, xprt);                  break;          } @@ -328,11 +358,12 @@ init (xlator_t *this)          conf->mgmt = &glusterd3_1_mgmt_prog;          strncpy (conf->workdir, dirname, PATH_MAX); +        INIT_LIST_HEAD (&conf->xprt_list); +          this->private = conf;          //this->ctx->top = this;          ret = glusterd_uuid_init (); -          if (ret < 0)                  goto out; @@ -390,22 +421,11 @@ int  notify (xlator_t *this, int32_t event, void *data, ...)  {          int          ret = 0; -        //transport_t *trans = data; -        //peer_info_t *peerinfo = NULL; -        //peer_info_t *myinfo = NULL; -/*        if (trans != NULL) { -                peerinfo = &(trans->peerinfo); -                myinfo = &(trans->myinfo); -        } -*/          switch (event) { -                  case GF_EVENT_POLLIN: -          //              ret = glusterd_pollin (this, trans);                          break; -                  case GF_EVENT_POLLERR:                          break; @@ -422,10 +442,6 @@ notify (xlator_t *this, int32_t event, void *data, ...)  } - -//struct xlator_mops mops = { -//}; -  struct xlator_fops fops = {  }; diff --git a/xlators/mgmt/glusterd/src/glusterd.h b/xlators/mgmt/glusterd/src/glusterd.h index 0f2004779c8..f3e864a5f6e 100644 --- a/xlators/mgmt/glusterd/src/glusterd.h +++ b/xlators/mgmt/glusterd/src/glusterd.h @@ -69,6 +69,7 @@ typedef struct {          struct pmap_registry *pmap;          struct list_head  volumes;          struct list_head  hostnames; +        struct list_head  xprt_list;          glusterd_store_handle_t *handle;  } glusterd_conf_t; @@ -337,4 +338,8 @@ glusterd_remove_brick (rpcsvc_request_t *req, dict_t *dict);  int  glusterd_xfer_cli_deprobe_resp (rpcsvc_request_t *req, int32_t op_ret,                                  int32_t op_errno, char *hostname); + +int +glusterd_fetchspec_notify (xlator_t *this); +  #endif diff --git a/xlators/protocol/client/src/Makefile.am b/xlators/protocol/client/src/Makefile.am index 159faf268a7..007810e9901 100644 --- a/xlators/protocol/client/src/Makefile.am +++ b/xlators/protocol/client/src/Makefile.am @@ -8,7 +8,8 @@ client_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \  	$(top_builddir)/rpc/rpc-lib/src/libgfrpc.la \  	$(top_builddir)/rpc/xdr/src/libgfxdr.la -client_la_SOURCES = client.c client-helpers.c client3_1-fops.c client-handshake.c +client_la_SOURCES = client.c client-helpers.c client3_1-fops.c  \ +	client-handshake.c client-callback.c  noinst_HEADERS = client.h client-mem-types.h  AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS) \ diff --git a/xlators/protocol/client/src/client-callback.c b/xlators/protocol/client/src/client-callback.c new file mode 100644 index 00000000000..51bd3375845 --- /dev/null +++ b/xlators/protocol/client/src/client-callback.c @@ -0,0 +1,60 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "client.h" +#include "rpc-clnt.h" + +int +client_cbk_null (void *data) +{ +        return 0; +} + +int +client_cbk_fetchspec (void *data) +{ +        gf_log ("", 1, "here i am"); +        return 0; +} + +int +client_cbk_ino_flush (void *data) +{ +        return 0; +} + +rpcclnt_cb_actor_t gluster_cbk_actors[] = { +        [GF_CBK_NULL]      = {"NULL",      GF_CBK_NULL,      client_cbk_null }, +        [GF_CBK_FETCHSPEC] = {"FETCHSPEC", GF_CBK_FETCHSPEC, client_cbk_fetchspec }, +        [GF_CBK_INO_FLUSH] = {"INO_FLUSH", GF_CBK_INO_FLUSH, client_cbk_ino_flush }, +}; + + +struct rpcclnt_cb_program gluster_cbk_prog = { +        .progname  = "GlusterFS Callback", +        .prognum   = GLUSTER_CBK_PROGRAM, +        .progver   = GLUSTER_CBK_VERSION, +        .actors    = gluster_cbk_actors, +        .numactors = GF_CBK_MAXVALUE, +}; diff --git a/xlators/protocol/client/src/client.c b/xlators/protocol/client/src/client.c index c840d96f54c..458ecfa8f88 100644 --- a/xlators/protocol/client/src/client.c +++ b/xlators/protocol/client/src/client.c @@ -34,6 +34,7 @@  extern rpc_clnt_prog_t clnt_handshake_prog;  extern rpc_clnt_prog_t clnt_dump_prog; +extern struct rpcclnt_cb_program gluster_cbk_prog;  int client_handshake (xlator_t *this, struct rpc_clnt *rpc);  void client_start_ping (void *data); @@ -1730,6 +1731,10 @@ client_init_rpc (xlator_t *this)          conf->handshake = &clnt_handshake_prog;          conf->dump      = &clnt_dump_prog; +        ret = rpcclnt_cbk_program_register (conf->rpc, &gluster_cbk_prog); +        if (ret) +                goto out; +          ret = 0;          gf_log (this->name, GF_LOG_DEBUG, "client init successful"); diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c index 262da3e3158..8a7d9da0bbf 100644 --- a/xlators/protocol/server/src/server.c +++ b/xlators/protocol/server/src/server.c @@ -123,6 +123,10 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg,          ret = rpcsvc_submit_generic (req, &rsp, 1, payload, payloadcount,                                       iobref); +        /* TODO: this is demo purpose only */ +        /* ret = rpcsvc_callback_submit (req->svc, req->trans, req->prog, +                                         GF_CBK_NULL, &rsp, 1); +        */          /* Now that we've done our job of handing the message to the RPC layer           * we can safely unref the iob in the hope that RPC layer must have           * ref'ed the iob on receiving into the txlist. @@ -366,6 +370,7 @@ server_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,          xlator_t            *this = NULL;          rpc_transport_t     *xprt = NULL;          server_connection_t *conn = NULL; +        server_conf_t       *conf = NULL;          if (!xl || !data) { @@ -376,6 +381,7 @@ server_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,          this = xl;          xprt = data; +        conf = this->private;          switch (event) {          case RPCSVC_EVENT_ACCEPT: @@ -388,6 +394,10 @@ server_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,                  xprt->protocol_private = conn;                  */ +                INIT_LIST_HEAD (&xprt->list); + +                list_add_tail (&xprt->list, &conf->xprt_list); +                  break;          }          case RPCSVC_EVENT_DISCONNECT: @@ -395,6 +405,8 @@ server_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,                  if (conn)                          server_connection_put (this, conn); +                list_del (&xprt->list); +                  break;          default:                  break; @@ -449,6 +461,7 @@ init (xlator_t *this)          GF_VALIDATE_OR_GOTO(this->name, conf, out);          INIT_LIST_HEAD (&conf->conns); +        INIT_LIST_HEAD (&conf->xprt_list);          pthread_mutex_init (&conf->mutex, NULL);          this->private = conf; diff --git a/xlators/protocol/server/src/server.h b/xlators/protocol/server/src/server.h index 62e5ef886cf..b58e8a5fada 100644 --- a/xlators/protocol/server/src/server.h +++ b/xlators/protocol/server/src/server.h @@ -99,6 +99,7 @@ struct server_conf {  	dict_t                 *auth_modules;  	pthread_mutex_t         mutex;  	struct list_head        conns; +        struct list_head        xprt_list;  };  typedef struct server_conf server_conf_t;  | 
