@@ -290,7 +290,6 @@ def close(self):
290
290
if self .__connected_ctl :
291
291
# break reference cycle between stream reader and underlying
292
292
# mysql connection object
293
- self ._ctl_connection ._get_table_information = None
294
293
self ._ctl_connection .close ()
295
294
self .__connected_ctl = False
296
295
@@ -301,9 +300,9 @@ def __connect_to_ctl(self):
301
300
self ._ctl_connection_settings ["cursorclass" ] = DictCursor
302
301
self ._ctl_connection_settings ["autocommit" ] = True
303
302
self ._ctl_connection = self .pymysql_wrapper (** self ._ctl_connection_settings )
304
- self ._ctl_connection ._get_table_information = self .__get_table_information
305
303
self ._ctl_connection ._get_dbms = self .__get_dbms
306
304
self .__connected_ctl = True
305
+ self .__check_optional_meta_data ()
307
306
308
307
def __checksum_enabled (self ):
309
308
"""Return True if binlog-checksum = CRC32. Only for MySQL > 5.6"""
@@ -548,6 +547,21 @@ def __set_mariadb_settings(self):
548
547
549
548
return prelude
550
549
550
+ def __check_optional_meta_data (self ):
551
+ cur = self ._ctl_connection .cursor ()
552
+ cur .execute ("SHOW VARIABLES LIKE 'BINLOG_ROW_METADATA';" )
553
+ value = cur .fetchone ()
554
+ if value is None :
555
+ logging .log (logging .WARN , """
556
+ Before using MARIADB 10.5.0 and MYSQL 8.0.14 versions,
557
+ use python-mysql-replication version Before 1.0 version """ )
558
+ value = value .get ("Value" , "" )
559
+ if value .upper () != "FULL" :
560
+ logging .log (logging .WARN , """
561
+ Setting The Variable Value BINLOG_ROW_METADATA = FULL
562
+ By Applying this, provide properly mapped column information on UPDATE,DELETE,INSERT.
563
+ """ )
564
+
551
565
def fetchone (self ):
552
566
while True :
553
567
if self .end_log_pos and self .is_past_end_log_pos :
@@ -709,37 +723,37 @@ def _allowed_event_list(
709
723
pass
710
724
return frozenset (events )
711
725
712
- def __get_table_information (self , schema , table ):
713
- for i in range (1 , 3 ):
714
- try :
715
- if not self .__connected_ctl :
716
- self .__connect_to_ctl ()
717
-
718
- cur = self ._ctl_connection .cursor ()
719
- cur .execute (
720
- """
721
- SELECT
722
- COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
723
- COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY, ORDINAL_POSITION,
724
- DATA_TYPE, CHARACTER_OCTET_LENGTH
725
- FROM
726
- information_schema.columns
727
- WHERE
728
- table_schema = %s AND table_name = %s
729
- """ ,
730
- (schema , table ),
731
- )
732
- result = sorted (cur .fetchall (), key = lambda x : x ["ORDINAL_POSITION" ])
733
- cur .close ()
734
-
735
- return result
736
- except pymysql .OperationalError as error :
737
- code , message = error .args
738
- if code in MYSQL_EXPECTED_ERROR_CODES :
739
- self .__connected_ctl = False
740
- continue
741
- else :
742
- raise error
726
+ # def __get_table_information(self, schema, table):
727
+ # for i in range(1, 3):
728
+ # try:
729
+ # if not self.__connected_ctl:
730
+ # self.__connect_to_ctl()
731
+ #
732
+ # cur = self._ctl_connection.cursor()
733
+ # cur.execute(
734
+ # """
735
+ # SELECT
736
+ # COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
737
+ # COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY, ORDINAL_POSITION,
738
+ # DATA_TYPE, CHARACTER_OCTET_LENGTH
739
+ # FROM
740
+ # information_schema.columns
741
+ # WHERE
742
+ # table_schema = %s AND table_name = %s
743
+ # """,
744
+ # (schema, table),
745
+ # )
746
+ # result = sorted(cur.fetchall(), key=lambda x: x["ORDINAL_POSITION"])
747
+ # cur.close()
748
+ #
749
+ # return result
750
+ # except pymysql.OperationalError as error:
751
+ # code, message = error.args
752
+ # if code in MYSQL_EXPECTED_ERROR_CODES:
753
+ # self.__connected_ctl = False
754
+ # continue
755
+ # else:
756
+ # raise error
743
757
744
758
def __get_dbms (self ):
745
759
if not self .__connected_ctl :
0 commit comments