forked from julien-duponchelle/python-mysql-replication
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrethinkdb_sync.py
65 lines (51 loc) · 1.51 KB
/
rethinkdb_sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Insert a new element in a RethinkDB database
# when an evenement is trigger in MySQL replication log
#
# Please test with MySQL employees DB available here:
# https://launchpad.net/test-db/
#
import rethinkdb
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
MYSQL_SETTINGS = {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"passwd": ""
}
def main():
# connect rethinkdb
rethinkdb.connect("localhost", 28015, "mysql")
try:
rethinkdb.db_drop("mysql").run()
except:
pass
rethinkdb.db_create("mysql").run()
tables = ["dept_emp", "dept_manager", "titles",
"salaries", "employees", "departments"]
for table in tables:
rethinkdb.db("mysql").table_create(table).run()
stream = BinLogStreamReader(
connection_settings=MYSQL_SETTINGS,
blocking=True,
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
)
# process Feed
for binlogevent in stream:
if not isinstance(binlogevent, WriteRowsEvent):
continue
for row in binlogevent.rows:
if not binlogevent.schema == "employees":
continue
vals = dict((str(k), str(v)) for k, v in row["values"].iteritems())
rethinkdb.table(binlogevent.table).insert(vals).run()
stream.close()
if __name__ == "__main__":
main()