summaryrefslogtreecommitdiffstats
path: root/xlators/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/protocol')
-rw-r--r--xlators/protocol/server/src/server-helpers.c31
-rw-r--r--xlators/protocol/server/src/server.c11
2 files changed, 25 insertions, 17 deletions
diff --git a/xlators/protocol/server/src/server-helpers.c b/xlators/protocol/server/src/server-helpers.c
index 053670765..50aae6714 100644
--- a/xlators/protocol/server/src/server-helpers.c
+++ b/xlators/protocol/server/src/server-helpers.c
@@ -1144,14 +1144,12 @@ gf_barrier_dequeue (gf_barrier_t *barrier)
if (!barrier || list_empty (&barrier->queue))
return NULL;
- LOCK (&barrier->lock);
- {
- payload = list_entry (barrier->queue.next,
- gf_barrier_payload_t, list);
+ payload = list_entry (barrier->queue.next,
+ gf_barrier_payload_t, list);
+ if (payload) {
list_del_init (&payload->list);
barrier->cur_size--;
}
- UNLOCK (&barrier->lock);
return payload;
}
@@ -1169,17 +1167,20 @@ gf_barrier_dequeue_start (void *data)
return;
barrier = conf->barrier;
-
- while (!list_empty (&barrier->queue)) {
- payload = gf_barrier_dequeue (barrier);
- if (payload) {
- if (gf_barrier_transmit (conf, payload)) {
- gf_log ("server", GF_LOG_WARNING,
- "Failed to transmit");
+ LOCK (&barrier->lock);
+ {
+ while (barrier->cur_size) {
+ payload = gf_barrier_dequeue (barrier);
+ if (payload) {
+ if (gf_barrier_transmit (conf, payload)) {
+ gf_log ("server", GF_LOG_WARNING,
+ "Failed to transmit");
+ }
+ GF_FREE (payload);
}
- GF_FREE (payload);
}
}
+ UNLOCK (&barrier->lock);
return;
}
@@ -1195,6 +1196,7 @@ gf_barrier_timeout (void *data)
goto out;
barrier = conf->barrier;
+ gf_log ("", GF_LOG_INFO, "barrier timed-out");
LOCK (&barrier->lock);
{
need_dequeue = barrier->on;
@@ -1225,6 +1227,7 @@ gf_barrier_start (xlator_t *this)
barrier = conf->barrier;
+ gf_log (this->name, GF_LOG_INFO, "barrier start called");
LOCK (&barrier->lock);
{
/* if barrier is on, reset timer */
@@ -1273,6 +1276,7 @@ gf_barrier_stop (xlator_t *this)
barrier = conf->barrier;
+ gf_log (this->name, GF_LOG_INFO, "barrier stop called");
LOCK (&barrier->lock);
{
need_dequeue = barrier->on;
@@ -1344,6 +1348,7 @@ done:
{
barrier->fops = fops;
}
+ UNLOCK (&barrier->lock);
ret = 0;
GF_FREE (dup_str);
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c
index 9a59634a9..327a4b390 100644
--- a/xlators/protocol/server/src/server.c
+++ b/xlators/protocol/server/src/server.c
@@ -139,6 +139,7 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg,
server_conf_t *conf = NULL;
gf_barrier_t *barrier = NULL;
gf_barrier_payload_t *stub = NULL;
+ gf_boolean_t barriered = _gf_false;
GF_VALIDATE_OR_GOTO ("server", req, ret);
@@ -183,7 +184,7 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg,
iob, new_iobref);
if (stub) {
gf_barrier_enqueue (barrier, stub);
- goto ret;
+ barriered = _gf_true;
} else {
gf_log ("", GF_LOG_ERROR, "Failed to "
" barrier fop %"PRIu64,
@@ -192,6 +193,8 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg,
}
}
UNLOCK (&barrier->lock);
+ if (barriered == _gf_false)
+ goto ret;
}
/* Then, submit the message for transmission. */
ret = rpcsvc_submit_generic (req, &rsp, 1, payload, payloadcount,
@@ -937,6 +940,7 @@ init (xlator_t *this)
}
LOCK_INIT (&barrier->lock);
+ barrier->on = _gf_false;
GF_OPTION_INIT ("barrier-queue-length", barrier->max_size,
int64, out);
@@ -1020,8 +1024,7 @@ notify (xlator_t *this, int32_t event, void *data, ...)
va_end (ap);
switch (event) {
- /* todo: GF_EVENT_BARRIER */
- case 100:
+ case GF_EVENT_VOLUME_BARRIER_OP:
ret = dict_get_int32 (dict, "barrier", &val);
if (ret) {
gf_log (this->name, GF_LOG_ERROR,
@@ -1040,7 +1043,7 @@ notify (xlator_t *this, int32_t event, void *data, ...)
gf_log (this->name, GF_LOG_ERROR,
"Barrier stop failed");
}
- ret = dict_set_int32 (dict, "barrier-status", ret);
+ ret = dict_set_int32 (output, "barrier-status", ret);
if (ret)
gf_log (this->name, GF_LOG_ERROR,
"Failed to set barrier-status in dict");