diff --git a/utils/change_engine/db_engine_converter.py b/utils/change_engine/db_engine_converter.py new file mode 100755 index 000000000000..018835376a4e --- /dev/null +++ b/utils/change_engine/db_engine_converter.py @@ -0,0 +1,125 @@ +#!/usr/bin/python3 +from clickhouse_driver import Client +from interface import Database +import sys +from enum import Enum +from copy import deepcopy + + +class UserInteractor: + def __init__(self, default_answer=None): + self.__default_answer = default_answer + + def ask(self, text): + answer = None + if self.__default_answer is not None: + answer = self.__default_answer + else: + answer = input(text) + + return answer + + def notify(self, text): #TODO logging + print(text) + + +class Action(Enum): + CONTINUE = 1 + REVERT = 2 + CONVERT = 3 + TERMINATE = 4 + + +class DBEngineConverter: + def __init__(self, user_interactor=UserInteractor()): + self.__temporary_prefix = None + self.__client = Client('localhost') + self.__user_interactor = deepcopy(user_interactor) + + def __get_temporary_name(self, database_name): + assert self.__temporary_prefix is not None + return f'{self.__temporary_prefix}{database_name}' + + def __get_needed_action(self, database, temp_database): + if temp_database.exists: + + intersect_tables = set([table.name for table in database.tables]) & set([table.name for table in temp_database.tables]) + tables = dict([(table.name, table) for table in database.tables]) + temp_tables = dict([(table.name, table) for table in temp_database.tables]) + + for table in intersect_tables: + if tables[table].row_count > temp_tables[table].row_count: + temp_tables[table].drop() + else: + tables[table].drop() + + answer = self.__user_interactor.ask( + f'Changing for database {database.name} failed in previous launch.\n' + 'Do you want to continue changing? (y/n)\n' + ) + + if answer == 'y': + return Action.CONTINUE + elif answer == 'n': + return Action.REVERT + else: + self.__user_interactor.notify(f'Wrong answer, skipped') + return Action.TERMINATE + else: + return Action.CONVERT + + def convert(self, database_name, engine_from, engine_to, safe_rename=True): #TODO engine name validation + self.__temporary_prefix = f'__temporary_{engine_from}_to_{engine_to}__' + + database = Database(database_name, self.__client) + temp_database = Database(self.__get_temporary_name(database_name), self.__client) + + action = self.__get_needed_action(database, temp_database) + + if action == Action.CONVERT or action == Action.CONTINUE: + if not database.exists: + raise ValueError(f'database \'{database.name}\' does not exists') + + if database.engine != engine_from: + raise ValueError(f'database {database.name} engine must be {engine_from}') + + if not temp_database.exists: + temp_database.create(engine=engine_to) + + temp_database.move_tables(database.tables, safe_rename) + database.drop() + + if engine_to == 'Atomic': + temp_database.rename(database_name) + else: + database.create(engine=engine_to) + database.move_tables(temp_database.tables, safe_rename) + temp_database.drop() + + assert database.engine == engine_to, 'something went wrong' + elif action == Action.REVERT: + if not database.exists: + database.create(engine=engine_from) + + if database.engine == engine_from: + database.move_tables(temp_database.tables, safe_rename) + temp_database.drop() + else: + temp_database.move_tables(database.tables, safe_rename) + database.drop() + database.create(engine=engine_from) + database.move_tables(temp_database.tables, safe_rename) + temp_database.drop() + elif action == Action.TERMINATE: + pass + + +def main(): + converter = DBEngineConverter() + + db, engine_from, engine_to = sys.argv[1:4] + converter.convert(db, engine_from, engine_to) + + +if __name__ == '__main__': + main() diff --git a/utils/change_engine/interface.py b/utils/change_engine/interface.py index 9120e211da7d..0269de892427 100644 --- a/utils/change_engine/interface.py +++ b/utils/change_engine/interface.py @@ -4,6 +4,11 @@ def __init__(self, name, client): self.__database, self.__name = name.split('.') self.__client = client + @property + def row_count(self): + count = self.__client.execute(f'SELECT COUNT(*) FROM {self.fullname}')[0][0] + return count + @property def name(self): return self.__name @@ -18,28 +23,41 @@ def fullname(self): @property def exists(self): - tables = self.__client.execute(f'SELECT name FROM system.tables WHERE database == \'{self.database_name}\' and name == \'{self.name}\'') + tables = self.__client.execute(f'SELECT name FROM system.tables WHERE database == \'{self.database_name}\' and name == \'{self.name}\';') return len(tables) == 1 def create(self): assert not self.exists, f'Table {self.fullname} already exists' - self.__client.execute(f'CREATE TABLE {self.fullname}') + self.__client.execute(f'CREATE TABLE {self.fullname};') def rename_table(self, new_name): assert self.exists, f'Table {self.fullname} does not exists' - self.__client.execute(f'RENAME TABLE {self.fullname} TO {new_name}') + self.__client.execute(f'RENAME TABLE {self.fullname} TO {new_name};') + self.__fullname = new_name + self.__name, self.__database = self.__fullname.split('.') + + def safe_rename_table(self, new_name): + assert self.exists, f'Table {self.fullname} does not exists' + + self.__client.execute(f'CREATE TABLE {new_name} AS {self.fullname};') + self.__client.execute(f'INSERT INTO {new_name} SELECT * FROM {self.fullname};') + self.drop() + self.__fullname = new_name self.__name, self.__database = self.__fullname.split('.') def rename_database(self, new_database): self.rename_table(f'{new_database}.{self.name}') + def safe_rename_database(self, new_database): + self.safe_rename_table(f'{new_database}.{self.name}') + def drop(self): assert self.exists, f'Table {self.fullname} does not exists' - self.__client.execute(f'DROP TABLE {self.fullname}') + self.__client.execute(f'DROP TABLE {self.fullname};') class Database: @@ -53,21 +71,21 @@ def name(self): @property def exists(self): - dbs = self.__client.execute(f'SELECT name FROM system.databases WHERE name == \'{self.name}\'') + dbs = self.__client.execute(f'SELECT name FROM system.databases WHERE name == \'{self.name}\';') return len(dbs) == 1 @property def engine(self): assert self.exists, f'Database {self.name} does not exists' - dbs = self.__client.execute(f'SELECT engine FROM system.databases WHERE name == \'{self.name}\'') + dbs = self.__client.execute(f'SELECT engine FROM system.databases WHERE name == \'{self.name}\';') return dbs[0][0] @property def tables(self): assert self.exists, f'Database {self.name} does not exists' - tables = self.__client.execute(f'SELECT name FROM system.tables WHERE database == \'{self.name}\'') + tables = self.__client.execute(f'SELECT name FROM system.tables WHERE database == \'{self.name}\';') tables = map(lambda row: row[0], tables) return [self.__construct_table(name) for name in tables] @@ -78,7 +96,7 @@ def __construct_table(self, table_name): def get_table(self, table_name): assert self.exists, f'Database {self.name} does not exists' - tables = self.__client.execute(f'SELECT name FROM system.tables WHERE database == \'{self.name}\' and name == \'{table_name}\'') + tables = self.__client.execute(f'SELECT name FROM system.tables WHERE database == \'{self.name}\' and name == \'{table_name}\';') assert len(tables) == 1, f'Table {table_name} does not exists' return self.__construct_table(table_name) @@ -86,12 +104,12 @@ def get_table(self, table_name): def create(self, engine = 'Ordinary'): assert not self.exists, f'Database {self.name} already exists' - self.__client.execute(f'CREATE DATABASE {self.name} ENGINE={engine}') + self.__client.execute(f'CREATE DATABASE {self.name} ENGINE={engine};') def rename(self, new_name): assert self.exists, f'Database {self.name} does not exists' - self.__client.execute(f'RENAME DATABASE {self.name} TO {new_name}') + self.__client.execute(f'RENAME DATABASE {self.name} TO {new_name};') self.__name = new_name def create_table(self, table_name): @@ -102,11 +120,14 @@ def create_table(self, table_name): return table - def move_tables(self, tables): + def move_tables(self, tables, safe_rename): for table in tables: - table.rename_database(self.name) + if safe_rename: + table.safe_rename_database(self.name) + else: + table.rename_database(self.name) def drop(self): assert self.exists, f'Database {self.name} does not exists' - self.__client.execute(f'DROP DATABASE {self.name}') + self.__client.execute(f'DROP DATABASE {self.name};') diff --git a/utils/change_engine/test.py b/utils/change_engine/test.py index 6da8ca2d043e..de5ed1968532 100644 --- a/utils/change_engine/test.py +++ b/utils/change_engine/test.py @@ -1,7 +1,6 @@ from clickhouse_driver import Client from interface import Database -from ordinary_to_atomic import DBOrdinaryToAtomicConverter, UserInteractor -import logging +from db_engine_converter import DBEngineConverter, UserInteractor import multiprocessing import pytest import random @@ -11,81 +10,251 @@ client = Client('localhost') -def check_changing(tables, engine_from, engine_to): +def check_changing(tables, engine_from, engine_to, tables_type='numbers'): def wrapper(function, *args, **kwargs): def wrapped(): name = f'test_{random.randint(1, 100000)}' - client.execute(f'CREATE DATABASE {name} ENGINE = {engine_from}') + client.execute(f'CREATE DATABASE {name} ENGINE = {engine_from};') database = Database(name, client) + row_counts = {} + for i in range(tables): - client.execute( - f''' - CREATE TABLE {name}.table_{i}(  - id UInt32, - name String - ) ENGINE = MergeTree ORDER BY (id, name); - ''' - ) + if tables_type == 'numbers': + client.execute( + f''' + CREATE TABLE {name}.table_{i}(  + id UInt32 + ) ENGINE = MergeTree ORDER BY id; + ''' + ) + + row_count = random.randint(1, 100) + + values = ('({})\n' * row_count).format(*range(row_count)) + client.execute(f'INSERT INTO {name}.table_{i} VALUES {values};') + + row_counts[f'{name}.table_{i}'] = row_count + elif tables_type == 'cell_towers': + client.execute( + f''' + CREATE TABLE {name}.table_{i} AS cell_towers + ''' + ) + + client.execute( + f''' + INSERT INTO {name}.table_{i} SELECT * FROM cell_towers + ''' + ) + + row_count = client.execute(f'SELECT COUNT(*) FROM {name}.table_{i}')[0][0] + row_counts[f'{name}.table_{i}'] = row_count table_names = sorted([t.name for t in database.tables]) + begin = time.time() + res = function(*args, database_name = name, **kwargs) + elapsed_time = (time.time() - begin) + print(f'elapsed time: {elapsed_time}') + correct_engine = database.engine == engine_to + + if not correct_engine: + print('wrong engine', database.engine, engine_to) + correct_length = len(database.tables) == tables + + if not correct_length: + print('wrong count', len(database.tables), tables) + correct_tables = table_names == sorted([t.name for t in database.tables]) + correct_data = True - client.execute(f'drop database {name}') + for table_name, row_count in row_counts.items(): + count = client.execute(f'SELECT COUNT(*) FROM {table_name};')[0][0] + correct_data &= count == row_count + + client.execute(f'drop database {name};') assert correct_engine assert correct_length assert correct_tables + assert correct_data return res return wrapped return wrapper -@check_changing(tables = 10, engine_from = 'Ordinary', engine_to='Atomic') +# Ordinary to Atomic + + +@check_changing(tables = 10, engine_from = 'Ordinary', engine_to = 'Atomic') def test_convert_ordinary_to_atomic(database_name): - converter = DBOrdinaryToAtomicConverter() + converter = DBEngineConverter() - converter.convert(database_name) + converter.convert(database_name, 'Ordinary', 'Atomic') -@check_changing(tables = 1000, engine_from = 'Ordinary', engine_to='Atomic') +@check_changing(tables = 1000, engine_from = 'Ordinary', engine_to = 'Atomic') def test_stress_ordinary_to_atomic(database_name): - converter = DBOrdinaryToAtomicConverter() + converter = DBEngineConverter() - converter.convert(database_name) + converter.convert(database_name, 'Ordinary', 'Atomic') -@check_changing(tables = 1000, engine_from = 'Ordinary', engine_to='Atomic') +@check_changing(tables = 500, engine_from = 'Ordinary', engine_to = 'Atomic') def test_continue_after_fail_ordinary_to_atomic(database_name): - converter = DBOrdinaryToAtomicConverter(UserInteractor('y')) + converter = DBEngineConverter(UserInteractor('y')) - process = multiprocessing.Process(target=converter.convert, args=(database_name,)) + process = multiprocessing.Process(target=converter.convert, args=(database_name, 'Ordinary', 'Atomic')) process.start() time.sleep(1) process.terminate() - converter.convert(database_name) + time.sleep(0.5) + + converter.convert(database_name, 'Ordinary', 'Atomic') -@check_changing(tables = 1000, engine_from = 'Ordinary', engine_to='Ordinary') +@check_changing(tables = 500, engine_from = 'Ordinary', engine_to = 'Ordinary') def test_revert_after_fail_ordinary_to_atomic(database_name): - converter = DBOrdinaryToAtomicConverter(UserInteractor('n')) + converter = DBEngineConverter(UserInteractor('n')) + + process = multiprocessing.Process(target=converter.convert, args=(database_name, 'Ordinary', 'Atomic')) + process.start() + + time.sleep(1) + + process.terminate() + + time.sleep(0.5) + + converter.convert(database_name, 'Ordinary', 'Atomic') + + +@check_changing(tables = 500, engine_from = 'Ordinary', engine_to = 'Atomic') +def test_atomicity_table_rename_ordinary_to_atomic(database_name): + for i in range(1000): + converter = DBEngineConverter(UserInteractor('n')) + + process = multiprocessing.Process(target=converter.convert, args=(database_name, 'Ordinary', 'Atomic', False)) + process.start() + + time.sleep(0.5) + + process.terminate() + + time.sleep(0.5) + + converter = DBEngineConverter(UserInteractor('y')) + converter.convert(database_name, 'Ordinary', 'Atomic') + + +@check_changing(tables = 5000, engine_from = 'Ordinary', engine_to = 'Atomic') +def test_convert_fails_ordinary_to_atomic(database_name): + for i in range(10): + converter = DBEngineConverter(UserInteractor('y')) + + process = multiprocessing.Process(target=converter.convert, args=(database_name, 'Ordinary', 'Atomic')) + process.start() + + time.sleep(1) + + process.terminate() - process = multiprocessing.Process(target=converter.convert, args=(database_name,)) + time.sleep(0.5) + + converter = DBEngineConverter(UserInteractor('y')) + converter.convert(database_name, 'Ordinary', 'Atomic') + +# Atomic to Ordinary + + +@check_changing(tables = 10, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_convert_atomic_to_ordinary(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 1000, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_stress_atomic_to_ordinary(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 500, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_continue_after_fail_atomic_to_ordinary(database_name): + converter = DBEngineConverter(UserInteractor('y')) + + process = multiprocessing.Process(target=converter.convert, args=(database_name, 'Atomic', 'Ordinary')) + process.start() + + time.sleep(1) + + process.terminate() + + time.sleep(0.5) + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 500, engine_from = 'Atomic', engine_to = 'Atomic') +def test_revert_after_fail_atomic_to_ordinary(database_name): + converter = DBEngineConverter(UserInteractor('n')) + + process = multiprocessing.Process(target=converter.convert, args=(database_name, 'Atomic', 'Ordinary')) process.start() time.sleep(1) process.terminate() - converter.convert(database_name) + time.sleep(0.5) + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 500, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_atomicity_table_rename_atomic_to_ordinary(database_name): + for i in range(1000): + converter = DBEngineConverter(UserInteractor('n')) + + process = multiprocessing.Process(target=converter.convert, args=(database_name, 'Atomic', 'Ordinary', False)) + process.start() + + time.sleep(0.5) + + process.terminate() + + time.sleep(0.5) + + converter = DBEngineConverter(UserInteractor('y')) + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 500, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_convert_fails_atomic_to_ordinary(database_name): + for i in range(500): + converter = DBEngineConverter(UserInteractor('y')) + + process = multiprocessing.Process(target=converter.convert, args=(database_name, 'Atomic', 'Ordinary')) + process.start() + + time.sleep(1) + + process.terminate() + + time.sleep(0.5) + + converter = DBEngineConverter(UserInteractor('y')) + converter.convert(database_name, 'Atomic', 'Ordinary') diff --git a/utils/change_engine/test_performance.py b/utils/change_engine/test_performance.py new file mode 100644 index 000000000000..fe74b518aaa9 --- /dev/null +++ b/utils/change_engine/test_performance.py @@ -0,0 +1,204 @@ +from db_engine_converter import DBEngineConverter +from test import check_changing + + +# Cell towers + + +@check_changing(tables = 100, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_100(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 50, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_50(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 20, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_20(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 10, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_10(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 5, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_5(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 2, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_2(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 1, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_1(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 100, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_100_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False) + + +@check_changing(tables = 50, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_50_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False) + + +@check_changing(tables = 20, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_20_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False) + + +@check_changing(tables = 10, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_10_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False) + + +@check_changing(tables = 5, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_5_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False) + + +@check_changing(tables = 2, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_2_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False) + + +@check_changing(tables = 1, engine_from = 'Atomic', engine_to = 'Ordinary', tables_type = 'cell_towers') +def test_cell_towers_1_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False) + + +# Numbers + + +@check_changing(tables = 100, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_100(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 50, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_50(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 20, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_20(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 10, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_10(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 5, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_5(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 2, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_2(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 1, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_1(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary') + + +@check_changing(tables = 100, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_100_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False) + + +@check_changing(tables = 50, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_50_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False) + + +@check_changing(tables = 20, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_20_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False) + + +@check_changing(tables = 10, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_10_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False) + + +@check_changing(tables = 5, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_5_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False) + + +@check_changing(tables = 2, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_2_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False) + + +@check_changing(tables = 1, engine_from = 'Atomic', engine_to = 'Ordinary') +def test_numbers_1_unsafe(database_name): + converter = DBEngineConverter() + + converter.convert(database_name, 'Atomic', 'Ordinary', safe_rename=False)