summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/nsr-server/src
diff options
context:
space:
mode:
authorJeff Darcy <jdarcy@redhat.com>2014-01-28 20:38:39 +0000
committerJeff Darcy <jdarcy@redhat.com>2014-01-29 12:53:52 +0000
commite1bfa08e63d3be4d76b0a72f11cd9129d75b04dd (patch)
tree414905da8f4e8330f075b58589398a864c546601 /xlators/cluster/nsr-server/src
parentb66f359fda0b43563e43e1a299542f088337846a (diff)
etcd-sim: fix stdio-buffering problem
Leader election was not working because processes were continuing to read their own cached copies of the data file contents, not seeing each others' changes. Fflush is not sufficient to ensure that data propagates from one process to another. Even ensuring that it gets to disk requires an fsync as well (which we weren't doing). The only viable solution is to *reopen* the data file with an empty buffer. This also makes it easier to ensure correct locking across the whole file, instead of just from the previous seek position to EOF. Change-Id: I165d55afd5804dc3f579ea7e55c56da5066ae6c1 Signed-off-by: Jeff Darcy <jdarcy@redhat.com>
Diffstat (limited to 'xlators/cluster/nsr-server/src')
-rw-r--r--xlators/cluster/nsr-server/src/etcd-sim.c106
1 files changed, 68 insertions, 38 deletions
diff --git a/xlators/cluster/nsr-server/src/etcd-sim.c b/xlators/cluster/nsr-server/src/etcd-sim.c
index 3cf73a78a..782b5f617 100644
--- a/xlators/cluster/nsr-server/src/etcd-sim.c
+++ b/xlators/cluster/nsr-server/src/etcd-sim.c
@@ -32,6 +32,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
+#include <sys/file.h>
/*
* Mock implementation of etcd
@@ -51,42 +52,33 @@ etcd_open (etcd_server *server_list)
}
typedef struct _etcd_sim_s {
- int fd;
- FILE *stream;
+ char *path;
} etcd_sim_t;
void
etcd_close (etcd_session this)
{
etcd_sim_t *sim = (etcd_sim_t *)this;
- fflush(sim->stream);
- fclose(sim->stream);
- close(sim->fd);
+ free(sim->path);
free(this);
}
char *
-etcd_get (etcd_session this, char *key)
+etcd_get_1 (FILE *stream, char *key)
{
char *str = NULL;
size_t len;
- etcd_sim_t *sim = (etcd_sim_t *)this;
unsigned long expires;
char *ret;
- lockf(sim->fd, F_LOCK, 0 );
- if (fseek(sim->stream, 0, SEEK_SET) == -1) {
- lockf(sim->fd, F_ULOCK, 0 );
- return NULL;
- }
// Read the file
while(1) {
if(str) {
free(str);
str = NULL;
}
- if (getline((char **)&str, &len,sim->stream) == -1) {
+ if (getline((char **)&str, &len,stream) == -1) {
break;
}
if (!strncmp(str, key, strlen(key))) {
@@ -100,36 +92,51 @@ etcd_get (etcd_session this, char *key)
ret = calloc(1, strlen(s) + 1);
strcpy(ret,s);
free(str);
- lockf(sim->fd, F_ULOCK, 0 );
return(ret);
}
}
- lockf(sim->fd, F_ULOCK, 0 );
return NULL;
}
+char *
+etcd_get (etcd_session this, char *key)
+{
+ etcd_sim_t *sim = (etcd_sim_t *)this;
+ int fd;
+ FILE *stream;
+ char *retval;
+
+ fd = open(sim->path,O_RDONLY);
+ if (!fd) {
+ return NULL;
+ }
+
+ stream = fdopen(fd,"r");
+ (void)flock(fd,LOCK_SH);
+ retval = etcd_get_1(stream,key);
+ (void)flock(fd,LOCK_UN);
+ fclose(stream); /* closes fd as well */
+
+ return retval;
+}
+
+
etcd_result
-etcd_set (etcd_session this, char *key, char *value,
- char *precond, unsigned int ttl)
+etcd_set_1 (FILE *stream, char *key, char *value,
+ char *precond, unsigned int ttl)
{
char *str = NULL;
char tp[255];
size_t len;
- etcd_sim_t *sim = (etcd_sim_t *)this;
unsigned long expires;
- lockf(sim->fd, F_LOCK, 0 );
- if (fseek(sim->stream, 0, SEEK_SET) == -1) {
- lockf(sim->fd, F_ULOCK, 0 );
- return ETCD_WTF;
- }
while(1) {
if(str) {
free(str);
str = NULL;
}
- if (getline((char **)&str, &len,sim->stream) == -1) {
+ if (getline((char **)&str, &len,stream) == -1) {
break;
}
if (!strncmp(str, key, strlen(key))) {
@@ -147,10 +154,9 @@ etcd_set (etcd_session this, char *key, char *value,
*/
if (precond && strcmp(precond, s)) {
free(str);
- lockf(sim->fd, F_ULOCK, 0 );
return ETCD_WTF;
}
- fseek(sim->stream, -strlen(str), SEEK_CUR);
+ fseek(stream, -strlen(str), SEEK_CUR);
free(str);
goto here;
}
@@ -160,32 +166,56 @@ here:
sprintf(tp,"%*s %*s %*lu\n",
-MAX_KEY_LEN, key, -MAX_VALUE_LEN, value,
-MAX_EXPIRE_LEN, ttl ? time(NULL) + ttl : ~0);
- if (fwrite(tp, 1,strlen(tp), sim->stream) != strlen(tp)) {
- lockf(sim->fd, F_ULOCK, 0 );
+ if (fwrite(tp, 1,strlen(tp), stream) != strlen(tp)) {
return ETCD_WTF;
}
- fflush(sim->stream);
- lockf(sim->fd, F_ULOCK, 0 );
+ fflush(stream);
+ fsync(fileno(stream));
return ETCD_OK;
}
+etcd_result
+etcd_set (etcd_session this, char *key, char *value,
+ char *precond, unsigned int ttl)
+{
+ etcd_sim_t *sim = (etcd_sim_t *)this;
+ int fd;
+ FILE *stream;
+ etcd_result retval;
+
+ fd = open(sim->path,O_RDWR);
+ if (fd < 0) {
+ return ETCD_WTF;
+ }
+
+ stream = fdopen(fd,"r+");
+ (void)flock(fd,LOCK_EX);
+ retval = etcd_set_1(stream,key,value,precond,ttl);
+ (void)flock(fd,LOCK_UN);
+ fclose(stream); /* closes fd as well */
+
+ return retval;
+}
+
etcd_session
etcd_open_str (char *server_names)
{
- etcd_sim_t *sim;
- char name[256];
+ etcd_sim_t *sim;
+ int fd;
sim = calloc(1, sizeof(etcd_sim_t));
- sprintf(name, "/tmp/%s", server_names);
- sim->fd = open(name, O_RDWR | O_CREAT, 0777);
- if (sim->fd == -1)
- return NULL;
- sim->stream = fopen(name, "r+");
- if (sim->stream == NULL)
+ (void)asprintf(&sim->path,"/tmp/%s",server_names);
+
+ fd = open(sim->path, O_RDWR | O_CREAT, 0777);
+ if (fd == -1) {
+ free(sim->path);
+ free(sim);
return NULL;
+ }
+ close(fd);
return ((void *)sim);
}