diff options
author | Jeff Darcy <jdarcy@redhat.com> | 2014-01-28 20:38:39 +0000 |
---|---|---|
committer | Jeff Darcy <jdarcy@redhat.com> | 2014-01-29 12:53:52 +0000 |
commit | e1bfa08e63d3be4d76b0a72f11cd9129d75b04dd (patch) | |
tree | 414905da8f4e8330f075b58589398a864c546601 /xlators/cluster/nsr-server/src | |
parent | b66f359fda0b43563e43e1a299542f088337846a (diff) |
etcd-sim: fix stdio-buffering problem
Leader election was not working because processes were continuing to
read their own cached copies of the data file contents, not seeing each
others' changes. Fflush is not sufficient to ensure that data
propagates from one process to another. Even ensuring that it gets to
disk requires an fsync as well (which we weren't doing). The only
viable solution is to *reopen* the data file with an empty buffer. This
also makes it easier to ensure correct locking across the whole file,
instead of just from the previous seek position to EOF.
Change-Id: I165d55afd5804dc3f579ea7e55c56da5066ae6c1
Signed-off-by: Jeff Darcy <jdarcy@redhat.com>
Diffstat (limited to 'xlators/cluster/nsr-server/src')
-rw-r--r-- | xlators/cluster/nsr-server/src/etcd-sim.c | 106 |
1 files changed, 68 insertions, 38 deletions
diff --git a/xlators/cluster/nsr-server/src/etcd-sim.c b/xlators/cluster/nsr-server/src/etcd-sim.c index 3cf73a78a..782b5f617 100644 --- a/xlators/cluster/nsr-server/src/etcd-sim.c +++ b/xlators/cluster/nsr-server/src/etcd-sim.c @@ -32,6 +32,7 @@ #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> +#include <sys/file.h> /* * Mock implementation of etcd @@ -51,42 +52,33 @@ etcd_open (etcd_server *server_list) } typedef struct _etcd_sim_s { - int fd; - FILE *stream; + char *path; } etcd_sim_t; void etcd_close (etcd_session this) { etcd_sim_t *sim = (etcd_sim_t *)this; - fflush(sim->stream); - fclose(sim->stream); - close(sim->fd); + free(sim->path); free(this); } char * -etcd_get (etcd_session this, char *key) +etcd_get_1 (FILE *stream, char *key) { char *str = NULL; size_t len; - etcd_sim_t *sim = (etcd_sim_t *)this; unsigned long expires; char *ret; - lockf(sim->fd, F_LOCK, 0 ); - if (fseek(sim->stream, 0, SEEK_SET) == -1) { - lockf(sim->fd, F_ULOCK, 0 ); - return NULL; - } // Read the file while(1) { if(str) { free(str); str = NULL; } - if (getline((char **)&str, &len,sim->stream) == -1) { + if (getline((char **)&str, &len,stream) == -1) { break; } if (!strncmp(str, key, strlen(key))) { @@ -100,36 +92,51 @@ etcd_get (etcd_session this, char *key) ret = calloc(1, strlen(s) + 1); strcpy(ret,s); free(str); - lockf(sim->fd, F_ULOCK, 0 ); return(ret); } } - lockf(sim->fd, F_ULOCK, 0 ); return NULL; } +char * +etcd_get (etcd_session this, char *key) +{ + etcd_sim_t *sim = (etcd_sim_t *)this; + int fd; + FILE *stream; + char *retval; + + fd = open(sim->path,O_RDONLY); + if (!fd) { + return NULL; + } + + stream = fdopen(fd,"r"); + (void)flock(fd,LOCK_SH); + retval = etcd_get_1(stream,key); + (void)flock(fd,LOCK_UN); + fclose(stream); /* closes fd as well */ + + return retval; +} + + etcd_result -etcd_set (etcd_session this, char *key, char *value, - char *precond, unsigned int ttl) +etcd_set_1 (FILE *stream, char *key, char *value, + char *precond, unsigned int ttl) { char *str = NULL; char tp[255]; size_t len; - etcd_sim_t *sim = (etcd_sim_t *)this; unsigned long expires; - lockf(sim->fd, F_LOCK, 0 ); - if (fseek(sim->stream, 0, SEEK_SET) == -1) { - lockf(sim->fd, F_ULOCK, 0 ); - return ETCD_WTF; - } while(1) { if(str) { free(str); str = NULL; } - if (getline((char **)&str, &len,sim->stream) == -1) { + if (getline((char **)&str, &len,stream) == -1) { break; } if (!strncmp(str, key, strlen(key))) { @@ -147,10 +154,9 @@ etcd_set (etcd_session this, char *key, char *value, */ if (precond && strcmp(precond, s)) { free(str); - lockf(sim->fd, F_ULOCK, 0 ); return ETCD_WTF; } - fseek(sim->stream, -strlen(str), SEEK_CUR); + fseek(stream, -strlen(str), SEEK_CUR); free(str); goto here; } @@ -160,32 +166,56 @@ here: sprintf(tp,"%*s %*s %*lu\n", -MAX_KEY_LEN, key, -MAX_VALUE_LEN, value, -MAX_EXPIRE_LEN, ttl ? time(NULL) + ttl : ~0); - if (fwrite(tp, 1,strlen(tp), sim->stream) != strlen(tp)) { - lockf(sim->fd, F_ULOCK, 0 ); + if (fwrite(tp, 1,strlen(tp), stream) != strlen(tp)) { return ETCD_WTF; } - fflush(sim->stream); - lockf(sim->fd, F_ULOCK, 0 ); + fflush(stream); + fsync(fileno(stream)); return ETCD_OK; } +etcd_result +etcd_set (etcd_session this, char *key, char *value, + char *precond, unsigned int ttl) +{ + etcd_sim_t *sim = (etcd_sim_t *)this; + int fd; + FILE *stream; + etcd_result retval; + + fd = open(sim->path,O_RDWR); + if (fd < 0) { + return ETCD_WTF; + } + + stream = fdopen(fd,"r+"); + (void)flock(fd,LOCK_EX); + retval = etcd_set_1(stream,key,value,precond,ttl); + (void)flock(fd,LOCK_UN); + fclose(stream); /* closes fd as well */ + + return retval; +} + etcd_session etcd_open_str (char *server_names) { - etcd_sim_t *sim; - char name[256]; + etcd_sim_t *sim; + int fd; sim = calloc(1, sizeof(etcd_sim_t)); - sprintf(name, "/tmp/%s", server_names); - sim->fd = open(name, O_RDWR | O_CREAT, 0777); - if (sim->fd == -1) - return NULL; - sim->stream = fopen(name, "r+"); - if (sim->stream == NULL) + (void)asprintf(&sim->path,"/tmp/%s",server_names); + + fd = open(sim->path, O_RDWR | O_CREAT, 0777); + if (fd == -1) { + free(sim->path); + free(sim); return NULL; + } + close(fd); return ((void *)sim); } |