Skip to content

Add metrics for flashblock and message tracking in OpRBuilder #543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions crates/op-rbuilder/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,20 @@ pub struct OpRBuilderMetrics {
pub builder_landed_blocks_missed: Gauge,
/// Block built success
pub block_built_success: Counter,
/// Number of flashblocks added to block (Total per block)
#[cfg(feature = "flashblocks")]
pub flashblock_count: Histogram,
/// Number of messages sent
#[cfg(feature = "flashblocks")]
pub messages_sent_count: Counter,
/// Total duration of building a block
pub total_block_built_duration: Histogram,
/// Flashblock build duration
#[cfg(feature = "flashblocks")]
pub flashblock_build_duration: Histogram,
/// Number of invalid blocks
#[cfg(feature = "flashblocks")]
pub invalid_blocks_count: Counter,
/// Duration of fetching transactions from the pool
pub transaction_pool_fetch_duration: Histogram,
/// Duration of state root calculation
Expand Down
148 changes: 128 additions & 20 deletions crates/op-rbuilder/src/payload_builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
generator::{BlockCell, BlockPayloadJobGenerator, BuildArguments, PayloadBuilder},
metrics::OpRBuilderMetrics,
primitives::reth::ExecutionInfo,
tx_signer::Signer,
};
Expand Down Expand Up @@ -47,7 +48,7 @@ use reth_payload_builder_primitives::PayloadBuilderError;
use reth_payload_primitives::PayloadBuilderAttributes;
use reth_payload_util::{BestPayloadTransactions, PayloadTransactions};
use reth_primitives::{BlockBody, SealedHeader};
use reth_primitives_traits::{proofs, Block as _, SignedTransaction};
use reth_primitives_traits::{proofs, Block as _, InMemorySize, SignedTransaction};
use reth_provider::{
CanonStateSubscriptions, HashedPostStateProvider, ProviderError, StateProviderFactory,
StateRootProvider, StorageRootProvider,
Expand All @@ -65,7 +66,7 @@ use rollup_boost::{
use serde::{Deserialize, Serialize};
use std::{
sync::{Arc, Mutex},
time::Duration,
time::{Duration, Instant},
};
use tokio::{
net::{TcpListener, TcpStream},
Expand Down Expand Up @@ -231,6 +232,8 @@ pub struct OpPayloadBuilder<Pool, Client> {
pub chain_block_time: u64,
/// Flashblock block time
pub flashblock_block_time: u64,
/// The metrics for the builder
pub metrics: OpRBuilderMetrics,
}

impl<Pool, Client> OpPayloadBuilder<Pool, Client> {
Expand Down Expand Up @@ -259,6 +262,7 @@ impl<Pool, Client> OpPayloadBuilder<Pool, Client> {
tx,
chain_block_time,
flashblock_block_time,
metrics: Default::default(),
}
}

Expand Down Expand Up @@ -320,7 +324,9 @@ where
{
/// Send a message to be published
pub fn send_message(&self, message: String) -> Result<(), Box<dyn std::error::Error>> {
self.tx.send(message).map_err(|e| e.into())
self.tx.send(message)?;
self.metrics.messages_sent_count.increment(1);
Ok(())
}

/// Constructs an Optimism payload from the transactions sent via the
Expand All @@ -336,6 +342,7 @@ where
args: BuildArguments<OpPayloadBuilderAttributes<OpTransactionSigned>, OpBuiltPayload>,
best_payload: BlockCell<OpBuiltPayload>,
) -> Result<(), PayloadBuilderError> {
let block_build_start_time = Instant::now();
let BuildArguments { config, cancel, .. } = args;

let chain_spec = self.client.chain_spec();
Expand Down Expand Up @@ -374,31 +381,44 @@ where
evm_env,
block_env_attributes,
cancel,
metrics: Default::default(),
};

let state_provider = self.client.state_by_block_hash(ctx.parent().hash())?;
let state = StateProviderDatabase::new(&state_provider);

// 1. execute the pre steps and seal an early block with that
let sequencer_tx_start_time = Instant::now();
let mut db = State::builder()
.with_database(state)
.with_bundle_update()
.build();

// 1. execute the pre steps and seal an early block with that
let mut info = execute_pre_steps(&mut db, &ctx)?;
ctx.metrics
.sequencer_tx_duration
.record(sequencer_tx_start_time.elapsed());

let (payload, fb_payload, mut bundle_state) = build_block(db, &ctx, &mut info)?;

best_payload.set(payload.clone());
let _ = self.send_message(serde_json::to_string(&fb_payload).unwrap_or_default());

tracing::info!(target: "payload_builder", "Fallback block built");
ctx.metrics
.payload_num_tx
.record(info.executed_transactions.len() as f64);

if ctx.attributes().no_tx_pool {
tracing::info!(
target: "payload_builder",
"No transaction pool, skipping transaction pool processing",
);

self.metrics
.total_block_built_duration
.record(block_build_start_time.elapsed());

// return early since we don't need to build a block with transactions from the pool
return Ok(());
}
Expand All @@ -408,7 +428,6 @@ where
let mut total_gas_per_batch = gas_per_batch;

let mut flashblock_count = 0;

// Create a channel to coordinate flashblock building
let (build_tx, mut build_rx) = mpsc::channel(1);

Expand Down Expand Up @@ -470,6 +489,7 @@ where
flashblock_count,
);

let flashblock_build_start_time = Instant::now();
let state = StateProviderDatabase::new(&state_provider);

let mut db = State::builder()
Expand All @@ -478,16 +498,25 @@ where
.with_bundle_prestate(bundle_state)
.build();

let best_txs_start_time = Instant::now();
let best_txs = BestPayloadTransactions::new(
self.pool
.best_transactions_with_attributes(ctx.best_transaction_attributes()),
);
ctx.metrics
.transaction_pool_fetch_duration
.record(best_txs_start_time.elapsed());

let tx_execution_start_time = Instant::now();
ctx.execute_best_transactions(
&mut info,
&mut db,
best_txs,
total_gas_per_batch,
)?;
ctx.metrics
.payload_tx_simulation_duration
.record(tx_execution_start_time.elapsed());

if ctx.cancel.is_cancelled() {
tracing::info!(
Expand All @@ -498,23 +527,57 @@ where
return Ok(());
}

let (new_payload, mut fb_payload, new_bundle_state) =
build_block(db, &ctx, &mut info)?;

fb_payload.index = flashblock_count + 1; // we do this because the fallback block is index 0
fb_payload.base = None;
let _ =
self.send_message(serde_json::to_string(&fb_payload).unwrap_or_default());

best_payload.set(new_payload.clone());
bundle_state = new_bundle_state;
total_gas_per_batch += gas_per_batch;
flashblock_count += 1;
let total_block_built_duration = Instant::now();
let build_result = build_block(db, &ctx, &mut info);
ctx.metrics
.total_block_built_duration
.record(total_block_built_duration.elapsed());

// Handle build errors with match pattern
match build_result {
Err(err) => {
// Track invalid/bad block
self.metrics.invalid_blocks_count.increment(1);
error!(target: "payload_builder", "Failed to build block {}, flashblock {}: {}", ctx.block_number(), flashblock_count, err);
// Return the error
return Err(err);
}
Ok((new_payload, mut fb_payload, new_bundle_state)) => {
fb_payload.index = flashblock_count + 1; // we do this because the fallback block is index 0
fb_payload.base = None;

if let Err(err) = self.send_message(
serde_json::to_string(&fb_payload).unwrap_or_default(),
) {
error!(target: "payload_builder", "Failed to send flashblock message: {}", err);
}

tracing::info!(target: "payload_builder", "Flashblock {} built", flashblock_count);
// Record flashblock build duration
self.metrics
.flashblock_build_duration
.record(flashblock_build_start_time.elapsed());
ctx.metrics
.payload_byte_size
.record(new_payload.block().size() as f64);
ctx.metrics
.payload_num_tx
.record(info.executed_transactions.len() as f64);

best_payload.set(new_payload.clone());
// Update bundle_state for next iteration
bundle_state = new_bundle_state;
total_gas_per_batch += gas_per_batch;
flashblock_count += 1;
tracing::info!(target: "payload_builder", "Flashblock {} built", flashblock_count);
}
}
}
None => {
// Exit loop if channel closed or cancelled
self.metrics.block_built_success.increment(1);
self.metrics
.flashblock_count
.record(flashblock_count as f64);
return Ok(());
}
}
Expand Down Expand Up @@ -552,7 +615,11 @@ where
// TODO: We must run this only once per block, but we are running it on every flashblock
// merge all transitions into bundle state, this would apply the withdrawal balance changes
// and 4788 contract call
let state_merge_start_time = Instant::now();
state.merge_transitions(BundleRetention::Reverts);
ctx.metrics
.state_transition_merge_duration
.record(state_merge_start_time.elapsed());

let new_bundle = state.take_bundle();

Expand Down Expand Up @@ -580,6 +647,7 @@ where
.expect("Number is in range");

// // calculate the state root
let state_root_start_time = Instant::now();
let state_provider = state.database.as_ref();
let hashed_state = state_provider.hashed_post_state(execution_outcome.state());
let (state_root, _trie_output) = {
Expand All @@ -595,6 +663,9 @@ where
);
})?
};
ctx.metrics
.state_root_calculation_duration
.record(state_root_start_time.elapsed());

let withdrawals_root = if ctx
.chain_spec
Expand Down Expand Up @@ -794,6 +865,8 @@ pub struct OpPayloadBuilderCtx<ChainSpec> {
pub block_env_attributes: OpNextBlockEnvAttributes,
/// Marker to check whether the job has been cancelled.
pub cancel: CancellationToken,
/// The metrics for the builder
pub metrics: OpRBuilderMetrics,
}

impl<ChainSpec> OpPayloadBuilderCtx<ChainSpec>
Expand Down Expand Up @@ -1046,11 +1119,17 @@ where
DB: Database<Error = ProviderError>,
{
let base_fee = self.base_fee();
let mut num_txs_considered = 0;
let mut num_txs_simulated = 0;
let mut num_txs_simulated_success = 0;
let mut num_txs_simulated_fail = 0;

let mut evm = self.evm_config.evm_with_env(&mut *db, self.evm_env.clone());

while let Some(tx) = best_txs.next(()) {
let tx = tx.into_consensus();
num_txs_considered += 1;

// check in info if the txn has been executed already
if info.executed_transactions.contains(&tx) {
continue;
Expand All @@ -1077,6 +1156,7 @@ where
return Ok(Some(()));
}

let tx_simulation_start_time = Instant::now();
let ResultAndState { result, state } = match evm.transact(&tx) {
Ok(res) => res,
Err(err) => {
Expand All @@ -1098,8 +1178,23 @@ where
}
};

// add gas used by the transaction to cumulative gas used, before creating the
// receipt
self.metrics
.tx_simulation_duration
.record(tx_simulation_start_time.elapsed());
self.metrics.tx_byte_size.record(tx.inner().size() as f64);
num_txs_simulated += 1;

if result.is_success() {
num_txs_simulated_success += 1;
} else {
num_txs_simulated_fail += 1;
trace!(target: "payload_builder", ?tx, "skipping reverted transaction");
best_txs.mark_invalid(tx.signer(), tx.nonce());
info.invalid_tx_hashes.insert(*tx.tx_hash());
continue;
}

// add gas used by the transaction to cumulative gas used, before creating the receipt
let gas_used = result.gas_used();
info.cumulative_gas_used += gas_used;

Expand All @@ -1126,6 +1221,19 @@ where
info.executed_transactions.push(tx.into_inner());
}

self.metrics
.payload_num_tx_considered
.record(num_txs_considered as f64);
self.metrics
.payload_num_tx_simulated
.record(num_txs_simulated as f64);
self.metrics
.payload_num_tx_simulated_success
.record(num_txs_simulated_success as f64);
self.metrics
.payload_num_tx_simulated_fail
.record(num_txs_simulated_fail as f64);

Ok(None)
}
}
Loading