@@ -282,13 +282,23 @@ where
282
282
WormholePayload :: Merkle ( proof) => {
283
283
tracing:: info!( slot = proof. slot, "Storing VAA Merkle Proof." ) ;
284
284
285
- store_wormhole_merkle_verified_message (
285
+ // Store the wormhole merkle verified message and check if it was already stored
286
+ let is_new = store_wormhole_merkle_verified_message (
286
287
self ,
287
288
proof. clone ( ) ,
288
289
update_vaa. to_owned ( ) ,
289
290
)
290
291
. await ?;
291
292
293
+ // If the message was already stored, return early
294
+ if !is_new {
295
+ tracing:: info!(
296
+ slot = proof. slot,
297
+ "VAA Merkle Proof already stored, skipping."
298
+ ) ;
299
+ return Ok ( ( ) ) ;
300
+ }
301
+
292
302
self . into ( )
293
303
. data
294
304
. write ( )
@@ -304,9 +314,22 @@ where
304
314
let slot = accumulator_messages. slot ;
305
315
tracing:: info!( slot = slot, "Storing Accumulator Messages." ) ;
306
316
307
- self . store_accumulator_messages ( accumulator_messages)
317
+ // Store the accumulator messages and check if they were already stored in a single operation
318
+ // This avoids the race condition where multiple threads could check and find nothing
319
+ // but then both store the same messages
320
+ let is_new = self
321
+ . store_accumulator_messages ( accumulator_messages)
308
322
. await ?;
309
323
324
+ // If the messages were already stored, return early
325
+ if !is_new {
326
+ tracing:: info!(
327
+ slot = slot,
328
+ "Accumulator Messages already stored, skipping."
329
+ ) ;
330
+ return Ok ( ( ) ) ;
331
+ }
332
+
310
333
self . into ( )
311
334
. data
312
335
. write ( )
@@ -351,28 +374,23 @@ where
351
374
// Update the aggregate state
352
375
let mut aggregate_state = self . into ( ) . data . write ( ) . await ;
353
376
354
- // Send update event to subscribers. We are purposefully ignoring the result
355
- // because there might be no subscribers.
356
- let _ = match aggregate_state. latest_completed_slot {
377
+ // Atomic check and update
378
+ let event = match aggregate_state. latest_completed_slot {
357
379
None => {
358
- aggregate_state. latest_completed_slot . replace ( slot) ;
359
- self . into ( )
360
- . api_update_tx
361
- . send ( AggregationEvent :: New { slot } )
380
+ aggregate_state. latest_completed_slot = Some ( slot) ;
381
+ AggregationEvent :: New { slot }
362
382
}
363
383
Some ( latest) if slot > latest => {
364
384
self . prune_removed_keys ( message_state_keys) . await ;
365
- aggregate_state. latest_completed_slot . replace ( slot) ;
366
- self . into ( )
367
- . api_update_tx
368
- . send ( AggregationEvent :: New { slot } )
385
+ aggregate_state. latest_completed_slot = Some ( slot) ;
386
+ AggregationEvent :: New { slot }
369
387
}
370
- _ => self
371
- . into ( )
372
- . api_update_tx
373
- . send ( AggregationEvent :: OutOfOrder { slot } ) ,
388
+ _ => AggregationEvent :: OutOfOrder { slot } ,
374
389
} ;
375
390
391
+ // Only send the event after the state has been updated
392
+ let _ = self . into ( ) . api_update_tx . send ( event) ;
393
+
376
394
aggregate_state. latest_completed_slot = aggregate_state
377
395
. latest_completed_slot
378
396
. map ( |latest| latest. max ( slot) )
@@ -1374,6 +1392,115 @@ mod test {
1374
1392
1375
1393
assert_eq ! ( result. unwrap_err( ) . to_string( ) , "Message not found" ) ;
1376
1394
}
1395
+
1396
+ /// Test that verifies only one event is sent per slot, even when updates arrive out of order
1397
+ /// or when a slot is processed multiple times.
1398
+ #[ tokio:: test]
1399
+ pub async fn test_out_of_order_updates_send_single_event_per_slot ( ) {
1400
+ let ( state, mut update_rx) = setup_state ( 10 ) . await ;
1401
+
1402
+ // Create price feed messages
1403
+ let price_feed_100 = create_dummy_price_feed_message ( 100 , 10 , 9 ) ;
1404
+ let price_feed_101 = create_dummy_price_feed_message ( 100 , 11 , 10 ) ;
1405
+
1406
+ // First, process slot 100
1407
+ store_multiple_concurrent_valid_updates (
1408
+ state. clone ( ) ,
1409
+ generate_update ( vec ! [ Message :: PriceFeedMessage ( price_feed_100) ] , 100 , 20 ) ,
1410
+ )
1411
+ . await ;
1412
+
1413
+ // Check that we received the New event for slot 100
1414
+ assert_eq ! (
1415
+ update_rx. recv( ) . await ,
1416
+ Ok ( AggregationEvent :: New { slot: 100 } )
1417
+ ) ;
1418
+
1419
+ // Next, process slot 101
1420
+ store_multiple_concurrent_valid_updates (
1421
+ state. clone ( ) ,
1422
+ generate_update ( vec ! [ Message :: PriceFeedMessage ( price_feed_101) ] , 101 , 21 ) ,
1423
+ )
1424
+ . await ;
1425
+
1426
+ // Check that we received the New event for slot 101
1427
+ assert_eq ! (
1428
+ update_rx. recv( ) . await ,
1429
+ Ok ( AggregationEvent :: New { slot: 101 } )
1430
+ ) ;
1431
+
1432
+ // Now, process slot 100 again
1433
+ store_multiple_concurrent_valid_updates (
1434
+ state. clone ( ) ,
1435
+ generate_update ( vec ! [ Message :: PriceFeedMessage ( price_feed_100) ] , 100 , 22 ) ,
1436
+ )
1437
+ . await ;
1438
+
1439
+ // Try to receive another event with a timeout to ensure no more events were sent
1440
+ // We should not receive an OutOfOrder event for slot 100 since we've already sent an event for it
1441
+ let timeout_result =
1442
+ tokio:: time:: timeout ( std:: time:: Duration :: from_millis ( 100 ) , update_rx. recv ( ) ) . await ;
1443
+
1444
+ // The timeout should occur, indicating no more events were received
1445
+ assert ! (
1446
+ timeout_result. is_err( ) ,
1447
+ "Received unexpected additional event"
1448
+ ) ;
1449
+
1450
+ // Verify that both price feeds were stored correctly
1451
+ let price_feed_ids = ( * state) . get_price_feed_ids ( ) . await ;
1452
+ assert_eq ! ( price_feed_ids. len( ) , 1 ) ;
1453
+ assert ! ( price_feed_ids. contains( & PriceIdentifier :: new( [ 100 ; 32 ] ) ) ) ;
1454
+ }
1455
+
1456
+ /// Test that verifies only one event is sent when multiple concurrent updates
1457
+ /// for the same slot are processed.
1458
+ #[ tokio:: test]
1459
+ pub async fn test_concurrent_updates_same_slot_sends_single_event ( ) {
1460
+ let ( state, mut update_rx) = setup_state ( 10 ) . await ;
1461
+
1462
+ // Create a single price feed message
1463
+ let price_feed = create_dummy_price_feed_message ( 100 , 10 , 9 ) ;
1464
+
1465
+ // Generate 100 identical updates for the same slot but with different sequence numbers
1466
+ let mut all_updates = Vec :: new ( ) ;
1467
+ for seq in 0 ..100 {
1468
+ let updates = generate_update ( vec ! [ Message :: PriceFeedMessage ( price_feed) ] , 10 , seq) ;
1469
+ all_updates. extend ( updates) ;
1470
+ }
1471
+
1472
+ // Process updates concurrently - we don't care if some fail due to the race condition
1473
+ // The important thing is that only one event is sent
1474
+ let state_arc = Arc :: clone ( & state) ;
1475
+ let futures = all_updates. into_iter ( ) . map ( move |u| {
1476
+ let state_clone = Arc :: clone ( & state_arc) ;
1477
+ async move {
1478
+ let _ = state_clone. store_update ( u) . await ;
1479
+ }
1480
+ } ) ;
1481
+ futures:: future:: join_all ( futures) . await ;
1482
+
1483
+ // Check that only one AggregationEvent::New is received
1484
+ assert_eq ! (
1485
+ update_rx. recv( ) . await ,
1486
+ Ok ( AggregationEvent :: New { slot: 10 } )
1487
+ ) ;
1488
+
1489
+ // Try to receive another event with a timeout to ensure no more events were sent
1490
+ let timeout_result =
1491
+ tokio:: time:: timeout ( std:: time:: Duration :: from_millis ( 100 ) , update_rx. recv ( ) ) . await ;
1492
+
1493
+ // The timeout should occur, indicating no more events were received
1494
+ assert ! (
1495
+ timeout_result. is_err( ) ,
1496
+ "Received unexpected additional event"
1497
+ ) ;
1498
+
1499
+ // Verify that the price feed was stored correctly
1500
+ let price_feed_ids = ( * state) . get_price_feed_ids ( ) . await ;
1501
+ assert_eq ! ( price_feed_ids. len( ) , 1 ) ;
1502
+ assert ! ( price_feed_ids. contains( & PriceIdentifier :: new( [ 100 ; 32 ] ) ) ) ;
1503
+ }
1377
1504
}
1378
1505
#[ cfg( test) ]
1379
1506
/// Unit tests for the core TWAP calculation logic in `calculate_twap`
0 commit comments