diff --git a/Cargo.lock b/Cargo.lock index 0cab694ea..b30d9392c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1818,6 +1818,53 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "ntest" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "241de7455530a09d0d91dacd07165fbf78c422d2c06ff5c77791988ee1f6bf13" +dependencies = [ + "ntest_proc_macro_helper 0.8.0", + "ntest_test_cases", + "ntest_timeout", +] + +[[package]] +name = "ntest_proc_macro_helper" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f52e34b414605b77efc95c3f0ecef01df0c324bcc7f68d9a9cb7a7552777e52" + +[[package]] +name = "ntest_proc_macro_helper" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0e328d267a679d683b55222b3d06c2fb7358220857945bfc4e65a6b531e9994" + +[[package]] +name = "ntest_test_cases" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f7caf063242bb66721e74515dc01a915901063fa1f994bee7a2b9136f13370e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "ntest_timeout" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8efc215375b8c392c77dc45dbe9d7f4802e36b7936808ccd71047a9b03443e6" +dependencies = [ + "ntest_proc_macro_helper 0.7.5", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "num" version = "0.4.0" @@ -3535,6 +3582,7 @@ dependencies = [ "keccak-hash", "lazy_static 1.4.0", "log 0.4.14", + "ntest", "num", "parking_lot 0.11.2", "prometheus_exporter", diff --git a/newsfragments/325.added.md b/newsfragments/325.added.md new file mode 100644 index 000000000..f137e04d5 --- /dev/null +++ b/newsfragments/325.added.md @@ -0,0 +1,6 @@ +- Rename `UtpSocket` to `UtpStream`. +- Refactor the way we are storing the received payload (DATA packets) in the uTP stream. +- Add a new AddActiveConnection UtpListener request and move the initialization of a uTP stream inside UtpListener. +- Add UtpStream -> UtpListener event channel and emit event inside UtpStream when stream state changes to Closed or Reset. +- Emit a global uTP listener event containing a uTP payload when a stream is closed. +- Remove redundant and dead code. 
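A minimal consumer sketch of the reworked UtpListener API summarized in the newsfragment above, based on the `UtpListener::new` signature and the `UtpListenerEvent` variants introduced later in this diff. The `run_utp` helper and the logging in the match arms are illustrative assumptions, not code from this change set.

use std::sync::Arc;

use trin_core::{
    portalnet::discovery::Discovery,
    utp::stream::{UtpListener, UtpListenerEvent},
};

async fn run_utp(discovery: Arc<Discovery>) {
    // `UtpListener::new` now also returns a receiver for global listener events.
    let (_utp_event_tx, _utp_listener_tx, mut utp_listener_rx, mut utp_listener) =
        UtpListener::new(Arc::clone(&discovery));
    tokio::spawn(async move { utp_listener.start().await });

    // A closed stream delivers its full received payload in a single event;
    // a reset stream reports only the protocol and stream id.
    while let Some(event) = utp_listener_rx.recv().await {
        match event {
            UtpListenerEvent::ClosedStream(payload, protocol_id, stream_id) => {
                // e.g. hand the payload to the overlay store for validation
                println!("{protocol_id:?}/{stream_id:?} closed with {} bytes", payload.len());
            }
            UtpListenerEvent::ResetStream(protocol_id, stream_id) => {
                println!("{protocol_id:?}/{stream_id:?} was reset");
            }
        }
    }
}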
diff --git a/src/lib.rs b/src/lib.rs index d4e8c1513..108b9e334 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,7 +52,7 @@ pub async fn run_trin( }; // Initialize and spawn UTP listener - let (utp_events_tx, utp_listener_tx, mut utp_listener) = + let (utp_events_tx, utp_listener_tx, _, mut utp_listener) = UtpListener::new(Arc::clone(&discovery)); tokio::spawn(async move { utp_listener.start().await }); diff --git a/trin-core/Cargo.toml b/trin-core/Cargo.toml index b89d90bae..1321852ff 100644 --- a/trin-core/Cargo.toml +++ b/trin-core/Cargo.toml @@ -69,3 +69,4 @@ features = ["bundled"] [dev-dependencies] quickcheck = "1.0.3" +ntest = "0.8.0" diff --git a/trin-core/src/jsonrpc/types.rs b/trin-core/src/jsonrpc/types.rs index e2841d6ae..fbccde635 100644 --- a/trin-core/src/jsonrpc/types.rs +++ b/trin-core/src/jsonrpc/types.rs @@ -10,7 +10,7 @@ use validator::{Validate, ValidationError}; use crate::{ jsonrpc::endpoints::{HistoryEndpoint, StateEndpoint, TrinEndpoint}, portalnet::types::{ - content_key::OverlayContentKey, + content_key::{OverlayContentKey, RawContentKey}, messages::{ByteList, CustomPayload, SszEnr}, }, utils::bytes::hex_decode, @@ -275,7 +275,7 @@ impl TryFrom<[&Value; 2]> for OfferParams { .collect(); if let Ok(content_keys) = content_keys { - let content_keys: Result>, _> = content_keys + let content_keys: Result, _> = content_keys .iter() .map(|s| hex_decode(s.as_str())) .collect(); diff --git a/trin-core/src/portalnet/discovery.rs b/trin-core/src/portalnet/discovery.rs index a7a415848..30d2d7bf8 100644 --- a/trin-core/src/portalnet/discovery.rs +++ b/trin-core/src/portalnet/discovery.rs @@ -14,6 +14,7 @@ use rand::seq::SliceRandom; use serde_json::{json, Value}; use std::{ convert::TryFrom, + fmt, net::{IpAddr, SocketAddr}, sync::Arc, time::Duration, @@ -54,6 +55,18 @@ pub struct Discovery { pub listen_socket: SocketAddr, } +impl fmt::Debug for Discovery { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "Discovery: ( enr: {}, started: {}, listen_socket: {} )", + self.discv5.local_enr(), + self.started, + self.listen_socket + ) + } +} + impl Discovery { pub fn new(portal_config: PortalnetConfig) -> Result { let listen_all_ips = SocketAddr::new("0.0.0.0".parse().unwrap(), portal_config.listen_port); diff --git a/trin-core/src/portalnet/overlay.rs b/trin-core/src/portalnet/overlay.rs index 082a7c040..5a2b60d72 100644 --- a/trin-core/src/portalnet/overlay.rs +++ b/trin-core/src/portalnet/overlay.rs @@ -15,9 +15,12 @@ use crate::portalnet::{ }, }; -use crate::utp::{ - stream::{UtpListenerRequest, UtpSocket, BUF_SIZE}, - trin_helpers::{UtpAccept, UtpMessage}, +use crate::{ + portalnet::types::content_key::RawContentKey, + utp::{ + stream::{UtpListenerRequest, UtpStream, BUF_SIZE}, + trin_helpers::{UtpAccept, UtpMessage, UtpStreamId}, + }, }; use discv5::{ enr::NodeId, @@ -277,48 +280,42 @@ impl enr: Enr, conn_id: u16, ) -> Result { - let utp_request = UtpListenerRequest::FindContentStream(conn_id); - if let Err(err) = self.utp_listener_tx.send(utp_request) { - return Err(OverlayRequestError::UtpError(format!( - "Unable to send uTP FindContent stream request: {err}" - ))); - } - // initiate the connection to the acceptor - let (tx, rx) = tokio::sync::oneshot::channel::>(); - - let _ = self - .utp_listener_tx - .send(UtpListenerRequest::Connect(conn_id, enr.node_id(), tx)); + let (tx, rx) = tokio::sync::oneshot::channel::(); + let utp_request = UtpListenerRequest::Connect( + conn_id, + enr, + self.protocol.clone(), + UtpStreamId::FindContentStream, + tx, + 
); + + self.utp_listener_tx.send(utp_request).map_err(|err| { + OverlayRequestError::UtpError(format!( + "Unable to send Connect request with FindContent stream to UtpListener: {err}" + )) + })?; match rx.await { - Ok(conn) => { - match conn { - Ok(mut conn) => { - let mut result = Vec::new(); - // Loop and receive all DATA packets, similar to `read_to_end` - loop { - let mut buf = [0; BUF_SIZE]; - match conn.recv_from(&mut buf).await { - Ok((0, _)) => { - break; - } - Ok((bytes, _)) => { - result.extend_from_slice(&mut buf[..bytes]); - } - Err(err) => { - warn!("Unable to receive content via uTP: {err}"); - return Err(OverlayRequestError::UtpError(err.to_string())); - } - } + Ok(mut conn) => { + let mut result = Vec::new(); + // Loop and receive all DATA packets, similar to `read_to_end` + loop { + let mut buf = [0; BUF_SIZE]; + match conn.recv_from(&mut buf).await { + Ok((0, _)) => { + break; + } + Ok((bytes, _)) => { + result.extend_from_slice(&mut buf[..bytes]); + } + Err(err) => { + warn!("Unable to receive content via uTP: {err}"); + return Err(OverlayRequestError::UtpError(err.to_string())); } - Ok(Content::Content(VariableList::from(result))) - } - Err(err) => { - warn!("Unable to initiate uTP stream with remote node. Error initializing uTP socket: {err}"); - Err(OverlayRequestError::UtpError(err.to_string())) } } + Ok(Content::Content(VariableList::from(result))) } Err(err) => { warn!("Unable to receive from uTP listener channel: {err}"); @@ -331,7 +328,7 @@ impl /// Offer is also sent to nodes after FindContent (POKE) pub async fn send_offer( &self, - content_keys: Vec>, + content_keys: Vec, enr: Enr, ) -> Result { // Construct the request. @@ -384,49 +381,44 @@ impl return Ok(response); } - let utp_request = UtpListenerRequest::OfferStream(conn_id); - if let Err(err) = self.utp_listener_tx.send(utp_request) { - return Err(anyhow!("Unable to send uTP Offer stream request: {err}")); - } - // initiate the connection to the acceptor - let (tx, rx) = tokio::sync::oneshot::channel::>(); - - let _ = self - .utp_listener_tx - .send(UtpListenerRequest::Connect(conn_id, enr.node_id(), tx)); + let (tx, rx) = tokio::sync::oneshot::channel::(); + let utp_request = UtpListenerRequest::Connect( + conn_id, + enr, + self.protocol.clone(), + UtpStreamId::OfferStream, + tx, + ); + + self.utp_listener_tx + .send(utp_request).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?; + + let mut conn = rx.await?; + // Handle STATE packet for SYN + let mut buf = [0; BUF_SIZE]; + conn.recv(&mut buf).await?; + + let content_items = self.provide_requested_content(&response, content_keys_offered); + + let content_message = UtpAccept { + message: content_items, + }; - match rx.await? 
{ - Ok(mut conn) => { - // Handle STATE packet for SYN - let mut buf = [0; BUF_SIZE]; - conn.recv(&mut buf).await?; - - let content_items = self.provide_requested_content(&response, content_keys_offered); - - let content_message = UtpAccept { - message: content_items, - }; - - tokio::spawn(async move { - // send the content to the acceptor over a uTP stream - if let Err(err) = conn - .send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..]) - .await - { - warn!("Error sending content {err}"); - }; - // Close uTP connection - if let Err(err) = conn.close().await { - warn!("Unable to close uTP connection!: {err}") - }; - }); - Ok(response) - } - Err(err) => Err(anyhow!( - "Unable to initialize Offer uTP stream with remote node: {err}" - )), - } + tokio::spawn(async move { + // send the content to the acceptor over a uTP stream + if let Err(err) = conn + .send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..]) + .await + { + warn!("Error sending content {err}"); + }; + // Close uTP connection + if let Err(err) = conn.close().await { + warn!("Unable to close uTP connection!: {err}") + }; + }); + Ok(response) } /// Provide the requested content key and content value for the acceptor diff --git a/trin-core/src/portalnet/overlay_service.rs b/trin-core/src/portalnet/overlay_service.rs index aa9b3e443..91e706f8e 100644 --- a/trin-core/src/portalnet/overlay_service.rs +++ b/trin-core/src/portalnet/overlay_service.rs @@ -19,6 +19,7 @@ use crate::{ utp::stream::UtpListenerRequest, }; +use crate::utp::trin_helpers::UtpStreamId; use delay_map::HashSetDelay; use discv5::{ enr::NodeId, @@ -45,6 +46,8 @@ pub const FIND_CONTENT_MAX_NODES: usize = 32; /// With even distribution assumptions, 2**17 is enough to put each node (estimating 100k nodes, /// which is more than 10x the ethereum mainnet node count) into a unique bucket by the 17th bucket index. const EXPECTED_NON_EMPTY_BUCKETS: usize = 17; +/// Bucket refresh lookup interval in seconds +const BUCKET_REFRESH_INTERVAL_SECS: u64 = 60; /// An overlay request error. #[derive(Clone, Error, Debug)] @@ -377,7 +380,8 @@ impl /// Bucket maintenance: Maintain the routing table (more info documented above function). async fn start(&mut self) { // Construct bucket refresh interval - let mut bucket_refresh_interval = tokio::time::interval(Duration::from_secs(60)); + let mut bucket_refresh_interval = + tokio::time::interval(Duration::from_secs(BUCKET_REFRESH_INTERVAL_SECS)); loop { tokio::select! { @@ -417,6 +421,34 @@ impl } } + /// Send request to UtpListener to add a uTP stream to the active connections + fn add_utp_connection( + &self, + source: &NodeId, + conn_id_recv: u16, + stream_id: UtpStreamId, + ) -> Result<(), OverlayRequestError> { + if let Some(enr) = self.find_enr(source) { + // Initialize active uTP stream with requesting node + let utp_request = UtpListenerRequest::InitiateConnection( + enr, + self.protocol.clone(), + stream_id, + conn_id_recv, + ); + if let Err(err) = self.utp_listener_tx.send(utp_request) { + return Err(OverlayRequestError::UtpError(format!( + "Unable to send uTP AddActiveConnection request: {err}" + ))); + } + Ok(()) + } else { + Err(OverlayRequestError::UtpError( + "Can't find ENR in overlay routing table matching remote NodeId".to_string(), + )) + } + } + /// Main bucket refresh lookup logic fn bucket_refresh_lookup(&self) { // Look at local routing table and select the largest 17 buckets. 
@@ -479,6 +511,16 @@ impl .await } + /// Returns an ENR if one is known for the given NodeId in overlay routing table + pub fn find_enr(&self, node_id: &NodeId) -> Option { + // check if we know this node id in our routing table + let key = kbucket::Key::from(*node_id); + if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&key) { + return Some(entry.value().enr.clone()); + } + None + } + /// Processes an overlay request. fn process_request(&mut self, request: OverlayRequest) { // For incoming requests, handle the request, possibly send the response over the channel, @@ -489,8 +531,7 @@ impl // channel if the request was initiated internally (e.g. for maintenance). match request.direction { RequestDirection::Incoming { id, source } => { - let response = - self.handle_request(request.request.clone(), id.clone(), source.clone()); + let response = self.handle_request(request.request.clone(), id.clone(), &source); // Send response to responder if present. if let Some(responder) = request.responder { let _ = responder.send(response); @@ -522,9 +563,9 @@ impl fn initialize_request(&mut self, request: Request) -> Result { debug!("[{:?}] Initializing request", self.protocol); match request { - Request::FindContent(find_content) => { - Ok(Response::Content(self.handle_find_content(find_content)?)) - } + Request::FindContent(find_content) => Ok(Response::Content( + self.handle_find_content(find_content, None)?, + )), _ => Err(OverlayRequestError::InvalidRequest( "Initializing this overlay service request is not yet supported.".to_string(), )), @@ -536,23 +577,23 @@ impl &mut self, request: Request, id: RequestId, - source: NodeId, + source: &NodeId, ) -> Result { debug!("[{:?}] Handling request {}", self.protocol, id); match request { - Request::Ping(ping) => Ok(Response::Pong(self.handle_ping(ping, source))), + Request::Ping(ping) => Ok(Response::Pong(self.handle_ping(ping, &source))), Request::FindNodes(find_nodes) => { Ok(Response::Nodes(self.handle_find_nodes(find_nodes))) } - Request::FindContent(find_content) => { - Ok(Response::Content(self.handle_find_content(find_content)?)) - } - Request::Offer(offer) => Ok(Response::Accept(self.handle_offer(offer)?)), + Request::FindContent(find_content) => Ok(Response::Content( + self.handle_find_content(find_content, Some(&source))?, + )), + Request::Offer(offer) => Ok(Response::Accept(self.handle_offer(offer, source)?)), } } /// Builds a `Pong` response for a `Ping` request. - fn handle_ping(&self, request: Ping, source: NodeId) -> Pong { + fn handle_ping(&self, request: Ping, source: &NodeId) -> Pong { debug!( "[{:?}] Handling ping request from node={}. Ping={:?}", self.protocol, source, request @@ -582,7 +623,11 @@ impl } /// Attempts to build a `Content` response for a `FindContent` request. 
- fn handle_find_content(&self, request: FindContent) -> Result { + fn handle_find_content( + &self, + request: FindContent, + source: Option<&NodeId>, + ) -> Result { self.metrics .as_ref() .and_then(|m| Some(m.report_inbound_find_content())); @@ -604,29 +649,30 @@ impl if content.len() < 1000 { Ok(Content::Content(content)) } else { - let conn_id: u16 = crate::utp::stream::rand(); - - // listen for incoming connection request on conn_id, as part of utp handshake and - // temporarily storing content data, so we can send it right after we receive - // SYN packet from the requester - let utp_request = UtpListenerRequest::FindContentData(conn_id, content); - if let Err(err) = self.utp_listener_tx.send(utp_request) { - return Err(OverlayRequestError::UtpError(format!( - "Unable to send uTP FindContentData stream request: {err}" - ))); - } - - // also listen on conn_id + 1 because this is the receive path - let conn_id_recv = conn_id.wrapping_add(1); - let utp_request = UtpListenerRequest::FindContentStream(conn_id_recv); - if let Err(err) = self.utp_listener_tx.send(utp_request) { - return Err(OverlayRequestError::UtpError(format!( - "Unable to send uTP FindContent stream request: {err}" - ))); + match source { + Some(source) => { + let conn_id: u16 = crate::utp::stream::rand(); + + // Listen for incoming uTP connection request on as part of uTP handshake and + // storing content data, so we can send it inside UtpListener right after we receive + // SYN packet from the requester + let conn_id_recv = conn_id.wrapping_add(1); + + self.add_utp_connection( + source, + conn_id_recv, + UtpStreamId::ContentStream(content), + )?; + + // Connection id is send as BE because uTP header values are stored also as BE + Ok(Content::ConnectionId(conn_id.to_be())) + + }, + None => { + return Err(OverlayRequestError::UtpError( + "Unable to start listening for uTP stream because source NodeID is not provided".to_string())) + } } - - // Connection id is send as BE because uTP header values are stored also as BE - Ok(Content::ConnectionId(conn_id.to_be())) } } Ok(None) => { @@ -637,14 +683,13 @@ impl } } Err(msg) => Err(OverlayRequestError::Failure(format!( - "Unable to respond to FindContent: {}", - msg + "Unable to respond to FindContent: {msg}", ))), } } /// Attempts to build an `Accept` response for an `Offer` request. 
- fn handle_offer(&self, request: Offer) -> Result { + fn handle_offer(&self, request: Offer, source: &NodeId) -> Result { self.metrics .as_ref() .and_then(|m| Some(m.report_inbound_offer())); @@ -681,23 +726,10 @@ impl })?; } - // listen for incoming connection request on conn_id, as part of utp handshake + // Listen for incoming connection request on conn_id, as part of utp handshake let conn_id: u16 = crate::utp::stream::rand(); - let utp_request = UtpListenerRequest::OfferStream(conn_id); - if let Err(err) = self.utp_listener_tx.send(utp_request) { - return Err(OverlayRequestError::UtpError(format!( - "Unable to send uTP Offer stream request: {err}" - ))); - } - // also listen on conn_id + 1 because this is the actual receive path for acceptor - let conn_id_recv = conn_id.wrapping_add(1); - let utp_request = UtpListenerRequest::AcceptStream(conn_id_recv, accept_keys); - if let Err(err) = self.utp_listener_tx.send(utp_request) { - return Err(OverlayRequestError::UtpError(format!( - "Unable to send uTP Accept stream request: {err}" - ))); - } + self.add_utp_connection(source, conn_id, UtpStreamId::AcceptStream(accept_keys))?; let accept = Accept { connection_id: conn_id.to_be(), diff --git a/trin-core/src/portalnet/types/content_key.rs b/trin-core/src/portalnet/types/content_key.rs index 0cccbaf33..0646b3353 100644 --- a/trin-core/src/portalnet/types/content_key.rs +++ b/trin-core/src/portalnet/types/content_key.rs @@ -6,6 +6,9 @@ use ssz::{self, Decode, Encode}; use ssz_derive::{Decode, Encode}; use ssz_types::{typenum, FixedVector, VariableList}; +/// SSZ encoded overlay content key as bytes +pub type RawContentKey = Vec; + /// Types whose values represent keys to lookup content items in an overlay network. /// Keys are serializable. pub trait OverlayContentKey: Into> + TryFrom> + Clone { diff --git a/trin-core/src/portalnet/types/messages.rs b/trin-core/src/portalnet/types/messages.rs index b66274d62..748baf128 100644 --- a/trin-core/src/portalnet/types/messages.rs +++ b/trin-core/src/portalnet/types/messages.rs @@ -17,7 +17,7 @@ use ssz_types::{typenum, BitList, VariableList}; use thiserror::Error; use validator::ValidationError; -use crate::portalnet::Enr; +use crate::portalnet::{types::content_key::RawContentKey, Enr}; pub type ByteList = VariableList; @@ -143,7 +143,7 @@ pub enum ProtocolIdError { } /// Protocol identifiers -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum ProtocolId { State, History, @@ -492,7 +492,7 @@ impl TryInto for Content { #[derive(Debug, PartialEq, Clone, Encode, Decode)] pub struct Offer { - pub content_keys: Vec>, + pub content_keys: Vec, } #[derive(Debug, PartialEq, Clone, Encode, Decode)] diff --git a/trin-core/src/utp/stream.rs b/trin-core/src/utp/stream.rs index 280ccbf67..e66e48caa 100644 --- a/trin-core/src/utp/stream.rs +++ b/trin-core/src/utp/stream.rs @@ -21,11 +21,11 @@ use tokio::{ use crate::{ locks::RwLoggingExt, - portalnet::types::messages::{ByteList, Content::Content, ProtocolId}, + portalnet::types::messages::ProtocolId, utp::{ packets::{ExtensionType, Packet, PacketType, HEADER_SIZE}, time::{now_microseconds, Delay, Timestamp}, - trin_helpers::{UtpMessage, UtpMessageId}, + trin_helpers::{UtpMessage, UtpStreamId}, util::{abs_diff, ewma, generate_sequential_identifiers}, }, }; @@ -62,6 +62,9 @@ const DISCV5_SOCKET_TIMEOUT: u64 = 25; /// uTP connection id type ConnId = u16; +/// uTP payload data +pub type UtpPayload = Vec; + pub fn rand() -> u16 { rand::thread_rng().gen() } @@ -82,9 +85,9 @@ impl ConnectionKey { } } 
-/// uTP socket connection state +/// uTP stream connection state #[derive(PartialEq, Eq, Clone, Copy, Debug)] -pub enum SocketState { +pub enum StreamState { Uninitialized, SynSent, SynRecv, @@ -102,31 +105,56 @@ struct DelayDifferenceSample { /// Represent overlay to uTP listener request. It is used as a way to communicate between the overlay protocol /// and uTP listener +#[derive(Debug)] pub enum UtpListenerRequest { - /// Request to listen for Accept stream - AcceptStream(ConnId, Vec>), - /// Request to initialize uTP streram with remote node - Connect(ConnId, NodeId, oneshot::Sender>), - /// Request to listen for FindCOntent stream and send content data - FindContentData(ConnId, ByteList), - /// Request to listen for FindContent stream - FindContentStream(ConnId), - /// Request to listen for Offer stream - OfferStream(ConnId), + /// Request to create and connect to a uTP stream initiated by a remote node + Connect( + ConnId, + Enr, + ProtocolId, + UtpStreamId, + oneshot::Sender, + ), + /// Request to initiate and add uTP stream to the connections hash map + InitiateConnection(Enr, ProtocolId, UtpStreamId, ConnId), +} + +/// Emit global event to overlay handler +#[derive(Debug, PartialEq)] +pub enum UtpListenerEvent { + /// uTP stream is closed + ClosedStream(UtpPayload, ProtocolId, UtpStreamId), + /// uTP stream is reset + ResetStream(ProtocolId, UtpStreamId), +} + +/// uTP stream state events emitted from `UtpStream` +#[derive(Clone, Debug)] +pub enum UtpStreamEvent { + /// Event signaling that a UtpStream has completed, containing received uTP payload, protocol id, + /// receive connection id and node id of the remote peer + Closed(UtpPayload, ProtocolId, UtpStreamId, ConnectionKey), + /// Event signaling that a UtpStream has been reset, containing protocol id, receive connection id + /// and node id of the remote peer + Reset(ProtocolId, UtpStreamId, ConnectionKey), } -// Basically the same idea as in the official Bit Torrent library we will store all of the active connections data here +/// Main uTP service used to listen and handle all uTP connections and streams pub struct UtpListener { /// Base discv5 layer discovery: Arc, /// Store all active connections - utp_connections: HashMap, - /// uTP connection ids to listen for - listening: HashMap, + utp_connections: HashMap, /// Receiver for uTP events sent from the main portal event handler utp_event_rx: UnboundedReceiver, + /// Sender to overlay layer with processed uTP stream + overlay_tx: UnboundedSender, /// Receiver for uTP requests sent from the overlay layer overlay_rx: UnboundedReceiver, + /// Sender used in UtpStream to emit stream state events + stream_tx: UnboundedSender, + /// Receiver for uTP stream state events + stream_rx: UnboundedReceiver, } impl UtpListener { @@ -135,22 +163,30 @@ impl UtpListener { ) -> ( UnboundedSender, UnboundedSender, + UnboundedReceiver, Self, ) { // Channel to process uTP TalkReq packets from main portal event handler let (utp_event_tx, utp_event_rx) = unbounded_channel::(); // Channel to process portal overlay requests let (utp_listener_tx, utp_listener_rx) = unbounded_channel::(); + // Channel to emit processed uTP payload to overlay service + let (overlay_tx, overlay_rx) = unbounded_channel::(); + // Channel to emit stream events from UtpStream + let (stream_tx, stream_rx) = unbounded_channel::(); ( utp_event_tx, utp_listener_tx, + overlay_rx, UtpListener { discovery, utp_connections: HashMap::new(), - listening: HashMap::new(), utp_event_rx, + overlay_tx, overlay_rx: utp_listener_rx, + 
stream_tx, + stream_rx, }, ) } @@ -164,6 +200,9 @@ impl UtpListener { }, Some(overlay_request) = self.overlay_rx.recv() => { self.process_overlay_request(overlay_request).await + }, + Some(stream_event) = self.stream_rx.recv() => { + self.process_stream_event(stream_event) } } } @@ -195,9 +234,8 @@ impl UtpListener { } } PacketType::Syn => { - if let Some(enr) = self.discovery.discv5.find_enr(node_id) { - // If neither of those cases happened handle this is a new request - let mut conn = UtpSocket::new(Arc::clone(&self.discovery), enr.clone()); + let conn_key = ConnectionKey::new(*node_id, connection_id); + if let Some(conn) = self.utp_connections.get_mut(&conn_key) { if conn.discv5_tx.send(packet).is_err() { error!("Unable to send SYN packet to uTP stream handler"); return; @@ -210,73 +248,65 @@ impl UtpListener { return; } - self.utp_connections.insert( - ConnectionKey::new(*node_id, conn.receiver_connection_id), - conn.clone(), - ); - - // Get ownership of FindContentData and re-add the receiver connection - let utp_message_id = self.listening.remove(&conn.sender_connection_id); - - // TODO: Probably there is a better way with lifetimes to pass the HashMap value to a - // different thread without removing the key and re-adding it. - self.listening - .insert(conn.sender_connection_id, UtpMessageId::FindContentStream); - - if let Some(UtpMessageId::FindContentData(Content(content_data))) = - utp_message_id + // Send content data if the stream is listening for FindContent SYN packet + if let UtpStreamId::ContentStream(content_data) = conn.stream_id.clone() + // TODO: Change this `clone` to borrow after rust 1.62 { // We want to send uTP data only if the content is Content(ByteList) - tokio::spawn(async move { - debug!( - "Sending content data via uTP with len: {}", - content_data.len() - ); - // send the content to the requestor over a uTP stream - if let Err(msg) = conn - .send_to( - &UtpMessage::new(content_data.as_ssz_bytes()).encode() - [..], - ) - .await - { - error!("Error sending content {msg}"); - } else { - // Close uTP connection - if let Err(msg) = conn.close().await { - error!("Unable to close uTP connection!: {msg}") - } - }; - }); + debug!( + "Sending content data via uTP with len: {}", + content_data.len() + ); + // send the content to the requestor over a uTP stream + if let Err(msg) = conn + .send_to( + &UtpMessage::new(content_data.as_ssz_bytes()).encode()[..], + ) + .await + { + error!("Error sending content {msg}"); + } else { + // Close uTP connection + if let Err(msg) = conn.close().await { + error!("Unable to close uTP connection!: {msg}") + } + }; } } else { - warn!("Query requested an unknown ENR"); + warn!("Received SYN packet for an unknown active uTP stream"); } } // Receive DATA and FIN packets PacketType::Data => { if let Some(conn) = self .utp_connections - .get_mut(&ConnectionKey::new(*node_id, connection_id)) + .get_mut(&ConnectionKey::new(*node_id, connection_id.wrapping_sub(1))) { if conn.discv5_tx.send(packet.clone()).is_err() { error!("Unable to send DATA packet to uTP stream handler"); return; } + let mut result = Vec::new(); + let mut buf = [0; BUF_SIZE]; - if let Err(msg) = conn.recv(&mut buf).await { - error!("Unable to receive uTP DATA packet: {msg}") - } else { - conn.recv_data_stream - .append(&mut Vec::from(packet.payload())); + match conn.recv(&mut buf).await { + Ok(bytes_read) => { + if let Some(bytes) = bytes_read { + result.extend_from_slice(&buf[..bytes]); + conn.recv_data_stream.append(&mut result); + } + } + Err(err) => error!("Unable to 
receive uTP DATA packet: {err}"), } + } else { + warn!("Received DATA packet for an unknown active uTP stream") } } PacketType::Fin => { if let Some(conn) = self .utp_connections - .get_mut(&ConnectionKey::new(*node_id, connection_id)) + .get_mut(&ConnectionKey::new(*node_id, connection_id.wrapping_sub(1))) { if conn.discv5_tx.send(packet).is_err() { error!("Unable to send FIN packet to uTP stream handler"); @@ -287,6 +317,8 @@ impl UtpListener { if let Err(msg) = conn.recv(&mut buf).await { error!("Unable to receive uTP FIN packet: {msg}") } + } else { + warn!("Received FIN packet for an unknown active uTP stream") } } PacketType::State => { @@ -299,6 +331,8 @@ impl UtpListener { } // We don't handle STATE packets here, because the uTP client is handling them // implicitly in the background when sending FIN packet with conn.close() + } else { + warn!("Received STATE packet for an unknown active uTP stream"); } } } @@ -312,26 +346,61 @@ impl UtpListener { /// Process overlay uTP requests async fn process_overlay_request(&mut self, request: UtpListenerRequest) { match request { - UtpListenerRequest::FindContentStream(conn_id) => { - self.listening - .insert(conn_id, UtpMessageId::FindContentStream); + UtpListenerRequest::InitiateConnection( + connected_to, + protocol_id, + stream_id, + conn_id_recv, + ) => { + let conn = UtpStream::new( + Arc::clone(&self.discovery), + connected_to.clone(), + protocol_id, + stream_id, + Some(self.stream_tx.clone()), + ); + let conn_key = ConnectionKey::new(connected_to.node_id(), conn_id_recv); + self.utp_connections.insert(conn_key, conn); } - UtpListenerRequest::Connect(conn_id, node_id, tx) => { - let conn = self.connect(conn_id, node_id).await; + UtpListenerRequest::Connect(conn_id, enr, protocol_id, stream_id, tx) => { + let conn = self.connect(conn_id, enr, protocol_id, stream_id).await; if tx.send(conn).is_err() { - warn!("Unable to send uTP socket to requester") + error!("Unable to send the uTP stream to requester") }; } - UtpListenerRequest::OfferStream(conn_id) => { - self.listening.insert(conn_id, UtpMessageId::OfferStream); - } - UtpListenerRequest::FindContentData(conn_id, content) => { - self.listening - .insert(conn_id, UtpMessageId::FindContentData(Content(content))); + } + } + + /// Emit global uTP listener event upon processing uTP stream event + fn process_stream_event(&mut self, event: UtpStreamEvent) { + match event { + UtpStreamEvent::Closed(utp_payload, protocol_id, stream_id, conn_key) => { + // Remove closed stream from active connections + if self.utp_connections.remove(&conn_key).is_none() { + error!("Unable to remove closed uTP stream from active connections, STREAM_CONN_ID_RECV: {}, CONNECTED_TO: {}", conn_key.conn_id_recv, conn_key.node_id); + } + + // Emit global event to overlay handler + if let Err(err) = self.overlay_tx.send(UtpListenerEvent::ClosedStream( + utp_payload, + protocol_id, + stream_id, + )) { + error!("Unable to send ClosedStream event to overlay handler: {err}"); + } } - UtpListenerRequest::AcceptStream(conn_id, accepted_keys) => { - self.listening - .insert(conn_id, UtpMessageId::AcceptStream(accepted_keys)); + UtpStreamEvent::Reset(protocol_id, stream_id, conn_key) => { + // Remove reset stream from active connections + if self.utp_connections.remove(&conn_key).is_none() { + error!("Unable to remove reset uTP stream from active connections, STREAM_CONN_ID_RECV: {}, CONNECTED_TO: {}", conn_key.conn_id_recv, conn_key.node_id); + } + + if let Err(err) = self + .overlay_tx + 
.send(UtpListenerEvent::ResetStream(protocol_id, stream_id)) + { + error!("Unable to send ResetStream event to overlay handler: {err}"); + } } } } @@ -340,58 +409,45 @@ impl UtpListener { async fn connect( &mut self, connection_id: ConnId, - node_id: NodeId, - ) -> anyhow::Result { - if let Some(enr) = self.discovery.discv5.find_enr(&node_id) { - let mut conn = UtpSocket::new(Arc::clone(&self.discovery), enr); - conn.make_connection(connection_id).await; - self.utp_connections.insert( - ConnectionKey::new(node_id, conn.receiver_connection_id), - conn.clone(), - ); - Ok(conn) - } else { - Err(anyhow!("Trying to connect to unknow Enr")) - } - } + enr: Enr, + protocol_id: ProtocolId, + stream_id: UtpStreamId, + ) -> UtpStream { + let mut conn = UtpStream::new( + Arc::clone(&self.discovery), + enr.clone(), + protocol_id, + stream_id, + Some(self.stream_tx.clone()), + ); + conn.make_connection(connection_id).await; + self.utp_connections.insert( + ConnectionKey::new(enr.node_id(), conn.receiver_connection_id), + conn.clone(), + ); - // https://github.com/ethereum/portal-network-specs/pull/98\ - // Currently the way to handle data over uTP isn't finalized yet, so we are going to use the - // handle data on connection closed method, as that seems to be the accepted method for now. - pub async fn process_utp_byte_stream(&mut self) { - let mut utp_connections = self.utp_connections.clone(); - for (conn_key, conn) in self.utp_connections.iter_mut() { - if conn.state == SocketState::Closed { - let received_stream = conn.recv_data_stream.clone(); - debug!("Received data: with len: {}", received_stream.len()); - - match self.listening.get(&conn.receiver_connection_id) { - Some(message_type) => { - if let UtpMessageId::AcceptStream(content_keys) = message_type { - // TODO: Implement this with overlay store and decode receiver stream if multiple content values are send - debug!("Store {content_keys:?}, {received_stream:?}"); - } - } - _ => warn!("uTP listening HashMap doesn't have uTP stream message type"), - } - utp_connections.remove(conn_key); - } - } + conn } } // Used to be MicroTransportProtocol impl but it is basically just called UtpStream compared to the // Rust Tcp Lib so I changed it -#[derive(Clone)] -pub struct UtpSocket { +#[derive(Debug, Clone)] +pub struct UtpStream { /// The wrapped discv5 protocol socket: Arc, - /// Socket state - pub state: SocketState, + /// uTP stream state + pub state: StreamState, - // Remote peer - connected_to: Enr, + /// ENR of the connected remote peer + pub connected_to: Enr, + + /// Overlay protocol identifier + protocol_id: ProtocolId, + + /// Overlay uTP stream id + stream_id: UtpStreamId, /// Sequence number for the next packet seq_nr: u16, @@ -423,7 +479,7 @@ pub struct UtpSocket { /// Sent but not yet acknowledged packets send_window: Vec, - /// How many ACKs did the socket receive for packet with sequence number equal to `ack_nr` + /// How many ACKs did the stream receive for packet with sequence number equal to `ack_nr` duplicate_ack_count: u8, /// Sequence number of the latest packet the remote peer acknowledged @@ -468,17 +524,29 @@ pub struct UtpSocket { /// Receive channel for discv5 socket discv5_rx: Arc>>, + /// Sender to emit stream events to UtpListener + event_tx: Option>, + + /// Store received uTP payload data over the stream pub recv_data_stream: Vec, } -impl UtpSocket { - pub fn new(socket: Arc, connected_to: Enr) -> Self { +impl UtpStream { + pub fn new( + socket: Arc, + connected_to: Enr, + protocol_id: ProtocolId, + 
stream_id: UtpStreamId, + utp_listener_tx: Option>, + ) -> Self { let (receiver_id, sender_id) = generate_sequential_identifiers(); let (discv5_tx, discv5_rx) = unbounded_channel::(); Self { - state: SocketState::Uninitialized, + state: StreamState::Uninitialized, + protocol_id, + stream_id, seq_nr: 1, ack_nr: 0, receiver_connection_id: receiver_id, @@ -507,6 +575,7 @@ impl UtpSocket { max_retransmission_retries: MAX_RETRANSMISSION_RETRIES, discv5_tx, discv5_rx: Arc::new(RwLock::new(discv5_rx)), + event_tx: utp_listener_tx, } } @@ -522,8 +591,8 @@ impl UtpSocket { // Note that the buffer passed to `send_to` might exceed the maximum packet // size, which will result in the data being split over several packets. pub async fn send_to(&mut self, buf: &[u8]) -> anyhow::Result { - if self.state == SocketState::Closed { - return Err(anyhow!("The socket is closed")); + if self.state == StreamState::Closed { + return Err(anyhow!("The stream is closed")); } let total_length = buf.len(); @@ -685,7 +754,7 @@ impl UtpSocket { } async fn make_connection(&mut self, connection_id: ConnId) { - if self.state == SocketState::Uninitialized { + if self.state == StreamState::Uninitialized { self.receiver_connection_id = connection_id; self.sender_connection_id = self.receiver_connection_id + 1; @@ -695,7 +764,7 @@ impl UtpSocket { packet.set_seq_nr(self.seq_nr); self.send_packet(&mut packet).await; - self.state = SocketState::SynSent; + self.state = StreamState::SynSent; } } @@ -730,7 +799,7 @@ impl UtpSocket { packet.set_ack_nr(self.ack_nr); self.send_packet(&mut packet).await; - self.state = SocketState::FinSent; + self.state = StreamState::FinSent; } #[async_recursion] @@ -742,7 +811,7 @@ impl UtpSocket { ); // To make uTP connection bidirectional, we want to always acknowledge the received packet - if self.state == SocketState::SynSent { + if self.state == StreamState::SynSent { self.ack_nr = packet.seq_nr(); } else { // Only acknowledge this if this follows the last one, else do it when we advance the send @@ -755,8 +824,9 @@ impl UtpSocket { } // Reset connection if connection id doesn't match and this isn't a SYN + if packet.get_type() != PacketType::Syn - && self.state != SocketState::SynSent + && self.state != StreamState::SynSent && !(packet.connection_id() == self.sender_connection_id || packet.connection_id() == self.receiver_connection_id) { @@ -772,13 +842,13 @@ impl UtpSocket { match (self.state, packet.get_type()) { // New connection, when we receive SYN packet, respond with STATE packet - (SocketState::Uninitialized, PacketType::Syn) => { + (StreamState::Uninitialized, PacketType::Syn) => { self.connected_to = src; self.ack_nr = packet.seq_nr(); self.seq_nr = rand::random(); self.receiver_connection_id = packet.connection_id() + 1; self.sender_connection_id = packet.connection_id(); - self.state = SocketState::Connected; + self.state = StreamState::Connected; self.last_dropped = self.ack_nr; let reply = self.prepare_reply(packet, PacketType::State); @@ -792,29 +862,29 @@ impl UtpSocket { // we want to forcibly terminate the connection (_, PacketType::Syn) => Ok(Some(self.prepare_reply(packet, PacketType::Reset))), // When SYN is send and we receive STATE, do not reply - (SocketState::SynSent, PacketType::State) => { + (StreamState::SynSent, PacketType::State) => { self.connected_to = src; self.ack_nr = packet.seq_nr() - 1; self.seq_nr += 1; - self.state = SocketState::Connected; + self.state = StreamState::Connected; self.last_acked = packet.ack_nr(); self.last_acked_timestamp = 
now_microseconds(); Ok(None) } // To make uTP connection bidirectional, we also can expect DATA packet if state is SynSent - (SocketState::SynSent, PacketType::Data) => Ok(self.handle_data_packet(packet)), - // Handle data packet if socket state is `Connected` or `FinSent` and packet type is DATA - (SocketState::Connected, PacketType::Data) - | (SocketState::FinSent, PacketType::Data) => Ok(self.handle_data_packet(packet)), - // Handle state packet if socket state is `Connected` and packet type is STATE - (SocketState::Connected, PacketType::State) => { + (StreamState::SynSent, PacketType::Data) => Ok(self.handle_data_packet(packet)), + // Handle data packet if stream state is `Connected` or `FinSent` and packet type is DATA + (StreamState::Connected, PacketType::Data) + | (StreamState::FinSent, PacketType::Data) => Ok(self.handle_data_packet(packet)), + // Handle state packet if stream state is `Connected` and packet type is STATE + (StreamState::Connected, PacketType::State) => { self.handle_state_packet(packet).await; Ok(None) } // Handle FIN packet. Check if all send packets are acknowledged. - (SocketState::Connected, PacketType::Fin) - | (SocketState::FinSent, PacketType::Fin) - | (SocketState::SynSent, PacketType::Fin) => { + (StreamState::Connected, PacketType::Fin) + | (StreamState::FinSent, PacketType::Fin) + | (StreamState::SynSent, PacketType::Fin) => { if packet.ack_nr() < self.seq_nr { debug!("FIN received but there are missing acknowledgements for sent packets"); } @@ -836,16 +906,19 @@ impl UtpSocket { } // Give up, the remote peer might not care about our missing packets - self.state = SocketState::Closed; + self.state = StreamState::Closed; + self.emit_close_event(); + Ok(Some(reply)) } - // Confirm with STATE packet when socket state is `Closed` and we receive FIN packet - (SocketState::Closed, PacketType::Fin) => { + // Confirm with STATE packet when stream state is `Closed` and we receive FIN packet + (StreamState::Closed, PacketType::Fin) => { Ok(Some(self.prepare_reply(packet, PacketType::State))) } - (SocketState::FinSent, PacketType::State) => { + (StreamState::FinSent, PacketType::State) => { if packet.ack_nr() == self.seq_nr { - self.state = SocketState::Closed; + self.state = StreamState::Closed; + self.emit_close_event(); } else { self.handle_state_packet(packet).await; } @@ -853,7 +926,19 @@ impl UtpSocket { } // Reset connection when receiving RESET packet (_, PacketType::Reset) => { - self.state = SocketState::ResetReceived; + self.state = StreamState::ResetReceived; + // Emit stream state event to UtpListener + if let Some(listener_tx) = self.event_tx.clone() { + let conn_key = self.get_conn_key(); + + if let Err(err) = listener_tx.send(UtpStreamEvent::Reset( + self.protocol_id.clone(), + self.stream_id.clone(), + conn_key, + )) { + error!("Unable to send uTP RESET event to uTP listener: {err}"); + } + } Err(anyhow!("Connection reset by remote peer")) } (state, ty) => { @@ -864,6 +949,33 @@ impl UtpSocket { } } + /// Emit stream state event to UtpListener + fn emit_close_event(&mut self) { + if let Some(listener_tx) = self.event_tx.clone() { + let conn_key = self.get_conn_key(); + + if let Err(err) = listener_tx.send(UtpStreamEvent::Closed( + self.recv_data_stream.clone(), + self.protocol_id.clone(), + self.stream_id.clone(), + conn_key, + )) { + error!("Unable to send uTP CLOSED event to uTP listener: {err}"); + } + } + } + + /// Get connection key used in uTP listener to store active uTP connections + fn get_conn_key(&self) -> ConnectionKey { + let 
conn_id = match self.stream_id { + UtpStreamId::FindContentStream => self.receiver_connection_id, + UtpStreamId::ContentStream(_) => self.sender_connection_id, + UtpStreamId::OfferStream => self.receiver_connection_id, + UtpStreamId::AcceptStream(_) => self.sender_connection_id, + }; + ConnectionKey::new(self.connected_to.node_id(), conn_id) + } + fn prepare_reply(&self, original: &Packet, t: PacketType) -> Packet { let mut resp = Packet::new(); resp.set_type(t); @@ -881,14 +993,14 @@ impl UtpSocket { fn handle_data_packet(&mut self, packet: &Packet) -> Option { // We increase packet seq_nr if we are going to send DATA packet right after SYN-ACK. - if self.state == SocketState::SynSent { + if self.state == StreamState::SynSent { self.seq_nr += 1; - self.state = SocketState::Connected + self.state = StreamState::Connected } // If a FIN was previously sent, reply with a FIN packet acknowledging the received packet. let packet_type = match self.state { - SocketState::FinSent => PacketType::Fin, + StreamState::FinSent => PacketType::Fin, _ => PacketType::State, }; @@ -1106,15 +1218,15 @@ impl UtpSocket { return Ok((read, self.connected_to.clone())); } - // If the socket received a reset packet and all data has been flushed, then it can't + // If the stream received a reset packet and all data has been flushed, then it can't // receive anything else - if self.state == SocketState::ResetReceived { + if self.state == StreamState::ResetReceived { return Err(anyhow!("Connection reset by remote peer")); } loop { - // A closed socket with no pending data can only "read" 0 new bytes. - if self.state == SocketState::Closed { + // A closed stream with no pending data can only "read" 0 new bytes. + if self.state == StreamState::Closed { return Ok((0, self.connected_to.clone())); } @@ -1250,7 +1362,7 @@ impl UtpSocket { } } - /// Inserts a packet into the socket's buffer. + /// Inserts a packet into the stream's buffer. /// /// The packet is inserted in such a way that the packets in the buffer are sorted according to /// their sequence number in ascending order. This allows storing packets that were received out @@ -1290,10 +1402,10 @@ impl UtpSocket { /// This method allows both peers to receive all packets still in /// flight. 
pub async fn close(&mut self) -> anyhow::Result<()> { - // Nothing to do if the socket's already closed or not connected - if self.state == SocketState::Closed - || self.state == SocketState::Uninitialized - || self.state == SocketState::SynSent + // Nothing to do if the stream's already closed or not connected + if self.state == StreamState::Closed + || self.state == StreamState::Uninitialized + || self.state == StreamState::SynSent { return Ok(()); } @@ -1324,11 +1436,11 @@ impl UtpSocket { } debug!("CLosing connection, sent {:?}", packet); - self.state = SocketState::FinSent; + self.state = StreamState::FinSent; // Receive JAKE let mut buf = [0; BUF_SIZE]; - while self.state != SocketState::Closed { + while self.state != StreamState::Closed { self.recv(&mut buf).await?; } @@ -1358,8 +1470,9 @@ mod tests { utils::node_id::generate_random_remote_enr, utp::{ packets::{Packet, PacketType}, - stream::{SocketState, UtpSocket, BUF_SIZE}, + stream::{StreamState, UtpStream, BUF_SIZE}, time::now_microseconds, + trin_helpers::UtpStreamId, }, }; use discv5::Discv5Event; @@ -1377,7 +1490,7 @@ mod tests { BASE_PORT + NEXT_OFFSET.fetch_add(1, Ordering::Relaxed) as u16 } - async fn server_setup() -> UtpSocket { + async fn server_setup() -> UtpStream { let ip_addr = socket::find_assigned_ip().expect("Could not find an IP for local connections"); let port = next_test_port(); @@ -1392,14 +1505,20 @@ mod tests { let discv5 = Arc::new(discv5); - let conn = UtpSocket::new(Arc::clone(&discv5), enr); + let conn = UtpStream::new( + Arc::clone(&discv5), + enr, + ProtocolId::History, + UtpStreamId::OfferStream, + None, + ); // TODO: Create `Discv5Socket` struct to encapsulate all socket logic spawn_socket_recv(Arc::clone(&discv5), conn.clone()); conn } - async fn client_setup(connected_to: Enr) -> (Enr, UtpSocket) { + async fn client_setup(connected_to: Enr) -> (Enr, UtpStream) { let port = next_test_port(); let matching_ip = connected_to.ip().unwrap(); let config = PortalnetConfig { @@ -1412,13 +1531,19 @@ mod tests { let discv5 = Arc::new(discv5); - let conn = UtpSocket::new(Arc::clone(&discv5), connected_to); + let conn = UtpStream::new( + Arc::clone(&discv5), + connected_to, + ProtocolId::History, + UtpStreamId::OfferStream, + None, + ); spawn_socket_recv(Arc::clone(&discv5), conn.clone()); (discv5.local_enr(), conn) } - fn spawn_socket_recv(discv5: Arc, conn: UtpSocket) { + fn spawn_socket_recv(discv5: Arc, conn: UtpStream) { tokio::spawn(async move { let mut receiver = discv5.discv5.event_stream().await.unwrap(); while let Some(event) = receiver.recv().await { @@ -1455,7 +1580,7 @@ mod tests { let initial_connection_id: u16 = rand::random(); let sender_connection_id = initial_connection_id + 1; let (_, client_enr) = generate_random_remote_enr(); - let mut socket = server_setup().await; + let mut stream = server_setup().await; // --------------------------------- // Test connection setup - SYN packet @@ -1466,7 +1591,7 @@ mod tests { packet.set_connection_id(initial_connection_id); // Do we have a response? 
- let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1496,7 +1621,7 @@ mod tests { packet.set_seq_nr(old_packet.seq_nr() + 1); packet.set_ack_nr(old_response.seq_nr()); - let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1529,7 +1654,7 @@ mod tests { packet.set_seq_nr(old_packet.seq_nr() + 1); packet.set_ack_nr(old_response.seq_nr()); - let response = socket.handle_packet(&packet, client_enr).await; + let response = stream.handle_packet(&packet, client_enr).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1553,7 +1678,7 @@ mod tests { // Boilerplate test setup let initial_connection_id: u16 = rand::random(); let (_, client_enr) = generate_random_remote_enr(); - let mut socket = server_setup().await; + let mut stream = server_setup().await; // Establish connection let mut packet = Packet::new(); @@ -1561,7 +1686,7 @@ mod tests { packet.set_type(PacketType::Syn); packet.set_connection_id(initial_connection_id); - let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1579,19 +1704,19 @@ mod tests { packet.set_seq_nr(old_packet.seq_nr() + 1); packet.set_ack_nr(old_response.seq_nr()); - let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_none()); // Send a second keepalive packet, identical to the previous one - let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_none()); - // Mark socket as closed - socket.state = SocketState::Closed; + // Mark stream as closed + stream.state = StreamState::Closed; } #[tokio::test] @@ -1599,7 +1724,7 @@ mod tests { // Boilerplate test setup let initial_connection_id: u16 = rand::random(); let (_, client_enr) = generate_random_remote_enr(); - let mut socket = server_setup().await; + let mut stream = server_setup().await; // Establish connection let mut packet = Packet::new(); @@ -1607,7 +1732,7 @@ mod tests { packet.set_type(PacketType::Syn); packet.set_connection_id(initial_connection_id); - let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1621,7 +1746,7 @@ mod tests { packet.set_type(PacketType::State); packet.set_connection_id(new_connection_id); - let response = socket.handle_packet(&packet, client_enr).await; + let response = stream.handle_packet(&packet, client_enr).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1630,8 +1755,8 @@ mod tests { assert_eq!(response.get_type(), PacketType::Reset); assert_eq!(response.ack_nr(), packet.seq_nr()); - // Mark socket as closed 
- socket.state = SocketState::Closed; + // Mark stream as closed + stream.state = StreamState::Closed; } #[tokio::test] @@ -1639,7 +1764,7 @@ mod tests { // Boilerplate test setup let initial_connection_id: u16 = rand::random(); let (_, client_enr) = generate_random_remote_enr(); - let mut socket = server_setup().await; + let mut stream = server_setup().await; // Establish connection let mut packet = Packet::new(); @@ -1647,7 +1772,7 @@ mod tests { packet.set_type(PacketType::Syn); packet.set_connection_id(initial_connection_id); - let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1675,20 +1800,20 @@ mod tests { window.push(packet); // Send packets in reverse order - let response = socket.handle_packet(&window[1], client_enr.clone()).await; + let response = stream.handle_packet(&window[1], client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); let response = response.unwrap(); assert!(response.ack_nr() != window[1].seq_nr()); - let response = socket.handle_packet(&window[0], client_enr).await; + let response = stream.handle_packet(&window[0], client_enr).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); - // Mark socket as closed - socket.state = SocketState::Closed; + // Mark stream as closed + stream.state = StreamState::Closed; } #[tokio::test] @@ -1703,20 +1828,20 @@ mod tests { (minute_in_microseconds + 2, 19), (minute_in_microseconds + 3, 9), ]; - let mut socket = server_setup().await; + let mut stream = server_setup().await; for (timestamp, delay) in samples { - socket.update_base_delay(delay.into(), ((timestamp + delay) as u32).into()); + stream.update_base_delay(delay.into(), ((timestamp + delay) as u32).into()); } let expected = vec![7i64, 9i64] .into_iter() .map(Into::into) .collect::>(); - let actual = socket.base_delays.iter().cloned().collect::>(); + let actual = stream.base_delays.iter().cloned().collect::>(); assert_eq!(expected, actual); assert_eq!( - socket.min_base_delay(), + stream.min_base_delay(), expected.iter().min().cloned().unwrap_or_default() ); } @@ -1814,40 +1939,40 @@ mod tests { #[tokio::test] async fn test_sorted_buffer_insertion() { - let mut socket = server_setup().await; + let mut stream = server_setup().await; let mut packet = Packet::new(); packet.set_seq_nr(1); - assert!(socket.incoming_buffer.is_empty()); + assert!(stream.incoming_buffer.is_empty()); - socket.insert_into_buffer(packet.clone()); - assert_eq!(socket.incoming_buffer.len(), 1); + stream.insert_into_buffer(packet.clone()); + assert_eq!(stream.incoming_buffer.len(), 1); packet.set_seq_nr(2); packet.set_timestamp(128.into()); - socket.insert_into_buffer(packet.clone()); - assert_eq!(socket.incoming_buffer.len(), 2); - assert_eq!(socket.incoming_buffer[1].seq_nr(), 2); - assert_eq!(socket.incoming_buffer[1].timestamp(), 128.into()); + stream.insert_into_buffer(packet.clone()); + assert_eq!(stream.incoming_buffer.len(), 2); + assert_eq!(stream.incoming_buffer[1].seq_nr(), 2); + assert_eq!(stream.incoming_buffer[1].timestamp(), 128.into()); packet.set_seq_nr(3); packet.set_timestamp(256.into()); - socket.insert_into_buffer(packet.clone()); - assert_eq!(socket.incoming_buffer.len(), 3); - assert_eq!(socket.incoming_buffer[2].seq_nr(), 3); - assert_eq!(socket.incoming_buffer[2].timestamp(), 256.into()); 
+ stream.insert_into_buffer(packet.clone()); + assert_eq!(stream.incoming_buffer.len(), 3); + assert_eq!(stream.incoming_buffer[2].seq_nr(), 3); + assert_eq!(stream.incoming_buffer[2].timestamp(), 256.into()); // Replacing a packet with a more recent version doesn't work packet.set_seq_nr(2); packet.set_timestamp(456.into()); - socket.insert_into_buffer(packet.clone()); - assert_eq!(socket.incoming_buffer.len(), 3); - assert_eq!(socket.incoming_buffer[1].seq_nr(), 2); - assert_eq!(socket.incoming_buffer[1].timestamp(), 128.into()); + stream.insert_into_buffer(packet.clone()); + assert_eq!(stream.incoming_buffer.len(), 3); + assert_eq!(stream.incoming_buffer[1].seq_nr(), 2); + assert_eq!(stream.incoming_buffer[1].timestamp(), 128.into()); } #[tokio::test] @@ -1856,8 +1981,8 @@ mod tests { let mut server = server_setup().await; let (enr, mut client) = client_setup(server.connected_to.clone()).await; - assert_eq!(server.state, SocketState::Uninitialized); - assert_eq!(client.state, SocketState::Uninitialized); + assert_eq!(server.state, StreamState::Uninitialized); + assert_eq!(client.state, StreamState::Uninitialized); // Check proper difference in client's send connection id and receive connection id assert_eq!( @@ -1872,14 +1997,14 @@ mod tests { client.recv_from(&mut buf).await.unwrap(); // Expect SYN packet - assert_eq!(client.state, SocketState::Connected); + assert_eq!(client.state, StreamState::Connected); // After establishing a new connection, the server's ids are a mirror of the client's. assert_eq!( server.receiver_connection_id, server.sender_connection_id + 1 ); - assert_eq!(server.state, SocketState::Connected); + assert_eq!(server.state, StreamState::Connected); let mut packet = Packet::with_payload(&[1, 2, 3]); packet.set_wnd_size(BUF_SIZE as u32); diff --git a/trin-core/src/utp/trin_helpers.rs b/trin-core/src/utp/trin_helpers.rs index 9835b6fab..7341ec14b 100644 --- a/trin-core/src/utp/trin_helpers.rs +++ b/trin-core/src/utp/trin_helpers.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] // These are just some Trin helper functions -use crate::portalnet::types::messages::Content; +use crate::portalnet::types::{content_key::RawContentKey, messages::ByteList}; use ssz_derive::{Decode, Encode}; // These Utp impl are related to sending messages over uTP not the implementation itself or stream @@ -51,14 +51,17 @@ pub struct UtpAccept { pub message: Vec<(Vec, Vec)>, } -// This is not in a spec, this is just for internally tracking for what portal message -// negotiated the uTP stream -#[derive(Debug, Clone)] -pub enum UtpMessageId { +/// Used to track which stream an overlay request corresponds with +#[derive(Debug, Clone, PartialEq)] +pub enum UtpStreamId { + /// Stream id to initialize FindContent uTP connection and to listen for content payload FindContentStream, - FindContentData(Content), + /// Stream id to listen for incoming FindContent connection and to send back the content data to the requester + ContentStream(ByteList), + /// Stream id to send requested content from received ACCEPT response OfferStream, - AcceptStream(Vec>), + /// Stream id to listen for OFFER uTP payload. Contains requested content keys. 
+    AcceptStream(Vec<RawContentKey>),
 }

 #[cfg(test)]
diff --git a/trin-core/tests/overlay.rs b/trin-core/tests/overlay.rs
index 09f7fee4e..de726fee7 100644
--- a/trin-core/tests/overlay.rs
+++ b/trin-core/tests/overlay.rs
@@ -23,6 +23,7 @@ use tokio::{
     sync::{mpsc, mpsc::unbounded_channel},
     time::{self, Duration},
 };
+
 use trin_core::utp::stream::UtpListenerRequest;

 async fn init_overlay(
diff --git a/trin-core/tests/utp_listener.rs b/trin-core/tests/utp_listener.rs
new file mode 100644
index 000000000..138642e8b
--- /dev/null
+++ b/trin-core/tests/utp_listener.rs
@@ -0,0 +1,152 @@
+use discv5::Discv5Event;
+use ntest::timeout;
+use ssz::Encode;
+use std::{
+    net::{IpAddr, Ipv4Addr, SocketAddr},
+    str::FromStr,
+    sync::Arc,
+};
+use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
+use trin_core::{
+    portalnet::{
+        discovery::Discovery,
+        types::messages::{PortalnetConfig, ProtocolId},
+        Enr,
+    },
+    utp::{
+        stream::{UtpListener, UtpListenerEvent, UtpListenerRequest, UtpStream, BUF_SIZE},
+        trin_helpers::{
+            UtpAccept, UtpMessage,
+            UtpStreamId::{AcceptStream, OfferStream},
+        },
+    },
+};
+
+fn next_test_port() -> u16 {
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    // A static allows us to keep a global counter, and AtomicUsize can be shared safely between threads
+    static NEXT_OFFSET: AtomicUsize = AtomicUsize::new(0);
+    const BASE_PORT: u16 = 11600;
+    BASE_PORT + NEXT_OFFSET.fetch_add(1, Ordering::Relaxed) as u16
+}
+
+/// Spawn uTP listener instance and start discv5 event handler
+async fn spawn_utp_listener() -> (
+    Enr,
+    UnboundedSender<UtpListenerRequest>,
+    UnboundedReceiver<UtpListenerEvent>,
+) {
+    let ip_addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
+    let port = next_test_port();
+    let config = PortalnetConfig {
+        listen_port: port,
+        external_addr: Some(SocketAddr::new(ip_addr, port)),
+        ..Default::default()
+    };
+    let mut discv5 = Discovery::new(config).unwrap();
+    let enr = discv5.discv5.local_enr();
+    discv5.start().await.unwrap();
+
+    let discv5 = Arc::new(discv5);
+
+    let (utp_event_tx, utp_listener_tx, utp_listener_rx, mut utp_listener) =
+        UtpListener::new(Arc::clone(&discv5));
+
+    tokio::spawn(async move {
+        let mut receiver = discv5.discv5.event_stream().await.unwrap();
+        while let Some(event) = receiver.recv().await {
+            match event {
+                Discv5Event::TalkRequest(request) => {
+                    let protocol_id =
+                        ProtocolId::from_str(&hex::encode_upper(request.protocol())).unwrap();
+
+                    match protocol_id {
+                        ProtocolId::Utp => utp_event_tx.send(request).unwrap(),
+                        _ => continue,
+                    }
+                }
+                _ => continue,
+            }
+        }
+    });
+    tokio::spawn(async move { utp_listener.start().await });
+
+    (enr, utp_listener_tx, utp_listener_rx)
+}
+
+#[tokio::test]
+#[timeout(100)]
+/// Simulate simple OFFER -> ACCEPT uTP payload transfer
+async fn utp_listener_events() {
+    let protocol_id = ProtocolId::History;
+
+    // Initialize offer uTP listener
+    let (enr_offer, listener_tx_offer, mut listener_rx_offer) = spawn_utp_listener().await;
+    // Initialize acceptor uTP listener
+    let (enr_accept, listener_tx_accept, mut listener_rx_accept) = spawn_utp_listener().await;
+
+    // Prepare to receive uTP stream from the offer node
+    let (requested_content_key, requested_content_value) = (vec![1], vec![1, 1, 1, 1]);
+    let stream_id = AcceptStream(vec![requested_content_key.clone()]);
+    let conn_id = 1234;
+    let request = UtpListenerRequest::InitiateConnection(
+        enr_offer.clone(),
+        protocol_id.clone(),
+        stream_id,
+        conn_id,
+    );
+    listener_tx_accept.send(request).unwrap();
+
+    // Initialize an OFFER stream and send handshake uTP packet to the acceptor node
+    let stream_id = OfferStream;
+    let (tx, rx) = tokio::sync::oneshot::channel::<UtpStream>();
+    let offer_request = UtpListenerRequest::Connect(
+        conn_id,
+        enr_accept.clone(),
+        protocol_id.clone(),
+        stream_id,
+        tx,
+    );
+    listener_tx_offer.send(offer_request).unwrap();
+
+    // Handle STATE packet for SYN handshake in the offer node
+    let mut conn = rx.await.unwrap();
+    assert_eq!(conn.connected_to, enr_accept);
+
+    let mut buf = [0; BUF_SIZE];
+    conn.recv(&mut buf).await.unwrap();
+
+    // Send content key with content value to the acceptor node
+    let content_items = vec![(
+        requested_content_key.clone(),
+        requested_content_value.clone(),
+    )];
+
+    let content_message = UtpAccept {
+        message: content_items,
+    };
+
+    let utp_payload = UtpMessage::new(content_message.as_ssz_bytes()).encode();
+    let expected_utp_payload = utp_payload.clone();
+
+    tokio::spawn(async move {
+        // Send the content to the acceptor over a uTP stream
+        conn.send_to(&utp_payload).await.unwrap();
+        // Close uTP connection
+        conn.close().await.unwrap();
+    });
+
+    // Check if the expected uTP listener events match the events in offer and accept nodes
+    let offer_event = listener_rx_offer.recv().await.unwrap();
+    let expected_offer_event =
+        UtpListenerEvent::ClosedStream(vec![], protocol_id.clone(), OfferStream);
+    assert_eq!(offer_event, expected_offer_event);
+
+    let accept_event = listener_rx_accept.recv().await.unwrap();
+    let expected_accept_event = UtpListenerEvent::ClosedStream(
+        expected_utp_payload,
+        protocol_id.clone(),
+        AcceptStream(vec![requested_content_key]),
+    );
+    assert_eq!(accept_event, expected_accept_event);
+}
diff --git a/trin-history/src/lib.rs b/trin-history/src/lib.rs
index 401327bd8..09cf3c2b4 100644
--- a/trin-history/src/lib.rs
+++ b/trin-history/src/lib.rs
@@ -51,7 +51,8 @@ pub async fn main() -> Result<(), Box> {
     tokio::spawn(Arc::clone(&discovery).bucket_refresh_lookup());

     // Initialize and spawn UTP listener
-    let (utp_sender, overlay_sender, mut utp_listener) = UtpListener::new(Arc::clone(&discovery));
+    let (utp_sender, overlay_sender, _, mut utp_listener) =
+        UtpListener::new(Arc::clone(&discovery));
     tokio::spawn(async move { utp_listener.start().await });

     let (history_event_tx, history_event_rx) = mpsc::unbounded_channel::();
diff --git a/trin-state/src/lib.rs b/trin-state/src/lib.rs
index 87cf67b8b..747c4a113 100644
--- a/trin-state/src/lib.rs
+++ b/trin-state/src/lib.rs
@@ -49,7 +49,8 @@ pub async fn main() -> Result<(), Box> {
     tokio::spawn(Arc::clone(&discovery).bucket_refresh_lookup());

     // Initialize and spawn UTP listener
-    let (utp_sender, overlay_sender, mut utp_listener) = UtpListener::new(Arc::clone(&discovery));
+    let (utp_sender, overlay_sender, _, mut utp_listener) =
+        UtpListener::new(Arc::clone(&discovery));
     tokio::spawn(async move { utp_listener.start().await });

     let (state_event_tx, state_event_rx) = mpsc::unbounded_channel::();
diff --git a/utp-testing/src/main.rs b/utp-testing/src/main.rs
index 51444491e..1bdf18551 100644
--- a/utp-testing/src/main.rs
+++ b/utp-testing/src/main.rs
@@ -1,7 +1,7 @@
 use discv5::{Discv5Event, TalkRequest};
 use log::debug;
 use std::{net::SocketAddr, str::FromStr, sync::Arc};
-use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use trin_core::{
     portalnet::{
         discovery::Discovery,
@@ -10,29 +10,31 @@ use trin_core::{
     },
     socket,
     utp::{
-        stream::{UtpListener, UtpListenerRequest, UtpSocket},
-        trin_helpers::UtpMessage,
+        stream::{UtpListener, UtpListenerEvent, UtpListenerRequest, UtpStream},
+        trin_helpers::{UtpMessage, UtpStreamId},
     },
 };

+#[allow(dead_code)]
 pub struct TestApp {
     discovery: Arc<Discovery>,
     utp_listener_tx: UnboundedSender<UtpListenerRequest>,
+    utp_listener_rx: UnboundedReceiver<UtpListenerEvent>,
     utp_event_tx: UnboundedSender<TalkRequest>,
 }

 impl TestApp {
     async fn send_utp_request(&mut self, conn_id: u16, payload: Vec<u8>, enr: Enr) {
-        let _ = self
-            .utp_listener_tx
-            .send(UtpListenerRequest::OfferStream(conn_id));
-
-        let (tx, rx) = tokio::sync::oneshot::channel::>();
-        let _ = self
-            .utp_listener_tx
-            .send(UtpListenerRequest::Connect(conn_id, enr.node_id(), tx));
+        let (tx, rx) = tokio::sync::oneshot::channel::<UtpStream>();
+        let _ = self.utp_listener_tx.send(UtpListenerRequest::Connect(
+            conn_id,
+            enr,
+            ProtocolId::History,
+            UtpStreamId::OfferStream,
+            tx,
+        ));

-        let mut conn = rx.await.unwrap().unwrap();
+        let mut conn = rx.await.unwrap();

         let mut buf = [0; 1500];
         conn.recv(&mut buf).await.unwrap();
@@ -70,17 +72,16 @@ impl TestApp {
         });
     }

-    async fn prepare_to_receive(&self, conn_id: u16) {
-        // listen for incoming connection request on conn_id, as part of utp handshake
+    async fn prepare_to_receive(&self, source: Enr, conn_id: u16) {
+        // Listen for incoming connection request on conn_id, as part of uTP handshake
         let _ = self
             .utp_listener_tx
-            .send(UtpListenerRequest::OfferStream(conn_id));
-
-        // also listen on conn_id + 1 because this is the actual receive path for acceptor
-        let conn_id_recv = conn_id.wrapping_add(1);
-        let _ = self
-            .utp_listener_tx
-            .send(UtpListenerRequest::OfferStream(conn_id_recv));
+            .send(UtpListenerRequest::InitiateConnection(
+                source,
+                ProtocolId::History,
+                UtpStreamId::AcceptStream(vec![vec![]]),
+                conn_id,
+            ));
     }
 }

@@ -112,7 +113,9 @@ async fn main() {
         .await
         .unwrap();

-    server.prepare_to_receive(connection_id).await;
+    server
+        .prepare_to_receive(client.discovery.discv5.local_enr(), connection_id)
+        .await;

     client
         .send_utp_request(connection_id, payload, server_enr)
@@ -134,13 +137,14 @@ async fn run_test_app(discv5_port: u16, socket_addr: SocketAddr) -> TestApp {
     discovery.start().await.unwrap();
     let discovery = Arc::new(discovery);

-    let (utp_event_sender, utp_listener_tx, mut utp_listener) =
+    let (utp_event_sender, utp_listener_tx, utp_listener_rx, mut utp_listener) =
         UtpListener::new(Arc::clone(&discovery));
     tokio::spawn(async move { utp_listener.start().await });

     let test_app = TestApp {
         discovery,
         utp_listener_tx,
+        utp_listener_rx,
         utp_event_tx: utp_event_sender,
     };
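Taken together, the hunks above change the uTP wiring in three ways: UtpListener::new now returns a fourth value (a global event receiver), stream setup goes through UtpListenerRequest::Connect / InitiateConnection carrying a UtpStreamId, and closed streams surface as UtpListenerEvent::ClosedStream. The following is a minimal sketch of how a caller might drive an OFFER-side transfer against this API; it is not part of the diff, and the helper name offer_over_utp plus the pre-built discovery / remote_enr / conn_id / payload values are illustrative assumptions.

use std::sync::Arc;

use trin_core::{
    portalnet::{discovery::Discovery, types::messages::ProtocolId, Enr},
    utp::{
        stream::{UtpListener, UtpListenerEvent, UtpListenerRequest, UtpStream, BUF_SIZE},
        trin_helpers::UtpStreamId,
    },
};

/// Sketch only: open an OFFER stream to `remote_enr`, push `payload`, and wait for the
/// listener to report the closed stream. Caller is assumed to have set up `discovery`.
async fn offer_over_utp(
    discovery: Arc<Discovery>,
    remote_enr: Enr,
    conn_id: u16,
    payload: Vec<u8>,
) {
    // UtpListener::new now returns four values: the TalkRequest sender used by the
    // discv5 event loop, the request sender, the global event receiver, and the listener.
    let (_utp_event_tx, utp_listener_tx, mut utp_listener_rx, mut utp_listener) =
        UtpListener::new(discovery);
    tokio::spawn(async move { utp_listener.start().await });

    // Ask the listener to initialize the stream; the connected UtpStream comes back
    // on a oneshot channel once the handshake is under way.
    let (tx, rx) = tokio::sync::oneshot::channel::<UtpStream>();
    utp_listener_tx
        .send(UtpListenerRequest::Connect(
            conn_id,
            remote_enr,
            ProtocolId::History,
            UtpStreamId::OfferStream,
            tx,
        ))
        .unwrap();
    let mut stream = rx.await.unwrap();

    // Drive the handshake, send the payload, then close the stream.
    let mut buf = [0; BUF_SIZE];
    stream.recv(&mut buf).await.unwrap();
    stream.send_to(&payload).await.unwrap();
    stream.close().await.unwrap();

    // Closing the stream makes the listener emit a global ClosedStream event carrying
    // the received payload (empty on the offer side), the protocol, and the stream id.
    if let Some(UtpListenerEvent::ClosedStream(received, protocol, stream_id)) =
        utp_listener_rx.recv().await
    {
        println!(
            "stream closed: {:?} {:?} ({} bytes)",
            protocol,
            stream_id,
            received.len()
        );
    }
}

This mirrors the flow exercised by the utp_listener_events test above: the acceptor registers with InitiateConnection, the offerer opens the stream with Connect, and both sides observe the transfer's outcome through their UtpListenerEvent receivers.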