@@ -299,9 +299,8 @@ impl Network {
299
299
GossipsubEvent :: Message ( peer_id, msg_id, msg) => {
300
300
log:: trace!( "Received message {:?} from peer {:?}: {:?}" , msg_id, peer_id, msg) ;
301
301
for topic in msg. topics . iter ( ) {
302
- if let Some ( output) = state. gossip_topics . get ( & topic) {
303
- // let peer = Self::get_peer(peer_id).unwrap();
304
- output. send ( ( msg, peer) ) ;
302
+ if let Some ( output) = state. gossip_topics . get_mut ( & topic) {
303
+ output. send ( ( msg. clone ( ) , peer_id. clone ( ) ) ) . await . ok ( ) ;
305
304
} else {
306
305
log:: warn!( "Unknown topic hash: {:?}" , topic) ;
307
306
}
@@ -398,7 +397,7 @@ impl NetworkInterface for Network {
398
397
self . events_tx . subscribe ( )
399
398
}
400
399
401
- async fn subscribe < T > ( & self , topic : & T ) -> Box < dyn Stream < Item = ( T :: Item , Arc < Self :: PeerType > ) > + Send >
400
+ async fn subscribe < T > ( & self , topic : & T ) -> Box < dyn Stream < Item = ( T :: Item , PeerId ) > + Send >
402
401
where
403
402
T : Topic + Sync ,
404
403
{
@@ -414,9 +413,9 @@ impl NetworkInterface for Network {
414
413
. await
415
414
. expect ( "Couldn't subscribe to pubsub topic" ) ;
416
415
417
- Box :: new ( rx. map ( |( msg, peer ) | {
418
- let item: <T as Topic >:: Item = Deserialize :: deserialize_from_vec ( & msg. data ) ;
419
- ( item, peer )
416
+ Box :: new ( rx. map ( |( msg, peer_id ) | {
417
+ let item: <T as Topic >:: Item = Deserialize :: deserialize_from_vec ( & msg. data ) . unwrap ( ) ;
418
+ ( item, peer_id )
420
419
} ) )
421
420
}
422
421
0 commit comments