Skip to content

Commit 89c8d50

Browse files
authored
refactor: deprecate tx & bundle polling tasks (#22)
1 parent 860c9fb commit 89c8d50

File tree

8 files changed

+80
-164
lines changed

8 files changed

+80
-164
lines changed

bin/builder.rs

+6-17
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
use builder::config::BuilderConfig;
44
use builder::service::serve_builder_with_span;
5-
use builder::tasks::bundler::BundlePoller;
5+
use builder::tasks::block::BlockBuilder;
66
use builder::tasks::oauth::Authenticator;
7-
use builder::tasks::tx_poller::TxPoller;
7+
use builder::tasks::submit::SubmitTask;
88

99
use tokio::select;
1010

@@ -22,12 +22,8 @@ async fn main() -> eyre::Result<()> {
2222
let sequencer_signer = config.connect_sequencer_signer().await?;
2323
let zenith = config.connect_zenith(provider.clone());
2424

25-
let port = config.builder_port;
26-
let tx_poller = TxPoller::new(&config);
27-
let bundle_poller = BundlePoller::new(&config, authenticator.clone()).await;
28-
let builder = builder::tasks::block::BlockBuilder::new(&config);
29-
30-
let submit = builder::tasks::submit::SubmitTask {
25+
let builder = BlockBuilder::new(&config, authenticator.clone());
26+
let submit = SubmitTask {
3127
authenticator: authenticator.clone(),
3228
provider,
3329
zenith,
@@ -38,10 +34,9 @@ async fn main() -> eyre::Result<()> {
3834

3935
let authenticator_jh = authenticator.spawn();
4036
let (submit_channel, submit_jh) = submit.spawn();
41-
let (tx_channel, bundle_channel, build_jh) = builder.spawn(submit_channel);
42-
let tx_poller_jh = tx_poller.spawn(tx_channel.clone());
43-
let bundle_poller_jh = bundle_poller.spawn(bundle_channel);
37+
let build_jh = builder.spawn(submit_channel);
4438

39+
let port = config.builder_port;
4540
let server = serve_builder_with_span(([0, 0, 0, 0], port), span);
4641

4742
select! {
@@ -54,12 +49,6 @@ async fn main() -> eyre::Result<()> {
5449
_ = server => {
5550
tracing::info!("server finished");
5651
}
57-
_ = tx_poller_jh => {
58-
tracing::info!("tx_poller finished");
59-
}
60-
_ = bundle_poller_jh => {
61-
tracing::info!("bundle_poller finished");
62-
}
6352
_ = authenticator_jh => {
6453
tracing::info!("authenticator finished");
6554
}

src/config.rs

-4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ const BLOCK_CONFIRMATION_BUFFER: &str = "BLOCK_CONFIRMATION_BUFFER";
2525
const BUILDER_REWARDS_ADDRESS: &str = "BUILDER_REWARDS_ADDRESS";
2626
const ROLLUP_BLOCK_GAS_LIMIT: &str = "ROLLUP_BLOCK_GAS_LIMIT";
2727
const TX_POOL_URL: &str = "TX_POOL_URL";
28-
const TX_POOL_POLL_INTERVAL: &str = "TX_POOL_POLL_INTERVAL";
2928
const AUTH_TOKEN_REFRESH_INTERVAL: &str = "AUTH_TOKEN_REFRESH_INTERVAL";
3029
const TX_POOL_CACHE_DURATION: &str = "TX_POOL_CACHE_DURATION";
3130
const OAUTH_CLIENT_ID: &str = "OAUTH_CLIENT_ID";
@@ -69,8 +68,6 @@ pub struct BuilderConfig {
6968
pub rollup_block_gas_limit: u64,
7069
/// URL of the tx pool to poll for incoming transactions.
7170
pub tx_pool_url: Cow<'static, str>,
72-
//// Interval in seconds to poll the tx-pool for new transactions.
73-
pub tx_pool_poll_interval: u64,
7471
/// Duration in seconds transactions can live in the tx-pool cache.
7572
pub tx_pool_cache_duration: u64,
7673
/// OAuth client ID for the builder.
@@ -155,7 +152,6 @@ impl BuilderConfig {
155152
builder_rewards_address: load_address(BUILDER_REWARDS_ADDRESS)?,
156153
rollup_block_gas_limit: load_u64(ROLLUP_BLOCK_GAS_LIMIT)?,
157154
tx_pool_url: load_url(TX_POOL_URL)?,
158-
tx_pool_poll_interval: load_u64(TX_POOL_POLL_INTERVAL)?,
159155
tx_pool_cache_duration: load_u64(TX_POOL_CACHE_DURATION)?,
160156
oauth_client_id: load_string(OAUTH_CLIENT_ID)?,
161157
oauth_client_secret: load_string(OAUTH_CLIENT_SECRET)?,

src/tasks/block.rs

+58-54
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@ use alloy::{
55
use alloy_primitives::{keccak256, Bytes, B256};
66
use alloy_rlp::Buf;
77
use std::{sync::OnceLock, time::Duration};
8-
use tokio::{select, sync::mpsc, task::JoinHandle};
8+
use tokio::{sync::mpsc, task::JoinHandle};
99
use tracing::Instrument;
1010
use zenith_types::{encode_txns, Alloy2718Coder};
1111

12+
use super::bundler::{Bundle, BundlePoller};
13+
use super::oauth::Authenticator;
14+
use super::tx_poller::TxPoller;
1215
use crate::config::BuilderConfig;
1316

14-
use super::bundler::Bundle;
15-
1617
#[derive(Debug, Default, Clone)]
1718
/// A block in progress.
1819
pub struct InProgressBlock {
@@ -109,75 +110,78 @@ impl InProgressBlock {
109110
pub struct BlockBuilder {
110111
pub incoming_transactions_buffer: u64,
111112
pub config: BuilderConfig,
113+
pub tx_poller: TxPoller,
114+
pub bundle_poller: BundlePoller,
112115
}
113116

114117
impl BlockBuilder {
115118
// create a new block builder with the given config.
116-
pub fn new(config: &BuilderConfig) -> Self {
119+
pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
117120
Self {
118121
config: config.clone(),
119122
incoming_transactions_buffer: config.incoming_transactions_buffer,
123+
tx_poller: TxPoller::new(config),
124+
bundle_poller: BundlePoller::new(config, authenticator),
120125
}
121126
}
122127

123-
/// Spawn the block builder task, returning the inbound channel to it, and
124-
/// a handle to the running task.
125-
pub fn spawn(
126-
self,
127-
outbound: mpsc::UnboundedSender<InProgressBlock>,
128-
) -> (mpsc::UnboundedSender<TxEnvelope>, mpsc::UnboundedSender<Bundle>, JoinHandle<()>) {
129-
let mut in_progress = InProgressBlock::default();
130-
131-
let (tx_sender, mut tx_inbound) = mpsc::unbounded_channel();
132-
let (bundle_sender, mut bundle_inbound) = mpsc::unbounded_channel();
128+
async fn get_transactions(&mut self, in_progress: &mut InProgressBlock) {
129+
let txns = self.tx_poller.check_tx_cache().await;
130+
match txns {
131+
Ok(txns) => {
132+
for txn in txns.into_iter() {
133+
in_progress.ingest_tx(&txn);
134+
}
135+
}
136+
Err(e) => {
137+
tracing::error!(error = %e, "error polling transactions");
138+
}
139+
}
140+
self.tx_poller.evict();
141+
}
133142

134-
let mut sleep =
135-
Box::pin(tokio::time::sleep(Duration::from_secs(self.incoming_transactions_buffer)));
143+
async fn get_bundles(&mut self, in_progress: &mut InProgressBlock) {
144+
let bundles = self.bundle_poller.check_bundle_cache().await;
145+
match bundles {
146+
Ok(bundles) => {
147+
for bundle in bundles {
148+
in_progress.ingest_bundle(bundle);
149+
}
150+
}
151+
Err(e) => {
152+
tracing::error!(error = %e, "error polling bundles");
153+
}
154+
}
155+
self.bundle_poller.evict();
156+
}
136157

137-
let handle = tokio::spawn(
158+
/// Spawn the block builder task, returning the inbound channel to it, and
159+
/// a handle to the running task.
160+
pub fn spawn(mut self, outbound: mpsc::UnboundedSender<InProgressBlock>) -> JoinHandle<()> {
161+
tokio::spawn(
138162
async move {
139163
loop {
140-
141-
select! {
142-
biased;
143-
_ = &mut sleep => {
144-
if !in_progress.is_empty() {
145-
tracing::debug!(txns = in_progress.len(), "sending block to submit task");
146-
let in_progress_block = std::mem::take(&mut in_progress);
147-
if outbound.send(in_progress_block).is_err() {
148-
tracing::debug!("downstream task gone");
149-
break
150-
}
151-
}
152-
153-
// Reset the sleep timer, as we want to do so when (and only when) our sleep future has elapsed,
154-
// irrespective of whether we have any blocks to build.
155-
sleep.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(self.incoming_transactions_buffer));
156-
}
157-
tx_resp = tx_inbound.recv() => {
158-
match tx_resp {
159-
Some(tx) => in_progress.ingest_tx(&tx),
160-
None => {
161-
tracing::debug!("upstream task gone");
162-
break
163-
}
164-
}
165-
}
166-
bundle_resp = bundle_inbound.recv() => {
167-
match bundle_resp {
168-
Some(bundle) => in_progress.ingest_bundle(bundle),
169-
None => {
170-
tracing::debug!("upstream task gone");
171-
break
172-
}
173-
}
164+
// sleep the buffer time
165+
tokio::time::sleep(Duration::from_secs(self.incoming_transactions_buffer))
166+
.await;
167+
168+
// Build a block
169+
let mut in_progress = InProgressBlock::default();
170+
self.get_transactions(&mut in_progress).await;
171+
self.get_bundles(&mut in_progress).await;
172+
173+
// submit the block if it has transactions
174+
if !in_progress.is_empty() {
175+
tracing::debug!(txns = in_progress.len(), "sending block to submit task");
176+
let in_progress_block = std::mem::take(&mut in_progress);
177+
if outbound.send(in_progress_block).is_err() {
178+
tracing::debug!("downstream task gone");
179+
break;
174180
}
175181
}
176182
}
177183
}
178184
.in_current_span(),
179-
);
180-
181-
(tx_sender, bundle_sender, handle)
185+
)
182186
}
183187
}

src/tasks/bundler.rs

+2-34
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ use alloy_primitives::map::HashMap;
66
use reqwest::Url;
77
use serde::{Deserialize, Serialize};
88
use signet_types::SignetEthBundle;
9-
use tokio::{sync::mpsc, task::JoinHandle};
10-
use tracing::debug;
119

1210
use oauth2::TokenResponse;
1311

@@ -34,7 +32,7 @@ pub struct BundlePoller {
3432
/// Implements a poller for the block builder to pull bundles from the tx cache.
3533
impl BundlePoller {
3634
/// Creates a new BundlePoller from the provided builder config.
37-
pub async fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
35+
pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
3836
Self { config: config.clone(), authenticator, seen_uuids: HashMap::new() }
3937
}
4038

@@ -73,7 +71,7 @@ impl BundlePoller {
7371
}
7472

7573
/// Evicts expired bundles from the cache.
76-
fn evict(&mut self) {
74+
pub fn evict(&mut self) {
7775
let expired_keys: Vec<String> = self
7876
.seen_uuids
7977
.iter()
@@ -92,34 +90,4 @@ impl BundlePoller {
9290
self.seen_uuids.remove(&key);
9391
}
9492
}
95-
96-
pub fn spawn(mut self, bundle_channel: mpsc::UnboundedSender<Bundle>) -> JoinHandle<()> {
97-
let handle: JoinHandle<()> = tokio::spawn(async move {
98-
loop {
99-
let bundle_channel = bundle_channel.clone();
100-
let bundles = self.check_bundle_cache().await;
101-
102-
match bundles {
103-
Ok(bundles) => {
104-
for bundle in bundles {
105-
let result = bundle_channel.send(bundle);
106-
if result.is_err() {
107-
tracing::debug!("bundle_channel failed to send bundle");
108-
}
109-
}
110-
}
111-
Err(err) => {
112-
debug!(?err, "error fetching bundles from tx-pool");
113-
}
114-
}
115-
116-
// evict expired bundles once every loop
117-
self.evict();
118-
119-
tokio::time::sleep(Duration::from_secs(self.config.tx_pool_poll_interval)).await;
120-
}
121-
});
122-
123-
handle
124-
}
12593
}

src/tasks/oauth.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl Authenticator {
114114
}
115115

116116
mod tests {
117-
use crate::{config::BuilderConfig, tasks::block::BlockBuilder};
117+
use crate::config::BuilderConfig;
118118
use alloy_primitives::Address;
119119
use eyre::Result;
120120

@@ -124,7 +124,7 @@ mod tests {
124124
use super::*;
125125
use oauth2::TokenResponse;
126126

127-
let config = setup_test_builder()?.1;
127+
let config = setup_test_config()?;
128128
let auth = Authenticator::new(&config);
129129
let token = auth.fetch_oauth_token().await?;
130130
dbg!(&token);
@@ -135,7 +135,7 @@ mod tests {
135135
}
136136

137137
#[allow(dead_code)]
138-
pub fn setup_test_builder() -> Result<(BlockBuilder, BuilderConfig)> {
138+
pub fn setup_test_config() -> Result<BuilderConfig> {
139139
let config = BuilderConfig {
140140
host_chain_id: 17000,
141141
ru_chain_id: 17001,
@@ -151,7 +151,6 @@ mod tests {
151151
rollup_block_gas_limit: 100_000,
152152
tx_pool_url: "http://localhost:9000/".into(),
153153
tx_pool_cache_duration: 5,
154-
tx_pool_poll_interval: 5,
155154
oauth_client_id: "some_client_id".into(),
156155
oauth_client_secret: "some_client_secret".into(),
157156
oauth_authenticate_url: "http://localhost:9000".into(),
@@ -160,6 +159,6 @@ mod tests {
160159
tx_broadcast_urls: vec!["http://localhost:9000".into()],
161160
oauth_token_refresh_interval: 300, // 5 minutes
162161
};
163-
Ok((BlockBuilder::new(&config), config))
162+
Ok(config)
164163
}
165164
}

src/tasks/tx_poller.rs

+1-36
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ use eyre::Error;
88
use reqwest::{Client, Url};
99
use serde::{Deserialize, Serialize};
1010
use serde_json::from_slice;
11-
use tokio::sync::mpsc;
12-
use tokio::task::JoinHandle;
1311

1412
pub use crate::config::BuilderConfig;
1513

@@ -62,7 +60,7 @@ impl TxPoller {
6260
}
6361

6462
/// removes entries from seen_txns that have lived past expiry
65-
fn evict(&mut self) {
63+
pub fn evict(&mut self) {
6664
let expired_keys: Vec<TxHash> = self
6765
.seen_txns
6866
.iter()
@@ -81,37 +79,4 @@ impl TxPoller {
8179
self.seen_txns.remove(&key);
8280
}
8381
}
84-
85-
/// spawns a task that polls the tx-pool for unique transactions and ingests them into the tx_channel.
86-
pub fn spawn(mut self, tx_channel: mpsc::UnboundedSender<TxEnvelope>) -> JoinHandle<()> {
87-
let handle: JoinHandle<()> = tokio::spawn(async move {
88-
loop {
89-
let channel = tx_channel.clone();
90-
let txns = self.check_tx_cache().await;
91-
92-
// send recently discovered transactions to the builder pipeline
93-
match txns {
94-
Ok(txns) => {
95-
for txn in txns.into_iter() {
96-
let result = channel.send(txn);
97-
if result.is_err() {
98-
tracing::debug!("tx_poller failed to send tx");
99-
continue;
100-
}
101-
}
102-
}
103-
Err(e) => {
104-
println!("Error polling transactions: {}", e);
105-
}
106-
}
107-
108-
// evict expired txns once every loop
109-
self.evict();
110-
111-
tokio::time::sleep(Duration::from_secs(self.config.tx_pool_poll_interval)).await;
112-
}
113-
});
114-
115-
handle
116-
}
11782
}

0 commit comments

Comments
 (0)