diff options
| author | Pavan Sondur <pavan@gluster.com> | 2010-08-12 04:49:15 +0000 | 
|---|---|---|
| committer | Anand V. Avati <avati@dev.gluster.com> | 2010-08-12 03:29:54 -0700 | 
| commit | e0347526dd77f7171ae9da72fc92ca99a79dd282 (patch) | |
| tree | f39e9f192564fdd9648257d6b7252ea38cecbe6e | |
| parent | e98ebc1da4f49fba2bcaaf3212b00058e615cf29 (diff) | |
cluster/pump: Dynamically control sink connect and disconnect.
Signed-off-by: Pavan Vilas Sondur <pavan@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 1303 (Cleanup replace-brick state info)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=1303
| -rw-r--r-- | xlators/cluster/afr/src/afr.h | 2 | ||||
| -rw-r--r-- | xlators/cluster/afr/src/pump.c | 459 | ||||
| -rw-r--r-- | xlators/cluster/afr/src/pump.h | 32 | 
3 files changed, 348 insertions, 145 deletions
diff --git a/xlators/cluster/afr/src/afr.h b/xlators/cluster/afr/src/afr.h index 3fa987ee83d..284eb7a1b3d 100644 --- a/xlators/cluster/afr/src/afr.h +++ b/xlators/cluster/afr/src/afr.h @@ -245,6 +245,8 @@ typedef struct _afr_local {  	int32_t  inodelk_count;  	int32_t  entrylk_count; +        dict_t  *dict; +          int (*up_down_flush_cbk) (call_frame_t *, xlator_t *);  	/*  diff --git a/xlators/cluster/afr/src/pump.c b/xlators/cluster/afr/src/pump.c index e69c028450b..b62b079aa1b 100644 --- a/xlators/cluster/afr/src/pump.c +++ b/xlators/cluster/afr/src/pump.c @@ -28,7 +28,47 @@  #include "afr-common.c" -pump_state_t +static int +pump_mark_start_pending (xlator_t *this) +{ +        afr_private_t  *priv      = NULL; +        pump_private_t *pump_priv = NULL; + +        priv      = this->private; +        pump_priv = priv->pump_private; + +        pump_priv->pump_start_pending = 1; + +        return 0; +} + +static int +is_pump_start_pending (xlator_t *this) +{ +        afr_private_t  *priv      = NULL; +        pump_private_t *pump_priv = NULL; + +        priv      = this->private; +        pump_priv = priv->pump_private; + +        return (pump_priv->pump_start_pending); +} + +static int +pump_remove_start_pending (xlator_t *this) +{ +        afr_private_t  *priv      = NULL; +        pump_private_t *pump_priv = NULL; + +        priv      = this->private; +        pump_priv = priv->pump_private; + +        pump_priv->pump_start_pending = 0; + +        return 0; +} + +static pump_state_t  pump_get_state ()  {          xlator_t *this = NULL; @@ -59,16 +99,11 @@ pump_change_state (xlator_t *this, pump_state_t state)          pump_state_t state_old;          pump_state_t state_new; -	unsigned char *     child_up = NULL; -        int i = 0; -          priv = this->private;          pump_priv = priv->pump_private; -	child_up = priv->child_up; - -        assert (pump_priv); +        GF_ASSERT (pump_priv);          LOCK (&pump_priv->pump_state_lock);          { @@ -77,48 +112,6 @@ pump_change_state (xlator_t *this, pump_state_t state)                  pump_priv->pump_state = state; -                switch (pump_priv->pump_state) { -                case PUMP_STATE_RESUME: -                case PUMP_STATE_RUNNING: -                case PUMP_STATE_PAUSE: -                { -                        priv->pump_loaded = _gf_true; -                        i = 1; - -                        child_up[i] = 1; - -                        LOCK (&priv->lock); -                        { -                                priv->up_count++; -                        } -                        UNLOCK (&priv->lock); - -                        break; -                } -                case PUMP_STATE_ABORT: -                { -                        priv->pump_loaded = _gf_false; -                        i = 1; - -                        child_up[i] = 0; - -                        LOCK (&priv->lock); -                        { -                                priv->down_count++; -                        } -                        UNLOCK (&priv->lock); - -                        LOCK (&pump_priv->resume_path_lock); -                        { -                                pump_priv->number_files_pumped = 0; -                        } -                        UNLOCK (&pump_priv->resume_path_lock); - - -                        break; -                } - -                }          }          UNLOCK (&pump_priv->pump_state_lock); @@ -338,67 +331,24 @@ is_pump_traversal_allowed (xlator_t *this, const char *path)  }  static int -pump_update_file_stats (xlator_t *this, long source_blocks, -                               long sink_blocks) +pump_save_file_stats (xlator_t *this, const char *path)  {          afr_private_t  *priv        = NULL;          pump_private_t *pump_priv   = NULL; -        priv = this->private; +        priv      = this->private;          pump_priv = priv->pump_private;          LOCK (&pump_priv->resume_path_lock);          { -                pump_priv->source_blocks = source_blocks; -                pump_priv->sink_blocks   = sink_blocks; -        } -        UNLOCK (&pump_priv->resume_path_lock); - -        return 0; -} - -static int -pump_save_file_stats (xlator_t *this) -{ -        afr_private_t  *priv        = NULL; -        struct statvfs  source_buf  = {0, }; -        struct statvfs  sink_buf    = {0, }; -        loc_t loc; -        int ret = -1; - -        priv = this->private; - -        assert (priv->root_inode); - -        build_root_loc (priv->root_inode, &loc); - -        ret = syncop_statfs (PUMP_SOURCE_CHILD (this), -                             &loc, &source_buf); -        if (ret < 0) { -                gf_log (this->name, GF_LOG_DEBUG, -                        "source statfs failed"); -        } else { -                gf_log (this->name, GF_LOG_DEBUG, -                        "source statfs succeeded"); -        } +                pump_priv->number_files_pumped++; - -        ret = syncop_statfs (PUMP_SOURCE_CHILD (this), -                             &loc, &sink_buf); -        if (ret < 0) { -                gf_log (this->name, GF_LOG_DEBUG, -                        "sink statfs failed"); -        } else { -                gf_log (this->name, GF_LOG_DEBUG, -                        "sink statfs succeeded"); +                strncpy (pump_priv->current_file, path, +                         PATH_MAX);          } - -        pump_update_file_stats (this, -                                source_buf.f_blocks, -                                sink_buf.f_blocks); +        UNLOCK (&pump_priv->resume_path_lock);          return 0; -  }  static int  pump_save_path (xlator_t *this, const char *path) @@ -435,16 +385,6 @@ pump_save_path (xlator_t *this, const char *path)                          "setxattr succeeded - saved path=%s", path);                  gf_log (this->name, GF_LOG_DEBUG,                          "Saving path for status info"); - -                LOCK (&pump_priv->resume_path_lock); -                { -                        pump_priv->number_files_pumped++; - -                        strncpy (pump_priv->current_file, path, -                                 PATH_MAX); -                } -                UNLOCK (&pump_priv->resume_path_lock); -          }          dict_unref (dict); @@ -534,7 +474,7 @@ gf_pump_traverse_directory (loc_t *loc)                          if (!IS_ENTRY_CWD(entry->d_name) &&                              !IS_ENTRY_PARENT (entry->d_name)) {                                  pump_save_path (this, entry_loc.path); -                                pump_save_file_stats (this); +                                pump_save_file_stats (this, entry_loc.path);                          }                          ret = pump_check_and_update_status (this); @@ -726,19 +666,16 @@ pump_task_completion (int ret, void *data)  }  int -pump_start (call_frame_t *frame, xlator_t *this) +pump_start (call_frame_t *pump_frame, xlator_t *this)  {  	afr_private_t *priv = NULL;  	pump_private_t *pump_priv = NULL; -        call_frame_t *pump_frame = NULL;  	int ret = -1;  	priv = this->private;          pump_priv = priv->pump_private; -        pump_frame = copy_frame (frame); -          if (!pump_frame->root->lk_owner)                  pump_frame->root->lk_owner = PUMP_LK_OWNER; @@ -782,6 +719,212 @@ is_pump_loaded (xlator_t *this)  } +static int +pump_start_synctask (xlator_t *this) +{ +        call_frame_t *frame = NULL; +        int ret = 0; + +        frame = create_frame (this, this->ctx->pool); +        if (!frame) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Out of memory"); +                ret = -1; +                goto out; +        } + +        pump_change_state (this, PUMP_STATE_RUNNING); + +        ret = pump_start (frame, this); + +out: +        return ret; +} + +int32_t +pump_cmd_start_setxattr_cbk (call_frame_t *frame, +                             void *cookie, +                             xlator_t *this, +                             int32_t op_ret, +                             int32_t op_errno) + +{ +        afr_local_t *local = NULL; +        int ret = 0; + +        local = frame->local; + +        if (op_ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Could not initiate destination " +                        "brick connect"); +                ret = op_ret; +                goto out; +        } + +        gf_log (this->name, GF_LOG_DEBUG, +                "Successfully initiated destination " +                "brick connect"); + +        pump_mark_start_pending (this); + +out: +        local->op_ret = ret; +        pump_command_reply (frame, this); + +        return 0; +} + +static int +pump_initiate_sink_connect (call_frame_t *frame, xlator_t *this) +{ +        afr_local_t   *local     = NULL; +        afr_private_t *priv      = NULL; +        dict_t        *dict      = NULL; +        char          *dst_brick = NULL; +        loc_t loc; + +        int ret = 0; + +        priv  = this->private; +        local = frame->local; + +        GF_ASSERT (priv->root_inode); + +        build_root_loc (priv->root_inode, &loc); + +        ret = dict_get_str (local->dict, PUMP_CMD_START, &dst_brick); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Could not get destination brick value"); +                goto out; +        } + +        dict = dict_new (); +        if (!dict) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Out of memory"); +                ret = -1; +                goto out; +        } + +        GF_ASSERT (dst_brick); +        gf_log (this->name, GF_LOG_DEBUG, +                "Got destination brick as %s", dst_brick); + +        ret = dict_set_str (dict, CLIENT_CMD_CONNECT, dst_brick); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Could not inititiate destination brick " +                        "connect"); +                goto out; +        } + +	STACK_WIND (frame, +		    pump_cmd_start_setxattr_cbk, +		    PUMP_SINK_CHILD(this), +		    PUMP_SINK_CHILD(this)->fops->setxattr, +		    &loc, +		    dict, +		    0); + +        ret = 0; + +        dict_unref (dict); +out: +        return ret; +} + +int32_t +pump_cmd_abort_setxattr_cbk (call_frame_t *frame, +                             void *cookie, +                             xlator_t *this, +                             int32_t op_ret, +                             int32_t op_errno) + +{ +        afr_local_t *local = NULL; +        int ret = 0; + +        local = frame->local; + +        if (op_ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Could not initiate destination " +                        "brick disconnect"); +                ret = op_ret; +                goto out; +        } + +        gf_log (this->name, GF_LOG_DEBUG, +                "Successfully initiated destination " +                "brick disconnect"); +        ret = 0; + +out: +        local->op_ret = ret; +        pump_command_reply (frame, this); +        return 0; +} + +static int +pump_initiate_sink_disconnect (call_frame_t *frame, xlator_t *this) +{ +        afr_local_t   *local     = NULL; +        afr_private_t *priv      = NULL; +        dict_t        *dict      = NULL; +        loc_t loc; + +        int ret = 0; + +        priv  = this->private; +        local = frame->local; + +        GF_ASSERT (priv->root_inode); + +        build_root_loc (priv->root_inode, &loc); + +        dict = dict_new (); +        if (!dict) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Out of memory"); +                ret = -1; +                goto out; +        } + +        ret = dict_set_str (dict, CLIENT_CMD_DISCONNECT, "jargon"); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Could not inititiate destination brick " +                        "disconnect"); +                goto out; +        } + +	STACK_WIND (frame, +		    pump_cmd_abort_setxattr_cbk, +		    PUMP_SINK_CHILD(this), +		    PUMP_SINK_CHILD(this)->fops->setxattr, +		    &loc, +		    dict, +		    0); + +        ret = 0; + +        dict_unref (dict); +out: +        return ret; +} + +static int +is_pump_aborted (xlator_t *this) +{ +        pump_state_t state; + +        state = pump_get_state (); + +        return ((state == PUMP_STATE_ABORT)); +} +  int32_t  pump_cmd_start_getxattr_cbk (call_frame_t *frame,                               void *cookie, @@ -795,14 +938,17 @@ pump_cmd_start_getxattr_cbk (call_frame_t *frame,          pump_state_t state;          int ret = 0; +        int need_unwind = 0;          int dict_ret = -1;          local = frame->local;          if (op_ret < 0) {                  gf_log (this->name, GF_LOG_DEBUG, -                        "getxattr failed - changing pump state to RUNNING with '/'"); +                        "getxattr failed - changing pump " +                        "state to RUNNING with '/'");                  path = "/"; +                ret = op_ret;          } else {                  gf_log (this->name, GF_LOG_TRACE,                          "getxattr succeeded"); @@ -822,13 +968,22 @@ pump_cmd_start_getxattr_cbk (call_frame_t *frame,          }          pump_set_resume_path (this, path); -        pump_change_state (this, PUMP_STATE_RUNNING); -        ret = pump_start (frame, this); +        if (is_pump_aborted (this)) +                /* We're re-starting pump afresh */ +                ret = pump_initiate_sink_connect (frame, this); +        else { +                /* We're re-starting pump from a previous +                   pause */ +                ret = pump_start_synctask (this); +                need_unwind = 1; +        }  out: -        local->op_ret = ret; -        pump_command_reply (frame, this); +        if ((ret < 0) || (need_unwind == 1)) { +                local->op_ret = ret; +                pump_command_reply (frame, this); +        }  	return 0;  } @@ -924,13 +1079,14 @@ pump_execute_start (call_frame_t *frame, xlator_t *this)          local = frame->local;          if (!priv->root_inode) { -                gf_log (this->name, GF_LOG_NORMAL, -                        "Pump xlator cannot be started without an initial lookup"); +                gf_log (this->name, GF_LOG_ERROR, +                        "Pump xlator cannot be started without an initial " +                        "lookup");                  ret = -1;                  goto out;          } -        assert (priv->root_inode); +        GF_ASSERT (priv->root_inode);          build_root_loc (priv->root_inode, &loc); @@ -960,6 +1116,7 @@ pump_cmd_abort_removexattr_cbk (call_frame_t *frame,                                  int32_t op_errno)  {          afr_local_t *local = NULL; +        int ret = 0;          local = frame->local; @@ -967,16 +1124,23 @@ pump_cmd_abort_removexattr_cbk (call_frame_t *frame,                  gf_log (this->name, GF_LOG_ERROR,                          "Aborting pump failed. Please remove xattr"                          PUMP_PATH "of the source child's '/'"); -                local->op_ret = -1; -        } else { -                gf_log (this->name, GF_LOG_DEBUG, -                "remove xattr succeeded"); -                local->op_ret = 0; +                ret = op_ret; +                goto out;          } +        gf_log (this->name, GF_LOG_DEBUG, +                "remove xattr succeeded"); + +          pump_change_state (this, PUMP_STATE_ABORT); +        ret = pump_initiate_sink_disconnect (frame, this); + +out: +        if (ret < 0) { +                local->op_ret = ret; +                pump_command_reply (frame, this); +        } -        pump_command_reply (frame, this);  	return 0;  } @@ -1000,7 +1164,7 @@ pump_execute_abort (call_frame_t *frame, xlator_t *this)                  goto out;          } -        assert (priv->root_inode); +        GF_ASSERT (priv->root_inode);          build_root_loc (priv->root_inode, &root_loc); @@ -1446,6 +1610,8 @@ pump_command_reply (call_frame_t *frame, xlator_t *this)                  gf_log (this->name, GF_LOG_NORMAL,                          "Command succeeded"); +        dict_unref (local->dict); +          AFR_STACK_UNWIND (setxattr,                            frame,                            local->op_ret, @@ -1463,14 +1629,17 @@ pump_parse_command (call_frame_t *frame, xlator_t *this,          if (pump_command_start (this, dict)) {                  frame->local = local; +                local->dict = dict_ref (dict);                  ret = pump_execute_start (frame, this);          } else if (pump_command_pause (this, dict)) {                  frame->local = local; +                local->dict = dict_ref (dict);                  ret = pump_execute_pause (frame, this);          } else if (pump_command_abort (this, dict)) {                  frame->local = local; +                local->dict = dict_ref (dict);                  ret = pump_execute_abort (frame, this);          }          return ret; @@ -1566,19 +1735,47 @@ mem_acct_init (xlator_t *this)          return ret;  } +static int +is_xlator_pump_sink (xlator_t *child) +{ +        return (child == PUMP_SINK_CHILD(THIS)); +} + +static int +is_xlator_pump_source (xlator_t *child) +{ +        return (child == PUMP_SOURCE_CHILD(THIS)); +} +  int32_t  notify (xlator_t *this, int32_t event,  	void *data, ...)  {          int ret = -1; +        xlator_t *child_xl = NULL; + +        child_xl = (xlator_t *) data; + +        ret = afr_notify (this, event, data);  	switch (event) {  	case GF_EVENT_CHILD_DOWN: -                pump_change_state (this, PUMP_STATE_ABORT); +                if (is_xlator_pump_source (child_xl)) +                        pump_change_state (this, PUMP_STATE_ABORT);                  break; -        } -        ret = afr_notify (this, event, data); +        case GF_EVENT_CHILD_UP: +                if (is_xlator_pump_sink (child_xl)) +                        if (is_pump_start_pending (this)) { +                                ret = pump_start_synctask (this); +                                if (ret < 0) +                                        gf_log (this->name, GF_LOG_DEBUG, +                                                "Could not start pump " +                                                "synctask"); +                                else +                                        pump_remove_start_pending (this); +                        } +        }          return ret;  } diff --git a/xlators/cluster/afr/src/pump.h b/xlators/cluster/afr/src/pump.h index 15799002b18..e786fb0ded0 100644 --- a/xlators/cluster/afr/src/pump.h +++ b/xlators/cluster/afr/src/pump.h @@ -22,6 +22,10 @@  #include "syncop.h" +/* FIXME: Needs to be defined in a common file */ +#define CLIENT_CMD_CONNECT "trusted.glusterfs.client-connect" +#define CLIENT_CMD_DISCONNECT "trusted.glusterfs.client-disconnect" +  #define PUMP_PID 696969  #define PUMP_LK_OWNER 696969 @@ -43,23 +47,23 @@  #define PUMP_SINK_CHILD(xl) (xl->children->next->xlator)  typedef enum { -        PUMP_STATE_RUNNING, -        PUMP_STATE_RESUME, -        PUMP_STATE_PAUSE, -        PUMP_STATE_ABORT, +        PUMP_STATE_RUNNING,             /* Pump is running and migrating files */ +        PUMP_STATE_RESUME,              /* Pump is resuming from a previous pause */ +        PUMP_STATE_PAUSE,               /* Pump is paused */ +        PUMP_STATE_ABORT,               /* Pump is aborted */  } pump_state_t;  typedef struct _pump_private { -	struct syncenv *env; -        const char *resume_path; -        gf_lock_t resume_path_lock; -        gf_lock_t pump_state_lock; -        pump_state_t pump_state; -        long source_blocks; -        long sink_blocks; -        char current_file[PATH_MAX]; -        uint64_t number_files_pumped; -        gf_boolean_t pump_finished; +	struct syncenv *env;            /* The env pointer to the pump synctask */ +        const char *resume_path;        /* path to resume from the last pause */ +        gf_lock_t resume_path_lock;     /* Synchronize resume_path changes */ +        gf_lock_t pump_state_lock;      /* Synchronize pump_state changes */ +        pump_state_t pump_state;        /* State of pump */ +        char current_file[PATH_MAX];    /* Current file being pumped */ +        uint64_t number_files_pumped;   /* Number of files pumped */ +        gf_boolean_t pump_finished;     /* Boolean to indicate pump termination */ +        char pump_start_pending;        /* Boolean to mark start pending until +                                           CHILD_UP */  } pump_private_t;  void  | 
