forked from rganowski/citus-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.py
32 lines (27 loc) · 1.18 KB
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import json
from datetime import datetime
from kafka import KafkaConsumer
# Settings for the demo consumer, gathered in one place so they are easy to tweak.
_consumer_settings = dict(
    bootstrap_servers=['localhost:19093', 'localhost:29093'],
    # Start from the oldest retained entry of each topic; the default would
    # only deliver entries that appear after subscribing.
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    auto_commit_interval_ms=1000,
    # Values are parsed as JSON; an empty/missing value is passed on as None.
    value_deserializer=lambda raw: None if not raw else json.loads(raw),
    # Set a group to experiment with several consumers running concurrently
    # in the same or in different groups.
    # NOTE(review): per the kafka-python docs offset commits are disabled while
    # group_id is None, so the auto-commit settings above presumably only take
    # effect once a group is configured - confirm.
    # group_id="group_name",
)
consumer = KafkaConsumer(**_consumer_settings)

# Topics could also be handed straight to the KafkaConsumer constructor;
# subscribe() is used here because it additionally accepts a regex pattern.
consumer.subscribe(pattern="table_1")
# Readable labels for the single-letter operation code found in payload['op'].
operations = {'c': 'INSERT', 'u': 'UPDATE', 'd': 'DELETE', 'r': 'INIT'}

# Print one line per change event until interrupted with Ctrl+C:
#   P<partition>: <message timestamp>: <operation>: <row image>
try:
    for msg in consumer:
        # Messages without a value carry nothing to print.
        if not msg.value:
            continue
        payload = msg.value['payload']
        # Fall back to the raw code so that an operation missing from the
        # table is still printed instead of raising KeyError and stopping
        # the whole consumer.
        op = operations.get(payload['op'], payload['op'])
        # A delete only has a meaningful "before" image; every other operation
        # is shown by its "after" image. .get() prints None when the image is
        # absent rather than aborting the loop.
        row = payload.get('before') if op == 'DELETE' else payload.get('after')
        print(
            # msg.timestamp is divided by 1000 to turn milliseconds into the
            # seconds that datetime.fromtimestamp() expects.
            f"P{msg.partition}: {datetime.fromtimestamp(msg.timestamp/1000)}: "
            f"{op}: "
            f"{row}"
        )
except KeyboardInterrupt:
    print("…Bye")
finally:
    # Release the broker connections on every exit path (Ctrl+C or an error).
    consumer.close()