Commit efc612c

Freeze table by partitions (#219)

* Freeze table by partitions
* ExecPool for generic purposes
* Thread-safe freeze
* Correct timeout for freeze by partitions
* Fix lint
* ExecPool classes without explicit Thread/ProcessPool in constructors
* Remove ClickhouseClientMultithreading
* Small fixes

1 parent 926c85b commit efc612c

9 files changed (+201, -57 lines)

ch_backup/backup/layout.py (+1, -2)

@@ -88,7 +88,6 @@ def upload_table_create_statement(
         backup_meta: BackupMetadata,
         db: Database,
         table: Table,
-        create_statement: bytes,
     ) -> None:
         """
         Upload table create statement.
@@ -105,7 +104,7 @@ def upload_table_create_statement(
             table.name,
         )
         self._storage_loader.upload_data(
-            create_statement,
+            table.create_statement,
             remote_path,
             is_async=True,
             encryption=backup_meta.encrypted,

ch_backup/ch_backup.py (+1, -3)

@@ -196,9 +196,7 @@ def backup(
             databases,
             db_tables,
             schema_only=sources.schema_only,
-            freeze_threads=self._config["multiprocessing"][
-                "freeze_threads"
-            ],
+            multiprocessing_config=self._config["multiprocessing"],
         )
 
         # Upload operations are async. Should wait until they are all finished.

ch_backup/clickhouse/client.py (+32, -8)

@@ -2,7 +2,8 @@
 ClickHouse client.
 """
 
-from typing import Any
+from contextlib import contextmanager
+from typing import Any, Iterator, Optional
 
 import requests
 
@@ -34,6 +35,8 @@ class ClickhouseClient:
     """
 
     def __init__(self, config: dict, settings: dict = None) -> None:
+        self._config = config
+        self._settings = settings
         host = config["host"]
         protocol = config["protocol"]
         port = config["port"] or (8123 if protocol == "http" else 8443)
@@ -58,6 +61,7 @@ def query(
         settings: dict = None,
         timeout: float = None,
         should_retry: bool = True,
+        new_session: bool = False,
     ) -> Any:
         """
         Execute query.
@@ -68,13 +72,18 @@ def query(
             if timeout is None:
                 timeout = self.timeout
 
-            response = self._session.post(
-                self._url,
-                params=settings,
-                json=post_data,
-                timeout=(self.connect_timeout, timeout),
-                data=query.encode("utf-8"),
-            )
+            # https://github.com/psf/requests/issues/2766
+            # requests.Session object is not guaranteed to be thread-safe.
+            # When using ClickhouseClient with multithreading, "new_session"
+            # should be True, so a separate Session is used for each query.
+            with self._get_session(new_session) as session:
+                response = session.post(
+                    self._url,
+                    params=settings,
+                    json=post_data,
+                    timeout=(self.connect_timeout, timeout),
+                    data=query.encode("utf-8"),
+                )
 
             response.raise_for_status()
         except requests.exceptions.HTTPError as e:
@@ -87,6 +96,21 @@ def query(
         except ValueError:
             return str.strip(response.text)
 
+    @contextmanager
+    def _get_session(
+        self, new_session: Optional[bool] = False
+    ) -> Iterator[requests.Session]:
+        session = (
+            self._create_session(self._config, self._settings)
+            if new_session
+            else self._session
+        )
+        try:
+            yield session
+        finally:
+            if new_session:
+                session.close()
+
     @staticmethod
     def _create_session(config, settings):
         session = requests.Session()
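
The session-per-query change above is what makes the parallel freeze elsewhere in this commit safe. Below is a minimal sketch of the intended usage pattern; the connection settings are hypothetical (the full set of config keys ClickhouseClient expects is not shown in this diff):

from concurrent.futures import ThreadPoolExecutor

from ch_backup.clickhouse.client import ClickhouseClient

# Hypothetical connection settings, for illustration only; real ch-backup
# builds this dict from its configuration.
config = {
    "host": "localhost",
    "protocol": "http",
    "port": 8123,
    "timeout": 60,
    "connect_timeout": 10,
}
client = ClickhouseClient(config)

queries = [f"SELECT {i}" for i in range(8)]
with ThreadPoolExecutor(max_workers=4) as executor:
    # new_session=True gives every in-flight query its own requests.Session,
    # sidestepping the shared-session thread-safety caveat noted in the diff.
    results = list(
        executor.map(lambda q: client.query(q, new_session=True), queries)
    )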

ch_backup/clickhouse/control.py (+69, -6)

@@ -21,6 +21,7 @@
 from ch_backup.clickhouse.client import ClickhouseClient
 from ch_backup.clickhouse.models import Database, Disk, FrozenPart, Table
 from ch_backup.exceptions import ClickhouseBackupError
+from ch_backup.storage.async_pipeline.base_pipeline.exec_pool import ThreadExecPool
 from ch_backup.util import (
     chown_dir_contents,
     chown_file,
@@ -167,6 +168,13 @@
     """
 )
 
+FREEZE_PARTITION_SQL = strip_query(
+    """
+    ALTER TABLE `{db_name}`.`{table_name}`
+    FREEZE PARTITION ID '{partition}' WITH NAME '{backup_name}'
+    """
+)
+
 SYSTEM_UNFREEZE_SQL = strip_query(
     """
     SYSTEM UNFREEZE WITH NAME '{backup_name}'
@@ -435,6 +443,15 @@
     """
 )
 
+GET_PARTITIONS = strip_query(
+    """
+    SELECT DISTINCT partition_id
+    FROM system.parts
+    WHERE database = '{db_name}' AND table = '{table_name}' AND active = 1
+    FORMAT JSONCompact
+    """
+)
+
 
 # pylint: disable=too-many-public-methods
 class ClickhouseCTL:
@@ -535,18 +552,64 @@ def attach_table(self, table: Union[TableMetadata, Table]) -> None:
 
         self._ch_client.query(query_sql)
 
-    def freeze_table(self, backup_name: str, table: Table) -> None:
+    def freeze_table(
+        self,
+        backup_name: str,
+        table: Table,
+        threads: int,
+    ) -> None:
         """
         Make snapshot of the specified table.
         """
-        query_sql = FREEZE_TABLE_SQL.format(
-            db_name=escape(table.database),
-            table_name=escape(table.name),
-            backup_name=backup_name,
-        )
-        self._ch_client.query(
-            query_sql, timeout=self._freeze_timeout, should_retry=False
-        )
+        # Table has no partitions or created with deprecated syntax.
+        # FREEZE PARTITION ID with deprecated syntax throws segmentation fault in CH.
+        freeze_by_partitions = threads > 0 and "PARTITION BY" in table.create_statement
+        if freeze_by_partitions:
+            with ThreadExecPool(max(1, threads)) as pool:
+                if freeze_by_partitions:
+                    partitions_to_freeze = self.list_partitions(table)
+                    for partition in partitions_to_freeze:
+                        query_sql = FREEZE_PARTITION_SQL.format(
+                            db_name=escape(table.database),
+                            table_name=escape(table.name),
+                            backup_name=backup_name,
+                            partition=partition,
+                        )
+                        pool.submit(
+                            f'Freeze partition "{partition}"',
+                            self._ch_client.query,
+                            query_sql,
+                            timeout=self._freeze_timeout,
+                            should_retry=False,
+                            new_session=True,
+                        )
+                pool.wait_all(
+                    keep_going=False,
+                    timeout=self._freeze_timeout,
+                )
+        else:
+            query_sql = FREEZE_TABLE_SQL.format(
+                db_name=escape(table.database),
+                table_name=escape(table.name),
+                backup_name=backup_name,
+            )
+            self._ch_client.query(
+                query_sql,
+                timeout=self._freeze_timeout,
+                should_retry=False,
+                new_session=True,
+            )
+
+    def list_partitions(self, table: Table) -> List[str]:
+        """
+        Get list of active partitions for table.
+        """
+        query_sql = GET_PARTITIONS.format(
+            db_name=escape(table.database),
+            table_name=escape(table.name),
+        )
+        result = self._ch_client.query(query_sql, should_retry=False)["data"]
+        return [partition[0] for partition in result]
 
     def system_unfreeze(self, backup_name: str) -> None:
         """

ch_backup/config.py (+2)

@@ -160,6 +160,8 @@ def _as_seconds(t: str) -> int:
         "cloud_storage_restore_workers": 4,
         # The number of threads for parallel freeze of tables
         "freeze_threads": 4,
+        # The number of threads for parallel freeze of partitions. If set to 0, will freeze table in one query.
+        "freeze_partition_threads": 16,
         # The number of threads for parallel drop replica
         "drop_replica_threads": 8,
     },
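
With the default of 16, partition-level parallelism is on out of the box. A short sketch of how the two settings interact with freeze_table (the dict below is an assumed excerpt of the "multiprocessing" section, not a complete config):

# Assumed excerpt of the "multiprocessing" config section.
multiprocessing_config = {
    "freeze_threads": 4,            # how many tables are frozen in parallel
    "freeze_partition_threads": 0,  # 0 disables per-partition freeze, so each
                                    # table is frozen in a single query
}

# This is the value _freeze_table passes into ClickhouseCTL.freeze_table;
# there, freeze_by_partitions = threads > 0 and "PARTITION BY" in the DDL.
threads = multiprocessing_config.get("freeze_partition_threads", 0)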

ch_backup/logic/table.py (+26, -23)

@@ -4,7 +4,6 @@
 
 import os
 from collections import deque
-from concurrent.futures import Future, ThreadPoolExecutor, as_completed
 from dataclasses import dataclass
 from functools import partial
 from itertools import chain
@@ -28,6 +27,9 @@
 from ch_backup.exceptions import ClickhouseBackupError
 from ch_backup.logic.backup_manager import BackupManager
 from ch_backup.logic.upload_part_observer import UploadPartObserver
+from ch_backup.storage.async_pipeline.base_pipeline.exec_pool import (
+    ThreadExecPool,
+)
 from ch_backup.util import compare_schema
 
 
@@ -54,7 +56,7 @@ def backup(
         databases: Sequence[Database],
         db_tables: Dict[str, list],
         schema_only: bool,
-        freeze_threads: int,
+        multiprocessing_config: Dict,
     ) -> None:
         """
         Backup tables metadata, MergeTree data and Cloud storage metadata.
@@ -76,7 +78,7 @@ def backup(
                 db_tables[db.name],
                 backup_name,
                 schema_only,
-                freeze_threads,
+                multiprocessing_config,
             )
 
     def _collect_local_metadata_change_times(
@@ -110,7 +112,7 @@ def _backup(
         tables: Sequence[str],
         backup_name: str,
         schema_only: bool,
-        freeze_threads: int,
+        multiprocessing_config: Dict,
     ) -> None:
         """
         Backup single database tables.
@@ -133,33 +135,32 @@ def _backup(
         # Create shadow/increment.txt if not exists manually to avoid
        # race condition with parallel freeze
         context.ch_ctl.create_shadow_increment()
-        futures: List[Future] = []
-        with ThreadPoolExecutor(max_workers=freeze_threads) as pool:
+        with ThreadExecPool(
+            multiprocessing_config.get("freeze_threads", 1)
+        ) as pool:
             for table in tables_:
-                future = pool.submit(
+                pool.submit(
+                    f'Freeze table "{table.database}"."{table.name}"',
                     TableBackup._freeze_table,
                     context,
                     db,
                     table,
                     backup_name,
                     schema_only,
+                    multiprocessing_config.get("freeze_partition_threads", 0),
                 )
-                futures.append(future)
 
-        for future in as_completed(futures):
-            table_and_create_statement = future.result()
-            if table_and_create_statement is not None:
-                table, create_statement = table_and_create_statement
+        for freezed_table in pool.as_completed(keep_going=False):
+            if freezed_table is not None:
                 self._backup_freezed_table(
                     context,
                     db,
-                    table,
+                    freezed_table,
                     backup_name,
                     schema_only,
                     change_times,
-                    create_statement,
                 )
-                self._backup_cloud_storage_metadata(context, table)
+                self._backup_cloud_storage_metadata(context, freezed_table)
 
         context.backup_layout.wait()
         context.ch_ctl.remove_freezed_data()
@@ -174,13 +175,14 @@ def _freeze_table(
         table: Table,
         backup_name: str,
         schema_only: bool,
-    ) -> Optional[Tuple[Table, bytes]]:
+        freeze_partition_threads: int,
+    ) -> Optional[Table]:
         """
         Freeze table and return it's create statement
         """
         logging.debug('Trying to freeze "{}"."{}"', table.database, table.name)
-
         create_statement = TableBackup._load_create_statement_from_disk(table)
+        table.create_statement = create_statement or ""
         if not create_statement:
             logging.warning(
                 'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
@@ -192,7 +194,9 @@ def _freeze_table(
         # Freeze only MergeTree tables
         if not schema_only and table.is_merge_tree():
             try:
-                context.ch_ctl.freeze_table(backup_name, table)
+                context.ch_ctl.freeze_table(
+                    backup_name, table, freeze_partition_threads
+                )
             except ClickhouseError:
                 if context.ch_ctl.does_table_exist(table.database, table.name):
                     logging.error(
@@ -209,10 +213,10 @@ def _freeze_table(
                 )
                 return None
 
-        return (table, create_statement)
+        return table
 
     @staticmethod
-    def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
+    def _load_create_statement_from_disk(table: Table) -> Optional[str]:
         """
         Load a create statement of the table from a metadata file on the disk.
         """
@@ -224,7 +228,7 @@ def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
             )
             return None
         try:
-            return Path(table.metadata_path).read_bytes()
+            return Path(table.metadata_path).read_text("utf-8")
         except OSError as e:
             logging.debug(
                 'Cannot load a create statement of the table "{}"."{}": {}',
@@ -378,7 +382,6 @@ def _backup_freezed_table(
         backup_name: str,
         schema_only: bool,
         change_times: Dict[str, TableMetadataChangeTime],
-        create_statement: bytes,
     ) -> None:
         # Check if table metadata was updated
         new_change_time = self._get_change_time(table.metadata_path)
@@ -400,7 +403,7 @@ def _backup_freezed_table(
             )
             # Backup table metadata
             context.backup_layout.upload_table_create_statement(
-                context.backup_meta, db, table, create_statement
+                context.backup_meta, db, table
             )
             # Backup table data
             if not schema_only:
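
ThreadExecPool itself lives in one of the nine changed files not shown on this page (ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py). Judging only from the call sites above, it wraps a thread pool with named tasks, a wait_all, and an as_completed that yields task results. The following is a rough standalone approximation of that contract; it is an assumption for illustration, not the repository's implementation:

from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, List, Optional, Tuple


class ThreadExecPoolSketch:
    """Approximation of the ThreadExecPool contract used by the call sites."""

    def __init__(self, workers: int) -> None:
        self._executor = ThreadPoolExecutor(max_workers=workers)
        self._tasks: List[Tuple[str, Future]] = []

    def __enter__(self) -> "ThreadExecPoolSketch":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self._executor.shutdown(wait=True)

    def submit(self, name: str, fn: Callable, *args: Any, **kwargs: Any) -> None:
        # Tasks carry a human-readable name, e.g. 'Freeze partition "202401"',
        # which the real pool presumably uses for logging and error reporting.
        self._tasks.append((name, self._executor.submit(fn, *args, **kwargs)))

    def as_completed(self, keep_going: bool = False) -> Iterator[Any]:
        # This sketch always propagates the first failure via future.result();
        # keep_going=True would presumably log the error and continue instead.
        for future in as_completed(future for _, future in self._tasks):
            yield future.result()

    def wait_all(self, keep_going: bool = False, timeout: Optional[float] = None) -> None:
        # Block until every submitted task finishes, re-raising any failure.
        for _, future in self._tasks:
            future.result(timeout=timeout)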
