diff --git a/apps/hermes/server/Cargo.lock b/apps/hermes/server/Cargo.lock
index 7ed3df5239..885efef7f3 100644
--- a/apps/hermes/server/Cargo.lock
+++ b/apps/hermes/server/Cargo.lock
@@ -1868,7 +1868,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
 
 [[package]]
 name = "hermes"
-version = "0.8.2"
+version = "0.8.3"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/apps/hermes/server/Cargo.toml b/apps/hermes/server/Cargo.toml
index b2a367efdd..f48c9d1ef4 100644
--- a/apps/hermes/server/Cargo.toml
+++ b/apps/hermes/server/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "hermes"
-version = "0.8.2"
+version = "0.8.3"
 description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
 edition = "2021"
 
diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs
index 70c5766870..2fb9161dc1 100644
--- a/apps/hermes/server/src/api/rest/v2/sse.rs
+++ b/apps/hermes/server/src/api/rest/v2/sse.rs
@@ -93,34 +93,36 @@ where
     // Convert the broadcast receiver into a Stream
     let stream = BroadcastStream::new(update_rx);
 
-    let sse_stream = stream.then(move |message| {
-        let state_clone = state.clone(); // Clone again to use inside the async block
-        let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block
-        async move {
-            match message {
-                Ok(event) => {
-                    match handle_aggregation_event(
-                        event,
-                        state_clone,
-                        price_ids_clone,
-                        params.encoding,
-                        params.parsed,
-                        params.benchmarks_only,
-                        params.allow_unordered,
-                    )
-                    .await
-                    {
-                        Ok(Some(update)) => Ok(Event::default()
-                            .json_data(update)
-                            .unwrap_or_else(error_event)),
-                        Ok(None) => Ok(Event::default().comment("No update available")),
-                        Err(e) => Ok(error_event(e)),
+    let sse_stream = stream
+        .then(move |message| {
+            let state_clone = state.clone(); // Clone again to use inside the async block
+            let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block
+            async move {
+                match message {
+                    Ok(event) => {
+                        match handle_aggregation_event(
+                            event,
+                            state_clone,
+                            price_ids_clone,
+                            params.encoding,
+                            params.parsed,
+                            params.benchmarks_only,
+                            params.allow_unordered,
+                        )
+                        .await
+                        {
+                            Ok(Some(update)) => Some(Ok(Event::default()
+                                .json_data(update)
+                                .unwrap_or_else(error_event))),
+                            Ok(None) => None,
+                            Err(e) => Some(Ok(error_event(e))),
+                        }
                     }
+                    Err(e) => Some(Ok(error_event(e))),
                 }
-                Err(e) => Ok(error_event(e)),
             }
-        }
-    });
+        })
+        .filter_map(|x| x);
 
     Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
 }
diff --git a/apps/hermes/server/src/state/aggregate.rs b/apps/hermes/server/src/state/aggregate.rs
index 43928b9dba..593a315201 100644
--- a/apps/hermes/server/src/state/aggregate.rs
+++ b/apps/hermes/server/src/state/aggregate.rs
@@ -282,13 +282,23 @@ where
                     WormholePayload::Merkle(proof) => {
                         tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof.");
 
-                        store_wormhole_merkle_verified_message(
+                        // Store the wormhole merkle verified message and check if it was already stored
+                        let is_new = store_wormhole_merkle_verified_message(
                             self,
                             proof.clone(),
                             update_vaa.to_owned(),
                         )
                         .await?;
 
+                        // If the message was already stored, return early
+                        if !is_new {
+                            tracing::info!(
+                                slot = proof.slot,
+                                "VAA Merkle Proof already stored, skipping."
+                            );
+                            return Ok(());
+                        }
+
                         self.into()
                             .data
                             .write()
@@ -304,9 +314,22 @@ where
                 let slot = accumulator_messages.slot;
                 tracing::info!(slot = slot, "Storing Accumulator Messages.");
 
-                self.store_accumulator_messages(accumulator_messages)
+                // Store the accumulator messages and check if they were already stored in a single operation
+                // This avoids the race condition where multiple threads could check and find nothing
+                // but then both store the same messages
+                let is_new = self
+                    .store_accumulator_messages(accumulator_messages)
                     .await?;
 
+                // If the messages were already stored, return early
+                if !is_new {
+                    tracing::info!(
+                        slot = slot,
+                        "Accumulator Messages already stored, skipping."
+                    );
+                    return Ok(());
+                }
+
                 self.into()
                     .data
                     .write()
@@ -351,28 +374,23 @@ where
         // Update the aggregate state
         let mut aggregate_state = self.into().data.write().await;
 
-        // Send update event to subscribers. We are purposefully ignoring the result
-        // because there might be no subscribers.
-        let _ = match aggregate_state.latest_completed_slot {
+        // Atomic check and update
+        let event = match aggregate_state.latest_completed_slot {
             None => {
-                aggregate_state.latest_completed_slot.replace(slot);
-                self.into()
-                    .api_update_tx
-                    .send(AggregationEvent::New { slot })
+                aggregate_state.latest_completed_slot = Some(slot);
+                AggregationEvent::New { slot }
             }
             Some(latest) if slot > latest => {
                 self.prune_removed_keys(message_state_keys).await;
-                aggregate_state.latest_completed_slot.replace(slot);
-                self.into()
-                    .api_update_tx
-                    .send(AggregationEvent::New { slot })
+                aggregate_state.latest_completed_slot = Some(slot);
+                AggregationEvent::New { slot }
             }
-            _ => self
-                .into()
-                .api_update_tx
-                .send(AggregationEvent::OutOfOrder { slot }),
+            _ => AggregationEvent::OutOfOrder { slot },
         };
 
+        // Only send the event after the state has been updated
+        let _ = self.into().api_update_tx.send(event);
+
         aggregate_state.latest_completed_slot = aggregate_state
             .latest_completed_slot
             .map(|latest| latest.max(slot))
@@ -1374,6 +1392,115 @@ mod test {
         assert_eq!(result.unwrap_err().to_string(), "Message not found");
     }
 
+    /// Test that verifies only one event is sent per slot, even when updates arrive out of order
+    /// or when a slot is processed multiple times.
+    #[tokio::test]
+    pub async fn test_out_of_order_updates_send_single_event_per_slot() {
+        let (state, mut update_rx) = setup_state(10).await;
+
+        // Create price feed messages
+        let price_feed_100 = create_dummy_price_feed_message(100, 10, 9);
+        let price_feed_101 = create_dummy_price_feed_message(100, 11, 10);
+
+        // First, process slot 100
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_100)], 100, 20),
+        )
+        .await;
+
+        // Check that we received the New event for slot 100
+        assert_eq!(
+            update_rx.recv().await,
+            Ok(AggregationEvent::New { slot: 100 })
+        );
+
+        // Next, process slot 101
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_101)], 101, 21),
+        )
+        .await;
+
+        // Check that we received the New event for slot 101
+        assert_eq!(
+            update_rx.recv().await,
+            Ok(AggregationEvent::New { slot: 101 })
+        );
+
+        // Now, process slot 100 again
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_100)], 100, 22),
+        )
+        .await;
+
+        // Try to receive another event with a timeout to ensure no more events were sent
+        // We should not receive an OutOfOrder event for slot 100 since we've already sent an event for it
+        let timeout_result =
+            tokio::time::timeout(std::time::Duration::from_millis(100), update_rx.recv()).await;
+
+        // The timeout should occur, indicating no more events were received
+        assert!(
+            timeout_result.is_err(),
+            "Received unexpected additional event"
+        );
+
+        // Verify that both price feeds were stored correctly
+        let price_feed_ids = (*state).get_price_feed_ids().await;
+        assert_eq!(price_feed_ids.len(), 1);
+        assert!(price_feed_ids.contains(&PriceIdentifier::new([100; 32])));
+    }
+
+    /// Test that verifies only one event is sent when multiple concurrent updates
+    /// for the same slot are processed.
+    #[tokio::test]
+    pub async fn test_concurrent_updates_same_slot_sends_single_event() {
+        let (state, mut update_rx) = setup_state(10).await;
+
+        // Create a single price feed message
+        let price_feed = create_dummy_price_feed_message(100, 10, 9);
+
+        // Generate 100 identical updates for the same slot but with different sequence numbers
+        let mut all_updates = Vec::new();
+        for seq in 0..100 {
+            let updates = generate_update(vec![Message::PriceFeedMessage(price_feed)], 10, seq);
+            all_updates.extend(updates);
+        }
+
+        // Process updates concurrently - we don't care if some fail due to the race condition
+        // The important thing is that only one event is sent
+        let state_arc = Arc::clone(&state);
+        let futures = all_updates.into_iter().map(move |u| {
+            let state_clone = Arc::clone(&state_arc);
+            async move {
+                let _ = state_clone.store_update(u).await;
+            }
+        });
+        futures::future::join_all(futures).await;
+
+        // Check that only one AggregationEvent::New is received
+        assert_eq!(
+            update_rx.recv().await,
+            Ok(AggregationEvent::New { slot: 10 })
+        );
+
+        // Try to receive another event with a timeout to ensure no more events were sent
+        let timeout_result =
+            tokio::time::timeout(std::time::Duration::from_millis(100), update_rx.recv()).await;
+
+        // The timeout should occur, indicating no more events were received
+        assert!(
+            timeout_result.is_err(),
+            "Received unexpected additional event"
+        );
+
+        // Verify that the price feed was stored correctly
+        let price_feed_ids = (*state).get_price_feed_ids().await;
+        assert_eq!(price_feed_ids.len(), 1);
+        assert!(price_feed_ids.contains(&PriceIdentifier::new([100; 32])));
+    }
 }
 
 #[cfg(test)]
 /// Unit tests for the core TWAP calculation logic in `calculate_twap`
diff --git a/apps/hermes/server/src/state/aggregate/wormhole_merkle.rs b/apps/hermes/server/src/state/aggregate/wormhole_merkle.rs
index 0c41df204b..cbe40a1d5a 100644
--- a/apps/hermes/server/src/state/aggregate/wormhole_merkle.rs
+++ b/apps/hermes/server/src/state/aggregate/wormhole_merkle.rs
@@ -55,14 +55,16 @@ pub async fn store_wormhole_merkle_verified_message<S>(
     state: &S,
     root: WormholeMerkleRoot,
     vaa: VaaBytes,
-) -> Result<()>
+) -> Result<bool>
 where
     S: Cache,
 {
+    // Store the state and check if it was already stored in a single operation
+    // This avoids the race condition where multiple threads could check and find nothing
+    // but then both store the same state
     state
         .store_wormhole_merkle_state(WormholeMerkleState { root, vaa })
-        .await?;
-    Ok(())
+        .await
 }
 
 pub fn construct_message_states_proofs(
diff --git a/apps/hermes/server/src/state/cache.rs b/apps/hermes/server/src/state/cache.rs
index 2562b1932b..25f76fdb69 100644
--- a/apps/hermes/server/src/state/cache.rs
+++ b/apps/hermes/server/src/state/cache.rs
@@ -122,12 +122,12 @@ pub trait Cache {
     async fn store_accumulator_messages(
         &self,
         accumulator_messages: AccumulatorMessages,
-    ) -> Result<()>;
+    ) -> Result<bool>;
     async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>>;
     async fn store_wormhole_merkle_state(
         &self,
         wormhole_merkle_state: WormholeMerkleState,
-    ) -> Result<()>;
+    ) -> Result<bool>;
     async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>>;
     async fn message_state_keys(&self) -> Vec<MessageStateKey>;
     async fn fetch_message_states(
@@ -226,13 +226,22 @@ where
     async fn store_accumulator_messages(
         &self,
         accumulator_messages: AccumulatorMessages,
-    ) -> Result<()> {
+    ) -> Result<bool> {
         let mut cache = self.into().accumulator_messages_cache.write().await;
-        cache.insert(accumulator_messages.slot, accumulator_messages);
+        let slot = accumulator_messages.slot;
+
+        // Check if we already have messages for this slot while holding the lock
+        if cache.contains_key(&slot) {
+            // Messages already exist, return false to indicate no insertion happened
+            return Ok(false);
+        }
+
+        // Messages don't exist, store them
+        cache.insert(slot, accumulator_messages);
         while cache.len() > self.into().cache_size as usize {
             cache.pop_first();
         }
-        Ok(())
+        Ok(true)
     }
 
     async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>> {
@@ -243,13 +252,22 @@ where
     async fn store_wormhole_merkle_state(
         &self,
         wormhole_merkle_state: WormholeMerkleState,
-    ) -> Result<()> {
+    ) -> Result<bool> {
         let mut cache = self.into().wormhole_merkle_state_cache.write().await;
-        cache.insert(wormhole_merkle_state.root.slot, wormhole_merkle_state);
+        let slot = wormhole_merkle_state.root.slot;
+
+        // Check if we already have a state for this slot while holding the lock
+        if cache.contains_key(&slot) {
+            // State already exists, return false to indicate no insertion happened
+            return Ok(false);
+        }
+
+        // State doesn't exist, store it
+        cache.insert(slot, wormhole_merkle_state);
         while cache.len() > self.into().cache_size as usize {
             cache.pop_first();
         }
-        Ok(())
+        Ok(true)
     }
 
     async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>> {
@@ -702,18 +720,7 @@ mod test {
         let (state, _) = setup_state(2).await;
 
         // Make sure the retrieved accumulator messages is what we store.
-        let mut accumulator_messages_at_10 = create_empty_accumulator_messages_at_slot(10);
-        state
-            .store_accumulator_messages(accumulator_messages_at_10.clone())
-            .await
-            .unwrap();
-        assert_eq!(
-            state.fetch_accumulator_messages(10).await.unwrap().unwrap(),
-            accumulator_messages_at_10
-        );
-
-        // Make sure overwriting the accumulator messages works.
-        accumulator_messages_at_10.ring_size = 5; // Change the ring size from 3 to 5.
+        let accumulator_messages_at_10 = create_empty_accumulator_messages_at_slot(10);
         state
             .store_accumulator_messages(accumulator_messages_at_10.clone())
             .await
@@ -764,22 +771,7 @@ mod test {
         let (state, _) = setup_state(2).await;
 
         // Make sure the retrieved wormhole merkle state is what we store
-        let mut wormhole_merkle_state_at_10 = create_empty_wormhole_merkle_state_at_slot(10);
-        state
-            .store_wormhole_merkle_state(wormhole_merkle_state_at_10.clone())
-            .await
-            .unwrap();
-        assert_eq!(
-            state
-                .fetch_wormhole_merkle_state(10)
-                .await
-                .unwrap()
-                .unwrap(),
-            wormhole_merkle_state_at_10
-        );
-
-        // Make sure overwriting the wormhole merkle state works.
-        wormhole_merkle_state_at_10.root.ring_size = 5; // Change the ring size from 3 to 5.
+        let wormhole_merkle_state_at_10 = create_empty_wormhole_merkle_state_at_slot(10);
         state
             .store_wormhole_merkle_state(wormhole_merkle_state_at_10.clone())
             .await