Skip to content

Commit 3c3c95d

Browse files
Allow forcing the encoding for old MySQL releases
For MySQL versions before 5.XX we guess the encoding; this allows setting it manually.
1 parent 7fd9252 commit 3c3c95d

File tree

5 files changed

+70
-49
lines changed

5 files changed

+70
-49
lines changed

pymysqlreplication/binlogstream.py

+24-19
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,43 @@
1-
import struct
21
import logging
3-
from packaging.version import Version
2+
import struct
43

54
import pymysql
5+
from packaging.version import Version
66
from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE
77
from pymysql.cursors import DictCursor
88

9-
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, FORMAT_DESCRIPTION_EVENT
9+
from .constants.BINLOG import FORMAT_DESCRIPTION_EVENT, ROTATE_EVENT, TABLE_MAP_EVENT
1010
from .event import (
11-
QueryEvent,
12-
RotateEvent,
13-
FormatDescriptionEvent,
14-
XidEvent,
15-
GtidEvent,
16-
StopEvent,
17-
XAPrepareEvent,
1811
BeginLoadQueryEvent,
1912
ExecuteLoadQueryEvent,
13+
FormatDescriptionEvent,
14+
GtidEvent,
2015
HeartbeatLogEvent,
21-
NotImplementedEvent,
22-
MariadbGtidEvent,
2316
MariadbAnnotateRowsEvent,
24-
RandEvent,
17+
MariadbBinLogCheckPointEvent,
18+
MariadbGtidEvent,
19+
MariadbGtidListEvent,
2520
MariadbStartEncryptionEvent,
21+
NotImplementedEvent,
22+
PreviousGtidsEvent,
23+
QueryEvent,
24+
RandEvent,
25+
RotateEvent,
2626
RowsQueryLogEvent,
27-
MariadbGtidListEvent,
28-
MariadbBinLogCheckPointEvent,
27+
StopEvent,
2928
UserVarEvent,
30-
PreviousGtidsEvent,
29+
XAPrepareEvent,
30+
XidEvent,
3131
)
3232
from .exceptions import BinLogNotEnabled
3333
from .gtid import GtidSet
3434
from .packet import BinLogPacketWrapper
3535
from .row_event import (
36-
UpdateRowsEvent,
37-
WriteRowsEvent,
3836
DeleteRowsEvent,
39-
TableMapEvent,
4037
PartialUpdateRowsEvent,
38+
TableMapEvent,
39+
UpdateRowsEvent,
40+
WriteRowsEvent,
4141
)
4242

4343
try:
@@ -185,6 +185,7 @@ def __init__(
185185
slave_heartbeat=None,
186186
is_mariadb=False,
187187
annotate_rows_event=False,
188+
force_encoding=None,
188189
ignore_decode_errors=False,
189190
verify_checksum=False,
190191
enable_logging=True,
@@ -225,6 +226,8 @@ def __init__(
225226
to point to Mariadb specific GTID.
226227
annotate_rows_event: Parameter value to enable annotate rows event in mariadb,
227228
used with 'is_mariadb'
229+
force_encoding: Force the encoding used to decode strings on MySQL 5.XXX. This is the charset
230+
to use, for example latin-1.
228231
ignore_decode_errors: If true, any decode errors encountered
229232
when reading column data will be ignored.
230233
verify_checksum: If true, verify events read from the binary log by examining checksums.
@@ -252,6 +255,7 @@ def __init__(
252255
only_events, ignored_events, filter_non_implemented_events
253256
)
254257
self.__ignore_decode_errors = ignore_decode_errors
258+
self.__force_encoding = force_encoding
255259
self.__verify_checksum = verify_checksum
256260
self.__optional_meta_data = False
257261

@@ -628,6 +632,7 @@ def fetchone(self):
628632
self.__ignored_schemas,
629633
self.__freeze_schema,
630634
self.__ignore_decode_errors,
635+
self.__force_encoding,
631636
self.__verify_checksum,
632637
self.__optional_meta_data,
633638
)

pymysqlreplication/event.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
import binascii
2-
import struct
32
import datetime
43
import decimal
5-
import zlib
4+
import json
65
import logging
6+
import struct
7+
import zlib
8+
from typing import Optional, Union
79

810
from pymysqlreplication.constants.STATUS_VAR_KEY import *
911
from pymysqlreplication.exceptions import StatusVariableMismatch
1012
from pymysqlreplication.util.bytes import parse_decimal_from_bytes
11-
from typing import Union, Optional
12-
import json
1313

1414

1515
class BinLogEvent(object):
@@ -26,6 +26,7 @@ def __init__(
2626
ignored_schemas=None,
2727
freeze_schema=False,
2828
ignore_decode_errors=False,
29+
force_encoding=None,
2930
verify_checksum=False,
3031
optional_meta_data=False,
3132
):
@@ -37,6 +38,7 @@ def __init__(
3738
self._ctl_connection = ctl_connection
3839
self.mysql_version = mysql_version
3940
self._ignore_decode_errors = ignore_decode_errors
41+
self._force_encoding = force_encoding
4042
self._verify_checksum = verify_checksum
4143
self._is_event_valid = None
4244
# The event have been fully processed, if processed is false

pymysqlreplication/packet.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from pymysqlreplication import constants, event, row_event
2-
from pymysqlreplication.json_binary import parse_json, JsonDiff, JsonDiffOperation
3-
from pymysqlreplication.util.bytes import *
42
from pymysqlreplication.constants import BINLOG
3+
from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation, parse_json
4+
from pymysqlreplication.util.bytes import *
55

66
# Constants from PyMYSQL source code
77
NULL_COLUMN = 251
@@ -72,6 +72,7 @@ def __init__(
7272
ignored_schemas,
7373
freeze_schema,
7474
ignore_decode_errors,
75+
force_encoding,
7576
verify_checksum,
7677
optional_meta_data,
7778
):
@@ -125,6 +126,7 @@ def __init__(
125126
ignored_schemas=ignored_schemas,
126127
freeze_schema=freeze_schema,
127128
ignore_decode_errors=ignore_decode_errors,
129+
force_encoding=force_encoding,
128130
verify_checksum=verify_checksum,
129131
optional_meta_data=optional_meta_data,
130132
)

pymysqlreplication/row_event.py

+10-10
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
1-
import struct
2-
import decimal
31
import datetime
2+
import decimal
3+
import struct
4+
from enum import Enum
45

56
from pymysql.charset import charset_by_name
6-
from enum import Enum
77

8-
from .event import BinLogEvent
9-
from .constants import FIELD_TYPE
10-
from .constants import BINLOG
11-
from .constants import CHARSET
12-
from .constants import NONE_SOURCE
8+
from .bitmap import BitCount, BitGet
139
from .column import Column
10+
from .constants import BINLOG, CHARSET, FIELD_TYPE, NONE_SOURCE
11+
from .event import BinLogEvent
1412
from .table import Table
15-
from .bitmap import BitCount, BitGet
1613

1714

1815
class RowsEvent(BinLogEvent):
@@ -332,7 +329,10 @@ def __read_string(self, size, column):
332329
else:
333330
# MYSQL 5.xx Version Goes Here
334331
# We don't know encoding type So apply Default Utf-8
335-
string = string.decode(errors=decode_errors)
332+
if self._force_encoding:
333+
string = string.decode(encoding=self._force_encoding, errors=decode_errors)
334+
else:
335+
string = string.decode(errors=decode_errors)
336336
return string
337337

338338
def __read_bit(self, column):

pymysqlreplication/tests/test_basic.py

+26-14
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
import io
22
import time
33
import unittest
4+
from unittest.mock import patch
5+
6+
import pytest
7+
from pymysql.protocol import MysqlPacket
48

5-
from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation
6-
from pymysqlreplication.tests import base
79
from pymysqlreplication import BinLogStreamReader
8-
from pymysqlreplication.gtid import GtidSet, Gtid
9-
from pymysqlreplication.event import *
1010
from pymysqlreplication.constants.BINLOG import *
1111
from pymysqlreplication.constants.NONE_SOURCE import *
12-
from pymysqlreplication.row_event import *
12+
from pymysqlreplication.event import *
13+
from pymysqlreplication.gtid import Gtid, GtidSet
14+
from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation
1315
from pymysqlreplication.packet import BinLogPacketWrapper
14-
from pymysql.protocol import MysqlPacket
15-
from unittest.mock import patch
16-
import pytest
17-
16+
from pymysqlreplication.row_event import *
17+
from pymysqlreplication.tests import base
1818

1919
__all__ = [
2020
"TestBasicBinLogStreamReader",
@@ -601,6 +601,7 @@ def create_binlog_packet_wrapper(pkt):
601601
self.stream._BinLogStreamReader__ignored_schemas,
602602
self.stream._BinLogStreamReader__freeze_schema,
603603
self.stream._BinLogStreamReader__ignore_decode_errors,
604+
self.stream._BinlogStreamReader__force_encoding,
604605
self.stream._BinLogStreamReader__verify_checksum,
605606
self.stream._BinLogStreamReader__optional_meta_data,
606607
)
@@ -808,7 +809,7 @@ def test_delete_multiple_row_event(self):
808809
self.assertEqual(event.rows[1]["values"]["id"], 2)
809810
self.assertEqual(event.rows[1]["values"]["data"], "World")
810811

811-
def test_ignore_decode_errors(self):
812+
def test_force_encoding(self):
812813
if self.isMySQL80AndMore():
813814
self.skipTest("MYSQL 8 Version Pymysql Data Error Incorrect string value")
814815
problematic_unicode_string = (
@@ -821,29 +822,40 @@ def test_ignore_decode_errors(self):
821822
)
822823
self.execute("COMMIT")
823824

824-
# Initialize with ignore_decode_errors=False
825+
826+
def test_ignore_decode_errors(self):
827+
if self.isMySQL80AndMore():
828+
self.skipTest("MYSQL 8 Version Pymysql Data Error Incorrect string value")
829+
problematic_unicode_string = "ó".encode("latin-1")
830+
self.stream.close()
831+
self.execute("CREATE TABLE test (data VARCHAR(50) CHARACTER SET latin1)")
832+
self.execute_with_args(
833+
"INSERT INTO test (data) VALUES (%s)", (problematic_unicode_string)
834+
)
835+
self.execute("COMMIT")
836+
825837
self.stream = BinLogStreamReader(
826838
self.database,
827839
server_id=1024,
828840
only_events=(WriteRowsEvent,),
829-
ignore_decode_errors=False,
841+
force_encoding=None,
830842
)
831843
with self.assertRaises(UnicodeError):
832844
event = self.stream.fetchone()
833845
data = event.rows[0]["values"]["data"]
834846

835-
# Initialize with ignore_decode_errors=True
836847
self.stream = BinLogStreamReader(
837848
self.database,
838849
server_id=1024,
839850
only_events=(WriteRowsEvent,),
840-
ignore_decode_errors=True,
851+
force_encoding="latin-1",
841852
)
842853
event = self.stream.fetchone()
843854
if event.table_map[event.table_id].column_name_flag:
844855
data = event.rows[0]["values"]["data"]
845856
self.assertEqual(data, '[{"text":" Some string"}]')
846857

858+
847859
def test_drop_column(self):
848860
self.stream.close()
849861
self.execute("CREATE TABLE test_drop_column (id INTEGER(11), data VARCHAR(50))")

0 commit comments

Comments
 (0)