Skip to content

refactor: deprecate tx & bundle polling tasks #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

use builder::config::BuilderConfig;
use builder::service::serve_builder_with_span;
use builder::tasks::bundler::BundlePoller;
use builder::tasks::block::BlockBuilder;
use builder::tasks::oauth::Authenticator;
use builder::tasks::tx_poller::TxPoller;
use builder::tasks::submit::SubmitTask;

use tokio::select;

Expand All @@ -22,12 +22,8 @@ async fn main() -> eyre::Result<()> {
let sequencer_signer = config.connect_sequencer_signer().await?;
let zenith = config.connect_zenith(provider.clone());

let port = config.builder_port;
let tx_poller = TxPoller::new(&config);
let bundle_poller = BundlePoller::new(&config, authenticator.clone()).await;
let builder = builder::tasks::block::BlockBuilder::new(&config);

let submit = builder::tasks::submit::SubmitTask {
let builder = BlockBuilder::new(&config, authenticator.clone());
let submit = SubmitTask {
authenticator: authenticator.clone(),
provider,
zenith,
Expand All @@ -38,10 +34,9 @@ async fn main() -> eyre::Result<()> {

let authenticator_jh = authenticator.spawn();
let (submit_channel, submit_jh) = submit.spawn();
let (tx_channel, bundle_channel, build_jh) = builder.spawn(submit_channel);
let tx_poller_jh = tx_poller.spawn(tx_channel.clone());
let bundle_poller_jh = bundle_poller.spawn(bundle_channel);
let build_jh = builder.spawn(submit_channel);

let port = config.builder_port;
let server = serve_builder_with_span(([0, 0, 0, 0], port), span);

select! {
Expand All @@ -54,12 +49,6 @@ async fn main() -> eyre::Result<()> {
_ = server => {
tracing::info!("server finished");
}
_ = tx_poller_jh => {
tracing::info!("tx_poller finished");
}
_ = bundle_poller_jh => {
tracing::info!("bundle_poller finished");
}
_ = authenticator_jh => {
tracing::info!("authenticator finished");
}
Expand Down
4 changes: 0 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ const BLOCK_CONFIRMATION_BUFFER: &str = "BLOCK_CONFIRMATION_BUFFER";
const BUILDER_REWARDS_ADDRESS: &str = "BUILDER_REWARDS_ADDRESS";
const ROLLUP_BLOCK_GAS_LIMIT: &str = "ROLLUP_BLOCK_GAS_LIMIT";
const TX_POOL_URL: &str = "TX_POOL_URL";
const TX_POOL_POLL_INTERVAL: &str = "TX_POOL_POLL_INTERVAL";
const AUTH_TOKEN_REFRESH_INTERVAL: &str = "AUTH_TOKEN_REFRESH_INTERVAL";
const TX_POOL_CACHE_DURATION: &str = "TX_POOL_CACHE_DURATION";
const OAUTH_CLIENT_ID: &str = "OAUTH_CLIENT_ID";
Expand Down Expand Up @@ -69,8 +68,6 @@ pub struct BuilderConfig {
pub rollup_block_gas_limit: u64,
/// URL of the tx pool to poll for incoming transactions.
pub tx_pool_url: Cow<'static, str>,
//// Interval in seconds to poll the tx-pool for new transactions.
pub tx_pool_poll_interval: u64,
/// Duration in seconds transactions can live in the tx-pool cache.
pub tx_pool_cache_duration: u64,
/// OAuth client ID for the builder.
Expand Down Expand Up @@ -155,7 +152,6 @@ impl BuilderConfig {
builder_rewards_address: load_address(BUILDER_REWARDS_ADDRESS)?,
rollup_block_gas_limit: load_u64(ROLLUP_BLOCK_GAS_LIMIT)?,
tx_pool_url: load_url(TX_POOL_URL)?,
tx_pool_poll_interval: load_u64(TX_POOL_POLL_INTERVAL)?,
tx_pool_cache_duration: load_u64(TX_POOL_CACHE_DURATION)?,
oauth_client_id: load_string(OAUTH_CLIENT_ID)?,
oauth_client_secret: load_string(OAUTH_CLIENT_SECRET)?,
Expand Down
112 changes: 58 additions & 54 deletions src/tasks/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ use alloy::{
use alloy_primitives::{keccak256, Bytes, B256};
use alloy_rlp::Buf;
use std::{sync::OnceLock, time::Duration};
use tokio::{select, sync::mpsc, task::JoinHandle};
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::Instrument;
use zenith_types::{encode_txns, Alloy2718Coder};

use super::bundler::{Bundle, BundlePoller};
use super::oauth::Authenticator;
use super::tx_poller::TxPoller;
use crate::config::BuilderConfig;

use super::bundler::Bundle;

#[derive(Debug, Default, Clone)]
/// A block in progress.
pub struct InProgressBlock {
Expand Down Expand Up @@ -109,75 +110,78 @@ impl InProgressBlock {
pub struct BlockBuilder {
pub incoming_transactions_buffer: u64,
pub config: BuilderConfig,
pub tx_poller: TxPoller,
pub bundle_poller: BundlePoller,
}

impl BlockBuilder {
// create a new block builder with the given config.
pub fn new(config: &BuilderConfig) -> Self {
pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
Self {
config: config.clone(),
incoming_transactions_buffer: config.incoming_transactions_buffer,
tx_poller: TxPoller::new(config),
bundle_poller: BundlePoller::new(config, authenticator),
}
}

/// Spawn the block builder task, returning the inbound channel to it, and
/// a handle to the running task.
pub fn spawn(
self,
outbound: mpsc::UnboundedSender<InProgressBlock>,
) -> (mpsc::UnboundedSender<TxEnvelope>, mpsc::UnboundedSender<Bundle>, JoinHandle<()>) {
let mut in_progress = InProgressBlock::default();

let (tx_sender, mut tx_inbound) = mpsc::unbounded_channel();
let (bundle_sender, mut bundle_inbound) = mpsc::unbounded_channel();
async fn get_transactions(&mut self, in_progress: &mut InProgressBlock) {
let txns = self.tx_poller.check_tx_cache().await;
match txns {
Ok(txns) => {
for txn in txns.into_iter() {
in_progress.ingest_tx(&txn);
}
}
Err(e) => {
tracing::error!(error = %e, "error polling transactions");
}
}
self.tx_poller.evict();
}

let mut sleep =
Box::pin(tokio::time::sleep(Duration::from_secs(self.incoming_transactions_buffer)));
async fn get_bundles(&mut self, in_progress: &mut InProgressBlock) {
let bundles = self.bundle_poller.check_bundle_cache().await;
match bundles {
Ok(bundles) => {
for bundle in bundles {
in_progress.ingest_bundle(bundle);
}
}
Err(e) => {
tracing::error!(error = %e, "error polling bundles");
}
}
self.bundle_poller.evict();
}

let handle = tokio::spawn(
/// Spawn the block builder task, returning the inbound channel to it, and
/// a handle to the running task.
pub fn spawn(mut self, outbound: mpsc::UnboundedSender<InProgressBlock>) -> JoinHandle<()> {
tokio::spawn(
async move {
loop {

select! {
biased;
_ = &mut sleep => {
if !in_progress.is_empty() {
tracing::debug!(txns = in_progress.len(), "sending block to submit task");
let in_progress_block = std::mem::take(&mut in_progress);
if outbound.send(in_progress_block).is_err() {
tracing::debug!("downstream task gone");
break
}
}

// Reset the sleep timer, as we want to do so when (and only when) our sleep future has elapsed,
// irrespective of whether we have any blocks to build.
sleep.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(self.incoming_transactions_buffer));
}
tx_resp = tx_inbound.recv() => {
match tx_resp {
Some(tx) => in_progress.ingest_tx(&tx),
None => {
tracing::debug!("upstream task gone");
break
}
}
}
bundle_resp = bundle_inbound.recv() => {
match bundle_resp {
Some(bundle) => in_progress.ingest_bundle(bundle),
None => {
tracing::debug!("upstream task gone");
break
}
}
// sleep the buffer time
tokio::time::sleep(Duration::from_secs(self.incoming_transactions_buffer))
.await;

// Build a block
let mut in_progress = InProgressBlock::default();
self.get_transactions(&mut in_progress).await;
self.get_bundles(&mut in_progress).await;
Comment on lines +170 to +171
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an interesting byproduct of this is that bundles will land end of block. nothing to do here now, but something we might wanna think about later

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes i think any logic around ordering could be layered in later


// submit the block if it has transactions
if !in_progress.is_empty() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we maybe log if we find there are no txs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have this in next PR :)

tracing::debug!(txns = in_progress.len(), "sending block to submit task");
let in_progress_block = std::mem::take(&mut in_progress);
if outbound.send(in_progress_block).is_err() {
tracing::debug!("downstream task gone");
break;
}
}
}
}
.in_current_span(),
);

(tx_sender, bundle_sender, handle)
)
}
}
36 changes: 2 additions & 34 deletions src/tasks/bundler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use alloy_primitives::map::HashMap;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use signet_types::SignetEthBundle;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::debug;

use oauth2::TokenResponse;

Expand All @@ -34,7 +32,7 @@ pub struct BundlePoller {
/// Implements a poller for the block builder to pull bundles from the tx cache.
impl BundlePoller {
/// Creates a new BundlePoller from the provided builder config.
pub async fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
Self { config: config.clone(), authenticator, seen_uuids: HashMap::new() }
}

Expand Down Expand Up @@ -73,7 +71,7 @@ impl BundlePoller {
}

/// Evicts expired bundles from the cache.
fn evict(&mut self) {
pub fn evict(&mut self) {
let expired_keys: Vec<String> = self
.seen_uuids
.iter()
Expand All @@ -92,34 +90,4 @@ impl BundlePoller {
self.seen_uuids.remove(&key);
}
}

pub fn spawn(mut self, bundle_channel: mpsc::UnboundedSender<Bundle>) -> JoinHandle<()> {
let handle: JoinHandle<()> = tokio::spawn(async move {
loop {
let bundle_channel = bundle_channel.clone();
let bundles = self.check_bundle_cache().await;

match bundles {
Ok(bundles) => {
for bundle in bundles {
let result = bundle_channel.send(bundle);
if result.is_err() {
tracing::debug!("bundle_channel failed to send bundle");
}
}
}
Err(err) => {
debug!(?err, "error fetching bundles from tx-pool");
}
}

// evict expired bundles once every loop
self.evict();

tokio::time::sleep(Duration::from_secs(self.config.tx_pool_poll_interval)).await;
}
});

handle
}
}
9 changes: 4 additions & 5 deletions src/tasks/oauth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl Authenticator {
}

mod tests {
use crate::{config::BuilderConfig, tasks::block::BlockBuilder};
use crate::config::BuilderConfig;
use alloy_primitives::Address;
use eyre::Result;

Expand All @@ -124,7 +124,7 @@ mod tests {
use super::*;
use oauth2::TokenResponse;

let config = setup_test_builder()?.1;
let config = setup_test_config()?;
let auth = Authenticator::new(&config);
let token = auth.fetch_oauth_token().await?;
dbg!(&token);
Expand All @@ -135,7 +135,7 @@ mod tests {
}

#[allow(dead_code)]
pub fn setup_test_builder() -> Result<(BlockBuilder, BuilderConfig)> {
pub fn setup_test_config() -> Result<BuilderConfig> {
let config = BuilderConfig {
host_chain_id: 17000,
ru_chain_id: 17001,
Expand All @@ -151,7 +151,6 @@ mod tests {
rollup_block_gas_limit: 100_000,
tx_pool_url: "http://localhost:9000/".into(),
tx_pool_cache_duration: 5,
tx_pool_poll_interval: 5,
oauth_client_id: "some_client_id".into(),
oauth_client_secret: "some_client_secret".into(),
oauth_authenticate_url: "http://localhost:9000".into(),
Expand All @@ -160,6 +159,6 @@ mod tests {
tx_broadcast_urls: vec!["http://localhost:9000".into()],
oauth_token_refresh_interval: 300, // 5 minutes
};
Ok((BlockBuilder::new(&config), config))
Ok(config)
}
}
37 changes: 1 addition & 36 deletions src/tasks/tx_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use eyre::Error;
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use serde_json::from_slice;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

pub use crate::config::BuilderConfig;

Expand Down Expand Up @@ -62,7 +60,7 @@ impl TxPoller {
}

/// removes entries from seen_txns that have lived past expiry
fn evict(&mut self) {
pub fn evict(&mut self) {
let expired_keys: Vec<TxHash> = self
.seen_txns
.iter()
Expand All @@ -81,37 +79,4 @@ impl TxPoller {
self.seen_txns.remove(&key);
}
}

/// spawns a task that polls the tx-pool for unique transactions and ingests them into the tx_channel.
pub fn spawn(mut self, tx_channel: mpsc::UnboundedSender<TxEnvelope>) -> JoinHandle<()> {
let handle: JoinHandle<()> = tokio::spawn(async move {
loop {
let channel = tx_channel.clone();
let txns = self.check_tx_cache().await;

// send recently discovered transactions to the builder pipeline
match txns {
Ok(txns) => {
for txn in txns.into_iter() {
let result = channel.send(txn);
if result.is_err() {
tracing::debug!("tx_poller failed to send tx");
continue;
}
}
}
Err(e) => {
println!("Error polling transactions: {}", e);
}
}

// evict expired txns once every loop
self.evict();

tokio::time::sleep(Duration::from_secs(self.config.tx_pool_poll_interval)).await;
}
});

handle
}
}
Loading
Loading