Skip to content

Commit 56e5305

Browse files
Fix for DP-110. The NC does not block anymore on network operations.
A pool of threads is used for network operations. One thread is used per NodeId. Conflicts: src/Control/Distributed/Process/Internal/Types.hs src/Control/Distributed/Process/Node.hs
1 parent 78ef646 commit 56e5305

File tree

4 files changed

+100
-2
lines changed

4 files changed

+100
-2
lines changed

distributed-process.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ Library
6868
Control.Distributed.Process.Internal.StrictContainerAccessors,
6969
Control.Distributed.Process.Internal.StrictList,
7070
Control.Distributed.Process.Internal.StrictMVar,
71+
Control.Distributed.Process.Internal.ThreadPool,
7172
Control.Distributed.Process.Internal.Types,
7273
Control.Distributed.Process.Internal.WeakTQueue,
7374
Control.Distributed.Process.Management,
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
-- | An implementation of a pool of threads
2+
--
3+
{-# LANGUAGE RecursiveDo #-}
4+
module Control.Distributed.Process.Internal.ThreadPool
5+
( newThreadPool
6+
, submitTask
7+
, lookupWorker
8+
, ThreadPool
9+
) where
10+
11+
import Control.Exception
12+
import Control.Monad
13+
import Data.IORef
14+
import qualified Data.Map as Map
15+
16+
17+
-- | A pool of worker threads that execute tasks.
18+
--
19+
-- Each worker thread is named with a key @k@. Tasks are submitted to a
20+
-- specific worker using its key. While the worker is busy the tasks are queued.
21+
-- When there are no more queued tasks the worker ceases to exist.
22+
--
23+
-- The next time a task is submitted the worker will be respawned.
24+
--
25+
newtype ThreadPool k w = ThreadPool (IORef (Map.Map k (Maybe (IO ()), w)))
26+
27+
-- Each worker has an entry in the map. The first component of the entry is a
-- closure that runs all actions currently queued for that worker, or
-- 'Nothing' when nothing is queued; the second component is the value
-- returned by the @fork@ operation that spawned the worker.
28+
29+
--
30+
-- No entry in the map is kept for defunct workers.
31+
32+
-- | Creates a pool with no workers.
--
-- The pool starts out as an empty map; workers are only added by
-- 'submitTask'.
newThreadPool :: IO (ThreadPool k w)
newThreadPool = do
    workersRef <- newIORef Map.empty
    return (ThreadPool workersRef)
35+
36+
-- | @submitTask pool fork k task@ hands @task@ to the worker named @k@.
--
-- * When the pool has no entry for @k@, @fork@ is called to spawn a worker
--   that runs @task@ and then keeps draining whatever gets queued for @k@ in
--   the meantime. The value returned by @fork@ is stored in the pool so that
--   'lookupWorker' can retrieve it.
--
-- * When the pool already has an entry for @k@, @task@ is appended to the
--   actions queued for that worker and runs after them.
--
-- If the worker's computation is interrupted by an exception, its entry is
-- removed from the pool together with any actions still queued for it.
--
submitTask :: Ord k
           => ThreadPool k w
           -> (IO () -> IO w)
           -> k -> IO () -> IO ()
submitTask (ThreadPool poolRef) fork k task = mdo
    -- The map written to the 'IORef' is @updated@ itself, i.e. the lazily
    -- referenced result of the action selected below (RecursiveDo ties the
    -- knot). This is how the value produced by @fork@ ends up in the map even
    -- though it only becomes known after the atomic modification.
    --
    -- NOTE(review): if @fork@ throws, @updated@ is never defined while the
    -- 'IORef' already refers to it -- confirm that callers only pass a @fork@
    -- that cannot fail.
    updated <- join $ atomicModifyIORef poolRef $ \workers ->
      case Map.lookup k workers of
        -- A worker exists: chain the task after what is already queued.
        Just (queued, worker) ->
          let queued' = Just (maybe task (>> task) queued)
          in (updated, return (Map.insert k (queued', worker) workers))
        -- No worker yet: spawn one and record it with an empty queue.
        Nothing ->
          ( updated
          , do worker <- fork (runWorker `onException` retireWorker)
               return (Map.insert k (Nothing, worker) workers)
          )
    return ()
  where
    -- What a freshly spawned worker executes: the submitted task first, then
    -- everything that was queued while it was running.
    runWorker = task >> drainQueue

    -- Atomically take the queued batch (if any) and run it, repeating until
    -- the queue is found empty.
    drainQueue = join $ atomicModifyIORef poolRef $ \workers ->
      case Map.lookup k workers of
        -- A batch is pending: reset the queue, run the batch, check again.
        Just (Just batch, worker) ->
          (Map.insert k (Nothing, worker) workers, batch >> drainQueue)
        -- Nothing is pending: drop the entry so the worker ceases to exist.
        Just (Nothing, _) ->
          (Map.delete k workers, return ())
        -- The entry is already gone: leave the map alone and stop.
        Nothing ->
          (workers, return ())

    -- Exception path: drop the entry unconditionally, discarding any actions
    -- that are still queued.
    retireWorker =
      atomicModifyIORef poolRef $ \workers -> (Map.delete k workers, ())
74+
75+
-- | Looks up a worker with the given key.
--
-- Returns the value that the @fork@ operation given to 'submitTask' produced
-- for worker @k@, or 'Nothing' when the pool currently has no entry for @k@.
--
-- The pool is only read here, so a plain 'readIORef' is used instead of
-- 'atomicModifyIORef': the latter would perform a read-modify-write on the
-- shared reference and store a fresh thunk in it on every lookup.
lookupWorker :: Ord k => ThreadPool k w -> k -> IO (Maybe w)
lookupWorker (ThreadPool mapRef) k =
    fmap (fmap snd . Map.lookup k) (readIORef mapRef)

src/Control/Distributed/Process/Internal/Types.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ import Control.Distributed.Process.Internal.StrictMVar
137137
, modifyMVar
138138
, modifyMVar_
139139
)
140+
import Control.Distributed.Process.Internal.ThreadPool
140141
import Control.Distributed.Process.Internal.WeakTQueue (TQueue)
141142
import Control.Distributed.Static (RemoteTable, Closure)
142143
import qualified Control.Distributed.Process.Internal.StrictContainerAccessors as DAC (mapMaybe)
@@ -267,6 +268,8 @@ data LocalNode = LocalNode
267268
-- | Runtime lookup table for supporting closures
268269
-- TODO: this should be part of the CH state, not the local endpoint state
269270
, remoteTable :: !RemoteTable
271+
-- The pool of threads and queues to send messages
272+
, localSendPool :: ThreadPool NodeId ThreadId
270273
}
271274

272275
data ImplicitReconnect = WithImplicitReconnect | NoImplicitReconnect

src/Control/Distributed/Process/Node.hs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,12 @@ import Control.Distributed.Process.Internal.StrictMVar
9191
, putMVar
9292
, takeMVar
9393
)
94+
import Control.Distributed.Process.Internal.ThreadPool
95+
( newThreadPool
96+
, submitTask
97+
, lookupWorker
98+
, ThreadPool
99+
)
94100
import Control.Concurrent.Chan (newChan, writeChan, readChan)
95101
import qualified Control.Concurrent.MVar as MVar (newEmptyMVar, takeMVar)
96102
import Control.Concurrent.STM
@@ -264,12 +270,14 @@ createBareLocalNode endPoint rtable = do
264270
, _localConnections = Map.empty
265271
}
266272
ctrlChan <- newChan
273+
sendPool <- newThreadPool
267274
let node = LocalNode { localNodeId = NodeId $ NT.address endPoint
268275
, localEndPoint = endPoint
269276
, localState = state
270277
, localCtrlChan = ctrlChan
271278
, localEventBus = MxEventBusInitialising
272279
, remoteTable = rtable
280+
, localSendPool = sendPool
273281
}
274282
tracedNode <- startMxAgent node
275283

@@ -613,6 +621,8 @@ handleIncomingMessages node = go initConnectionState
613621
, ctrlMsgSignal = Died nid DiedDisconnect
614622
}
615623
let notLost k = not (k `Set.member` (st ^. incomingFrom theirAddr))
624+
liftIO $ lookupWorker (localSendPool node) (NodeId theirAddr)
625+
>>= maybe (return ()) killThread
616626
closeImplicitReconnections node nid
617627
go ( (incomingFrom theirAddr ^= Set.empty)
618628
. (incoming ^: Map.filterWithKey (const . notLost))
@@ -669,6 +679,12 @@ data NCState = NCState
669679
, _registeredOnNodes :: !(Map ProcessId [(NodeId,Int)])
670680
}
671681

682+
-- | Submits @task@ to the worker of the node's send pool that is keyed by
-- @nid@.
--
-- Workers are spawned with 'forkIO'. A 'NodeClosedException' raised inside a
-- worker is caught and discarded; any other exception is left to propagate.
submitSendPool :: LocalNode -> NodeId -> IO () -> IO ()
submitSendPool node nid task =
    submitTask (localSendPool node) spawnWorker nid task
  where
    -- Run the worker body in a new thread, ignoring a closed local node.
    spawnWorker body = forkIO (Exception.catch body ignoreNodeClosed)

    ignoreNodeClosed (NodeClosedException _) = return ()
687+
672688
newtype NC a = NC { unNC :: StateT NCState (ReaderT LocalNode IO) a }
673689
deriving ( Applicative
674690
, Functor
@@ -704,7 +720,7 @@ ncSendToProcessAndTrace shouldTrace pid msg = do
704720
node <- ask
705721
if processNodeId pid == localNodeId node
706722
then ncEffectLocalSendAndTrace shouldTrace node pid msg
707-
else liftIO $ sendBinary node
723+
else liftIO $ submitSendPool node (processNodeId pid) $ sendBinary node
708724
(NodeIdentifier $ localNodeId node)
709725
(NodeIdentifier $ processNodeId pid)
710726
WithImplicitReconnect
@@ -717,7 +733,7 @@ ncSendToNode to msg = do
717733
node <- ask
718734
liftIO $ if to == localNodeId node
719735
then writeChan (localCtrlChan node) $! msg
720-
else sendBinary node
736+
else submitSendPool node to $ sendBinary node
721737
(NodeIdentifier $ localNodeId node)
722738
(NodeIdentifier to)
723739
WithImplicitReconnect

0 commit comments

Comments
 (0)