From f4f4316af69690d669aa7150bb1f6072e1962173 Mon Sep 17 00:00:00 2001 From: starcat37 Date: Thu, 27 Jul 2023 21:14:13 +0900 Subject: [PATCH 01/42] Feat: implement rand_event --- .gitignore | 2 ++ examples/dump_events.py | 6 +++++- pymysqlreplication/binlogstream.py | 12 +++++++----- pymysqlreplication/event.py | 18 ++++++++++++++++++ pymysqlreplication/packet.py | 1 + 5 files changed, 33 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 9a27e404..27e7b3c9 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,5 @@ _build # Pyenv .python-version MANIFEST + +venv/ \ No newline at end of file diff --git a/examples/dump_events.py b/examples/dump_events.py index e0de09f1..3287a5a9 100644 --- a/examples/dump_events.py +++ b/examples/dump_events.py @@ -21,7 +21,11 @@ def main(): # the end of the stream stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS, server_id=3, - blocking=True) + blocking=True, + # log_file = "binlog.000022", + # log_pos = 1265, + resume_stream = True, # log-pos도 명시할 경우 + ) for binlogevent in stream: binlogevent.dump() diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index fa65aa22..e06760fe 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -14,6 +14,7 @@ QueryEvent, RotateEvent, FormatDescriptionEvent, XidEvent, GtidEvent, StopEvent, XAPrepareEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, + RandEvent, HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent) from .exceptions import BinLogNotEnabled from .row_event import ( @@ -346,9 +347,9 @@ def __connect_to_stream(self): + bytes(bytearray([COM_BINLOG_DUMP])) if self.__resume_stream: - prelude += struct.pack(' Date: Thu, 27 Jul 2023 21:30:18 +0900 Subject: [PATCH 02/42] fix: change number of testcases --- pymysqlreplication/tests/test_basic.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 0db8a264..396f8cbc 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -25,9 +25,9 @@ def ignoredEvents(self): return [GtidEvent] def test_allowed_event_list(self): - self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 16) - self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 15) - self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 15) + self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 17) + self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 16) + self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 16) self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1) def test_read_query_event(self): From 13efa87eba92b01f7a81c197d08e90a212dac96a Mon Sep 17 00:00:00 2001 From: mjs Date: Thu, 27 Jul 2023 21:36:02 +0900 Subject: [PATCH 03/42] Add property for rand event --- pymysqlreplication/event.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index bbe9791c..08a28c9b 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -438,6 +438,9 @@ def _dump(self): class RandEvent(BinLogEvent): """ + RandEvent is generated every time a statement uses the RAND() function. + Indicates the seed values to use for generating a random number with RAND() in the next statement. + Attributes: seed1 seed2 @@ -449,7 +452,17 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) # Payload self.seed1 = self.packet.read_uint64() self.seed2 = self.packet.read_uint64() - + + @property + def seed1(self): + """Get the first seed value""" + return self.seed1 + + @property + def seed2(self): + """Get the second seed value""" + return self.seed2 + def _dump(self): super(RandEvent, self)._dump() print("seed1: %d" % (self.seed1)) From c3be8c9be2b7e65ef237caa535a4db91bc5f3dd7 Mon Sep 17 00:00:00 2001 From: mjs Date: Thu, 27 Jul 2023 22:26:38 +0900 Subject: [PATCH 04/42] fix: rename seed to _seed --- pymysqlreplication/event.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 08a28c9b..248735e0 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -450,18 +450,18 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) super(RandEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) # Payload - self.seed1 = self.packet.read_uint64() - self.seed2 = self.packet.read_uint64() + self._seed1 = self.packet.read_uint64() + self._seed2 = self.packet.read_uint64() @property def seed1(self): """Get the first seed value""" - return self.seed1 + return self._seed1 @property def seed2(self): """Get the second seed value""" - return self.seed2 + return self._seed2 def _dump(self): super(RandEvent, self)._dump() From f8324cc14b801b8a2ad665c2c857d3295fb1787f Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 29 Jul 2023 14:47:48 +0900 Subject: [PATCH 05/42] test: add test_rand_event --- pymysqlreplication/tests/test_basic.py | 29 +++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 396f8cbc..2a6ec54a 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -17,7 +17,7 @@ from pymysqlreplication.constants.BINLOG import * from pymysqlreplication.row_event import * -__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader"] +__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestStatementConnectionSetting"] class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase): @@ -1002,6 +1002,33 @@ def test_parsing(self): gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1") gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1") +class TestStatementConnectionSetting(base.PyMySQLReplicationTestCase): + def setUp(self): + super(TestStatementConnectionSetting, self).setUp() + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + only_events=(RandEvent, QueryEvent), + fail_on_table_metadata_unavailable=True + ) + + def test_rand_event(self): + self.execute("SET @@binlog_format='STATEMENT'") + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data INT NOT NULL, PRIMARY KEY (id))") + self.execute("INSERT INTO test (data) VALUES(RAND())") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), RandEvent) + + def tearDown(self): + self.execute("SET @@binlog_format='ROW'") + self.assertEqual(self.bin_log_format(), "ROW") + super(TestStatementConnectionSetting, self).tearDown() + if __name__ == "__main__": import unittest From 6511f85ddaced2fe0e447e9607428c1c474c635f Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 29 Jul 2023 14:55:47 +0900 Subject: [PATCH 06/42] test: move statement setting to setUp --- pymysqlreplication/tests/test_basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 2a6ec54a..d9b1a3f1 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1012,9 +1012,9 @@ def setUp(self): only_events=(RandEvent, QueryEvent), fail_on_table_metadata_unavailable=True ) + self.execute("SET @@binlog_format='STATEMENT'") def test_rand_event(self): - self.execute("SET @@binlog_format='STATEMENT'") self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data INT NOT NULL, PRIMARY KEY (id))") self.execute("INSERT INTO test (data) VALUES(RAND())") self.execute("COMMIT") From f267674b43bf6521d304980f14b293a679884f60 Mon Sep 17 00:00:00 2001 From: starcat37 Date: Sat, 29 Jul 2023 14:59:53 +0900 Subject: [PATCH 07/42] Chore: Delete unnecessary comment --- .gitignore | 3 ++- examples/dump_events.py | 4 +--- pymysqlreplication/binlogstream.py | 8 ++++---- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 27e7b3c9..d22f7738 100644 --- a/.gitignore +++ b/.gitignore @@ -44,4 +44,5 @@ _build .python-version MANIFEST -venv/ \ No newline at end of file +# venv +venv/* \ No newline at end of file diff --git a/examples/dump_events.py b/examples/dump_events.py index 3287a5a9..1e99bcc0 100644 --- a/examples/dump_events.py +++ b/examples/dump_events.py @@ -22,9 +22,7 @@ def main(): stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS, server_id=3, blocking=True, - # log_file = "binlog.000022", - # log_pos = 1265, - resume_stream = True, # log-pos도 명시할 경우 + resume_stream = True, ) for binlogevent in stream: diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index e06760fe..485ec10b 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -347,9 +347,9 @@ def __connect_to_stream(self): + bytes(bytearray([COM_BINLOG_DUMP])) if self.__resume_stream: - prelude += struct.pack(' Date: Sat, 29 Jul 2023 15:03:22 +0900 Subject: [PATCH 08/42] Chore: delete unnecessary venv/* in .gitignore --- .gitignore | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index d22f7738..224181fe 100644 --- a/.gitignore +++ b/.gitignore @@ -42,7 +42,4 @@ _build # Pyenv .python-version -MANIFEST - -# venv -venv/* \ No newline at end of file +MANIFEST \ No newline at end of file From bf9ec0bb7b60e317f519ad576323ad647181ef04 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 29 Jul 2023 15:05:29 +0900 Subject: [PATCH 09/42] comment: add rand_event warning description --- pymysqlreplication/event.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 248735e0..ec72c6c9 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -440,6 +440,9 @@ class RandEvent(BinLogEvent): """ RandEvent is generated every time a statement uses the RAND() function. Indicates the seed values to use for generating a random number with RAND() in the next statement. + Warning + - RAND_EVENT only works in statement-based logging. (need to set binlog_format as 'STATEMENT') + - RAND_EVENT only works when the seed number is not specified. Attributes: seed1 From ccd5b867dc0ea6d7e4ebe979fdd326822f6e99d6 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 29 Jul 2023 15:07:17 +0900 Subject: [PATCH 10/42] refactor: change RandEvent import position --- .gitignore | 2 +- pymysqlreplication/binlogstream.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 224181fe..9a27e404 100644 --- a/.gitignore +++ b/.gitignore @@ -42,4 +42,4 @@ _build # Pyenv .python-version -MANIFEST \ No newline at end of file +MANIFEST diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 485ec10b..2c300f16 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -14,8 +14,8 @@ QueryEvent, RotateEvent, FormatDescriptionEvent, XidEvent, GtidEvent, StopEvent, XAPrepareEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, - RandEvent, - HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent) + HeartbeatLogEvent, NotImplementedEvent, + MariadbGtidEvent, RandEvent) from .exceptions import BinLogNotEnabled from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) From b12b7fe4d584aed06c7b201f23d77b475af04728 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 29 Jul 2023 15:12:29 +0900 Subject: [PATCH 11/42] revert: remove resume_stream=True in example --- examples/dump_events.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/dump_events.py b/examples/dump_events.py index 1e99bcc0..e0de09f1 100644 --- a/examples/dump_events.py +++ b/examples/dump_events.py @@ -21,9 +21,7 @@ def main(): # the end of the stream stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS, server_id=3, - blocking=True, - resume_stream = True, - ) + blocking=True) for binlogevent in stream: binlogevent.dump() From 9f13346232cfadb9c4f6228d0c0a07efd582f2aa Mon Sep 17 00:00:00 2001 From: sean Date: Wed, 2 Aug 2023 16:20:01 +0900 Subject: [PATCH 12/42] test: add rand event seed type --- pymysqlreplication/tests/test_basic.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index d9b1a3f1..9e16f5c3 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1022,7 +1022,11 @@ def test_rand_event(self): self.assertEqual(self.bin_log_format(), "STATEMENT") self.assertIsInstance(self.stream.fetchone(), QueryEvent) self.assertIsInstance(self.stream.fetchone(), QueryEvent) - self.assertIsInstance(self.stream.fetchone(), RandEvent) + + expect_rand_event = self.stream.fetchone() + self.assertIsInstance(expect_rand_event, RandEvent) + self.assertEqual(type(expect_rand_event.seed1), int) + self.assertEqual(type(expect_rand_event.seed2), int) def tearDown(self): self.execute("SET @@binlog_format='ROW'") From dd8079c44c34632143753dcae78eaf776a8e68e7 Mon Sep 17 00:00:00 2001 From: mikaniz Date: Sat, 29 Jul 2023 14:47:48 +0900 Subject: [PATCH 13/42] test: add test_rand_event --- pymysqlreplication/tests/test_basic.py | 29 +++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 396f8cbc..2a6ec54a 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -17,7 +17,7 @@ from pymysqlreplication.constants.BINLOG import * from pymysqlreplication.row_event import * -__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader"] +__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestStatementConnectionSetting"] class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase): @@ -1002,6 +1002,33 @@ def test_parsing(self): gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1") gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1") +class TestStatementConnectionSetting(base.PyMySQLReplicationTestCase): + def setUp(self): + super(TestStatementConnectionSetting, self).setUp() + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + only_events=(RandEvent, QueryEvent), + fail_on_table_metadata_unavailable=True + ) + + def test_rand_event(self): + self.execute("SET @@binlog_format='STATEMENT'") + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data INT NOT NULL, PRIMARY KEY (id))") + self.execute("INSERT INTO test (data) VALUES(RAND())") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), RandEvent) + + def tearDown(self): + self.execute("SET @@binlog_format='ROW'") + self.assertEqual(self.bin_log_format(), "ROW") + super(TestStatementConnectionSetting, self).tearDown() + if __name__ == "__main__": import unittest From ff4d18443f8937598e9e34840083a3ca1056288d Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 29 Jul 2023 14:55:47 +0900 Subject: [PATCH 14/42] test: move statement setting to setUp --- pymysqlreplication/tests/test_basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 2a6ec54a..d9b1a3f1 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1012,9 +1012,9 @@ def setUp(self): only_events=(RandEvent, QueryEvent), fail_on_table_metadata_unavailable=True ) + self.execute("SET @@binlog_format='STATEMENT'") def test_rand_event(self): - self.execute("SET @@binlog_format='STATEMENT'") self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data INT NOT NULL, PRIMARY KEY (id))") self.execute("INSERT INTO test (data) VALUES(RAND())") self.execute("COMMIT") From baad3a947b0e450c16c593519598050ab04c876f Mon Sep 17 00:00:00 2001 From: starcat37 Date: Sat, 29 Jul 2023 14:59:53 +0900 Subject: [PATCH 15/42] Chore: Delete unnecessary comment --- .gitignore | 3 ++- examples/dump_events.py | 4 +--- pymysqlreplication/binlogstream.py | 8 ++++---- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 27e7b3c9..d22f7738 100644 --- a/.gitignore +++ b/.gitignore @@ -44,4 +44,5 @@ _build .python-version MANIFEST -venv/ \ No newline at end of file +# venv +venv/* \ No newline at end of file diff --git a/examples/dump_events.py b/examples/dump_events.py index 3287a5a9..1e99bcc0 100644 --- a/examples/dump_events.py +++ b/examples/dump_events.py @@ -22,9 +22,7 @@ def main(): stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS, server_id=3, blocking=True, - # log_file = "binlog.000022", - # log_pos = 1265, - resume_stream = True, # log-pos도 명시할 경우 + resume_stream = True, ) for binlogevent in stream: diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index e06760fe..485ec10b 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -347,9 +347,9 @@ def __connect_to_stream(self): + bytes(bytearray([COM_BINLOG_DUMP])) if self.__resume_stream: - prelude += struct.pack(' Date: Sat, 29 Jul 2023 15:03:22 +0900 Subject: [PATCH 16/42] Chore: delete unnecessary venv/* in .gitignore --- .gitignore | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index d22f7738..224181fe 100644 --- a/.gitignore +++ b/.gitignore @@ -42,7 +42,4 @@ _build # Pyenv .python-version -MANIFEST - -# venv -venv/* \ No newline at end of file +MANIFEST \ No newline at end of file From af6425d79536360ca29345fc75bce3fc0b099e2f Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 29 Jul 2023 15:05:29 +0900 Subject: [PATCH 17/42] comment: add rand_event warning description --- pymysqlreplication/event.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 248735e0..ec72c6c9 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -440,6 +440,9 @@ class RandEvent(BinLogEvent): """ RandEvent is generated every time a statement uses the RAND() function. Indicates the seed values to use for generating a random number with RAND() in the next statement. + Warning + - RAND_EVENT only works in statement-based logging. (need to set binlog_format as 'STATEMENT') + - RAND_EVENT only works when the seed number is not specified. Attributes: seed1 From 243f099bcec610337c238298c780cb89c83baf32 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 29 Jul 2023 15:07:17 +0900 Subject: [PATCH 18/42] refactor: change RandEvent import position --- .gitignore | 2 +- pymysqlreplication/binlogstream.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 224181fe..9a27e404 100644 --- a/.gitignore +++ b/.gitignore @@ -42,4 +42,4 @@ _build # Pyenv .python-version -MANIFEST \ No newline at end of file +MANIFEST diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 485ec10b..2c300f16 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -14,8 +14,8 @@ QueryEvent, RotateEvent, FormatDescriptionEvent, XidEvent, GtidEvent, StopEvent, XAPrepareEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, - RandEvent, - HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent) + HeartbeatLogEvent, NotImplementedEvent, + MariadbGtidEvent, RandEvent) from .exceptions import BinLogNotEnabled from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) From 2edca88b5499bb1b9690ed48b2cb4fa42d4ec302 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 29 Jul 2023 15:12:29 +0900 Subject: [PATCH 19/42] revert: remove resume_stream=True in example --- examples/dump_events.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/dump_events.py b/examples/dump_events.py index 1e99bcc0..e0de09f1 100644 --- a/examples/dump_events.py +++ b/examples/dump_events.py @@ -21,9 +21,7 @@ def main(): # the end of the stream stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS, server_id=3, - blocking=True, - resume_stream = True, - ) + blocking=True) for binlogevent in stream: binlogevent.dump() From 6fda5457c760ec8d7d021dbe36fccd218a84191c Mon Sep 17 00:00:00 2001 From: sean Date: Wed, 2 Aug 2023 16:20:01 +0900 Subject: [PATCH 20/42] test: add rand event seed type --- pymysqlreplication/tests/test_basic.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index d9b1a3f1..9e16f5c3 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1022,7 +1022,11 @@ def test_rand_event(self): self.assertEqual(self.bin_log_format(), "STATEMENT") self.assertIsInstance(self.stream.fetchone(), QueryEvent) self.assertIsInstance(self.stream.fetchone(), QueryEvent) - self.assertIsInstance(self.stream.fetchone(), RandEvent) + + expect_rand_event = self.stream.fetchone() + self.assertIsInstance(expect_rand_event, RandEvent) + self.assertEqual(type(expect_rand_event.seed1), int) + self.assertEqual(type(expect_rand_event.seed2), int) def tearDown(self): self.execute("SET @@binlog_format='ROW'") From 66c606987b7c1f256dddbc24e5b15f20ecf8e8e8 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 5 Aug 2023 13:26:37 +0900 Subject: [PATCH 21/42] comment: add description for attributes --- pymysqlreplication/event.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index ec72c6c9..7150f203 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -445,8 +445,8 @@ class RandEvent(BinLogEvent): - RAND_EVENT only works when the seed number is not specified. Attributes: - seed1 - seed2 + seed1: value for the first seed + seed2: value for the second seed """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): From 7aa81b45b46e2ae42b3a7447691de058ab5913b1 Mon Sep 17 00:00:00 2001 From: mjs Date: Mon, 7 Aug 2023 22:14:21 +0900 Subject: [PATCH 22/42] Feat: implement UserVarEvent --- pymysqlreplication/binlogstream.py | 5 ++-- pymysqlreplication/event.py | 39 ++++++++++++++++++++++++++++++ pymysqlreplication/packet.py | 1 + 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 2c300f16..136a5c06 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -15,7 +15,7 @@ XidEvent, GtidEvent, StopEvent, XAPrepareEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, HeartbeatLogEvent, NotImplementedEvent, - MariadbGtidEvent, RandEvent) + MariadbGtidEvent, RandEvent, UserVarEvent) from .exceptions import BinLogNotEnabled from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) @@ -602,7 +602,8 @@ def _allowed_event_list(self, only_events, ignored_events, HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent, - RandEvent + RandEvent, + UserVarEvent )) if ignored_events is not None: for e in ignored_events: diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 7150f203..34d391c9 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -471,6 +471,45 @@ def _dump(self): print("seed1: %d" % (self.seed1)) print("seed2: %d" % (self.seed2)) +class UserVarEvent(BinLogEvent): + """ + UserVarEvent is generated every time a statement uses a user variable. + Indicates the value to use for the user variable in the next statement. + + :ivar name - User variable name. + :ivar value - Value of the user variable. + :ivar type - Type of the user variable. + :ivar charset - The number of the character set for the user variable. + :ivar is_null - Non-zero if the variable value is the SQL NULL value, 0 otherwise. + :ivar flags - Extra flags associated with the user variable. + """ + + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super(UserVarEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + + # Payload + self.name_len = self.packet.read_uint32() + self.name = self.packet.read(self.name_len).decode() + self.is_null = self.packet.read_uint8() + + if not self.is_null: + self.type = self.packet.read_uint8() + self.charset = self.packet.read_uint32() + self.value_len = self.packet.read_uint32() + self.value = self.packet.read(self.value_len).decode() + self.flags = self.packet.read_uint8() + + def _dump(self): + super(UserVarEvent, self)._dump() + print("User variable name: %s" % self.name) + print("Is NULL: %s" % ("Yes" if self.is_null else "No")) + if not self.is_null: + print("Type: %s" % self.type) + print("Charset: %s" % self.charset) + print("Value: %s" % self.value) + if self.flags is not None: + print("Flags: %s" % self.flags) + class NotImplementedEvent(BinLogEvent): def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(NotImplementedEvent, self).__init__( diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index 59b575ae..b11e3d8c 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -71,6 +71,7 @@ class BinLogPacketWrapper(object): constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent, constants.XA_PREPARE_EVENT: event.XAPrepareEvent, constants.RAND_EVENT: event.RandEvent, + constants.USER_VAR_EVENT: event.UserVarEvent, # row_event constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent, constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent, From 795f12a25ca5fe480bcc02371bfef19674661740 Mon Sep 17 00:00:00 2001 From: mjs Date: Mon, 7 Aug 2023 22:25:19 +0900 Subject: [PATCH 23/42] fix: change number of testcases --- pymysqlreplication/tests/test_basic.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 9e16f5c3..0cc04ac2 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -25,9 +25,9 @@ def ignoredEvents(self): return [GtidEvent] def test_allowed_event_list(self): - self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 17) - self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 16) - self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 16) + self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 18) + self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 17) + self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 17) self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1) def test_read_query_event(self): From 9a985d6648ee453f06719ed39de912ea2fe41d62 Mon Sep 17 00:00:00 2001 From: heehehe Date: Mon, 7 Aug 2023 22:57:44 +0900 Subject: [PATCH 24/42] docs: add name_len ivar comment --- pymysqlreplication/event.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 34d391c9..8ade0c53 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -440,15 +440,14 @@ class RandEvent(BinLogEvent): """ RandEvent is generated every time a statement uses the RAND() function. Indicates the seed values to use for generating a random number with RAND() in the next statement. - Warning - - RAND_EVENT only works in statement-based logging. (need to set binlog_format as 'STATEMENT') - - RAND_EVENT only works when the seed number is not specified. - Attributes: - seed1: value for the first seed - seed2: value for the second seed + RandEvent only works in statement-based logging (need to set binlog_format as 'STATEMENT') + and only works when the seed number is not specified. + + :ivar seed1: int - value for the first seed + :ivar seed2: int - value for the second seed """ - + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(RandEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) @@ -476,12 +475,13 @@ class UserVarEvent(BinLogEvent): UserVarEvent is generated every time a statement uses a user variable. Indicates the value to use for the user variable in the next statement. - :ivar name - User variable name. - :ivar value - Value of the user variable. - :ivar type - Type of the user variable. - :ivar charset - The number of the character set for the user variable. - :ivar is_null - Non-zero if the variable value is the SQL NULL value, 0 otherwise. - :ivar flags - Extra flags associated with the user variable. + :ivar name_len: int - Length of user variable + :ivar name: str - User variable name + :ivar value: str - Value of the user variable + :ivar type: int - Type of the user variable + :ivar charset: int - The number of the character set for the user variable + :ivar is_null: int - Non-zero if the variable value is the SQL NULL value, 0 otherwise + :ivar flags: int - Extra flags associated with the user variable """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): From f3b00126904b1d3f1ac648fd009486856f9d39fe Mon Sep 17 00:00:00 2001 From: heehehe Date: Mon, 7 Aug 2023 22:58:16 +0900 Subject: [PATCH 25/42] test: add user_var_event test --- pymysqlreplication/tests/test_basic.py | 29 +++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 0cc04ac2..2c97296d 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1009,7 +1009,7 @@ def setUp(self): self.stream = BinLogStreamReader( self.database, server_id=1024, - only_events=(RandEvent, QueryEvent), + only_events=(RandEvent, UserVarEvent, QueryEvent), fail_on_table_metadata_unavailable=True ) self.execute("SET @@binlog_format='STATEMENT'") @@ -1023,10 +1023,29 @@ def test_rand_event(self): self.assertIsInstance(self.stream.fetchone(), QueryEvent) self.assertIsInstance(self.stream.fetchone(), QueryEvent) - expect_rand_event = self.stream.fetchone() - self.assertIsInstance(expect_rand_event, RandEvent) - self.assertEqual(type(expect_rand_event.seed1), int) - self.assertEqual(type(expect_rand_event.seed2), int) + expected_rand_event = self.stream.fetchone() + self.assertIsInstance(expected_rand_event, RandEvent) + self.assertEqual(type(expected_rand_event.seed1), int) + self.assertEqual(type(expected_rand_event.seed2), int) + + def test_user_var_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(50), PRIMARY KEY (id))") + self.execute("SET @test_user_var = 'foo'") + self.execute("INSERT INTO test (data) VALUES(@test_user_var)") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertEqual(type(expected_user_var_event.name_len), int) + self.assertEqual(expected_user_var_event.name, "test_user_var") + self.assertEqual(expected_user_var_event.value, "foo") + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 0) + self.assertEqual(expected_user_var_event.charset, 33) def tearDown(self): self.execute("SET @@binlog_format='ROW'") From 832a8874d709d9211172e215dfe421b96b69be18 Mon Sep 17 00:00:00 2001 From: mjs Date: Sat, 12 Aug 2023 09:49:27 +0900 Subject: [PATCH 26/42] Add type_codes for UserVarEvent --- pymysqlreplication/event.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 8ade0c53..2795e924 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -484,6 +484,14 @@ class UserVarEvent(BinLogEvent): :ivar flags: int - Extra flags associated with the user variable """ + type_codes = { + 0x00: 'STRING_RESULT', + 0x01: 'REAL_RESULT', + 0x02: 'INT_RESULT', + 0x03: 'ROW_RESULT', + 0x04: 'DECIMAL_RESULT', + } + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(UserVarEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) @@ -496,15 +504,30 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.type = self.packet.read_uint8() self.charset = self.packet.read_uint32() self.value_len = self.packet.read_uint32() - self.value = self.packet.read(self.value_len).decode() + if self.type == 0x00: + self.value = self.packet.read(self.value_len).decode() + elif self.type == 0x01: + self.value = str(struct.unpack('d', self.packet.read(8))[0]) + elif self.type == 0x02: + self.value = str(self.packet.read_uint_by_size(self.value_len)) + elif self.type == 0x04: + raw_decimal = self.packet.read(self.value_len) + self.value = binascii.hexlify(raw_decimal).decode('ascii') + else: + self.value = self.packet.read(self.value_len) self.flags = self.packet.read_uint8() - + else: + self.type = None + self.charset = None + self.value = None + self.flags = None + def _dump(self): super(UserVarEvent, self)._dump() print("User variable name: %s" % self.name) print("Is NULL: %s" % ("Yes" if self.is_null else "No")) if not self.is_null: - print("Type: %s" % self.type) + print("Type: %s" % self.type_codes.get(self.type, 'UNKNOWN_TYPE')) print("Charset: %s" % self.charset) print("Value: %s" % self.value) if self.flags is not None: From e7c1b10e2022dbb0ac4929830ab103227e60ba9a Mon Sep 17 00:00:00 2001 From: mjs Date: Wed, 16 Aug 2023 20:18:40 +0900 Subject: [PATCH 27/42] Refactor UserVarEvent and add decimal parsing --- pymysqlreplication/event.py | 87 ++++++++++++++++++++++++++++++------- 1 file changed, 72 insertions(+), 15 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 2795e924..8caf1bf8 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -3,6 +3,7 @@ import binascii import struct import datetime +import decimal from pymysqlreplication.constants.STATUS_VAR_KEY import * from pymysqlreplication.exceptions import StatusVariableMismatch @@ -504,24 +505,80 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.type = self.packet.read_uint8() self.charset = self.packet.read_uint32() self.value_len = self.packet.read_uint32() - if self.type == 0x00: - self.value = self.packet.read(self.value_len).decode() - elif self.type == 0x01: - self.value = str(struct.unpack('d', self.packet.read(8))[0]) - elif self.type == 0x02: - self.value = str(self.packet.read_uint_by_size(self.value_len)) - elif self.type == 0x04: - raw_decimal = self.packet.read(self.value_len) - self.value = binascii.hexlify(raw_decimal).decode('ascii') - else: - self.value = self.packet.read(self.value_len) + + type_to_method = { + 0x00: self._read_string, + 0x01: self._read_real, + 0x02: self._read_int, + 0x04: self._read_decimal + } + + self.value = type_to_method.get(self.type, self._read_default)() self.flags = self.packet.read_uint8() else: - self.type = None - self.charset = None - self.value = None - self.flags = None + self.type, self.charset, self.value, self.flags = None, None, None, None + + def _read_string(self): + return self.packet.read(self.value_len).decode() + + def _read_real(self): + return str(struct.unpack('d', self.packet.read(8))[0]) + + def _read_int(self): + return str(self.packet.read_uint_by_size(self.value_len)) + + def _read_decimal(self): + self.precision = self.packet.read_uint8() + self.decimals = self.packet.read_uint8() + raw_decimal = self.packet.read(self.value_len) + return self._parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals) + + def _read_default(self): + return self.packet.read(self.value_len) + + @staticmethod + def _parse_decimal_from_bytes(raw_decimal, precision, decimals): + digits_per_integer = 9 + compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] + integral = precision - decimals + + uncomp_integral, comp_integral = divmod(integral, digits_per_integer) + uncomp_fractional, comp_fractional = divmod(decimals, digits_per_integer) + + res = "-" if not raw_decimal[0] & 0x80 else "" + mask = -1 if res == "-" else 0 + raw_decimal = bytearray([raw_decimal[0] ^ 0x80]) + raw_decimal[1:] + + def decode_decimal_decompress_value(comp_indx, data, mask): + size = compressed_bytes[comp_indx] + if size > 0: + databuff = bytearray(data[:size]) + for i in range(size): + databuff[i] ^= mask + return size, int.from_bytes(databuff, byteorder='big') + return 0, 0 + + pointer, value = decode_decimal_decompress_value(comp_integral, raw_decimal, mask) + res += str(value) + + for _ in range(uncomp_integral): + value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask + res += '%09d' % value + pointer += 4 + + res += "." + + for _ in range(uncomp_fractional): + value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask + res += '%09d' % value + pointer += 4 + + size, value = decode_decimal_decompress_value(comp_fractional, raw_decimal[pointer:], mask) + if size > 0: + res += '%0*d' % (comp_fractional, value) + return decimal.Decimal(res) + def _dump(self): super(UserVarEvent, self)._dump() print("User variable name: %s" % self.name) From e7cecc4a13809195c198788214fd9079ce1036c6 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 19 Aug 2023 10:13:29 +0900 Subject: [PATCH 28/42] test: add user_var_event tests by types --- pymysqlreplication/tests/test_basic.py | 61 +++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 2c97296d..cf9d3aa8 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1028,7 +1028,7 @@ def test_rand_event(self): self.assertEqual(type(expected_rand_event.seed1), int) self.assertEqual(type(expected_rand_event.seed2), int) - def test_user_var_event(self): + def test_user_var_string_event(self): self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(50), PRIMARY KEY (id))") self.execute("SET @test_user_var = 'foo'") self.execute("INSERT INTO test (data) VALUES(@test_user_var)") @@ -1040,13 +1040,70 @@ def test_user_var_event(self): expected_user_var_event = self.stream.fetchone() self.assertIsInstance(expected_user_var_event, UserVarEvent) - self.assertEqual(type(expected_user_var_event.name_len), int) + self.assertIsInstance(expected_user_var_event.name_len, int) self.assertEqual(expected_user_var_event.name, "test_user_var") self.assertEqual(expected_user_var_event.value, "foo") self.assertEqual(expected_user_var_event.is_null, 0) self.assertEqual(expected_user_var_event.type, 0) self.assertEqual(expected_user_var_event.charset, 33) + def test_user_var_real_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(50), PRIMARY KEY (id))") + self.execute("SET @test_user_var = @@timestamp") + self.execute("INSERT INTO test (data) VALUES(@test_user_var)") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var") + self.assertIsInstance(expected_user_var_event.value, str) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 1) + self.assertEqual(expected_user_var_event.charset, 33) + + def test_user_var_int_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(50), PRIMARY KEY (id))") + self.execute("SET @test_user_var = 5") + self.execute("INSERT INTO test (data) VALUES(@test_user_var)") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var") + self.assertEqual(expected_user_var_event.value, '5') # TODO: have to change int 5 + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + def test_user_var_int_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(50), PRIMARY KEY (id))") + self.execute("SET @test_user_var = 5.25") + self.execute("INSERT INTO test (data) VALUES(@test_user_var)") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var") + self.assertEqual(expected_user_var_event.value, 5.25) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 4) + self.assertEqual(expected_user_var_event.charset, 33) + def tearDown(self): self.execute("SET @@binlog_format='ROW'") self.assertEqual(self.bin_log_format(), "ROW") From 8c5d321ba36bc90c7d88da4fe64b028300f5acb5 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 19 Aug 2023 10:17:54 +0900 Subject: [PATCH 29/42] test: fix int / decimal table creation --- pymysqlreplication/tests/test_basic.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index cf9d3aa8..47081843 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1048,7 +1048,7 @@ def test_user_var_string_event(self): self.assertEqual(expected_user_var_event.charset, 33) def test_user_var_real_event(self): - self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(50), PRIMARY KEY (id))") + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data REAL, PRIMARY KEY (id))") self.execute("SET @test_user_var = @@timestamp") self.execute("INSERT INTO test (data) VALUES(@test_user_var)") self.execute("COMMIT") @@ -1067,7 +1067,7 @@ def test_user_var_real_event(self): self.assertEqual(expected_user_var_event.charset, 33) def test_user_var_int_event(self): - self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(50), PRIMARY KEY (id))") + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data INT, PRIMARY KEY (id))") self.execute("SET @test_user_var = 5") self.execute("INSERT INTO test (data) VALUES(@test_user_var)") self.execute("COMMIT") @@ -1080,13 +1080,13 @@ def test_user_var_int_event(self): self.assertIsInstance(expected_user_var_event, UserVarEvent) self.assertIsInstance(expected_user_var_event.name_len, int) self.assertEqual(expected_user_var_event.name, "test_user_var") - self.assertEqual(expected_user_var_event.value, '5') # TODO: have to change int 5 + self.assertEqual(expected_user_var_event.value, 5) self.assertEqual(expected_user_var_event.is_null, 0) self.assertEqual(expected_user_var_event.type, 2) self.assertEqual(expected_user_var_event.charset, 33) - def test_user_var_int_event(self): - self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(50), PRIMARY KEY (id))") + def test_user_var_decimal_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data DECIMAL, PRIMARY KEY (id))") self.execute("SET @test_user_var = 5.25") self.execute("INSERT INTO test (data) VALUES(@test_user_var)") self.execute("COMMIT") From 0c2a4bd054ebc04e0691f8e879e84210ee444ef6 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 19 Aug 2023 10:23:08 +0900 Subject: [PATCH 30/42] test: fix int_event error --- pymysqlreplication/tests/test_basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 47081843..9440beed 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1080,7 +1080,7 @@ def test_user_var_int_event(self): self.assertIsInstance(expected_user_var_event, UserVarEvent) self.assertIsInstance(expected_user_var_event.name_len, int) self.assertEqual(expected_user_var_event.name, "test_user_var") - self.assertEqual(expected_user_var_event.value, 5) + self.assertEqual(expected_user_var_event.value, '5') # TODO: have to fix to int 5 self.assertEqual(expected_user_var_event.is_null, 0) self.assertEqual(expected_user_var_event.type, 2) self.assertEqual(expected_user_var_event.charset, 33) From b13f1dd6e687c87d0c5854179ccfea6ac97daf62 Mon Sep 17 00:00:00 2001 From: mjs Date: Sun, 20 Aug 2023 14:33:57 +0900 Subject: [PATCH 31/42] fix unpacking of REAL_RESULT and adjust assertion --- pymysqlreplication/event.py | 2 +- pymysqlreplication/tests/test_basic.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 8caf1bf8..442d86c9 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -522,7 +522,7 @@ def _read_string(self): return self.packet.read(self.value_len).decode() def _read_real(self): - return str(struct.unpack('d', self.packet.read(8))[0]) + return struct.unpack(' Date: Mon, 21 Aug 2023 01:15:32 +0900 Subject: [PATCH 32/42] feat: modify read int by using packet.read --- pymysqlreplication/event.py | 2 +- pymysqlreplication/tests/test_basic.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 442d86c9..72d188c3 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -525,7 +525,7 @@ def _read_real(self): return struct.unpack(' Date: Sat, 26 Aug 2023 13:29:55 +0900 Subject: [PATCH 33/42] refactor : type code and type method --- pymysqlreplication/event.py | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 72d188c3..8afcd964 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -485,14 +485,6 @@ class UserVarEvent(BinLogEvent): :ivar flags: int - Extra flags associated with the user variable """ - type_codes = { - 0x00: 'STRING_RESULT', - 0x01: 'REAL_RESULT', - 0x02: 'INT_RESULT', - 0x03: 'ROW_RESULT', - 0x04: 'DECIMAL_RESULT', - } - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(UserVarEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) @@ -500,20 +492,20 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.name_len = self.packet.read_uint32() self.name = self.packet.read(self.name_len).decode() self.is_null = self.packet.read_uint8() + self.type_to_codes_and_method = { + 0x00: ['STRING_RESULT', self._read_string], + 0x01: ['REAL_RESULT', self._read_real], + 0x02: ['INT_RESULT', self._read_int], + 0x03: ['ROW_RESULT', self._read_default], + 0x04: ['DECIMAL_RESULT', self._read_decimal] + } if not self.is_null: self.type = self.packet.read_uint8() self.charset = self.packet.read_uint32() self.value_len = self.packet.read_uint32() - type_to_method = { - 0x00: self._read_string, - 0x01: self._read_real, - 0x02: self._read_int, - 0x04: self._read_decimal - } - - self.value = type_to_method.get(self.type, self._read_default)() + self.value = self.type_to_codes_and_method.get(self.type, ["UNKNOWN_RESULT", self._read_default])[1]() self.flags = self.packet.read_uint8() else: self.type, self.charset, self.value, self.flags = None, None, None, None @@ -562,14 +554,14 @@ def decode_decimal_decompress_value(comp_indx, data, mask): res += str(value) for _ in range(uncomp_integral): - value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask + value = struct.unpack('>i', raw_decimal[pointer:pointer + 4])[0] ^ mask res += '%09d' % value pointer += 4 res += "." for _ in range(uncomp_fractional): - value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask + value = struct.unpack('>i', raw_decimal[pointer:pointer + 4])[0] ^ mask res += '%09d' % value pointer += 4 @@ -578,17 +570,15 @@ def decode_decimal_decompress_value(comp_indx, data, mask): res += '%0*d' % (comp_fractional, value) return decimal.Decimal(res) - def _dump(self): super(UserVarEvent, self)._dump() print("User variable name: %s" % self.name) print("Is NULL: %s" % ("Yes" if self.is_null else "No")) if not self.is_null: - print("Type: %s" % self.type_codes.get(self.type, 'UNKNOWN_TYPE')) + print("Type: %s" % self.type_to_codes_and_method.get(self.type, ['UNKNOWN_TYPE'])[0]) print("Charset: %s" % self.charset) print("Value: %s" % self.value) - if self.flags is not None: - print("Flags: %s" % self.flags) + print("Flags: %s" % self.flags) class NotImplementedEvent(BinLogEvent): def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): From 4dabf1451fa76fd2fbf0ae0576d402708c71d767 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 26 Aug 2023 23:50:30 +0900 Subject: [PATCH 34/42] fix: modify negative decimal extract process --- pymysqlreplication/event.py | 57 +++++++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 3 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 8afcd964..6b113811 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -52,7 +52,6 @@ def _dump(self): """Core data dumped for the event""" pass - class GtidEvent(BinLogEvent): """GTID change in binlog event """ @@ -522,8 +521,9 @@ def _read_int(self): def _read_decimal(self): self.precision = self.packet.read_uint8() self.decimals = self.packet.read_uint8() - raw_decimal = self.packet.read(self.value_len) - return self._parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals) + # raw_decimal = self.packet.read(self.value_len) + # return self._parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals) + return self.__read_new_decimal(self.precision, self.decimals) def _read_default(self): return self.packet.read(self.value_len) @@ -570,6 +570,57 @@ def decode_decimal_decompress_value(comp_indx, data, mask): res += '%0*d' % (comp_fractional, value) return decimal.Decimal(res) + + def __read_new_decimal(self, precision, decimals): + """Read MySQL's new decimal format introduced in MySQL 5""" + + # This project was a great source of inspiration for + # understanding this storage format. + # https://github.com/jeremycole/mysql_binlog + + digits_per_integer = 9 + compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] + integral = (precision - decimals) + uncomp_integral = int(integral / digits_per_integer) + uncomp_fractional = int(decimals / digits_per_integer) + comp_integral = integral - (uncomp_integral * digits_per_integer) + comp_fractional = decimals - (uncomp_fractional * digits_per_integer) + + # Support negative + # The sign is encoded in the high bit of the byte + # But this bit can also be used in the value + + value = self.packet.read_uint8() + if value & 0x80 != 0: + res = "" + mask = 0 + else: + mask = -1 + res = "-" + self.packet.unread(struct.pack(' 0: + value = self.packet.read_int_be_by_size(size) ^ mask + res += str(value) + + for i in range(0, uncomp_integral): + value = struct.unpack('>i', self.packet.read(4))[0] ^ mask + res += '%09d' % value + + res += "." + + for i in range(0, uncomp_fractional): + value = struct.unpack('>i', self.packet.read(4))[0] ^ mask + res += '%09d' % value + + size = compressed_bytes[comp_fractional] + if size > 0: + value = self.packet.read_int_be_by_size(size) ^ mask + res += '%0*d' % (comp_fractional, value) + + return float(decimal.Decimal(res)) + def _dump(self): super(UserVarEvent, self)._dump() print("User variable name: %s" % self.name) From b890ec657faf6d18b9e730f5eb8664d06a835fc7 Mon Sep 17 00:00:00 2001 From: Heeseon Cheon Date: Sun, 27 Aug 2023 00:24:15 +0900 Subject: [PATCH 35/42] fix: modify test_allowed_event_list --- pymysqlreplication/tests/test_basic.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 5c09eea8..21bf0bef 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -27,9 +27,6 @@ def ignoredEvents(self): return [GtidEvent] def test_allowed_event_list(self): - self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 18) - self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 17) - self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 17) self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 21) self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 20) self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 20) From e48369e013bb7e1ed96d5a8022bb784a563352c6 Mon Sep 17 00:00:00 2001 From: mjs Date: Sun, 27 Aug 2023 22:58:26 +0900 Subject: [PATCH 36/42] remove: duplicated TestStatementConnectionSetting --- pymysqlreplication/tests/test_basic.py | 32 -------------------------- 1 file changed, 32 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 21bf0bef..15c7a942 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1185,38 +1185,6 @@ def test_start_encryption_event(self): self.assertEqual(len(nonce), 12) -class TestStatementConnectionSetting(base.PyMySQLReplicationTestCase): - def setUp(self): - super().setUp() - self.stream.close() - self.stream = BinLogStreamReader( - self.database, - server_id=1024, - only_events=(RandEvent, QueryEvent), - fail_on_table_metadata_unavailable=True - ) - self.execute("SET @@binlog_format='STATEMENT'") - - def test_rand_event(self): - self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data INT NOT NULL, PRIMARY KEY (id))") - self.execute("INSERT INTO test (data) VALUES(RAND())") - self.execute("COMMIT") - - self.assertEqual(self.bin_log_format(), "STATEMENT") - self.assertIsInstance(self.stream.fetchone(), QueryEvent) - self.assertIsInstance(self.stream.fetchone(), QueryEvent) - - expect_rand_event = self.stream.fetchone() - self.assertIsInstance(expect_rand_event, RandEvent) - self.assertEqual(type(expect_rand_event.seed1), int) - self.assertEqual(type(expect_rand_event.seed2), int) - - def tearDown(self): - self.execute("SET @@binlog_format='ROW'") - self.assertEqual(self.bin_log_format(), "ROW") - super().tearDown() - - class TestRowsQueryLogEvents(base.PyMySQLReplicationTestCase): def setUp(self): super(TestRowsQueryLogEvents, self).setUp() From 047f429a7d10ff311e7d0c67483283b1225f6ee5 Mon Sep 17 00:00:00 2001 From: heehehe Date: Thu, 31 Aug 2023 02:29:19 +0900 Subject: [PATCH 37/42] refactor: add _read_new_decimal to BinLogEvent method --- pymysqlreplication/event.py | 147 +++++++++++--------------------- pymysqlreplication/row_event.py | 53 +----------- 2 files changed, 55 insertions(+), 145 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index b05afb9b..e929396a 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -52,6 +52,55 @@ def _dump(self): """Core data dumped for the event""" pass + def _read_new_decimal(self, precision, decimals): + """ + Read MySQL's new decimal format introduced in MySQL 5. + This project was a great source of inspiration for understanding this storage format. + (https://github.com/jeremycole/mysql_binlog) + """ + digits_per_integer = 9 + compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] + integral = (precision - decimals) + uncomp_integral = int(integral / digits_per_integer) + uncomp_fractional = int(decimals / digits_per_integer) + comp_integral = integral - (uncomp_integral * digits_per_integer) + comp_fractional = decimals - (uncomp_fractional * digits_per_integer) + + # Support negative + # The sign is encoded in the high bit of the byte + # But this bit can also be used in the value + + value = self.packet.read_uint8() + if value & 0x80 != 0: + res = "" + mask = 0 + else: + mask = -1 + res = "-" + self.packet.unread(struct.pack(' 0: + value = self.packet.read_int_be_by_size(size) ^ mask + res += str(value) + + for i in range(0, uncomp_integral): + value = struct.unpack('>i', self.packet.read(4))[0] ^ mask + res += '%09d' % value + + res += "." + + for i in range(0, uncomp_fractional): + value = struct.unpack('>i', self.packet.read(4))[0] ^ mask + res += '%09d' % value + + size = compressed_bytes[comp_fractional] + if size > 0: + value = self.packet.read_int_be_by_size(size) ^ mask + res += '%0*d' % (comp_fractional, value) + + return float(decimal.Decimal(res)) + class GtidEvent(BinLogEvent): """GTID change in binlog event """ @@ -539,105 +588,13 @@ def _read_int(self): def _read_decimal(self): self.precision = self.packet.read_uint8() self.decimals = self.packet.read_uint8() - # raw_decimal = self.packet.read(self.value_len) - # return self._parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals) - return self.__read_new_decimal(self.precision, self.decimals) + return self._read_new_decimal(self.precision, self.decimals) def _read_default(self): return self.packet.read(self.value_len) - @staticmethod - def _parse_decimal_from_bytes(raw_decimal, precision, decimals): - digits_per_integer = 9 - compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] - integral = precision - decimals - - uncomp_integral, comp_integral = divmod(integral, digits_per_integer) - uncomp_fractional, comp_fractional = divmod(decimals, digits_per_integer) - - res = "-" if not raw_decimal[0] & 0x80 else "" - mask = -1 if res == "-" else 0 - raw_decimal = bytearray([raw_decimal[0] ^ 0x80]) + raw_decimal[1:] - - def decode_decimal_decompress_value(comp_indx, data, mask): - size = compressed_bytes[comp_indx] - if size > 0: - databuff = bytearray(data[:size]) - for i in range(size): - databuff[i] ^= mask - return size, int.from_bytes(databuff, byteorder='big') - return 0, 0 - - pointer, value = decode_decimal_decompress_value(comp_integral, raw_decimal, mask) - res += str(value) - - for _ in range(uncomp_integral): - value = struct.unpack('>i', raw_decimal[pointer:pointer + 4])[0] ^ mask - res += '%09d' % value - pointer += 4 - - res += "." - - for _ in range(uncomp_fractional): - value = struct.unpack('>i', raw_decimal[pointer:pointer + 4])[0] ^ mask - res += '%09d' % value - pointer += 4 - - size, value = decode_decimal_decompress_value(comp_fractional, raw_decimal[pointer:], mask) - if size > 0: - res += '%0*d' % (comp_fractional, value) - - return decimal.Decimal(res) - - def __read_new_decimal(self, precision, decimals): - """Read MySQL's new decimal format introduced in MySQL 5""" - - # This project was a great source of inspiration for - # understanding this storage format. - # https://github.com/jeremycole/mysql_binlog - - digits_per_integer = 9 - compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] - integral = (precision - decimals) - uncomp_integral = int(integral / digits_per_integer) - uncomp_fractional = int(decimals / digits_per_integer) - comp_integral = integral - (uncomp_integral * digits_per_integer) - comp_fractional = decimals - (uncomp_fractional * digits_per_integer) - - # Support negative - # The sign is encoded in the high bit of the byte - # But this bit can also be used in the value - - value = self.packet.read_uint8() - if value & 0x80 != 0: - res = "" - mask = 0 - else: - mask = -1 - res = "-" - self.packet.unread(struct.pack(' 0: - value = self.packet.read_int_be_by_size(size) ^ mask - res += str(value) - - for i in range(0, uncomp_integral): - value = struct.unpack('>i', self.packet.read(4))[0] ^ mask - res += '%09d' % value - - res += "." - - for i in range(0, uncomp_fractional): - value = struct.unpack('>i', self.packet.read(4))[0] ^ mask - res += '%09d' % value - - size = compressed_bytes[comp_fractional] - if size > 0: - value = self.packet.read_int_be_by_size(size) ^ mask - res += '%0*d' % (comp_fractional, value) - - return float(decimal.Decimal(res)) + def _read_new_decimal(self, precision, decimals): + return super()._read_new_decimal(precision, decimals) def _dump(self): super(UserVarEvent, self)._dump() diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 513ce2dc..7b1db1b0 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -164,7 +164,7 @@ def _read_column_data(self, cols_bitmap): values[name] += b'\x00' * nr_pad elif column.type == FIELD_TYPE.NEWDECIMAL: - values[name] = self.__read_new_decimal(column) + values[name] = self._read_new_decimal(column.precision, column.decimals) elif column.type == FIELD_TYPE.BLOB: values[name] = self.__read_string(column.length_size, column) elif column.type == FIELD_TYPE.DATETIME: @@ -386,55 +386,8 @@ def __read_datetime2(self, column): return None return self.__add_fsp_to_time(t, column) - def __read_new_decimal(self, column): - """Read MySQL's new decimal format introduced in MySQL 5""" - - # This project was a great source of inspiration for - # understanding this storage format. - # https://github.com/jeremycole/mysql_binlog - - digits_per_integer = 9 - compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] - integral = (column.precision - column.decimals) - uncomp_integral = int(integral / digits_per_integer) - uncomp_fractional = int(column.decimals / digits_per_integer) - comp_integral = integral - (uncomp_integral * digits_per_integer) - comp_fractional = column.decimals - (uncomp_fractional - * digits_per_integer) - - # Support negative - # The sign is encoded in the high bit of the the byte - # But this bit can also be used in the value - value = self.packet.read_uint8() - if value & 0x80 != 0: - res = "" - mask = 0 - else: - mask = -1 - res = "-" - self.packet.unread(struct.pack(' 0: - value = self.packet.read_int_be_by_size(size) ^ mask - res += str(value) - - for i in range(0, uncomp_integral): - value = struct.unpack('>i', self.packet.read(4))[0] ^ mask - res += '%09d' % value - - res += "." - - for i in range(0, uncomp_fractional): - value = struct.unpack('>i', self.packet.read(4))[0] ^ mask - res += '%09d' % value - - size = compressed_bytes[comp_fractional] - if size > 0: - value = self.packet.read_int_be_by_size(size) ^ mask - res += '%0*d' % (comp_fractional, value) - - return decimal.Decimal(res) + def _read_new_decimal(self, precision, decimals): + return super()._read_new_decimal(precision, decimals) def __read_binary_slice(self, binary, start, size, data_length): """ From 8c69c4730f7c519e643b6f3e8dbb55dc0d7d6106 Mon Sep 17 00:00:00 2001 From: heehehe Date: Thu, 31 Aug 2023 02:32:15 +0900 Subject: [PATCH 38/42] fix: modify _read_new_decimal return type --- pymysqlreplication/event.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index e929396a..1dfd2cbe 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -99,7 +99,7 @@ def _read_new_decimal(self, precision, decimals): value = self.packet.read_int_be_by_size(size) ^ mask res += '%0*d' % (comp_fractional, value) - return float(decimal.Decimal(res)) + return decimal.Decimal(res) class GtidEvent(BinLogEvent): """GTID change in binlog event @@ -594,7 +594,7 @@ def _read_default(self): return self.packet.read(self.value_len) def _read_new_decimal(self, precision, decimals): - return super()._read_new_decimal(precision, decimals) + return float(super()._read_new_decimal(precision, decimals)) def _dump(self): super(UserVarEvent, self)._dump() From 180c866c834aac21013594f30629e4a0b6354efa Mon Sep 17 00:00:00 2001 From: starcat37 Date: Thu, 31 Aug 2023 15:32:00 +0900 Subject: [PATCH 39/42] Feat: modify _read_int function --- pymysqlreplication/event.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index b05afb9b..3581be21 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -534,7 +534,13 @@ def _read_real(self): return struct.unpack(' Date: Thu, 31 Aug 2023 19:11:28 +0900 Subject: [PATCH 40/42] refactor: add type hints and optimize read methods --- pymysqlreplication/event.py | 131 ++++++++++++++++++++++++++++-------- 1 file changed, 102 insertions(+), 29 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 1c9314a7..a2c5ccb3 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -6,6 +6,7 @@ import decimal from pymysqlreplication.constants.STATUS_VAR_KEY import * from pymysqlreplication.exceptions import StatusVariableMismatch +from typing import Union, Optional class BinLogEvent(object): @@ -555,10 +556,10 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) super(UserVarEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) # Payload - self.name_len = self.packet.read_uint32() - self.name = self.packet.read(self.name_len).decode() - self.is_null = self.packet.read_uint8() - self.type_to_codes_and_method = { + self.name_len: int = self.packet.read_uint32() + self.name: str = self.packet.read(self.name_len).decode() + self.is_null: int = self.packet.read_uint8() + self.type_to_codes_and_method: dict = { 0x00: ['STRING_RESULT', self._read_string], 0x01: ['REAL_RESULT', self._read_real], 0x02: ['INT_RESULT', self._read_int], @@ -566,43 +567,115 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) 0x04: ['DECIMAL_RESULT', self._read_decimal] } - if not self.is_null: - self.type = self.packet.read_uint8() - self.charset = self.packet.read_uint32() - self.value_len = self.packet.read_uint32() + self.value: Optional[Union[str, float, int, decimal.Decimal]] = None + self.flags: Optional[int] = None + self.temp_value_buffer: Union[bytes, memoryview] = b'' - self.value = self.type_to_codes_and_method.get(self.type, ["UNKNOWN_RESULT", self._read_default])[1]() - self.flags = self.packet.read_uint8() + if not self.is_null: + self.type: int = self.packet.read_uint8() + self.charset: int = self.packet.read_uint32() + self.value_len: int = self.packet.read_uint32() + self.temp_value_buffer: Union[bytes, memoryview] = self.packet.read(self.value_len) + self.flags: int = self.packet.read_uint8() + self._set_value_from_temp_buffer() else: - self.type, self.charset, self.value, self.flags = None, None, None, None + self.type, self.charset, self.value_len, self.value, self.flags = None, None, None, None, None - def _read_string(self): - return self.packet.read(self.value_len).decode() + def _set_value_from_temp_buffer(self): + """ + Set the value from the temporary buffer based on the type code. + """ + if self.temp_value_buffer: + type_code, read_method = self.type_to_codes_and_method.get(self.type, ["UNKNOWN_RESULT", self._read_default]) + if type_code == 'INT_RESULT': + self.value = read_method(self.temp_value_buffer, self.flags) + else: + self.value = read_method(self.temp_value_buffer) + + def _read_string(self, buffer: bytes) -> str: + """ + Read string data. + """ + return buffer.decode() - def _read_real(self): - return struct.unpack(' float: + """ + Read real data. + """ + return struct.unpack(' int: + """ + Read integer data. + """ + fmt = ' decimal.Decimal: + """ + Read decimal data. + """ + self.precision = self.temp_value_buffer[0] + self.decimals = self.temp_value_buffer[1] + raw_decimal = self.temp_value_buffer[2:] + return self._parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals) - def _read_default(self): + def _read_default(self) -> bytes: + """ + Read default data. + Used when the type is None. + """ return self.packet.read(self.value_len) + @staticmethod + def _parse_decimal_from_bytes(raw_decimal: bytes, precision: int, decimals: int) -> decimal.Decimal: + """ + Parse decimal from bytes. + """ + digits_per_integer = 9 + compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] + integral = precision - decimals + + uncomp_integral, comp_integral = divmod(integral, digits_per_integer) + uncomp_fractional, comp_fractional = divmod(decimals, digits_per_integer) + + res = "-" if not raw_decimal[0] & 0x80 else "" + mask = -1 if res == "-" else 0 + raw_decimal = bytearray([raw_decimal[0] ^ 0x80]) + raw_decimal[1:] + + def decode_decimal_decompress_value(comp_indx, data, mask): + size = compressed_bytes[comp_indx] + if size > 0: + databuff = bytearray(data[:size]) + for i in range(size): + databuff[i] = (databuff[i] ^ mask) & 0xFF + return size, int.from_bytes(databuff, byteorder='big') + return 0, 0 + + pointer, value = decode_decimal_decompress_value(comp_integral, raw_decimal, mask) + res += str(value) + + for _ in range(uncomp_integral): + value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask + res += '%09d' % value + pointer += 4 + + res += "." + + for _ in range(uncomp_fractional): + value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask + res += '%09d' % value + pointer += 4 + + size, value = decode_decimal_decompress_value(comp_fractional, raw_decimal[pointer:], mask) + if size > 0: + res += '%0*d' % (comp_fractional, value) + return decimal.Decimal(res) + def _read_new_decimal(self, precision, decimals): return float(super()._read_new_decimal(precision, decimals)) - def _dump(self): + def _dump(self) -> None: super(UserVarEvent, self)._dump() print("User variable name: %s" % self.name) print("Is NULL: %s" % ("Yes" if self.is_null else "No")) From de6d8e7ed288549e1fa68e637d917a713fc8824e Mon Sep 17 00:00:00 2001 From: mikaniz Date: Thu, 31 Aug 2023 20:58:26 +0900 Subject: [PATCH 41/42] test: add user_var_event tests by types --- pymysqlreplication/tests/test_basic.py | 124 +++++++++++++++++++++++-- 1 file changed, 116 insertions(+), 8 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 15c7a942..3589692f 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1075,9 +1075,11 @@ def test_user_var_real_event(self): self.assertEqual(expected_user_var_event.charset, 33) def test_user_var_int_event(self): - self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data INT, PRIMARY KEY (id))") - self.execute("SET @test_user_var = 5") - self.execute("INSERT INTO test (data) VALUES(@test_user_var)") + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data1 INT, data2 INT, data3 INT, PRIMARY KEY (id))") + self.execute("SET @test_user_var1 = 5") + self.execute("SET @test_user_var2 = 0") + self.execute("SET @test_user_var3 = -5") + self.execute("INSERT INTO test (data1, data2, data3) VALUES(@test_user_var1, @test_user_var2, @test_user_var3)") self.execute("COMMIT") self.assertEqual(self.bin_log_format(), "STATEMENT") @@ -1087,16 +1089,113 @@ def test_user_var_int_event(self): expected_user_var_event = self.stream.fetchone() self.assertIsInstance(expected_user_var_event, UserVarEvent) self.assertIsInstance(expected_user_var_event.name_len, int) - self.assertEqual(expected_user_var_event.name, "test_user_var") + self.assertEqual(expected_user_var_event.name, "test_user_var1") self.assertEqual(expected_user_var_event.value, 5) self.assertEqual(expected_user_var_event.is_null, 0) self.assertEqual(expected_user_var_event.type, 2) self.assertEqual(expected_user_var_event.charset, 33) + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var2") + self.assertEqual(expected_user_var_event.value, 0) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var3") + self.assertEqual(expected_user_var_event.value, -5) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + def test_user_var_int24_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data1 MEDIUMINT, data2 MEDIUMINT, data3 MEDIUMINT UNSIGNED, PRIMARY KEY (id))") + self.execute("SET @test_user_var1 = 8388607") + self.execute("SET @test_user_var2 = -8388607") + self.execute("SET @test_user_var3 = 16777215") + self.execute("INSERT INTO test (data1, data2, data3) VALUES(@test_user_var1, @test_user_var2, @test_user_var3)") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var1") + self.assertEqual(expected_user_var_event.value, 8388607) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var2") + self.assertEqual(expected_user_var_event.value, -8388607) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var3") + self.assertEqual(expected_user_var_event.value, 16777215) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + def test_user_var_longlong_event(self): + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data1 BIGINT, data2 BIGINT, data3 BIGINT UNSIGNED, PRIMARY KEY (id))") + self.execute("SET @test_user_var1 = 9223372036854775807") + self.execute("SET @test_user_var2 = -9223372036854775808") + self.execute("SET @test_user_var3 = 18446744073709551615") + self.execute("INSERT INTO test (data1, data2, data3) VALUES(@test_user_var1, @test_user_var2, @test_user_var3)") + self.execute("COMMIT") + + self.assertEqual(self.bin_log_format(), "STATEMENT") + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var1") + self.assertEqual(expected_user_var_event.value, 9223372036854775807) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var2") + self.assertEqual(expected_user_var_event.value, -9223372036854775808) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var3") + self.assertEqual(expected_user_var_event.value, 18446744073709551615) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 2) + self.assertEqual(expected_user_var_event.charset, 33) + def test_user_var_decimal_event(self): - self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data DECIMAL, PRIMARY KEY (id))") - self.execute("SET @test_user_var = 5.25") - self.execute("INSERT INTO test (data) VALUES(@test_user_var)") + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data1 DECIMAL, data2 DECIMAL, PRIMARY KEY (id))") + self.execute("SET @test_user_var1 = 5.25") + self.execute("SET @test_user_var2 = -5.25") + self.execute("INSERT INTO test (data1, data2) VALUES(@test_user_var1, @test_user_var2)") self.execute("COMMIT") self.assertEqual(self.bin_log_format(), "STATEMENT") @@ -1106,12 +1205,21 @@ def test_user_var_decimal_event(self): expected_user_var_event = self.stream.fetchone() self.assertIsInstance(expected_user_var_event, UserVarEvent) self.assertIsInstance(expected_user_var_event.name_len, int) - self.assertEqual(expected_user_var_event.name, "test_user_var") + self.assertEqual(expected_user_var_event.name, "test_user_var1") self.assertEqual(expected_user_var_event.value, 5.25) self.assertEqual(expected_user_var_event.is_null, 0) self.assertEqual(expected_user_var_event.type, 4) self.assertEqual(expected_user_var_event.charset, 33) + expected_user_var_event = self.stream.fetchone() + self.assertIsInstance(expected_user_var_event, UserVarEvent) + self.assertIsInstance(expected_user_var_event.name_len, int) + self.assertEqual(expected_user_var_event.name, "test_user_var2") + self.assertEqual(expected_user_var_event.value, -5.25) + self.assertEqual(expected_user_var_event.is_null, 0) + self.assertEqual(expected_user_var_event.type, 4) + self.assertEqual(expected_user_var_event.charset, 33) + def tearDown(self): self.execute("SET @@binlog_format='ROW'") self.assertEqual(self.bin_log_format(), "ROW") From 9d53176101d54fac51502955457b369e62ff9d4a Mon Sep 17 00:00:00 2001 From: heehehe Date: Thu, 31 Aug 2023 21:32:45 +0900 Subject: [PATCH 42/42] revert: move __read_new_decimal back to row_event --- pymysqlreplication/event.py | 52 -------------------------------- pymysqlreplication/row_event.py | 53 +++++++++++++++++++++++++++++++-- 2 files changed, 50 insertions(+), 55 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index a2c5ccb3..f998e176 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -53,55 +53,6 @@ def _dump(self): """Core data dumped for the event""" pass - def _read_new_decimal(self, precision, decimals): - """ - Read MySQL's new decimal format introduced in MySQL 5. - This project was a great source of inspiration for understanding this storage format. - (https://github.com/jeremycole/mysql_binlog) - """ - digits_per_integer = 9 - compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] - integral = (precision - decimals) - uncomp_integral = int(integral / digits_per_integer) - uncomp_fractional = int(decimals / digits_per_integer) - comp_integral = integral - (uncomp_integral * digits_per_integer) - comp_fractional = decimals - (uncomp_fractional * digits_per_integer) - - # Support negative - # The sign is encoded in the high bit of the byte - # But this bit can also be used in the value - - value = self.packet.read_uint8() - if value & 0x80 != 0: - res = "" - mask = 0 - else: - mask = -1 - res = "-" - self.packet.unread(struct.pack(' 0: - value = self.packet.read_int_be_by_size(size) ^ mask - res += str(value) - - for i in range(0, uncomp_integral): - value = struct.unpack('>i', self.packet.read(4))[0] ^ mask - res += '%09d' % value - - res += "." - - for i in range(0, uncomp_fractional): - value = struct.unpack('>i', self.packet.read(4))[0] ^ mask - res += '%09d' % value - - size = compressed_bytes[comp_fractional] - if size > 0: - value = self.packet.read_int_be_by_size(size) ^ mask - res += '%0*d' % (comp_fractional, value) - - return decimal.Decimal(res) - class GtidEvent(BinLogEvent): """GTID change in binlog event """ @@ -672,9 +623,6 @@ def decode_decimal_decompress_value(comp_indx, data, mask): res += '%0*d' % (comp_fractional, value) return decimal.Decimal(res) - def _read_new_decimal(self, precision, decimals): - return float(super()._read_new_decimal(precision, decimals)) - def _dump(self) -> None: super(UserVarEvent, self)._dump() print("User variable name: %s" % self.name) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 7b1db1b0..513ce2dc 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -164,7 +164,7 @@ def _read_column_data(self, cols_bitmap): values[name] += b'\x00' * nr_pad elif column.type == FIELD_TYPE.NEWDECIMAL: - values[name] = self._read_new_decimal(column.precision, column.decimals) + values[name] = self.__read_new_decimal(column) elif column.type == FIELD_TYPE.BLOB: values[name] = self.__read_string(column.length_size, column) elif column.type == FIELD_TYPE.DATETIME: @@ -386,8 +386,55 @@ def __read_datetime2(self, column): return None return self.__add_fsp_to_time(t, column) - def _read_new_decimal(self, precision, decimals): - return super()._read_new_decimal(precision, decimals) + def __read_new_decimal(self, column): + """Read MySQL's new decimal format introduced in MySQL 5""" + + # This project was a great source of inspiration for + # understanding this storage format. + # https://github.com/jeremycole/mysql_binlog + + digits_per_integer = 9 + compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] + integral = (column.precision - column.decimals) + uncomp_integral = int(integral / digits_per_integer) + uncomp_fractional = int(column.decimals / digits_per_integer) + comp_integral = integral - (uncomp_integral * digits_per_integer) + comp_fractional = column.decimals - (uncomp_fractional + * digits_per_integer) + + # Support negative + # The sign is encoded in the high bit of the the byte + # But this bit can also be used in the value + value = self.packet.read_uint8() + if value & 0x80 != 0: + res = "" + mask = 0 + else: + mask = -1 + res = "-" + self.packet.unread(struct.pack(' 0: + value = self.packet.read_int_be_by_size(size) ^ mask + res += str(value) + + for i in range(0, uncomp_integral): + value = struct.unpack('>i', self.packet.read(4))[0] ^ mask + res += '%09d' % value + + res += "." + + for i in range(0, uncomp_fractional): + value = struct.unpack('>i', self.packet.read(4))[0] ^ mask + res += '%09d' % value + + size = compressed_bytes[comp_fractional] + if size > 0: + value = self.packet.read_int_be_by_size(size) ^ mask + res += '%0*d' % (comp_fractional, value) + + return decimal.Decimal(res) def __read_binary_slice(self, binary, start, size, data_length): """