-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy path etl_2_batch.py
executable file
·78 lines (63 loc) · 2.23 KB
/
etl_2_batch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#!/usr/bin/python
import os

import MySQLdb
from pymongo import MongoClient
from pymongo import InsertOne, UpdateMany
# --- Connections -------------------------------------------------------------
# Target: MongoDB database "etlpro", collection "orders".
client = MongoClient(os.environ.get("ETL_MONGO_URI", "localhost:27017"))
db = client.etlpro
orders = db.orders

# Source: MySQL. Every setting can be overridden from the environment so the
# credentials need not live in the script; the defaults are the values this
# script originally hard-coded, kept so existing invocations still work.
# TODO(review): remove the hard-coded password default once every deployment
# sets ETL_MYSQL_PASSWORD.
cnx = MySQLdb.connect(
    user=os.environ.get("ETL_MYSQL_USER", "root"),
    passwd=os.environ.get("ETL_MYSQL_PASSWORD", "hello"),
    db=os.environ.get("ETL_MYSQL_DB", "etlpro"),
    # An explicit charset makes MySQLdb decode text columns to Unicode
    # (charset implies use_unicode=True); PyMongo refuses to encode byte
    # strings that are not valid UTF-8.
    charset="utf8mb4",
    # PyMongo stores naive datetimes as if they were UTC, so ask MySQL to
    # convert TIMESTAMP values to UTC for this session.
    init_command="SET time_zone = '+00:00'",
)
cursor = cnx.cursor()
def copy_orders(cursor, orders, batch_size=1000):
    """Insert one MongoDB document per row of the MySQL ``orders`` table.

    Each document holds the order id, customer name and shipping address,
    plus empty "items" and "tracking" arrays for the later phases to fill.
    Inserts are sent as ordered ``bulk_write`` calls of ``batch_size``
    operations. Afterwards an index on "order_id" is ensured, because the
    later phases look documents up by that field.

    Nothing here deduplicates: re-running the script inserts a second copy
    of every order.

    :param cursor: open MySQLdb cursor used to run the SELECT.
    :param orders: PyMongo collection that receives the documents.
    :param batch_size: number of inserts per ``bulk_write`` call (>= 1).
    :raises ValueError: if ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1, got %r" % (batch_size,))
    cursor.execute(
        "select id as order_id, first_name, last_name, shipping_address "
        "from orders"
    )
    ops = []
    for order_id, first_name, last_name, shipping_address in cursor:
        ops.append(InsertOne({
            "order_id": order_id,
            "first_name": first_name,
            "last_name": last_name,
            "shipping_address": shipping_address,
            "items": [],
            "tracking": [],
        }))
        if len(ops) >= batch_size:
            orders.bulk_write(ops)
            ops = []
    # bulk_write raises on an empty request list, so only flush leftovers;
    # this also keeps an empty orders table from crashing the script.
    if ops:
        orders.bulk_write(ops)
    # Without this index each per-order $push update scans the whole
    # collection. create_index is a no-op when the index already exists.
    orders.create_index("order_id")


copy_orders(cursor, orders)
def attach_items(cursor, orders, batch_size=1000):
    """Push every row of the MySQL ``items`` table onto its order's "items".

    Each row becomes an ``{"item_id", "qty", "description", "price"}``
    subdocument. It is appended with ``$push`` to every document whose
    "order_id" equals the row's order_id. ``UpdateMany`` keeps the
    multi-document behavior of the legacy bulk ``find().update()``. No upsert
    is requested, so rows whose order_id matches no document are skipped
    silently.

    NOTE(review): if ``price`` is a DECIMAL column, MySQLdb returns
    ``decimal.Decimal``, which BSON cannot encode without a type codec.
    Confirm the column type.

    :param cursor: open MySQLdb cursor used to run the SELECT.
    :param orders: PyMongo collection holding the order documents.
    :param batch_size: number of updates per ``bulk_write`` call (>= 1).
    :raises ValueError: if ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1, got %r" % (batch_size,))
    # ORDER BY gives each "items" array a deterministic order; without it
    # MySQL may return the rows in any order.
    cursor.execute(
        "select id as item_id, order_id, qty, description, price from items "
        "order by id"
    )
    ops = []
    for item_id, order_id, qty, description, price in cursor:
        ops.append(UpdateMany(
            {"order_id": order_id},
            {"$push": {"items": {
                "item_id": item_id,
                "qty": qty,
                "description": description,
                "price": price,
            }}},
        ))
        if len(ops) >= batch_size:
            orders.bulk_write(ops)
            ops = []
    # bulk_write raises on an empty request list (e.g. an empty items table).
    if ops:
        orders.bulk_write(ops)


attach_items(cursor, orders)
def attach_tracking(cursor, orders, batch_size=1000):
    """Push every MySQL ``tracking`` row onto its order's "tracking" array.

    Each row becomes a ``{"status", "timestamp"}`` subdocument. It is
    appended with ``$push`` to every document whose "order_id" equals the
    row's order_id. ``UpdateMany`` keeps the multi-document behavior of the
    legacy bulk ``find().update()``. No upsert is requested, so rows whose
    order_id matches no document are skipped silently.

    NOTE(review): MySQLdb returns naive datetimes, and PyMongo stores naive
    datetimes as if they were UTC. The stored times are therefore correct
    only if the MySQL session time zone is UTC. Confirm.

    :param cursor: open MySQLdb cursor used to run the SELECT.
    :param orders: PyMongo collection holding the order documents.
    :param batch_size: number of updates per ``bulk_write`` call (>= 1).
    :raises ValueError: if ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1, got %r" % (batch_size,))
    # ORDER BY makes each order's "tracking" array chronological; without it
    # MySQL may return the rows in any order.
    cursor.execute(
        "select order_id, status, timestamp from tracking "
        "order by timestamp"
    )
    ops = []
    for order_id, status, time_stamp in cursor:
        ops.append(UpdateMany(
            {"order_id": order_id},
            {"$push": {"tracking": {
                "status": status,
                "timestamp": time_stamp,
            }}},
        ))
        if len(ops) >= batch_size:
            orders.bulk_write(ops)
            ops = []
    # bulk_write raises on an empty request list (e.g. an empty tracking table).
    if ops:
        orders.bulk_write(ops)


attach_tracking(cursor, orders)
# Shut everything down in dependency order: the MySQL cursor before the
# connection that owns it, then the MongoDB client.
for _handle in (cursor, cnx, client):
    _handle.close()