summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog-ev-handle.h
blob: 53119c5e209f373b43dbda96a202ee23764896b8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/*
   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/

#ifndef __CHANGELOG_EV_HANDLE_H
#define __CHANGELOG_EV_HANDLE_H

#include <glusterfs/list.h>
#include <glusterfs/xlator.h>
#include "rpc-clnt.h"

#include <glusterfs/rot-buffs.h>

struct changelog_clnt;

/**
 * Per-client state for one changelog RPC connection.
 *
 * An instance is reference counted through ->ref (see
 * changelog_rpc_clnt_ref() / changelog_rpc_clnt_unref()) and is linked
 * into one of the lists of its holder via ->list.
 */
typedef struct changelog_rpc_clnt {
    /* NOTE(review): presumably the owning changelog xlator - confirm */
    xlator_t *this;

    /* NOTE(review): what this lock protects is not visible in this header */
    gf_lock_t lock;

    /* reference count; bumped with GF_ATOMIC_INC, dropped with GF_ATOMIC_DEC */
    gf_atomic_t ref;
    /* set through changelog_set_disconnect_flag(); once true, dropping the
     * last reference unlinks the object and runs ->cleanup */
    gf_boolean_t disconnected;

    /* NOTE(review): presumably an event-type filter for this client - confirm */
    unsigned int filter;
    /* NOTE(review): presumably the unix socket path of the client - confirm */
    char sock[UNIX_PATH_MAX];

    struct changelog_clnt *c_clnt; /* back pointer to list holder */

    struct rpc_clnt *rpc; /* RPC client endpoint */

    struct list_head list; /* ->pending, ->waitq, ->active */

    void (*cleanup)(struct changelog_rpc_clnt *); /* cleanup handler */
} changelog_rpc_clnt_t;

/**
 * Take a reference on @crpc by incrementing its ->ref counter with
 * GF_ATOMIC_INC. The matching release is changelog_rpc_clnt_unref().
 *
 * @crpc is dereferenced unconditionally and must not be NULL.
 */
static inline void
changelog_rpc_clnt_ref(changelog_rpc_clnt_t *crpc)
{
    GF_ATOMIC_INC(crpc->ref);
}

/**
 * Store @flag in @crpc->disconnected.
 *
 * The write is a plain assignment; no lock is taken here.
 * NOTE(review): whether callers are expected to hold a lock is not
 * visible from this header - confirm against the call sites.
 */
static inline void
changelog_set_disconnect_flag(changelog_rpc_clnt_t *crpc, gf_boolean_t flag)
{
    crpc->disconnected = flag;
}

/**
 * Report whether @crpc has been flagged as disconnected.
 *
 * Returns 1 when ->disconnected equals _gf_true, 0 otherwise.
 */
static inline int
changelog_rpc_clnt_is_disconnected(changelog_rpc_clnt_t *crpc)
{
    if (crpc->disconnected == _gf_true)
        return 1;

    return 0;
}

/**
 * Drop one reference on @crpc.
 *
 * The counter is decremented with GF_ATOMIC_DEC. When the resulting value
 * is zero and the client has been flagged as disconnected, the object is
 * unlinked from whichever list it sits on and its ->cleanup handler is
 * invoked. In every other case the function only decrements.
 *
 * No lock is taken around list_del() here.
 */
static inline void
changelog_rpc_clnt_unref(changelog_rpc_clnt_t *crpc)
{
    uint64_t remaining = GF_ATOMIC_DEC(crpc->ref);

    /* other holders still exist: nothing more to do */
    if (remaining != 0)
        return;

    /* last reference gone, but the client is still connected: keep it */
    if (!changelog_rpc_clnt_is_disconnected(crpc))
        return;

    /* unlink first, then hand the object to its cleanup handler */
    list_del(&crpc->list);
    crpc->cleanup(crpc);
}

/**
 * This structure holds pending and active clients. On a probe RPC, an
 * instance of the above structure (@changelog_rpc_clnt) is placed in
 * ->pending and gets moved to ->active on a successful connect.
 *
 * locking rules:
 *
 * Manipulating ->pending
 * ->pending_lock
 *    ->pending
 *
 * Manipulating ->active
 * ->active_lock
 *    ->active
 *
 * Moving object from ->pending to ->active
 * ->pending_lock
 *   ->active_lock
 *
 * Objects are _never_ moved from ->active to ->pending, i.e., during
 * disconnection, the object is destroyed. Well, we could have tried
 * to reconnect, but that's pure waste.. let the other end reconnect.
 */

/**
 * Holder of the client lists (->pending, ->active, ->waitq), each paired
 * with its own lock, plus the consumer side of the rot-buffs.
 */
typedef struct changelog_clnt {
    /* NOTE(review): presumably the owning changelog xlator - confirm */
    xlator_t *this;

    /* pending connections */
    pthread_mutex_t pending_lock;
    /* NOTE(review): presumably signalled when ->pending gains an entry -
     * confirm against the connector implementation */
    pthread_cond_t pending_cond;
    struct list_head pending;

    /* current active connections */
    gf_lock_t active_lock;
    struct list_head active;

    /* NOTE(review): the role of ->waitq is not described in this header;
     * wait_lock presumably guards it - confirm */
    gf_lock_t wait_lock;
    struct list_head waitq;

    /* consumer part of rot-buffs */
    rbuf_t *rbuf;
    /* NOTE(review): presumably a sequence number tied to rbuf consumption -
     * confirm */
    unsigned long sequence;
} changelog_clnt_t;

/*
 * NOTE(review): the two routines below take and return void *, the shape
 * of a pthread start routine; presumably each is run as its own thread -
 * confirm against the definitions.
 */
void *
changelog_ev_connector(void *);

void *
changelog_ev_dispatch(void *);

/* APIs */

/*
 * NOTE(review): judging by the name, hands a client over to the given
 * holder for connection - confirm against the definition.
 */
void
changelog_ev_queue_connection(changelog_clnt_t *, changelog_rpc_clnt_t *);

/*
 * NOTE(review): judging by the name, tears down the connections tracked
 * by the given holder - confirm against the definition.
 */
void
changelog_ev_cleanup_connections(xlator_t *, changelog_clnt_t *);

#endif