This repository was archived by the owner on Sep 3, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathBroadcast.hs
338 lines (297 loc) · 11.6 KB
/
Broadcast.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE EmptyDataDecls #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE ImpredicativeTypes #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
-- | An exchange type that broadcasts all incoming 'Post' messages.
module Control.Distributed.Process.Execution.Exchange.Broadcast
(
broadcastExchange
, broadcastExchangeT
, broadcastClient
, bindToBroadcaster
, BroadcastExchange
) where
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TChan
( TChan
, newBroadcastTChanIO
, dupTChan
, readTChan
, writeTChan
)
import Control.DeepSeq (NFData)
import Control.Distributed.Process
( Process
, MonitorRef
, ProcessMonitorNotification(..)
, ProcessId
, SendPort
, processNodeId
, getSelfPid
, getSelfNode
, liftIO
, newChan
, sendChan
, unsafeSend
, unsafeSendChan
, receiveWait
, match
, matchIf
, die
, handleMessage
, Match
)
import qualified Control.Distributed.Process as P
import Control.Distributed.Process.Serializable()
import Control.Distributed.Process.Execution.Exchange.Internal
( startExchange
, configureExchange
, Message(..)
, Exchange(..)
, ExchangeType(..)
, applyHandlers
)
import Control.Distributed.Process.Extras.Internal.Types
( Channel
, ServerDisconnected(..)
)
import Control.Distributed.Process.Extras.Internal.Unsafe -- see [note: pcopy]
( PCopy
, pCopy
, pUnwrap
, matchChanP
, InputStream(Null)
, newInputStream
)
import Control.Monad (forM_, void)
import Data.Accessor
( Accessor
, accessor
, (^:)
)
import Data.Binary
import qualified Data.Foldable as Foldable (toList)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Typeable (Typeable)
import GHC.Generics
-- newtype RoutingTable r =
-- RoutingTable { routes :: (Map String (Map ProcessId r)) }
-- [note: BindSTM, BindPort and safety]
-- We keep these two /bind types/ separate, since only one of them
-- is truly serializable. The risk of unifying them is that at some
-- later time a maintainer might not realise that BindSTM cannot be
-- sent over the wire due to our use of PCopy.
--

-- | Serializable bind request. 'broadcastClient' sends this when the
-- exchange does not live on the caller's node; the exchange answers it
-- with 'BindOk' or 'BindFail'.
data BindPort = BindPort
  { portClient :: !ProcessId
    -- ^ the process asking to be bound
  , portSend   :: !(SendPort Message)
    -- ^ typed channel on which routed messages are delivered
  } deriving (Typeable, Generic)

instance Binary BindPort
instance NFData BindPort
-- | Node-local bind request. It has no 'Binary' instance and is only ever
-- handed to the exchange wrapped with 'pCopy' (see [note: pcopy]).
data BindSTM = BindSTM
  { stmClient :: !ProcessId
    -- ^ the process asking to be bound
  , stmSend   :: !(SendPort (PCopy (InputStream Message)))
    -- ^ reply channel on which the exchange returns the client's
    -- 'InputStream'
  } deriving (Typeable)
{- | forall r. (Routable r) =>
BindR { client :: !ProcessId
, key :: !String
, chanC :: !r
}
deriving (Typeable, Generic)
-}
-- | The ways in which the exchange can hand a routed 'Message' to a bound
-- client; interpreted by 'writeToStream'.
--
-- (A further alternative, @WriteP ProcessId@, is currently disabled.)
data OutputStream
  -- | deliver over a typed channel
  = WriteChan (SendPort Message)
  -- | deliver by running the given STM action on the message
  | WriteSTM (Message -> STM ())
  -- | deliver nothing
  | NoWrite
  deriving (Typeable)
-- | What the exchange stores for each bound client.
--
-- Note that 'outputStream' and 'inputStream' are partial record selectors:
-- they are only defined for the 'Binding' constructor.
data Binding
  -- | a stream based binding
  = Binding
      { outputStream :: !OutputStream
        -- ^ how routed messages are written to the client
      , inputStream  :: !(InputStream Message)
        -- ^ the stream handed back to (node-local) clients
      }
  -- | a mailbox binding: message payloads are forwarded to this process
  | PidBinding !ProcessId
  deriving (Typeable)
-- | Positive reply to a 'BindPort' request.
data BindOk = BindOk
  deriving (Typeable, Generic)

instance Binary BindOk
instance NFData BindOk
-- | Negative reply to a 'BindPort' request, carrying the reason for the
-- refusal.
data BindFail = BindFail !String
  deriving (Typeable, Generic)

instance Binary BindFail
instance NFData BindFail
-- | Request tag used by 'bindToBroadcaster'; it is sent to the exchange
-- paired with the caller's 'ProcessId'.
data BindPlease = BindPlease
  deriving (Typeable, Generic)

instance Binary BindPlease
instance NFData BindPlease
-- | The exchange's bound clients, keyed by each client's 'ProcessId'.
type BroadcastClients = Map ProcessId Binding
-- | Internal state of a broadcast exchange.
data BroadcastEx = BroadcastEx
  { _routingTable :: !BroadcastClients
    -- ^ every currently bound client (see the 'routingTable' accessor)
  , channel       :: !(TChan Message)
    -- ^ broadcast channel that every routed message is written to, and
    -- that is duplicated for each node-local stream client
  }
-- | An 'ExchangeType' whose state is that of a broadcast exchange.
type BroadcastExchange = ExchangeType BroadcastEx
--------------------------------------------------------------------------------
-- Starting/Running the Exchange --
--------------------------------------------------------------------------------
-- | Start a new /broadcast exchange/ and return a handle to it. The
-- exchange is built with 'broadcastExchangeT' and launched with
-- 'startExchange'.
broadcastExchange :: Process Exchange
broadcastExchange = startExchange =<< broadcastExchangeT
-- | The 'ExchangeType' of a broadcast exchange. Can be combined with the
-- @startSupervisedRef@ and @startSupervised@ APIs.
--
-- A fresh broadcast 'TChan' is allocated on every call, and the exchange
-- starts out with no bound clients.
broadcastExchangeT :: Process BroadcastExchange
broadcastExchangeT = fmap exchangeTypeFor (liftIO newBroadcastTChanIO)
  where
    exchangeTypeFor ch =
      ExchangeType { name        = "BroadcastExchange"
                   , state       = BroadcastEx Map.empty ch
                   , configureEx = apiConfigure
                   , routeEx     = apiRoute
                   }
--------------------------------------------------------------------------------
-- Client Facing API --
--------------------------------------------------------------------------------
-- | Create a binding to the given /broadcast exchange/ for the calling process
-- and return an 'InputStream' that can be used in the @expect@ and
-- @receiveWait@ family of messaging primitives. This form of client interaction
-- helps avoid cluttering the caller's mailbox with 'Message' data, since the
-- 'InputStream' provides a separate input stream (in a similar fashion to
-- a typed channel).
--
-- Example:
--
-- > is <- broadcastClient ex
-- > msg <- receiveWait [ matchInputStream is ]
-- > handleMessage (payload msg)
--
-- When the exchange runs on the caller's node the stream is obtained via
-- the 'PCopy' mechanism (see [note: pcopy]); otherwise a typed channel is
-- registered with the exchange. While waiting for the exchange to answer,
-- the exchange is monitored: if it terminates first, the caller dies with
-- 'ServerDisconnected'. In the typed channel case a 'BindFail' answer also
-- makes the caller die, with the 'BindFail' as the reason.
broadcastClient :: Exchange -> Process (InputStream Message)
broadcastClient ex@Exchange{..} = do
  myNode <- getSelfNode
  us     <- getSelfPid
  if processNodeId pid == myNode  -- see [note: pcopy]
    then bindLocal us
    else bindRemote us
  where
    -- Same node: ask the exchange to pass our input stream back to us.
    bindLocal self = do
      (sp, rp) <- newChan
      configureExchange ex $ pCopy (BindSTM self sp)
      awaitReply [ matchChanP rp ]

    -- Different node: register a typed channel and wait for the verdict.
    bindRemote self = do
      (sp, rp) <- newChan :: Process (Channel Message)
      configureExchange ex $ BindPort self sp
      awaitReply
        [ match (\(_ :: BindOk) -> return $ newInputStream $ Left rp)
        , match (\(f :: BindFail) -> die f)
        ]

    -- Wait for one of the given replies while monitoring the exchange; the
    -- monitor is always released again, whatever the outcome.
    awaitReply :: [Match (InputStream Message)]
               -> Process (InputStream Message)
    awaitReply replies = do
      mRef <- P.monitor pid
      receiveWait (replies ++ [ handleServerFailure mRef ])
        `P.finally` P.unmonitor mRef
-- | Bind the calling process to the given /broadcast exchange/. For each
-- 'Message' the exchange receives, /only the payload will be sent/
-- to the calling process' mailbox.
--
-- Example:
--
-- (producer)
--
-- > post ex "Hello"
--
-- (consumer)
--
-- > bindToBroadcaster ex
-- > expect >>= liftIO . putStrLn
--
-- The request is handed to the exchange with 'configureExchange'; this
-- function does not wait for any reply from the exchange.
bindToBroadcaster :: Exchange -> Process ()
bindToBroadcaster ex@Exchange{} =
  getSelfPid >>= \us -> configureExchange ex (BindPlease, us)
--------------------------------------------------------------------------------
-- Exchange Definition/State & API Handlers --
--------------------------------------------------------------------------------
-- | Route one 'Message' to every bound client.
--
-- The message is first written to the exchange's broadcast channel, and is
-- then delivered to each entry of the routing table in turn: mailbox
-- clients have the message payload forwarded to them, all other clients
-- are written to via their output stream. The exchange state is returned
-- unchanged.
apiRoute :: BroadcastEx -> Message -> Process BroadcastEx
apiRoute ex@BroadcastEx{..} msg = do
  liftIO . atomically $ writeTChan channel msg
  forM_ (Foldable.toList _routingTable) deliver
  return ex
  where
    deliver (PidBinding client) = P.forward (payload msg) client
    deliver (Binding out _)     = writeToStream out msg
-- TODO: implement unbind!!?

-- | Handle a configuration message sent to the exchange.
--
-- The raw message is passed, together with the current state, to
-- 'applyHandlers' along with one handler per kind of request:
--
-- * 'BindPort'   - bind a client through a typed channel
--
-- * 'BindSTM'    - bind a node-local client (see [note: pcopy])
--
-- * @('BindPlease', 'ProcessId')@ - bind a client's mailbox
--
-- * 'ProcessMonitorNotification' - drop the binding of a dead client
--
-- The final handler accepts anything and leaves the state untouched.
-- Every handler that adds a binding also monitors the client, so that the
-- binding is dropped from the routing table once the client terminates.
apiConfigure :: BroadcastEx -> P.Message -> Process BroadcastEx
apiConfigure ex msg = do
  -- for unsafe / non-serializable message passing hacks, see [note: pcopy]
  applyHandlers ex msg $ [ \m -> handleMessage m (handleBindPort ex)
                         , \m -> handleBindSTM ex m
                         , \m -> handleMessage m (handleBindPlease ex)
                         , \m -> handleMessage m (handleMonitorSignal ex)
                         , (const $ return $ Just ex)
                         ]
  where
    -- Add a mailbox binding for @p@, unless @p@ is already bound.
    handleBindPlease :: BroadcastEx
                     -> (BindPlease, ProcessId)
                     -> Process BroadcastEx
    handleBindPlease ex' (BindPlease, p) =
      case lookupBinding ex' p of
        Just _  -> return ex'
        Nothing -> do
          -- monitor the client, as the other bind handlers do; without
          -- this the binding would outlive the client forever
          void $ P.monitor p
          return $ (routingTable ^: Map.insert p (PidBinding p)) ex'

    -- A monitored process died: forget whatever binding it had.
    handleMonitorSignal :: BroadcastEx
                        -> ProcessMonitorNotification
                        -> Process BroadcastEx
    handleMonitorSignal bx (ProcessMonitorNotification _ p _) =
      return $ (routingTable ^: Map.delete p) bx

    -- Answers 'Nothing' when the message is not a 'BindSTM'. Otherwise the
    -- client's input stream is sent back to it; a binding is created first
    -- if the client does not have one yet.
    handleBindSTM :: BroadcastEx -> P.Message -> Process (Maybe BroadcastEx)
    handleBindSTM ex' msg' = do
      bind' <- pUnwrap msg' :: Process (Maybe BindSTM) -- see [note: pcopy]
      case bind' of
        Nothing -> return Nothing
        Just s  ->
          case lookupBinding ex' (stmClient s) of
            -- create the binding, then go round again to send it
            Nothing -> createBinding ex' s >>= \ex'' -> handleBindSTM ex'' msg'
            Just b  -> sendBinding (stmSend s) b >> return (Just ex')

    -- Monitor the client and give it its own duplicate of the broadcast
    -- channel to read from.
    createBinding :: BroadcastEx -> BindSTM -> Process BroadcastEx
    createBinding bEx'@BroadcastEx{..} BindSTM{..} = do
      void $ P.monitor stmClient
      nch <- liftIO $ atomically $ dupTChan channel
      let istr = newInputStream $ Right (readTChan nch)
      let ostr = NoWrite -- we write to our own channel, not the broadcast
      let bnd  = Binding ostr istr
      return $ (routingTable ^: Map.insert stmClient bnd) bEx'

    -- NOTE(review): 'inputStream' is a partial selector. If the client is
    -- already bound with a 'PidBinding' (it called 'bindToBroadcaster'
    -- earlier) this cannot produce a stream - confirm how that case
    -- should be handled.
    sendBinding :: SendPort (PCopy (InputStream Message))
                -> Binding
                -> Process ()
    sendBinding sp' bs = unsafeSendChan sp' $ pCopy (inputStream bs)

    -- Bind a client via its typed channel. A client that is already bound
    -- is refused with a 'BindFail'; otherwise it is monitored, told
    -- 'BindOk' and added to the routing table.
    handleBindPort :: BroadcastEx -> BindPort -> Process BroadcastEx
    handleBindPort x@BroadcastEx{..} BindPort{..} = do
      let binding = lookupBinding x portClient
      case binding of
        Just _  -> unsafeSend portClient (BindFail "DuplicateBinding") >> return x
        Nothing -> do
          let istr  = Null
          let ostr  = WriteChan portSend
          let bound = Binding ostr istr
          void $ P.monitor portClient
          unsafeSend portClient BindOk
          return $ (routingTable ^: Map.insert portClient bound) x

    -- The binding currently held for the given client, if any.
    lookupBinding :: BroadcastEx -> ProcessId -> Maybe Binding
    lookupBinding BroadcastEx{..} k = Map.lookup k $ _routingTable
{- [note: pcopy]
We rely on risky techniques here, in order to allow for sharing useful
data that is not really serializable. For Cloud Haskell generally, this is
a bad idea, since we want message passing to work both locally and in a
distributed setting. In this case however, what we're really attempting is
an optimisation, since we only use unsafe PCopy based techniques when dealing
with exchange clients residing on our (local) node.
The PCopy mechanism is defined in the (aptly named) "Unsafe" module.
-}
-- TODO: move handleServerFailure into Primitives.hs
-- | Deliver a 'Message' according to the given 'OutputStream': send it on
-- the typed channel, run the STM action on it, or do nothing at all.
writeToStream :: OutputStream -> Message -> Process ()
writeToStream out = case out of
  WriteChan sp -> sendChan sp  -- see [note: safe remote send]
  WriteSTM stm -> \msg -> liftIO (atomically (stm msg))
  NoWrite      -> \_ -> return ()
{-# INLINE writeToStream #-}
{- [note: safe remote send]
Although we go to great lengths here to avoid serialization and/or copying
overheads, there are some activities for which we prefer to play it safe.
Chief among these is delivering messages to remote clients. Thankfully, our
unsafe @sendChan@ primitive will crash the caller/sender if there are any
encoding problems, however it is only because we /know/ for certain that
our recipient is remote, that we've chosen to write via a SendPort in the
first place! It makes sense therefore, to use the safe @sendChan@ operation
here, since for a remote call we /cannot/ avoid the overhead of serialization
anyway.
-}
-- | A 'Match' for the monitor notification that belongs to the given
-- 'MonitorRef'. When it fires, the calling process dies with
-- 'ServerDisconnected', carrying the reason reported by the notification.
handleServerFailure :: MonitorRef -> Match (InputStream Message)
handleServerFailure mRef = matchIf fromOurMonitor serverDied
  where
    fromOurMonitor (ProcessMonitorNotification ref _ _) = ref == mRef
    serverDied (ProcessMonitorNotification _ _ reason) =
      die $ ServerDisconnected reason
-- | Accessor for the routing table held in the exchange state.
routingTable :: Accessor BroadcastEx BroadcastClients
routingTable = accessor _routingTable putTable
  where
    putTable table ex = ex { _routingTable = table }