Skip to content

Commit 8f4fade

Browse files
committed
add inserting-data-to-elasticsearch-in-bulk feature
1 parent: a341a89 · commit: 8f4fade

File tree

1 file changed

+25
-13
lines changed

1 file changed

+25
-13
lines changed

pymyelarepl/pymyelarepl.py

+25 -13
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
import yaml
77

88
from pymysqlreplication import BinLogStreamReader
9+
from pymysqlreplication.event import XidEvent
910
from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent
1011

1112

@@ -34,7 +35,7 @@ def __init__(self, config_path):
3435
self.binlog_stream_reader = BinLogStreamReader(
3536
connection_settings=self.mysql_conf,
3637
server_id=self.config['mysql']['server_id'],
37-
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
38+
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent, XidEvent],
3839
log_file=self.config['mysql']['log_file'],
3940
log_pos=self.config['mysql']['log_pos'],
4041
resume_stream=True if self.config['mysql']['log_pos'] != 0 else False,
@@ -61,22 +62,33 @@ def serialize_not_serializable(self, obj):
6162
return str(obj)
6263
raise TypeError('Type not serializable for obj {obj}'.format(obj=obj))
6364

64-
def convert_event_to_valid_es_data_format(self, event):
65-
meta = json.dumps({event['action']: {'_index': event['index'], '_id': event['id']}})
65+
def convert_event_to_valid_es_data_format(self, event):
66+
converted = ''
67+
68+
for e in event:
69+
meta = json.dumps({e['action']: {'_index': e['index'], '_id': e['id']}})
6670

67-
if event['action'] == 'delete':
68-
converted = meta + '\n'
69-
elif event['action'] == 'update':
70-
body = json.dumps({'doc': event['doc']}, default=self.serialize_not_serializable)
71-
converted = meta + '\n' + body + '\n'
72-
elif event['action'] == 'create':
73-
body = json.dumps(event['doc'], default=self.serialize_not_serializable)
74-
converted = meta + '\n' + body + '\n'
71+
if e['action'] == 'delete':
72+
converted += ''.join([meta, '\n'])
73+
elif e['action'] == 'update':
74+
body = json.dumps({'doc': e['doc']}, default=self.serialize_not_serializable)
75+
converted += ''.join([meta, '\n', body, '\n'])
76+
elif e['action'] == 'create':
77+
body = json.dumps(e['doc'], default=self.serialize_not_serializable)
78+
converted += ''.join([meta, '\n', body, '\n'])
7579

7680
return converted
7781

78-
def get_binlog_event(self):
82+
def get_binlog_event(self):
83+
extracted_collection = []
84+
7985
for event in self.binlog_stream_reader:
86+
if isinstance(event, XidEvent):
87+
yield extracted_collection
88+
89+
extracted_collection = []
90+
continue
91+
8092
for row in event.rows:
8193
if isinstance(event, DeleteRowsEvent):
8294
extracted = {
@@ -99,7 +111,7 @@ def get_binlog_event(self):
99111
'doc': {k: v for k, v in row['values'].items() if k != event.primary_key}
100112
}
101113

102-
yield extracted
114+
extracted_collection.append(extracted)
103115

104116
self.binlog_stream_reader.close()
105117
print('Info: Mysql connection closed successfully after reading all binlog events.')

0 commit comments

Comments
 (0)