Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

typing - binlogfilereader.py #75

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 30 additions & 28 deletions pymysqlreplication/tests/binlogfilereader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,24 @@
from pymysqlreplication.event import QueryEvent
from pymysqlreplication.event import RotateEvent
from pymysqlreplication.event import XidEvent
from pymysqlreplication.event import BinLogEvent
from pymysqlreplication.row_event import TableMapEvent
from pymysqlreplication.row_event import WriteRowsEvent
from typing import Optional, List, Type, Union, Iterator, IO

class SimpleBinLogFileReader(object):
'''Read binlog files'''

_expected_magic = b'\xfebin'

def __init__(self, file_path, only_events=None):
self._current_event = None
self._file = None
self._file_path = file_path
self._only_events = only_events
self._pos = None
def __init__(self, file_path: str, only_events: Optional[List[BinLogEvent]] = None) -> None:
self._current_event: Optional[SimpleBinLogEvent] = None
self._file: Optional[IO[bytes]] = None
self._file_path: str = file_path
self._only_events: Optional[List[BinLogEvent]] = only_events
self._pos: Optional[int] = None

def fetchone(self):
def fetchone(self) -> Optional["SimpleBinLogEvent"]:
'''Fetch one record from the binlog file'''
if self._pos is None or self._pos < 4:
self._read_magic()
Expand All @@ -33,12 +35,12 @@ def fetchone(self):
if self._filter_events(event):
return event

def truncatebinlog(self):
def truncatebinlog(self) -> None:
'''Truncate the binlog file at the current event'''
if self._current_event is not None:
self._file.truncate(self._current_event.pos)

def _filter_events(self, event):
def _filter_events(self, event: BinLogEvent) -> bool:
'''Return True if an event can be returned'''
# It would be good if we could reuse the __event_map in
# packet.BinLogPacketWrapper.
Expand All @@ -52,14 +54,14 @@ def _filter_events(self, event):
}.get(event.event_type)
return event_type in self._only_events

def _open_file(self):
def _open_file(self) -> None:
'''Open the file at ``self._file_path``'''
if self._file is None:
self._file = open(self._file_path, 'rb+')
self._pos = self._file.tell()
assert self._pos == 0

def _read_event(self):
def _read_event(self) -> Optional["SimpleBinLogEvent"]:
'''Read an event from the binlog file'''
# Assuming a binlog version > 1
headerlength = 19
Expand All @@ -80,7 +82,7 @@ def _read_event(self):
event.set_body(body)
return event

def _read_magic(self):
def _read_magic(self) -> None:
'''Read the first four *magic* bytes of the binlog file'''
self._open_file()
if self._pos == 0:
Expand All @@ -92,10 +94,10 @@ def _read_magic(self):
message = messagefmt.format(magic, self._expected_magic)
raise BadMagicBytesError(message)

def __iter__(self):
def __iter__(self) -> Iterator[Optional["SimpleBinLogEvent"]]:
return iter(self.fetchone, None)

def __repr__(self):
def __repr__(self) -> str:
cls = self.__class__
mod = cls.__module__
name = cls.__name__
Expand All @@ -108,28 +110,28 @@ def __repr__(self):
class SimpleBinLogEvent(object):
'''An event from a binlog file'''

def __init__(self, header):
def __init__(self, header: bytes) -> None:
'''Initialize the Event with the event header'''
unpacked = struct.unpack('<IBIIIH', header)
self.timestamp = unpacked[0]
self.event_type = unpacked[1]
self.server_id = unpacked[2]
self.event_size = unpacked[3]
self.log_pos = unpacked[4]
self.flags = unpacked[5]
self.timestamp: int = unpacked[0]
self.event_type: int = unpacked[1]
self.server_id: int = unpacked[2]
self.event_size: int = unpacked[3]
self.log_pos: int = unpacked[4]
self.flags: int = unpacked[5]

self.body = None
self.pos = None
self.body: Optional[bytes] = None
self.pos: Optional[int] = None

def set_body(self, body):
def set_body(self, body: bytes) -> None:
'''Save the body bytes'''
self.body = body

def set_pos(self, pos):
def set_pos(self, pos: int) -> None:
'''Save the event position'''
self.pos = pos

def __repr__(self):
def __repr__(self) -> str:
cls = self.__class__
mod = cls.__module__
name = cls.__name__
Expand All @@ -146,4 +148,4 @@ class BadMagicBytesError(Exception):
'''The binlog file magic bytes did not match the specification'''

class EventSizeTooSmallError(Exception):
'''The event size was smaller than the length of the event header'''
'''The event size was smaller than the length of the event header'''