diff --git a/ydb/tests/stress/olap_workload/__main__.py b/ydb/tests/stress/olap_workload/__main__.py
index a3cfd6fd86b2..49997f62dca7 100644
--- a/ydb/tests/stress/olap_workload/__main__.py
+++ b/ydb/tests/stress/olap_workload/__main__.py
@@ -1,242 +1,7 @@
 # -*- coding: utf-8 -*-
 import argparse
-import ydb
-import time
-import random
-import threading
-from enum import Enum
-
-from ydb.tests.stress.common.common import WorkloadBase, YdbClient
-
-ydb.interceptor.monkey_patch_event_handler()
-
-supported_pk_types = [
-    # Bool https://github.com/ydb-platform/ydb/issues/13037
-    "Int8",
-    "Int16",
-    "Int32",
-    "Int64",
-    "Uint8",
-    "Uint16",
-    "Uint32",
-    "Uint64",
-    "Decimal(22,9)",
-    # "DyNumber", https://github.com/ydb-platform/ydb/issues/13048
-
-    "String",
-    "Utf8",
-    # Uuid", https://github.com/ydb-platform/ydb/issues/13047
-
-    "Date",
-    "Datetime",
-    "Datetime64",
-    "Timestamp",
-    # "Interval", https://github.com/ydb-platform/ydb/issues/13050
-]
-
-supported_types = supported_pk_types + [
-    "Float",
-    "Double",
-    "Json",
-    "JsonDocument",
-    "Yson"
-]
-
-
-class WorkloadTablesCreateDrop(WorkloadBase):
-    class TableStatus(Enum):
-        CREATING = "Creating",
-        AVAILABLE = "Available",
-        DELITING = "Deleting"
-
-    def __init__(self, client, prefix, stop, allow_nullables_in_pk):
-        super().__init__(client, prefix, "create_drop", stop)
-        self.allow_nullables_in_pk = allow_nullables_in_pk
-        self.created = 0
-        self.deleted = 0
-        self.tables = {}
-        self.lock = threading.Lock()
-
-    def get_stat(self):
-        with self.lock:
-            return f"Created: {self.created}, Deleted: {self.deleted}, Exists: {len(self.tables)}"
-
-    def _generate_new_table_n(self):
-        while True:
-            r = random.randint(1, 40000)
-            with self.lock:
-                if r not in self.tables:
-                    self.tables[r] = WorkloadTablesCreateDrop.TableStatus.CREATING
-                    return r
-
-    def _get_table_to_delete(self):
-        with self.lock:
-            for n, s in self.tables.items():
-                if s == WorkloadTablesCreateDrop.TableStatus.AVAILABLE:
-                    self.tables[n] = WorkloadTablesCreateDrop.TableStatus.DELITING
-                    return n
-        return None
-
-    def create_table(self, table):
-        path = self.get_table_path(table)
-        column_n = random.randint(1, 10000)
-        primary_key_column_n = random.randint(1, column_n)
-        partition_key_column_n = random.randint(1, primary_key_column_n)
-        column_defs = []
-        for i in range(column_n):
-            if i < primary_key_column_n:
-                c = random.choice(supported_pk_types)
-                if not self.allow_nullables_in_pk or random.choice([False, True]):
-                    c += " NOT NULL"
-            else:
-                c = random.choice(supported_types)
-                if random.choice([False, True]):
-                    c += " NOT NULL"
-            column_defs.append(c)
-
-        stmt = f"""
-            CREATE TABLE `{path}` (
-                {", ".join(["c" + str(i) + " " + column_defs[i] for i in range(column_n)])},
-                PRIMARY KEY({", ".join(["c" + str(i) for i in range(primary_key_column_n)])})
-            )
-            PARTITION BY HASH({", ".join(["c" + str(i) for i in range(partition_key_column_n)])})
-            WITH (
-                STORE = COLUMN
-            )
-        """
-        self.client.query(stmt, True)
-
-    def _create_tables_loop(self):
-        while not self.is_stop_requested():
-            n = self._generate_new_table_n()
-            self.create_table(str(n))
-            with self.lock:
-                self.tables[n] = WorkloadTablesCreateDrop.TableStatus.AVAILABLE
-                self.created += 1
-
-    def _delete_tables_loop(self):
-        while not self.is_stop_requested():
-            n = self._get_table_to_delete()
-            if n is None:
-                print("create_drop: No tables to delete")
-                time.sleep(10)
-                continue
-            self.client.drop_table(self.get_table_path(str(n)))
-            with self.lock:
-                del self.tables[n]
-                self.deleted += 1
-
-    def get_workload_thread_funcs(self):
-        r = [self._create_tables_loop for x in range(0, 10)]
-        r.append(self._delete_tables_loop)
-        return r
-
-
-class WorkloadInsertDelete(WorkloadBase):
-    def __init__(self, client, prefix, stop):
-        super().__init__(client, prefix, "insert_delete", stop)
-        self.inserted = 0
-        self.current = 0
-        self.table_name = "table"
-        self.lock = threading.Lock()
-
-    def get_stat(self):
-        with self.lock:
-            return f"Inserted: {self.inserted}, Current: {self.current}"
-
-    def _loop(self):
-        table_path = self.get_table_path(self.table_name)
-        self.client.query(
-            f"""
-            CREATE TABLE `{table_path}` (
-                id Int64 NOT NULL,
-                i64Val Int64,
-                PRIMARY KEY(id)
-            )
-            PARTITION BY HASH(id)
-            WITH (
-                STORE = COLUMN
-            )
-            """,
-            True,
-        )
-        i = 1
-        while not self.is_stop_requested():
-            self.client.query(
-                f"""
-                INSERT INTO `{table_path}` (`id`, `i64Val`)
-                VALUES
-                ({i * 2}, {i * 10}),
-                ({i * 2 + 1}, {i * 10 + 1})
-                """,
-                False,
-            )
-
-            self.client.query(
-                f"""
-                DELETE FROM `{table_path}`
-                WHERE i64Val % 2 == 1
-                """,
-                False,
-            )
-
-            actual = self.client.query(
-                f"""
-                SELECT COUNT(*) as cnt, SUM(i64Val) as vals, SUM(id) as ids FROM `{table_path}`
-                """,
-                False,
-            )[0].rows[0]
-            expected = {"cnt": i, "vals": i * (i + 1) * 5, "ids": i * (i + 1)}
-            if actual != expected:
-                raise Exception(f"Incorrect result: expected:{expected}, actual:{actual}")
-            i += 1
-            with self.lock:
-                self.inserted += 2
-                self.current = actual["cnt"]
-
-    def get_workload_thread_funcs(self):
-        return [self._loop]
-
-
-class WorkloadRunner:
-    def __init__(self, client, name, duration, allow_nullables_in_pk):
-        self.client = client
-        self.name = args.path
-        self.tables_prefix = "/".join([self.client.database, self.name])
-        self.duration = args.duration
-        self.allow_nullables_in_pk = allow_nullables_in_pk
-
-    def __enter__(self):
-        self._cleanup()
-        return self
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        self._cleanup()
-
-    def _cleanup(self):
-        print(f"Cleaning up {self.tables_prefix}...")
-        deleted = client.remove_recursively(self.tables_prefix)
-        print(f"Cleaning up {self.tables_prefix}... done, {deleted} tables deleted")
-
-    def run(self):
-        stop = threading.Event()
-        workloads = [
-            WorkloadTablesCreateDrop(self.client, self.name, stop, self.allow_nullables_in_pk),
-            WorkloadInsertDelete(self.client, self.name, stop),
-        ]
-        for w in workloads:
-            w.start()
-        started_at = started_at = time.time()
-        while time.time() - started_at < self.duration:
-            print(f"Elapsed {(int)(time.time() - started_at)} seconds, stat:")
-            for w in workloads:
-                print(f"\t{w.name}: {w.get_stat()}")
-            time.sleep(10)
-        stop.set()
-        print("Waiting for stop...")
-        for w in workloads:
-            w.join()
stopped") +from ydb.tests.stress.common.common import YdbClient +from ydb.tests.stress.olap_workload.workload import WorkloadRunner if __name__ == "__main__": diff --git a/ydb/tests/stress/olap_workload/tests/test_workload.py b/ydb/tests/stress/olap_workload/tests/test_workload.py index 157c14efd391..a12d5178f3c1 100644 --- a/ydb/tests/stress/olap_workload/tests/test_workload.py +++ b/ydb/tests/stress/olap_workload/tests/test_workload.py @@ -1,9 +1,9 @@ # -*- coding: utf-8 -*- -import yatest - from ydb.tests.library.harness.kikimr_runner import KiKiMR from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator from ydb.tests.library.common.types import Erasure +from ydb.tests.stress.common.common import YdbClient +from ydb.tests.stress.olap_workload.workload import WorkloadRunner class TestYdbWorkload(object): @@ -22,14 +22,7 @@ def teardown_class(cls): cls.cluster.stop() def test(self): - workload_path = yatest.common.build_path("ydb/tests/stress/olap_workload/olap_workload") - yatest.common.execute( - [ - workload_path, - "--endpoint", f"grpc://localhost:{self.cluster.nodes[1].grpc_port}", - "--database=/Root", - "--duration", "120", - "--allow-nullables-in-pk", "1", - ], - wait=True - ) + client = YdbClient(f'grpc://localhost:{self.cluster.nodes[1].grpc_port}', '/Root', True) + client.wait_connection() + with WorkloadRunner(client, 'olap_workload', 120, True) as runner: + runner.run() diff --git a/ydb/tests/stress/olap_workload/tests/ya.make b/ydb/tests/stress/olap_workload/tests/ya.make index ac97247e0826..73cf8d439be6 100644 --- a/ydb/tests/stress/olap_workload/tests/ya.make +++ b/ydb/tests/stress/olap_workload/tests/ya.make @@ -13,12 +13,12 @@ SIZE(MEDIUM) DEPENDS( ydb/apps/ydbd - ydb/apps/ydb - ydb/tests/stress/olap_workload ) PEERDIR( ydb/tests/library + ydb/tests/stress/common + ydb/tests/stress/olap_workload/workload ) diff --git a/ydb/tests/stress/olap_workload/workload/__init__.py b/ydb/tests/stress/olap_workload/workload/__init__.py new file mode 100644 index 000000000000..7dc9aeec3654 --- /dev/null +++ b/ydb/tests/stress/olap_workload/workload/__init__.py @@ -0,0 +1,237 @@ +# -*- coding: utf-8 -*- +import ydb +import time +import random +import threading +from enum import Enum + +from ydb.tests.stress.common.common import WorkloadBase + +supported_pk_types = [ + # Bool https://github.com/ydb-platform/ydb/issues/13037 + "Int8", + "Int16", + "Int32", + "Int64", + "Uint8", + "Uint16", + "Uint32", + "Uint64", + "Decimal(22,9)", + # "DyNumber", https://github.com/ydb-platform/ydb/issues/13048 + + "String", + "Utf8", + # Uuid", https://github.com/ydb-platform/ydb/issues/13047 + + "Date", + "Datetime", + "Datetime64", + "Timestamp", + # "Interval", https://github.com/ydb-platform/ydb/issues/13050 +] + +supported_types = supported_pk_types + [ + "Float", + "Double", + "Json", + "JsonDocument", + "Yson" +] + + +class WorkloadTablesCreateDrop(WorkloadBase): + class TableStatus(Enum): + CREATING = "Creating", + AVAILABLE = "Available", + DELITING = "Deleting" + + def __init__(self, client, prefix, stop, allow_nullables_in_pk): + super().__init__(client, prefix, "create_drop", stop) + self.allow_nullables_in_pk = allow_nullables_in_pk + self.created = 0 + self.deleted = 0 + self.tables = {} + self.lock = threading.Lock() + + def get_stat(self): + with self.lock: + return f"Created: {self.created}, Deleted: {self.deleted}, Exists: {len(self.tables)}" + + def _generate_new_table_n(self): + while True: + r = 
diff --git a/ydb/tests/stress/olap_workload/workload/__init__.py b/ydb/tests/stress/olap_workload/workload/__init__.py
new file mode 100644
index 000000000000..7dc9aeec3654
--- /dev/null
+++ b/ydb/tests/stress/olap_workload/workload/__init__.py
@@ -0,0 +1,237 @@
+# -*- coding: utf-8 -*-
+import ydb
+import time
+import random
+import threading
+from enum import Enum
+
+from ydb.tests.stress.common.common import WorkloadBase
+
+supported_pk_types = [
+    # Bool https://github.com/ydb-platform/ydb/issues/13037
+    "Int8",
+    "Int16",
+    "Int32",
+    "Int64",
+    "Uint8",
+    "Uint16",
+    "Uint32",
+    "Uint64",
+    "Decimal(22,9)",
+    # "DyNumber", https://github.com/ydb-platform/ydb/issues/13048
+
+    "String",
+    "Utf8",
+    # "Uuid", https://github.com/ydb-platform/ydb/issues/13047
+
+    "Date",
+    "Datetime",
+    "Datetime64",
+    "Timestamp",
+    # "Interval", https://github.com/ydb-platform/ydb/issues/13050
+]
+
+supported_types = supported_pk_types + [
+    "Float",
+    "Double",
+    "Json",
+    "JsonDocument",
+    "Yson"
+]
+
+
+class WorkloadTablesCreateDrop(WorkloadBase):
+    class TableStatus(Enum):
+        CREATING = "Creating"
+        AVAILABLE = "Available"
+        DELETING = "Deleting"
+
+    def __init__(self, client, prefix, stop, allow_nullables_in_pk):
+        super().__init__(client, prefix, "create_drop", stop)
+        self.allow_nullables_in_pk = allow_nullables_in_pk
+        self.created = 0
+        self.deleted = 0
+        self.tables = {}
+        self.lock = threading.Lock()
+
+    def get_stat(self):
+        with self.lock:
+            return f"Created: {self.created}, Deleted: {self.deleted}, Exists: {len(self.tables)}"
+
+    def _generate_new_table_n(self):
+        while True:
+            r = random.randint(1, 40000)
+            with self.lock:
+                if r not in self.tables:
+                    self.tables[r] = WorkloadTablesCreateDrop.TableStatus.CREATING
+                    return r
+
+    def _get_table_to_delete(self):
+        with self.lock:
+            for n, s in self.tables.items():
+                if s == WorkloadTablesCreateDrop.TableStatus.AVAILABLE:
+                    self.tables[n] = WorkloadTablesCreateDrop.TableStatus.DELETING
+                    return n
+        return None
+
+    def create_table(self, table):
+        path = self.get_table_path(table)
+        column_n = random.randint(1, 10000)
+        primary_key_column_n = random.randint(1, column_n)
+        partition_key_column_n = random.randint(1, primary_key_column_n)
+        column_defs = []
+        for i in range(column_n):
+            if i < primary_key_column_n:
+                c = random.choice(supported_pk_types)
+                if not self.allow_nullables_in_pk or random.choice([False, True]):
+                    c += " NOT NULL"
+            else:
+                c = random.choice(supported_types)
+                if random.choice([False, True]):
+                    c += " NOT NULL"
+            column_defs.append(c)
+
+        stmt = f"""
+            CREATE TABLE `{path}` (
+                {", ".join(["c" + str(i) + " " + column_defs[i] for i in range(column_n)])},
+                PRIMARY KEY({", ".join(["c" + str(i) for i in range(primary_key_column_n)])})
+            )
+            PARTITION BY HASH({", ".join(["c" + str(i) for i in range(partition_key_column_n)])})
+            WITH (
+                STORE = COLUMN
+            )
+        """
+        self.client.query(stmt, True)
+
+    def _create_tables_loop(self):
+        while not self.is_stop_requested():
+            n = self._generate_new_table_n()
+            self.create_table(str(n))
+            with self.lock:
+                self.tables[n] = WorkloadTablesCreateDrop.TableStatus.AVAILABLE
+                self.created += 1
+
+    def _delete_tables_loop(self):
+        while not self.is_stop_requested():
+            n = self._get_table_to_delete()
+            if n is None:
+                print("create_drop: No tables to delete")
+                time.sleep(10)
+                continue
+            self.client.drop_table(self.get_table_path(str(n)))
+            with self.lock:
+                del self.tables[n]
+                self.deleted += 1
+
+    def get_workload_thread_funcs(self):
+        r = [self._create_tables_loop for x in range(0, 10)]
+        r.append(self._delete_tables_loop)
+        return r
+
+
+class WorkloadInsertDelete(WorkloadBase):
+    def __init__(self, client, prefix, stop):
+        super().__init__(client, prefix, "insert_delete", stop)
+        self.inserted = 0
+        self.current = 0
+        self.table_name = "table"
+        self.lock = threading.Lock()
+
+    def get_stat(self):
+        with self.lock:
+            return f"Inserted: {self.inserted}, Current: {self.current}"
+
+    def _loop(self):
+        table_path = self.get_table_path(self.table_name)
+        self.client.query(
+            f"""
+            CREATE TABLE `{table_path}` (
+                id Int64 NOT NULL,
+                i64Val Int64,
+                PRIMARY KEY(id)
+            )
+            PARTITION BY HASH(id)
+            WITH (
+                STORE = COLUMN
+            )
+            """,
+            True,
+        )
+        i = 1
+        while not self.is_stop_requested():
+            self.client.query(
+                f"""
+                INSERT INTO `{table_path}` (`id`, `i64Val`)
+                VALUES
+                ({i * 2}, {i * 10}),
+                ({i * 2 + 1}, {i * 10 + 1})
+                """,
+                False,
+            )
+
+            self.client.query(
+                f"""
+                DELETE FROM `{table_path}`
+                WHERE i64Val % 2 == 1
+                """,
+                False,
+            )
+
+            actual = self.client.query(
+                f"""
+                SELECT COUNT(*) as cnt, SUM(i64Val) as vals, SUM(id) as ids FROM `{table_path}`
+                """,
+                False,
+            )[0].rows[0]
+            expected = {"cnt": i, "vals": i * (i + 1) * 5, "ids": i * (i + 1)}
+            if actual != expected:
+                raise Exception(f"Incorrect result: expected:{expected}, actual:{actual}")
+            i += 1
+            with self.lock:
+                self.inserted += 2
+                self.current = actual["cnt"]
+
+    def get_workload_thread_funcs(self):
+        return [self._loop]
+
+
+class WorkloadRunner:
+    def __init__(self, client, path, duration, allow_nullables_in_pk):
+        self.client = client
+        self.name = path
+        self.tables_prefix = "/".join([self.client.database, self.name])
+        self.duration = duration
+        self.allow_nullables_in_pk = allow_nullables_in_pk
+        ydb.interceptor.monkey_patch_event_handler()
+
+    def __enter__(self):
+        self._cleanup()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self._cleanup()
+
+    def _cleanup(self):
+        print(f"Cleaning up {self.tables_prefix}...")
+        deleted = self.client.remove_recursively(self.tables_prefix)
+        print(f"Cleaning up {self.tables_prefix}... done, {deleted} tables deleted")
+
+    def run(self):
+        stop = threading.Event()
+        workloads = [
+            WorkloadTablesCreateDrop(self.client, self.name, stop, self.allow_nullables_in_pk),
+            WorkloadInsertDelete(self.client, self.name, stop),
+        ]
+        for w in workloads:
+            w.start()
+        started_at = time.time()
+        while time.time() - started_at < self.duration:
+            print(f"Elapsed {int(time.time() - started_at)} seconds, stat:")
+            for w in workloads:
+                print(f"\t{w.name}: {w.get_stat()}")
+            time.sleep(10)
+        stop.set()
+        print("Waiting for stop...")
+        for w in workloads:
+            w.join()
+        print("Waiting for stop... stopped")
diff --git a/ydb/tests/stress/olap_workload/workload/ya.make b/ydb/tests/stress/olap_workload/workload/ya.make
new file mode 100644
index 000000000000..ec5f262063dc
--- /dev/null
+++ b/ydb/tests/stress/olap_workload/workload/ya.make
@@ -0,0 +1,13 @@
+PY3_LIBRARY()
+
+PY_SRCS(
+    __init__.py
+)
+
+PEERDIR(
+    ydb/public/sdk/python
+    ydb/public/sdk/python/enable_v3_new_behavior
+    ydb/tests/stress/common
+)
+
+END()
diff --git a/ydb/tests/stress/olap_workload/ya.make b/ydb/tests/stress/olap_workload/ya.make
index c5d99bcebc08..e5ae09e3fc1a 100644
--- a/ydb/tests/stress/olap_workload/ya.make
+++ b/ydb/tests/stress/olap_workload/ya.make
@@ -6,9 +6,7 @@ PY_SRCS(
 )
 
 PEERDIR(
     ydb/tests/stress/common
-    ydb/public/sdk/python
-    ydb/public/sdk/python/enable_v3_new_behavior
-    library/python/monlib
+    ydb/tests/stress/olap_workload/workload
 )
 
 END()
".join(["null_pk" + str(i) + " " + null_types[i] for i in range(len(null_types))])}, - {", ".join(["col" + str(i) + " " + supported_types[i] for i in range(len(supported_types))])}, - {", ".join(["null_col" + str(i) + " " + null_types[i] for i in range(len(null_types))])}, - PRIMARY KEY( - {", ".join(["pk" + str(i) for i in range(len(supported_pk_types))])}, - {", ".join(["null_pk" + str(i) for i in range(len(null_types))])} - ) - ) - """ - # print(create_sql) - - self.client.query(create_sql, True,) - i = 1 - while not self.is_stop_requested(): - insert_sql = f""" - INSERT INTO `{table_path}` ( - {", ".join(["pk" + str(i) for i in range(len(supported_pk_types))])}, - {", ".join(["null_pk" + str(i) for i in range(len(null_types))])}, - {", ".join(["col" + str(i) for i in range(len(supported_types))])}, - {", ".join(["null_col" + str(i) for i in range(len(null_types))])} - ) - VALUES - ({i * 2}, {i * 10}, - -2147483648, 0, -32768, 0, -128, 0, false, - CAST('1' AS Decimal(1,0)), CAST('1234567890123456789.000000001' AS Decimal(22,9)), CAST('1234567890123456789123456789.000000001' AS Decimal(35,10)), - CAST('-1.234' AS DyNumber), 'AnotherString', 'AnotherUtf8', CAST('123e4567-e89b-12d3-a456-556642440000' AS Uuid), - CAST('2023-10-02' AS Date), CAST('2023-10-02T11:00:00' AS Datetime), CAST(1696243200000000 AS Timestamp), CAST(-86400 AS Interval), - Date32('998-06-02'), CAST('2023-10-02T11:00:00.654321' AS Datetime64), Timestamp64('0998-06-02T12:30:00.123456Z'),Interval64('PT2S'), - NULL, NULL, NULL, NULL, - -2000000, {i * 10}, -222, 222, -22, 22, -2, 2, true, - CAST('2' AS Decimal(1,0)), CAST('2234567890123456789.000000001' AS Decimal(22,9)), CAST('2234567890123456789123456789.000000001' AS Decimal(35,10)), - CAST('123E4' AS DyNumber), 'SampleString', 'SampleUtf8', CAST('550e8400-e29b-41d4-a716-446655440000' AS Uuid), - CAST('2023-10-01' AS Date), CAST('2023-10-01T10:00:00' AS Datetime), CAST(1696156800000000 AS Timestamp), CAST(3600 AS Interval), - Date32('998-06-01'), CAST('2023-10-01T10:00:00.123456' AS Datetime64), Timestamp64('0998-06-02T12:30:00.678901Z'),Interval64('-PT2S'), 3.14f, 2.71828, - CAST('{{"json_key":"json_value"}}' AS Json), CAST('{{"doc_key":"doc_value"}}' AS JsonDocument), CAST('<yson><key1>value1</key1></yson>' AS Yson), - NULL, NULL, NULL, NULL), - ({i * 2 + 1}, {i * 10 + 1}, - 2147483647, 4294967295, 32767, 65535, 127, 255, true, - CAST('3' AS Decimal(1,0)), CAST('3234567890123456789.000000001' AS Decimal(22,9)), CAST('3234567890123456789123456789.000000001' AS Decimal(35,10)), - CAST('4.567E-3' AS DyNumber), 'ExampleString', 'ExampleUtf8', CAST('00112233-4455-6677-8899-aabbccddeeff' AS Uuid), - CAST('2022-12-31' AS Date), CAST('2022-12-31T23:59:59' AS Datetime), CAST(1672444799000000 AS Timestamp), CAST(172800 AS Interval), - Date32('1000-01-01'), CAST('2022-12-31T23:59:59.999999' AS Datetime64), Timestamp64('1000-01-01T00:00:00.000000Z'), Interval64('PT1440M'), - NULL, NULL, NULL, NULL, - -4000000, {i * 10 + 1}, -444, 444, -44, 44, -4, 4, false, - CAST('4' AS Decimal(1,0)), CAST('4234567890123456789.000000001' AS Decimal(22,9)), CAST('4234567890123456789123456789.000000001' AS Decimal(35,10)), - CAST('-987E-4' AS DyNumber), 'NewString', 'NewUtf8', CAST('01234567-89ab-cdef-0123-456789abcdef' AS Uuid), - CAST('1980-03-15' AS Date), CAST('1980-03-15T08:00:00' AS Datetime), CAST(315532800000000 AS Timestamp), CAST(-31536000 AS Interval), - Date32('2000-02-29'), CAST('1980-03-15T08:00:00.123456' AS Datetime64), Timestamp64('2000-02-29T12:30:00.999999Z'), 
Interval64('-PT600S'), -0.123f, 2.71828, - CAST('{{"another_key":"another_value"}}' AS Json), CAST('{{"another_doc_key":"another_doc_value"}}' AS JsonDocument), CAST('<yson><key2>value2</key2></yson>' AS Yson), - NULL, NULL, NULL, NULL); - """ - # print(insert_sql) - self.client.query(insert_sql, False,) - - self.client.query( - f""" - DELETE FROM `{table_path}` - WHERE col1 % 2 == 1 AND null_pk0 IS NULL - """, - False, - ) - - actual = self.client.query( - f""" - SELECT COUNT(*) as cnt, SUM(col1) as vals, SUM(pk0) as ids FROM `{table_path}` - """, - False, - )[0].rows[0] - expected = {"cnt": i, "vals": i * (i + 1) * 5, "ids": i * (i + 1)} - if actual != expected: - raise Exception(f"Incorrect result: expected:{expected}, actual:{actual}") - i += 1 - with self.lock: - self.inserted += 2 - self.current = actual["cnt"] - - def get_workload_thread_funcs(self): - return [self._loop] - - -class WorkloadRunner: - def __init__(self, client, name, duration): - self.client = client - self.name = args.path - self.tables_prefix = "/".join([self.client.database, self.name]) - self.duration = args.duration - - def __enter__(self): - self._cleanup() - return self - - def __exit__(self, exc_type, exc_value, traceback): - self._cleanup() - - def _cleanup(self): - print(f"Cleaning up {self.tables_prefix}...") - deleted = client.remove_recursively(self.tables_prefix) - print(f"Cleaning up {self.tables_prefix}... done, {deleted} tables deleted") - - def run(self): - stop = threading.Event() - - workloads = [ - WorkloadInsertDeleteAllTypes(self.client, self.name, stop), - ] - - for w in workloads: - w.start() - started_at = started_at = time.time() - while time.time() - started_at < self.duration: - print(f"Elapsed {(int)(time.time() - started_at)} seconds, stat:") - for w in workloads: - print(f"\t{w.name}: {w.get_stat()}") - time.sleep(10) - stop.set() - print("Waiting for stop...") - for w in workloads: - w.join() - print("Waiting for stop... 
stopped") +from ydb.tests.stress.oltp_workload.workload import WorkloadRunner +from ydb.tests.stress.common.common import YdbClient if __name__ == "__main__": diff --git a/ydb/tests/stress/oltp_workload/tests/test_workload.py b/ydb/tests/stress/oltp_workload/tests/test_workload.py index d3478f06469d..b2f93992c630 100644 --- a/ydb/tests/stress/oltp_workload/tests/test_workload.py +++ b/ydb/tests/stress/oltp_workload/tests/test_workload.py @@ -1,8 +1,8 @@ # -*- coding: utf-8 -*- -import yatest - from ydb.tests.library.harness.kikimr_runner import KiKiMR from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator +from ydb.tests.stress.oltp_workload.workload import WorkloadRunner +from ydb.tests.stress.common.common import YdbClient class TestYdbWorkload(object): @@ -16,13 +16,7 @@ def teardown_class(cls): cls.cluster.stop() def test(self): - workload_path = yatest.common.build_path("ydb/tests/stress/oltp_workload/oltp_workload") - yatest.common.execute( - [ - workload_path, - "--endpoint", f"grpc://localhost:{self.cluster.nodes[1].grpc_port}", - "--database=/Root", - "--duration", "120", - ], - wait=True - ) + client = YdbClient(f'grpc://localhost:{self.cluster.nodes[1].grpc_port}', '/Root', True) + client.wait_connection() + with WorkloadRunner(client, 'oltp_workload', 120) as runner: + runner.run() diff --git a/ydb/tests/stress/oltp_workload/tests/ya.make b/ydb/tests/stress/oltp_workload/tests/ya.make index a7f7cb8e5e3d..142a5373ed94 100644 --- a/ydb/tests/stress/oltp_workload/tests/ya.make +++ b/ydb/tests/stress/oltp_workload/tests/ya.make @@ -13,12 +13,12 @@ SIZE(MEDIUM) DEPENDS( ydb/apps/ydbd - ydb/apps/ydb - ydb/tests/stress/oltp_workload ) PEERDIR( ydb/tests/library + ydb/tests/stress/oltp_workload/workload + ydb/tests/stress/common ) diff --git a/ydb/tests/stress/oltp_workload/workload/__init__.py b/ydb/tests/stress/oltp_workload/workload/__init__.py new file mode 100644 index 000000000000..319d9b7c5790 --- /dev/null +++ b/ydb/tests/stress/oltp_workload/workload/__init__.py @@ -0,0 +1,191 @@ +# -*- coding: utf-8 -*- +import ydb +import time +import threading + +from ydb.tests.stress.common.common import WorkloadBase + +ydb.interceptor.monkey_patch_event_handler() + +supported_pk_types = [ + "Int64", + "Uint64", + "Int32", + "Uint32", + "Int16", + "Uint16", + "Int8", + "Uint8", + "Bool", + "Decimal(1,0)", + "Decimal(22,9)", + "Decimal(35,10)", + "DyNumber", + + "String", + "Utf8", + "Uuid", + + "Date", + "Datetime", + "Timestamp", + "Interval", + "Date32", + "Datetime64", + "Timestamp64", + "Interval64" +] + +supported_types = supported_pk_types + [ + "Float", + "Double", + "Json", + "JsonDocument", + "Yson" +] + +null_types = [ + "Int64", + "Decimal(22,9)", + "Decimal(35,10)", + "String", +] + + +class WorkloadInsertDeleteAllTypes(WorkloadBase): + def __init__(self, client, prefix, stop): + super().__init__(client, prefix, "insert_delete_all_types", stop) + self.inserted = 0 + self.current = 0 + self.table_name = "table" + self.lock = threading.Lock() + + def get_stat(self): + with self.lock: + return f"Inserted: {self.inserted}, Current: {self.current}" + + def _loop(self): + table_path = self.get_table_path(self.table_name) + create_sql = f""" + CREATE TABLE `{table_path}` ( + {", ".join(["pk" + str(i) + " " + supported_pk_types[i] for i in range(len(supported_pk_types))])}, + {", ".join(["null_pk" + str(i) + " " + null_types[i] for i in range(len(null_types))])}, + {", ".join(["col" + str(i) + " " + supported_types[i] for i in range(len(supported_types))])}, + {", 
".join(["null_col" + str(i) + " " + null_types[i] for i in range(len(null_types))])}, + PRIMARY KEY( + {", ".join(["pk" + str(i) for i in range(len(supported_pk_types))])}, + {", ".join(["null_pk" + str(i) for i in range(len(null_types))])} + ) + ) + """ + # print(create_sql) + + self.client.query(create_sql, True,) + i = 1 + while not self.is_stop_requested(): + insert_sql = f""" + INSERT INTO `{table_path}` ( + {", ".join(["pk" + str(i) for i in range(len(supported_pk_types))])}, + {", ".join(["null_pk" + str(i) for i in range(len(null_types))])}, + {", ".join(["col" + str(i) for i in range(len(supported_types))])}, + {", ".join(["null_col" + str(i) for i in range(len(null_types))])} + ) + VALUES + ({i * 2}, {i * 10}, + -2147483648, 0, -32768, 0, -128, 0, false, + CAST('1' AS Decimal(1,0)), CAST('1234567890123456789.000000001' AS Decimal(22,9)), CAST('1234567890123456789123456789.000000001' AS Decimal(35,10)), + CAST('-1.234' AS DyNumber), 'AnotherString', 'AnotherUtf8', CAST('123e4567-e89b-12d3-a456-556642440000' AS Uuid), + CAST('2023-10-02' AS Date), CAST('2023-10-02T11:00:00' AS Datetime), CAST(1696243200000000 AS Timestamp), CAST(-86400 AS Interval), + Date32('998-06-02'), CAST('2023-10-02T11:00:00.654321' AS Datetime64), Timestamp64('0998-06-02T12:30:00.123456Z'),Interval64('PT2S'), + NULL, NULL, NULL, NULL, + -2000000, {i * 10}, -222, 222, -22, 22, -2, 2, true, + CAST('2' AS Decimal(1,0)), CAST('2234567890123456789.000000001' AS Decimal(22,9)), CAST('2234567890123456789123456789.000000001' AS Decimal(35,10)), + CAST('123E4' AS DyNumber), 'SampleString', 'SampleUtf8', CAST('550e8400-e29b-41d4-a716-446655440000' AS Uuid), + CAST('2023-10-01' AS Date), CAST('2023-10-01T10:00:00' AS Datetime), CAST(1696156800000000 AS Timestamp), CAST(3600 AS Interval), + Date32('998-06-01'), CAST('2023-10-01T10:00:00.123456' AS Datetime64), Timestamp64('0998-06-02T12:30:00.678901Z'),Interval64('-PT2S'), 3.14f, 2.71828, + CAST('{{"json_key":"json_value"}}' AS Json), CAST('{{"doc_key":"doc_value"}}' AS JsonDocument), CAST('<yson><key1>value1</key1></yson>' AS Yson), + NULL, NULL, NULL, NULL), + ({i * 2 + 1}, {i * 10 + 1}, + 2147483647, 4294967295, 32767, 65535, 127, 255, true, + CAST('3' AS Decimal(1,0)), CAST('3234567890123456789.000000001' AS Decimal(22,9)), CAST('3234567890123456789123456789.000000001' AS Decimal(35,10)), + CAST('4.567E-3' AS DyNumber), 'ExampleString', 'ExampleUtf8', CAST('00112233-4455-6677-8899-aabbccddeeff' AS Uuid), + CAST('2022-12-31' AS Date), CAST('2022-12-31T23:59:59' AS Datetime), CAST(1672444799000000 AS Timestamp), CAST(172800 AS Interval), + Date32('1000-01-01'), CAST('2022-12-31T23:59:59.999999' AS Datetime64), Timestamp64('1000-01-01T00:00:00.000000Z'), Interval64('PT1440M'), + NULL, NULL, NULL, NULL, + -4000000, {i * 10 + 1}, -444, 444, -44, 44, -4, 4, false, + CAST('4' AS Decimal(1,0)), CAST('4234567890123456789.000000001' AS Decimal(22,9)), CAST('4234567890123456789123456789.000000001' AS Decimal(35,10)), + CAST('-987E-4' AS DyNumber), 'NewString', 'NewUtf8', CAST('01234567-89ab-cdef-0123-456789abcdef' AS Uuid), + CAST('1980-03-15' AS Date), CAST('1980-03-15T08:00:00' AS Datetime), CAST(315532800000000 AS Timestamp), CAST(-31536000 AS Interval), + Date32('2000-02-29'), CAST('1980-03-15T08:00:00.123456' AS Datetime64), Timestamp64('2000-02-29T12:30:00.999999Z'), Interval64('-PT600S'), -0.123f, 2.71828, + CAST('{{"another_key":"another_value"}}' AS Json), CAST('{{"another_doc_key":"another_doc_value"}}' AS JsonDocument), CAST('<yson><key2>value2</key2></yson>' AS 
Yson), + NULL, NULL, NULL, NULL); + """ + # print(insert_sql) + self.client.query(insert_sql, False,) + + self.client.query( + f""" + DELETE FROM `{table_path}` + WHERE col1 % 2 == 1 AND null_pk0 IS NULL + """, + False, + ) + + actual = self.client.query( + f""" + SELECT COUNT(*) as cnt, SUM(col1) as vals, SUM(pk0) as ids FROM `{table_path}` + """, + False, + )[0].rows[0] + expected = {"cnt": i, "vals": i * (i + 1) * 5, "ids": i * (i + 1)} + if actual != expected: + raise Exception(f"Incorrect result: expected:{expected}, actual:{actual}") + i += 1 + with self.lock: + self.inserted += 2 + self.current = actual["cnt"] + + def get_workload_thread_funcs(self): + return [self._loop] + + +class WorkloadRunner: + def __init__(self, client, path, duration): + self.client = client + self.name = path + self.tables_prefix = "/".join([self.client.database, self.name]) + self.duration = duration + ydb.interceptor.monkey_patch_event_handler() + + def __enter__(self): + self._cleanup() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._cleanup() + + def _cleanup(self): + print(f"Cleaning up {self.tables_prefix}...") + deleted = self.client.remove_recursively(self.tables_prefix) + print(f"Cleaning up {self.tables_prefix}... done, {deleted} tables deleted") + + def run(self): + stop = threading.Event() + + workloads = [ + WorkloadInsertDeleteAllTypes(self.client, self.name, stop), + ] + + for w in workloads: + w.start() + started_at = started_at = time.time() + while time.time() - started_at < self.duration: + print(f"Elapsed {(int)(time.time() - started_at)} seconds, stat:") + for w in workloads: + print(f"\t{w.name}: {w.get_stat()}") + time.sleep(10) + stop.set() + print("Waiting for stop...") + for w in workloads: + w.join() + print("Waiting for stop... 
stopped") diff --git a/ydb/tests/stress/oltp_workload/workload/ya.make b/ydb/tests/stress/oltp_workload/workload/ya.make new file mode 100644 index 000000000000..0303e5d573b8 --- /dev/null +++ b/ydb/tests/stress/oltp_workload/workload/ya.make @@ -0,0 +1,13 @@ +PY3_LIBRARY() + +PY_SRCS( + __init__.py +) + +PEERDIR( + ydb/tests/stress/common + ydb/public/sdk/python + ydb/public/sdk/python/enable_v3_new_behavior +) + +END() diff --git a/ydb/tests/stress/oltp_workload/ya.make b/ydb/tests/stress/oltp_workload/ya.make index 061a5c0c3c7b..4e202d987b31 100644 --- a/ydb/tests/stress/oltp_workload/ya.make +++ b/ydb/tests/stress/oltp_workload/ya.make @@ -5,10 +5,8 @@ PY_SRCS( ) PEERDIR( - ydb/tests/stress/common - ydb/public/sdk/python - ydb/public/sdk/python/enable_v3_new_behavior - library/python/monlib + ydb/tests/stress/common + ydb/tests/stress/oltp_workload/workload ) END() diff --git a/ydb/tests/stress/simple_queue/__main__.py b/ydb/tests/stress/simple_queue/__main__.py index b20661aa013e..0e30383ec0dd 100644 --- a/ydb/tests/stress/simple_queue/__main__.py +++ b/ydb/tests/stress/simple_queue/__main__.py @@ -1,528 +1,6 @@ # -*- coding: utf-8 -*- import argparse -import copy -import os -import random -import time -import logging -import string -import threading -import collections -import itertools -import queue -import ydb -from library.python.monlib.metric_registry import MetricRegistry -import socket - -logger = logging.getLogger(__name__) - -BLOB_MIN_SIZE = 128 * 1024 - - -def random_string(size): - return ''.join([random.choice(string.ascii_lowercase) for _ in range(size)]) - - -def generate_blobs(count=32): - return [random_string(idx * BLOB_MIN_SIZE) for idx in range(1, count + 1)] - - -class EventKind(object): - ALTER_TABLE = 'alter_table' - COPY_TABLE = 'copy_table' - DROP_TABLE = 'drop_table' - START_READ_TABLE = 'start_read_table' - READ_TABLE_CHUNK = 'read_table_chunk' - - WRITE = 'write' - REMOVE_OUTDATED = 'remove_outdated' - FIND_OUTDATED = 'find_outdated' - - SCAN_QUERY_CHUNK = 'scan_query_chunk' - START_SCAN_QUERY = 'start_scan_query' - - @classmethod - def periodic_tasks(cls): - return ( - cls.START_READ_TABLE, - cls.START_SCAN_QUERY, - ) - - @classmethod - def rare(cls): - return ( - cls.ALTER_TABLE, - cls.DROP_TABLE, - cls.COPY_TABLE, - cls.START_READ_TABLE, - cls.START_SCAN_QUERY, - ) - - @classmethod - def list(cls): - return ( - cls.COPY_TABLE, - cls.DROP_TABLE, - cls.ALTER_TABLE, - - cls.START_READ_TABLE, - cls.READ_TABLE_CHUNK, - - cls.FIND_OUTDATED, - cls.REMOVE_OUTDATED, - cls.WRITE, - cls.START_SCAN_QUERY, - cls.SCAN_QUERY_CHUNK, - ) - - -def get_table_description(table_name, mode): - if mode == "row": - store_entry = "STORE = ROW," - ttl_entry = """TTL = Interval("PT240S") ON `timestamp` AS SECONDS,""" - elif mode == "column": - store_entry = "STORE = COLUMN," - ttl_entry = "" - else: - raise RuntimeError("Unkown mode: {}".format(mode)) - - return f""" - CREATE TABLE `{table_name}` ( - key Uint64 NOT NULL, - `timestamp` Uint64 NOT NULL, - value Utf8 FAMILY lz4_family NOT NULL, - PRIMARY KEY (key), - FAMILY lz4_family ( - COMPRESSION = "lz4" - ), - INDEX by_timestamp GLOBAL ON (`timestamp`) - ) - WITH ( - {store_entry} - {ttl_entry} - AUTO_PARTITIONING_BY_SIZE = ENABLED, - AUTO_PARTITIONING_BY_LOAD = ENABLED, - AUTO_PARTITIONING_PARTITION_SIZE_MB = 128, - READ_REPLICAS_SETTINGS = "PER_AZ:1", - KEY_BLOOM_FILTER = ENABLED - ); - """.format(table_name=table_name) - - -def timestamp(): - return int(time.time()) - - -def extract_keys(response): - try: - result_sets = 
diff --git a/ydb/tests/stress/simple_queue/__main__.py b/ydb/tests/stress/simple_queue/__main__.py
index b20661aa013e..0e30383ec0dd 100644
--- a/ydb/tests/stress/simple_queue/__main__.py
+++ b/ydb/tests/stress/simple_queue/__main__.py
@@ -1,528 +1,6 @@
 # -*- coding: utf-8 -*-
 import argparse
-import copy
-import os
-import random
-import time
-import logging
-import string
-import threading
-import collections
-import itertools
-import queue
-import ydb
-from library.python.monlib.metric_registry import MetricRegistry
-import socket
-
-logger = logging.getLogger(__name__)
-
-BLOB_MIN_SIZE = 128 * 1024
-
-
-def random_string(size):
-    return ''.join([random.choice(string.ascii_lowercase) for _ in range(size)])
-
-
-def generate_blobs(count=32):
-    return [random_string(idx * BLOB_MIN_SIZE) for idx in range(1, count + 1)]
-
-
-class EventKind(object):
-    ALTER_TABLE = 'alter_table'
-    COPY_TABLE = 'copy_table'
-    DROP_TABLE = 'drop_table'
-    START_READ_TABLE = 'start_read_table'
-    READ_TABLE_CHUNK = 'read_table_chunk'
-
-    WRITE = 'write'
-    REMOVE_OUTDATED = 'remove_outdated'
-    FIND_OUTDATED = 'find_outdated'
-
-    SCAN_QUERY_CHUNK = 'scan_query_chunk'
-    START_SCAN_QUERY = 'start_scan_query'
-
-    @classmethod
-    def periodic_tasks(cls):
-        return (
-            cls.START_READ_TABLE,
-            cls.START_SCAN_QUERY,
-        )
-
-    @classmethod
-    def rare(cls):
-        return (
-            cls.ALTER_TABLE,
-            cls.DROP_TABLE,
-            cls.COPY_TABLE,
-            cls.START_READ_TABLE,
-            cls.START_SCAN_QUERY,
-        )
-
-    @classmethod
-    def list(cls):
-        return (
-            cls.COPY_TABLE,
-            cls.DROP_TABLE,
-            cls.ALTER_TABLE,
-
-            cls.START_READ_TABLE,
-            cls.READ_TABLE_CHUNK,
-
-            cls.FIND_OUTDATED,
-            cls.REMOVE_OUTDATED,
-            cls.WRITE,
-            cls.START_SCAN_QUERY,
-            cls.SCAN_QUERY_CHUNK,
-        )
-
-
-def get_table_description(table_name, mode):
-    if mode == "row":
-        store_entry = "STORE = ROW,"
-        ttl_entry = """TTL = Interval("PT240S") ON `timestamp` AS SECONDS,"""
-    elif mode == "column":
-        store_entry = "STORE = COLUMN,"
-        ttl_entry = ""
-    else:
-        raise RuntimeError("Unkown mode: {}".format(mode))
-
-    return f"""
-    CREATE TABLE `{table_name}` (
-        key Uint64 NOT NULL,
-        `timestamp` Uint64 NOT NULL,
-        value Utf8 FAMILY lz4_family NOT NULL,
-        PRIMARY KEY (key),
-        FAMILY lz4_family (
-            COMPRESSION = "lz4"
-        ),
-        INDEX by_timestamp GLOBAL ON (`timestamp`)
-    )
-    WITH (
-        {store_entry}
-        {ttl_entry}
-        AUTO_PARTITIONING_BY_SIZE = ENABLED,
-        AUTO_PARTITIONING_BY_LOAD = ENABLED,
-        AUTO_PARTITIONING_PARTITION_SIZE_MB = 128,
-        READ_REPLICAS_SETTINGS = "PER_AZ:1",
-        KEY_BLOOM_FILTER = ENABLED
-    );
-    """.format(table_name=table_name)
-
-
-def timestamp():
-    return int(time.time())
-
-
-def extract_keys(response):
-    try:
-        result_sets = response.result()
-        return result_sets[0].rows
-    except Exception:
-        return []
-
-
-def status_code_to_label(status=None):
-    if status is None:
-        return 'Success'
-    return status.name.lower().capitalize()
-
-
-class WorkloadStats(object):
-    def __init__(self, *evs):
-        self.lock = threading.Lock()
-        self.registry = MetricRegistry()
-        self.by_events_stats = {}
-        for ev in evs:
-            self.init_kind(ev)
-
-    def int_gauge(self, sensor, **labels):
-        all_labels = copy.deepcopy(labels)
-        all_labels.update({'sensor': sensor})
-        return self.registry.int_gauge(all_labels)
-
-    def rate(self, sensor, **labels):
-        all_labels = copy.deepcopy(labels)
-        all_labels.update({'sensor': sensor})
-        return self.registry.rate(all_labels)
-
-    def init_kind(self, ev_kind):
-        self.by_events_stats[ev_kind] = {}
-        for status in list(ydb.StatusCode):
-            if status == ydb.StatusCode.STATUS_CODE_UNSPECIFIED:
-                continue
-
-            label = status_code_to_label(status)
-            self.by_events_stats[ev_kind][label] = self.rate(
-                label, event=ev_kind)
-
-    def save_event(self, ev_kind, details=None):
-        label = status_code_to_label(details)
-        if ev_kind not in self.by_events_stats:
-            return
-
-        self.by_events_stats[ev_kind][label].inc()
-
-    def print_stats(self):
-        report = ["=" * 120]
-        for event_kind, stats in self.by_events_stats.items():
-            something_appended = False
-            total_response_count = sum([responses_count.get() for responses_count in stats.values()])
-            if total_response_count == 0:
-                continue
-
-            for response_kind, responses_count in stats.items():
-                value = responses_count.get()
-                is_success = response_kind == status_code_to_label()
-                if value > 0 or is_success:
-                    something_appended = True
-                    line = "EventKind: {event_kind}, {response_kind} responses count: {responses_count}".format(
-                        event_kind=event_kind,
-                        response_kind=response_kind,
-                        responses_count=value,
-                    )
-                    if is_success:
-                        line += " ({:.2f}%)".format(100.0 * value / total_response_count)
-                    report.append(line)
-            if something_appended:
-                report.append("")
-        report.append("=" * 120)
-        print("\n".join(report))
-
-
-class YdbQueue(object):
-    def __init__(self, idx, database, stats, driver, pool, mode):
-        self.working_dir = os.path.join(database, socket.gethostname().split('.')[0].replace('-', '_') + "_" + str(idx))
-        self.copies_dir = os.path.join(self.working_dir, 'copies')
-        self.table_name = self.table_name_with_timestamp()
-        self.queries = {}
-        self.pool = pool
-        self.driver = driver
-        self.stats = stats
-        self.blobs = generate_blobs(16)
-        self.blobs_iter = itertools.cycle(self.blobs)
-        self.outdated_period = 60 * 2
-        self.database = database
-        self.ops = ydb.BaseRequestSettings().with_operation_timeout(19).with_timeout(20)
-        self.driver.scheme_client.make_directory(self.working_dir)
-        self.driver.scheme_client.make_directory(self.copies_dir)
-        self.mode = mode
-        print("Working dir %s" % self.working_dir)
-        f = self.prepare_new_queue(self.table_name)
-        f.result()
-        # a queue with tables to drop
-        self.drop_queue = collections.deque()
-        # a set with keys that are ready to be removed
-        self.outdated_keys = collections.deque()
-        self.outdated_keys_max_size = 50
-
-    def table_name_with_timestamp(self, working_dir=None):
-        if working_dir is not None:
-            return os.path.join(working_dir, "queue_" + str(timestamp()))
-        return os.path.join(self.working_dir, "queue_" + str(timestamp()))
-
-    def prepare_new_queue(self, table_name=None):
-        session = self.pool.acquire()
-        table_name = self.table_name_with_timestamp() if table_name is None else table_name
-        f = session.async_execute_scheme(get_table_description(table_name, self.mode), settings=self.ops)
-        f.add_done_callback(lambda x: self.on_received_response(session, x, 'create'))
-        return f
-
-    def switch(self, switch_to):
-        self.table_name = switch_to
-        self.outdated_keys.clear()
-
-    def on_received_response(self, session, response, event, callback=None):
-        self.pool.release(session)
-
-        if callback is not None:
-            callback(response)
-
-        try:
-            response.result()
-            self.stats.save_event(event)
-        except ydb.Error as e:
-            debug = False
-            if debug:
-                print(event)
-                print(e)
-                print()
-
-            self.stats.save_event(event, e.status)
-
-    def send_query(self, query, parameters, event_kind, callback=None):
-        session = self.pool.acquire()
-        f = session.transaction().async_execute(
-            query, parameters=parameters, commit_tx=True, settings=self.ops)
-        f.add_done_callback(
-            lambda response: self.on_received_response(
-                session, response, event_kind, callback
-            )
-        )
-        return f
-
-    def on_list_response(self, base, f, switch=True):
-        try:
-            response = f.result()
-        except ydb.Error:
-            return
-
-        tables = []
-        for child in response.children:
-            if child.is_directory():
-                continue
-
-            tables.append(
-                os.path.join(
-                    base, child.name
-                )
-            )
-
-        candidates = list(sorted(tables))
-        if switch:
-            switch_to = candidates.pop()
-            self.switch(switch_to)
-        self.drop_queue.extend(candidates)
-
-    def list_copies_dir(self):
-        f = self.driver.scheme_client.async_list_directory(self.copies_dir)
-        f.add_done_callback(
-            lambda x: self.on_list_response(
-                self.copies_dir, x, switch=False
-            )
-        )
-
-    def list_working_dir(self):
-        f = self.driver.scheme_client.async_list_directory(self.working_dir)
-        f.add_done_callback(
-            lambda x: self.on_list_response(
-                self.working_dir, x, switch=True,
-            )
-        )
-
-    def remove_outdated(self):
-        try:
-            keys_set = self.outdated_keys.popleft()
-        except IndexError:
-            return
-
-        query = ydb.DataQuery(
-            """
-            --!syntax_v1
-            DECLARE $keys as List<Struct<key: Uint64>>;
-            DELETE FROM `{}` ON SELECT `key` FROM AS_TABLE($keys);
-            """.format(self.table_name), {
-                '$keys': ydb.ListType(ydb.StructType().add_member('key', ydb.PrimitiveType.Uint64)).proto
-            }
-        )
-        parameters = {
-            '$keys': keys_set
-        }
-
-        return self.send_query(query=query, event_kind=EventKind.REMOVE_OUTDATED, parameters=parameters)
-
-    def on_find_outdated(self, resp):
-        try:
-            rs = resp.result()
-            self.outdated_keys.append(rs[0].rows)
-        except ydb.Error:
-            return
-
-    def find_outdated(self):
-        # ensure queue is not large enough
-        if len(self.outdated_keys) > self.outdated_keys_max_size:
-            return
-
-        outdated_timestamp = timestamp() - self.outdated_period
-        query = """
-        --!syntax_v1
-        SELECT `key` FROM `{table_name}`
-        WHERE `key` <= {outdated_timestamp}
-        ORDER BY `key`
-        LIMIT 50;
-        """.format(table_name=self.table_name, outdated_timestamp=outdated_timestamp)
-        parameters = None
-        return self.send_query(query=query, event_kind=EventKind.FIND_OUTDATED, parameters=parameters, callback=self.on_find_outdated)
-
-    def write(self):
-        current_timestamp = timestamp()
-        blob = next(self.blobs_iter)
-        query = ydb.DataQuery(
-            """
-            --!syntax_v1
-            DECLARE $key as Uint64;
-            DECLARE $value as Utf8;
-            DECLARE $timestamp as Uint64;
-            UPSERT INTO `{}` (`key`, `timestamp`, `value`) VALUES ($key, $timestamp, $value);
-            """.format(self.table_name),
-            {
-                '$key': ydb.PrimitiveType.Uint64.proto,
-                '$value': ydb.PrimitiveType.Utf8.proto,
-                '$timestamp': ydb.PrimitiveType.Uint64.proto,
-            }
-        )
-        parameters = {
-            '$key': current_timestamp,
-            '$value': blob,
-            '$timestamp': current_timestamp,
-        }
-
-        return self.send_query(query=query, event_kind=EventKind.WRITE, parameters=parameters)
-
-    def move_iterator(self, it, callback):
-        next_f = next(it)
-        next_f.add_done_callback(lambda x: callback(it, x))
-
-    def on_read_table_chunk(self, it, f):
-        try:
-            f.result()
-            self.stats.save_event(EventKind.READ_TABLE_CHUNK)
-        except ydb.Error as e:
-            self.stats.save_event(EventKind.READ_TABLE_CHUNK, e.status)
-        except StopIteration:
-            return
-        self.move_iterator(it, self.on_read_table_chunk)
-
-    def on_scan_query_chunk(self, it, f):
-        try:
-            f.result()
-            self.stats.save_event(EventKind.SCAN_QUERY_CHUNK)
-        except ydb.Error as e:
-            self.stats.save_event(EventKind.SCAN_QUERY_CHUNK, e.status)
-        except StopIteration:
-            return
-
-        self.move_iterator(it, self.on_scan_query_chunk)
-
-    def start_scan_query(self):
-        it = self.driver.table_client.async_scan_query('select count(*) as cnt from `%s`' % self.table_name)
-        self.stats.save_event(EventKind.START_SCAN_QUERY)
-        self.move_iterator(it, self.on_scan_query_chunk)
-
-    def start_read_table(self):
-        with self.pool.checkout() as session:
-            it = session.async_read_table(self.table_name, columns=('key', ))
-            self.stats.save_event(EventKind.START_READ_TABLE)
-            self.move_iterator(it, self.on_read_table_chunk)
-
-    def alter_table(self):
-        session = self.pool.acquire()
-        query = "ALTER TABLE `{table_name}` ADD COLUMN column_{val} Utf8".format(
-            table_name=self.table_name,
-            val=random.randint(1, 100000),
-        )
-
-        f = session.async_execute_scheme(query, settings=self.ops)
-        f.add_done_callback(
-            lambda response: self.on_received_response(
-                session, response, EventKind.ALTER_TABLE,
-            )
-        )
-
-    def drop_table(self):
-        duplicates = set()
-        while len(self.drop_queue) > 0:
-            candidate = self.drop_queue.popleft()
-            if candidate in duplicates:
-                continue
-
-            duplicates.add(candidate)
-            session = self.pool.acquire()
-            f = session.async_drop_table(candidate, settings=self.ops)
-            f.add_done_callback(
-                lambda response: self.on_received_response(
-                    session, response, EventKind.DROP_TABLE,
-                )
-            )
-
-    def copy_table(self):
-        session = self.pool.acquire()
-        dst_table = self.table_name_with_timestamp(self.copies_dir)
-        f = session.async_copy_table(self.table_name, dst_table, settings=self.ops)
-        f.add_done_callback(
-            lambda response: self.on_received_response(
-                session, response, EventKind.COPY_TABLE
-            )
-        )
-
-
-class Workload(object):
-    def __init__(self, endpoint, database, duration, mode):
-        self.database = database
-        self.driver = ydb.Driver(ydb.DriverConfig(endpoint, database))
-        self.pool = ydb.SessionPool(self.driver, size=200)
-        self.round_size = 1000
-        self.duration = duration
-        self.delayed_events = queue.Queue()
-        self.workload_stats = WorkloadStats(*EventKind.list())
-        # TODO: run both modes in parallel?
-        self.mode = mode
-        self.ydb_queues = [
-            YdbQueue(idx, database, self.workload_stats, self.driver, self.pool, self.mode)
-            for idx in range(2)
-        ]
-
-    def random_points(self, size=1):
-        return set([random.randint(0, self.round_size) for _ in range(size)])
-
-    def loop(self):
-        started_at = time.time()
-        round_id_it = itertools.count(start=1)
-        queue_it = itertools.cycle(self.ydb_queues)
-
-        while time.time() - started_at < self.duration:
-
-            for ydb_queue in self.ydb_queues:
-                ydb_queue.list_working_dir()
-                ydb_queue.list_copies_dir()
-                print("Table name: %s" % ydb_queue.table_name)
-
-            round_id = next(round_id_it)
-            if round_id % 10 == 0:
-                for ydb_queue in self.ydb_queues:
-                    ydb_queue.prepare_new_queue()
-
-            self.workload_stats.print_stats()
-
-            schedule = []
-            for op in EventKind.rare():
-                schedule.extend([(point, op) for point in self.random_points()])
-
-            for op in EventKind.periodic_tasks():
-                schedule.extend([(point, op) for point in self.random_points(size=50)])
-
-            schedule = collections.deque(list(sorted(schedule)))
-
-            print("Starting round_id %d" % round_id)
-            print("Round schedule %s" % schedule)
-            for step_id in range(self.round_size):
-
-                if time.time() - started_at > self.duration:
-                    break
-
-                ydb_queue = next(queue_it)
-
-                if step_id % 100 == 0:
-                    print("step_id %d" % step_id)
-
-                yield ydb_queue.write
-                yield ydb_queue.find_outdated
-                yield ydb_queue.remove_outdated
-
-                while len(schedule) > 0:
-                    scheduled_at, op = schedule[0]
-                    if scheduled_at != step_id:
-                        break
-
-                    schedule.popleft()
-                    yield getattr(ydb_queue, op)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.pool.stop()
-        self.driver.stop()
-
+from ydb.tests.stress.simple_queue.workload import Workload
 
 if __name__ == '__main__':
     text = """\033[92mQueue workload\x1b[0m"""
diff --git a/ydb/tests/stress/simple_queue/tests/test_workload.py b/ydb/tests/stress/simple_queue/tests/test_workload.py
index 5ab92629a78e..625a26f0f6c3 100644
--- a/ydb/tests/stress/simple_queue/tests/test_workload.py
+++ b/ydb/tests/stress/simple_queue/tests/test_workload.py
@@ -1,9 +1,10 @@
 # -*- coding: utf-8 -*-
-import yatest
+import pytest
 
 from ydb.tests.library.harness.kikimr_runner import KiKiMR
 from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
 from ydb.tests.library.common.types import Erasure
+from ydb.tests.stress.simple_queue.workload import Workload
 
 
 class TestYdbWorkload(object):
@@ -13,24 +14,13 @@ def setup_class(cls):
         config_generator.yaml_config["table_service_config"]["allow_olap_data_query"] = True
         cls.cluster = KiKiMR(config_generator)
         cls.cluster.start()
-        workload_path = yatest.common.build_path("ydb/tests/stress/simple_queue/simple_queue")
-        cls.workload_command_prefix = [
-            workload_path,
-            "--endpoint", "grpc://localhost:%d" % cls.cluster.nodes[1].grpc_port,
-            "--database=/Root",
-            "--duration", "60",
-        ]
 
     @classmethod
     def teardown_class(cls):
         cls.cluster.stop()
 
-    def test_row(self):
-        command = self.workload_command_prefix
-        command.extend(["--mode", "row"])
-        yatest.common.execute(command, wait=True)
-
-    def test_column(self):
-        command = self.workload_command_prefix
-        command.extend(["--mode", "column"])
-        yatest.common.execute(command, wait=True)
+    @pytest.mark.parametrize('mode', ['row', 'column'])
+    def test(self, mode: str):
+        with Workload(f'grpc://localhost:{self.cluster.nodes[1].grpc_port}', '/Root', 60, mode) as workload:
+            for handle in workload.loop():
+                handle()
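
The rewritten test drives `Workload.loop()` directly: as the workload file below shows, `loop()` is a generator that yields zero-argument callables instead of executing them, so the caller decides when each step runs and can stop between steps. The scheduling idiom in miniature (a toy sketch, not the actual workload):

```python
# Toy version of the yield-a-callable scheduling used by Workload.loop():
# the generator decides *what* runs next, the driver decides *when*.
def loop(steps=3):
    for step in range(steps):
        yield lambda s=step: print(f"write {s}")
        yield lambda s=step: print(f"remove_outdated {s}")


# The test's driver is exactly this shape: `for handle in workload.loop(): handle()`
for handle in loop():
    handle()
```
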
diff --git a/ydb/tests/stress/simple_queue/tests/ya.make b/ydb/tests/stress/simple_queue/tests/ya.make
index 1ec45a49191c..064e7dc7fdd5 100644
--- a/ydb/tests/stress/simple_queue/tests/ya.make
+++ b/ydb/tests/stress/simple_queue/tests/ya.make
@@ -13,12 +13,11 @@ SIZE(MEDIUM)
 
 DEPENDS(
     ydb/apps/ydbd
-    ydb/apps/ydb
-    ydb/tests/stress/simple_queue
 )
 
 PEERDIR(
     ydb/tests/library
+    ydb/tests/stress/simple_queue/workload
 )
diff --git a/ydb/tests/stress/simple_queue/workload/__init__.py b/ydb/tests/stress/simple_queue/workload/__init__.py
new file mode 100644
index 000000000000..9c77d519d57d
--- /dev/null
+++ b/ydb/tests/stress/simple_queue/workload/__init__.py
@@ -0,0 +1,521 @@
+# -*- coding: utf-8 -*-
+import copy
+import os
+import random
+import time
+import string
+import threading
+import collections
+import itertools
+import queue
+import ydb
+from library.python.monlib.metric_registry import MetricRegistry
+import socket
+
+
+BLOB_MIN_SIZE = 128 * 1024
+
+
+def random_string(size):
+    return ''.join([random.choice(string.ascii_lowercase) for _ in range(size)])
+
+
+def generate_blobs(count=32):
+    return [random_string(idx * BLOB_MIN_SIZE) for idx in range(1, count + 1)]
+
+
+class EventKind(object):
+    ALTER_TABLE = 'alter_table'
+    COPY_TABLE = 'copy_table'
+    DROP_TABLE = 'drop_table'
+    START_READ_TABLE = 'start_read_table'
+    READ_TABLE_CHUNK = 'read_table_chunk'
+
+    WRITE = 'write'
+    REMOVE_OUTDATED = 'remove_outdated'
+    FIND_OUTDATED = 'find_outdated'
+
+    SCAN_QUERY_CHUNK = 'scan_query_chunk'
+    START_SCAN_QUERY = 'start_scan_query'
+
+    @classmethod
+    def periodic_tasks(cls):
+        return (
+            cls.START_READ_TABLE,
+            cls.START_SCAN_QUERY,
+        )
+
+    @classmethod
+    def rare(cls):
+        return (
+            cls.ALTER_TABLE,
+            cls.DROP_TABLE,
+            cls.COPY_TABLE,
+            cls.START_READ_TABLE,
+            cls.START_SCAN_QUERY,
+        )
+
+    @classmethod
+    def list(cls):
+        return (
+            cls.COPY_TABLE,
+            cls.DROP_TABLE,
+            cls.ALTER_TABLE,
+
+            cls.START_READ_TABLE,
+            cls.READ_TABLE_CHUNK,
+
+            cls.FIND_OUTDATED,
+            cls.REMOVE_OUTDATED,
+            cls.WRITE,
+            cls.START_SCAN_QUERY,
+            cls.SCAN_QUERY_CHUNK,
+        )
+
+
+def get_table_description(table_name, mode):
+    if mode == "row":
+        store_entry = "STORE = ROW,"
+        ttl_entry = """TTL = Interval("PT240S") ON `timestamp` AS SECONDS,"""
+    elif mode == "column":
+        store_entry = "STORE = COLUMN,"
+        ttl_entry = ""
+    else:
+        raise RuntimeError("Unknown mode: {}".format(mode))
+
+    return f"""
+    CREATE TABLE `{table_name}` (
+        key Uint64 NOT NULL,
+        `timestamp` Uint64 NOT NULL,
+        value Utf8 FAMILY lz4_family NOT NULL,
+        PRIMARY KEY (key),
+        FAMILY lz4_family (
+            COMPRESSION = "lz4"
+        ),
+        INDEX by_timestamp GLOBAL ON (`timestamp`)
+    )
+    WITH (
+        {store_entry}
+        {ttl_entry}
+        AUTO_PARTITIONING_BY_SIZE = ENABLED,
+        AUTO_PARTITIONING_BY_LOAD = ENABLED,
+        AUTO_PARTITIONING_PARTITION_SIZE_MB = 128,
+        READ_REPLICAS_SETTINGS = "PER_AZ:1",
+        KEY_BLOOM_FILTER = ENABLED
+    );
+    """
+
+
+def timestamp():
+    return int(time.time())
+
+
+def extract_keys(response):
+    try:
+        result_sets = response.result()
+        return result_sets[0].rows
+    except Exception:
+        return []
+
+
+def status_code_to_label(status=None):
+    if status is None:
+        return 'Success'
+    return status.name.lower().capitalize()
+
+
+class WorkloadStats(object):
+    def __init__(self, *evs):
+        self.lock = threading.Lock()
+        self.registry = MetricRegistry()
+        self.by_events_stats = {}
+        for ev in evs:
+            self.init_kind(ev)
+
+    def int_gauge(self, sensor, **labels):
+        all_labels = copy.deepcopy(labels)
+        all_labels.update({'sensor': sensor})
+        return self.registry.int_gauge(all_labels)
+
+    def rate(self, sensor, **labels):
+        all_labels = copy.deepcopy(labels)
+        all_labels.update({'sensor': sensor})
+        return self.registry.rate(all_labels)
+
+    def init_kind(self, ev_kind):
+        self.by_events_stats[ev_kind] = {}
+        for status in list(ydb.StatusCode):
+            if status == ydb.StatusCode.STATUS_CODE_UNSPECIFIED:
+                continue
+
+            label = status_code_to_label(status)
+            self.by_events_stats[ev_kind][label] = self.rate(
+                label, event=ev_kind)
+
+    def save_event(self, ev_kind, details=None):
+        label = status_code_to_label(details)
+        if ev_kind not in self.by_events_stats:
+            return
+
+        self.by_events_stats[ev_kind][label].inc()
+
+    def print_stats(self):
+        report = ["=" * 120]
+        for event_kind, stats in self.by_events_stats.items():
+            something_appended = False
+            total_response_count = sum([responses_count.get() for responses_count in stats.values()])
+            if total_response_count == 0:
+                continue
+
+            for response_kind, responses_count in stats.items():
+                value = responses_count.get()
+                is_success = response_kind == status_code_to_label()
+                if value > 0 or is_success:
+                    something_appended = True
+                    line = "EventKind: {event_kind}, {response_kind} responses count: {responses_count}".format(
+                        event_kind=event_kind,
+                        response_kind=response_kind,
+                        responses_count=value,
+                    )
+                    if is_success:
+                        line += " ({:.2f}%)".format(100.0 * value / total_response_count)
+                    report.append(line)
+            if something_appended:
+                report.append("")
+        report.append("=" * 120)
+        print("\n".join(report))
+
+
+class YdbQueue(object):
+    def __init__(self, idx, database, stats, driver, pool, mode):
+        self.working_dir = os.path.join(database, socket.gethostname().split('.')[0].replace('-', '_') + "_" + str(idx))
+        self.copies_dir = os.path.join(self.working_dir, 'copies')
+        self.table_name = self.table_name_with_timestamp()
+        self.queries = {}
+        self.pool = pool
+        self.driver = driver
+        self.stats = stats
+        self.blobs = generate_blobs(16)
+        self.blobs_iter = itertools.cycle(self.blobs)
+        self.outdated_period = 60 * 2
+        self.database = database
+        self.ops = ydb.BaseRequestSettings().with_operation_timeout(19).with_timeout(20)
+        self.driver.scheme_client.make_directory(self.working_dir)
+        self.driver.scheme_client.make_directory(self.copies_dir)
+        self.mode = mode
+        print("Working dir %s" % self.working_dir)
+        f = self.prepare_new_queue(self.table_name)
+        f.result()
+        # a queue with tables to drop
+        self.drop_queue = collections.deque()
+        # a set with keys that are ready to be removed
+        self.outdated_keys = collections.deque()
+        self.outdated_keys_max_size = 50
+
+    def table_name_with_timestamp(self, working_dir=None):
+        if working_dir is not None:
+            return os.path.join(working_dir, "queue_" + str(timestamp()))
+        return os.path.join(self.working_dir, "queue_" + str(timestamp()))
+
+    def prepare_new_queue(self, table_name=None):
+        session = self.pool.acquire()
+        table_name = self.table_name_with_timestamp() if table_name is None else table_name
+        f = session.async_execute_scheme(get_table_description(table_name, self.mode), settings=self.ops)
+        f.add_done_callback(lambda x: self.on_received_response(session, x, 'create'))
+        return f
+
+    def switch(self, switch_to):
+        self.table_name = switch_to
+        self.outdated_keys.clear()
+
+    def on_received_response(self, session, response, event, callback=None):
+        self.pool.release(session)
+
+        if callback is not None:
+            callback(response)
+
+        try:
+            response.result()
+            self.stats.save_event(event)
+        except ydb.Error as e:
+            debug = False
+            if debug:
+                print(event)
+                print(e)
+                print()
+
+            self.stats.save_event(event, e.status)
+
+    def send_query(self, query, parameters, event_kind, callback=None):
+        session = self.pool.acquire()
+        f = session.transaction().async_execute(
+            query, parameters=parameters, commit_tx=True, settings=self.ops)
+        f.add_done_callback(
+            lambda response: self.on_received_response(
+                session, response, event_kind, callback
+            )
+        )
+        return f
+
+    def on_list_response(self, base, f, switch=True):
+        try:
+            response = f.result()
+        except ydb.Error:
+            return
+
+        tables = []
+        for child in response.children:
+            if child.is_directory():
+                continue
+
+            tables.append(
+                os.path.join(
+                    base, child.name
+                )
+            )
+
+        candidates = list(sorted(tables))
+        if switch:
+            switch_to = candidates.pop()
+            self.switch(switch_to)
+        self.drop_queue.extend(candidates)
+
+    def list_copies_dir(self):
+        f = self.driver.scheme_client.async_list_directory(self.copies_dir)
+        f.add_done_callback(
+            lambda x: self.on_list_response(
+                self.copies_dir, x, switch=False
+            )
+        )
+
+    def list_working_dir(self):
+        f = self.driver.scheme_client.async_list_directory(self.working_dir)
+        f.add_done_callback(
+            lambda x: self.on_list_response(
+                self.working_dir, x, switch=True,
+            )
+        )
+
+    def remove_outdated(self):
+        try:
+            keys_set = self.outdated_keys.popleft()
+        except IndexError:
+            return
+
+        query = ydb.DataQuery(
+            """
+            --!syntax_v1
+            DECLARE $keys as List<Struct<key: Uint64>>;
+            DELETE FROM `{}` ON SELECT `key` FROM AS_TABLE($keys);
+            """.format(self.table_name), {
+                '$keys': ydb.ListType(ydb.StructType().add_member('key', ydb.PrimitiveType.Uint64)).proto
+            }
+        )
+        parameters = {
+            '$keys': keys_set
+        }
+
+        return self.send_query(query=query, event_kind=EventKind.REMOVE_OUTDATED, parameters=parameters)
+
+    def on_find_outdated(self, resp):
+        try:
+            rs = resp.result()
+            self.outdated_keys.append(rs[0].rows)
+        except ydb.Error:
+            return
+
+    def find_outdated(self):
+        # ensure the queue is not too large already
+        if len(self.outdated_keys) > self.outdated_keys_max_size:
+            return
+
+        outdated_timestamp = timestamp() - self.outdated_period
+        query = """
+        --!syntax_v1
+        SELECT `key` FROM `{table_name}`
+        WHERE `key` <= {outdated_timestamp}
+        ORDER BY `key`
+        LIMIT 50;
+        """.format(table_name=self.table_name, outdated_timestamp=outdated_timestamp)
+        parameters = None
+        return self.send_query(query=query, event_kind=EventKind.FIND_OUTDATED, parameters=parameters, callback=self.on_find_outdated)
+
+    def write(self):
+        current_timestamp = timestamp()
+        blob = next(self.blobs_iter)
+        query = ydb.DataQuery(
+            """
+            --!syntax_v1
+            DECLARE $key as Uint64;
+            DECLARE $value as Utf8;
+            DECLARE $timestamp as Uint64;
+            UPSERT INTO `{}` (`key`, `timestamp`, `value`) VALUES ($key, $timestamp, $value);
+            """.format(self.table_name),
+            {
+                '$key': ydb.PrimitiveType.Uint64.proto,
+                '$value': ydb.PrimitiveType.Utf8.proto,
+                '$timestamp': ydb.PrimitiveType.Uint64.proto,
+            }
+        )
+        parameters = {
+            '$key': current_timestamp,
+            '$value': blob,
+            '$timestamp': current_timestamp,
+        }
+
+        return self.send_query(query=query, event_kind=EventKind.WRITE, parameters=parameters)
+
+    def move_iterator(self, it, callback):
+        next_f = next(it)
+        next_f.add_done_callback(lambda x: callback(it, x))
+
+    def on_read_table_chunk(self, it, f):
+        try:
+            f.result()
+            self.stats.save_event(EventKind.READ_TABLE_CHUNK)
+        except ydb.Error as e:
+            self.stats.save_event(EventKind.READ_TABLE_CHUNK, e.status)
+        except StopIteration:
+            return
+        self.move_iterator(it, self.on_read_table_chunk)
+
+    def on_scan_query_chunk(self, it, f):
+        try:
+            f.result()
+            self.stats.save_event(EventKind.SCAN_QUERY_CHUNK)
+        except ydb.Error as e:
+            self.stats.save_event(EventKind.SCAN_QUERY_CHUNK, e.status)
+        except StopIteration:
+            return
+
+        self.move_iterator(it, self.on_scan_query_chunk)
+
+    def start_scan_query(self):
+        it = self.driver.table_client.async_scan_query('select count(*) as cnt from `%s`' % self.table_name)
+        self.stats.save_event(EventKind.START_SCAN_QUERY)
+        self.move_iterator(it, self.on_scan_query_chunk)
+
+    def start_read_table(self):
+        with self.pool.checkout() as session:
+            it = session.async_read_table(self.table_name, columns=('key', ))
+            self.stats.save_event(EventKind.START_READ_TABLE)
+            self.move_iterator(it, self.on_read_table_chunk)
+
+    def alter_table(self):
+        session = self.pool.acquire()
+        query = "ALTER TABLE `{table_name}` ADD COLUMN column_{val} Utf8".format(
+            table_name=self.table_name,
+            val=random.randint(1, 100000),
+        )
+
+        f = session.async_execute_scheme(query, settings=self.ops)
+        f.add_done_callback(
+            lambda response: self.on_received_response(
+                session, response, EventKind.ALTER_TABLE,
+            )
+        )
+
+    def drop_table(self):
+        duplicates = set()
+        while len(self.drop_queue) > 0:
+            candidate = self.drop_queue.popleft()
+            if candidate in duplicates:
+                continue
+
+            duplicates.add(candidate)
+            session = self.pool.acquire()
+            f = session.async_drop_table(candidate, settings=self.ops)
+            f.add_done_callback(
+                lambda response: self.on_received_response(
+                    session, response, EventKind.DROP_TABLE,
+                )
+            )
+
+    def copy_table(self):
+        session = self.pool.acquire()
+        dst_table = self.table_name_with_timestamp(self.copies_dir)
+        f = session.async_copy_table(self.table_name, dst_table, settings=self.ops)
+        f.add_done_callback(
+            lambda response: self.on_received_response(
+                session, response, EventKind.COPY_TABLE
+            )
+        )
+
+
+class Workload(object):
+    def __init__(self, endpoint, database, duration, mode):
+        self.database = database
+        self.driver = ydb.Driver(ydb.DriverConfig(endpoint, database))
+        self.pool = ydb.SessionPool(self.driver, size=200)
+        self.round_size = 1000
+        self.duration = duration
+        self.delayed_events = queue.Queue()
+        self.workload_stats = WorkloadStats(*EventKind.list())
+        # TODO: run both modes in parallel?
+        self.mode = mode
+        self.ydb_queues = [
+            YdbQueue(idx, database, self.workload_stats, self.driver, self.pool, self.mode)
+            for idx in range(2)
+        ]
+
+    def random_points(self, size=1):
+        return set([random.randint(0, self.round_size) for _ in range(size)])
+
+    def loop(self):
+        started_at = time.time()
+        round_id_it = itertools.count(start=1)
+        queue_it = itertools.cycle(self.ydb_queues)
+
+        while time.time() - started_at < self.duration:
+
+            for ydb_queue in self.ydb_queues:
+                ydb_queue.list_working_dir()
+                ydb_queue.list_copies_dir()
+                print("Table name: %s" % ydb_queue.table_name)
+
+            round_id = next(round_id_it)
+            if round_id % 10 == 0:
+                for ydb_queue in self.ydb_queues:
+                    ydb_queue.prepare_new_queue()
+
+            self.workload_stats.print_stats()
+
+            schedule = []
+            for op in EventKind.rare():
+                schedule.extend([(point, op) for point in self.random_points()])
+
+            for op in EventKind.periodic_tasks():
+                schedule.extend([(point, op) for point in self.random_points(size=50)])
+
+            schedule = collections.deque(list(sorted(schedule)))
+
+            print("Starting round_id %d" % round_id)
+            print("Round schedule %s" % schedule)
+            for step_id in range(self.round_size):
+
+                if time.time() - started_at > self.duration:
+                    break
+
+                ydb_queue = next(queue_it)
+
+                if step_id % 100 == 0:
+                    print("step_id %d" % step_id)
+
+                yield ydb_queue.write
+                yield ydb_queue.find_outdated
+                yield ydb_queue.remove_outdated
+
+                while len(schedule) > 0:
+                    scheduled_at, op = schedule[0]
+                    if scheduled_at != step_id:
+                        break
+
+                    schedule.popleft()
+                    yield getattr(ydb_queue, op)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.pool.stop()
+        self.driver.stop()
diff --git a/ydb/tests/stress/simple_queue/workload/ya.make b/ydb/tests/stress/simple_queue/workload/ya.make
new file mode 100644
index 000000000000..73aae8bfa949
--- /dev/null
+++ b/ydb/tests/stress/simple_queue/workload/ya.make
@@ -0,0 +1,12 @@
+PY3_LIBRARY()
+
+PY_SRCS(
+    __init__.py
+)
+
+PEERDIR(
+    library/python/monlib
+)
+
+
+END()
diff --git a/ydb/tests/stress/simple_queue/ya.make b/ydb/tests/stress/simple_queue/ya.make
index cd4c38ee24ab..f22406fd8118 100644
--- a/ydb/tests/stress/simple_queue/ya.make
+++ b/ydb/tests/stress/simple_queue/ya.make
@@ -5,8 +5,7 @@ PY_SRCS(
 )
 
 PEERDIR(
-    ydb/public/sdk/python
-    library/python/monlib
+    ydb/tests/stress/simple_queue/workload
 )
 
 END()
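
Across all three packages the workload threads stop cooperatively: the runner sets a shared `threading.Event` after `duration` seconds and joins its workers, which poll the event between operations (the `is_stop_requested()` helper in the olap/oltp code is assumed to wrap exactly this check inside `WorkloadBase`, which is not part of this diff). The protocol in isolation:

```python
# Cooperative-shutdown sketch under the WorkloadBase assumption above.
import threading
import time


def worker(stop: threading.Event):
    # one workload thread: poll the event between units of work
    while not stop.is_set():
        time.sleep(0.1)  # stand-in for one insert/delete/create cycle


stop = threading.Event()
threads = [threading.Thread(target=worker, args=(stop,)) for _ in range(3)]
for t in threads:
    t.start()
time.sleep(1)   # the runner's duration loop
stop.set()      # request shutdown; no thread is killed
for t in threads:
    t.join()    # each worker exits at a safe point
```
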