diff options
Diffstat (limited to 'xlators/protocol/server/src/server-helpers.c')
| -rw-r--r-- | xlators/protocol/server/src/server-helpers.c | 324 | 
1 files changed, 324 insertions, 0 deletions
diff --git a/xlators/protocol/server/src/server-helpers.c b/xlators/protocol/server/src/server-helpers.c index c11011abf9f..76c0036e08b 100644 --- a/xlators/protocol/server/src/server-helpers.c +++ b/xlators/protocol/server/src/server-helpers.c @@ -1092,3 +1092,327 @@ out:          return ret;  } + +int32_t +gf_barrier_transmit (server_conf_t *conf, gf_barrier_payload_t *payload) +{ +        gf_barrier_t            *barrier = NULL; +        int32_t                  ret     = -1; +        client_t                *client  = NULL; +        gf_boolean_t             lk_heal = _gf_false; +        call_frame_t            *frame   = NULL; +        server_state_t          *state   = NULL; + +        GF_VALIDATE_OR_GOTO ("barrier", conf, out); +        GF_VALIDATE_OR_GOTO ("barrier", conf->barrier, out); +        GF_VALIDATE_OR_GOTO ("barrier", payload, out); + +        barrier = conf->barrier; + +        frame = payload->frame; +        if (frame) { +                state = CALL_STATE (frame); +                frame->local = NULL; +                client = frame->root->client; +        } +        /* currently lk fops are not barrier'ed. This is reflecting code in +         * server_submit_reply */ +        if (client) +                lk_heal = ((server_conf_t *) client->this->private)->lk_heal; + +        ret = rpcsvc_submit_generic (payload->req, &payload->rsp, 1, +                                     payload->payload, payload->payload_count, +                                     payload->iobref); +        iobuf_unref (payload->iob); +        if (ret == -1) { +                gf_log_callingfn ("", GF_LOG_ERROR, "Reply submission failed"); +                if (frame && client && !lk_heal) { +                        server_connection_cleanup (frame->this, client, +                                                  INTERNAL_LOCKS | POSIX_LOCKS); +                } else { +                        /* TODO: Failure of open(dir), create, inodelk, entrylk +                           or lk fops send failure must be handled specially. */ +                } +                goto ret; +        } + +        ret = 0; +ret: +        if (state) { +                free_state (state); +        } + +        if (frame) { +                gf_client_unref (client); +                STACK_DESTROY (frame->root); +        } + +        if (payload->free_iobref) { +                iobref_unref (payload->iobref); +        } +out: +        return ret; +} + +gf_barrier_payload_t * +gf_barrier_dequeue (gf_barrier_t *barrier) +{ +        gf_barrier_payload_t  *payload = NULL; + +        if (!barrier || list_empty (&barrier->queue)) +                return NULL; + +        payload = list_entry (barrier->queue.next, +                              gf_barrier_payload_t, list); +        if (payload) { +                list_del_init (&payload->list); +                barrier->cur_size--; +        } + +        return payload; +} + + +void* +gf_barrier_dequeue_start (void *data) +{ +        server_conf_t           *conf = NULL; +        gf_barrier_t            *barrier = NULL; +        gf_barrier_payload_t    *payload = NULL; + +        conf = (server_conf_t *)data; +        if (!conf || !conf->barrier) +                return NULL; +        barrier = conf->barrier; + +        LOCK (&barrier->lock); +        { +                while (barrier->cur_size) { +                        payload = gf_barrier_dequeue (barrier); +                        if (payload) { +                                if (gf_barrier_transmit (conf, payload)) { +                                        gf_log ("server", GF_LOG_WARNING, +                                                "Failed to transmit"); +                                } +                                GF_FREE (payload); +                        } +                } +        } +        UNLOCK (&barrier->lock); +        return NULL; +} + +void +gf_barrier_timeout (void *data) +{ +        server_conf_t *conf = NULL; +        gf_barrier_t  *barrier = NULL; +        gf_boolean_t   need_dequeue = _gf_false; + +        conf = (server_conf_t *)data; +        if (!conf || !conf->barrier) +                goto out; +        barrier = conf->barrier; + +        gf_log ("", GF_LOG_INFO, "barrier timed-out"); +        LOCK (&barrier->lock); +        { +                need_dequeue = barrier->on; +                barrier->on = _gf_false; +        } +        UNLOCK (&barrier->lock); + +        if (need_dequeue == _gf_true) +                gf_barrier_dequeue_start (data); +out: +        return; +} + + +int32_t +gf_barrier_start (xlator_t *this) +{ +        server_conf_t *conf = NULL; +        gf_barrier_t  *barrier = NULL; +        int32_t        ret  = -1; +        struct timespec time = {0,}; + +        conf = this->private; + +        GF_VALIDATE_OR_GOTO ("server", this, out); +        GF_VALIDATE_OR_GOTO (this->name, conf, out); +        GF_VALIDATE_OR_GOTO (this->name, conf->barrier, out); + +        barrier = conf->barrier; + +        gf_log (this->name, GF_LOG_INFO, "barrier start called"); +        LOCK (&barrier->lock); +        { +                /* if barrier is on, reset timer */ +                if (barrier->on == _gf_true) { +                        ret = gf_timer_call_cancel (this->ctx, barrier->timer); +                        if (ret) { +                                gf_log (this->name, GF_LOG_ERROR, "Failed to " +                                        "unset timer, failing barrier start"); +                                goto unlock; +                        } +                } + +                barrier->on = _gf_true; +                time.tv_sec = barrier->time_out; +                time.tv_nsec = 0; + +                barrier->timer = gf_timer_call_after (this->ctx, time, +                                                      gf_barrier_timeout, +                                                      (void *)conf); +                if (!barrier->timer) { +                        gf_log (this->name, GF_LOG_ERROR, "Failed to set " +                                "timer, failing barrier start"); +                        barrier->on = _gf_false; +                } +        } +unlock: +        UNLOCK (&barrier->lock); + +        ret = 0; +out: +        return ret; +} + +int32_t +gf_barrier_stop (xlator_t *this) +{ +        server_conf_t *conf = NULL; +        gf_barrier_t  *barrier = NULL; +        int32_t        ret  = -1; +        gf_boolean_t   need_dequeue = _gf_false; + +        conf = this->private; + +        GF_VALIDATE_OR_GOTO ("server", this, out); +        GF_VALIDATE_OR_GOTO (this->name, conf, out); +        GF_VALIDATE_OR_GOTO (this->name, conf->barrier, out); + +        barrier = conf->barrier; + +        gf_log (this->name, GF_LOG_INFO, "barrier stop called"); +        LOCK (&barrier->lock); +        { +                need_dequeue = barrier->on; +                barrier->on = _gf_false; +        } +        UNLOCK (&barrier->lock); + +        if (need_dequeue == _gf_true) { +                gf_timer_call_cancel (this->ctx, barrier->timer); +                ret = gf_thread_create (&conf->barrier_th, NULL, +                                        gf_barrier_dequeue_start, +                                        conf); +                if (ret) { +                        gf_log (this->name, GF_LOG_CRITICAL, +                                "Failed to start un-barriering"); +                        goto out; +                } +        } +        ret = 0; +out: +        return ret; +} + +int32_t +gf_barrier_fops_configure (xlator_t *this, gf_barrier_t *barrier, char *str) +{ +        int32_t         ret = -1; +        char           *dup_str = NULL; +        char           *str_tok = NULL; +        char           *save_ptr = NULL; +        uint64_t        fops = 0; + +        /* by defaul fsync & flush needs to be barriered */ + +        fops |= 1 << GFS3_OP_FSYNC; +        fops |= 1 << GFS3_OP_FLUSH; + +        if (!str) +                goto done; + +        dup_str = gf_strdup (str); +        if (!dup_str) +                goto done; + +        str_tok = strtok_r (dup_str, ",", &save_ptr); +        if (!str_tok) +                goto done; + +        fops = 0; +        while (str_tok) { +                if (!strcmp(str_tok, "writev")) { +                        fops |= ((uint64_t)1 << GFS3_OP_WRITE); +                } else if (!strcmp(str_tok, "fsync")) { +                        fops |= ((uint64_t)1 << GFS3_OP_FSYNC); +                } else if (!strcmp(str_tok, "read")) { +                        fops |= ((uint64_t)1 << GFS3_OP_READ); +                } else if (!strcmp(str_tok, "rename")) { +                        fops |= ((uint64_t)1 << GFS3_OP_RENAME); +                } else if (!strcmp(str_tok, "flush")) { +                        fops |= ((uint64_t)1 << GFS3_OP_FLUSH); +                } else if (!strcmp(str_tok, "ftruncate")) { +                        fops |= ((uint64_t)1 << GFS3_OP_FTRUNCATE); +                } else if (!strcmp(str_tok, "fallocate")) { +                        fops |= ((uint64_t)1 << GFS3_OP_FALLOCATE); +                } else if (!strcmp(str_tok, "rmdir")) { +                        fops |= ((uint64_t)1 << GFS3_OP_RMDIR); +                }  else { +                        gf_log ("barrier", GF_LOG_ERROR, +                                "Invalid barrier fop %s", str_tok); +                } + +                str_tok = strtok_r (NULL, ",", &save_ptr); +        } +done: +        LOCK (&barrier->lock); +        { +                barrier->fops = fops; +        } +        UNLOCK (&barrier->lock); +        ret = 0; + +        GF_FREE (dup_str); +        return ret; +} + +void +gf_barrier_enqueue (gf_barrier_t *barrier, gf_barrier_payload_t *payload) +{ +        list_add_tail (&payload->list, &barrier->queue); +        barrier->cur_size++; +} + +gf_barrier_payload_t * +gf_barrier_payload (rpcsvc_request_t *req, struct iovec *rsp, +                    call_frame_t *frame, struct iovec *payload_orig, +                    int payloadcount, struct iobref *iobref, +                    struct iobuf *iob, gf_boolean_t free_iobref) +{ +        gf_barrier_payload_t *payload = NULL; + +        if (!rsp) +                return NULL; + +        payload = GF_CALLOC (1, sizeof (*payload),1); +        if (!payload) +                return NULL; + +        INIT_LIST_HEAD (&payload->list); + +        payload->req = req; +        memcpy (&payload->rsp, rsp, sizeof (struct iovec)); +        payload->frame = frame; +        payload->payload = payload_orig; +        payload->payload_count = payloadcount; +        payload->iobref = iobref; +        payload->iob = iob; +        payload->free_iobref = free_iobref; + +        return payload; +}  | 
