Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Delayed block processing for Fortuna #2307

Merged
merged 5 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fortuna"
version = "7.3.0"
version = "7.4.0"
edition = "2021"

[dependencies]
Expand Down
5 changes: 5 additions & 0 deletions apps/fortuna/config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ chains:
target_profit_pct: 20
max_profit_pct: 100

# A list of block delays for processing blocks multiple times. Each number represents
# how many blocks to wait before processing. For example, [5, 10, 20] means process
# blocks after 5 blocks, then again after 10 blocks, and finally after 20 blocks.
block_delays: [5, 10, 20]

# Historical commitments -- delete this block for local development purposes
commitments:
# prettier-ignore
Expand Down
10 changes: 10 additions & 0 deletions apps/fortuna/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,16 @@ pub struct EthereumConfig {
/// Maximum number of hashes to record in a request.
/// This should be set according to the maximum gas limit the provider supports for callbacks.
pub max_num_hashes: Option<u32>,

/// A list of delays (in blocks) that indicates how many blocks should be delayed
/// before we process a block. For retry logic, we can process blocks multiple times
/// at each specified delay. For example: [5, 10, 20].
#[serde(default = "default_block_delays")]
pub block_delays: Vec<u64>,
}

fn default_block_delays() -> Vec<u64> {
vec![5]
}

fn default_priority_fee_multiplier_pct() -> u64 {
Expand Down
33 changes: 29 additions & 4 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl KeeperMetrics {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BlockRange {
pub from: BlockNumber,
pub to: BlockNumber,
Expand Down Expand Up @@ -346,7 +346,8 @@ pub async fn run_keeper_threads(
)
.in_current_span(),
);
// Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks.

// Spawn a thread for block processing with configured delays
spawn(
process_new_blocks(
chain_state.clone(),
Expand All @@ -356,6 +357,7 @@ pub async fn run_keeper_threads(
chain_eth_config.escalation_policy.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
chain_eth_config.block_delays.clone(),
)
.in_current_span(),
);
Expand Down Expand Up @@ -965,8 +967,10 @@ pub async fn watch_blocks(
}
}

/// It waits on rx channel to receive block ranges and then calls process_block_range to process them.
/// It waits on rx channel to receive block ranges and then calls process_block_range to process them
/// for each configured block delay.
#[tracing::instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub async fn process_new_blocks(
chain_state: BlockchainState,
mut rx: mpsc::Receiver<BlockRange>,
Expand All @@ -975,12 +979,14 @@ pub async fn process_new_blocks(
escalation_policy: EscalationPolicyConfig,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
block_delays: Vec<u64>,
) {
tracing::info!("Waiting for new block ranges to process");
loop {
if let Some(block_range) = rx.recv().await {
// Process blocks immediately first
process_block_range(
block_range,
block_range.clone(),
Arc::clone(&contract),
gas_limit,
escalation_policy.clone(),
Expand All @@ -990,6 +996,25 @@ pub async fn process_new_blocks(
)
.in_current_span()
.await;

// Then process with each configured delay
for delay in &block_delays {
let adjusted_range = BlockRange {
from: block_range.from.saturating_sub(*delay),
to: block_range.to.saturating_sub(*delay),
};
process_block_range(
adjusted_range,
Arc::clone(&contract),
gas_limit,
escalation_policy.clone(),
chain_state.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span()
.await;
}
}
}
}
Expand Down
Loading