#!/usr/bin/env python

# Copyright (c) 2015 Red Hat, Inc.
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.

import sqlite3
import urllib  # retained; the quoting helpers are imported explicitly below
import os

try:
    # Python 2 keeps the quoting helpers directly in ``urllib``
    from urllib import quote_plus, unquote_plus
except ImportError:
    # Python 3 moved them to ``urllib.parse``
    from urllib.parse import quote_plus, unquote_plus

from utils import RecordType


class OutputMerger(object):
    """
    Class to merge the output files collected from
    different nodes
    """
    def __init__(self, db_path, all_dbs):
        """
        (Re)create the ``finallist`` table in the database at ``db_path``
        and fill it from every existing database listed in ``all_dbs``.

        Only rows of each node's ``gfidpath`` table that have a non-empty
        ``path1`` are copied. Duplicate (type, path1, path2) combinations
        are dropped by the table's UNIQUE ... ON CONFLICT IGNORE constraint.
        """
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()
        self.cursor_reader = self.conn.cursor()

        self.cursor.execute("DROP TABLE IF EXISTS finallist")
        self.cursor.execute("""
        CREATE TABLE finallist(
            id     INTEGER PRIMARY KEY AUTOINCREMENT,
            ts     VARCHAR,
            type   VARCHAR,
            gfid   VARCHAR,
            path1  VARCHAR,
            path2  VARCHAR,
            UNIQUE (type, path1, path2) ON CONFLICT IGNORE
        )
        """)

        select_query = """
        SELECT   ts, type, gfid, path1, path2
        FROM     gfidpath
        WHERE    path1 != ''
        ORDER BY id ASC
        """

        # If node database exists, read each db and insert into
        # final table. Ignore if combination of TYPE PATH1 PATH2
        # already exists
        for node_db in all_dbs:
            # sqlite3.connect() would create a missing file, so the
            # existence check has to come first.
            if not os.path.exists(node_db):
                continue

            node_conn = sqlite3.connect(node_db)
            try:
                for row in node_conn.cursor().execute(select_query):
                    # row is (ts, type, gfid, path1, path2)
                    self.add_if_not_exists(*row)
            finally:
                # Do not leak one open connection per node database
                node_conn.close()

        self.conn.commit()

    def add_if_not_exists(self, ts, ty, gfid, path1, path2=""):
        """
        Insert one record into ``finallist``.

        The "if not exists" part is enforced by the table itself: a row
        whose (type, path1, path2) already exists is silently ignored.
        """
        query = """
        INSERT INTO finallist(ts, type, gfid, path1, path2)
        VALUES(?, ?, ?, ?, ?)
        """
        self.cursor.execute(query, (ts, ty, gfid, path1, path2))

    def get(self):
        """
        Return the reader cursor positioned on (type, path1, path2) for
        all merged rows, ordered by ts and then by insertion id.
        """
        query = """SELECT type, path1, path2 FROM finallist
        ORDER BY ts ASC, id ASC"""
        return self.cursor_reader.execute(query)

    def get_failures(self):
        """
        Return the reader cursor positioned on the gfid of every row
        whose path1 is empty, or which is a RENAME with an empty path2.
        """
        query = """
        SELECT gfid
        FROM finallist
        WHERE path1 = '' OR (path2 = '' AND type = 'RENAME')
        """
        return self.cursor_reader.execute(query)


class ChangelogData(object):
    """
    SQLite-backed working store for changelog records.

    Owns three tables (``gfidpath``, ``pgfid`` and ``inodegfid``), which
    are dropped and recreated every time an instance is constructed.

    Table and column names passed to the private helpers are interpolated
    into the SQL text unescaped; only values are bound as parameters.
    Callers must therefore pass trusted identifiers only.
    """
    def __init__(self, dbpath):
        self.conn = sqlite3.connect(dbpath)
        # Separate cursors so that iterating a SELECT result is not
        # disturbed by writes issued while the iteration is in progress.
        self.cursor = self.conn.cursor()
        self.cursor_reader = self.conn.cursor()
        self._create_table_gfidpath()
        self._create_table_pgfid()
        self._create_table_inodegfid()

    def _create_table_gfidpath(self):
        """Drop and recreate the ``gfidpath`` table."""
        self.cursor.execute("DROP TABLE IF EXISTS gfidpath")
        self.cursor.execute("""
        CREATE TABLE gfidpath(
            id     INTEGER PRIMARY KEY AUTOINCREMENT,
            ts     VARCHAR,
            type   VARCHAR,
            gfid   VARCHAR(40),
            pgfid1 VARCHAR(40),
            bn1    VARCHAR(500),
            pgfid2 VARCHAR(40),
            bn2    VARCHAR(500),
            path1  VARCHAR DEFAULT '',
            path2  VARCHAR DEFAULT ''
        )
        """)

    def _create_table_inodegfid(self):
        """Drop and recreate the ``inodegfid`` table."""
        self.cursor.execute("DROP TABLE IF EXISTS inodegfid")
        self.cursor.execute("""
        CREATE TABLE inodegfid(
            inode     INTEGER PRIMARY KEY,
            gfid      VARCHAR(40),
            converted INTEGER DEFAULT 0,
            UNIQUE    (inode, gfid) ON CONFLICT IGNORE
        )
        """)

    def _create_table_pgfid(self):
        """Drop and recreate the ``pgfid`` table."""
        self.cursor.execute("DROP TABLE IF EXISTS pgfid")
        self.cursor.execute("""
        CREATE TABLE pgfid(
            pgfid  VARCHAR(40) PRIMARY KEY,
            UNIQUE (pgfid) ON CONFLICT IGNORE
        )
        """)

    @staticmethod
    def _build_where(filters):
        """
        Turn a {column: value} mapping into (" AND col = ? ...", [values]).

        The returned text is meant to be appended to a query that already
        ends in ``WHERE 1=1``. ``None`` or an empty mapping yields an
        empty clause and no parameters.
        """
        filters = filters or {}
        columns = list(filters)
        clause = "".join(" AND %s = ?" % column for column in columns)
        return clause, [filters[column] for column in columns]

    @staticmethod
    def _split_entry(entry):
        """
        Split a quoted "<PGFID>/<BASENAME>" entry into (pgfid, basename).

        The entry is unquoted before splitting on the first "/"; the
        basename is stripped and quoted again before it is returned.
        """
        pgfid, basename = unquote_plus(entry).split("/", 1)
        return pgfid, quote_plus(basename.strip())

    def _get(self, tablename, filters):
        # SELECT * FROM <tablename> WHERE <filters>
        where, params = self._build_where(filters)
        query = "SELECT * FROM %s WHERE 1=1%s" % (tablename, where)
        return self.cursor_reader.execute(query, params)

    def _get_distinct(self, tablename, distinct_field, filters):
        # SELECT DISTINCT <distinct_field> FROM <tablename> WHERE <filters>
        where, params = self._build_where(filters)
        query = "SELECT DISTINCT %s FROM %s WHERE 1=1%s" % (
            distinct_field, tablename, where)
        return self.cursor_reader.execute(query, params)

    def _delete(self, tablename, filters):
        # DELETE FROM <tablename> WHERE <filters>
        # An empty filter set deletes every row of the table.
        where, params = self._build_where(filters)
        query = "DELETE FROM %s WHERE 1=1%s" % (tablename, where)
        self.cursor.execute(query, params)

    def _add(self, tablename, data):
        # INSERT INTO <tablename>(<col1>, ..) VALUES(?, ..)
        fields = list(data)
        params = [data[field] for field in fields]
        query = "INSERT INTO %s(%s) VALUES(%s)" % (
            tablename,
            ",".join(fields),
            ",".join(["?"] * len(fields)))
        self.cursor.execute(query, params)

    def _update(self, tablename, data, filters):
        # UPDATE <tablename> SET col1 = ?, .. WHERE col1 = ? AND ..
        fields = list(data)
        assignments = ", ".join("%s = ?" % field for field in fields)
        where, where_params = self._build_where(filters)
        query = "UPDATE %s SET %s WHERE 1 = 1%s" % (
            tablename, assignments, where)
        # SET values are bound first, then the WHERE values
        params = [data[field] for field in fields] + where_params
        self.cursor.execute(query, params)

    def _exists(self, tablename, filters):
        """
        Return True when at least one row of ``tablename`` matches all
        ``filters``. An empty filter set is treated as "no match".
        """
        if not filters:
            return False

        where, params = self._build_where(filters)
        query = "SELECT COUNT(1) FROM %s WHERE 1=1%s" % (tablename, where)
        self.cursor.execute(query, params)
        row = self.cursor.fetchone()
        return row[0] > 0

    def gfidpath_add(self, changelogfile, ty, gfid, pgfid1="", bn1="",
                     pgfid2="", bn2="", path1="", path2=""):
        """
        Insert a ``gfidpath`` row. ``ts`` is taken from the text after
        the last "." of ``changelogfile``.
        """
        self._add("gfidpath", {
            "ts": changelogfile.split(".")[-1],
            "type": ty,
            "gfid": gfid,
            "pgfid1": pgfid1,
            "bn1": bn1,
            "pgfid2": pgfid2,
            "bn2": bn2,
            "path1": path1,
            "path2": path2
        })

    def gfidpath_update(self, data, filters):
        self._update("gfidpath", data, filters)

    def gfidpath_delete(self, filters):
        self._delete("gfidpath", filters)

    def gfidpath_exists(self, filters):
        return self._exists("gfidpath", filters)

    def gfidpath_get(self, filters=None):
        return self._get("gfidpath", filters)

    def gfidpath_get_distinct(self, distinct_field, filters=None):
        return self._get_distinct("gfidpath", distinct_field, filters)

    def pgfid_add(self, pgfid):
        self._add("pgfid", {
            "pgfid": pgfid
        })

    def pgfid_update(self, data, filters):
        self._update("pgfid", data, filters)

    def pgfid_get(self, filters=None):
        return self._get("pgfid", filters)

    def pgfid_get_distinct(self, distinct_field, filters=None):
        return self._get_distinct("pgfid", distinct_field, filters)

    def pgfid_exists(self, filters):
        return self._exists("pgfid", filters)

    def inodegfid_add(self, inode, gfid, converted=0):
        self._add("inodegfid", {
            "inode": inode,
            "gfid": gfid,
            "converted": converted
        })

    def inodegfid_update(self, data, filters):
        self._update("inodegfid", data, filters)

    def inodegfid_get(self, filters=None):
        return self._get("inodegfid", filters)

    def inodegfid_get_distinct(self, distinct_field, filters=None):
        return self._get_distinct("inodegfid", distinct_field, filters)

    def inodegfid_exists(self, filters):
        return self._exists("inodegfid", filters)

    def append_path1(self, path, inode):
        """
        Set path1 to "," + ``path`` on every gfidpath row whose gfid is
        mapped to ``inode`` in the inodegfid table.
        """
        # || is for concatenate in SQL
        # NOTE(review): despite the method name, this overwrites path1
        # instead of appending to its current value - confirm that this
        # is intended before relying on multiple calls per inode.
        query = """UPDATE gfidpath SET path1 = ',' || ?
        WHERE gfid IN (SELECT gfid FROM inodegfid WHERE inode = ?)"""
        self.cursor.execute(query, (path, inode))

    def gfidpath_set_path1(self, path1, pgfid1):
        """
        Build path1 as <path1>/<bn1> for every row whose parent is
        ``pgfid1`` and, likewise, path2 as <path1>/<bn2> for every row
        whose second parent (pgfid2) is that same directory.

        An empty ``path1`` yields just the basename, without separator.
        """
        # || is for concatenate in SQL; '%2F' is the percent-encoded "/"
        if path1 == "":
            update_str1 = "? || bn1"
            update_str2 = "? || bn2"
        else:
            update_str1 = "? || '%2F' || bn1"
            update_str2 = "? || '%2F' || bn2"

        query = """UPDATE gfidpath SET path1 = %s
        WHERE pgfid1 = ?""" % update_str1
        self.cursor.execute(query, (path1, pgfid1))

        # Set Path2 if pgfid1 and pgfid2 are same
        query = """UPDATE gfidpath SET path2 = %s
        WHERE pgfid2 = ?""" % update_str2
        self.cursor.execute(query, (path1, pgfid1))

    def gfidpath_set_path2(self, path2, pgfid2):
        """
        Build path2 as <path2>/<bn2> for every row whose pgfid2 is
        ``pgfid2``. An empty ``path2`` yields just the basename.
        """
        # || is for concatenate in SQL; '%2F' is the percent-encoded "/"
        if path2 == "":
            update_str = "? || bn2"
        else:
            update_str = "? || '%2F' || bn2"

        query = """UPDATE gfidpath SET path2 = %s
        WHERE pgfid2 = ?""" % update_str
        self.cursor.execute(query, (path2, pgfid2))

    def when_create_mknod_mkdir(self, changelogfile, data):
        """
        Record a newly created entry as type NEW.

        Reads data[1] as the GFID and data[6] as the quoted
        "<PGFID>/<BASENAME>" of the new entry.
        """
        pgfid1, bn1 = self._split_entry(data[6])
        self.gfidpath_add(changelogfile, RecordType.NEW, data[1],
                          pgfid1, bn1)

    def when_rename(self, changelogfile, data):
        """
        Record a rename, folding it into an earlier record when possible.

        Reads data[1] as the GFID, data[3] as the quoted old
        "<PGFID>/<BASENAME>" and data[4] as the quoted new one.
        """
        pgfid1, bn1 = self._split_entry(data[3])
        pgfid2, bn2 = self._split_entry(data[4])

        # NOTE(review): the filters below use the literals "NEW" and
        # "RENAME" while inserts use RecordType.* - presumably the same
        # values; confirm against utils.RecordType.
        created_here = {"gfid": data[1], "type": "NEW",
                        "pgfid1": pgfid1, "bn1": bn1}
        renamed_here = {"gfid": data[1], "type": "RENAME",
                        "pgfid2": pgfid1, "bn2": bn1}

        if self.gfidpath_exists(created_here):
            # The old name is the one recorded by a NEW row: keep the
            # row as NEW and just point it at the new parent/basename.
            self.gfidpath_update({"pgfid1": pgfid2, "bn1": bn2},
                                 created_here)
        elif self.gfidpath_exists(renamed_here):
            # The old name is the target of a previous RENAME row:
            # retarget that row to the new parent/basename.
            self.gfidpath_update({"pgfid2": pgfid2, "bn2": bn2},
                                 renamed_here)
        else:
            # Else insert as RENAME
            self.gfidpath_add(changelogfile, RecordType.RENAME, data[1],
                              pgfid1, bn1, pgfid2, bn2)

    def when_link_symlink(self, changelogfile, data):
        """
        Record a link/symlink as a new row of type NEW.

        Reads data[1] as the GFID and data[3] as the quoted
        "<PGFID>/<BASENAME>".
        """
        pgfid1, bn1 = self._split_entry(data[3])
        self.gfidpath_add(changelogfile, RecordType.NEW, data[1],
                          pgfid1, bn1)

    def when_data_meta(self, changelogfile, data):
        """
        Record a MODIFY for the GFID in data[1], unless any row for that
        GFID already exists.
        """
        if not self.gfidpath_exists({"gfid": data[1]}):
            self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1])

    def when_unlink_rmdir(self, changelogfile, data):
        """
        Record a delete, cancelling out an earlier NEW row if present.

        Reads data[1] as the GFID and data[3] as the quoted
        "<PGFID>/<BASENAME>". When ``data`` has exactly five fields,
        data[4] is used as the path of the deleted entry; otherwise the
        deleted path is the empty string.
        """
        pgfid1, bn1 = self._split_entry(data[3])
        deleted_path = data[4] if len(data) == 5 else ""

        created_here = {"gfid": data[1], "type": "NEW",
                        "pgfid1": pgfid1, "bn1": bn1}

        if self.gfidpath_exists(created_here):
            # If path exists in table as NEW with same GFID
            # Delete that row
            self.gfidpath_delete(created_here)
        else:
            # Else Record as DELETE
            self.gfidpath_add(changelogfile, RecordType.DELETE, data[1],
                              pgfid1, bn1, path1=deleted_path)

        # Update path1 as deleted_path if pgfid1 and bn1 is same as deleted
        self.gfidpath_update({"path1": deleted_path}, {"gfid": data[1],
                                                       "pgfid1": pgfid1,
                                                       "bn1": bn1})

        # Update path2 as deleted_path if pgfid2 and bn2 is same as deleted
        self.gfidpath_update({"path2": deleted_path}, {
            "type": RecordType.RENAME,
            "gfid": data[1],
            "pgfid2": pgfid1,
            "bn2": bn1})

        # If deleted directory is parent for somebody, rebuild the
        # already-resolved child paths below the deleted path.
        query1 = """UPDATE gfidpath SET path1 = ? || '%2F' || bn1
        WHERE pgfid1 = ? AND path1 != ''"""
        self.cursor.execute(query1, (deleted_path, data[1]))

        # path2 belongs to (pgfid2, bn2), so it must be rebuilt from bn2;
        # using bn1 here would splice the old basename into the new path.
        query1 = """UPDATE gfidpath SET path2 = ? || '%2F' || bn2
        WHERE pgfid2 = ? AND path2 != ''"""
        self.cursor.execute(query1, (deleted_path, data[1]))

    def commit(self):
        """Commit the pending changes on the underlying connection."""
        self.conn.commit()