Skip to content

Commit b62193e

Browse files
authored
fix(rln-relay): scope of getEvents (#1672)
1 parent 9616253 commit b62193e

File tree

1 file changed

+46
-28
lines changed

1 file changed

+46
-28
lines changed

waku/v2/protocol/waku_rln_relay/group_manager/on_chain/group_manager.nim

+46-28
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ type
6060
const DefaultKeyStorePath* = "rlnKeystore.json"
6161
const DefaultKeyStorePassword* = "password"
6262

63-
template initializedGuard*(g: OnchainGroupManager): untyped =
63+
template initializedGuard(g: OnchainGroupManager): untyped =
6464
if not g.initialized:
6565
raise newException(ValueError, "OnchainGroupManager is not initialized")
6666

@@ -156,7 +156,7 @@ method withdrawBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]):
156156

157157
# TODO: after slashing is enabled on the contract
158158

159-
proc parseEvent*(event: type MemberRegistered,
159+
proc parseEvent(event: type MemberRegistered,
160160
log: JsonNode): GroupManagerResult[Membership] =
161161
## parses the `data` parameter of the `MemberRegistered` event `log`
162162
## returns an error if it cannot parse the `data` parameter
@@ -196,7 +196,16 @@ proc backfillRootQueue*(g: OnchainGroupManager, blockTable: BlockTable): Future[
196196
# add the backfilled root
197197
g.validRoots.addLast(g.validRootBuffer.popLast())
198198

199-
proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} =
199+
proc insert(blockTable: var BlockTable, blockNumber: BlockNumber, member: Membership) =
200+
if blockTable.hasKeyOrPut(blockNumber, @[member]):
201+
try:
202+
blockTable[blockNumber].add(member)
203+
except KeyError: # qed
204+
error "could not insert member into block table", blockNumber=blockNumber, member=member
205+
206+
proc getRawEvents(g: OnchainGroupManager,
207+
fromBlock: BlockNumber,
208+
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[JsonNode] {.async.} =
200209
initializedGuard(g)
201210

202211
let ethRpc = g.ethRpc.get()
@@ -212,13 +221,24 @@ proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[
212221
else:
213222
normalizedToBlock = fromBlock
214223

224+
let events = await rlnContract.getJsonLogs(MemberRegistered,
225+
fromBlock = some(fromBlock.blockId()),
226+
toBlock = some(normalizedToBlock.blockId()))
227+
return events
228+
229+
proc getBlockTables(g: OnchainGroupManager,
230+
fromBlock: BlockNumber,
231+
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[(BlockTable, BlockTable)] {.async.} =
232+
initializedGuard(g)
233+
215234
var blockTable = default(BlockTable)
216235
var toRemoveBlockTable = default(BlockTable)
217236

218-
let events = await rlnContract.getJsonLogs(MemberRegistered, fromBlock = some(fromBlock.blockId()), toBlock = some(normalizedToBlock.blockId()))
237+
let events = await g.getRawEvents(fromBlock, toBlock)
238+
219239
if events.len == 0:
220240
debug "no events found"
221-
return blockTable
241+
return (blockTable, toRemoveBlockTable)
222242

223243
for event in events:
224244
let blockNumber = parseHexInt(event["blockNumber"].getStr()).uint
@@ -232,21 +252,13 @@ proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[
232252
if removed:
233253
# remove the registration from the tree, per block
234254
warn "member removed from the tree as per canonical chain", index=parsedEvent.index
235-
if toRemoveBlockTable.hasKey(blockNumber):
236-
toRemoveBlockTable[blockNumber].add(parsedEvent)
237-
else:
238-
toRemoveBlockTable[blockNumber] = @[parsedEvent]
239-
240-
await g.backfillRootQueue(toRemoveBlockTable)
241-
242-
if blockTable.hasKey(blockNumber):
243-
blockTable[blockNumber].add(parsedEvent)
255+
toRemoveBlockTable.insert(blockNumber, parsedEvent)
244256
else:
245-
blockTable[blockNumber] = @[parsedEvent]
257+
blockTable.insert(blockNumber, parsedEvent)
246258

247-
return blockTable
259+
return (blockTable, toRemoveBlockTable)
248260

249-
proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} =
261+
proc handleValidEvents(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} =
250262
initializedGuard(g)
251263

252264
for blockNumber, members in blockTable.pairs():
@@ -268,30 +280,36 @@ proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Fu
268280

269281
return
270282

271-
proc getEventsAndSeedIntoTree*(g: OnchainGroupManager,
272-
fromBlock: BlockNumber,
273-
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
283+
proc handleRemovedEvents(g: OnchainGroupManager, toRemoveBlockTable: BlockTable): Future[void] {.async.} =
284+
initializedGuard(g)
285+
286+
await g.backfillRootQueue(toRemoveBlockTable)
287+
288+
proc getAndHandleEvents(g: OnchainGroupManager,
289+
fromBlock: BlockNumber,
290+
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
274291
initializedGuard(g)
275292

276-
let events = await g.getEvents(fromBlock, toBlock)
277-
await g.seedBlockTableIntoTree(events)
293+
let (validEvents, removedEvents) = await g.getBlockTables(fromBlock, toBlock)
294+
await g.handleRemovedEvents(removedEvents)
295+
await g.handleValidEvents(validEvents)
278296
return
279297

280-
proc getNewHeadCallback*(g: OnchainGroupManager): BlockHeaderHandler =
298+
proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler =
281299
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} =
282300
let latestBlock = blockheader.number.uint
283301
debug "block received", blockNumber = latestBlock
284302
# get logs from the last block
285303
try:
286-
asyncSpawn g.getEventsAndSeedIntoTree(latestBlock)
304+
asyncSpawn g.getAndHandleEvents(latestBlock)
287305
except CatchableError:
288306
warn "failed to handle log: ", error=getCurrentExceptionMsg()
289307
return newHeadCallback
290308

291309
proc newHeadErrCallback(error: CatchableError) =
292310
warn "failed to get new head", error=error.msg
293311

294-
proc startListeningToEvents*(g: OnchainGroupManager): Future[void] {.async.} =
312+
proc startListeningToEvents(g: OnchainGroupManager): Future[void] {.async.} =
295313
initializedGuard(g)
296314

297315
let ethRpc = g.ethRpc.get()
@@ -301,11 +319,11 @@ proc startListeningToEvents*(g: OnchainGroupManager): Future[void] {.async.} =
301319
except CatchableError:
302320
raise newException(ValueError, "failed to subscribe to block headers: " & getCurrentExceptionMsg())
303321

304-
proc startOnchainSync*(g: OnchainGroupManager, fromBlock: BlockNumber = BlockNumber(0)): Future[void] {.async.} =
322+
proc startOnchainSync(g: OnchainGroupManager, fromBlock: BlockNumber = BlockNumber(0)): Future[void] {.async.} =
305323
initializedGuard(g)
306324

307325
try:
308-
await g.getEventsAndSeedIntoTree(fromBlock, some(fromBlock))
326+
await g.getAndHandleEvents(fromBlock, some(fromBlock))
309327
except CatchableError:
310328
raise newException(ValueError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg())
311329

@@ -315,7 +333,7 @@ proc startOnchainSync*(g: OnchainGroupManager, fromBlock: BlockNumber = BlockNum
315333
except CatchableError:
316334
raise newException(ValueError, "failed to start listening to events: " & getCurrentExceptionMsg())
317335

318-
proc persistCredentials*(g: OnchainGroupManager): GroupManagerResult[void] =
336+
proc persistCredentials(g: OnchainGroupManager): GroupManagerResult[void] =
319337
if not g.saveKeystore:
320338
return ok()
321339
if g.idCredentials.isNone():

0 commit comments

Comments
 (0)