Skip to content

Commit 13657f7

Browse files
committed
Process all closed uTP streams in UtpListener and handle the payload in overlay service
1 parent 6b8888f commit 13657f7

File tree

4 files changed

+100
-41
lines changed

4 files changed

+100
-41
lines changed

newsfragments/325.added.md

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Process all closed uTP streams in UtpListener and pass the payload to overlay service.

trin-core/src/portalnet/overlay_service.rs

+44-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::{
1919
utp::stream::UtpListenerRequest,
2020
};
2121

22+
use crate::utp::{stream::UtpSocket, trin_helpers::UtpStreamId};
2223
use delay_map::HashSetDelay;
2324
use discv5::{
2425
enr::NodeId,
@@ -45,6 +46,10 @@ pub const FIND_CONTENT_MAX_NODES: usize = 32;
4546
/// With even distribution assumptions, 2**17 is enough to put each node (estimating 100k nodes,
4647
/// which is more than 10x the ethereum mainnet node count) into a unique bucket by the 17th bucket index.
4748
const EXPECTED_NON_EMPTY_BUCKETS: usize = 17;
49+
/// Bucket refresh lookup interval in seconds
50+
const BUCKET_REFRESH_INTERVAL: u64 = 60;
51+
/// Process uTP streams interval in milliseconds
52+
const PROCESS_UTP_STREAMS_INTERVAL: u64 = 20;
4853

4954
/// An overlay request error.
5055
#[derive(Clone, Error, Debug)]
@@ -377,7 +382,10 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
377382
/// Bucket maintenance: Maintain the routing table (more info documented above function).
378383
async fn start(&mut self) {
379384
// Construct bucket refresh interval
380-
let mut bucket_refresh_interval = tokio::time::interval(Duration::from_secs(60));
385+
let mut bucket_refresh_interval =
386+
tokio::time::interval(Duration::from_secs(BUCKET_REFRESH_INTERVAL));
387+
let mut process_utp_streams_interval =
388+
tokio::time::interval(Duration::from_millis(PROCESS_UTP_STREAMS_INTERVAL));
381389

382390
loop {
383391
tokio::select! {
@@ -408,6 +416,22 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
408416
self.peers_to_ping.insert(node_id);
409417
}
410418
}
419+
_ = process_utp_streams_interval.tick() => {
420+
let (tx, rx) = tokio::sync::oneshot::channel::<Vec<(UtpSocket, UtpStreamId)>>();
421+
422+
// Send request to uTP listener to process all closed uTP streams and wait for response
423+
if let Err(err) = self.utp_listener_tx.send(UtpListenerRequest::ProcessClosedStreams(tx)) {
424+
error!("Unable to send ProcessClosedStreams request to uTP listener: {err}");
425+
continue
426+
}
427+
428+
match rx.await {
429+
Ok(streams) => {
430+
self.handle_closed_utp_streams(streams);
431+
}
432+
Err(err) => error!("Unable to receive ProcessClosedStreams response from uTP listener: {err}")
433+
}
434+
}
411435
_ = OverlayService::<TContentKey, TMetric>::bucket_maintenance_poll(self.protocol.clone(), &self.kbuckets) => {}
412436
_ = bucket_refresh_interval.tick() => {
413437
debug!("[{:?}] Overlay bucket refresh lookup", self.protocol);
@@ -700,6 +724,19 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
700724
Ok(accept)
701725
}
702726

727+
/// Handle all closed uTP streams, currently we process only AcceptStream here.
728+
/// FindContent payload is processed explicitly when we send a FindContent request.
729+
fn handle_closed_utp_streams(&self, streams: Vec<(UtpSocket, UtpStreamId)>) {
730+
for stream in streams {
731+
match stream {
732+
(socket, UtpStreamId::AcceptStream(content_keys)) => {
733+
self.process_accept_utp_payload(content_keys, socket.recv_data_stream);
734+
}
735+
_ => {}
736+
}
737+
}
738+
}
739+
703740
/// Sends a TALK request via Discovery v5 to some destination node.
704741
fn send_talk_req(&self, request: Request, request_id: OverlayRequestId, destination: Enr) {
705742
let discovery = Arc::clone(&self.discovery);
@@ -731,6 +768,12 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
731768
});
732769
}
733770

771+
/// Process accepted uTP payload of the OFFER/ACCEPT stream
772+
fn process_accept_utp_payload(&self, content_keys: Vec<Vec<u8>>, payload: Vec<u8>) {
773+
// TODO: Verify the payload, store the content and propagate gossip.
774+
warn!("DEBUG: Processing content keys: {content_keys:?}, with payload: {payload:?}");
775+
}
776+
734777
/// Processes an incoming request from some source node.
735778
fn process_incoming_request(&mut self, request: Request, _id: RequestId, source: NodeId) {
736779
// Look up the node in the routing table.

trin-core/src/utp/stream.rs

+52-36
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
utp::{
2626
packets::{ExtensionType, Packet, PacketType, HEADER_SIZE},
2727
time::{now_microseconds, Delay, Timestamp},
28-
trin_helpers::{UtpMessage, UtpMessageId},
28+
trin_helpers::{UtpMessage, UtpStreamId},
2929
util::{abs_diff, ewma, generate_sequential_identifiers},
3030
},
3131
};
@@ -111,6 +111,8 @@ pub enum UtpListenerRequest {
111111
FindContentData(ConnId, ByteList),
112112
/// Request to listen for FindContent stream
113113
FindContentStream(ConnId),
114+
/// Process all streams where uTP socket state is "Closed"
115+
ProcessClosedStreams(oneshot::Sender<Vec<(UtpSocket, UtpStreamId)>>),
114116
/// Request to listen for Offer stream
115117
OfferStream(ConnId),
116118
}
@@ -122,7 +124,7 @@ pub struct UtpListener {
122124
/// Store all active connections
123125
utp_connections: HashMap<ConnectionKey, UtpSocket>,
124126
/// uTP connection ids to listen for
125-
listening: HashMap<ConnId, UtpMessageId>,
127+
listening: HashMap<ConnId, UtpStreamId>,
126128
/// Receiver for uTP events sent from the main portal event handler
127129
utp_event_rx: UnboundedReceiver<TalkRequest>,
128130
/// Receiver for uTP requests sent from the overlay layer
@@ -221,9 +223,9 @@ impl UtpListener {
221223
// TODO: Probably there is a better way with lifetimes to pass the HashMap value to a
222224
// different thread without removing the key and re-adding it.
223225
self.listening
224-
.insert(conn.sender_connection_id, UtpMessageId::FindContentStream);
226+
.insert(conn.sender_connection_id, UtpStreamId::FindContentStream);
225227

226-
if let Some(UtpMessageId::FindContentData(Content(content_data))) =
228+
if let Some(UtpStreamId::FindContentData(Content(content_data))) =
227229
utp_message_id
228230
{
229231
// We want to send uTP data only if the content is Content(ByteList)
@@ -264,12 +266,17 @@ impl UtpListener {
264266
return;
265267
}
266268

269+
let mut result = Vec::new();
270+
267271
let mut buf = [0; BUF_SIZE];
268-
if let Err(msg) = conn.recv(&mut buf).await {
269-
error!("Unable to receive uTP DATA packet: {msg}")
270-
} else {
271-
conn.recv_data_stream
272-
.append(&mut Vec::from(packet.payload()));
272+
match conn.recv(&mut buf).await {
273+
Ok(bytes_read) => {
274+
if let Some(bytes) = bytes_read {
275+
result.extend_from_slice(&buf[..bytes]);
276+
conn.recv_data_stream.append(&mut result);
277+
}
278+
}
279+
Err(err) => error!("Unable to receive uTP DATA packet: {err}"),
273280
}
274281
}
275282
}
@@ -314,24 +321,29 @@ impl UtpListener {
314321
match request {
315322
UtpListenerRequest::FindContentStream(conn_id) => {
316323
self.listening
317-
.insert(conn_id, UtpMessageId::FindContentStream);
324+
.insert(conn_id, UtpStreamId::FindContentStream);
318325
}
319326
UtpListenerRequest::Connect(conn_id, node_id, tx) => {
320327
let conn = self.connect(conn_id, node_id).await;
321328
if tx.send(conn).is_err() {
322-
warn!("Unable to send uTP socket to requester")
329+
error!("Unable to send uTP socket to requester")
323330
};
324331
}
325332
UtpListenerRequest::OfferStream(conn_id) => {
326-
self.listening.insert(conn_id, UtpMessageId::OfferStream);
333+
self.listening.insert(conn_id, UtpStreamId::OfferStream);
327334
}
328335
UtpListenerRequest::FindContentData(conn_id, content) => {
329336
self.listening
330-
.insert(conn_id, UtpMessageId::FindContentData(Content(content)));
337+
.insert(conn_id, UtpStreamId::FindContentData(Content(content)));
331338
}
332339
UtpListenerRequest::AcceptStream(conn_id, accepted_keys) => {
333340
self.listening
334-
.insert(conn_id, UtpMessageId::AcceptStream(accepted_keys));
341+
.insert(conn_id, UtpStreamId::AcceptStream(accepted_keys));
342+
}
343+
UtpListenerRequest::ProcessClosedStreams(tx) => {
344+
if tx.send(self.process_closed_streams()).is_err() {
345+
error!("Unable to send closed uTP streams to requester")
346+
};
335347
}
336348
}
337349
}
@@ -355,28 +367,32 @@ impl UtpListener {
355367
}
356368
}
357369

358-
// https://github.com/ethereum/portal-network-specs/pull/98
359-
// Currently the way to handle data over uTP isn't finalized yet, so we are going to use the
360-
// handle data on connection closed method, as that seems to be the accepted method for now.
361-
pub async fn process_utp_byte_stream(&mut self) {
362-
let mut utp_connections = self.utp_connections.clone();
363-
for (conn_key, conn) in self.utp_connections.iter_mut() {
364-
if conn.state == SocketState::Closed {
365-
let received_stream = conn.recv_data_stream.clone();
366-
debug!("Received data: with len: {}", received_stream.len());
367-
368-
match self.listening.get(&conn.receiver_connection_id) {
369-
Some(message_type) => {
370-
if let UtpMessageId::AcceptStream(content_keys) = message_type {
371-
// TODO: Implement this with overlay store and decode receiver stream if multiple content values are send
372-
debug!("Store {content_keys:?}, {received_stream:?}");
373-
}
374-
}
375-
_ => warn!("uTP listening HashMap doesn't have uTP stream message type"),
376-
}
377-
utp_connections.remove(conn_key);
378-
}
379-
}
370+
/// Return and clean up all active uTP streams where socket state is "Closed"
371+
pub fn process_closed_streams(&mut self) -> Vec<(UtpSocket, UtpStreamId)> {
372+
// This seems to be a hot loop, we may need to optimise it and find a better way to filter by closed
373+
// connections without cloning all records. One reasonable way is to use some data-oriented
374+
// design principles like Struct of Arrays vs. Array of Structs.
375+
self.utp_connections
376+
.clone()
377+
.iter()
378+
.filter(|conn| conn.1.state == SocketState::Closed)
379+
.map(|conn| {
380+
// Remove the closed connections from active connections
381+
let receiver_stream_id = self
382+
.listening
383+
.remove(&conn.1.receiver_connection_id)
384+
.expect("Receiver connection id should match active listening connections.");
385+
self.listening
386+
.remove(&conn.1.sender_connection_id)
387+
.expect("Sender connection id should match active listening connections.");
388+
let utp_socket = self
389+
.utp_connections
390+
.remove(conn.0)
391+
.expect("uTP socket should match active uTP connections.");
392+
393+
(utp_socket, receiver_stream_id)
394+
})
395+
.collect()
380396
}
381397
}
382398

trin-core/src/utp/trin_helpers.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,9 @@ pub struct UtpAccept {
5151
pub message: Vec<(Vec<u8>, Vec<u8>)>,
5252
}
5353

54-
// This is not in a spec, this is just for internally tracking for what portal message
55-
// negotiated the uTP stream
56-
#[derive(Debug, Clone)]
57-
pub enum UtpMessageId {
54+
/// Used to track which overlay request corresponds to which uTP stream
55+
#[derive(Debug, Clone, PartialEq)]
56+
pub enum UtpStreamId {
5857
FindContentStream,
5958
FindContentData(Content),
6059
OfferStream,

0 commit comments

Comments
 (0)