diff options
| -rw-r--r-- | xlators/cluster/ec/src/ec-common.h | 1 | ||||
| -rw-r--r-- | xlators/cluster/ec/src/ec-data.c | 2 | ||||
| -rw-r--r-- | xlators/cluster/ec/src/ec-data.h | 1 | ||||
| -rw-r--r-- | xlators/cluster/ec/src/ec-heal.c | 109 | ||||
| -rw-r--r-- | xlators/cluster/ec/src/ec.c | 2 | ||||
| -rw-r--r-- | xlators/cluster/ec/src/ec.h | 4 | 
6 files changed, 114 insertions, 5 deletions
diff --git a/xlators/cluster/ec/src/ec-common.h b/xlators/cluster/ec/src/ec-common.h index 3334a7bfe0e..4c7fe0c820b 100644 --- a/xlators/cluster/ec/src/ec-common.h +++ b/xlators/cluster/ec/src/ec-common.h @@ -112,5 +112,6 @@ void ec_resume_parent(ec_fop_data_t * fop, int32_t error);  void ec_manager(ec_fop_data_t * fop, int32_t error);  gf_boolean_t ec_is_recoverable_error (int32_t op_errno); +void ec_handle_healers_done (ec_fop_data_t *fop);  #endif /* __EC_COMMON_H__ */ diff --git a/xlators/cluster/ec/src/ec-data.c b/xlators/cluster/ec/src/ec-data.c index 0632371bb6d..2a34f78999c 100644 --- a/xlators/cluster/ec/src/ec-data.c +++ b/xlators/cluster/ec/src/ec-data.c @@ -135,6 +135,7 @@ ec_fop_data_t * ec_fop_data_allocate(call_frame_t * frame, xlator_t * this,      }      INIT_LIST_HEAD(&fop->cbk_list); +    INIT_LIST_HEAD(&fop->healer);      INIT_LIST_HEAD(&fop->answer_list);      INIT_LIST_HEAD(&fop->pending_list);      INIT_LIST_HEAD(&fop->locks[0].wait_list); @@ -300,6 +301,7 @@ void ec_fop_data_release(ec_fop_data_t * fop)          ec = fop->xl->private;          ec_handle_last_pending_fop_completion (fop, ¬ify); +        ec_handle_healers_done (fop);          mem_put(fop);          if (notify) {              ec_pending_fops_completed(ec); diff --git a/xlators/cluster/ec/src/ec-data.h b/xlators/cluster/ec/src/ec-data.h index 670b3b88670..135ccdf5f53 100644 --- a/xlators/cluster/ec/src/ec-data.h +++ b/xlators/cluster/ec/src/ec-data.h @@ -213,6 +213,7 @@ struct _ec_fop_data      ec_cbk_t           cbks;      void              *data;      ec_heal_t         *heal; +    struct list_head   healer;      uint64_t           user_size;      uint32_t           head; diff --git a/xlators/cluster/ec/src/ec-heal.c b/xlators/cluster/ec/src/ec-heal.c index b014df25b94..31685356db0 100644 --- a/xlators/cluster/ec/src/ec-heal.c +++ b/xlators/cluster/ec/src/ec-heal.c @@ -26,6 +26,9 @@  #include "syncop-utils.h"  #include "cluster-syncop.h" +#define EC_MAX_BACKGROUND_HEALS 8 +#define EC_MAX_HEAL_WAITERS 128 +  #define alloca0(size) ({void *__ptr; __ptr = alloca(size); memset(__ptr, 0, size); __ptr; })  #define EC_COUNT(array, max) ({int __i; int __res = 0; for (__i = 0; __i < max; __i++) if (array[__i]) __res++; __res; })  #define EC_INTERSECT(dst, src1, src2, max) ({int __i; for (__i = 0; __i < max; __i++) dst[__i] = src1[__i] && src2[__i]; }) @@ -2318,6 +2321,106 @@ ec_heal_done (int ret, call_frame_t *heal, void *opaque)          return 0;  } +ec_fop_data_t* +__ec_dequeue_heals (ec_t *ec) +{ +        ec_fop_data_t *fop = NULL; + +        if (list_empty (&ec->heal_waiting)) +                goto none; + +        if (ec->healers == EC_MAX_BACKGROUND_HEALS) +                goto none; + +        GF_ASSERT (ec->healers < EC_MAX_BACKGROUND_HEALS); +        fop = list_entry(ec->heal_waiting.next, ec_fop_data_t, healer); +        ec->heal_waiters--; +        list_del_init(&fop->healer); +        list_add(&fop->healer, &ec->healing); +        ec->healers++; +        return fop; +none: +        gf_msg_debug (ec->xl->name, 0, "Num healers: %d, Num Waiters: %d", +                      ec->healers, ec->heal_waiters); +        return NULL; +} + +void +ec_heal_fail (ec_t *ec, ec_fop_data_t *fop) +{ +        if (fop->cbks.heal) { +            fop->cbks.heal (fop->req_frame, NULL, ec->xl, -1, EIO, 0, 0, +                            0, NULL); +        } +        if (fop) +            ec_fop_data_release (fop); +} + +void +ec_launch_heal (ec_t *ec, ec_fop_data_t *fop) +{ +        int     ret = 0; + +        ret = synctask_new (ec->xl->ctx->env, ec_synctask_heal_wrap, +                            ec_heal_done, NULL, fop); +        if (ret < 0) { +                ec_heal_fail (ec, fop); +        } +} + +void +ec_handle_healers_done (ec_fop_data_t *fop) +{ +        ec_t *ec = fop->xl->private; +        ec_fop_data_t *heal_fop = NULL; + +        if (list_empty (&fop->healer)) +                return; + +        LOCK (&ec->lock); +        { +                list_del_init (&fop->healer); +                ec->healers--; +                heal_fop = __ec_dequeue_heals (ec); +        } +        UNLOCK (&ec->lock); + +        if (heal_fop) +                ec_launch_heal (ec, heal_fop); + +} + +void +ec_heal_throttle (xlator_t *this, ec_fop_data_t *fop) +{ +        gf_boolean_t can_heal = _gf_true; +        ec_t         *ec      = this->private; + +        if (fop->req_frame == NULL) { + +                LOCK (&ec->lock); +                { +                        if (ec->heal_waiters >= EC_MAX_HEAL_WAITERS) { +                                can_heal = _gf_false; +                        } else { +                                list_add_tail(&fop->healer, &ec->heal_waiting); +                                ec->heal_waiters++; +                                fop = __ec_dequeue_heals (ec); +                        } +                } +                UNLOCK (&ec->lock); +        } + +        if (can_heal) { +                if (fop) +                        ec_launch_heal (ec, fop); +        } else { +               gf_msg_debug (this->name, 0, "Max number of heals are pending, " +                             "background self-heal rejected"); +                ec_heal_fail (ec, fop); +        } +} +  void  ec_heal (call_frame_t *frame, xlator_t *this, uintptr_t target,           int32_t minimum, fop_heal_cbk_t func, void *data, loc_t *loc, @@ -2325,7 +2428,6 @@ ec_heal (call_frame_t *frame, xlator_t *this, uintptr_t target,  {      ec_cbk_t callback = { .heal = func };      ec_fop_data_t *fop = NULL; -    int ret = 0;      gf_msg_trace ("ec", 0, "EC(HEAL) %p", frame); @@ -2353,10 +2455,7 @@ ec_heal (call_frame_t *frame, xlator_t *this, uintptr_t target,      if (xdata)          fop->xdata = dict_ref(xdata); -    ret = synctask_new (this->ctx->env, ec_synctask_heal_wrap, -                        ec_heal_done, NULL, fop); -    if (ret < 0) -            goto fail; +    ec_heal_throttle (this, fop);      return;  fail:      if (fop) diff --git a/xlators/cluster/ec/src/ec.c b/xlators/cluster/ec/src/ec.c index 64ab91bf9bd..dd51630ea79 100644 --- a/xlators/cluster/ec/src/ec.c +++ b/xlators/cluster/ec/src/ec.c @@ -543,6 +543,8 @@ init (xlator_t *this)      LOCK_INIT(&ec->lock);      INIT_LIST_HEAD(&ec->pending_fops); +    INIT_LIST_HEAD(&ec->heal_waiting); +    INIT_LIST_HEAD(&ec->healing);      ec->fop_pool = mem_pool_new(ec_fop_data_t, 1024);      ec->cbk_pool = mem_pool_new(ec_cbk_data_t, 4096); diff --git a/xlators/cluster/ec/src/ec.h b/xlators/cluster/ec/src/ec.h index fdedb89ec18..7f140204ece 100644 --- a/xlators/cluster/ec/src/ec.h +++ b/xlators/cluster/ec/src/ec.h @@ -28,6 +28,8 @@  struct _ec  {      xlator_t *        xl; +    int32_t           healers; +    int32_t           heal_waiters;      int32_t           nodes;      int32_t           bits_for_nodes;      int32_t           fragments; @@ -46,6 +48,8 @@ struct _ec      gf_timer_t *      timer;      gf_boolean_t      shutdown;      struct list_head  pending_fops; +    struct list_head  heal_waiting; +    struct list_head  healing;      struct mem_pool * fop_pool;      struct mem_pool * cbk_pool;      struct mem_pool * lock_pool;  | 
