@@ -492,7 +492,7 @@ type IncomingConnection = (NT.EndPointAddress, IncomingTarget)
492
492
data IncomingTarget =
493
493
Uninit
494
494
| ToProc ProcessId (Weak (CQueue Message ))
495
- | ToChan TypedChannel
495
+ | ToChan SendPortId TypedChannel
496
496
| ToNode
497
497
498
498
data ConnectionState = ConnectionState {
@@ -547,13 +547,17 @@ handleIncomingMessages node = go initConnectionState
547
547
enqueue queue msg -- 'enqueue' is strict
548
548
trace node (MxReceived pid msg)
549
549
go st
550
- Just (_, ToChan (TypedChannel chan')) -> do
550
+ Just (_, ToChan chId (TypedChannel chan')) -> do
551
551
mChan <- deRefWeak chan'
552
552
-- If mChan is Nothing, the process has given up the read end of
553
553
-- the channel and we simply ignore the incoming message
554
- forM_ mChan $ \ chan -> atomically $
555
- -- We make sure the message is fully decoded when it is enqueued
556
- writeTQueue chan $! decode (BSL. fromChunks payload)
554
+ forM_ mChan $ \ chan -> do
555
+ msg' <- atomically $ do
556
+ msg <- return $! decode (BSL. fromChunks payload)
557
+ -- We make sure the message is fully decoded when it is enqueued
558
+ writeTQueue chan msg
559
+ return msg
560
+ trace node $ MxReceivedPort chId $ unsafeCreateUnencodedMessage msg'
557
561
go st
558
562
Just (_, ToNode ) -> do
559
563
let ctrlMsg = decode . BSL. fromChunks $ payload
@@ -580,7 +584,7 @@ handleIncomingMessages node = go initConnectionState
580
584
mChannel <- withMVar (processState proc ) $ return . (^. typedChannelWithId lcid)
581
585
case mChannel of
582
586
Just channel ->
583
- go (incomingAt cid ^= Just (src, ToChan channel) $ st)
587
+ go (incomingAt cid ^= Just (src, ToChan chId channel) $ st)
584
588
Nothing ->
585
589
invalidRequest cid st $
586
590
" incoming attempt to connect to unknown channel of"
@@ -1050,11 +1054,12 @@ ncEffectLocalPortSend from msg = do
1050
1054
-- If ch is Nothing, the process has given up the read end of
1051
1055
-- the channel and we simply ignore the incoming message - this
1052
1056
ch <- deRefWeak chan'
1053
- forM_ ch $ \ chan -> deliverChan msg chan
1054
- where deliverChan :: forall a . Message -> TQueue a -> IO ()
1055
- deliverChan (UnencodedMessage _ raw) chan' =
1057
+ forM_ ch $ \ chan -> deliverChan node from msg chan
1058
+ where deliverChan :: forall a . LocalNode -> SendPortId -> Message -> TQueue a -> IO ()
1059
+ deliverChan n p (UnencodedMessage _ raw) chan' = do
1056
1060
atomically $ writeTQueue chan' ((unsafeCoerce raw) :: a )
1057
- deliverChan (EncodedMessage _ _) _ =
1061
+ trace n (MxReceivedPort p $ unsafeCreateUnencodedMessage raw)
1062
+ deliverChan _ _ (EncodedMessage _ _) _ =
1058
1063
-- this will not happen unless someone screws with Primitives.hs
1059
1064
error " invalid local channel delivery"
1060
1065
0 commit comments