@@ -49,7 +49,13 @@ import Control.Monad.IO.Class (MonadIO, liftIO)
49
49
import Control.Monad.State.Strict (MonadState , StateT , evalStateT , gets )
50
50
import qualified Control.Monad.State.Strict as StateT (get , put )
51
51
import Control.Monad.Reader (MonadReader , ReaderT , runReaderT , ask )
52
- import Control.Exception (throwIO , SomeException , Exception , throwTo )
52
+ import Control.Exception
53
+ ( throwIO
54
+ , AsyncException (ThreadKilled )
55
+ , SomeException
56
+ , Exception
57
+ , throwTo
58
+ )
53
59
import qualified Control.Exception as Exception (catch )
54
60
import Control.Concurrent (forkIO )
55
61
import Control.Distributed.Process.Internal.StrictMVar
@@ -193,25 +199,41 @@ newLocalNode transport rtable = do
193
199
-- | Create a new local node (without any service processes running)
194
200
createBareLocalNode :: NT. EndPoint -> RemoteTable -> IO LocalNode
195
201
createBareLocalNode endPoint rtable = do
196
- unq <- randomIO
197
- state <- newMVar LocalNodeState
198
- { _localProcesses = Map. empty
199
- , _localPidCounter = firstNonReservedProcessId
200
- , _localPidUnique = unq
201
- , _localConnections = Map. empty
202
- }
203
- ctrlChan <- newChan
204
- let node = LocalNode { localNodeId = NodeId $ NT. address endPoint
205
- , localEndPoint = endPoint
206
- , localState = state
207
- , localCtrlChan = ctrlChan
208
- , localTracer = InactiveTracer
209
- , remoteTable = rtable
210
- }
211
- tracedNode <- startTracing node
212
- void . forkIO $ runNodeController tracedNode
213
- void . forkIO $ handleIncomingMessages tracedNode
214
- return tracedNode
202
+ unq <- randomIO
203
+ state <- newMVar LocalNodeState
204
+ { _localProcesses = Map. empty
205
+ , _localPidCounter = firstNonReservedProcessId
206
+ , _localPidUnique = unq
207
+ , _localConnections = Map. empty
208
+ }
209
+ ctrlChan <- newChan
210
+ let node = LocalNode { localNodeId = NodeId $ NT. address endPoint
211
+ , localEndPoint = endPoint
212
+ , localState = state
213
+ , localCtrlChan = ctrlChan
214
+ , localTracer = InactiveTracer
215
+ , remoteTable = rtable
216
+ }
217
+ tracedNode <- startTracing node
218
+
219
+ ncGo <- newEmptyMVar
220
+ ncTid <- forkIO $ do
221
+ tid <- takeMVar ncGo
222
+ (runNodeController tracedNode
223
+ `Exception.catch` \ (e :: SomeException ) -> throwTo tid e)
224
+
225
+ evTid <- forkIO $ do
226
+ (handleIncomingMessages tracedNode >> (stopNC node))
227
+ `Exception.catch` \ (e :: SomeException ) -> throwTo ncTid e
228
+
229
+ putMVar ncGo evTid
230
+ return tracedNode
231
+ where
232
+ stopNC node =
233
+ writeChan (localCtrlChan node) NCMsg
234
+ { ctrlMsgSender = NodeIdentifier (localNodeId node)
235
+ , ctrlMsgSignal = SigShutdown
236
+ }
215
237
216
238
-- | Start and register the service processes on a node
217
239
-- (for now, this is only the logger)
@@ -583,6 +605,8 @@ nodeController = do
583
605
ncEffectExit from to reason
584
606
NCMsg (ProcessIdentifier from) (GetInfo pid) ->
585
607
ncEffectGetInfo from pid
608
+ NCMsg _ SigShutdown ->
609
+ liftIO $ throwIO ThreadKilled -- seems to make more sense than fail/error
586
610
unexpected ->
587
611
error $ " nodeController: unexpected message " ++ show unexpected
588
612
@@ -902,6 +926,7 @@ destNid (Died _ _) = Nothing
902
926
destNid (Kill pid _) = Just $ processNodeId pid
903
927
destNid (Exit pid _) = Just $ processNodeId pid
904
928
destNid (GetInfo pid) = Just $ processNodeId pid
929
+ destNid (SigShutdown ) = Nothing
905
930
906
931
-- | Check if a process is local to our own node
907
932
isLocal :: LocalNode -> Identifier -> Bool
0 commit comments