Skip to content

Commit ae6ef8e

Browse files
author
Tim Watson
committed
Link the node controller and node event listener threads
If either thread fails, the exception is rethrown to the other thread. In addition, when the NT listening loop ends normally (i.e., the endpoint has been shut down), a control channel signal notifies the NC thread to terminate as well, so there are no more potential zombies. DP-39 #resolve
1 parent 1ab91c9 commit ae6ef8e

File tree

2 files changed

+48
-20
lines changed

2 files changed

+48
-20
lines changed

src/Control/Distributed/Process/Internal/Types.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ data ProcessSignal =
484484
| Kill !ProcessId !String
485485
| Exit !ProcessId !Message
486486
| GetInfo !ProcessId
487+
| SigShutdown
487488
deriving Show
488489

489490
--------------------------------------------------------------------------------
@@ -535,6 +536,7 @@ instance Binary ProcessSignal where
535536
put (Kill pid reason) = putWord8 9 >> put pid >> put reason
536537
put (Exit pid reason) = putWord8 10 >> put pid >> put (messageToPayload reason)
537538
put (GetInfo about) = putWord8 30 >> put about
539+
put (SigShutdown) = putWord8 31
538540
get = do
539541
header <- getWord8
540542
case header of
@@ -550,6 +552,7 @@ instance Binary ProcessSignal where
550552
9 -> Kill <$> get <*> get
551553
10 -> Exit <$> get <*> (payloadToMessage <$> get)
552554
30 -> GetInfo <$> get
555+
31 -> return SigShutdown
553556
_ -> fail "ProcessSignal.get: invalid"
554557

555558
instance Binary DiedReason where

src/Control/Distributed/Process/Node.hs

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,13 @@ import Control.Monad.IO.Class (MonadIO, liftIO)
4949
import Control.Monad.State.Strict (MonadState, StateT, evalStateT, gets)
5050
import qualified Control.Monad.State.Strict as StateT (get, put)
5151
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask)
52-
import Control.Exception (throwIO, SomeException, Exception, throwTo)
52+
import Control.Exception
53+
( throwIO
54+
, AsyncException(ThreadKilled)
55+
, SomeException
56+
, Exception
57+
, throwTo
58+
)
5359
import qualified Control.Exception as Exception (catch)
5460
import Control.Concurrent (forkIO)
5561
import Control.Distributed.Process.Internal.StrictMVar
@@ -193,25 +199,41 @@ newLocalNode transport rtable = do
193199
-- | Create a new local node (without any service processes running)
194200
createBareLocalNode :: NT.EndPoint -> RemoteTable -> IO LocalNode
195201
createBareLocalNode endPoint rtable = do
196-
unq <- randomIO
197-
state <- newMVar LocalNodeState
198-
{ _localProcesses = Map.empty
199-
, _localPidCounter = firstNonReservedProcessId
200-
, _localPidUnique = unq
201-
, _localConnections = Map.empty
202-
}
203-
ctrlChan <- newChan
204-
let node = LocalNode { localNodeId = NodeId $ NT.address endPoint
205-
, localEndPoint = endPoint
206-
, localState = state
207-
, localCtrlChan = ctrlChan
208-
, localTracer = InactiveTracer
209-
, remoteTable = rtable
210-
}
211-
tracedNode <- startTracing node
212-
void . forkIO $ runNodeController tracedNode
213-
void . forkIO $ handleIncomingMessages tracedNode
214-
return tracedNode
202+
unq <- randomIO
203+
state <- newMVar LocalNodeState
204+
{ _localProcesses = Map.empty
205+
, _localPidCounter = firstNonReservedProcessId
206+
, _localPidUnique = unq
207+
, _localConnections = Map.empty
208+
}
209+
ctrlChan <- newChan
210+
let node = LocalNode { localNodeId = NodeId $ NT.address endPoint
211+
, localEndPoint = endPoint
212+
, localState = state
213+
, localCtrlChan = ctrlChan
214+
, localTracer = InactiveTracer
215+
, remoteTable = rtable
216+
}
217+
tracedNode <- startTracing node
218+
219+
ncGo <- newEmptyMVar
220+
ncTid <- forkIO $ do
221+
tid <- takeMVar ncGo
222+
(runNodeController tracedNode
223+
`Exception.catch` \(e :: SomeException) -> throwTo tid e)
224+
225+
evTid <- forkIO $ do
226+
(handleIncomingMessages tracedNode >> (stopNC node))
227+
`Exception.catch` \(e :: SomeException) -> throwTo ncTid e
228+
229+
putMVar ncGo evTid
230+
return tracedNode
231+
where
232+
stopNC node =
233+
writeChan (localCtrlChan node) NCMsg
234+
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
235+
, ctrlMsgSignal = SigShutdown
236+
}
215237

216238
-- | Start and register the service processes on a node
217239
-- (for now, this is only the logger)
@@ -583,6 +605,8 @@ nodeController = do
583605
ncEffectExit from to reason
584606
NCMsg (ProcessIdentifier from) (GetInfo pid) ->
585607
ncEffectGetInfo from pid
608+
NCMsg _ SigShutdown ->
609+
liftIO $ throwIO ThreadKilled -- seems to make more sense than fail/error
586610
unexpected ->
587611
error $ "nodeController: unexpected message " ++ show unexpected
588612

@@ -902,6 +926,7 @@ destNid (Died _ _) = Nothing
902926
destNid (Kill pid _) = Just $ processNodeId pid
903927
destNid (Exit pid _) = Just $ processNodeId pid
904928
destNid (GetInfo pid) = Just $ processNodeId pid
929+
destNid (SigShutdown) = Nothing
905930

906931
-- | Check if a process is local to our own node
907932
isLocal :: LocalNode -> Identifier -> Bool

0 commit comments

Comments
 (0)