From 84aceeaabebb1ad03f00597e29ae2a9a81642560 Mon Sep 17 00:00:00 2001 From: mjs1995 Date: Thu, 27 Jul 2023 21:14:13 +0900 Subject: [PATCH 1/6] feat: implement UserVarEvent and add testcases Co-authored-by: heehehe --- pymysqlreplication/binlogstream.py | 7 ++- pymysqlreplication/event.py | 73 ++++++++++++++++++++++++++ pymysqlreplication/packet.py | 2 + pymysqlreplication/tests/test_basic.py | 58 ++++++++++++++++++-- 4 files changed, 134 insertions(+), 6 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index fa65aa22..136a5c06 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -14,7 +14,8 @@ QueryEvent, RotateEvent, FormatDescriptionEvent, XidEvent, GtidEvent, StopEvent, XAPrepareEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, - HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent) + HeartbeatLogEvent, NotImplementedEvent, + MariadbGtidEvent, RandEvent, UserVarEvent) from .exceptions import BinLogNotEnabled from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) @@ -600,7 +601,9 @@ def _allowed_event_list(self, only_events, ignored_events, TableMapEvent, HeartbeatLogEvent, NotImplementedEvent, - MariadbGtidEvent + MariadbGtidEvent, + RandEvent, + UserVarEvent )) if ignored_events is not None: for e in ignored_events: diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index d75c9db8..8ade0c53 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -436,6 +436,79 @@ def _dump(self): print("type: %d" % (self.type)) print("Value: %d" % (self.value)) +class RandEvent(BinLogEvent): + """ + RandEvent is generated every time a statement uses the RAND() function. + Indicates the seed values to use for generating a random number with RAND() in the next statement. + + RandEvent only works in statement-based logging (need to set binlog_format as 'STATEMENT') + and only works when the seed number is not specified. + + :ivar seed1: int - value for the first seed + :ivar seed2: int - value for the second seed + """ + + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super(RandEvent, self).__init__(from_packet, event_size, table_map, + ctl_connection, **kwargs) + # Payload + self._seed1 = self.packet.read_uint64() + self._seed2 = self.packet.read_uint64() + + @property + def seed1(self): + """Get the first seed value""" + return self._seed1 + + @property + def seed2(self): + """Get the second seed value""" + return self._seed2 + + def _dump(self): + super(RandEvent, self)._dump() + print("seed1: %d" % (self.seed1)) + print("seed2: %d" % (self.seed2)) + +class UserVarEvent(BinLogEvent): + """ + UserVarEvent is generated every time a statement uses a user variable. + Indicates the value to use for the user variable in the next statement. + + :ivar name_len: int - Length of user variable + :ivar name: str - User variable name + :ivar value: str - Value of the user variable + :ivar type: int - Type of the user variable + :ivar charset: int - The number of the character set for the user variable + :ivar is_null: int - Non-zero if the variable value is the SQL NULL value, 0 otherwise + :ivar flags: int - Extra flags associated with the user variable + """ + + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super(UserVarEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + + # Payload + self.name_len = self.packet.read_uint32() + self.name = self.packet.read(self.name_len).decode() + self.is_null = self.packet.read_uint8() + + if not self.is_null: + self.type = self.packet.read_uint8() + self.charset = self.packet.read_uint32() + self.value_len = self.packet.read_uint32() + self.value = self.packet.read(self.value_len).decode() + self.flags = self.packet.read_uint8() + + def _dump(self): + super(UserVarEvent, self)._dump() + print("User variable name: %s" % self.name) + print("Is NULL: %s" % ("Yes" if self.is_null else "No")) + if not self.is_null: + print("Type: %s" % self.type) + print("Charset: %s" % self.charset) + print("Value: %s" % self.value) + if self.flags is not None: + print("Flags: %s" % self.flags) class NotImplementedEvent(BinLogEvent): def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index 94baefdf..b11e3d8c 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -70,6 +70,8 @@ class BinLogPacketWrapper(object): constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent, constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent, constants.XA_PREPARE_EVENT: event.XAPrepareEvent, + constants.RAND_EVENT: event.RandEvent, + constants.USER_VAR_EVENT: event.UserVarEvent, # row_event constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent, constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent, diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 0db8a264..2c97296d 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -17,7 +17,7 @@ from pymysqlreplication.constants.BINLOG import * from pymysqlreplication.row_event import * -__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader"] +__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestStatementConnectionSetting"] class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase): @@ -25,9 +25,9 @@ def ignoredEvents(self): return [GtidEvent] def test_allowed_event_list(self): - self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 16) - self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 15) - self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 15) + self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 18) + self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 17) + self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 17) self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1) def test_read_query_event(self): @@ -1002,6 +1002,56 @@ def test_parsing(self): gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1") gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1") +class TestStatementConnectionSetting(base.PyMySQLReplicationTestCase): + def setUp(self): + super(TestStatementConnectionSetting, self).setUp() + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + only_events=(RandEvent, UserVarEvent, QueryEvent), + fail_on_table_metadata_unavailable=True + ) + self.execute("SET @@binlog_format='STATEMENT'") + + def test_rand_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data INT NOT NULL, PRIMARY KEY (id))") + self.execute("INSERT INTO test (data) VALUES(RAND())") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + expected_rand_event = self.stream.fetchone() + self.assertIsInstance(expected_rand_event, RandEvent) + self.assertEqual(type(expected_rand_event.seed1), int) + self.assertEqual(type(expected_rand_event.seed2), int) + + def test_user_var_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(50), PRIMARY KEY (id))") + self.execute("SET @test_user_var = 'foo'") + self.execute("INSERT INTO test (data) VALUES(@test_user_var)") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertEqual(type(expected_user_var_event.name_len), int) + self.assertEqual(expected_user_var_event.name, "test_user_var") + self.assertEqual(expected_user_var_event.value, "foo") + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 0) + self.assertEqual(expected_user_var_event.charset, 33) + + def tearDown(self): + self.execute("SET @@binlog_format='ROW'") + self.assertEqual(self.bin_log_format(), "ROW") + super(TestStatementConnectionSetting, self).tearDown() + if __name__ == "__main__": import unittest From 6a02f4bb6ff5b3b11e2c243736f395172efe274c Mon Sep 17 00:00:00 2001 From: mjs Date: Sat, 12 Aug 2023 09:49:27 +0900 Subject: [PATCH 2/6] feat: extract values by type Co-authored-by: sean-k1 --- pymysqlreplication/event.py | 86 +++++++++++++++++++++++++++++++++++-- 1 file changed, 83 insertions(+), 3 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 8ade0c53..8caf1bf8 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -3,6 +3,7 @@ import binascii import struct import datetime +import decimal from pymysqlreplication.constants.STATUS_VAR_KEY import * from pymysqlreplication.exceptions import StatusVariableMismatch @@ -484,6 +485,14 @@ class UserVarEvent(BinLogEvent): :ivar flags: int - Extra flags associated with the user variable """ + type_codes = { + 0x00: 'STRING_RESULT', + 0x01: 'REAL_RESULT', + 0x02: 'INT_RESULT', + 0x03: 'ROW_RESULT', + 0x04: 'DECIMAL_RESULT', + } + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(UserVarEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) @@ -496,15 +505,86 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.type = self.packet.read_uint8() self.charset = self.packet.read_uint32() self.value_len = self.packet.read_uint32() - self.value = self.packet.read(self.value_len).decode() + + type_to_method = { + 0x00: self._read_string, + 0x01: self._read_real, + 0x02: self._read_int, + 0x04: self._read_decimal + } + + self.value = type_to_method.get(self.type, self._read_default)() self.flags = self.packet.read_uint8() - + else: + self.type, self.charset, self.value, self.flags = None, None, None, None + + def _read_string(self): + return self.packet.read(self.value_len).decode() + + def _read_real(self): + return str(struct.unpack('d', self.packet.read(8))[0]) + + def _read_int(self): + return str(self.packet.read_uint_by_size(self.value_len)) + + def _read_decimal(self): + self.precision = self.packet.read_uint8() + self.decimals = self.packet.read_uint8() + raw_decimal = self.packet.read(self.value_len) + return self._parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals) + + def _read_default(self): + return self.packet.read(self.value_len) + + @staticmethod + def _parse_decimal_from_bytes(raw_decimal, precision, decimals): + digits_per_integer = 9 + compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] + integral = precision - decimals + + uncomp_integral, comp_integral = divmod(integral, digits_per_integer) + uncomp_fractional, comp_fractional = divmod(decimals, digits_per_integer) + + res = "-" if not raw_decimal[0] & 0x80 else "" + mask = -1 if res == "-" else 0 + raw_decimal = bytearray([raw_decimal[0] ^ 0x80]) + raw_decimal[1:] + + def decode_decimal_decompress_value(comp_indx, data, mask): + size = compressed_bytes[comp_indx] + if size > 0: + databuff = bytearray(data[:size]) + for i in range(size): + databuff[i] ^= mask + return size, int.from_bytes(databuff, byteorder='big') + return 0, 0 + + pointer, value = decode_decimal_decompress_value(comp_integral, raw_decimal, mask) + res += str(value) + + for _ in range(uncomp_integral): + value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask + res += '%09d' % value + pointer += 4 + + res += "." + + for _ in range(uncomp_fractional): + value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask + res += '%09d' % value + pointer += 4 + + size, value = decode_decimal_decompress_value(comp_fractional, raw_decimal[pointer:], mask) + if size > 0: + res += '%0*d' % (comp_fractional, value) + + return decimal.Decimal(res) + def _dump(self): super(UserVarEvent, self)._dump() print("User variable name: %s" % self.name) print("Is NULL: %s" % ("Yes" if self.is_null else "No")) if not self.is_null: - print("Type: %s" % self.type) + print("Type: %s" % self.type_codes.get(self.type, 'UNKNOWN_TYPE')) print("Charset: %s" % self.charset) print("Value: %s" % self.value) if self.flags is not None: From 2585668b67fd65217775c4310e4672366ffe772f Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 19 Aug 2023 10:13:29 +0900 Subject: [PATCH 3/6] test: add user_var_event tests by types Co-authored-by: mjs1995 --- pymysqlreplication/tests/test_basic.py | 61 +++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 2c97296d..9440beed 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1028,7 +1028,7 @@ def test_rand_event(self): self.assertEqual(type(expected_rand_event.seed1), int) self.assertEqual(type(expected_rand_event.seed2), int) - def test_user_var_event(self): + def test_user_var_string_event(self): self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(50), PRIMARY KEY (id))") self.execute("SET @test_user_var = 'foo'") self.execute("INSERT INTO test (data) VALUES(@test_user_var)") @@ -1040,13 +1040,70 @@ def test_user_var_event(self): expected_user_var_event = self.stream.fetchone() self.assertIsInstance(expected_user_var_event, UserVarEvent) - self.assertEqual(type(expected_user_var_event.name_len), int) + self.assertIsInstance(expected_user_var_event.name_len, int) self.assertEqual(expected_user_var_event.name, "test_user_var") self.assertEqual(expected_user_var_event.value, "foo") self.assertEqual(expected_user_var_event.is_null, 0) self.assertEqual(expected_user_var_event.type, 0) self.assertEqual(expected_user_var_event.charset, 33) + def test_user_var_real_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data REAL, PRIMARY KEY (id))") + self.execute("SET @test_user_var = @@timestamp") + self.execute("INSERT INTO test (data) VALUES(@test_user_var)") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var") + self.assertIsInstance(expected_user_var_event.value, str) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 1) + self.assertEqual(expected_user_var_event.charset, 33) + + def test_user_var_int_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data INT, PRIMARY KEY (id))") + self.execute("SET @test_user_var = 5") + self.execute("INSERT INTO test (data) VALUES(@test_user_var)") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var") + self.assertEqual(expected_user_var_event.value, '5') # TODO: have to fix to int 5 + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + def test_user_var_decimal_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data DECIMAL, PRIMARY KEY (id))") + self.execute("SET @test_user_var = 5.25") + self.execute("INSERT INTO test (data) VALUES(@test_user_var)") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var") + self.assertEqual(expected_user_var_event.value, 5.25) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 4) + self.assertEqual(expected_user_var_event.charset, 33) + def tearDown(self): self.execute("SET @@binlog_format='ROW'") self.assertEqual(self.bin_log_format(), "ROW") From 16de1f8bc1fbfb9cc437614cf121d03e73e0cc23 Mon Sep 17 00:00:00 2001 From: mjs Date: Sun, 20 Aug 2023 14:33:57 +0900 Subject: [PATCH 4/6] feat: modify int type for negative/positive and refactor decimal type Co-authored-by: starcat37 --- .github/workflows/pytest.yml | 14 +- .mariadb/my.cnf | 23 +++ .mariadb/no_encryption_key.key | 1 + README.md | 7 +- docker-compose-test.yml | 59 ++++++ docker-compose.yml | 20 ++ docs/developement.rst | 2 +- examples/mariadb_gtid/read_event.py | 8 +- pymysqlreplication/binlogstream.py | 163 +++++++++------- pymysqlreplication/bitmap.py | 2 +- pymysqlreplication/event.py | 248 ++++++++++++++++--------- pymysqlreplication/packet.py | 6 +- pymysqlreplication/row_event.py | 75 ++------ pymysqlreplication/tests/base.py | 38 +++- pymysqlreplication/tests/test_basic.py | 123 ++++++++++-- test.Dockerfile | 16 ++ 16 files changed, 555 insertions(+), 250 deletions(-) create mode 100644 .mariadb/my.cnf create mode 100755 .mariadb/no_encryption_key.key create mode 100644 docker-compose-test.yml create mode 100644 test.Dockerfile diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 265c4d57..47ea5f40 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -28,9 +28,17 @@ jobs: run: | docker compose create docker compose start - # Wait for the services to accept connections, - # TODO: do that smarter, poll connection attempt until it succeeds - sleep 30 + echo "wait mysql server" + + while : + do + if mysql -h 127.0.0.1 --user=root --execute "SELECT version();" 2>&1 >/dev/null && mysql -h 127.0.0.1 --port=3307 --user=root --execute "SELECT version();" 2>&1 >/dev/null; then + break + fi + sleep 1 + done + + echo "run pytest" - name: Install dependencies run: | diff --git a/.mariadb/my.cnf b/.mariadb/my.cnf new file mode 100644 index 00000000..c530c80c --- /dev/null +++ b/.mariadb/my.cnf @@ -0,0 +1,23 @@ +[client-server] +# Port or socket location where to connect +# port = 3306 +socket = /run/mysqld/mysqld.sock + +# Import all .cnf files from configuration directory + +!includedir /etc/mysql/mariadb.conf.d/ +!includedir /etc/mysql/conf.d/ + + +[mariadb] +plugin_load_add = file_key_management +# Key files that are not encrypted +loose_file_key_management_filename = /opt/key_file/no_encryption_key.key + +# Encrypted key file +# loose_file_key_management_filename=/opt/key_file/keyfile.enc +# loose_file_key_management_filekey=FILE:/opt/key_file/no_encryption_key.key +# file_key_management_encryption_algorithm=aes_ctr + +# Set encrypt_binlog +encrypt_binlog=ON \ No newline at end of file diff --git a/.mariadb/no_encryption_key.key b/.mariadb/no_encryption_key.key new file mode 100755 index 00000000..476ede79 --- /dev/null +++ b/.mariadb/no_encryption_key.key @@ -0,0 +1 @@ +1;dda0ccb18a28b0b4c2448b5f0217a134 \ No newline at end of file diff --git a/README.md b/README.md index 78cf85a1..2bc28db5 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ python-mysql-replication   -Pure Python Implementation of MySQL replication protocol build on top of PyMYSQL. This allow you to receive event like insert, update, delete with their datas and raw SQL queries. +Pure Python Implementation of MySQL replication protocol build on top of PyMYSQL. This allows you to receive event like insert, update, delete with their datas and raw SQL queries. Use cases =========== @@ -56,6 +56,11 @@ Limitations https://python-mysql-replication.readthedocs.org/en/latest/limitations.html +Featured Books +============= + +[Data Pipelines Pocket Reference](https://www.oreilly.com/library/view/data-pipelines-pocket/9781492087823/) (by James Densmore, O'Reilly): Introduced and exemplified in Chapter 4: Data Ingestion: Extracting Data. + Projects using this library =========================== diff --git a/docker-compose-test.yml b/docker-compose-test.yml new file mode 100644 index 00000000..38d95827 --- /dev/null +++ b/docker-compose-test.yml @@ -0,0 +1,59 @@ +version: '3.2' +services: + percona-5.7: + platform: linux/amd64 + image: percona:5.7 + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: true + MYSQL_DATABASE: pymysqlreplication_test + ports: + - 3306:3306 + command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates + restart: always + networks: + - default + + percona-5.7-ctl: + image: percona:5.7 + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: true + MYSQL_DATABASE: pymysqlreplication_test + ports: + - 3307:3307 + command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307 + + pymysqlreplication: + build: + context: . + dockerfile: test.Dockerfile + args: + BASE_IMAGE: python:3.11-alpine + MYSQL_5_7: percona-5.7 + MYSQL_5_7_CTL: percona-5.7-ctl + + command: + - /bin/sh + - -ce + - | + echo "wait mysql server" + + while : + do + if mysql -h percona-5.7 --user=root --execute "USE pymysqlreplication_test;" 2>&1 >/dev/null && mysql -h percona-5.7-ctl --port=3307 --user=root --execute "USE pymysqlreplication_test;" 2>&1 >/dev/null; then + break + fi + sleep 1 + done + + echo "run pytest" + pytest -k "not test_no_trailing_rotate_event and not test_end_log_pos" + + working_dir: /pymysqlreplication + networks: + - default + depends_on: + - percona-5.7 + - percona-5.7-ctl + +networks: + default: {} diff --git a/docker-compose.yml b/docker-compose.yml index d6449d2c..45b53c3d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,3 +15,23 @@ services: ports: - 3307:3307 command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307 + + mariadb-10.6: + image: mariadb:10.6 + environment: + MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1 + ports: + - "3308:3306" + command: | + --server-id=1 + --default-authentication-plugin=mysql_native_password + --log-bin=master-bin + --binlog-format=row + --log-slave-updates=on + volumes: + - type: bind + source: ./.mariadb + target: /opt/key_file + - type: bind + source: ./.mariadb/my.cnf + target: /etc/mysql/my.cnf diff --git a/docs/developement.rst b/docs/developement.rst index 30e257e0..68eecc21 100644 --- a/docs/developement.rst +++ b/docs/developement.rst @@ -23,7 +23,7 @@ When it's possible we have an unit test. *pymysqlreplication/tests/* contains the test suite. The test suite use the standard *unittest* Python module. -**Be carefull** tests will reset the binary log of your MySQL server. +**Be careful** tests will reset the binary log of your MySQL server. Make sure you have the following configuration set in your mysql config file (usually my.cnf on development env): diff --git a/examples/mariadb_gtid/read_event.py b/examples/mariadb_gtid/read_event.py index cc88a97f..49598c3f 100644 --- a/examples/mariadb_gtid/read_event.py +++ b/examples/mariadb_gtid/read_event.py @@ -1,7 +1,7 @@ import pymysql from pymysqlreplication import BinLogStreamReader, gtid -from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent +from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent,MariadbAnnotateRowsEvent from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent MARIADB_SETTINGS = { @@ -65,10 +65,12 @@ def query_server_id(self): RotateEvent, WriteRowsEvent, UpdateRowsEvent, - DeleteRowsEvent + DeleteRowsEvent, + MariadbAnnotateRowsEvent ], auto_position=gtid, - is_mariadb=True + is_mariadb=True, + annotate_rows_event=True ) print('Starting reading events from GTID ', gtid) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 136a5c06..16e1abd1 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -1,22 +1,22 @@ # -*- coding: utf-8 -*- -import pymysql import struct from distutils.version import LooseVersion +import pymysql from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE from pymysql.cursors import DictCursor -from .packet import BinLogPacketWrapper from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, FORMAT_DESCRIPTION_EVENT -from .gtid import GtidSet from .event import ( QueryEvent, RotateEvent, FormatDescriptionEvent, XidEvent, GtidEvent, StopEvent, XAPrepareEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, - HeartbeatLogEvent, NotImplementedEvent, - MariadbGtidEvent, RandEvent, UserVarEvent) + HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent, + MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent, UserVarEvent) from .exceptions import BinLogNotEnabled +from .gtid import GtidSet +from .packet import BinLogPacketWrapper from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) @@ -33,7 +33,6 @@ class ReportSlave(object): - """Represent the values that you may report when connecting as a slave to a master. SHOW SLAVE HOSTS related""" @@ -68,7 +67,7 @@ def __init__(self, value): self.hostname = value def __repr__(self): - return '' %\ + return '' % \ (self.hostname, self.username, self.password, self.port) def encoded(self, server_id, master_id=0): @@ -123,7 +122,6 @@ def encoded(self, server_id, master_id=0): class BinLogStreamReader(object): - """Connect to replication stream and read event """ report_slave = None @@ -142,6 +140,7 @@ def __init__(self, connection_settings, server_id, fail_on_table_metadata_unavailable=False, slave_heartbeat=None, is_mariadb=False, + annotate_rows_event=False, ignore_decode_errors=False): """ Attributes: @@ -167,7 +166,8 @@ def __init__(self, connection_settings, server_id, skip_to_timestamp: Ignore all events until reaching specified timestamp. report_slave: Report slave in SHOW SLAVE HOSTS. - slave_uuid: Report slave_uuid in SHOW SLAVE HOSTS. + slave_uuid: Report slave_uuid or replica_uuid in SHOW SLAVE HOSTS(MySQL 8.0.21-) or + SHOW REPLICAS(MySQL 8.0.22+) depends on your MySQL version. fail_on_table_metadata_unavailable: Should raise exception if we can't get table information on row_events @@ -179,6 +179,8 @@ def __init__(self, connection_settings, server_id, for semantics is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position to point to Mariadb specific GTID. + annotate_rows_event: Parameter value to enable annotate rows event in mariadb, + used with 'is_mariadb' ignore_decode_errors: If true, any decode errors encountered when reading column data will be ignored. """ @@ -220,6 +222,7 @@ def __init__(self, connection_settings, server_id, self.auto_position = auto_position self.skip_to_timestamp = skip_to_timestamp self.is_mariadb = is_mariadb + self.__annotate_rows_event = annotate_rows_event if end_log_pos: self.is_past_end_log_pos = False @@ -251,6 +254,7 @@ def __connect_to_ctl(self): self._ctl_connection_settings = dict(self.__connection_settings) self._ctl_connection_settings["db"] = "information_schema" self._ctl_connection_settings["cursorclass"] = DictCursor + self._ctl_connection_settings["autocommit"] = True self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings) self._ctl_connection._get_table_information = self.__get_table_information self.__connected_ctl = True @@ -297,12 +301,12 @@ def __connect_to_stream(self): # we support it if self.__use_checksum: cur = self._stream_connection.cursor() - cur.execute("set @master_binlog_checksum= @@global.binlog_checksum") + cur.execute("SET @master_binlog_checksum= @@global.binlog_checksum") cur.close() if self.slave_uuid: cur = self._stream_connection.cursor() - cur.execute("set @slave_uuid= '%s'" % self.slave_uuid) + cur.execute("SET @slave_uuid = %s, @replica_uuid = %s", (self.slave_uuid, self.slave_uuid)) cur.close() if self.slave_heartbeat: @@ -311,14 +315,14 @@ def __connect_to_stream(self): 4294967)) # If heartbeat is too low, the connection will disconnect before, # this is also the behavior in mysql - heartbeat = float(min(net_timeout/2., self.slave_heartbeat)) + heartbeat = float(min(net_timeout / 2., self.slave_heartbeat)) if heartbeat > 4294967: heartbeat = 4294967 # master_heartbeat_period is nanoseconds heartbeat = int(heartbeat * 1000000000) cur = self._stream_connection.cursor() - cur.execute("set @master_heartbeat_period= %d" % heartbeat) + cur.execute("SET @master_heartbeat_period= %d" % heartbeat) cur.close() # When replicating from Mariadb 10.6.12 using binlog coordinates, a slave capability < 4 triggers a bug in @@ -332,67 +336,39 @@ def __connect_to_stream(self): self._register_slave() if not self.auto_position: - # only when log_file and log_pos both provided, the position info is - # valid, if not, get the current position from master - if self.log_file is None or self.log_pos is None: - cur = self._stream_connection.cursor() - cur.execute("SHOW MASTER STATUS") - master_status = cur.fetchone() - if master_status is None: - raise BinLogNotEnabled() - self.log_file, self.log_pos = master_status[:2] - cur.close() - - prelude = struct.pack(' 0: + value = self.packet.read_int_be_by_size(size) ^ mask + res += str(value) + + for i in range(0, uncomp_integral): + value = struct.unpack('>i', self.packet.read(4))[0] ^ mask + res += '%09d' % value + + res += "." + + for i in range(0, uncomp_fractional): + value = struct.unpack('>i', self.packet.read(4))[0] ^ mask + res += '%09d' % value + + size = compressed_bytes[comp_fractional] + if size > 0: + value = self.packet.read_int_be_by_size(size) ^ mask + res += '%0*d' % (comp_fractional, value) + + return decimal.Decimal(res) class GtidEvent(BinLogEvent): """GTID change in binlog event """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super(GtidEvent, self).__init__(from_packet, event_size, table_map, + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) self.commit_flag = struct.unpack("!B", self.packet.read(1))[0] == 1 @@ -98,7 +146,7 @@ class MariadbGtidEvent(BinLogEvent): """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super(MariadbGtidEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) self.server_id = self.packet.server_id self.gtid_seq_no = self.packet.read_uint64() @@ -107,11 +155,29 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no) def _dump(self): - super(MariadbGtidEvent, self)._dump() + super()._dump() print("Flags:", self.flags) print('GTID:', self.gtid) +class MariadbAnnotateRowsEvent(BinLogEvent): + """ + Annotate rows event + If you want to check this binlog, change the value of the flag(line 382 of the 'binlogstream.py') option to 2 + https://mariadb.com/kb/en/annotate_rows_event/ + + Attributes: + sql_statement: The SQL statement + """ + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + self.sql_statement = self.packet.read(event_size) + + def _dump(self): + super()._dump() + print("SQL statement :", self.sql_statement) + + class RotateEvent(BinLogEvent): """Change MySQL bin log file @@ -120,7 +186,7 @@ class RotateEvent(BinLogEvent): next_binlog: Name of next binlog file """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super(RotateEvent, self).__init__(from_packet, event_size, table_map, + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) self.position = struct.unpack(' 0: - databuff = bytearray(data[:size]) - for i in range(size): - databuff[i] ^= mask - return size, int.from_bytes(databuff, byteorder='big') - return 0, 0 + This event is written just once, after the Format Description event - pointer, value = decode_decimal_decompress_value(comp_integral, raw_decimal, mask) - res += str(value) + Attributes: + schema: The Encryption scheme, always set to 1 for system files. + key_version: The Encryption key version. + nonce: Nonce (12 random bytes) of current binlog file. + """ - for _ in range(uncomp_integral): - value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask - res += '%09d' % value - pointer += 4 + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) - res += "." + self.schema = self.packet.read_uint8() + self.key_version = self.packet.read_uint32() + self.nonce = self.packet.read(12) - for _ in range(uncomp_fractional): - value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask - res += '%09d' % value - pointer += 4 + def _dump(self): + print("Schema: %d" % self.schema) + print("Key version: %d" % self.key_version) + print(f"Nonce: {self.nonce}") - size, value = decode_decimal_decompress_value(comp_fractional, raw_decimal[pointer:], mask) - if size > 0: - res += '%0*d' % (comp_fractional, value) - return decimal.Decimal(res) - - def _dump(self): - super(UserVarEvent, self)._dump() - print("User variable name: %s" % self.name) - print("Is NULL: %s" % ("Yes" if self.is_null else "No")) - if not self.is_null: - print("Type: %s" % self.type_codes.get(self.type, 'UNKNOWN_TYPE')) - print("Charset: %s" % self.charset) - print("Value: %s" % self.value) - if self.flags is not None: - print("Flags: %s" % self.flags) +class RowsQueryLogEvent(BinLogEvent): + """ + Record original query for the row events in Row-Based Replication + + More details are available in the MySQL Knowledge Base: + https://dev.mysql.com/doc/dev/mysql-server/latest/classRows__query__log__event.html + + :ivar query_length: uint - Length of the SQL statement + :ivar query: str - The executed SQL statement + """ + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super(RowsQueryLogEvent, self).__init__(from_packet, event_size, table_map, + ctl_connection, **kwargs) + self.query_length = self.packet.read_uint8() + self.query = self.packet.read(self.query_length).decode('utf-8') + def dump(self): + print("=== %s ===" % (self.__class__.__name__)) + print("Query length: %d" % self.query_length) + print("Query: %s" % self.query) + class NotImplementedEvent(BinLogEvent): def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super(NotImplementedEvent, self).__init__( + super().__init__( from_packet, event_size, table_map, ctl_connection, **kwargs) self.packet.advance(event_size) diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index b11e3d8c..d1fb369f 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -70,6 +70,7 @@ class BinLogPacketWrapper(object): constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent, constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent, constants.XA_PREPARE_EVENT: event.XAPrepareEvent, + constants.ROWS_QUERY_LOG_EVENT: event.RowsQueryLogEvent, constants.RAND_EVENT: event.RandEvent, constants.USER_VAR_EVENT: event.UserVarEvent, # row_event @@ -83,13 +84,14 @@ class BinLogPacketWrapper(object): #5.6 GTID enabled replication events constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent, + constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent, constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent, # MariaDB GTID - constants.MARIADB_ANNOTATE_ROWS_EVENT: event.NotImplementedEvent, + constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent, constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent, constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent, constants.MARIADB_GTID_GTID_LIST_EVENT: event.NotImplementedEvent, - constants.MARIADB_START_ENCRYPTION_EVENT: event.NotImplementedEvent + constants.MARIADB_START_ENCRYPTION_EVENT: event.MariadbStartEncryptionEvent } def __init__(self, from_packet, table_map, diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index cfbfbd21..7b1db1b0 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -17,7 +17,7 @@ class RowsEvent(BinLogEvent): def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super(RowsEvent, self).__init__(from_packet, event_size, table_map, + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) self.__rows = None self.__only_tables = kwargs["only_tables"] @@ -164,7 +164,7 @@ def _read_column_data(self, cols_bitmap): values[name] += b'\x00' * nr_pad elif column.type == FIELD_TYPE.NEWDECIMAL: - values[name] = self.__read_new_decimal(column) + values[name] = self._read_new_decimal(column.precision, column.decimals) elif column.type == FIELD_TYPE.BLOB: values[name] = self.__read_string(column.length_size, column) elif column.type == FIELD_TYPE.DATETIME: @@ -386,55 +386,8 @@ def __read_datetime2(self, column): return None return self.__add_fsp_to_time(t, column) - def __read_new_decimal(self, column): - """Read MySQL's new decimal format introduced in MySQL 5""" - - # This project was a great source of inspiration for - # understanding this storage format. - # https://github.com/jeremycole/mysql_binlog - - digits_per_integer = 9 - compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] - integral = (column.precision - column.decimals) - uncomp_integral = int(integral / digits_per_integer) - uncomp_fractional = int(column.decimals / digits_per_integer) - comp_integral = integral - (uncomp_integral * digits_per_integer) - comp_fractional = column.decimals - (uncomp_fractional - * digits_per_integer) - - # Support negative - # The sign is encoded in the high bit of the the byte - # But this bit can also be used in the value - value = self.packet.read_uint8() - if value & 0x80 != 0: - res = "" - mask = 0 - else: - mask = -1 - res = "-" - self.packet.unread(struct.pack(' 0: - value = self.packet.read_int_be_by_size(size) ^ mask - res += str(value) - - for i in range(0, uncomp_integral): - value = struct.unpack('>i', self.packet.read(4))[0] ^ mask - res += '%09d' % value - - res += "." - - for i in range(0, uncomp_fractional): - value = struct.unpack('>i', self.packet.read(4))[0] ^ mask - res += '%09d' % value - - size = compressed_bytes[comp_fractional] - if size > 0: - value = self.packet.read_int_be_by_size(size) ^ mask - res += '%0*d' % (comp_fractional, value) - - return decimal.Decimal(res) + def _read_new_decimal(self, precision, decimals): + return super()._read_new_decimal(precision, decimals) def __read_binary_slice(self, binary, start, size, data_length): """ @@ -449,7 +402,7 @@ def __read_binary_slice(self, binary, start, size, data_length): return binary & mask def _dump(self): - super(RowsEvent, self)._dump() + super()._dump() print("Table: %s.%s" % (self.schema, self.table)) print("Affected columns: %d" % self.number_of_columns) print("Changed rows: %d" % (len(self.rows))) @@ -477,7 +430,7 @@ class DeleteRowsEvent(RowsEvent): """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super(DeleteRowsEvent, self).__init__(from_packet, event_size, + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) if self._processed: self.columns_present_bitmap = self.packet.read( @@ -490,7 +443,7 @@ def _fetch_one_row(self): return row def _dump(self): - super(DeleteRowsEvent, self)._dump() + super()._dump() print("Values:") for row in self.rows: print("--") @@ -505,7 +458,7 @@ class WriteRowsEvent(RowsEvent): """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super(WriteRowsEvent, self).__init__(from_packet, event_size, + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) if self._processed: self.columns_present_bitmap = self.packet.read( @@ -518,7 +471,7 @@ def _fetch_one_row(self): return row def _dump(self): - super(WriteRowsEvent, self)._dump() + super()._dump() print("Values:") for row in self.rows: print("--") @@ -538,7 +491,7 @@ class UpdateRowsEvent(RowsEvent): """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super(UpdateRowsEvent, self).__init__(from_packet, event_size, + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) if self._processed: #Body @@ -556,7 +509,7 @@ def _fetch_one_row(self): return row def _dump(self): - super(UpdateRowsEvent, self)._dump() + super()._dump() print("Affected columns: %d" % self.number_of_columns) print("Values:") for row in self.rows: @@ -574,7 +527,7 @@ class TableMapEvent(BinLogEvent): """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super(TableMapEvent, self).__init__(from_packet, event_size, + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) self.__only_tables = kwargs["only_tables"] self.__ignored_tables = kwargs["ignored_tables"] @@ -642,7 +595,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) ordinal_pos_loc += 1 except IndexError: - # this a dirty hack to prevent row events containing columns which have been dropped prior + # this is a dirty hack to prevent row events containing columns which have been dropped prior # to pymysqlreplication start, but replayed from binlog from blowing up the service. # TODO: this does not address the issue if the column other than the last one is dropped column_schema = { @@ -669,7 +622,7 @@ def get_table(self): return self.table_obj def _dump(self): - super(TableMapEvent, self)._dump() + super()._dump() print("Table id: %d" % (self.table_id)) print("Schema: %s" % (self.schema)) print("Table: %s" % (self.table)) diff --git a/pymysqlreplication/tests/base.py b/pymysqlreplication/tests/base.py index 037c6d9d..009c3e29 100644 --- a/pymysqlreplication/tests/base.py +++ b/pymysqlreplication/tests/base.py @@ -19,11 +19,9 @@ def ignoredEvents(self): return [] def setUp(self): - - db = os.environ.get('DB') # default self.database = { - "host": "localhost", + "host": os.environ.get("MYSQL_5_7") or "localhost", "user": "root", "passwd": "", "port": 3306, @@ -67,7 +65,7 @@ def isMySQL80AndMore(self): def isMariaDB(self): if self.__is_mariaDB is None: - self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone() + self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0] return self.__is_mariaDB @property @@ -108,16 +106,42 @@ def set_sql_mode(self): """set sql_mode to test with same sql_mode (mysql 5.7 sql_mode default is changed)""" version = float(self.getMySQLVersion().rsplit('.', 1)[0]) if version == 5.7: - self.execute("set @@sql_mode='NO_ENGINE_SUBSTITUTION'") + self.execute("SET @@sql_mode='NO_ENGINE_SUBSTITUTION'") def bin_log_format(self): - query = "select @@binlog_format" + query = "SELECT @@binlog_format" cursor = self.execute(query) result = cursor.fetchone() return result[0] def bin_log_basename(self): - cursor = self.execute('select @@log_bin_basename') + cursor = self.execute('SELECT @@log_bin_basename') bin_log_basename = cursor.fetchone()[0] bin_log_basename = bin_log_basename.split("/")[-1] return bin_log_basename + + +class PyMySQLReplicationMariaDbTestCase(PyMySQLReplicationTestCase): + def setUp(self): + # default + self.database = { + "host": "localhost", + "user": "root", + "passwd": "", + "port": 3308, + "use_unicode": True, + "charset": "utf8", + "db": "pymysqlreplication_test" + } + + self.conn_control = None + db = copy.copy(self.database) + db["db"] = None + self.connect_conn_control(db) + self.execute("DROP DATABASE IF EXISTS pymysqlreplication_test") + self.execute("CREATE DATABASE pymysqlreplication_test") + db = copy.copy(self.database) + self.connect_conn_control(db) + self.stream = None + self.resetBinLog() + \ No newline at end of file diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 9440beed..a96976f5 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1,9 +1,12 @@ # -*- coding: utf-8 -*- -import pymysql import copy -import time -import sys import io +import os +import sys +import time + +import pymysql + if sys.version_info < (2, 7): import unittest2 as unittest else: @@ -13,11 +16,10 @@ from pymysqlreplication import BinLogStreamReader from pymysqlreplication.gtid import GtidSet, Gtid from pymysqlreplication.event import * -from pymysqlreplication.exceptions import TableMetadataUnavailableError from pymysqlreplication.constants.BINLOG import * from pymysqlreplication.row_event import * -__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestStatementConnectionSetting"] +__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", "TestRowsQueryLogEvents"] class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase): @@ -25,9 +27,9 @@ def ignoredEvents(self): return [GtidEvent] def test_allowed_event_list(self): - self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 18) - self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 17) - self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 17) + self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 21) + self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 20) + self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 20) self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1) def test_read_query_event(self): @@ -520,6 +522,7 @@ def test_end_log_pos(self): self.assertEqual(last_log_pos, 888) self.assertEqual(last_event_type, TABLE_MAP_EVENT) + class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): def ignoredEvents(self): return [GtidEvent] @@ -760,14 +763,17 @@ def test_alter_column(self): self.assertEqual(event.rows[0]["values"]["data"], 'A value') self.stream.fetchone() # insert with three values + class TestCTLConnectionSettings(base.PyMySQLReplicationTestCase): def setUp(self): - super(TestCTLConnectionSettings, self).setUp() + super().setUp() self.stream.close() ctl_db = copy.copy(self.database) ctl_db["db"] = None ctl_db["port"] = 3307 + if os.environ.get("MYSQL_5_7_CTL") is not None: + ctl_db["host"] = os.environ.get("MYSQL_5_7_CTL") self.ctl_conn_control = pymysql.connect(**ctl_db) self.ctl_conn_control.cursor().execute("DROP DATABASE IF EXISTS pymysqlreplication_test") self.ctl_conn_control.cursor().execute("CREATE DATABASE pymysqlreplication_test") @@ -783,7 +789,7 @@ def setUp(self): ) def tearDown(self): - super(TestCTLConnectionSettings, self).tearDown() + super().tearDown() self.ctl_conn_control.close() def test_separate_ctl_settings_table_metadata_unavailable(self): @@ -816,9 +822,10 @@ def test_separate_ctl_settings_no_error(self): finally: self.resetBinLog() + class TestGtidBinLogStreamReader(base.PyMySQLReplicationTestCase): def setUp(self): - super(TestGtidBinLogStreamReader, self).setUp() + super().setUp() if not self.supportsGTID: raise unittest.SkipTest("database does not support GTID, skipping GTID tests") @@ -935,6 +942,7 @@ def test_gtidset_representation_payload(self): self.assertEqual(str(myset), str(parsedset)) + class GtidTests(unittest.TestCase): def test_ordering(self): gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") @@ -1061,7 +1069,7 @@ def test_user_var_real_event(self): self.assertIsInstance(expected_user_var_event, UserVarEvent) self.assertIsInstance(expected_user_var_event.name_len, int) self.assertEqual(expected_user_var_event.name, "test_user_var") - self.assertIsInstance(expected_user_var_event.value, str) + self.assertIsInstance(expected_user_var_event.value,float) self.assertEqual(expected_user_var_event.is_null, 0) self.assertEqual(expected_user_var_event.type, 1) self.assertEqual(expected_user_var_event.charset, 33) @@ -1080,7 +1088,7 @@ def test_user_var_int_event(self): self.assertIsInstance(expected_user_var_event, UserVarEvent) self.assertIsInstance(expected_user_var_event.name_len, int) self.assertEqual(expected_user_var_event.name, "test_user_var") - self.assertEqual(expected_user_var_event.value, '5') # TODO: have to fix to int 5 + self.assertEqual(expected_user_var_event.value, 5) self.assertEqual(expected_user_var_event.is_null, 0) self.assertEqual(expected_user_var_event.type, 2) self.assertEqual(expected_user_var_event.charset, 33) @@ -1109,6 +1117,95 @@ def tearDown(self): self.assertEqual(self.bin_log_format(), "ROW") super(TestStatementConnectionSetting, self).tearDown() +class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase): + + def test_annotate_rows_event(self): + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + # Insert first event + query = "BEGIN;" + self.execute(query) + insert_query = b"INSERT INTO test (id, data) VALUES(1, 'Hello')" + self.execute(insert_query) + query = "COMMIT;" + self.execute(query) + + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + blocking=False, + only_events=[MariadbAnnotateRowsEvent], + is_mariadb=True, + annotate_rows_event=True, + ) + + event = self.stream.fetchone() + #Check event type 160,MariadbAnnotateRowsEvent + self.assertEqual(event.event_type,160) + #Check self.sql_statement + self.assertEqual(event.sql_statement,insert_query) + self.assertIsInstance(event,MariadbAnnotateRowsEvent) + + def test_start_encryption_event(self): + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + query = "INSERT INTO test (data) VALUES('Hello World')" + self.execute(query) + self.execute("COMMIT") + + self.assertIsInstance(self.stream.fetchone(), RotateEvent) + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + + start_encryption_event = self.stream.fetchone() + self.assertIsInstance(start_encryption_event, MariadbStartEncryptionEvent) + + schema = start_encryption_event.schema + key_version = start_encryption_event.key_version + nonce = start_encryption_event.nonce + + from pathlib import Path + + encryption_key_file_path = Path(__file__).parent.parent.parent + + try: + with open(f"{encryption_key_file_path}/.mariadb/no_encryption_key.key", "r") as key_file: + first_line = key_file.readline() + key_version_from_key_file = int(first_line.split(";")[0]) + except Exception as e: + self.fail("raised unexpected exception: {exception}".format(exception=e)) + finally: + self.resetBinLog() + + # schema is always 1 + self.assertEqual(schema, 1) + self.assertEqual(key_version, key_version_from_key_file) + self.assertEqual(type(nonce), bytes) + self.assertEqual(len(nonce), 12) + + +class TestRowsQueryLogEvents(base.PyMySQLReplicationTestCase): + def setUp(self): + super(TestRowsQueryLogEvents, self).setUp() + self.execute("SET SESSION binlog_rows_query_log_events=1") + + def tearDown(self): + self.execute("SET SESSION binlog_rows_query_log_events=0") + super(TestRowsQueryLogEvents, self).tearDown() + + def test_rows_query_log_event(self): + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + only_events=[RowsQueryLogEvent], + ) + self.execute("CREATE TABLE IF NOT EXISTS test (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255))") + self.execute("INSERT INTO test (name) VALUES ('Soul Lee')") + self.execute("COMMIT") + event = self.stream.fetchone() + self.assertIsInstance(event, RowsQueryLogEvent) + if __name__ == "__main__": import unittest diff --git a/test.Dockerfile b/test.Dockerfile new file mode 100644 index 00000000..6aa2e34a --- /dev/null +++ b/test.Dockerfile @@ -0,0 +1,16 @@ +ARG BASE_IMAGE=${BASE_IMAGE:-python:3.11-alpine} +FROM ${BASE_IMAGE} + +COPY pymysqlreplication pymysqlreplication +COPY README.md README.md +COPY setup.py setup.py +RUN apk add bind-tools +RUN apk add mysql-client +RUN pip install . +RUN pip install pytest + +ARG MYSQL_5_7 +ENV MYSQL_5_7 ${MYSQL_5_7} + +ARG MYSQL_5_7_CTL +ENV MYSQL_5_7_CTL ${MYSQL_5_7_CTL} \ No newline at end of file From 0595b76e8484c3a2d3afb4dd2799ddcac2eec69c Mon Sep 17 00:00:00 2001 From: mjs Date: Thu, 31 Aug 2023 19:11:28 +0900 Subject: [PATCH 5/6] feat: use temporary buffer for flags and add testcases Co-authored-by: minakiz --- README.md | 6 +- docker-compose-test.yml | 68 ++++-- docker-compose.yml | 46 ++-- examples/mariadb_gtid/read_event.py | 3 +- pymysqlreplication/binlogstream.py | 10 +- pymysqlreplication/event.py | 238 ++++++++++++++------- pymysqlreplication/packet.py | 3 +- pymysqlreplication/row_event.py | 282 +++++++++++++++---------- pymysqlreplication/tests/base.py | 11 +- pymysqlreplication/tests/test_basic.py | 197 +++++++++++++++-- test.Dockerfile | 12 +- 11 files changed, 621 insertions(+), 255 deletions(-) diff --git a/README.md b/README.md index 2bc28db5..e214145d 100644 --- a/README.md +++ b/README.md @@ -56,11 +56,14 @@ Limitations https://python-mysql-replication.readthedocs.org/en/latest/limitations.html -Featured Books +Featured ============= [Data Pipelines Pocket Reference](https://www.oreilly.com/library/view/data-pipelines-pocket/9781492087823/) (by James Densmore, O'Reilly): Introduced and exemplified in Chapter 4: Data Ingestion: Extracting Data. +[Streaming Changes in a Database with Amazon Kinesis](https://aws.amazon.com/blogs/database/streaming-changes-in-a-database-with-amazon-kinesis/) (by Emmanuel Espina, Amazon Web Services) + + Projects using this library =========================== @@ -84,6 +87,7 @@ Projects using this library * MySQL to Kafka (https://github.com/scottpersinger/mysql-to-kafka/) * Aventri MySQL Monitor (https://github.com/aventri/mysql-monitor) * BitSwanPump: A real-time stream processor (https://github.com/LibertyAces/BitSwanPump) +* clickhouse-mysql-data-reader: https://github.com/Altinity/clickhouse-mysql-data-reader MySQL server settings ========================= diff --git a/docker-compose-test.yml b/docker-compose-test.yml index 38d95827..3634e304 100644 --- a/docker-compose-test.yml +++ b/docker-compose-test.yml @@ -1,26 +1,56 @@ -version: '3.2' +version: '3.4' + +x-mysql: &mysql + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: true + command: > + mysqld + --log-bin=mysql-bin.log + --server-id 1 + --binlog-format=row + --gtid_mode=on + --enforce-gtid-consistency=on + +x-mariadb: &mariadb + environment: + MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1 + command: > + --server-id=1 + --default-authentication-plugin=mysql_native_password + --log-bin=master-bin + --binlog-format=row + services: - percona-5.7: - platform: linux/amd64 + percona-5.7-ctl: + <<: *mysql image: percona:5.7 - environment: - MYSQL_ALLOW_EMPTY_PASSWORD: true - MYSQL_DATABASE: pymysqlreplication_test ports: - - 3306:3306 - command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates - restart: always + - "3307:3306" networks: - default - percona-5.7-ctl: + percona-5.7: + <<: *mysql image: percona:5.7 - environment: - MYSQL_ALLOW_EMPTY_PASSWORD: true - MYSQL_DATABASE: pymysqlreplication_test ports: - - 3307:3307 - command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307 + - "3306:3306" + networks: + - default + + mariadb-10.6: + <<: *mariadb + image: mariadb:10.6 + ports: + - "3308:3306" + volumes: + - type: bind + source: ./.mariadb + target: /opt/key_file + - type: bind + source: ./.mariadb/my.cnf + target: /etc/mysql/my.cnf + networks: + - default pymysqlreplication: build: @@ -30,6 +60,9 @@ services: BASE_IMAGE: python:3.11-alpine MYSQL_5_7: percona-5.7 MYSQL_5_7_CTL: percona-5.7-ctl + MYSQL_5_7_CTL_PORT: 3306 + MARIADB_10_6: mariadb-10.6 + MARIADB_10_6_PORT: 3306 command: - /bin/sh @@ -39,7 +72,7 @@ services: while : do - if mysql -h percona-5.7 --user=root --execute "USE pymysqlreplication_test;" 2>&1 >/dev/null && mysql -h percona-5.7-ctl --port=3307 --user=root --execute "USE pymysqlreplication_test;" 2>&1 >/dev/null; then + if mysql -h percona-5.7 --user=root --execute "SELECT version();" 2>&1 >/dev/null && mysql -h percona-5.7-ctl --user=root --execute "SELECT version();" 2>&1 >/dev/null; then break fi sleep 1 @@ -56,4 +89,5 @@ services: - percona-5.7-ctl networks: - default: {} + default: + driver: bridge diff --git a/docker-compose.yml b/docker-compose.yml index 45b53c3d..9e68758c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,37 +1,47 @@ -version: '3.2' +version: '3.4' + +x-mysql: &mysql + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: true + command: > + mysqld + --log-bin=mysql-bin.log + --server-id 1 + --binlog-format=row + --gtid_mode=on + --enforce-gtid-consistency=on + +x-mariadb: &mariadb + environment: + MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1 + command: > + --log-bin=master-bin + --server-id=1 + --default-authentication-plugin=mysql_native_password + --binlog-format=row + services: percona-5.7: + <<: *mysql image: percona:5.7 - environment: - MYSQL_ALLOW_EMPTY_PASSWORD: true ports: - - 3306:3306 - command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates + - "3306:3306" percona-5.7-ctl: + <<: *mysql image: percona:5.7 - environment: - MYSQL_ALLOW_EMPTY_PASSWORD: true ports: - - 3307:3307 - command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307 + - "3307:3306" mariadb-10.6: + <<: *mariadb image: mariadb:10.6 - environment: - MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1 ports: - "3308:3306" - command: | - --server-id=1 - --default-authentication-plugin=mysql_native_password - --log-bin=master-bin - --binlog-format=row - --log-slave-updates=on volumes: - type: bind source: ./.mariadb target: /opt/key_file - type: bind source: ./.mariadb/my.cnf - target: /etc/mysql/my.cnf + target: /etc/mysql/my.cnf diff --git a/examples/mariadb_gtid/read_event.py b/examples/mariadb_gtid/read_event.py index 49598c3f..607bfa34 100644 --- a/examples/mariadb_gtid/read_event.py +++ b/examples/mariadb_gtid/read_event.py @@ -1,7 +1,7 @@ import pymysql from pymysqlreplication import BinLogStreamReader, gtid -from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent,MariadbAnnotateRowsEvent +from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent,MariadbAnnotateRowsEvent, MariadbBinLogCheckPointEvent from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent MARIADB_SETTINGS = { @@ -62,6 +62,7 @@ def query_server_id(self): blocking=False, only_events=[ MariadbGtidEvent, + MariadbBinLogCheckPointEvent, RotateEvent, WriteRowsEvent, UpdateRowsEvent, diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 16e1abd1..9e4f4d48 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -13,7 +13,8 @@ XidEvent, GtidEvent, StopEvent, XAPrepareEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent, - MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent, UserVarEvent) + MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent, + MariadbGtidListEvent, MariadbBinLogCheckPointEvent, UserVarEvent) from .exceptions import BinLogNotEnabled from .gtid import GtidSet from .packet import BinLogPacketWrapper @@ -624,7 +625,9 @@ def _allowed_event_list(self, only_events, ignored_events, MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, - UserVarEvent + UserVarEvent, + MariadbGtidListEvent, + MariadbBinLogCheckPointEvent )) if ignored_events is not None: for e in ignored_events: @@ -652,9 +655,8 @@ def __get_table_information(self, schema, table): information_schema.columns WHERE table_schema = %s AND table_name = %s - ORDER BY ORDINAL_POSITION """, (schema, table)) - result = cur.fetchall() + result = sorted(cur.fetchall(), key=lambda x: x['ORDINAL_POSITION']) cur.close() return result diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 7c3ffb5f..7cc3fdaf 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -6,6 +6,7 @@ import decimal from pymysqlreplication.constants.STATUS_VAR_KEY import * from pymysqlreplication.exceptions import StatusVariableMismatch +from typing import Union, Optional class BinLogEvent(object): @@ -40,7 +41,7 @@ def _read_table_id(self): def dump(self): print("=== %s ===" % (self.__class__.__name__)) - print("Date: %s" % (datetime.datetime.fromtimestamp(self.timestamp) + print("Date: %s" % (datetime.datetime.utcfromtimestamp(self.timestamp) .isoformat())) print("Log position: %d" % self.packet.log_pos) print("Event size: %d" % (self.event_size)) @@ -52,55 +53,6 @@ def _dump(self): """Core data dumped for the event""" pass - def _read_new_decimal(self, precision, decimals): - """ - Read MySQL's new decimal format introduced in MySQL 5. - This project was a great source of inspiration for understanding this storage format. - (https://github.com/jeremycole/mysql_binlog) - """ - digits_per_integer = 9 - compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] - integral = (precision - decimals) - uncomp_integral = int(integral / digits_per_integer) - uncomp_fractional = int(decimals / digits_per_integer) - comp_integral = integral - (uncomp_integral * digits_per_integer) - comp_fractional = decimals - (uncomp_fractional * digits_per_integer) - - # Support negative - # The sign is encoded in the high bit of the byte - # But this bit can also be used in the value - - value = self.packet.read_uint8() - if value & 0x80 != 0: - res = "" - mask = 0 - else: - mask = -1 - res = "-" - self.packet.unread(struct.pack(' 0: - value = self.packet.read_int_be_by_size(size) ^ mask - res += str(value) - - for i in range(0, uncomp_integral): - value = struct.unpack('>i', self.packet.read(4))[0] ^ mask - res += '%09d' % value - - res += "." - - for i in range(0, uncomp_fractional): - value = struct.unpack('>i', self.packet.read(4))[0] ^ mask - res += '%09d' % value - - size = compressed_bytes[comp_fractional] - if size > 0: - value = self.packet.read_int_be_by_size(size) ^ mask - res += '%0*d' % (comp_fractional, value) - - return decimal.Decimal(res) - class GtidEvent(BinLogEvent): """GTID change in binlog event """ @@ -159,6 +111,25 @@ def _dump(self): print("Flags:", self.flags) print('GTID:', self.gtid) +class MariadbBinLogCheckPointEvent(BinLogEvent): + """ + Represents a checkpoint in a binlog event in MariaDB. + + More details are available in the MariaDB Knowledge Base: + https://mariadb.com/kb/en/binlog_checkpoint_event/ + + :ivar filename_length: int - The length of the filename. + :ivar filename: str - The name of the file saved at the checkpoint. + """ + + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super(MariadbBinLogCheckPointEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, + **kwargs) + filename_length = self.packet.read_uint32() + self.filename = self.packet.read(filename_length).decode() + + def _dump(self): + print('Filename:', self.filename) class MariadbAnnotateRowsEvent(BinLogEvent): """ @@ -175,7 +146,41 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) def _dump(self): super()._dump() - print("SQL statement :", self.sql_statement) + print("SQL statement :", self.sql_statement) + +class MariadbGtidListEvent(BinLogEvent): + """ + GTID List event + https://mariadb.com/kb/en/gtid_list_event/ + + Attributes: + gtid_length: Number of GTIDs + gtid_list: list of 'MariadbGtidObejct' + + 'MariadbGtidObejct' Attributes: + domain_id: Replication Domain ID + server_id: Server_ID + gtid_seq_no: GTID sequence + gtid: 'domain_id'+ 'server_id' + 'gtid_seq_no' + """ + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + + super(MariadbGtidListEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + + class MariadbGtidObejct(BinLogEvent): + """ + Information class of elements in GTID list + """ + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super(MariadbGtidObejct, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + self.domain_id = self.packet.read_uint32() + self.server_id = self.packet.read_uint32() + self.gtid_seq_no = self.packet.read_uint64() + self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no) + + + self.gtid_length = self.packet.read_uint32() + self.gtid_list = [MariadbGtidObejct(from_packet, event_size, table_map, ctl_connection, **kwargs) for i in range(self.gtid_length)] class RotateEvent(BinLogEvent): @@ -555,10 +560,10 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) super(UserVarEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) # Payload - self.name_len = self.packet.read_uint32() - self.name = self.packet.read(self.name_len).decode() - self.is_null = self.packet.read_uint8() - self.type_to_codes_and_method = { + self.name_len: int = self.packet.read_uint32() + self.name: str = self.packet.read(self.name_len).decode() + self.is_null: int = self.packet.read_uint8() + self.type_to_codes_and_method: dict = { 0x00: ['STRING_RESULT', self._read_string], 0x01: ['REAL_RESULT', self._read_real], 0x02: ['INT_RESULT', self._read_int], @@ -566,43 +571,112 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) 0x04: ['DECIMAL_RESULT', self._read_decimal] } - if not self.is_null: - self.type = self.packet.read_uint8() - self.charset = self.packet.read_uint32() - self.value_len = self.packet.read_uint32() + self.value: Optional[Union[str, float, int, decimal.Decimal]] = None + self.flags: Optional[int] = None + self.temp_value_buffer: Union[bytes, memoryview] = b'' - self.value = self.type_to_codes_and_method.get(self.type, ["UNKNOWN_RESULT", self._read_default])[1]() - self.flags = self.packet.read_uint8() + if not self.is_null: + self.type: int = self.packet.read_uint8() + self.charset: int = self.packet.read_uint32() + self.value_len: int = self.packet.read_uint32() + self.temp_value_buffer: Union[bytes, memoryview] = self.packet.read(self.value_len) + self.flags: int = self.packet.read_uint8() + self._set_value_from_temp_buffer() else: - self.type, self.charset, self.value, self.flags = None, None, None, None + self.type, self.charset, self.value_len, self.value, self.flags = None, None, None, None, None - def _read_string(self): - return self.packet.read(self.value_len).decode() + def _set_value_from_temp_buffer(self): + """ + Set the value from the temporary buffer based on the type code. + """ + if self.temp_value_buffer: + type_code, read_method = self.type_to_codes_and_method.get(self.type, ["UNKNOWN_RESULT", self._read_default]) + if type_code == 'INT_RESULT': + self.value = read_method(self.temp_value_buffer, self.flags) + else: + self.value = read_method(self.temp_value_buffer) + + def _read_string(self, buffer: bytes) -> str: + """ + Read string data. + """ + return buffer.decode() - def _read_real(self): - return struct.unpack(' float: + """ + Read real data. + """ + return struct.unpack(' int: + """ + Read integer data. + """ + fmt = ' decimal.Decimal: + """ + Read decimal data. + """ + self.precision = self.temp_value_buffer[0] + self.decimals = self.temp_value_buffer[1] + raw_decimal = self.temp_value_buffer[2:] + return self._parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals) - def _read_default(self): + def _read_default(self) -> bytes: + """ + Read default data. + Used when the type is None. + """ return self.packet.read(self.value_len) - def _read_new_decimal(self, precision, decimals): - return float(super()._read_new_decimal(precision, decimals)) + @staticmethod + def _parse_decimal_from_bytes(raw_decimal: bytes, precision: int, decimals: int) -> decimal.Decimal: + """ + Parse decimal from bytes. + """ + digits_per_integer = 9 + compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] + integral = precision - decimals + + uncomp_integral, comp_integral = divmod(integral, digits_per_integer) + uncomp_fractional, comp_fractional = divmod(decimals, digits_per_integer) - def _dump(self): + res = "-" if not raw_decimal[0] & 0x80 else "" + mask = -1 if res == "-" else 0 + raw_decimal = bytearray([raw_decimal[0] ^ 0x80]) + raw_decimal[1:] + + def decode_decimal_decompress_value(comp_indx, data, mask): + size = compressed_bytes[comp_indx] + if size > 0: + databuff = bytearray(data[:size]) + for i in range(size): + databuff[i] = (databuff[i] ^ mask) & 0xFF + return size, int.from_bytes(databuff, byteorder='big') + return 0, 0 + + pointer, value = decode_decimal_decompress_value(comp_integral, raw_decimal, mask) + res += str(value) + + for _ in range(uncomp_integral): + value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask + res += '%09d' % value + pointer += 4 + + res += "." + + for _ in range(uncomp_fractional): + value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask + res += '%09d' % value + pointer += 4 + + size, value = decode_decimal_decompress_value(comp_fractional, raw_decimal[pointer:], mask) + if size > 0: + res += '%0*d' % (comp_fractional, value) + return decimal.Decimal(res) + + def _dump(self) -> None: super(UserVarEvent, self)._dump() print("User variable name: %s" % self.name) print("Is NULL: %s" % ("Yes" if self.is_null else "No")) diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index d1fb369f..7a801f13 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -89,8 +89,9 @@ class BinLogPacketWrapper(object): # MariaDB GTID constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent, constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent, + constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.MariadbBinLogCheckPointEvent, constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent, - constants.MARIADB_GTID_GTID_LIST_EVENT: event.NotImplementedEvent, + constants.MARIADB_GTID_GTID_LIST_EVENT: event.MariadbGtidListEvent, constants.MARIADB_START_ENCRYPTION_EVENT: event.MariadbStartEncryptionEvent } diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 7b1db1b0..fcd138d3 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -79,8 +79,9 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) #Body self.number_of_columns = self.packet.read_length_coded_binary() self.columns = self.table_map[self.table_id].columns + column_schemas = self.table_map[self.table_id].column_schemas - if len(self.columns) == 0: # could not read the table metadata, probably already dropped + if len(column_schemas) == 0: # could not read the table metadata, probably already dropped self.complete = False if self._fail_on_table_metadata_unavailable: raise TableMetadataUnavailableError(self.table) @@ -102,7 +103,7 @@ def _read_column_data(self, cols_bitmap): # See http://dev.mysql.com/doc/internals/en/rows-event.html null_bitmap = self.packet.read((BitCount(cols_bitmap) + 7) / 8) - nullBitmapIndex = 0 + null_bitmap_index = 0 nb_columns = len(self.columns) for i in range(0, nb_columns): column = self.columns[i] @@ -111,117 +112,127 @@ def _read_column_data(self, cols_bitmap): zerofill = self.table_map[self.table_id].columns[i].zerofill fixed_binary_length = self.table_map[self.table_id].columns[i].fixed_binary_length - if BitGet(cols_bitmap, i) == 0: - values[name] = None - continue - - if self._is_null(null_bitmap, nullBitmapIndex): - values[name] = None - elif column.type == FIELD_TYPE.TINY: - if unsigned: - values[name] = struct.unpack(" 255: - values[name] = self.__read_string(2, column) - else: - values[name] = self.__read_string(1, column) - - if fixed_binary_length and len(values[name]) < fixed_binary_length: - # Fixed-length binary fields are stored in the binlog - # without trailing zeros and must be padded with zeros up - # to the specified length at read time. - nr_pad = fixed_binary_length - len(values[name]) - values[name] += b'\x00' * nr_pad - - elif column.type == FIELD_TYPE.NEWDECIMAL: - values[name] = self._read_new_decimal(column.precision, column.decimals) - elif column.type == FIELD_TYPE.BLOB: - values[name] = self.__read_string(column.length_size, column) - elif column.type == FIELD_TYPE.DATETIME: - values[name] = self.__read_datetime() - elif column.type == FIELD_TYPE.TIME: - values[name] = self.__read_time() - elif column.type == FIELD_TYPE.DATE: - values[name] = self.__read_date() - elif column.type == FIELD_TYPE.TIMESTAMP: - values[name] = datetime.datetime.fromtimestamp( - self.packet.read_uint32()) - - # For new date format: - elif column.type == FIELD_TYPE.DATETIME2: - values[name] = self.__read_datetime2(column) - elif column.type == FIELD_TYPE.TIME2: - values[name] = self.__read_time2(column) - elif column.type == FIELD_TYPE.TIMESTAMP2: - values[name] = self.__add_fsp_to_time( - datetime.datetime.fromtimestamp( - self.packet.read_int_be_by_size(4)), column) - elif column.type == FIELD_TYPE.LONGLONG: - if unsigned: - values[name] = self.packet.read_uint64() - if zerofill: - values[name] = format(values[name], '020d') - else: - values[name] = self.packet.read_int64() - elif column.type == FIELD_TYPE.YEAR: - values[name] = self.packet.read_uint8() + 1900 - elif column.type == FIELD_TYPE.ENUM: - values[name] = column.enum_values[ - self.packet.read_uint_by_size(column.size)] - elif column.type == FIELD_TYPE.SET: - # We read set columns as a bitmap telling us which options - # are enabled - bit_mask = self.packet.read_uint_by_size(column.size) - values[name] = set( - val for idx, val in enumerate(column.set_values) - if bit_mask & 2 ** idx - ) or None - - elif column.type == FIELD_TYPE.BIT: - values[name] = self.__read_bit(column) - elif column.type == FIELD_TYPE.GEOMETRY: - values[name] = self.packet.read_length_coded_pascal_string( - column.length_size) - elif column.type == FIELD_TYPE.JSON: - values[name] = self.packet.read_binary_json(column.length_size) - else: - raise NotImplementedError("Unknown MySQL column type: %d" % - (column.type)) + values[name] = self.__read_values_name(column, null_bitmap, null_bitmap_index, + cols_bitmap, unsigned, zerofill, + fixed_binary_length, i) - nullBitmapIndex += 1 + if BitGet(cols_bitmap, i) != 0: + null_bitmap_index += 1 return values + def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap, unsigned, zerofill, + fixed_binary_length, i): + + if BitGet(cols_bitmap, i) == 0: + return None + + if self._is_null(null_bitmap, null_bitmap_index): + return None + + if column.type == FIELD_TYPE.TINY: + if unsigned: + ret = struct.unpack(" 255 else self.__read_string(1, column) + + if fixed_binary_length and len(ret) < fixed_binary_length: + # Fixed-length binary fields are stored in the binlog + # without trailing zeros and must be padded with zeros up + # to the specified length at read time. + nr_pad = fixed_binary_length - len(ret) + ret += b'\x00' * nr_pad + return ret + elif column.type == FIELD_TYPE.NEWDECIMAL: + return self.__read_new_decimal(column) + elif column.type == FIELD_TYPE.BLOB: + return self.__read_string(column.length_size, column) + elif column.type == FIELD_TYPE.DATETIME: + return self.__read_datetime() + elif column.type == FIELD_TYPE.TIME: + return self.__read_time() + elif column.type == FIELD_TYPE.DATE: + return self.__read_date() + elif column.type == FIELD_TYPE.TIMESTAMP: + return datetime.datetime.fromtimestamp( + self.packet.read_uint32()) + + # For new date format: + elif column.type == FIELD_TYPE.DATETIME2: + return self.__read_datetime2(column) + elif column.type == FIELD_TYPE.TIME2: + return self.__read_time2(column) + elif column.type == FIELD_TYPE.TIMESTAMP2: + return self.__add_fsp_to_time( + datetime.datetime.fromtimestamp( + self.packet.read_int_be_by_size(4)), column) + elif column.type == FIELD_TYPE.LONGLONG: + if unsigned: + ret = self.packet.read_uint64() + if zerofill: + ret = format(ret, '020d') + return ret + else: + return self.packet.read_int64() + elif column.type == FIELD_TYPE.YEAR: + return self.packet.read_uint8() + 1900 + elif column.type == FIELD_TYPE.ENUM: + return column.enum_values[ + self.packet.read_uint_by_size(column.size)] + elif column.type == FIELD_TYPE.SET: + # We read set columns as a bitmap telling us which options + # are enabled + bit_mask = self.packet.read_uint_by_size(column.size) + return set( + val for idx, val in enumerate(column.set_values) + if bit_mask & 2 ** idx + ) or None + + elif column.type == FIELD_TYPE.BIT: + return self.__read_bit(column) + elif column.type == FIELD_TYPE.GEOMETRY: + return self.packet.read_length_coded_pascal_string( + column.length_size) + elif column.type == FIELD_TYPE.JSON: + return self.packet.read_binary_json(column.length_size) + else: + raise NotImplementedError("Unknown MySQL column type: %d" % + (column.type)) + def __add_fsp_to_time(self, time, column): """Read and add the fractional part of time For more details about new date format: @@ -386,8 +397,55 @@ def __read_datetime2(self, column): return None return self.__add_fsp_to_time(t, column) - def _read_new_decimal(self, precision, decimals): - return super()._read_new_decimal(precision, decimals) + def __read_new_decimal(self, column): + """Read MySQL's new decimal format introduced in MySQL 5""" + + # This project was a great source of inspiration for + # understanding this storage format. + # https://github.com/jeremycole/mysql_binlog + + digits_per_integer = 9 + compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] + integral = (column.precision - column.decimals) + uncomp_integral = int(integral / digits_per_integer) + uncomp_fractional = int(column.decimals / digits_per_integer) + comp_integral = integral - (uncomp_integral * digits_per_integer) + comp_fractional = column.decimals - (uncomp_fractional + * digits_per_integer) + + # Support negative + # The sign is encoded in the high bit of the the byte + # But this bit can also be used in the value + value = self.packet.read_uint8() + if value & 0x80 != 0: + res = "" + mask = 0 + else: + mask = -1 + res = "-" + self.packet.unread(struct.pack(' 0: + value = self.packet.read_int_be_by_size(size) ^ mask + res += str(value) + + for i in range(0, uncomp_integral): + value = struct.unpack('>i', self.packet.read(4))[0] ^ mask + res += '%09d' % value + + res += "." + + for i in range(0, uncomp_fractional): + value = struct.unpack('>i', self.packet.read(4))[0] ^ mask + res += '%09d' % value + + size = compressed_bytes[comp_fractional] + if size > 0: + value = self.packet.read_int_be_by_size(size) ^ mask + res += '%0*d' % (comp_fractional, value) + + return decimal.Decimal(res) def __read_binary_slice(self, binary, start, size, data_length): """ @@ -577,7 +635,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) ordinal_pos_loc = 0 - if len(self.column_schemas) != 0: + if self.column_count != 0: # Read columns meta data column_types = bytearray(self.packet.read(self.column_count)) self.packet.read_length_coded_binary() diff --git a/pymysqlreplication/tests/base.py b/pymysqlreplication/tests/base.py index 009c3e29..fd18cb3d 100644 --- a/pymysqlreplication/tests/base.py +++ b/pymysqlreplication/tests/base.py @@ -125,10 +125,10 @@ class PyMySQLReplicationMariaDbTestCase(PyMySQLReplicationTestCase): def setUp(self): # default self.database = { - "host": "localhost", + "host": os.environ.get("MARIADB_10_6") or "localhost", "user": "root", "passwd": "", - "port": 3308, + "port": int(os.environ.get("MARIADB_10_6_PORT") or 3308), "use_unicode": True, "charset": "utf8", "db": "pymysqlreplication_test" @@ -144,4 +144,9 @@ def setUp(self): self.connect_conn_control(db) self.stream = None self.resetBinLog() - \ No newline at end of file + + def bin_log_basename(self): + cursor = self.execute('SELECT @@log_bin_basename') + bin_log_basename = cursor.fetchone()[0] + bin_log_basename = bin_log_basename.split("/")[-1] + return bin_log_basename diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index a96976f5..92f20c97 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -27,9 +27,9 @@ def ignoredEvents(self): return [GtidEvent] def test_allowed_event_list(self): - self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 21) - self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 20) - self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 20) + self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 22) + self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 21) + self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 21) self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1) def test_read_query_event(self): @@ -771,9 +771,8 @@ def setUp(self): self.stream.close() ctl_db = copy.copy(self.database) ctl_db["db"] = None - ctl_db["port"] = 3307 - if os.environ.get("MYSQL_5_7_CTL") is not None: - ctl_db["host"] = os.environ.get("MYSQL_5_7_CTL") + ctl_db["port"] = int(os.environ.get("MYSQL_5_7_CTL_PORT") or 3307) + ctl_db["host"] = os.environ.get("MYSQL_5_7_CTL") or "localhost" self.ctl_conn_control = pymysql.connect(**ctl_db) self.ctl_conn_control.cursor().execute("DROP DATABASE IF EXISTS pymysqlreplication_test") self.ctl_conn_control.cursor().execute("CREATE DATABASE pymysqlreplication_test") @@ -1075,9 +1074,11 @@ def test_user_var_real_event(self): self.assertEqual(expected_user_var_event.charset, 33) def test_user_var_int_event(self): - self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data INT, PRIMARY KEY (id))") - self.execute("SET @test_user_var = 5") - self.execute("INSERT INTO test (data) VALUES(@test_user_var)") + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data1 INT, data2 INT, data3 INT, PRIMARY KEY (id))") + self.execute("SET @test_user_var1 = 5") + self.execute("SET @test_user_var2 = 0") + self.execute("SET @test_user_var3 = -5") + self.execute("INSERT INTO test (data1, data2, data3) VALUES(@test_user_var1, @test_user_var2, @test_user_var3)") self.execute("COMMIT") self.assertEqual(self.bin_log_format(), "STATEMENT") @@ -1087,16 +1088,113 @@ def test_user_var_int_event(self): expected_user_var_event = self.stream.fetchone() self.assertIsInstance(expected_user_var_event, UserVarEvent) self.assertIsInstance(expected_user_var_event.name_len, int) - self.assertEqual(expected_user_var_event.name, "test_user_var") + self.assertEqual(expected_user_var_event.name, "test_user_var1") self.assertEqual(expected_user_var_event.value, 5) self.assertEqual(expected_user_var_event.is_null, 0) self.assertEqual(expected_user_var_event.type, 2) self.assertEqual(expected_user_var_event.charset, 33) + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var2") + self.assertEqual(expected_user_var_event.value, 0) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var3") + self.assertEqual(expected_user_var_event.value, -5) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + def test_user_var_int24_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data1 MEDIUMINT, data2 MEDIUMINT, data3 MEDIUMINT UNSIGNED, PRIMARY KEY (id))") + self.execute("SET @test_user_var1 = 8388607") + self.execute("SET @test_user_var2 = -8388607") + self.execute("SET @test_user_var3 = 16777215") + self.execute("INSERT INTO test (data1, data2, data3) VALUES(@test_user_var1, @test_user_var2, @test_user_var3)") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var1") + self.assertEqual(expected_user_var_event.value, 8388607) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var2") + self.assertEqual(expected_user_var_event.value, -8388607) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var3") + self.assertEqual(expected_user_var_event.value, 16777215) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + def test_user_var_longlong_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data1 BIGINT, data2 BIGINT, data3 BIGINT UNSIGNED, PRIMARY KEY (id))") + self.execute("SET @test_user_var1 = 9223372036854775807") + self.execute("SET @test_user_var2 = -9223372036854775808") + self.execute("SET @test_user_var3 = 18446744073709551615") + self.execute("INSERT INTO test (data1, data2, data3) VALUES(@test_user_var1, @test_user_var2, @test_user_var3)") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var1") + self.assertEqual(expected_user_var_event.value, 9223372036854775807) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var2") + self.assertEqual(expected_user_var_event.value, -9223372036854775808) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var3") + self.assertEqual(expected_user_var_event.value, 18446744073709551615) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + def test_user_var_decimal_event(self): - self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data DECIMAL, PRIMARY KEY (id))") - self.execute("SET @test_user_var = 5.25") - self.execute("INSERT INTO test (data) VALUES(@test_user_var)") + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data1 DECIMAL, data2 DECIMAL, PRIMARY KEY (id))") + self.execute("SET @test_user_var1 = 5.25") + self.execute("SET @test_user_var2 = -5.25") + self.execute("INSERT INTO test (data1, data2) VALUES(@test_user_var1, @test_user_var2)") self.execute("COMMIT") self.assertEqual(self.bin_log_format(), "STATEMENT") @@ -1106,17 +1204,53 @@ def test_user_var_decimal_event(self): expected_user_var_event = self.stream.fetchone() self.assertIsInstance(expected_user_var_event, UserVarEvent) self.assertIsInstance(expected_user_var_event.name_len, int) - self.assertEqual(expected_user_var_event.name, "test_user_var") + self.assertEqual(expected_user_var_event.name, "test_user_var1") self.assertEqual(expected_user_var_event.value, 5.25) self.assertEqual(expected_user_var_event.is_null, 0) self.assertEqual(expected_user_var_event.type, 4) self.assertEqual(expected_user_var_event.charset, 33) + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var2") + self.assertEqual(expected_user_var_event.value, -5.25) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 4) + self.assertEqual(expected_user_var_event.charset, 33) + def tearDown(self): self.execute("SET @@binlog_format='ROW'") self.assertEqual(self.bin_log_format(), "ROW") super(TestStatementConnectionSetting, self).tearDown() +class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase): + def test_binlog_checkpoint_event(self): + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1023, + blocking=False, + is_mariadb=True + ) + + query = "DROP TABLE IF EXISTS test" + self.execute(query) + + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + self.stream.close() + + event = self.stream.fetchone() + self.assertIsInstance(event, RotateEvent) + + event = self.stream.fetchone() + self.assertIsInstance(event,FormatDescriptionEvent) + + event = self.stream.fetchone() + self.assertIsInstance(event, MariadbBinLogCheckPointEvent) + self.assertEqual(event.filename, self.bin_log_basename()+".000001") + class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase): def test_annotate_rows_event(self): @@ -1181,7 +1315,40 @@ def test_start_encryption_event(self): self.assertEqual(schema, 1) self.assertEqual(key_version, key_version_from_key_file) self.assertEqual(type(nonce), bytes) - self.assertEqual(len(nonce), 12) + self.assertEqual(len(nonce), 12) + + def test_gtid_list_event(self): + # set max_binlog_size to create new binlog file + query = 'SET GLOBAL max_binlog_size=4096' + self.execute(query) + # parse only Maradb GTID list event + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + blocking=False, + only_events=[MariadbGtidListEvent], + is_mariadb=True, + ) + + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + query = "INSERT INTO test (data) VALUES('Hello World')" + + for cnt in range(0,15): + self.execute(query) + self.execute("COMMIT") + + # 'mariadb gtid list event' of first binlog file + event = self.stream.fetchone() + self.assertEqual(event.event_type,163) + self.assertIsInstance(event,MariadbGtidListEvent) + + # 'mariadb gtid list event' of second binlog file + event = self.stream.fetchone() + self.assertEqual(event.event_type,163) + self.assertEqual(event.gtid_list[0].gtid, '0-1-15') + class TestRowsQueryLogEvents(base.PyMySQLReplicationTestCase): diff --git a/test.Dockerfile b/test.Dockerfile index 6aa2e34a..0272e00f 100644 --- a/test.Dockerfile +++ b/test.Dockerfile @@ -1,6 +1,7 @@ ARG BASE_IMAGE=${BASE_IMAGE:-python:3.11-alpine} FROM ${BASE_IMAGE} +COPY .mariadb .mariadb COPY pymysqlreplication pymysqlreplication COPY README.md README.md COPY setup.py setup.py @@ -13,4 +14,13 @@ ARG MYSQL_5_7 ENV MYSQL_5_7 ${MYSQL_5_7} ARG MYSQL_5_7_CTL -ENV MYSQL_5_7_CTL ${MYSQL_5_7_CTL} \ No newline at end of file +ENV MYSQL_5_7_CTL ${MYSQL_5_7_CTL} + +ARG MYSQL_5_7_CTL_PORT +ENV MYSQL_5_7_CTL_PORT ${MYSQL_5_7_CTL_PORT} + +ARG MARIADB_10_6 +ENV MARIADB_10_6 ${MARIADB_10_6} + +ARG MARIADB_10_6_PORT +ENV MARIADB_10_6_PORT ${MARIADB_10_6_PORT} \ No newline at end of file From 9493c21055b09f53febadfa59df54db790e3cf40 Mon Sep 17 00:00:00 2001 From: mjs Date: Fri, 1 Sep 2023 23:19:04 +0900 Subject: [PATCH 6/6] fix: modify for confilcts --- pymysqlreplication/tests/test_basic.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 3f04c818..411894cd 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -27,9 +27,9 @@ def ignoredEvents(self): return [GtidEvent] def test_allowed_event_list(self): - self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 22) - self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 21) - self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 21) + self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 23) + self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 22) + self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 22) self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1) def test_read_query_event(self):