diff options
| author | Csaba Henk <csaba@redhat.com> | 2018-08-09 11:46:33 +0200 | 
|---|---|---|
| committer | Amar Tumballi <amarts@redhat.com> | 2018-11-06 04:21:57 +0000 | 
| commit | bceb9f25671e65cb2f0987a84370055e7c36900f (patch) | |
| tree | 3ed0a55bae57f2d4aae643a70d8a44c41bf77d6c /xlators/mount | |
| parent | 258db7178663305c26aa0068d4f72159ff0bc3ba (diff) | |
fuse: interrupt handling framework
- add sub-framework to send timed responses to kernel
- add interrupt handler queue
- implement INTERRUPT
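fuse_interrupt looks up the handler registered for the interrupted
message in the interrupt handler queue. If one is found, it invokes the
handler function. Otherwise it responds to the kernel with EAGAIN after
a short (0.01 sec) delay, which makes the kernel resend the interrupt
request unless the interrupted message has already been answered.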
See the FUSE interrupt specification at
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/Documentation/filesystems/fuse.txt?h=v4.17#n148
and the explanatory comments in fuse_interrupt() in fuse-bridge.c.
Change-Id: I1a79d3679b31f36e14b4ac8f60b7f2c1ea2badfb
updates: #465
Signed-off-by: Csaba Henk <csaba@redhat.com>
Diffstat (limited to 'xlators/mount')
| -rw-r--r-- | xlators/mount/fuse/src/fuse-bridge.c | 472 | ||||
| -rw-r--r-- | xlators/mount/fuse/src/fuse-bridge.h | 39 | ||||
| -rw-r--r-- | xlators/mount/fuse/src/fuse-mem-types.h | 2 | 
3 files changed, 512 insertions, 1 deletions
diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c index 75b91a43fb1..e4e894e6c9b 100644 --- a/xlators/mount/fuse/src/fuse-bridge.c +++ b/xlators/mount/fuse/src/fuse-bridge.c @@ -15,6 +15,7 @@  #include "compat-errno.h"  #include "glusterfs-acl.h"  #include "syscall.h" +#include "timespec.h"  #ifdef __NetBSD__  #undef open /* in perfuse.h, pulled from mount-gluster-compat.h */ @@ -446,6 +447,362 @@ fuse_invalidate_inode(xlator_t *this, uint64_t fuse_ino)  #endif  } +static fuse_timed_message_t * +fuse_timed_message_new(void) +{ +    fuse_timed_message_t *dmsg = NULL; + +    dmsg = GF_MALLOC(sizeof(*dmsg), gf_fuse_mt_timed_message_t); +    if (!dmsg) { +        return NULL; +    } + +    /* should be NULL if not set */ +    dmsg->fuse_message_body = NULL; +    INIT_LIST_HEAD(&dmsg->next); + +    return dmsg; +} + +static void +fuse_timed_message_free(fuse_timed_message_t *dmsg) +{ +    GF_FREE(dmsg->fuse_message_body); +    GF_FREE(dmsg); +} + +static void +send_fuse_timed(xlator_t *this, fuse_timed_message_t *dmsg) +{ +    fuse_private_t *priv = NULL; + +    priv = this->private; + +    if (!priv->timed_response_fuse_thread_started) { +        return; +    } + +    pthread_mutex_lock(&priv->timed_mutex); +    { +        list_add_tail(&dmsg->next, &priv->timed_list); +        pthread_cond_signal(&priv->timed_cond); +    } +    pthread_mutex_unlock(&priv->timed_mutex); +} + +fuse_interrupt_record_t * +fuse_interrupt_record_new(fuse_in_header_t *finh, +                          fuse_interrupt_handler_t handler) +{ +    fuse_interrupt_record_t *fir = NULL; + +    fir = GF_MALLOC(sizeof(*fir), gf_fuse_mt_interrupt_record_t); +    if (!fir) { +        return NULL; +    } + +    fir->hit = _gf_false; +    fir->interrupt_state = INTERRUPT_NONE; +    fir->data = NULL; + +    fir->interrupt_handler = handler; +    memcpy(&fir->fuse_in_header, finh, sizeof(*finh)); +    pthread_cond_init(&fir->handler_cond, NULL); +    
pthread_mutex_init(&fir->handler_mutex, NULL); +    INIT_LIST_HEAD(&fir->next); + +    return fir; +} + +static void +fuse_interrupt_record_free(fuse_interrupt_record_t *fir, void **datap) +{ +    /* +     * If caller wishes, we give back the private data to let them deal with it +     * however they want; otherwise we take care of freeing it. +     */ +    if (datap) { +        *datap = fir->data; +    } else { +        GF_FREE(fir->data); +    } + +    GF_FREE(fir); +} + +void +fuse_interrupt_record_insert(xlator_t *this, fuse_interrupt_record_t *fir) +{ +    fuse_private_t *priv = NULL; + +    priv = this->private; +    pthread_mutex_lock(&priv->interrupt_mutex); +    { +        list_add_tail(&fir->next, &priv->interrupt_list); +    } +    pthread_mutex_unlock(&priv->interrupt_mutex); +} + +static fuse_interrupt_record_t * +fuse_interrupt_record_fetch(xlator_t *this, uint64_t unique, gf_boolean_t reap) +{ +    fuse_interrupt_record_t *fir = NULL; +    gf_boolean_t found = _gf_false; +    fuse_private_t *priv = NULL; + +    priv = this->private; +    pthread_mutex_lock(&priv->interrupt_mutex); +    { +        list_for_each_entry(fir, &priv->interrupt_list, next) +        { +            if (fir->fuse_in_header.unique == unique) { +                /* +                 * If we are to reap, we do it regardless the +                 * hit flag; otherwise we take the record only +                 * hasn't yet flagged hit. +                 */ +                if (reap || !fir->hit) { +                    found = _gf_true; +                } +                /* +                 * If we are not reaping (coming from handler +                 * context), we set the hit flag. 
+                 */ +                if (!reap) { +                    fir->hit = _gf_true; +                } +                break; +            } +        } +        if (found && reap) { +            list_del(&fir->next); +        } +    } +    pthread_mutex_unlock(&priv->interrupt_mutex); + +    if (found) { +        return fir; +    } +    return NULL; +} + +static fuse_interrupt_record_t * +fuse_interrupt_record_get(xlator_t *this, uint64_t unique) +{ +    return fuse_interrupt_record_fetch(this, unique, _gf_false); +} + +static fuse_interrupt_record_t * +fuse_interrupt_record_reap(xlator_t *this, uint64_t unique) +{ +    return fuse_interrupt_record_fetch(this, unique, _gf_true); +} + +static void +fuse_interrupt(xlator_t *this, fuse_in_header_t *finh, void *msg, +               struct iobuf *iobuf) +{ +    struct fuse_interrupt_in *fii = msg; +    fuse_interrupt_record_t *fir = NULL; + +    gf_log("glusterfs-fuse", GF_LOG_TRACE, +           "unique %" PRIu64 " INTERRUPT for %" PRIu64, finh->unique, +           fii->unique); + +    fir = fuse_interrupt_record_get(this, fii->unique); +    if (fir) { +        gf_log("glusterfs-fuse", GF_LOG_DEBUG, +               "unique %" PRIu64 " INTERRUPT for %" PRIu64 +               ": handler triggered", +               finh->unique, fii->unique); + +        fir->interrupt_handler(this, fir); +    } else { +        fuse_timed_message_t *dmsg = NULL; + +        /* +         * No record found for this interrupt request. +         * +         * It's either because the handler for the interrupted message +         * does not want to handle interrupt, or this interrupt +         * message beat the interrupted which hasn't yet added a record +         * to the interrupt queue. Either case we reply with error +         * EAGAIN with some (0.01 sec) delay. That will have this +         * interrupt request resent, unless the interrupted message +         * has been already answered. 
+         * +         * So effectively we are looping in between kernel and +         * userspace, which will be exited either when the interrupted +         * message handler has added an interrupt record, or has +         * replied to kernel. See +         * +         * https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/ +         * linux.git/tree/Documentation/filesystems/fuse.txt?h=v4.18#n148 +         */ + +        gf_log("glusterfs-fuse", GF_LOG_DEBUG, +               "unique %" PRIu64 " INTERRUPT for %" PRIu64 ": no handler found", +               finh->unique, fii->unique); + +        dmsg = fuse_timed_message_new(); +        if (!dmsg) { +            gf_log("glusterfs-fuse", GF_LOG_ERROR, +                   "unique %" PRIu64 " INTERRUPT for %" PRIu64 +                   ":" +                   " failed to allocate timed message", +                   finh->unique, fii->unique); + +            return; +        } + +        dmsg->fuse_out_header.unique = finh->unique; +        dmsg->fuse_out_header.len = sizeof(dmsg->fuse_out_header); +        dmsg->fuse_out_header.error = -EAGAIN; +        timespec_now(&dmsg->scheduled_ts); +        timespec_adjust_delta(&dmsg->scheduled_ts, +                              (struct timespec){0, 10000000}); + +        send_fuse_timed(this, dmsg); +    } +} + +/* + * Function to be called in fop cbk context (if the fop engages + * with interrupt handling). 
+ */ +gf_boolean_t +fuse_interrupt_finish_fop(call_frame_t *frame, xlator_t *this, +                          gf_boolean_t sync, void **datap) +{ +    fuse_interrupt_record_t *fir = NULL; +    fuse_state_t *state = frame->root->state; +    fuse_in_header_t *finh = state->finh; +    gf_boolean_t hit = _gf_false; +    gf_boolean_t handled = _gf_false; +    fuse_interrupt_state_t intstat_orig = INTERRUPT_NONE; + +    fir = fuse_interrupt_record_reap(this, finh->unique); +    if (!fir) { +        /* +         * No interrupt record was inserted (however, caller would usually know +         * about that and there is no point then in calling this function). +         */ +        return _gf_false; +    } + +    /* +     * The interrupt handler (if finds the record) modifies fir->hit; however, +     * that could have occurred only before fuse_interrupt_record_reap(), so +     * we are safe here with a lock-free access. +     */ +    hit = fir->hit; +    if (hit) { +        pthread_mutex_lock(&fir->handler_mutex); +        { +            intstat_orig = fir->interrupt_state; +            if (fir->interrupt_state == INTERRUPT_NONE) { +                fir->interrupt_state = INTERRUPT_SQUELCHED; +                if (sync) { +                    while (fir->interrupt_state == INTERRUPT_NONE) { +                        pthread_cond_wait(&fir->handler_cond, +                                          &fir->handler_mutex); +                    } +                } +            } +        } +        pthread_mutex_unlock(&fir->handler_mutex); +    } + +    gf_log("glusterfs-fuse", GF_LOG_DEBUG, "intstat_orig=%d", intstat_orig); + +    /* +     * From this on fir can only be referred under the conditions that imply +     * we are to free it (otherwise interrupt handler might have already freed +     * it). 
+     */ + +    if (/* there was no interrupt */ +        !hit || +        /* lost the race against interrupt handler */ +        intstat_orig != INTERRUPT_NONE || +        /* we took cleaning up on us */ +        sync) { +        /* cleaning up */ +        fuse_interrupt_record_free(fir, datap); +    } else if (datap) { +        *datap = NULL; +    } + +    handled = (intstat_orig == INTERRUPT_HANDLED); +    if (handled) { +        /* +         * Fuse request was answered already from interrupt context, we can do +         * away with the stack. +         */ +        free_fuse_state(state); +        STACK_DESTROY(frame->root); +    } + +    /* +     * Let caller know if they have to answer the fuse request. +     */ +    return handled; +} + +/* + * Function to be called in interrupt handler context. + */ +void +fuse_interrupt_finish_interrupt(xlator_t *this, fuse_interrupt_record_t *fir, +                                fuse_interrupt_state_t intstat, +                                gf_boolean_t sync, void **datap) +{ +    fuse_in_header_t finh = { +        0, +    }; +    fuse_interrupt_state_t intstat_orig = INTERRUPT_NONE; + +    pthread_mutex_lock(&fir->handler_mutex); +    { +        intstat_orig = fir->interrupt_state; +        if (fir->interrupt_state == INTERRUPT_NONE) { +            fir->interrupt_state = intstat; +            if (sync) { +                pthread_cond_signal(&fir->handler_cond); +            } +        } +        finh = fir->fuse_in_header; +    } +    pthread_mutex_unlock(&fir->handler_mutex); + +    gf_log("glusterfs-fuse", GF_LOG_DEBUG, "intstat_orig=%d", intstat_orig); + +    /* +     * From this on fir can only be referred under the conditions that imply +     * we are to free it (otherwise fop handler might have already freed it). 
+     */ + +    if (/* we won the race, response is up to us */ +        intstat_orig == INTERRUPT_NONE && +        /* interrupt handling was successful, let the kernel know */ +        intstat == INTERRUPT_HANDLED) { +        send_fuse_err(this, &finh, EINTR); +    } + +    if (/* lost the race ... */ +        intstat_orig != INTERRUPT_NONE && +        /* +         * ... and there is no contract with fop handler that it does the +         * cleanup ... +         */ +        !sync) { +        /* ... so we do! */ +        fuse_interrupt_record_free(fir, datap); +    } else if (datap) { +        *datap = NULL; +    } +} +  int  send_fuse_err(xlator_t *this, fuse_in_header_t *finh, int error)  { @@ -4196,6 +4553,97 @@ notify_kernel_loop(void *data)  }  #endif +static void * +timed_response_loop(void *data) +{ +    ssize_t rv = 0; +    size_t len = 0; +    xlator_t *this = NULL; +    fuse_private_t *priv = NULL; +    fuse_timed_message_t *dmsg = NULL; +    fuse_timed_message_t *tmp = NULL; +    struct timespec now = { +        0, +    }; +    struct timespec delta = { +        0, +    }; +    struct iovec iovs[2] = { +        { +            0, +        }, +    }; + +    this = data; +    priv = this->private; + +    for (;;) { +        pthread_mutex_lock(&priv->timed_mutex); +        { +            while (list_empty(&priv->timed_list)) { +                pthread_cond_wait(&priv->timed_cond, &priv->timed_mutex); +            } + +            dmsg = list_entry(priv->timed_list.next, fuse_timed_message_t, +                              next); +            list_for_each_entry(tmp, &priv->timed_list, next) +            { +                if (timespec_cmp(&tmp->scheduled_ts, &dmsg->scheduled_ts) < 0) { +                    dmsg = tmp; +                } +            } + +            list_del_init(&dmsg->next); +        } +        pthread_mutex_unlock(&priv->timed_mutex); + +        timespec_now(&now); +        if (timespec_cmp(&now, &dmsg->scheduled_ts) < 0) { +            
timespec_sub(&now, &dmsg->scheduled_ts, &delta); +            nanosleep(&delta, NULL); +        } + +        gf_log("glusterfs-fuse", GF_LOG_TRACE, +               "sending timed " +               "message of unique %" PRIu64, +               dmsg->fuse_out_header.unique); + +        len = dmsg->fuse_out_header.len; +        iovs[0] = (struct iovec){&dmsg->fuse_out_header, +                                 sizeof(struct fuse_out_header)}; +        iovs[1] = (struct iovec){dmsg->fuse_message_body, +                                 len - sizeof(struct fuse_out_header)}; +        rv = sys_writev(priv->fd, iovs, 2); +        check_and_dump_fuse_W(priv, iovs, 2, rv); + +        fuse_timed_message_free(dmsg); + +        if (rv == -1 && errno == EBADF) { +            break; +        } + +        if (rv != len && !(rv == -1 && errno == ENOENT)) { +            gf_log("glusterfs-fuse", GF_LOG_INFO, +                   "len: %zu, rv: %zd, errno: %d", len, rv, errno); +        } +    } + +    gf_log("glusterfs-fuse", GF_LOG_ERROR, "timed response loop terminated"); + +    pthread_mutex_lock(&priv->timed_mutex); +    { +        priv->timed_response_fuse_thread_started = _gf_false; +        list_for_each_entry_safe(dmsg, tmp, &priv->timed_list, next) +        { +            list_del_init(&dmsg->next); +            fuse_timed_message_free(dmsg); +        } +    } +    pthread_mutex_unlock(&priv->timed_mutex); + +    return NULL; +} +  static void  fuse_init(xlator_t *this, fuse_in_header_t *finh, void *msg,            struct iobuf *iobuf) @@ -4210,6 +4658,7 @@ fuse_init(xlator_t *this, fuse_in_header_t *finh, void *msg,  #if FUSE_KERNEL_MINOR_VERSION >= 9      pthread_t messenger;  #endif +    pthread_t delayer;      priv = this->private; @@ -4257,6 +4706,18 @@ fuse_init(xlator_t *this, fuse_in_header_t *finh, void *msg,          fino.flags |= FUSE_BIG_WRITES;      } +    /* Start the thread processing timed responses */ +    ret = gf_thread_create(&delayer, NULL, 
timed_response_loop, this, +                           "fusedlyd"); +    if (ret != 0) { +        gf_log("glusterfs-fuse", GF_LOG_ERROR, +               "failed to start timed response thread (%s)", strerror(errno)); + +        sys_close(priv->fd); +        goto out; +    } +    priv->timed_response_fuse_thread_started = _gf_true; +      /* Used for 'reverse invalidation of inode' */      if (fini->minor >= 12) {          ret = gf_thread_create(&messenger, NULL, notify_kernel_loop, this, @@ -5300,6 +5761,8 @@ fuse_priv_dump(xlator_t *this)      gf_proc_dump_write("init_recvd", "%d", (int)private->init_recvd);      gf_proc_dump_write("strict_volfile_check", "%d",                         (int)private->strict_volfile_check); +    gf_proc_dump_write("timed_response_thread_started", "%d", +                       (int)private->timed_response_fuse_thread_started);      gf_proc_dump_write("reverse_thread_started", "%d",                         (int)private->reverse_fuse_thread_started);      gf_proc_dump_write("use_readdirp", "%d", private->use_readdirp); @@ -5552,7 +6015,7 @@ static fuse_handler_t *fuse_std_ops[FUSE_OP_HIGH] = {      [FUSE_SETLKW] = fuse_setlk,      [FUSE_ACCESS] = fuse_access,      [FUSE_CREATE] = fuse_create, -    /* [FUSE_INTERRUPT] */ +    [FUSE_INTERRUPT] = fuse_interrupt,      /* [FUSE_BMAP] */      [FUSE_DESTROY] = fuse_destroy,  /* [FUSE_IOCTL] */ @@ -5679,6 +6142,13 @@ init(xlator_t *this_xl)      pthread_cond_init(&priv->invalidate_cond, NULL);      pthread_mutex_init(&priv->invalidate_mutex, NULL); +    INIT_LIST_HEAD(&priv->timed_list); +    pthread_cond_init(&priv->timed_cond, NULL); +    pthread_mutex_init(&priv->timed_mutex, NULL); + +    INIT_LIST_HEAD(&priv->interrupt_list); +    pthread_mutex_init(&priv->interrupt_mutex, NULL); +      /* get options from option dictionary */      ret = dict_get_str(options, ZR_MOUNTPOINT_OPT, &value_string);      if (ret == -1 || value_string == NULL) { diff --git a/xlators/mount/fuse/src/fuse-bridge.h 
b/xlators/mount/fuse/src/fuse-bridge.h index 318f33b5d61..b391af76bac 100644 --- a/xlators/mount/fuse/src/fuse-bridge.h +++ b/xlators/mount/fuse/src/fuse-bridge.h @@ -151,6 +151,16 @@ struct fuse_private {      /* Writeback cache support */      gf_boolean_t kernel_writeback_cache;      int attr_times_granularity; + +    /* Delayed fuse response */ +    struct list_head timed_list; +    pthread_cond_t timed_cond; +    pthread_mutex_t timed_mutex; +    gf_boolean_t timed_response_fuse_thread_started; + +    /* Interrupt subscription */ +    struct list_head interrupt_list; +    pthread_mutex_t interrupt_mutex;  };  typedef struct fuse_private fuse_private_t; @@ -165,6 +175,35 @@ struct fuse_invalidate_node {  };  typedef struct fuse_invalidate_node fuse_invalidate_node_t; +struct fuse_timed_message { +    struct fuse_out_header fuse_out_header; +    void *fuse_message_body; +    struct timespec scheduled_ts; +    struct list_head next; +}; +typedef struct fuse_timed_message fuse_timed_message_t; + +enum fuse_interrupt_state { +    INTERRUPT_NONE, +    INTERRUPT_SQUELCHED, +    INTERRUPT_HANDLED, +}; +typedef enum fuse_interrupt_state fuse_interrupt_state_t; +struct fuse_interrupt_record; +typedef struct fuse_interrupt_record fuse_interrupt_record_t; +typedef void (*fuse_interrupt_handler_t)(xlator_t *this, +                                         fuse_interrupt_record_t *); +struct fuse_interrupt_record { +    struct fuse_in_header fuse_in_header; +    void *data; +    gf_boolean_t hit; +    fuse_interrupt_state_t interrupt_state; +    fuse_interrupt_handler_t interrupt_handler; +    pthread_cond_t handler_cond; +    pthread_mutex_t handler_mutex; +    struct list_head next; +}; +  struct fuse_graph_switch_args {      xlator_t *this;      xlator_t *old_subvol; diff --git a/xlators/mount/fuse/src/fuse-mem-types.h b/xlators/mount/fuse/src/fuse-mem-types.h index 5e6ab9308e0..c3c0028473a 100644 --- a/xlators/mount/fuse/src/fuse-mem-types.h +++ 
b/xlators/mount/fuse/src/fuse-mem-types.h @@ -24,6 +24,8 @@ enum gf_fuse_mem_types_ {      gf_fuse_mt_gids_t,      gf_fuse_mt_invalidate_node_t,      gf_fuse_mt_pthread_t, +    gf_fuse_mt_timed_message_t, +    gf_fuse_mt_interrupt_record_t,      gf_fuse_mt_end  };  #endif  | 
