diff options
| author | Csaba Henk <csaba@redhat.com> | 2012-05-27 03:56:24 +0530 | 
|---|---|---|
| committer | Vijay Bellur <vijay@gluster.com> | 2012-06-13 08:37:41 -0700 | 
| commit | 118ce698e8af425bf75ceab2c9e71cfdaa0ac848 (patch) | |
| tree | 83c642e4f2b60ffec25cbaf4c18dd08f24dcbcaa | |
| parent | 1877c8ea84adfc6c8943bba806e410de5eba84a7 (diff) | |
geo-rep: checkpointing
- gluster vol geo-rep M S conf checkpoint <LABEL|now>
  sets a checkpoint with LABEL (the keyword "now" is special,
  it's rendered to the label "as of <timestamp of current time>")
  that's used to refer to the checkpoint in the sequel.
  (Technically, gsyncd makes a note of the xtime of master's root
  as of setting the checkpoint, called the "checkpoint target".)
- gluster vol geo-rep M S conf \!checkpoint
  deletes the checkpoint.
- gluster vol geo-rep M S stat
  if status is OK, and there is a checkpoint configured, the checkpoint
  info is appended to status (either "not yet reached", or
  "completed at <timestamp of completion>").
  (Technically, the worker runs a thread that monitors / serializes /
  verifies checkpoint status, and answers checkpoint status requests
  through a UNIX socket; monitoring boils down to querying the xtime
  of slave's root and comparing with the target.)
- gluster vol geo-rep M S conf log-file | xargs grep checkpoint
  displays the checkpoint history. Set, delete and completion events
  are logged properly.
Change-Id: I4398e0819f1504e6e496b4209e91a0e156e1a0f8
BUG: 826512
Signed-off-by: Csaba Henk <csaba@redhat.com>
Reviewed-on: http://review.gluster.com/3491
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Venky Shankar <vshankar@redhat.com>
| -rw-r--r-- | cli/src/cli-cmd-parser.c | 23 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/gsyncd.py | 56 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/master.py | 129 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/syncdutils.py | 6 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-geo-rep.c | 206 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd.c | 7 | 
6 files changed, 397 insertions, 30 deletions
diff --git a/cli/src/cli-cmd-parser.c b/cli/src/cli-cmd-parser.c index f447ccae3d5..931d89ae1bd 100644 --- a/cli/src/cli-cmd-parser.c +++ b/cli/src/cli-cmd-parser.c @@ -1577,6 +1577,29 @@ cli_cmd_gsync_set_parse (const char **words, int wordcount, dict_t **options)                          }                          append_str[append_len - 2] = '\0'; +                        /* "checkpoint now" is special: we resolve that "now" */ +                        if (strcmp (words[cmdi + 1], "checkpoint") == 0 && +                            strcmp (append_str, "now") == 0) { +                                struct timeval tv = {0,}; +                                struct tm     *tm = NULL; + +                                ret = gettimeofday (&tv, NULL); +                                if (ret == -1) +                                         goto out; +                                tm = localtime (&tv.tv_sec); + +                                GF_FREE (append_str); +                                append_str = GF_CALLOC (1, 300, cli_mt_append_str); +                                if (!append_str) { +                                        ret = -1; +                                        goto out; +                                } +                                strcpy (append_str, "as of "); +                                strftime (append_str + strlen ("as of "), +                                          300 - strlen ("as of "), +                                          "%Y-%m-%d %H:%M:%S", tm); +                        } +                          ret = dict_set_dynstr (dict, "op_value", append_str);                  } diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py index 9ac32ce4267..d68cea6725e 100644 --- a/xlators/features/marker/utils/syncdaemon/gsyncd.py +++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py @@ -58,6 +58,25 @@ class GLogger(Logger):          logging.getLogger().handlers = []          logging.basicConfig(**lprm) +    @classmethod +    def _gsyncd_loginit(cls, **kw): +        lkw = {} +        if gconf.log_level: +            lkw['level'] = gconf.log_level +        if kw.get('log_file'): +            if kw['log_file'] in ('-', '/dev/stderr'): +                lkw['stream'] = sys.stderr +            elif kw['log_file'] == '/dev/stdout': +                lkw['stream'] = sys.stdout +            else: +                lkw['filename'] = kw['log_file'] + +        cls.setup(label=kw.get('label'), **lkw) + +        lkw.update({'saved_label': kw.get('label')}) +        gconf.log_metadata = lkw +        gconf.log_exit = True +  def startup(**kw):      """set up logging, pidfile grabbing, daemonization"""      if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': @@ -88,22 +107,7 @@ def startup(**kw):          select((x,), (), ())          os.close(x) -    lkw = {} -    if gconf.log_level: -        lkw['level'] = gconf.log_level -    if kw.get('log_file'): -        if kw['log_file'] in ('-', '/dev/stderr'): -            lkw['stream'] = sys.stderr -        elif kw['log_file'] == '/dev/stdout': -            lkw['stream'] = sys.stdout -        else: -            lkw['filename'] = kw['log_file'] - -    GLogger.setup(label=kw.get('label'), **lkw) - -    lkw.update({'saved_label': kw.get('label')}) -    gconf.log_metadata = lkw -    gconf.log_exit = True +    GLogger._gsyncd_loginit(**kw)  def main():      """main routine, signal/exception handling boilerplates""" @@ -166,6 +170,8 @@ def main_i():      op.add_option('--sync-jobs',           metavar='N',     type=int, default=3)      op.add_option('--turns',               metavar='N',     type=int, default=0, help=SUPPRESS_HELP)      op.add_option('--allow-network',       metavar='IPS',   default='') +    op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs) +    op.add_option('--checkpoint',          metavar='LABEL', default='')      op.add_option('-c', '--config-file',   metavar='CONF',  type=str, action='callback', callback=store_local)      # duh. need to specify dest or value will be mapped to None :S @@ -278,6 +284,7 @@ def main_i():          rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf")      gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict) +    checkpoint_change = False      if confdata:          opt_ok = norm(confdata.opt) in tunables + [None]          if confdata.op == 'check': @@ -293,7 +300,14 @@ def main_i():              gcnf.set(confdata.opt, confdata.val, confdata.rx)          elif confdata.op == 'del':              gcnf.delete(confdata.opt, confdata.rx) -        return +        # when modifying checkpoint, it's important to make a log +        # of that, so in that case we go on to set up logging even +        # if its just config invocation +        if confdata.opt == 'checkpoint' and confdata.op in ('set', 'del') and \ +           not confdata.rx: +            checkpoint_change = True +        if not checkpoint_change: +            return      gconf.__dict__.update(defaults.__dict__)      gcnf.update_to(gconf.__dict__) @@ -331,6 +345,14 @@ def main_i():              raise GsyncdError('cannot recognize log level "%s"' % lvl0)          gconf.log_level = lvl2 +    if checkpoint_change: +        GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf') +        if confdata.op == 'set': +            logging.info('checkpoint %s set' % confdata.val) +        elif confdata.op == 'del': +            logging.info('checkpoint info was reset') +        return +      go_daemon = rconf['go_daemon']      be_monitor = rconf.get('monitor') diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py index 8e196f8c5f4..4826037f134 100644 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -5,12 +5,20 @@ import stat  import random  import signal  import logging +import socket  import errno -from errno import ENOENT, ENODATA +from errno import ENOENT, ENODATA, EPIPE  from threading import currentThread, Condition, Lock +from datetime import datetime +try: +    from hashlib import md5 as md5 +except ImportError: +    # py 2.4 +    from md5 import new as md5  from gconf import gconf -from syncdutils import FreeObject, Thread, GsyncdError, boolify +from syncdutils import FreeObject, Thread, GsyncdError, boolify, \ +                       escape, unescape, select  URXTIME = (-1, 0) @@ -113,6 +121,122 @@ class GMaster(object):          # the actual volinfo we make use of          self.volinfo = None          self.terminate = False +        self.checkpoint_thread = None + +    @staticmethod +    def _checkpt_param(chkpt, prm, timish=True): +        """use config backend to lookup a parameter belonging to +           checkpoint @chkpt""" +        cprm = getattr(gconf, 'checkpoint_' + prm, None) +        if not cprm: +            return +        chkpt_mapped, val = cprm.split(':', 1) +        if unescape(chkpt_mapped) != chkpt: +            return +        if timish: +            val = tuple(int(x) for x in val.split(".")) +        return val + +    @staticmethod +    def _set_checkpt_param(chkpt, prm, val, timish=True): +        """use config backend to store a parameter associated +           with checkpoint @chkpt""" +        if timish: +            val = "%d.%d" % tuple(val) +        gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) + +    @staticmethod +    def humantime(*tpair): +        """format xtime-like (sec, nsec) pair to human readable format""" +        ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\ +               strftime("%Y-%m-%d %H:%M:%S") +        if len(tpair) > 1: +            ts += '.' + str(tpair[1]) +        return ts + +    def checkpt_service(self, chan, chkpt, tgt): +        """checkpoint service loop + +        monitor and verify checkpoint status for @chkpt, and listen +        for incoming requests for whom we serve a pretty-formatted +        status report""" +        if not chkpt: +            # dummy loop for the case when there is no checkpt set +            while True: +                select([chan], [], []) +                conn, _ = chan.accept() +                conn.send('\0') +                conn.close() +        completed = self._checkpt_param(chkpt, 'completed') +        while True: +            s,_,_ = select([chan], [], [], (not completed) and 5 or None) +            # either request made and we re-check to not +            # give back stale data, or we still hunting for completion +            if tgt < self.volmark: +                # indexing has been reset since setting the checkpoint +                status = "is invalid" +            else: +                xtr = self.xtime('.', self.slave) +                if isinstance(xtr, int): +                    raise GsyncdError("slave root directory is unaccessible (%s)", +                                      os.strerror(xtr)) +                ncompleted = (xtr >= tgt) +                if completed and not ncompleted: # stale data +                    logging.warn("completion time %s for checkpoint %s became stale" % \ +                                 (self.humantime(*completed), chkpt)) +                    completed = None +                    gconf.confdata.delete('checkpoint-completed') +                if ncompleted and not completed: # just reaching completion +                    completed = [ int(x) for x in ("%.6f" % time.time()).split('.') ] +                    self._set_checkpt_param(chkpt, 'completed', completed) +                    logging.info("checkpoint %s completed" % chkpt) +                status = completed and \ +                  "completed at " + self.humantime(completed[0]) or \ +                  "not reached yet" +            if s: +                conn = None +                try: +                    conn, _ = chan.accept() +                    try: +                        conn.send("  | checkpoint %s %s\0" % (chkpt, status)) +                    except: +                        exc = sys.exc_info()[1] +                        if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ +                           exc.errno == EPIPE: +                            logging.debug('checkpoint client disconnected') +                        else: +                            raise +                finally: +                    if conn: +                        conn.close() + +    def start_checkpoint_thread(self): +        """prepare and start checkpoint service""" +        if self.checkpoint_thread or not getattr(gconf, 'state_socket_unencoded', None): +            return +        chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) +        state_socket = "/tmp/%s.socket" % md5(gconf.state_socket_unencoded).hexdigest() +        try: +            os.unlink(state_socket) +        except: +            if sys.exc_info()[0] == OSError: +                pass +        chan.bind(state_socket) +        chan.listen(1) +        checkpt_tgt = None +        if gconf.checkpoint: +            checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target') +            if not checkpt_tgt: +                checkpt_tgt = self.xtime('.') +                if isinstance(checkpt_tgt, int): +                    raise GsyncdError("master root directory is unaccessible (%s)", +                                      os.strerror(checkpt_tgt)) +                self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt) +            logging.debug("checkpoint target %d.%d has been determined for checkpoint %s" % \ +                          (checkpt_tgt[0], checkpt_tgt[1], gconf.checkpoint)) +        t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt)) +        t.start() +        self.checkpoint_thread = t      def crawl_loop(self):          """start the keep-alive thread and iterate .crawl""" @@ -291,6 +415,7 @@ class GMaster(object):              if self.volinfo:                  if self.volinfo['retval']:                      raise GsyncdError ("master is corrupt") +                self.start_checkpoint_thread()              else:                  if should_display_info or self.crawls == 0:                      if self.inter_master: diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py index f786bc34326..1d4eb20032c 100644 --- a/xlators/features/marker/utils/syncdaemon/syncdutils.py +++ b/xlators/features/marker/utils/syncdaemon/syncdutils.py @@ -138,6 +138,12 @@ def finalize(*a, **kw):                      raise      if gconf.ssh_ctl_dir and not gconf.cpid:          shutil.rmtree(gconf.ssh_ctl_dir) +    if getattr(gconf, 'state_socket', None): +        try: +            os.unlink(gconf.state_socket) +        except: +            if sys.exc_info()[0] == OSError: +                pass      if gconf.log_exit:          logging.info("exiting.")      sys.stdout.flush() diff --git a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c index 26d1bff152d..c66b2db578e 100644 --- a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c +++ b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c @@ -355,9 +355,9 @@ glusterd_get_slave (glusterd_volinfo_t *vol, const char *slaveurl, char **slavek  static int -glusterd_query_extutil (char *resbuf, runner_t *runner) +glusterd_query_extutil_generic (char *resbuf, size_t blen, runner_t *runner, void *data, +                                int (*fcbk)(char *resbuf, size_t blen, FILE *fp, void *data))  { -        char               *ptr = NULL;          int                 ret = 0;          runner_redir (runner, STDOUT_FILENO, RUN_PIPE); @@ -367,11 +367,9 @@ glusterd_query_extutil (char *resbuf, runner_t *runner)                  return -1;          } -        ptr = fgets(resbuf, PATH_MAX, runner_chio (runner, STDOUT_FILENO)); -        if (ptr) -                resbuf[strlen(resbuf)-1] = '\0'; //strip off \n +        ret = fcbk (resbuf, blen, runner_chio (runner, STDOUT_FILENO), data); -        ret = runner_end (runner); +        ret |= runner_end (runner);          if (ret)                  gf_log ("", GF_LOG_ERROR, "reading data from child failed"); @@ -379,6 +377,80 @@ glusterd_query_extutil (char *resbuf, runner_t *runner)  }  static int +_fcbk_singleline(char *resbuf, size_t blen, FILE *fp, void *data) +{ +        char *ptr = NULL; + +        errno = 0; +        ptr = fgets (resbuf, blen, fp); +        if (ptr) +                resbuf[strlen(resbuf)-1] = '\0'; //strip off \n + +        return errno ? -1 : 0; +} + +static int +glusterd_query_extutil (char *resbuf, runner_t *runner) +{ +        return glusterd_query_extutil_generic (resbuf, PATH_MAX, runner, NULL, +                                               _fcbk_singleline); +} + +static int +_fcbk_conftodict (char *resbuf, size_t blen, FILE *fp, void *data) +{ +        char   *ptr = NULL; +        dict_t *dict = data; +        char   *v = NULL; + +        for (;;) { +                errno = 0; +                ptr = fgets (resbuf, blen, fp); +                if (!ptr) +                        break; +                v = resbuf + strlen(resbuf) - 1; +                while (isspace (*v)) +                        /* strip trailing space */ +                        *v-- = '\0'; +                if (v == resbuf) +                        /* skip empty line */ +                        continue; +                v = strchr (resbuf, ':'); +                if (!v) +                        return -1; +                *v++ = '\0'; +                while (isspace (*v)) +                        v++; +                v = gf_strdup (v); +                if (!v) +                        return -1; +                if (dict_set_dynstr (dict, resbuf, v) != 0) { +                        GF_FREE (v); +                        return -1; +                } +        } + +        return errno ? -1 : 0; +} + +static int +glusterd_gsync_get_config (char *master, char *slave, char *gl_workdir, dict_t *dict) +{ +        /* key + value, where value must be able to accommodate a path */ +        char resbuf[256 + PATH_MAX] = {0,}; +        runner_t             runner = {0,}; + +        runinit (&runner); +        runner_add_args  (&runner, GSYNCD_PREFIX"/gsyncd", "-c", NULL); +        runner_argprintf (&runner, "%s/"GSYNC_CONF, gl_workdir); +        runner_argprintf (&runner, ":%s", master); +        runner_add_args  (&runner, slave, "--config-get-all", NULL); + +        return glusterd_query_extutil_generic (resbuf, sizeof (resbuf), +                                               &runner, dict, _fcbk_conftodict); +} + +static int  glusterd_gsync_get_param_file (char *prmfile, const char *param, char *master,                                 char *slave, char *gl_workdir)  { @@ -1309,29 +1381,112 @@ glusterd_gsync_read_frm_status (char *path, char *buf, size_t blen)  }  static int +glusterd_gsync_fetch_status_extra (char *path, char *buf, size_t blen) +{ +        char sockpath[PATH_MAX] = {0,}; +        struct sockaddr_un   sa = {0,}; +        size_t                l = 0; +        int                   s = -1; +        struct pollfd       pfd = {0,}; +        int                 ret = 0; + +        l = strlen (buf); +        /* seek to end of data in buf */ +        buf += l; +        blen -= l; + +        glusterd_set_socket_filepath (path, sockpath, sizeof (sockpath)); + +        strncpy(sa.sun_path, sockpath, sizeof(sa.sun_path)); +        if (sa.sun_path[sizeof (sa.sun_path) - 1]) +                return -1; +        sa.sun_family = AF_UNIX; + +        s = socket(AF_UNIX, SOCK_STREAM, 0); +        if (s == -1) +                return -1; + +        ret = connect (s, (struct sockaddr *)&sa, sizeof (sa)); +        if (ret == -1) +                goto out; +        pfd.fd = s; +        pfd.events = POLLIN; +        /* we don't want to hang on gsyncd */ +        if (poll (&pfd, 1, 5000) < 1 || +            !(pfd.revents & POLLIN)) { +                ret = -1; +                goto out; +        } +        ret = read(s, buf, blen); +        /* we expect a terminating 0 byte */ +        if (ret == 0 || (ret > 0 && buf[ret - 1])) +                ret = -1; +        if (ret > 0) +                ret = 0; + + out: +        close (s); +        return ret; +} + +static int +dict_get_param (dict_t *dict, char *key, char **param) +{ +        char  *dk = NULL; +        char   *s = NULL; +        char    x = '\0'; +        int   ret = 0; + +        if (dict_get_str (dict, key, param) == 0) +                return 0; + +        dk = gf_strdup (key); +        if (!key) +                return -1; + +        s = strpbrk (dk, "-_"); +        if (!s) +                return -1; +        x = (*s == '-') ? '_' : '-'; +        *s++ = x; +        while ((s = strpbrk (s, "-_"))) +                *s++ = x; + +        ret = dict_get_str (dict, dk, param); + +        GF_FREE (dk); +        return ret; +} + +static int  glusterd_read_status_file (char *master, char *slave,                             dict_t *dict)  {          glusterd_conf_t *priv = NULL;          int              ret = 0; -        char             statefile[PATH_MAX] = {0, }; +        char            *statefile = NULL;          char             buf[1024] = {0, };          char             mst[1024] = {0, };          char             slv[1024] = {0, };          char             sts[1024] = {0, };          char            *bufp = NULL; +        dict_t          *confd = NULL;          int              gsync_count = 0;          int              status = 0;          GF_ASSERT (THIS);          GF_ASSERT (THIS->private); +        confd = dict_new (); +        if (!dict) +                return -1; +          priv = THIS->private; -        ret = glusterd_gsync_get_param_file (statefile, "state", master, -                                             slave, priv->workdir); +        ret = glusterd_gsync_get_config (master, slave, priv->workdir, +                                         confd);          if (ret) { -                gf_log ("", GF_LOG_ERROR, "Unable to get the name of status" -                        "file for %s(master), %s(slave)", master, slave); +                gf_log ("", GF_LOG_ERROR, "Unable to get configuration data" +                        "for %s(master), %s(slave)", master, slave);                  goto out;          } @@ -1343,6 +1498,9 @@ glusterd_read_status_file (char *master, char *slave,          } else if (ret == -1)                  goto out; +        ret = dict_get_param (confd, "state_file", &statefile); +        if (ret) +                goto out;          ret = glusterd_gsync_read_frm_status (statefile, buf, sizeof (buf));          if (ret) {                  gf_log ("", GF_LOG_ERROR, "Unable to read the status" @@ -1350,6 +1508,30 @@ glusterd_read_status_file (char *master, char *slave,                  strncpy (buf, "defunct", sizeof (buf));                  goto done;          } +        if (strcmp (buf, "OK") != 0) +                goto done; + +        ret = dict_get_param (confd, "state_socket_unencoded", &statefile); +        if (ret) +                goto out; +        ret = glusterd_gsync_fetch_status_extra (statefile, buf, sizeof (buf)); +        if (ret) { +                gf_log ("", GF_LOG_ERROR, "Unable to fetch extra status" +                        "for %s(master), %s(slave)", master, slave); +                /* there is a slight chance that this occurs due to race +                 * -- in that case, the following options all seem bad: +                 * +                 * - suppress irregurlar behavior by just leaving status +                 *   on "OK" +                 * - freak out users with a misleading "defunct" +                 * - overload the meaning of the regular error signal +                 *   mechanism of gsyncd, that is, when status is "faulty" +                 * +                 * -- so we just come up with something new... +                 */ +                strncpy (buf, "N/A", sizeof (buf)); +                goto done; +        }   done:          ret = dict_get_int32 (dict, "gsync-count", &gsync_count); @@ -1394,6 +1576,8 @@ glusterd_read_status_file (char *master, char *slave,          ret = 0;   out: +        dict_destroy (confd); +          gf_log ("", GF_LOG_DEBUG, "Returning %d ", ret);          return ret;  } diff --git a/xlators/mgmt/glusterd/src/glusterd.c b/xlators/mgmt/glusterd/src/glusterd.c index 0dfffbbed39..a3869e6317f 100644 --- a/xlators/mgmt/glusterd/src/glusterd.c +++ b/xlators/mgmt/glusterd/src/glusterd.c @@ -500,6 +500,13 @@ configure_syncdaemon (glusterd_conf_t *conf)          runner_add_args (&runner, ".", ".", NULL);          RUN_GSYNCD_CMD; +        /* state-socket */ +        runinit_gsyncd_setrx (&runner, conf); +        runner_add_arg (&runner, "state-socket-unencoded"); +        runner_argprintf (&runner, "%s/${mastervol}/${eSlave}.socket", georepdir); +        runner_add_args (&runner, ".", ".", NULL); +        RUN_GSYNCD_CMD; +          /* log-file */          runinit_gsyncd_setrx (&runner, conf);          runner_add_args (&runner,  | 
