Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit ba23828

Browse files
committedMar 10, 2025·
Allow to force the encoding for old MySQL release
For MySQL before 5.XX we guess the encoding, this allow to manually set it.
1 parent 7de3c1e commit ba23828

File tree

5 files changed

+103
-83
lines changed

5 files changed

+103
-83
lines changed
 

‎pymysqlreplication/binlogstream.py

+24-19
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,43 @@
1-
import struct
21
import logging
3-
from packaging.version import Version
2+
import struct
43

54
import pymysql
5+
from packaging.version import Version
66
from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE
77
from pymysql.cursors import DictCursor
88

9-
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, FORMAT_DESCRIPTION_EVENT
9+
from .constants.BINLOG import FORMAT_DESCRIPTION_EVENT, ROTATE_EVENT, TABLE_MAP_EVENT
1010
from .event import (
11-
QueryEvent,
12-
RotateEvent,
13-
FormatDescriptionEvent,
14-
XidEvent,
15-
GtidEvent,
16-
StopEvent,
17-
XAPrepareEvent,
1811
BeginLoadQueryEvent,
1912
ExecuteLoadQueryEvent,
13+
FormatDescriptionEvent,
14+
GtidEvent,
2015
HeartbeatLogEvent,
21-
NotImplementedEvent,
22-
MariadbGtidEvent,
2316
MariadbAnnotateRowsEvent,
24-
RandEvent,
17+
MariadbBinLogCheckPointEvent,
18+
MariadbGtidEvent,
19+
MariadbGtidListEvent,
2520
MariadbStartEncryptionEvent,
21+
NotImplementedEvent,
22+
PreviousGtidsEvent,
23+
QueryEvent,
24+
RandEvent,
25+
RotateEvent,
2626
RowsQueryLogEvent,
27-
MariadbGtidListEvent,
28-
MariadbBinLogCheckPointEvent,
27+
StopEvent,
2928
UserVarEvent,
30-
PreviousGtidsEvent,
29+
XAPrepareEvent,
30+
XidEvent,
3131
)
3232
from .exceptions import BinLogNotEnabled
3333
from .gtid import GtidSet
3434
from .packet import BinLogPacketWrapper
3535
from .row_event import (
36-
UpdateRowsEvent,
37-
WriteRowsEvent,
3836
DeleteRowsEvent,
39-
TableMapEvent,
4037
PartialUpdateRowsEvent,
38+
TableMapEvent,
39+
UpdateRowsEvent,
40+
WriteRowsEvent,
4141
)
4242

4343
try:
@@ -185,6 +185,7 @@ def __init__(
185185
slave_heartbeat=None,
186186
is_mariadb=False,
187187
annotate_rows_event=False,
188+
force_encoding=None,
188189
ignore_decode_errors=False,
189190
verify_checksum=False,
190191
enable_logging=True,
@@ -225,6 +226,8 @@ def __init__(
225226
to point to Mariadb specific GTID.
226227
annotate_rows_event: Parameter value to enable annotate rows event in mariadb,
227228
used with 'is_mariadb'
229+
force_encoding: Force the encoding to decode a string this for MySQL 5.XXX This is the charset
230+
to use example latin-1
228231
ignore_decode_errors: If true, any decode errors encountered
229232
when reading column data will be ignored.
230233
verify_checksum: If true, verify events read from the binary log by examining checksums.
@@ -252,6 +255,7 @@ def __init__(
252255
only_events, ignored_events, filter_non_implemented_events
253256
)
254257
self.__ignore_decode_errors = ignore_decode_errors
258+
self.__force_encoding = force_encoding
255259
self.__verify_checksum = verify_checksum
256260
self.__optional_meta_data = False
257261

@@ -628,6 +632,7 @@ def fetchone(self):
628632
self.__ignored_schemas,
629633
self.__freeze_schema,
630634
self.__ignore_decode_errors,
635+
self.__force_encoding,
631636
self.__verify_checksum,
632637
self.__optional_meta_data,
633638
)

‎pymysqlreplication/event.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
import binascii
2-
import struct
32
import datetime
43
import decimal
5-
import zlib
4+
import json
65
import logging
6+
import struct
7+
import zlib
8+
from typing import Optional, Union
79

810
from pymysqlreplication.constants.STATUS_VAR_KEY import *
911
from pymysqlreplication.exceptions import StatusVariableMismatch
1012
from pymysqlreplication.util.bytes import parse_decimal_from_bytes
11-
from typing import Union, Optional
12-
import json
1313

1414

1515
class BinLogEvent(object):
@@ -26,6 +26,7 @@ def __init__(
2626
ignored_schemas=None,
2727
freeze_schema=False,
2828
ignore_decode_errors=False,
29+
force_encoding=None,
2930
verify_checksum=False,
3031
optional_meta_data=False,
3132
):
@@ -37,6 +38,7 @@ def __init__(
3738
self._ctl_connection = ctl_connection
3839
self.mysql_version = mysql_version
3940
self._ignore_decode_errors = ignore_decode_errors
41+
self._force_encoding = force_encoding
4042
self._verify_checksum = verify_checksum
4143
self._is_event_valid = None
4244
# The event have been fully processed, if processed is false

‎pymysqlreplication/packet.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from pymysqlreplication import constants, event, row_event
2-
from pymysqlreplication.json_binary import parse_json, JsonDiff, JsonDiffOperation
3-
from pymysqlreplication.util.bytes import *
42
from pymysqlreplication.constants import BINLOG
3+
from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation, parse_json
4+
from pymysqlreplication.util.bytes import *
55

66
# Constants from PyMYSQL source code
77
NULL_COLUMN = 251
@@ -72,6 +72,7 @@ def __init__(
7272
ignored_schemas,
7373
freeze_schema,
7474
ignore_decode_errors,
75+
force_encoding,
7576
verify_checksum,
7677
optional_meta_data,
7778
):
@@ -125,6 +126,7 @@ def __init__(
125126
ignored_schemas=ignored_schemas,
126127
freeze_schema=freeze_schema,
127128
ignore_decode_errors=ignore_decode_errors,
129+
force_encoding=force_encoding,
128130
verify_checksum=verify_checksum,
129131
optional_meta_data=optional_meta_data,
130132
)

‎pymysqlreplication/row_event.py

+10-10
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
1-
import struct
2-
import decimal
31
import datetime
2+
import decimal
3+
import struct
4+
from enum import Enum
45

56
from pymysql.charset import charset_by_name
6-
from enum import Enum
77

8-
from .event import BinLogEvent
9-
from .constants import FIELD_TYPE
10-
from .constants import BINLOG
11-
from .constants import CHARSET
12-
from .constants import NONE_SOURCE
8+
from .bitmap import BitCount, BitGet
139
from .column import Column
10+
from .constants import BINLOG, CHARSET, FIELD_TYPE, NONE_SOURCE
11+
from .event import BinLogEvent
1412
from .table import Table
15-
from .bitmap import BitCount, BitGet
1613

1714

1815
class RowsEvent(BinLogEvent):
@@ -332,7 +329,10 @@ def __read_string(self, size, column):
332329
else:
333330
# MYSQL 5.xx Version Goes Here
334331
# We don't know encoding type So apply Default Utf-8
335-
string = string.decode(errors=decode_errors)
332+
if self._force_encoding:
333+
string = string.decode(encoding=self._force_encoding, errors=decode_errors)
334+
else:
335+
string = string.decode(errors=decode_errors)
336336
return string
337337

338338
def __read_bit(self, column):

‎pymysqlreplication/tests/test_basic.py

+59-48
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
1+
import copy
12
import io
23
import time
34
import unittest
5+
from unittest.mock import patch
6+
7+
from pymysql.protocol import MysqlPacket
48

5-
from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation
6-
from pymysqlreplication.tests import base
79
from pymysqlreplication import BinLogStreamReader
8-
from pymysqlreplication.gtid import GtidSet, Gtid
9-
from pymysqlreplication.event import *
1010
from pymysqlreplication.constants.BINLOG import *
1111
from pymysqlreplication.constants.NONE_SOURCE import *
12-
from pymysqlreplication.row_event import *
12+
from pymysqlreplication.event import *
13+
from pymysqlreplication.gtid import Gtid, GtidSet
14+
from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation
1315
from pymysqlreplication.packet import BinLogPacketWrapper
14-
from pymysql.protocol import MysqlPacket
15-
from unittest.mock import patch
16-
16+
from pymysqlreplication.row_event import *
17+
from pymysqlreplication.tests import base
1718

1819
__all__ = [
1920
"TestBasicBinLogStreamReader",
@@ -284,43 +285,11 @@ def test_delete_row_event(self):
284285
self.execute("COMMIT")
285286

286287
self.assertIsInstance(self.stream.fetchone(), RotateEvent)
287-
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
288-
289-
# QueryEvent for the BEGIN
290-
if not self.isMariaDB():
291-
self.assertIsInstance(self.stream.fetchone(), QueryEvent)
292-
293-
self.assertIsInstance(self.stream.fetchone(), TableMapEvent)
294-
295-
event = self.stream.fetchone()
296-
if self.isMySQL56AndMore():
297-
self.assertEqual(event.event_type, DELETE_ROWS_EVENT_V2)
298-
else:
299-
self.assertEqual(event.event_type, DELETE_ROWS_EVENT_V1)
300-
self.assertIsInstance(event, DeleteRowsEvent)
301-
if event.table_map[event.table_id].column_name_flag:
302-
self.assertEqual(event.rows[0]["values"]["id"], 1)
303-
self.assertEqual(event.rows[0]["values"]["data"], "Hello World")
304-
305-
def test_update_row_event(self):
306-
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
307-
self.execute(query)
308-
query = "INSERT INTO test (data) VALUES('Hello')"
309-
self.execute(query)
310-
311-
self.resetBinLog()
312-
313-
query = "UPDATE test SET data = 'World' WHERE id = 1"
314-
self.execute(query)
288+
self.execute(
289+
"INSERT INTO test (data) VALUES ('ó')".encode("latin-1")
290+
)
315291
self.execute("COMMIT")
316292

317-
self.assertIsInstance(self.stream.fetchone(), RotateEvent)
318-
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
319-
320-
# QueryEvent for the BEGIN
321-
if not self.isMariaDB():
322-
self.assertIsInstance(self.stream.fetchone(), QueryEvent)
323-
324293
self.assertIsInstance(self.stream.fetchone(), TableMapEvent)
325294

326295
event = self.stream.fetchone()
@@ -617,6 +586,7 @@ def create_binlog_packet_wrapper(pkt):
617586
self.stream._BinLogStreamReader__ignored_schemas,
618587
self.stream._BinLogStreamReader__freeze_schema,
619588
self.stream._BinLogStreamReader__ignore_decode_errors,
589+
self.stream._BinLogStreamReader__force_encoding,
620590
self.stream._BinLogStreamReader__verify_checksum,
621591
self.stream._BinLogStreamReader__optional_meta_data,
622592
)
@@ -837,42 +807,83 @@ def test_delete_multiple_row_event(self):
837807
self.assertEqual(event.rows[1]["values"]["id"], 2)
838808
self.assertEqual(event.rows[1]["values"]["data"], "World")
839809

810+
def test_force_encoding(self):
811+
if self.isMySQL80AndMore():
812+
self.skipTest("MYSQL 8 Version don't need force encoding")
813+
self.stream.close()
814+
815+
db = copy.copy(self.database)
816+
db["charset"] = "latin1"
817+
self.connect_conn_control(db)
818+
819+
string = "\u00e9"
820+
821+
create_query = (
822+
"CREATE TABLE test (test CHAR(12)) CHARACTER SET latin1 COLLATE latin1_bin;"
823+
)
824+
insert_query = b"INSERT INTO test VALUES('" + string.encode("latin-1") + b"');"
825+
self.execute(create_query)
826+
self.execute(insert_query)
827+
self.execute("COMMIT")
828+
829+
self.stream = BinLogStreamReader(
830+
self.database,
831+
server_id=1024,
832+
only_events=(WriteRowsEvent,),
833+
force_encoding="utf-8",
834+
)
835+
with self.assertRaises(UnicodeError):
836+
event = self.stream.fetchone()
837+
data = event.rows[0]["values"]["data"]
838+
839+
self.stream = BinLogStreamReader(
840+
self.database,
841+
server_id=1024,
842+
only_events=(WriteRowsEvent,),
843+
force_encoding="latin-1",
844+
)
845+
event = self.stream.fetchone()
846+
if event.table_map[event.table_id].column_name_flag:
847+
data = event.rows[0]["values"]["data"]
848+
self.assertEqual(data, '[{"text":" Some string"}]')
849+
850+
840851
def test_ignore_decode_errors(self):
841852
if self.isMySQL80AndMore():
842853
self.skipTest("MYSQL 8 Version Pymysql Data Error Incorrect string value")
854+
problematic_unicode_string = "ó".encode("latin-1")
855+
self.stream.close()
843856
problematic_unicode_string = (
844857
b'[{"text":"\xed\xa0\xbd \xed\xb1\x8d Some string"}]'
845858
)
846-
self.stream.close()
847859
self.execute("CREATE TABLE test (data VARCHAR(50) CHARACTER SET utf8mb4)")
848860
self.execute_with_args(
849861
"INSERT INTO test (data) VALUES (%s)", (problematic_unicode_string)
850862
)
851863
self.execute("COMMIT")
852864

853-
# Initialize with ignore_decode_errors=False
854865
self.stream = BinLogStreamReader(
855866
self.database,
856867
server_id=1024,
857868
only_events=(WriteRowsEvent,),
858-
ignore_decode_errors=False,
869+
force_encoding=None,
859870
)
860871
with self.assertRaises(UnicodeError):
861872
event = self.stream.fetchone()
862873
data = event.rows[0]["values"]["data"]
863874

864-
# Initialize with ignore_decode_errors=True
865875
self.stream = BinLogStreamReader(
866876
self.database,
867877
server_id=1024,
868878
only_events=(WriteRowsEvent,),
869-
ignore_decode_errors=True,
879+
force_encoding="latin-1",
870880
)
871881
event = self.stream.fetchone()
872882
if event.table_map[event.table_id].column_name_flag:
873883
data = event.rows[0]["values"]["data"]
874884
self.assertEqual(data, '[{"text":" Some string"}]')
875885

886+
876887
def test_drop_column(self):
877888
self.stream.close()
878889
self.execute("CREATE TABLE test_drop_column (id INTEGER(11), data VARCHAR(50))")

0 commit comments

Comments
 (0)
Please sign in to comment.