Skip to content

Commit 8fed1ed

Browse files
committed
skip the first 2 events after reconnected
* sleep 5s after disconnected accidently and skip the first 2 events after reconnected * some cleanup and compacting works
1 parent b615725 commit 8fed1ed

File tree

1 file changed

+9
-9
lines changed

1 file changed

+9
-9
lines changed

pymysqlreplication/binlogstream.py

+9-9
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import pymysql
44
import struct
5+
import time
56
from distutils.version import LooseVersion
67

78
from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE
@@ -30,6 +31,7 @@
3031
# 2006 MySQL server has gone away
3132
MYSQL_EXPECTED_ERROR_CODES = [2013, 2006]
3233

34+
__PYMYSQL_VERSION_LT_06 = pymysql.__version__ < LooseVersion("0.6")
3335

3436
class ReportSlave(object):
3537

@@ -208,11 +210,6 @@ def __init__(self, connection_settings, server_id,
208210
each time the client is disconnected and then auto-reconnected
209211
to the mysql server (OperationalError 2006/2013) if resume_stream
210212
is False. so it's suggested to set resume_stream to True.
211-
212-
an additional RotateEvent and FormatDescriptionEvent will be
213-
fetched each time the client is disconnected and then auto-
214-
reconnected to the server. (no matter resume_stream is True
215-
or False)
216213
"""
217214

218215
self.__connection_settings = connection_settings
@@ -263,7 +260,7 @@ def __init__(self, connection_settings, server_id,
263260
self.pymysql_wrapper = pymysql.connect
264261

265262
def close(self):
266-
if getattr(self, '_stream_connection', None) and self._stream_connection.open:
263+
if self.__connected_stream:
267264
self._stream_connection.close()
268265
if getattr(self, '_ctl_connection', None):
269266
# break reference cycle between stream reader and underlying
@@ -304,7 +301,7 @@ def _register_slave(self):
304301

305302
packet = self.report_slave.encoded(self.__server_id)
306303

307-
if pymysql.__version__ < LooseVersion("0.6"):
304+
if __PYMYSQL_VERSION_LT_06:
308305
self._stream_connection.wfile.write(packet)
309306
self._stream_connection.wfile.flush()
310307
self._stream_connection.read_packet()
@@ -334,7 +331,7 @@ def __connect_to_stream(self, force_reconnect=False):
334331
# server_id (4) -- server id of this slave
335332
# log_file (string.EOF) -- filename of the binlog on the master
336333
self._stream_connection = self.pymysql_wrapper(**self.__connection_settings)
337-
if pymysql.__version__ < LooseVersion("0.6"):
334+
if __PYMYSQL_VERSION_LT_06:
338335
self._stream_connection._read_packet = self._stream_connection.read_packet
339336

340337
self.__use_checksum = self.__checksum_enabled()
@@ -467,7 +464,7 @@ def __connect_to_stream(self, force_reconnect=False):
467464
# encoded_data
468465
prelude += gtid_set.encoded()
469466

470-
if pymysql.__version__ < LooseVersion("0.6"):
467+
if __PYMYSQL_VERSION_LT_06:
471468
self._stream_connection.wfile.write(prelude)
472469
self._stream_connection.wfile.flush()
473470
else:
@@ -493,7 +490,10 @@ def __fetchone(self):
493490
except pymysql.OperationalError as error:
494491
code, message = error.args
495492
if code in MYSQL_EXPECTED_ERROR_CODES:
493+
time.sleep(5)
496494
self.__connect_to_stream(force_reconnect=True)
495+
# skip the first 2 events (RotateEvent and FormatDescriptionEvent)
496+
_ = self.__fetchone(), self.__fetchone()
497497
continue
498498
raise
499499

0 commit comments

Comments
 (0)