diff --git a/docker-compose.yml b/docker-compose.yml index d6449d2c..ffebb04b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,3 +15,16 @@ services: ports: - 3307:3307 command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307 + + mariadb-10.6: + image: mariadb:10.6 + environment: + MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1 + ports: + - "3308:3306" + command: | + --server-id=1 + --default-authentication-plugin=mysql_native_password + --log-bin=master-bin + --binlog-format=row + --log-slave-updates=on \ No newline at end of file diff --git a/examples/mariadb_gtid/read_event.py b/examples/mariadb_gtid/read_event.py index cc88a97f..b0fff943 100644 --- a/examples/mariadb_gtid/read_event.py +++ b/examples/mariadb_gtid/read_event.py @@ -1,7 +1,7 @@ import pymysql from pymysqlreplication import BinLogStreamReader, gtid -from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent +from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent, MariadbBinLogCheckPointEvent from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent MARIADB_SETTINGS = { @@ -62,6 +62,7 @@ def query_server_id(self): blocking=False, only_events=[ MariadbGtidEvent, + MariadbBinLogCheckPointEvent, RotateEvent, WriteRowsEvent, UpdateRowsEvent, diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index fa65aa22..61d8f53a 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -14,7 +14,7 @@ QueryEvent, RotateEvent, FormatDescriptionEvent, XidEvent, GtidEvent, StopEvent, XAPrepareEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, - HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent) + HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent, MariadbBinLogCheckPointEvent) from .exceptions import BinLogNotEnabled from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) @@ -600,7 +600,8 @@ def _allowed_event_list(self, only_events, ignored_events, TableMapEvent, HeartbeatLogEvent, NotImplementedEvent, - MariadbGtidEvent + MariadbGtidEvent, + MariadbBinLogCheckPointEvent )) if ignored_events is not None: for e in ignored_events: diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index d75c9db8..2864a02b 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -53,7 +53,19 @@ def _dump(self): class GtidEvent(BinLogEvent): - """GTID change in binlog event + """ + GTID change in binlog event + + For more inforamtion : `[GTID] `_ `[see also] `_ + + :ivar commit_flag: 1byte - 00000001 = Transaction may have changes logged with SBR. + In 5.6, 5.7.0-5.7.18, and 8.0.0-8.0.1, this flag is always set. Starting in 5.7.19 and 8.0.2, this flag is cleared if the transaction only contains row events. It is set if any part of the transaction is written in statement format. + :ivar sid: 16 byte sequence - UUID representing the SID + :ivar gno: int - Group number, second component of GTID. + :ivar lt_type: int(1 byte) - The type of logical timestamp used in the logical clock fields. + :ivar last_committed: Store the transaction's commit parent sequence_number + :ivar sequence_number: The transaction's logical timestamp assigned at prepare phase + """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(GtidEvent, self).__init__(from_packet, event_size, table_map, @@ -92,8 +104,16 @@ def __repr__(self): class MariadbGtidEvent(BinLogEvent): """ - GTID change in binlog event in MariaDB - https://mariadb.com/kb/en/gtid_event/ + Represents GTID(Global Transaction Identifier) change in binlog event in MariaDB + + For more information: `[see details] `_. + + :ivar server_id: int - The ID of the server where the GTID event occurred. + :ivar gtid_seq_no: int - The sequence number of the GTID event. + :ivar domain_id: int - The domain ID associated with the GTID event. + :ivar flags: int - Flags related to the GTID event. + :ivar gtid: str - The Global Transaction Identifier in the format ‘domain_id-server_id-gtid_seq_no’. + """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): @@ -110,13 +130,33 @@ def _dump(self): print("Flags:", self.flags) print('GTID:', self.gtid) +class MariadbBinLogCheckPointEvent(BinLogEvent): + """ + Represents a checkpoint in a binlog event in MariaDB. + + For more information: `[see details] `_. + + :ivar filename_length: int - The length of the filename. + :ivar filename: str - The name of the file saved at the checkpoint. + """ + + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super(MariadbBinLogCheckPointEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, + **kwargs) + filename_length = self.packet.read_uint32() + self.filename = self.packet.read(filename_length).decode() + + def _dump(self): + print('Filename:', self.filename) class RotateEvent(BinLogEvent): - """Change MySQL bin log file + """ + Represents information for the slave to know the name of the binary log it is going to receive. - Attributes: - position: Position inside next binlog - next_binlog: Name of next binlog file + For more information: `[see details] `_. + + :ivar position: int - Position inside next binlog + :ivar next_binlog: str - Name of next binlog file """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(RotateEvent, self).__init__(from_packet, event_size, table_map, @@ -130,14 +170,16 @@ def dump(self): print("Next binlog file: %s" % self.next_binlog) print() - class XAPrepareEvent(BinLogEvent): - """An XA prepare event is generated for a XA prepared transaction. - Like Xid_event it contans XID of the *prepared* transaction - - Attributes: - one_phase: current XA transaction commit method - xid: serialized XID representation of XA transaction + """ + Generated for a XA prepared transaction. + Like Xid_event, it contains XID of the **prepared** transaction. + + For more information: `[see details] `_. + + :ivar one_phase: current XA transaction commit method + :ivar xid_format_id: a number that identifies the format used by the gtrid and bqual values + :ivar xid: serialized XID representation of XA transaction (xid_gtrid + xid_bqual) """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(XAPrepareEvent, self).__init__(from_packet, event_size, table_map, @@ -181,10 +223,12 @@ class StopEvent(BinLogEvent): class XidEvent(BinLogEvent): - """A COMMIT event + """ + Generated when COMMIT of a transaction that modifies one or more tables of an XA-capable storage engine occurs. - Attributes: - xid: Transaction ID for 2PC + For more information : `[see details] `_. + + :ivar xid: uint - Transaction ID for 2 Phrase Commit. """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): @@ -198,23 +242,28 @@ def _dump(self): class HeartbeatLogEvent(BinLogEvent): - """A Heartbeat event - Heartbeats are sent by the master only if there are no unsent events in the - binary log file for a period longer than the interval defined by - MASTER_HEARTBEAT_PERIOD connection setting. - - A mysql server will also play those to the slave for each skipped - events in the log. I (baloo) believe the intention is to make the slave - bump its position so that if a disconnection occurs, the slave only - reconnects from the last skipped position (see Binlog_sender::send_events - in sql/rpl_binlog_sender.cc). That makes 106 bytes of data for skipped - event in the binlog. *this is also the case with GTID replication*. To - mitigate such behavior, you are expected to keep the binlog small (see - max_binlog_size, defaults to 1G). - In any case, the timestamp is 0 (as in 1970-01-01T00:00:00). + """ + Heartbeats are sent by the master. + Master sends heartbeats when there are no unsent events in the binary log file after certain period of time. + The interval is defined by MASTER_HEARTBEAT_PERIOD connection setting. - Attributes: - ident: Name of the current binlog + `[see MASTER_HEARTBEAT_PERIOD] `_. + + A Mysql server also does it for each skipped events in the log. + This is because to make the slave bump its position so that + if a disconnection occurs, the slave will only reconnects from the lasted skipped position. (Baloo's idea) + + + (see Binlog_sender::send_events in sql/rpl_binlog_sender.cc). + + Warning: + That makes 106 bytes of data for skipped event in the binlog. + *this is also the case with GTID replication*. + To mitigate such behavior, you are expected to keep the binlog small + (see max_binlog_size, defaults to 1G). + In any case, the timestamp is 0 (as in 1970-01-01T00:00:00). + + :ivar ident: Name of the current binlog """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): @@ -418,10 +467,12 @@ def _dump(self): class IntvarEvent(BinLogEvent): """ - - Attributes: - type - value + Stores the value of auto-increment variables. + This event will be created just before a QueryEvent. + + :ivar type: int - 1 byte identifying the type of variable stored. + Can be either LAST_INSERT_ID_EVENT (1) or INSERT_ID_EVENT (2). + :ivar value: int - The value of the variable """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(IntvarEvent, self).__init__(from_packet, event_size, table_map, @@ -438,6 +489,11 @@ def _dump(self): class NotImplementedEvent(BinLogEvent): + """ + Used as a temporary class for events that have not yet been implemented. + + The event referencing this class skips parsing. + """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(NotImplementedEvent, self).__init__( from_packet, event_size, table_map, ctl_connection, **kwargs) diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index 94baefdf..2fc821e8 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -84,7 +84,7 @@ class BinLogPacketWrapper(object): constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent, # MariaDB GTID constants.MARIADB_ANNOTATE_ROWS_EVENT: event.NotImplementedEvent, - constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent, + constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.MariadbBinLogCheckPointEvent, constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent, constants.MARIADB_GTID_GTID_LIST_EVENT: event.NotImplementedEvent, constants.MARIADB_START_ENCRYPTION_EVENT: event.NotImplementedEvent diff --git a/pymysqlreplication/tests/base.py b/pymysqlreplication/tests/base.py index a7975714..80682005 100644 --- a/pymysqlreplication/tests/base.py +++ b/pymysqlreplication/tests/base.py @@ -121,3 +121,34 @@ def bin_log_basename(self): bin_log_basename = cursor.fetchone()[0] bin_log_basename = bin_log_basename.split("/")[-1] return bin_log_basename + + +class PyMySQLReplicationMariaDbTestCase(PyMySQLReplicationTestCase): + def setUp(self): + # default + self.database = { + "host": "localhost", + "user": "root", + "passwd": "", + "port": 3308, + "use_unicode": True, + "charset": "utf8", + "db": "pymysqlreplication_test" + } + + self.conn_control = None + db = copy.copy(self.database) + db["db"] = None + self.connect_conn_control(db) + self.execute("DROP DATABASE IF EXISTS pymysqlreplication_test") + self.execute("CREATE DATABASE pymysqlreplication_test") + db = copy.copy(self.database) + self.connect_conn_control(db) + self.stream = None + self.resetBinLog() + + def bin_log_basename(self): + cursor = self.execute('select @@log_bin_basename') + bin_log_basename = cursor.fetchone()[0] + bin_log_basename = bin_log_basename.split("/")[-1] + return bin_log_basename \ No newline at end of file diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 0db8a264..6d0d90ed 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -17,7 +17,7 @@ from pymysqlreplication.constants.BINLOG import * from pymysqlreplication.row_event import * -__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader"] +__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader"] class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase): @@ -25,9 +25,9 @@ def ignoredEvents(self): return [GtidEvent] def test_allowed_event_list(self): - self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 16) - self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 15) - self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 15) + self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 17) + self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 16) + self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 16) self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1) def test_read_query_event(self): @@ -1002,6 +1002,32 @@ def test_parsing(self): gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1") gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1") +class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase): + def test_binlog_checkpoint_event(self): + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1023, + blocking=False, + is_mariadb=True + ) + + query = "DROP TABLE IF EXISTS test" + self.execute(query) + + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + self.stream.close() + + event = self.stream.fetchone() + self.assertIsInstance(event, RotateEvent) + + event = self.stream.fetchone() + self.assertIsInstance(event,FormatDescriptionEvent) + + event = self.stream.fetchone() + self.assertIsInstance(event, MariadbBinLogCheckPointEvent) + self.assertEqual(event.filename, self.bin_log_basename()+".000001") if __name__ == "__main__": import unittest