Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change engine final #15

Open
wants to merge 6 commits into
base: managed-feature/switch-database-engine
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions utils/change_engine/db_engine_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#!/usr/bin/python3
from clickhouse_driver import Client
from interface import Database
import sys
from enum import Enum
from copy import deepcopy


class UserInteractor:
def __init__(self, default_answer=None):
self.__default_answer = default_answer

def ask(self, text):
answer = None
if self.__default_answer is not None:
answer = self.__default_answer
else:
answer = input(text)

return answer

def notify(self, text): #TODO logging
print(text)


class Action(Enum):
CONTINUE = 1
REVERT = 2
CONVERT = 3
TERMINATE = 4


class DBEngineConverter:
def __init__(self, user_interactor=UserInteractor()):
self.__temporary_prefix = None
self.__client = Client('localhost')
self.__user_interactor = deepcopy(user_interactor)

def __get_temporary_name(self, database_name):
assert self.__temporary_prefix is not None
return f'{self.__temporary_prefix}{database_name}'

def __get_needed_action(self, database, temp_database):
if temp_database.exists:

intersect_tables = set([table.name for table in database.tables]) & set([table.name for table in temp_database.tables])
tables = dict([(table.name, table) for table in database.tables])
temp_tables = dict([(table.name, table) for table in temp_database.tables])

for table in intersect_tables:
if tables[table].row_count > temp_tables[table].row_count:
temp_tables[table].drop()
else:
tables[table].drop()

answer = self.__user_interactor.ask(
f'Changing for database {database.name} failed in previous launch.\n'
'Do you want to continue changing? (y/n)\n'
)

if answer == 'y':
return Action.CONTINUE
elif answer == 'n':
return Action.REVERT
else:
self.__user_interactor.notify(f'Wrong answer, skipped')
return Action.TERMINATE
else:
return Action.CONVERT

def convert(self, database_name, engine_from, engine_to, safe_rename=True): #TODO engine name validation
self.__temporary_prefix = f'__temporary_{engine_from}_to_{engine_to}__'

database = Database(database_name, self.__client)
temp_database = Database(self.__get_temporary_name(database_name), self.__client)

action = self.__get_needed_action(database, temp_database)

if action == Action.CONVERT or action == Action.CONTINUE:
if not database.exists:
raise ValueError(f'database \'{database.name}\' does not exists')

if database.engine != engine_from:
raise ValueError(f'database {database.name} engine must be {engine_from}')

if not temp_database.exists:
temp_database.create(engine=engine_to)

temp_database.move_tables(database.tables, safe_rename)
database.drop()

if engine_to == 'Atomic':
temp_database.rename(database_name)
else:
database.create(engine=engine_to)
database.move_tables(temp_database.tables, safe_rename)
temp_database.drop()

assert database.engine == engine_to, 'something went wrong'
elif action == Action.REVERT:
if not database.exists:
database.create(engine=engine_from)

if database.engine == engine_from:
database.move_tables(temp_database.tables, safe_rename)
temp_database.drop()
else:
temp_database.move_tables(database.tables, safe_rename)
database.drop()
database.create(engine=engine_from)
database.move_tables(temp_database.tables, safe_rename)
temp_database.drop()
elif action == Action.TERMINATE:
pass


def main():
converter = DBEngineConverter()

db, engine_from, engine_to = sys.argv[1:4]
converter.convert(db, engine_from, engine_to)


if __name__ == '__main__':
main()
47 changes: 34 additions & 13 deletions utils/change_engine/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ def __init__(self, name, client):
self.__database, self.__name = name.split('.')
self.__client = client

@property
def row_count(self):
count = self.__client.execute(f'SELECT COUNT(*) FROM {self.fullname}')[0][0]
return count

@property
def name(self):
return self.__name
Expand All @@ -18,28 +23,41 @@ def fullname(self):

@property
def exists(self):
tables = self.__client.execute(f'SELECT name FROM system.tables WHERE database == \'{self.database_name}\' and name == \'{self.name}\'')
tables = self.__client.execute(f'SELECT name FROM system.tables WHERE database == \'{self.database_name}\' and name == \'{self.name}\';')
return len(tables) == 1

def create(self):
assert not self.exists, f'Table {self.fullname} already exists'

self.__client.execute(f'CREATE TABLE {self.fullname}')
self.__client.execute(f'CREATE TABLE {self.fullname};')

def rename_table(self, new_name):
assert self.exists, f'Table {self.fullname} does not exists'

self.__client.execute(f'RENAME TABLE {self.fullname} TO {new_name}')
self.__client.execute(f'RENAME TABLE {self.fullname} TO {new_name};')
self.__fullname = new_name
self.__name, self.__database = self.__fullname.split('.')

def safe_rename_table(self, new_name):
assert self.exists, f'Table {self.fullname} does not exists'

self.__client.execute(f'CREATE TABLE {new_name} AS {self.fullname};')
self.__client.execute(f'INSERT INTO {new_name} SELECT * FROM {self.fullname};')
self.drop()

self.__fullname = new_name
self.__name, self.__database = self.__fullname.split('.')

def rename_database(self, new_database):
self.rename_table(f'{new_database}.{self.name}')

def safe_rename_database(self, new_database):
self.safe_rename_table(f'{new_database}.{self.name}')

def drop(self):
assert self.exists, f'Table {self.fullname} does not exists'

self.__client.execute(f'DROP TABLE {self.fullname}')
self.__client.execute(f'DROP TABLE {self.fullname};')


class Database:
Expand All @@ -53,21 +71,21 @@ def name(self):

@property
def exists(self):
dbs = self.__client.execute(f'SELECT name FROM system.databases WHERE name == \'{self.name}\'')
dbs = self.__client.execute(f'SELECT name FROM system.databases WHERE name == \'{self.name}\';')
return len(dbs) == 1

@property
def engine(self):
assert self.exists, f'Database {self.name} does not exists'

dbs = self.__client.execute(f'SELECT engine FROM system.databases WHERE name == \'{self.name}\'')
dbs = self.__client.execute(f'SELECT engine FROM system.databases WHERE name == \'{self.name}\';')
return dbs[0][0]

@property
def tables(self):
assert self.exists, f'Database {self.name} does not exists'

tables = self.__client.execute(f'SELECT name FROM system.tables WHERE database == \'{self.name}\'')
tables = self.__client.execute(f'SELECT name FROM system.tables WHERE database == \'{self.name}\';')
tables = map(lambda row: row[0], tables)

return [self.__construct_table(name) for name in tables]
Expand All @@ -78,20 +96,20 @@ def __construct_table(self, table_name):
def get_table(self, table_name):
assert self.exists, f'Database {self.name} does not exists'

tables = self.__client.execute(f'SELECT name FROM system.tables WHERE database == \'{self.name}\' and name == \'{table_name}\'')
tables = self.__client.execute(f'SELECT name FROM system.tables WHERE database == \'{self.name}\' and name == \'{table_name}\';')
assert len(tables) == 1, f'Table {table_name} does not exists'

return self.__construct_table(table_name)

def create(self, engine = 'Ordinary'):
assert not self.exists, f'Database {self.name} already exists'

self.__client.execute(f'CREATE DATABASE {self.name} ENGINE={engine}')
self.__client.execute(f'CREATE DATABASE {self.name} ENGINE={engine};')

def rename(self, new_name):
assert self.exists, f'Database {self.name} does not exists'

self.__client.execute(f'RENAME DATABASE {self.name} TO {new_name}')
self.__client.execute(f'RENAME DATABASE {self.name} TO {new_name};')
self.__name = new_name

def create_table(self, table_name):
Expand All @@ -102,11 +120,14 @@ def create_table(self, table_name):

return table

def move_tables(self, tables):
def move_tables(self, tables, safe_rename):
for table in tables:
table.rename_database(self.name)
if safe_rename:
table.safe_rename_database(self.name)
else:
table.rename_database(self.name)

def drop(self):
assert self.exists, f'Database {self.name} does not exists'

self.__client.execute(f'DROP DATABASE {self.name}')
self.__client.execute(f'DROP DATABASE {self.name};')
Loading