diff options
Diffstat (limited to 'xlators/cluster/nsr-recon/src/recon_xlator.c')
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_xlator.c | 837 |
1 files changed, 837 insertions, 0 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.c b/xlators/cluster/nsr-recon/src/recon_xlator.c new file mode 100644 index 000000000..62583d526 --- /dev/null +++ b/xlators/cluster/nsr-recon/src/recon_xlator.c @@ -0,0 +1,837 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <sys/types.h> +#include <fcntl.h> +#include <string.h> +#include <unistd.h> + +#include "call-stub.h" +#include "defaults.h" +#include "xlator.h" + +#include "recon_driver.h" +#include "recon_xlator.h" + +typedef struct _nsr_recon_fd_s { + int32_t term; + nsr_recon_driver_state_t state; + uint32_t first_index; + uint32_t last_index; + call_frame_t *frame; +} nsr_recon_fd_t; + + +typedef struct _nsr_txn_id_s { + uint32_t txn_id; + call_frame_t *frame; + struct list_head list; +} nsr_txn_id_t; + +// Given fd, get back the NSR based fd context. +static int32_t this_fd_ctx_get(fd_t *fd, xlator_t *this, nsr_recon_fd_t **rfd) +{ + uint64_t tmp = 0; + int32_t ret = -1; + + if ((ret = fd_ctx_get(fd, this, &tmp)) != 0) { + return ret; + } else { + *rfd = (nsr_recon_fd_t *)tmp; + return 0; + } +} + +// Add the frame in q after associating with txn_id +static void put_frame(nsr_recon_private_t *priv, + call_frame_t *frame, + uint32_t txn_id) +{ + xlator_t *this = priv->this; + nsr_txn_id_t * tid = GF_CALLOC(1, sizeof(nsr_txn_id_t), gf_mt_recon_private_t); + tid->txn_id = txn_id; + tid->frame = frame; + INIT_LIST_HEAD(&(tid->list)); + list_add_tail(&(tid->list), &(priv->list)); + recon_main_log (this->name, GF_LOG_INFO, "adding framef or txn id %d into queue \n", txn_id); +} + +// get the frame from the queue given the txn id +static void get_frame(nsr_recon_private_t *priv, + call_frame_t **frame, + uint32_t txn_id) +{ + nsr_txn_id_t *tid = NULL; + xlator_t *this = priv->this; + + list_for_each_entry(tid, &(priv->list), list) { + if (tid->txn_id == txn_id) { + *frame = tid->frame; + recon_main_log (this->name, GF_LOG_INFO, "got frame for txn id %d into queue \n", txn_id); + return; + } + } + recon_main_log (this->name, GF_LOG_INFO, "got no frame for txn id %d into queue \n", txn_id); + GF_ASSERT(0); +} + +// Get the term info for the term number specified +void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt) +{ + struct stat buf; + char path[PATH_MAX]; + + bzero(lt, sizeof(nsr_recon_last_term_info_t)); + lt->last_term = term; + sprintf(path,"%s/%s%d",bp,"TERM.",term); + if (!stat(path, &buf) && (buf.st_size > 128)) { + if (buf.st_size <= 128) { + lt->first_index = 0; + lt->last_index = 0; + lt->commited_ops = 0; + } + else { + lt->first_index = 1; + lt->last_index = ((buf.st_size - 128)/128) + 1 ; + lt->commited_ops = lt->last_index - lt->first_index + 1; + } + } + recon_main_log (this->name, GF_LOG_INFO, "for term=%d got first_index=%d last_index=%d commited_ops=%d\n", + term, lt->first_index, lt->last_index, lt->commited_ops); + return; +} + +// Given the term number, find the last term in the changelogs +void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt) +{ + uint32_t t = term; + struct stat buf; + char path[PATH_MAX]; + bzero(lt, sizeof(nsr_recon_last_term_info_t)); + while(t) { + // journal file is of type TERM-1.jnl + sprintf(path,"%s/%s%d",bp,"TERM.",t); + if (!stat(path, &buf)) { + nsr_recon_libchangelog_get_this_term_info(this, bp, t, lt); + recon_main_log (this->name, GF_LOG_INFO, "got last term given current term %d as %d\n", term, t); + return; + } + t--; + } + recon_main_log (this->name, GF_LOG_INFO, "got no last term given current term %d \n", term); + + return; +} + +// Return back the frame stored against the txn_id +void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t txn_id) +{ + call_frame_t *old_frame = NULL; + xlator_t *this = priv->this; + int32_t op_ret = 0; + int32_t op_errno = 0; + + get_frame(priv, &old_frame, txn_id); + if (old_frame) { + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev returns old frame \n"); + // first return the original write for which this ack was sent + STACK_UNWIND_STRICT (writev, old_frame, op_ret, op_errno, NULL, NULL, NULL); + } else { + recon_main_log (this->name, GF_LOG_ERROR, "EIII---nsr_recon_writev cnnot return old frame \n"); + } +} + +typedef enum records_type_t { + fop_gfid_pgfid_oldloc_newloc = 1, + fop_gfid_pgfid_entry = fop_gfid_pgfid_oldloc_newloc + 1, + fop_gfid = fop_gfid_pgfid_entry + 1 , + fop_gfid_offset = fop_gfid + 1, + fop_gfid_offset_len = fop_gfid_offset + 1, +} records_type_t; + +// Get the backend ./glusterfs/xx/xx/<...> path +static void +get_gfid_path(nsr_recon_private_t *priv, char *gfid, char *path) +{ + strcpy(path, priv->base_dir); + strcat(path, "/.glusterfs/"); + strncat(path,gfid,2); + strcat(path,"/"); + strncat(path,gfid+2,2); + strcat(path,"/"); + strcat(path,gfid); +} + + +// Get the link to which backend points to +static gf_boolean_t +get_link_using_gfid(nsr_recon_private_t *priv, char *gfid, char *path) +{ + char lp[PATH_MAX]; + xlator_t *this = priv->this; + get_gfid_path(priv,gfid, lp); + if (readlink(lp, path, 255) == -1) { + GF_ASSERT(0); + recon_main_log(priv->this, GF_LOG_ERROR, + "cannot get readlink for %s\n",lp); + return _gf_false; + } + return _gf_true; +} + +// Get the list of changelog records given a term , first and last index. +void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf) +{ + // do a mmap; seek into the first and read all records till last. + // TBD - right now all records are pseudo holes but mark them as fills. + // TBD - pseudo hole to be implemented when actual fsync gets done on data. + char read_buf[((last - first) + 1) * 128]; + char *rb = &(read_buf[0]); + char path[PATH_MAX]; + int fd; + uint32_t index = 0; + + recon_main_log (this->name, GF_LOG_INFO, + "libchangelog_get_records called for term %d index from %d to %d \n", + term, first, last ); + + sprintf(path,"%s/%s%d",bp,"TERM.",term); + fd = open(path, O_RDONLY); + if (fd != -1) { + char *start = NULL; + nsr_recon_record_details_t * rec = (nsr_recon_record_details_t *)buf; + if (first == 0) + lseek(fd, 128, SEEK_SET); + else + lseek(fd, first * 128, SEEK_SET); + read(fd, rb, (last - first + 1) * 128); + start = rb; + index = first; + do { + recon_main_log (this->name, GF_LOG_INFO, + "libchangelog_get_records start inspecting records at index %d \n", + index ); + if (!strncmp(start, "_PRE_", 5)) { + char op_str[4]; + uint32_t i=0, opcode = 0; + records_type_t type; + + start += 5; + // increment by the NULLs after the PRE + start += 4; + // now we have the opcode + i = 0; + while (*start != 0) { + op_str[i++] = (*start); + start++; + } + op_str[i] = '\0'; + opcode = strtoul(op_str, NULL, 10); + recon_main_log (this->name, GF_LOG_ERR, + "libchangelog_get_records: got opcode %d @index %d\n", opcode, index); + if ((opcode == GF_FOP_RENAME)) { + type = fop_gfid_pgfid_oldloc_newloc; + } else if ((opcode == GF_FOP_UNLINK) || + (opcode == GF_FOP_RMDIR) || + (opcode == GF_FOP_LINK) || + (opcode == GF_FOP_MKDIR) || + (opcode == GF_FOP_SYMLINK) || + (opcode == GF_FOP_MKNOD) || + (opcode == GF_FOP_CREATE)) { + type = fop_gfid_pgfid_entry; + } else if ((opcode == GF_FOP_FSETATTR) || + (opcode == GF_FOP_SETATTR) || + (opcode == GF_FOP_FREMOVEXATTR) || + (opcode == GF_FOP_REMOVEXATTR) || + (opcode == GF_FOP_SETXATTR) || + (opcode == GF_FOP_FSETXATTR)) { + type = fop_gfid; + } else if ((opcode == GF_FOP_TRUNCATE) || + (opcode == GF_FOP_FTRUNCATE)) { + type = fop_gfid_offset; + } else if (opcode == GF_FOP_WRITE) { + type = fop_gfid_offset_len; + } else { + recon_main_log (this->name, GF_LOG_ERR, + "libchangelog_get_records:got no proper opcode %d @index %d\n", + opcode, index); + //GF_ASSERT(0); + // make this as a hole. + // TBD - check this logic later. maybe we should raise alarm here because + // this means that changelog is corrupted. We are not handling changelog + // corruptions as of now. + rec->type = NSR_LOG_HOLE; + goto finish; + } + // TBD - handle psuedo holes once that logic is in. + rec->type = NSR_LOG_FILL; + recon_main_log (this->name, GF_LOG_ERR, + "libchangelog_get_records:got type %d at index %d \n", + rec->type, index); + rec->op = opcode; + + // Now get the gfid and parse it + // before that increment the pointer + start++; + for (i=0; i < 36; i++) { + rec->gfid[i] = (*start); + start++; + } + rec->gfid[i] = '\0'; + + if (opcode == GF_FOP_SYMLINK) { + // the symlink would have been removed. Hence ignore this. + // TBD - have an uniform error policy in case of such cases. + // Right now we are handling some on the source and some on the destination. + if(get_link_using_gfid(this->private, rec->gfid, rec->link_path) == _gf_false) { + rec->type = NSR_LOG_HOLE; + goto finish; + } + } + + GF_ASSERT(*start == 0); + start ++; + + i = 0; + // If type is fop_gfid_offset+_len, get offset + if ((type == fop_gfid_offset) || (type == fop_gfid_offset_len)) { + char offset_str[128]; + while(*start != 0) { + offset_str[i++] = *start; + start ++; + } + offset_str[i] = '\0'; + // get over the 0 + start++; + rec->offset = strtoul(offset_str, NULL, 10); + recon_main_log (this->name, GF_LOG_ERR, + "libchangelog_get_records:got offset %d @index %d \n", rec->offset, index); + + } + i = 0; + if (type == fop_gfid_offset_len) { + char len_str[128]; + while(*start != 0) { + len_str[i++] = *start; + start ++; + } + len_str[i] = '\0'; + // get over the 0 + start++; + rec->len = strtoul(len_str, NULL, 10); + recon_main_log (this->name, GF_LOG_ERR, + "libchangelog_get_records:got length %d @index %d \n", rec->len, index); + } + i = 0; + if (type == fop_gfid_pgfid_entry) { + // first get the gfid and then the path + for (i=0; i < 36; i++) { + rec->pargfid[i] = (*start); + start++; + } + rec->pargfid[i] = '\0'; + GF_ASSERT(*start == '/'); + start ++; + + i = 0; + while(*start != 0) { + rec->entry[i++] = *start; + start ++; + } + rec->entry[i] = '\0'; + // get over the 0 + start++; + recon_main_log (this->name, GF_LOG_ERR, + "libchangelog_get_records:got entry %s @index %d \n", rec->entry, index); + + } + i = 0; + if (type == fop_gfid_pgfid_oldloc_newloc) { + + // first get the source and then the destination + // source stuff gets stored in pargfid/entry + for (i=0; i < 36; i++) { + rec->pargfid[i] = (*start); + start++; + } + rec->pargfid[i] = '\0'; + GF_ASSERT(*start == '/'); + start ++; + + i=0; + while(*start != 0) { + rec->entry[i++] = *start; + start ++; + } + rec->entry[i] = '\0'; + // get over the 0 + start++; + + // dst stuff gets stored in gfid/newloc + for (i=0; i < 36; i++) { + rec->gfid[i] = (*start); + start++; + } + rec->gfid[i] = '\0'; + GF_ASSERT(*start == '/'); + start ++; + i = 0; + while(*start != 0) { + rec->newloc[i++] = *start; + start ++; + } + rec->newloc[i] = '\0'; + // get over the 0 + start++; + + } + ENDIAN_CONVERSION_RD((*rec), _gf_false); //htonl + } +finish: + if (index == last) + break; + index++; + rb += 128; + start = rb; + rec++; + } while(1); + } + close(fd); + + recon_main_log (this->name, GF_LOG_INFO, + "libchangelog_get_records finsihed inspecting records for term %d \n", + term); + return; +} + +int32_t +nsr_recon_open (call_frame_t *frame, xlator_t *this, + loc_t *loc, int32_t flags, fd_t *fd, dict_t *xdata) +{ + int32_t op_ret = 0; + int32_t op_errno = 0; + nsr_recon_fd_t *rfd = NULL; + + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_open called for path %s \n",loc->path ); + rfd = GF_CALLOC (1, sizeof (*rfd), gf_mt_recon_private_t); + if (!rfd) { + op_ret = -1; + op_errno = ENOMEM; + } + + op_ret = fd_ctx_set (fd, this, (uint64_t)(long)rfd); + if (op_ret) { + op_ret = -1; + op_errno = EINVAL; + } + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_open returns with %d for path %s \n",op_ret,loc->path ); + STACK_UNWIND_STRICT (open, frame, op_ret, op_errno, fd, NULL); + return 0; +} + +int32_t +nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, + struct iovec *vector, int32_t count, off_t offset, + uint32_t flags, struct iobref *iobref, dict_t *xdata) +{ + nsr_recon_fd_t *rfd = NULL; + nsr_recon_private_t *priv = NULL; + int32_t op_ret = 0; + int32_t op_errno = 0; + int32_t ret = 0; + + ret = this_fd_ctx_get (fd, this, &rfd); + if (ret < 0) { + return -1; + } + priv = (nsr_recon_private_t *)this->private; + + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called for offset %d \n",(unsigned int)offset ); + GF_ASSERT(count == 1); + switch (offset) { + // gets called to return back + case nsr_recon_xlator_sector_0: + { + char c[4]; + uint32_t txn_id; + + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev clled to return back \n"); + memcpy((void *)c, (void *)vector[0].iov_base, 4); + txn_id = ntohl(atoi(c)); + nsr_recon_return_back(priv, txn_id); + STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, + NULL, NULL, NULL); + break; + } + // client(brick, leader) writes the role of the node + case nsr_recon_xlator_sector_1 : + { + nsr_recon_role_t rr; + memcpy((void *)&rr, (void *)vector[0].iov_base, sizeof(rr)); + ENDIAN_CONVERSION_RR(rr, _gf_true); //ntohl + + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called to set role %d\n", rr.role); + if ((rr.role != leader) && + (rr.role != reconciliator) && + (rr.role != resolutor)) { + recon_main_log (this->name, GF_LOG_ERROR, + "EIII---nsr_recon_writev cannot set state \n"); + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + NULL, NULL, NULL); + } + + GF_ASSERT(rr.num <= MAXIMUM_REPLICA_STRENGTH); + + // Store the stack frame so that when the actual job gets finished + // we send the response back to the brick. + if (nsr_recon_driver_set_role(priv->driver_thread_context, + &rr, + priv->txn_id) == _gf_false) { + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_writev set_role - cannot seem to set role \n"); + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + NULL, NULL, NULL); + } else { + uint32_t old = priv->txn_id; + atomic_cmpxchg(&priv->txn_id, old,old+1); + put_frame(priv, frame, old); + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev set_role - set role succesfully \n"); + } + break; + } + // client(reconciliator) writes how much it needs for the read + case nsr_recon_xlator_sector_2 : + { + nsr_recon_log_info_t li; + memcpy((void *)&li, (void *)vector[0].iov_base, sizeof(li)); + ENDIAN_CONVERSION_LI(li, _gf_true); //ntohl + + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev - setting term info for reconcilation info. term=%d, first_index=%d,start_index=%d \n", + li.term, li.first_index, li.last_index); + rfd->term = li.term; + rfd->last_index = li.last_index; + rfd->first_index = li.first_index; + STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, + NULL, NULL, NULL); + break; + } + // client(reconciliator) writes term for which it needs info + case nsr_recon_xlator_sector_3 : + { + int32_t term; + + memcpy((void *)&term, (void *)vector[0].iov_base, sizeof(term)); + term = ntohl(term); //ntohl + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev - setting term info for term info. term=%d\n", + term); + rfd->term = term; + STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, + NULL, NULL, NULL); + break; + } + // client(reconciliator) writes current term so that it gets last term info later + case nsr_recon_xlator_sector_4 : + { + int32_t term; + + memcpy((void *)&term, (void *)vector[0].iov_base, sizeof(term)); + term = ntohl(term); //ntohl + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev - setting term info for last term info given current term=%d\n", + term); + rfd->term = term; + STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, + NULL, NULL, NULL); + break; + } + } + + return 0; +} + +int +nsr_recon_readv (call_frame_t *frame, xlator_t *this, + fd_t *fd, size_t size, off_t offset, uint32_t flags, dict_t *xdata) +{ + nsr_recon_fd_t *rfd = NULL; + int32_t op_ret = 0; + int32_t op_errno = 0; + // copied stuff from quick-read.c and posix.c + struct iobuf *iobuf = NULL; + struct iobref *iobref = NULL; + struct iovec iov = {0, }; + int32_t ret = -1; + nsr_recon_private_t *priv = NULL; + + iobuf = iobuf_get2 (this->ctx->iobuf_pool, op_ret); + if (!iobuf) { + op_errno = ENOMEM; + goto out; + } + + iobref = iobref_new (); + if (!iobref) { + op_errno = ENOMEM; + goto out; + } + + iobref_add (iobref, iobuf); + + ret = this_fd_ctx_get (fd, this, &rfd); + if (ret < 0) { + op_errno = -ret; + goto out; + } + priv = (nsr_recon_private_t *)this->private; + + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_readv called for offset %d \n",(unsigned int)offset ); + switch (offset) { + // client(leader) reads from here to get info for this term on this node + // invole libchagelog to get the information + case nsr_recon_xlator_sector_3 : + { + nsr_recon_last_term_info_t lt; + GF_ASSERT(size == sizeof(lt)); + nsr_recon_libchangelog_get_this_term_info(this,priv->changelog_base_path, rfd->term, <); + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_readv - getting term info for term=%d, ops=%d, first=%d, last=%d\n", + rfd->term, lt.commited_ops, lt.first_index, lt.last_index); + ENDIAN_CONVERSION_LT(lt, _gf_false); //htonl + memcpy(iobuf->ptr, <, size); + goto out; + } + // client(reconciliator) reads individual record information + case nsr_recon_xlator_sector_2 : + { + uint32_t num = (rfd->last_index - rfd->first_index + 1); + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_readv - expected size %lu got size %lu\n", + (num * sizeof(nsr_recon_record_details_t)), size); + + GF_ASSERT(size == (num * sizeof(nsr_recon_record_details_t))); + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_readv - getting records for term=%d from %d to %d\n", + rfd->term, rfd->first_index, rfd->last_index); + nsr_recon_libchangelog_get_records(this, priv->changelog_base_path, + rfd->term, rfd->first_index, rfd->last_index, iobuf->ptr); + goto out; + } + // read last term info + case nsr_recon_xlator_sector_4 : + { + nsr_recon_last_term_info_t lt; + GF_ASSERT(size == sizeof(lt)); + nsr_recon_libchangelog_get_last_term_info(this, priv->changelog_base_path, rfd->term, <); + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_readv - getting last term info given current term=%d. last term = %d ops=%d, first=%d, last=%d\n", + rfd->term, lt.last_term, lt.commited_ops, lt.first_index, lt.last_index); + ENDIAN_CONVERSION_LT(lt, _gf_false); //htonl + memcpy(iobuf->ptr, <, size); + goto out; + } + } + +out: + if (op_errno == 0) { + iov.iov_base = iobuf->ptr; + ret = iov.iov_len = size; + } + + STACK_UNWIND_STRICT (readv, frame, ret, op_errno, &iov, 1, NULL, iobref , NULL); + + if (iobref) + iobref_unref (iobref); + if (iobuf) + iobuf_unref (iobuf); + return 0; +} + +int +nsr_recon_lookup (call_frame_t *frame, xlator_t *this, + loc_t *loc, dict_t *xdata) +{ + struct iatt buf = {0, }; + // dirty hack to set root as regular but seems to work. + buf.ia_type = IA_IFREG; + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_lookup called \n"); + + STACK_UNWIND_STRICT (lookup, frame, 0, 0, this->itable->root, &buf, NULL, NULL); + return 0; +} + + +int32_t +nsr_recon_flush (call_frame_t *frame, xlator_t *this, + fd_t *fd, dict_t *xdata) +{ + STACK_UNWIND_STRICT (flush, frame, 0, 0, NULL); + return 0; +} + +int32_t +init (xlator_t *this) +{ + nsr_recon_private_t *priv = NULL; + char *local, *members; + unsigned int i=0; + + priv = GF_CALLOC (1, sizeof (*priv), gf_mt_recon_private_t); + if (!priv) { + recon_main_log (this->name, GF_LOG_ERROR, + "priv allocation error\n"); + return -1; + } + GF_OPTION_INIT ("replica-group-size", priv->replica_group_size, uint32, err); + GF_OPTION_INIT ("vol-name", priv->volname, str, err); + if (!priv->volname) { + recon_main_log (this->name, GF_LOG_ERROR, + "missing volname option (required)"); + return -1; + } + GF_OPTION_INIT ("changelog-dir", priv->changelog_base_path, str, err); + if (!priv->changelog_base_path) { + recon_main_log (this->name, GF_LOG_ERROR, + "missing changelog directory option (required)"); + return -1; + } + GF_OPTION_INIT ("base-dir", priv->base_dir, str, err); + if (!priv->base_dir) { + recon_main_log (this->name, GF_LOG_ERROR, + "missing brick base directory option (required)"); + return -1; + } + GF_OPTION_INIT ("replica-group-members", members, str, err); + if (!members) { + recon_main_log (this->name, GF_LOG_ERROR, + "missing membership option (required)"); + return -1; + } + GF_OPTION_INIT ("local-member", local, str, err); + if (!local) { + recon_main_log (this->name, GF_LOG_ERROR, + "missing local member option (required)"); + return -1; + } + + priv->replica_group_members = GF_CALLOC (priv->replica_group_size, + sizeof(char *), + gf_mt_recon_private_t); + priv->replica_group_members[0] = GF_CALLOC (1, + strlen(local), + gf_mt_recon_private_t); + if (!priv->replica_group_members || !(priv->replica_group_members[0])) { + recon_main_log (this->name, GF_LOG_ERROR, + "str allocation error\n"); + return -1; + } + strcpy(priv->replica_group_members[0], local); + for (i=1; i < priv->replica_group_size; i++) { + char *member; + if (i == 1) + member = strtok(members, ","); + else + member = strtok(NULL, ","); + priv->replica_group_members[i] = GF_CALLOC (1, strlen(member) + 1, gf_mt_recon_private_t); + if (!priv->replica_group_members[i]) { + recon_main_log (this->name, GF_LOG_ERROR, + "str allocation error\n"); + return -1; + } + strcpy(priv->replica_group_members[i], member); + } + + + priv->this = this; + this->private = (void *)priv; + + recon_main_log (this->name, GF_LOG_INFO, "creating reconciliation driver \n"); + + if (pthread_create(&priv->thread_id, NULL, nsr_reconciliation_driver, priv)) { + recon_main_log (this->name, GF_LOG_ERROR, + "pthread creation error \n"); + return -1; + } + + INIT_LIST_HEAD(&(priv->list)); + + + return 0; + +err: + return -1; +} + + +void +fini (xlator_t *this) +{ + nsr_recon_private_t *priv = NULL; + void *ret = NULL; + + priv = (nsr_recon_private_t *)this->private; + + pthread_cancel(priv->thread_id); + pthread_join(priv->thread_id, &ret); +} + + +struct xlator_fops fops = { + .open = nsr_recon_open, + .readv = nsr_recon_readv, + .writev = nsr_recon_writev, + .lookup = nsr_recon_lookup, + .flush = nsr_recon_flush +}; + +struct xlator_cbks cbks = { +}; + +struct volume_options options[] = { + { .key = {"replica-group-size"}, + .type = GF_OPTION_TYPE_INT, + .min = 2, + .max = INT_MAX, + .default_value = "2", + .description = "Number of bricks in replica group. can be derived but putting it here for testing." + }, + { + .key = {"vol-name"}, + .type = GF_OPTION_TYPE_STR, + .description = "volume name" + }, + { + .key = {"local-member"}, + .type = GF_OPTION_TYPE_STR, + .description = "member(brick) for which this translator is responsible." + }, + { + .key = {"replica-group-members"}, + .type = GF_OPTION_TYPE_STR, + .description = "Comma seperated member names other than local." + }, + { + .key = {"changelog-dir"}, + .type = GF_OPTION_TYPE_STR, + .description = "Base directory where per term changelogs are maintained." + }, + { + .key = {"base-dir"}, + .type = GF_OPTION_TYPE_STR, + .description = "Base directory for this brick. This should go away once we fix gfid based lookups" + }, + { .key = {NULL} }, +}; |