@@ -127,7 +127,7 @@ pub enum NetworkAction {
127
127
} ,
128
128
RegisterTopic {
129
129
topic_hash : TopicHash ,
130
- output : mpsc:: Sender < ( GossipsubMessage , Arc < Peer > ) > ,
130
+ output : mpsc:: Sender < ( GossipsubMessage , PeerId ) > ,
131
131
} ,
132
132
Subscribe {
133
133
topic_name : & ' static str ,
@@ -145,7 +145,7 @@ struct TaskState {
145
145
dht_puts : HashMap < QueryId , oneshot:: Sender < Result < ( ) , NetworkError > > > ,
146
146
dht_gets : HashMap < QueryId , oneshot:: Sender < Result < Vec < u8 > , NetworkError > > > ,
147
147
gossip_sub : HashMap < TopicHash , oneshot:: Sender < TopicHash > > ,
148
- gossip_topics : HashMap < TopicHash , mpsc:: Sender < ( GossipsubMessage , Arc < Peer > ) > > ,
148
+ gossip_topics : HashMap < TopicHash , mpsc:: Sender < ( GossipsubMessage , PeerId ) > > ,
149
149
}
150
150
151
151
pub struct Network {
@@ -304,9 +304,8 @@ impl Network {
304
304
GossipsubEvent :: Message ( peer_id, msg_id, msg) => {
305
305
log:: trace!( "Received message {:?} from peer {:?}: {:?}" , msg_id, peer_id, msg) ;
306
306
for topic in msg. topics . iter ( ) {
307
- if let Some ( output) = state. gossip_topics . get ( & topic) {
308
- // let peer = Self::get_peer(peer_id).unwrap();
309
- output. send ( ( msg, peer) ) ;
307
+ if let Some ( output) = state. gossip_topics . get_mut ( & topic) {
308
+ output. send ( ( msg. clone ( ) , peer_id. clone ( ) ) ) . await . ok ( ) ;
310
309
} else {
311
310
log:: warn!( "Unknown topic hash: {:?}" , topic) ;
312
311
}
@@ -409,7 +408,7 @@ impl NetworkInterface for Network {
409
408
self . events_tx . subscribe ( )
410
409
}
411
410
412
- async fn subscribe < T > ( & self , topic : & T ) -> Box < dyn Stream < Item = ( T :: Item , Arc < Self :: PeerType > ) > + Send >
411
+ async fn subscribe < T > ( & self , topic : & T ) -> Box < dyn Stream < Item = ( T :: Item , PeerId ) > + Send >
413
412
where
414
413
T : Topic + Sync ,
415
414
{
@@ -436,9 +435,9 @@ impl NetworkInterface for Network {
436
435
} )
437
436
. await ;
438
437
439
- Box :: new ( rx. map ( |( msg, peer ) | {
440
- let item: <T as Topic >:: Item = Deserialize :: deserialize_from_vec ( & msg. data ) ;
441
- ( item, peer )
438
+ Box :: new ( rx. map ( |( msg, peer_id ) | {
439
+ let item: <T as Topic >:: Item = Deserialize :: deserialize_from_vec ( & msg. data ) . unwrap ( ) ;
440
+ ( item, peer_id )
442
441
} ) )
443
442
}
444
443
0 commit comments