fix(apps/hermes): fix duplicate events for processed slots #2474

Merged · 10 commits · Mar 19, 2025
2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "hermes"
-version = "0.8.2"
+version = "0.8.3"
 description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
 edition = "2021"

52 changes: 27 additions & 25 deletions apps/hermes/server/src/api/rest/v2/sse.rs
@@ -93,34 +93,36 @@ where
     // Convert the broadcast receiver into a Stream
     let stream = BroadcastStream::new(update_rx);

-    let sse_stream = stream.then(move |message| {
-        let state_clone = state.clone(); // Clone again to use inside the async block
-        let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block
-        async move {
-            match message {
-                Ok(event) => {
-                    match handle_aggregation_event(
-                        event,
-                        state_clone,
-                        price_ids_clone,
-                        params.encoding,
-                        params.parsed,
-                        params.benchmarks_only,
-                        params.allow_unordered,
-                    )
-                    .await
-                    {
-                        Ok(Some(update)) => Ok(Event::default()
-                            .json_data(update)
-                            .unwrap_or_else(error_event)),
-                        Ok(None) => Ok(Event::default().comment("No update available")),
-                        Err(e) => Ok(error_event(e)),
-                    }
-                }
-                Err(e) => Ok(error_event(e)),
-            }
-        }
-    });
+    let sse_stream = stream
+        .then(move |message| {
+            let state_clone = state.clone(); // Clone again to use inside the async block
+            let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block
+            async move {
+                match message {
+                    Ok(event) => {
+                        match handle_aggregation_event(
+                            event,
+                            state_clone,
+                            price_ids_clone,
+                            params.encoding,
+                            params.parsed,
+                            params.benchmarks_only,
+                            params.allow_unordered,
+                        )
+                        .await
+                        {
+                            Ok(Some(update)) => Some(Ok(Event::default()
+                                .json_data(update)
+                                .unwrap_or_else(error_event))),
+                            Ok(None) => None,
+                            Err(e) => Some(Ok(error_event(e))),
+                        }
+                    }
+                    Err(e) => Some(Ok(error_event(e))),
+                }
+            }
+        })
+        .filter_map(|x| x);

     Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
 }
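The shape of this change: each stream item now maps to an `Option`, and `.filter_map(|x| x)` drops the `None`s, so a slot that produces nothing to publish no longer surfaces as a placeholder "No update available" comment event. A minimal sketch of the pattern, assuming the `tokio_stream` adapters that pair with `BroadcastStream` (not the actual Hermes handler):

```rust
use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    // `then` maps every item to an Option; `filter_map` with the identity
    // closure keeps the `Some`s and silently drops the `None`s.
    let events: Vec<String> = stream::iter(1u64..=4)
        .then(|slot| async move {
            // Pretend only even slots produce a publishable update.
            (slot % 2 == 0).then(|| format!("update for slot {slot}"))
        })
        .filter_map(|x| x)
        .collect()
        .await;

    assert_eq!(events, ["update for slot 2", "update for slot 4"]);
}
```

Note that `tokio_stream`'s `filter_map` takes a plain closure returning `Option`, which is why the identity `|x| x` suffices here.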
161 changes: 144 additions & 17 deletions apps/hermes/server/src/state/aggregate.rs
@@ -282,13 +282,23 @@ where
             WormholePayload::Merkle(proof) => {
                 tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof.");

-                store_wormhole_merkle_verified_message(
+                // Store the wormhole merkle verified message and check if it was already stored
+                let is_new = store_wormhole_merkle_verified_message(
                     self,
                     proof.clone(),
                     update_vaa.to_owned(),
                 )
                 .await?;

+                // If the message was already stored, return early
+                if !is_new {
+                    tracing::info!(
+                        slot = proof.slot,
+                        "VAA Merkle Proof already stored, skipping."
+                    );
+                    return Ok(());
+                }
+
                 self.into()
                     .data
                     .write()
@@ -304,9 +314,22 @@ where
             let slot = accumulator_messages.slot;
             tracing::info!(slot = slot, "Storing Accumulator Messages.");

-            self.store_accumulator_messages(accumulator_messages)
+            // Store the accumulator messages and check if they were already stored in a single operation
+            // This avoids the race condition where multiple threads could check and find nothing
+            // but then both store the same messages
+            let is_new = self
+                .store_accumulator_messages(accumulator_messages)
                 .await?;

+            // If the messages were already stored, return early
+            if !is_new {
+                tracing::info!(
+                    slot = slot,
+                    "Accumulator Messages already stored, skipping."
+                );
+                return Ok(());
+            }
+
             self.into()
                 .data
                 .write()
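Both early returns above depend on the storage layer answering "was this new?" in the same atomic step that stores the value; a separate contains-then-store pair would reintroduce the race. A minimal sketch of that contract, using a hypothetical `SlotCache` (the real Hermes `Cache` trait differs):

```rust
use std::collections::{hash_map::Entry, HashMap};
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Clone, Default)]
struct SlotCache {
    inner: Arc<Mutex<HashMap<u64, Vec<u8>>>>,
}

impl SlotCache {
    /// Inserts under a single lock acquisition and returns `true` only for
    /// the first writer of a given slot, so the check and the store can
    /// never interleave across tasks.
    async fn store(&self, slot: u64, payload: Vec<u8>) -> bool {
        match self.inner.lock().await.entry(slot) {
            Entry::Occupied(_) => false, // already stored; caller returns early
            Entry::Vacant(vacant) => {
                vacant.insert(payload);
                true
            }
        }
    }
}
```

With this shape, two tasks storing the same slot serialize on the lock and exactly one observes `true`, so at most one of them proceeds to aggregation and event emission.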
@@ -351,28 +374,23 @@ where
         // Update the aggregate state
         let mut aggregate_state = self.into().data.write().await;

-        // Send update event to subscribers. We are purposefully ignoring the result
-        // because there might be no subscribers.
-        let _ = match aggregate_state.latest_completed_slot {
+        // Atomic check and update
+        let event = match aggregate_state.latest_completed_slot {
             None => {
-                aggregate_state.latest_completed_slot.replace(slot);
-                self.into()
-                    .api_update_tx
-                    .send(AggregationEvent::New { slot })
+                aggregate_state.latest_completed_slot = Some(slot);
+                AggregationEvent::New { slot }
             }
             Some(latest) if slot > latest => {
                 self.prune_removed_keys(message_state_keys).await;
-                aggregate_state.latest_completed_slot.replace(slot);
-                self.into()
-                    .api_update_tx
-                    .send(AggregationEvent::New { slot })
+                aggregate_state.latest_completed_slot = Some(slot);
+                AggregationEvent::New { slot }
             }
-            _ => self
-                .into()
-                .api_update_tx
-                .send(AggregationEvent::OutOfOrder { slot }),
+            _ => AggregationEvent::OutOfOrder { slot },
         };

+        // Only send the event after the state has been updated
+        let _ = self.into().api_update_tx.send(event);
+
         aggregate_state.latest_completed_slot = aggregate_state
             .latest_completed_slot
             .map(|latest| latest.max(slot))
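Two properties make this correct: the decision of which event to emit is taken while the write lock on the aggregate state is held, and the send happens only after `latest_completed_slot` has been updated, so subscribers can never observe an event for state that is not yet visible. A minimal self-contained sketch of the pattern with simplified names (not the Hermes `State` type):

```rust
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};

#[derive(Clone, Debug, PartialEq)]
enum AggregationEvent {
    New { slot: u64 },
    OutOfOrder { slot: u64 },
}

async fn complete_slot(
    latest_completed: Arc<RwLock<Option<u64>>>,
    tx: broadcast::Sender<AggregationEvent>,
    slot: u64,
) {
    // Hold the write lock across both the check and the update so that two
    // tasks completing slots concurrently cannot both classify theirs as
    // `New` based on the same stale value.
    let mut latest = latest_completed.write().await;
    let event = match *latest {
        Some(l) if slot <= l => AggregationEvent::OutOfOrder { slot },
        _ => {
            *latest = Some(slot);
            AggregationEvent::New { slot }
        }
    };
    // Send only after the state is updated; the result is ignored because
    // there may be no subscribers.
    let _ = tx.send(event);
}
```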
@@ -1374,6 +1392,115 @@ mod test {

         assert_eq!(result.unwrap_err().to_string(), "Message not found");
     }
+
+    /// Test that verifies only one event is sent per slot, even when updates arrive out of order
+    /// or when a slot is processed multiple times.
+    #[tokio::test]
+    pub async fn test_out_of_order_updates_send_single_event_per_slot() {
+        let (state, mut update_rx) = setup_state(10).await;
+
+        // Create price feed messages
+        let price_feed_100 = create_dummy_price_feed_message(100, 10, 9);
+        let price_feed_101 = create_dummy_price_feed_message(100, 11, 10);
+
+        // First, process slot 100
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_100)], 100, 20),
+        )
+        .await;
+
+        // Check that we received the New event for slot 100
+        assert_eq!(
+            update_rx.recv().await,
+            Ok(AggregationEvent::New { slot: 100 })
+        );
+
+        // Next, process slot 101
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_101)], 101, 21),
+        )
+        .await;
+
+        // Check that we received the New event for slot 101
+        assert_eq!(
+            update_rx.recv().await,
+            Ok(AggregationEvent::New { slot: 101 })
+        );
+
+        // Now, process slot 100 again
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_100)], 100, 22),
+        )
+        .await;
+
+        // Try to receive another event with a timeout to ensure no more events were sent.
+        // We should not receive an OutOfOrder event for slot 100 since we've already sent
+        // an event for it.
+        let timeout_result =
+            tokio::time::timeout(std::time::Duration::from_millis(100), update_rx.recv()).await;
+
+        // The timeout should occur, indicating no more events were received
+        assert!(
+            timeout_result.is_err(),
+            "Received unexpected additional event"
+        );
+
+        // Verify that the price feed was stored correctly (both updates share feed id 100)
+        let price_feed_ids = (*state).get_price_feed_ids().await;
+        assert_eq!(price_feed_ids.len(), 1);
+        assert!(price_feed_ids.contains(&PriceIdentifier::new([100; 32])));
+    }
+
+    /// Test that verifies only one event is sent when multiple concurrent updates
+    /// for the same slot are processed.
+    #[tokio::test]
+    pub async fn test_concurrent_updates_same_slot_sends_single_event() {
+        let (state, mut update_rx) = setup_state(10).await;
+
+        // Create a single price feed message
+        let price_feed = create_dummy_price_feed_message(100, 10, 9);
+
+        // Generate 100 identical updates for the same slot but with different sequence numbers
+        let mut all_updates = Vec::new();
+        for seq in 0..100 {
+            let updates = generate_update(vec![Message::PriceFeedMessage(price_feed)], 10, seq);
+            all_updates.extend(updates);
+        }
+
+        // Process updates concurrently - we don't care if some fail due to the race condition.
+        // The important thing is that only one event is sent.
+        let state_arc = Arc::clone(&state);
+        let futures = all_updates.into_iter().map(move |u| {
+            let state_clone = Arc::clone(&state_arc);
+            async move {
+                let _ = state_clone.store_update(u).await;
+            }
+        });
+        futures::future::join_all(futures).await;
+
+        // Check that only one AggregationEvent::New is received
+        assert_eq!(
+            update_rx.recv().await,
+            Ok(AggregationEvent::New { slot: 10 })
+        );
+
+        // Try to receive another event with a timeout to ensure no more events were sent
+        let timeout_result =
+            tokio::time::timeout(std::time::Duration::from_millis(100), update_rx.recv()).await;
+
+        // The timeout should occur, indicating no more events were received
+        assert!(
+            timeout_result.is_err(),
+            "Received unexpected additional event"
+        );
+
+        // Verify that the price feed was stored correctly
+        let price_feed_ids = (*state).get_price_feed_ids().await;
+        assert_eq!(price_feed_ids.len(), 1);
+        assert!(price_feed_ids.contains(&PriceIdentifier::new([100; 32])));
+    }
 }
 #[cfg(test)]
 /// Unit tests for the core TWAP calculation logic in `calculate_twap`
8 changes: 5 additions & 3 deletions apps/hermes/server/src/state/aggregate/wormhole_merkle.rs
@@ -55,14 +55,16 @@ pub async fn store_wormhole_merkle_verified_message<S>(
     state: &S,
     root: WormholeMerkleRoot,
    vaa: VaaBytes,
-) -> Result<()>
+) -> Result<bool>
 where
     S: Cache,
 {
+    // Store the state and check if it was already stored in a single operation
+    // This avoids the race condition where multiple threads could check and find nothing
+    // but then both store the same state
     state
         .store_wormhole_merkle_state(WormholeMerkleState { root, vaa })
-        .await?;
-    Ok(())
+        .await
 }

 pub fn construct_message_states_proofs(