Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dp 110 fd #199

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions distributed-process.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Library
Control.Distributed.Process.Internal.StrictContainerAccessors,
Control.Distributed.Process.Internal.StrictList,
Control.Distributed.Process.Internal.StrictMVar,
Control.Distributed.Process.Internal.ThreadPool,
Control.Distributed.Process.Internal.Types,
Control.Distributed.Process.Internal.WeakTQueue,
Control.Distributed.Process.Management,
Expand Down
78 changes: 78 additions & 0 deletions src/Control/Distributed/Process/Internal/ThreadPool.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
-- | An implementation of a pool of threads
--
{-# LANGUAGE RecursiveDo #-}
module Control.Distributed.Process.Internal.ThreadPool
( newThreadPool
, submitTask
, lookupWorker
, ThreadPool
) where

import Control.Exception
import Control.Monad
import Data.IORef
import qualified Data.Map as Map


-- | A pool of worker threads that execute tasks.
--
-- Each worker thread is named with a key @k@. Tasks are submitted to a
-- specific worker using its key. While the worker is busy the tasks are queued.
-- When there are no more queued tasks the worker ceases to exist.
--
-- The next time a task is submitted the worker will be respawned.
--
newtype ThreadPool k w = ThreadPool (IORef (Map.Map k (Maybe (IO ()), w)))

-- Each worker has an entry in the map with a closure that contains all
-- queued actions fot it.
--
-- No entry in the map is kept for defunct workers.

-- | Creates a pool with no workers.
newThreadPool :: IO (ThreadPool k w)
newThreadPool = fmap ThreadPool $ newIORef Map.empty

-- | @submitTask pool fork k task@ submits a task for the worker @k@.
--
-- If worker @k@ is busy, then the task is queued until the worker is available.
--
-- If worker @k@ does not exist, then the given @fork@ operation is used to
-- spawn the worker. @fork@ returns whatever information is deemed useful for
-- later retrieval via 'lookupWorker'.
--
submitTask :: Ord k
=> ThreadPool k w
-> (IO () -> IO w)
-> k -> IO () -> IO ()
submitTask (ThreadPool mapRef) fork k task = mdo
m' <- join $ atomicModifyIORef mapRef $ \m ->
case Map.lookup k m of
-- There is no worker for this key, create one.
Nothing -> ( m'
, do w <- fork $ flip onException terminateWorker $ do
task
continue
return $ Map.insert k (Nothing, w) m
)
-- Queue an action for the existing worker.
Just (mp, w) ->
(m', return $ Map.insert k (Just $ maybe task (>> task) mp, w) m)
return ()
where
continue = join $ atomicModifyIORef mapRef $ \m ->
case Map.lookup k m of
-- Execute the next batch of queued actions.
Just (Just p, w) -> (Map.insert k (Nothing, w) m, p >> continue)
-- There are no more queued actions. Terminate the worker.
Just (Nothing, w) -> (Map.delete k m, return ())
-- The worker key was removed already (?)
Nothing -> (m, return ())
-- Remove the worker key regardless of whether there are more queued
-- actions.
terminateWorker = atomicModifyIORef mapRef $ \m -> (Map.delete k m, ())

-- | Looks up a worker with the given key.
lookupWorker :: Ord k => ThreadPool k w -> k -> IO (Maybe w)
lookupWorker (ThreadPool mapRef) k =
atomicModifyIORef mapRef $ \m -> (m, fmap snd $ Map.lookup k m)
3 changes: 3 additions & 0 deletions src/Control/Distributed/Process/Internal/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ import Control.Distributed.Process.Internal.StrictMVar
, withMVar
, modifyMVar_
)
import Control.Distributed.Process.Internal.ThreadPool
import Control.Distributed.Process.Internal.WeakTQueue (TQueue)
import Control.Distributed.Static (RemoteTable, Closure)
import qualified Control.Distributed.Process.Internal.StrictContainerAccessors as DAC (mapMaybe)
Expand Down Expand Up @@ -259,6 +260,8 @@ data LocalNode = LocalNode
-- | Runtime lookup table for supporting closures
-- TODO: this should be part of the CH state, not the local endpoint state
, remoteTable :: !RemoteTable
-- The pool of threads and queues to send messages
, localSendPool :: ThreadPool NodeId ThreadId
}

data ImplicitReconnect = WithImplicitReconnect | NoImplicitReconnect
Expand Down
43 changes: 29 additions & 14 deletions src/Control/Distributed/Process/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ import Control.Distributed.Process.Internal.StrictMVar
, putMVar
, takeMVar
)
import Control.Distributed.Process.Internal.ThreadPool
( newThreadPool
, submitTask
, lookupWorker
, ThreadPool
)
import Control.Concurrent.Chan (newChan, writeChan, readChan)
import qualified Control.Concurrent.MVar as MVar (newEmptyMVar, takeMVar)
import Control.Concurrent.STM
Expand Down Expand Up @@ -242,12 +248,14 @@ createBareLocalNode endPoint rtable = do
, _localConnections = Map.empty
}
ctrlChan <- newChan
sendPool <- newThreadPool
let node = LocalNode { localNodeId = NodeId $ NT.address endPoint
, localEndPoint = endPoint
, localState = state
, localCtrlChan = ctrlChan
, localEventBus = MxEventBusInitialising
, remoteTable = rtable
, localSendPool = sendPool
}
tracedNode <- startMxAgent node

Expand Down Expand Up @@ -560,6 +568,8 @@ handleIncomingMessages node = go initConnectionState
, ctrlMsgSignal = Died nid DiedDisconnect
}
let notLost k = not (k `Set.member` (st ^. incomingFrom theirAddr))
liftIO $ lookupWorker (localSendPool node) (NodeId theirAddr)
>>= maybe (return ()) killThread
closeImplicitReconnections node nid
go ( (incomingFrom theirAddr ^= Set.empty)
. (incoming ^: Map.filterWithKey (const . notLost))
Expand Down Expand Up @@ -614,6 +624,9 @@ data NCState = NCState
, _registeredOnNodes :: !(Map ProcessId [(NodeId,Int)])
}

submitSendPool :: LocalNode -> NodeId -> IO () -> IO ()
submitSendPool node nid task = submitTask (localSendPool node) forkIO nid task

newtype NC a = NC { unNC :: StateT NCState (ReaderT LocalNode IO) a }
deriving ( Applicative
, Functor
Expand All @@ -624,11 +637,12 @@ newtype NC a = NC { unNC :: StateT NCState (ReaderT LocalNode IO) a }
)

initNCState :: NCState
initNCState = NCState { _links = Map.empty
, _monitors = Map.empty
, _registeredHere = Map.empty
, _registeredOnNodes = Map.empty
}
initNCState = NCState
{ _links = Map.empty
, _monitors = Map.empty
, _registeredHere = Map.empty
, _registeredOnNodes = Map.empty
}

-- | Thrown in response to the user invoking 'kill' (see Primitives.hs). This
-- type is deliberately not exported so it cannot be caught explicitly.
Expand Down Expand Up @@ -683,7 +697,7 @@ nodeController = do
-- [Unified: Table 7, rule nc_forward]
case destNid (ctrlMsgSignal msg) of
Just nid' | nid' /= localNodeId node ->
liftIO $ sendBinary node
liftIO $ submitSendPool node nid' $ sendBinary node
(ctrlMsgSender msg)
(NodeIdentifier nid')
WithImplicitReconnect
Expand Down Expand Up @@ -754,7 +768,7 @@ ncEffectMonitor from them mRef = do
-- TODO: this is the right sender according to the Unified semantics,
-- but perhaps having 'them' as the sender would make more sense
-- (see also: notifyDied)
liftIO $ sendBinary node
liftIO $ submitSendPool node (processNodeId from) $ sendBinary node
(NodeIdentifier $ localNodeId node)
(NodeIdentifier $ processNodeId from)
WithImplicitReconnect
Expand Down Expand Up @@ -823,7 +837,7 @@ ncEffectDied ident reason = do
modify' $ registeredOnNodes ^= (Map.fromList (catMaybes remaining))
where
forwardNameDeath node nid =
liftIO $ sendBinary node
liftIO $ submitSendPool node nid $ sendBinary node
(NodeIdentifier $ localNodeId node)
(NodeIdentifier $ nid)
WithImplicitReconnect
Expand All @@ -844,7 +858,7 @@ ncEffectSpawn pid cProc ref = do
Right p -> p
node <- ask
pid' <- liftIO $ forkProcess node proc
liftIO $ sendMessage node
liftIO $ submitSendPool node (processNodeId pid) $ sendMessage node
(NodeIdentifier (localNodeId node))
(ProcessIdentifier pid)
WithImplicitReconnect
Expand Down Expand Up @@ -872,7 +886,8 @@ ncEffectRegister from label atnode mPid reregistration = do
case mPid of
(Just p) -> liftIO $ trace node (MxRegistered p label)
Nothing -> liftIO $ trace node (MxUnRegistered (fromJust currentVal) label)
liftIO $ sendMessage node
liftIO $ submitSendPool node (processNodeId from) $
sendMessage node
(NodeIdentifier (localNodeId node))
(ProcessIdentifier from)
WithImplicitReconnect
Expand Down Expand Up @@ -906,7 +921,7 @@ ncEffectRegister from label atnode mPid reregistration = do
decList (x:xs) tag = x:decList xs tag
forward node to reg =
when (not $ isLocal node (NodeIdentifier to)) $
liftIO $ sendBinary node
liftIO $ submitSendPool node to $ sendBinary node
(ProcessIdentifier from)
(NodeIdentifier to)
WithImplicitReconnect
Expand All @@ -921,7 +936,7 @@ ncEffectWhereIs :: ProcessId -> String -> NC ()
ncEffectWhereIs from label = do
node <- ask
mPid <- gets (^. registeredHereFor label)
liftIO $ sendMessage node
liftIO $ submitSendPool node (processNodeId from) $ sendMessage node
(NodeIdentifier (localNodeId node))
(ProcessIdentifier from)
WithImplicitReconnect
Expand Down Expand Up @@ -1022,7 +1037,7 @@ ncEffectGetInfo from pid =
-> NC ()
dispatch True dest _ pInfo = postAsMessage dest $ pInfo
dispatch False dest node pInfo = do
liftIO $ sendMessage node
liftIO $ submitSendPool node (processNodeId dest) $ sendMessage node
(NodeIdentifier (localNodeId node))
(ProcessIdentifier dest)
WithImplicitReconnect
Expand Down Expand Up @@ -1073,7 +1088,7 @@ notifyDied dest src reason mRef = do
throwException dest $ PortLinkException pid reason
(False, _, _) ->
-- The change in sender comes from [Unified: Table 10]
liftIO $ sendBinary node
liftIO $ submitSendPool node (processNodeId dest) $ sendBinary node
(NodeIdentifier $ localNodeId node)
(NodeIdentifier $ processNodeId dest)
WithImplicitReconnect
Expand Down