Skip to content

Commit 699ce88

Browse files
committed
First implementation
1 parent a0ce1d3 commit 699ce88

10 files changed

+678
-11
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
node_modules/
22
.idea/
3+
private/*.json
4+
private/env/production/

.meteor/packages

+1
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,4 @@ quave:collections
2424
quave:accounts-passwordless-react
2525
quave:logged-user-react
2626
quave:alert-react-tailwind
27+
littledata:synced-cron

.meteor/versions

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ [email protected]
3535
3636
3737
38+
3839
3940
4041

app/cron.js

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { SyncedCron } from 'meteor/littledata:synced-cron';
2+
import { Meteor } from 'meteor/meteor';
3+
import { runSyncs } from './runSyncs';
4+
5+
// Cron behaviour is driven by Meteor settings; every key is optional and the
// defaults below apply when a key (or the whole settings object) is absent.
const cronSettings = Meteor.settings || {};

const {
  // When truthy, the "Run syncs" job is not registered at startup.
  disableCronRunSyncs,
  // Schedule expression handed to the cron schedule parser's `text()` method.
  runSyncParserText = 'every 45 mins',
  // Forwarded to SyncedCron as its `log` option.
  cronLogs = true,
  // Forwarded to SyncedCron as its `collectionName` option.
  dataSyncCronHistoryCollectionName = 'dataSyncCronHistory',
} = cronSettings;

const syncedCronOptions = {
  log: cronLogs,
  collectionName: dataSyncCronHistoryCollectionName,
  utc: false,
};

SyncedCron.config(syncedCronOptions);
17+
18+
/**
 * Cron job body: runs every configured sync and logs (rather than rethrows)
 * any failure, so the returned promise never rejects.
 */
const runSyncsJob = async () => {
  try {
    return await runSyncs();
  } catch (err) {
    console.error('Error running cron syncs.', err);
    return undefined;
  }
};

/** Registers the "Run syncs" job using the schedule text from settings. */
const registerRunSyncsJob = () => {
  console.log('runSyncs cron enabled.');
  SyncedCron.add({
    name: 'Run syncs',
    schedule: (parser) => parser.text(runSyncParserText),
    job: runSyncsJob,
  });
};

// On server startup: register the job unless it is disabled in settings, then
// start SyncedCron in either case.
Meteor.startup(() => {
  const isCronEnabled = !disableCronRunSyncs;
  if (isCronEnabled) {
    registerRunSyncsJob();
  }

  SyncedCron.start();
});

app/mongo.js

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { Meteor } from 'meteor/meteor';
2+
import { createCollection } from 'meteor/quave:collections';
3+
4+
// Cache of already created collections, keyed by collection name.
// A Map is used instead of a plain object so that names matching inherited
// Object.prototype members (e.g. 'constructor', 'toString') are not mistaken
// for cached entries.
const collections = new Map();

/**
 * Returns the collection for `arg.name`, creating it on first request and
 * reusing the same instance afterwards.
 *
 * @param {object} arg - Options forwarded to `createCollection`; must contain
 *   `name`. For the name 'users' the existing `Meteor.users` instance is
 *   passed along as `instance`.
 * @returns {object} The cached or newly created collection.
 */
export const createOrGetCollection = (arg) => {
  const { name } = arg;

  if (!collections.has(name)) {
    const collectionOptions =
      name === 'users' ? { ...arg, instance: Meteor.users } : arg;
    collections.set(name, createCollection(collectionOptions));
  }

  return collections.get(name);
};

app/runSync.js

+151
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import { Meteor } from 'meteor/meteor';
2+
import { MongoInternals } from 'meteor/mongo';
3+
import { BigQuery } from '@google-cloud/bigquery';
4+
import { createOrGetCollection } from './mongo';
5+
6+
// All sync configuration comes from Meteor settings; a missing settings object
// is treated as empty so the defaults below still apply.
const syncSettings = Meteor.settings || {};

const {
  // Asset file name whose absolute path is used as the BigQuery key file.
  bigQueryAccountsServiceFileName,
  // BigQuery project id passed to the client.
  bigQueryProjectId,
  // BigQuery dataset that receives the rows.
  bigQueryDatabase,
  // Mongo database name; used as the prefix of the oplog namespace filter.
  mongoDatabase,
  // Maximum number of oplog entries read per sync run.
  mongoOplogReadBatchSize = 100,
  // Name of the collection that stores the per-collection sync checkpoint.
  dataSyncsCollectionName = 'dataSyncs',
  // When truthy, the fetched oplog entries are dumped to the log.
  debug,
} = syncSettings;
15+
16+
// BigQuery client, created on first use instead of at import time.
// Resolving the key file while the module loads fails as soon as
// `bigQueryAccountsServiceFileName` is missing or unknown, which would prevent
// callers from reporting the misconfiguration themselves.
let bigQueryClient;

/** Returns the shared BigQuery client, building it on the first call. */
const getBigQueryClient = () => {
  if (!bigQueryClient) {
    bigQueryClient = new BigQuery({
      // eslint-disable-next-line no-undef
      keyFilename: Assets.absoluteFilePath(bigQueryAccountsServiceFileName),
      projectId: bigQueryProjectId,
    });
  }
  return bigQueryClient;
};

/**
 * Inserts rows into a table of the configured BigQuery dataset.
 *
 * @param {object} params
 * @param {string} params.table - Target table name inside `bigQueryDatabase`.
 * @param {object[]} params.rows - Rows to insert.
 * @returns {Promise<void>}
 * @throws Rethrows the insert error after logging it; when the error carries
 *   an `errors` property, that detail is logged as formatted JSON as well.
 */
export const insertRows = async ({ table, rows }) => {
  // Kept outside the try block so a client configuration failure is not
  // reported as an insert failure.
  const client = getBigQueryClient();

  try {
    await client.dataset(bigQueryDatabase).table(table).insert(rows);
  } catch (e) {
    console.error('Error inserting rows to bigquery.', e);
    if (e.errors) {
      console.error(
        'Error inserting rows to bigquery.',
        JSON.stringify(e.errors, null, 2)
      );
    }
    throw e;
  }
};
38+
39+
// Collection that stores one checkpoint document per synced collection.
const DataSyncsCollection = createOrGetCollection({ name: dataSyncsCollectionName });

/**
 * Loads the checkpoint document for a collection, creating it when none
 * exists yet.
 *
 * @param {object} params
 * @param {string} params.collectionName - Name of the synced collection.
 * @returns {Promise<object>} The existing document, or the freshly inserted
 *   one re-read by the id returned from the insert.
 */
const getDataSync = async ({ collectionName }) => {
  const existing = await DataSyncsCollection.findOneAsync({ collectionName });

  if (!existing) {
    const insertedId = await DataSyncsCollection.insertAsync({
      collectionName,
      createdAt: new Date(),
      updatedAt: new Date(),
    });
    return DataSyncsCollection.findOneAsync(insertedId);
  }

  return existing;
};
57+
/**
 * Persists checkpoint fields for the document identified by `_id`.
 * Every other property of the argument is written with `$set`, and
 * `updatedAt` is always overwritten with the current time.
 *
 * @param {object} dataSync - Checkpoint document including its `_id`.
 * @returns {Promise<void>}
 */
const setDataSync = async ({ _id, ...rest }) => {
  const selector = { _id };
  const modifier = { $set: { ...rest, updatedAt: new Date() } };

  await DataSyncsCollection.updateAsync(selector, modifier);
};
63+
/**
 * Extracts the id of the document an oplog entry refers to.
 * The id is read from `o2._id` when present and from `o._id` otherwise, so
 * entries that carry the document only under `o` are handled too.
 */
const getOplogDocumentId = ({ o, o2 }) => {
  if (o2 && o2._id != null) {
    return o2._id;
  }
  return o ? o._id : undefined;
};

/**
 * Syncs one batch of inserted/updated documents of a Mongo collection to the
 * BigQuery table with the same name.
 *
 * Steps: validate environment and settings, load the stored checkpoint, read
 * up to `mongoOplogReadBatchSize` insert/update oplog entries for the
 * collection after that checkpoint (ordered by `wall`), fetch the current
 * version of the referenced documents, insert them into BigQuery and store the
 * `ts`/`wall` of the last oplog entry read as the new checkpoint.
 *
 * Validation problems are logged with `console.error` and end the run without
 * throwing.
 *
 * @param {object} params
 * @param {string} params.name - Collection name (also used as the table name).
 * @param {object} [params.projection={}] - Projection applied when fetching
 *   the documents to export.
 * @returns {Promise<void>}
 * @throws Propagates errors from the oplog read, the document fetch, the
 *   BigQuery insert and the checkpoint update.
 */
export const runSync = async ({ name: collectionName, projection = {} }) => {
  if (!process.env.MONGO_URL) {
    console.error('runSync error: missing MONGO_URL.');
    return;
  }
  if (!process.env.MONGO_OPLOG_URL) {
    console.error('runSync error: missing MONGO_OPLOG_URL.');
    return;
  }
  if (
    !mongoDatabase ||
    !bigQueryAccountsServiceFileName ||
    !bigQueryProjectId ||
    !bigQueryDatabase
  ) {
    console.error(
      'runSync error: missing settings. All these fields are required: mongoDatabase, bigQueryAccountsServiceFileName, bigQueryProjectId, and bigQueryDatabase.'
    );
    return;
  }
  if (!collectionName) {
    console.error('runSync error: collectionName is required.');
    return;
  }
  console.log('runSync');

  const dataSync = await getDataSync({ collectionName });
  const { lastSyncedDocumentWall, lastSyncedDocumentTs } = dataSync;
  const collection = createOrGetCollection({ name: collectionName });

  // Reuses the oplog connection Meteor already holds (internal API).
  const oplogDb =
    MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle
      ._oplogTailConnection.db;

  const oplogCollection = oplogDb.collection('oplog.rs');

  const query = {
    ...(lastSyncedDocumentWall && { wall: { $gte: lastSyncedDocumentWall } }),
    // as we are using equal we should ignore the last one synced last time to not duplicate
    ...(lastSyncedDocumentTs && { ts: { $ne: lastSyncedDocumentTs } }),
    op: { $in: ['u', 'i'] },
    ns: `${mongoDatabase}.${collectionName}`,
  };

  const findOptions = {
    limit: mongoOplogReadBatchSize,
    sort: { wall: 1 },
    // Both id locations are projected: `o2._id` and `o._id`.
    projection: { ts: 1, 'o._id': 1, 'o2._id': 1, wall: 1 },
  };

  const startOplogFind = new Date();
  const prefix = `runSync:${collectionName}:`;
  console.log(
    `${prefix}oplog find\n${JSON.stringify(query, null, 2)}\n${JSON.stringify(
      findOptions,
      null,
      2
    )}`
  );

  const cursor = oplogCollection.find(query, findOptions);
  const data = await cursor.toArray();
  const tsOplogMessage = `(${(
    new Date().getTime() - startOplogFind.getTime()
  ).toLocaleString()}ms):`;
  if (!data.length) {
    console.log(`${prefix}${tsOplogMessage}oplog no data`);
    return;
  }
  console.log(`${prefix}${tsOplogMessage}oplog data (${data.length})`);
  if (debug) {
    console.log(`${JSON.stringify(data, null, 2)}`);
  }

  const ids = data.map(getOplogDocumentId).filter((id) => id != null);
  const rows = await collection
    .find({ _id: { $in: ids } }, { projection })
    .fetchAsync();

  // Documents may be gone by now (or touched several times in one batch), so
  // there can be fewer rows than oplog entries, possibly none at all.
  if (rows.length) {
    await insertRows({
      table: collectionName,
      rows,
    });
  } else {
    console.log(`${prefix}no documents found for the oplog batch, nothing to insert`);
  }

  // The checkpoint must follow the oplog batch, not the number of rows found;
  // otherwise entries would be re-read on the next run.
  const lastDatum = data[data.length - 1];

  const newDataSync = {
    ...dataSync,
    lastSyncedDocumentTs: lastDatum.ts,
    lastSyncedDocumentWall: lastDatum.wall,
  };
  console.log(`${prefix}setDataSync\n${JSON.stringify(newDataSync, null, 2)}`);
  await setDataSync(newDataSync);
};

app/runSyncs.js

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { runSync } from './runSync';
2+
import { Meteor } from 'meteor/meteor';
3+
4+
// Collections to sync, read from Meteor settings (empty list when absent).
// Each entry is passed as-is to runSync and its `name` is used in log messages.
const { mongoCollections = [] } = Meteor.settings || {};
5+
/**
 * Runs `runSync` for every configured collection, one after another.
 *
 * A failure in one collection is logged together with the elapsed time and
 * does not stop the remaining collections from being processed. When
 * `mongoCollections` is not a non-empty array, an error is logged and nothing
 * runs.
 *
 * @returns {Promise<void>}
 */
export const runSyncs = async () => {
  if (!Array.isArray(mongoCollections) || !mongoCollections.length) {
    console.error('runSyncs error: missing mongoCollections in settings.');
    return;
  }

  // Plain `for...of`: the list is a synchronous array; each sync is awaited
  // inside the loop so collections are processed sequentially.
  for (const collectionData of mongoCollections) {
    const start = Date.now();
    const elapsed = () => `${(Date.now() - start).toLocaleString()}ms`;

    try {
      console.log(`Starting run sync for collection ${collectionData.name}`);
      await runSync(collectionData);
      console.log(
        `Finished run sync for collection ${collectionData.name} (${elapsed()})`
      );
    } catch (e) {
      console.error(
        `Error run sync for collection ${collectionData.name} (${elapsed()})`,
        e
      );
    }
  }
};

0 commit comments

Comments
 (0)