Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UserVarEvent #44

Closed
wants to merge 46 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
f4f4316
Feat: implement rand_event
starcat37 Jul 27, 2023
cc8f7ff
fix: change number of testcases
heehehe Jul 27, 2023
13efa87
Add property for rand event
mjs1995 Jul 27, 2023
c3be8c9
fix: rename seed to _seed
mjs1995 Jul 27, 2023
f8324cc
test: add test_rand_event
heehehe Jul 29, 2023
6511f85
test: move statement setting to setUp
heehehe Jul 29, 2023
f267674
Chore: Delete unnecessary comment
starcat37 Jul 29, 2023
ce87120
Chore: delete unnecessary venv/* in .gitignore
starcat37 Jul 29, 2023
bf9ec0b
comment: add rand_event warning description
heehehe Jul 29, 2023
ccd5b86
refactor: change RandEvent import position
heehehe Jul 29, 2023
b12b7fe
revert: remove resume_stream=True in example
heehehe Jul 29, 2023
9f13346
test: add rand event seed type
sean-k1 Aug 2, 2023
dd8079c
test: add test_rand_event
mikaniz Jul 29, 2023
ff4d184
test: move statement setting to setUp
heehehe Jul 29, 2023
baad3a9
Chore: Delete unnecessary comment
starcat37 Jul 29, 2023
f04e228
Chore: delete unnecessary venv/* in .gitignore
starcat37 Jul 29, 2023
af6425d
comment: add rand_event warning description
heehehe Jul 29, 2023
243f099
refactor: change RandEvent import position
heehehe Jul 29, 2023
2edca88
revert: remove resume_stream=True in example
heehehe Jul 29, 2023
6fda545
test: add rand event seed type
sean-k1 Aug 2, 2023
7928c37
Merge branch 'feature/rand-event' of github.com:23-OSSCA-python-mysql…
heehehe Aug 5, 2023
66c6069
comment: add description for attributes
heehehe Aug 5, 2023
7aa81b4
Feat: implement UserVarEvent
mjs1995 Aug 7, 2023
795f12a
fix: change number of testcases
mjs1995 Aug 7, 2023
9a985d6
docs: add name_len ivar comment
heehehe Aug 7, 2023
f3b0012
test: add user_var_event test
heehehe Aug 7, 2023
832a887
Add type_codes for UserVarEvent
mjs1995 Aug 12, 2023
e7c1b10
Refactor UserVarEvent and add decimal parsing
mjs1995 Aug 16, 2023
e7cecc4
test: add user_var_event tests by types
heehehe Aug 19, 2023
8c5d321
test: fix int / decimal table creation
heehehe Aug 19, 2023
0c2a4bd
test: fix int_event error
heehehe Aug 19, 2023
b13f1dd
fix unpacking of REAL_RESULT and adjust assertion
mjs1995 Aug 20, 2023
799baaa
feat: modify read int by using packet.read
heehehe Aug 20, 2023
5bde42a
refactor : type code and type method
sean-k1 Aug 26, 2023
4dabf14
fix: modify negative decimal extract process
heehehe Aug 26, 2023
69e296e
Merge branch 'main' into feature/user-var-event
heehehe Aug 26, 2023
b890ec6
fix: modify test_allowed_event_list
heehehe Aug 26, 2023
e48369e
remove: duplicated TestStatementConnectionSetting
mjs1995 Aug 27, 2023
047f429
refactor: add _read_new_decimal to BinLogEvent method
heehehe Aug 30, 2023
8c69c47
fix: modify _read_new_decimal return type
heehehe Aug 30, 2023
180c866
Feat: modify _read_int function
starcat37 Aug 31, 2023
005f645
Merge branch 'feature/user-var-event' of https://github.com/23-OSSCA-…
starcat37 Aug 31, 2023
d3f8b4f
refactor: add type hints and optimize read methods
mjs1995 Aug 31, 2023
de6d8e7
test: add user_var_event tests by types
mikaniz Aug 31, 2023
9d53176
revert: move __read_new_decimal back to row_event
heehehe Aug 31, 2023
b51bcfa
Merge branch 'main' into feature/user-var-event
mjs1995 Aug 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent,
MariadbGtidListEvent, MariadbBinLogCheckPointEvent)
MariadbGtidListEvent, MariadbBinLogCheckPointEvent, UserVarEvent)
from .exceptions import BinLogNotEnabled
from .gtid import GtidSet
from .packet import BinLogPacketWrapper
Expand Down Expand Up @@ -620,11 +620,12 @@ def _allowed_event_list(self, only_events, ignored_events,
TableMapEvent,
HeartbeatLogEvent,
NotImplementedEvent,
MariadbGtidEvent,
RowsQueryLogEvent,
MariadbGtidEvent,
MariadbAnnotateRowsEvent,
RandEvent,
MariadbStartEncryptionEvent,
RandEvent,
UserVarEvent,
MariadbGtidListEvent,
MariadbBinLogCheckPointEvent
))
Expand Down
180 changes: 179 additions & 1 deletion pymysqlreplication/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import binascii
import struct
import datetime
import decimal
from pymysqlreplication.constants.STATUS_VAR_KEY import *
from pymysqlreplication.exceptions import StatusVariableMismatch
from typing import Union, Optional


class BinLogEvent(object):
Expand Down Expand Up @@ -51,7 +53,6 @@ def _dump(self):
"""Core data dumped for the event"""
pass


class GtidEvent(BinLogEvent):
"""GTID change in binlog event
"""
Expand Down Expand Up @@ -507,6 +508,183 @@ def _dump(self):
print("type: %d" % (self.type))
print("Value: %d" % (self.value))

class RandEvent(BinLogEvent):
"""
RandEvent is generated every time a statement uses the RAND() function.
Indicates the seed values to use for generating a random number with RAND() in the next statement.

RandEvent only works in statement-based logging (need to set binlog_format as 'STATEMENT')
and only works when the seed number is not specified.

:ivar seed1: int - value for the first seed
:ivar seed2: int - value for the second seed
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(RandEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
# Payload
self._seed1 = self.packet.read_uint64()
self._seed2 = self.packet.read_uint64()

@property
def seed1(self):
"""Get the first seed value"""
return self._seed1

@property
def seed2(self):
"""Get the second seed value"""
return self._seed2

def _dump(self):
super(RandEvent, self)._dump()
print("seed1: %d" % (self.seed1))
print("seed2: %d" % (self.seed2))

class UserVarEvent(BinLogEvent):
"""
UserVarEvent is generated every time a statement uses a user variable.
Indicates the value to use for the user variable in the next statement.

:ivar name_len: int - Length of user variable
:ivar name: str - User variable name
:ivar value: str - Value of the user variable
:ivar type: int - Type of the user variable
:ivar charset: int - The number of the character set for the user variable
:ivar is_null: int - Non-zero if the variable value is the SQL NULL value, 0 otherwise
:ivar flags: int - Extra flags associated with the user variable
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(UserVarEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)

# Payload
self.name_len: int = self.packet.read_uint32()
self.name: str = self.packet.read(self.name_len).decode()
self.is_null: int = self.packet.read_uint8()
self.type_to_codes_and_method: dict = {
0x00: ['STRING_RESULT', self._read_string],
0x01: ['REAL_RESULT', self._read_real],
0x02: ['INT_RESULT', self._read_int],
0x03: ['ROW_RESULT', self._read_default],
0x04: ['DECIMAL_RESULT', self._read_decimal]
}

self.value: Optional[Union[str, float, int, decimal.Decimal]] = None
self.flags: Optional[int] = None
self.temp_value_buffer: Union[bytes, memoryview] = b''

if not self.is_null:
self.type: int = self.packet.read_uint8()
self.charset: int = self.packet.read_uint32()
self.value_len: int = self.packet.read_uint32()
self.temp_value_buffer: Union[bytes, memoryview] = self.packet.read(self.value_len)
self.flags: int = self.packet.read_uint8()
self._set_value_from_temp_buffer()
else:
self.type, self.charset, self.value_len, self.value, self.flags = None, None, None, None, None

def _set_value_from_temp_buffer(self):
"""
Set the value from the temporary buffer based on the type code.
"""
if self.temp_value_buffer:
type_code, read_method = self.type_to_codes_and_method.get(self.type, ["UNKNOWN_RESULT", self._read_default])
if type_code == 'INT_RESULT':
self.value = read_method(self.temp_value_buffer, self.flags)
else:
self.value = read_method(self.temp_value_buffer)

def _read_string(self, buffer: bytes) -> str:
"""
Read string data.
"""
return buffer.decode()

def _read_real(self, buffer: bytes) -> float:
"""
Read real data.
"""
return struct.unpack('<d', buffer)[0]

def _read_int(self, buffer: bytes, flags: int) -> int:
"""
Read integer data.
"""
fmt = '<Q' if flags == 1 else '<q'
return struct.unpack(fmt, buffer)[0]

def _read_decimal(self, buffer: bytes) -> decimal.Decimal:
"""
Read decimal data.
"""
self.precision = self.temp_value_buffer[0]
self.decimals = self.temp_value_buffer[1]
raw_decimal = self.temp_value_buffer[2:]
return self._parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals)

def _read_default(self) -> bytes:
"""
Read default data.
Used when the type is None.
"""
return self.packet.read(self.value_len)

@staticmethod
def _parse_decimal_from_bytes(raw_decimal: bytes, precision: int, decimals: int) -> decimal.Decimal:
"""
Parse decimal from bytes.
"""
digits_per_integer = 9
compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4]
integral = precision - decimals

uncomp_integral, comp_integral = divmod(integral, digits_per_integer)
uncomp_fractional, comp_fractional = divmod(decimals, digits_per_integer)

res = "-" if not raw_decimal[0] & 0x80 else ""
mask = -1 if res == "-" else 0
raw_decimal = bytearray([raw_decimal[0] ^ 0x80]) + raw_decimal[1:]

def decode_decimal_decompress_value(comp_indx, data, mask):
size = compressed_bytes[comp_indx]
if size > 0:
databuff = bytearray(data[:size])
for i in range(size):
databuff[i] = (databuff[i] ^ mask) & 0xFF
return size, int.from_bytes(databuff, byteorder='big')
return 0, 0

pointer, value = decode_decimal_decompress_value(comp_integral, raw_decimal, mask)
res += str(value)

for _ in range(uncomp_integral):
value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask
res += '%09d' % value
pointer += 4

res += "."

for _ in range(uncomp_fractional):
value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask
res += '%09d' % value
pointer += 4

size, value = decode_decimal_decompress_value(comp_fractional, raw_decimal[pointer:], mask)
if size > 0:
res += '%0*d' % (comp_fractional, value)
return decimal.Decimal(res)

def _dump(self) -> None:
super(UserVarEvent, self)._dump()
print("User variable name: %s" % self.name)
print("Is NULL: %s" % ("Yes" if self.is_null else "No"))
if not self.is_null:
print("Type: %s" % self.type_to_codes_and_method.get(self.type, ['UNKNOWN_TYPE'])[0])
print("Charset: %s" % self.charset)
print("Value: %s" % self.value)
print("Flags: %s" % self.flags)

class RandEvent(BinLogEvent):
"""
Expand Down
1 change: 1 addition & 0 deletions pymysqlreplication/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class BinLogPacketWrapper(object):
constants.XA_PREPARE_EVENT: event.XAPrepareEvent,
constants.ROWS_QUERY_LOG_EVENT: event.RowsQueryLogEvent,
constants.RAND_EVENT: event.RandEvent,
constants.USER_VAR_EVENT: event.UserVarEvent,
# row_event
constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent,
Expand Down
Loading