Skip to content

simplify builder server #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn main() -> eyre::Result<()> {
let tx_poller_jh = tx_poller.spawn(tx_channel.clone());
let bundle_poller_jh = bundle_poller.spawn(bundle_channel);

let server = serve_builder_with_span(tx_channel, ([0, 0, 0, 0], port), span);
let server = serve_builder_with_span(([0, 0, 0, 0], port), span);

select! {
_ = submit_jh => {
Expand Down
110 changes: 4 additions & 106 deletions src/service.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
use std::{fmt::Debug, net::SocketAddr};

use alloy::consensus::TxEnvelope;
use alloy::network::eip2718::Decodable2718;
use alloy::rpc::json_rpc::{ErrorPayload, Id};
use alloy_primitives::B256;
use axum::{
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
routing::{get, post},
Json, Router,
routing::get,
Router,
};
use serde_json::Value;
use tokio::sync::mpsc;
use tracing::{Instrument, Span};

/// App result
Expand Down Expand Up @@ -51,11 +44,6 @@ impl IntoResponse for AppError {
}
}

#[derive(Debug, Clone)]
pub struct ServiceState {
dispatch: mpsc::UnboundedSender<TxEnvelope>,
}

/// Return a 404 Not Found response
pub async fn return_404() -> Response {
(StatusCode::NOT_FOUND, "not found").into_response()
Expand All @@ -66,109 +54,19 @@ pub async fn return_200() -> Response {
(StatusCode::OK, "ok").into_response()
}

/// Dispatches a transaction to the backend.
pub async fn include_tx(state: ServiceState, tx: TxEnvelope) -> Result<B256, AppError> {
// Simple check to see if the transaction is signed correctly.
if let Err(e) = tx.recover_signer() {
return Err(AppError::bad_req(e));
}

let hash = *tx.tx_hash();
// send it to the backend
state.dispatch.send(tx).map_err(AppError::server_err)?;
// return the hash
Ok(hash)
}

/// Handler for the /sendTransaction endpoint
pub async fn ingest_handler(
State(state): State<ServiceState>,
Json(tx): Json<TxEnvelope>,
) -> Result<Response, AppError> {
let hash = include_tx(state, tx).await?;
Ok(hash.to_string().into_response())
}

/// Handler for the /sendRawTransaction endpoint
pub async fn ingest_raw_handler(
State(state): State<ServiceState>,
body: String,
) -> Result<Response, AppError> {
let body = body.strip_prefix("0x").unwrap_or(&body);
let buf = hex::decode(body).map_err(AppError::bad_req)?;
let envelope = TxEnvelope::decode_2718(&mut buf.as_slice()).map_err(AppError::bad_req)?;

ingest_handler(State(state), Json(envelope)).await
}

/// Handler for the /rpc endpoint.
/// Simulates the eth_sendRawTransaction JSON-RPC method
pub async fn ingest_rpc_handler(
State(state): State<ServiceState>,
body: String,
) -> Result<Response, AppError> {
// parse JSON-RPC values from request
let json = serde_json::from_str::<Value>(&body).map_err(AppError::bad_req)?;
let method = json["method"].as_str().expect("method not found");
let tx = json["params"][0].as_str().expect("params malformed");

let id = match &json["id"] {
Value::Number(n) => Id::Number(n.as_u64().unwrap_or_default()),
Value::String(s) => Id::String(s.clone()),
_ => Id::None,
};

// MUST be eth_sendRawTransaction method
if method != "eth_sendRawTransaction" {
return Ok(Json(alloy::rpc::json_rpc::Response {
payload: alloy::rpc::json_rpc::ResponsePayload::<(), ()>::Failure(ErrorPayload {
code: -6969,
message: "Method not found".into(),
data: None,
}),
id,
})
.into_response());
}

// parse TxEnvelope
let body: &str = tx.strip_prefix("0x").unwrap_or(tx);
let buf = hex::decode(body).map_err(AppError::bad_req)?;
let tx = TxEnvelope::decode_2718(&mut buf.as_slice()).map_err(AppError::bad_req)?;

let hash = include_tx(state, tx).await?;

// return JSON-RPC response
let resp = alloy::rpc::json_rpc::Response {
payload: alloy::rpc::json_rpc::ResponsePayload::<_, ()>::Success(hash),
id,
};

Ok(Json(resp).into_response())
}

/// Serve a builder service on the given socket address.
pub fn serve_builder_with_span(
dispatch: mpsc::UnboundedSender<TxEnvelope>,
socket: impl Into<SocketAddr>,
span: Span,
) -> tokio::task::JoinHandle<()> {
let state = ServiceState { dispatch };

let router: Router<ServiceState> = Router::new()
.route("/sendTransaction", post(ingest_handler))
.route("/sendRawTransaction", post(ingest_raw_handler))
.route("/rpc", post(ingest_rpc_handler))
.route("/healthcheck", get(return_200))
.fallback(return_404);
let app = router.with_state(state);
let router = Router::new().route("/healthcheck", get(return_200)).fallback(return_404);

let addr = socket.into();
tokio::spawn(
async move {
match tokio::net::TcpListener::bind(&addr).await {
Ok(listener) => {
if let Err(err) = axum::serve(listener, app).await {
if let Err(err) = axum::serve(listener, router).await {
tracing::error!(%err, "serve failed");
}
}
Expand Down
Loading