Skip to content

Commit fd46c61

Browse files
Monitor the NC and handleIncomingMessages.
1 parent 56e5305 commit fd46c61

File tree

1 file changed

+108
-29
lines changed
  • src/Control/Distributed/Process

1 file changed

+108
-29
lines changed

src/Control/Distributed/Process/Node.hs

Lines changed: 108 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import System.IO (fixIO, hPutStrLn, stderr)
2525
import System.Mem.Weak (Weak, deRefWeak)
2626
import qualified Data.ByteString.Lazy as BSL (fromChunks)
2727
import Data.Binary (decode)
28+
import Data.Function (fix)
2829
import Data.Map (Map)
2930
import qualified Data.Map as Map
3031
( empty
@@ -78,6 +79,8 @@ import qualified Control.Exception as Exception
7879
, catch
7980
, catches
8081
, finally
82+
, bracket
83+
, mask_
8184
)
8285
import Control.Concurrent (forkIO, killThread)
8386
import Control.Distributed.Process.Internal.BiMultiMap (BiMultiMap)
@@ -98,7 +101,11 @@ import Control.Distributed.Process.Internal.ThreadPool
98101
, ThreadPool
99102
)
100103
import Control.Concurrent.Chan (newChan, writeChan, readChan)
101-
import qualified Control.Concurrent.MVar as MVar (newEmptyMVar, takeMVar)
104+
import qualified Control.Concurrent.MVar as MVar
105+
( newEmptyMVar
106+
, takeMVar
107+
, putMVar
108+
)
102109
import Control.Concurrent.STM
103110
( atomically
104111
)
@@ -230,6 +237,7 @@ import Control.Monad.Catch (try)
230237
import GHC.IO (IO(..), unsafeUnmask)
231238
import GHC.Base ( maskAsyncExceptions# )
232239

240+
import System.Timeout
233241
import Unsafe.Coerce
234242
import Prelude
235243

@@ -520,24 +528,78 @@ incomingFrom addr = aux >>> DAC.mapDefault Set.empty addr
520528
where
521529
aux = accessor _incomingFrom (\fr st -> st { _incomingFrom = fr })
522530

531+
-- | An 'EventMonitor' monitors the handling of events, reporting when an event
532+
-- is not handled promptly. It monitors events which are handled in sequence.
533+
data EventMonitor a = EventMonitor
534+
{ -- | Notifies that an event was received. Takes the event as argument.
535+
eventMonitorGotEvent :: a -> IO ()
536+
-- | Notifies that the last received event was handled.
537+
--
538+
-- Might be called even when there is no event, in which case it will have
539+
-- no effect.
540+
, eventMonitorHandledEvent :: IO ()
541+
-- | Stops the event monitor.
542+
, eventMonitorStop :: IO ()
543+
}
544+
545+
-- | `eventMon <- monitorThread t action`
546+
--
547+
-- Creates an event monitor that executes `action` when an event hasn't been
548+
-- handled within `t` microseconds. `action` takes as argument the event that
549+
-- wasn't handled in time.
550+
--
551+
monitorThread :: Int -> (a -> IO ()) -> IO (EventMonitor a)
552+
monitorThread tmicrosecs action = do
553+
mv <- MVar.newEmptyMVar
554+
tid <- forkIO $ forever' $ do
555+
-- loop until some event is reported
556+
ev <- fix $ \loop -> MVar.takeMVar mv >>= maybe loop return
557+
-- wait until the event is handled
558+
Exception.mask_ (timeout tmicrosecs $ MVar.takeMVar mv) >>=
559+
maybe (do action ev -- report the unhandled event
560+
-- wait for the slow event handler
561+
void $ MVar.takeMVar mv
562+
)
563+
(const $ return ())
564+
return EventMonitor
565+
{ eventMonitorGotEvent = MVar.putMVar mv . Just
566+
, eventMonitorHandledEvent = MVar.putMVar mv Nothing
567+
, eventMonitorStop = killThread tid
568+
}
569+
523570
handleIncomingMessages :: LocalNode -> IO ()
524-
handleIncomingMessages node = go initConnectionState
525-
`Exception.catch` \(NodeClosedException _) -> return ()
571+
handleIncomingMessages node =
572+
Exception.bracket
573+
(monitorThread 1000000 $ \ev ->
574+
hPutStrLn stderr $
575+
"handleIncomingMessages blocked " ++ show (localNodeId node) ++
576+
" on " ++ show ev
577+
)
578+
eventMonitorStop $ \eventMon ->
579+
(go eventMon initConnectionState
580+
`Exception.catch` \(NodeClosedException _) -> return ()
581+
) `Exception.catch` \e -> do
582+
hPutStrLn stderr $
583+
"handleIncomingMessages failed: " ++
584+
show (localNodeId node, e :: SomeException)
585+
throwIO e
526586
where
527-
go :: ConnectionState -> IO ()
528-
go !st = do
587+
go :: EventMonitor NT.Event -> ConnectionState -> IO ()
588+
go eventMon !st = do
589+
eventMonitorHandledEvent eventMon
529590
event <- NT.receive endpoint
591+
eventMonitorGotEvent eventMon event
530592
case event of
531593
NT.ConnectionOpened cid rel theirAddr ->
532594
if rel == NT.ReliableOrdered
533595
then
534596
trace node (MxConnected cid theirAddr)
535-
>> go (
597+
>> go eventMon (
536598
(incomingAt cid ^= Just (theirAddr, Uninit))
537599
. (incomingFrom theirAddr ^: Set.insert cid)
538600
$ st
539601
)
540-
else invalidRequest cid st $
602+
else invalidRequest eventMon cid st $
541603
"attempt to connect with unsupported reliability " ++ show rel
542604
NT.Received cid payload ->
543605
case st ^. incomingAt cid of
@@ -549,31 +611,31 @@ handleIncomingMessages node = go initConnectionState
549611
let msg = payloadToMessage payload
550612
enqueue queue msg -- 'enqueue' is strict
551613
trace node (MxReceived pid msg)
552-
go st
614+
go eventMon st
553615
Just (_, ToChan (TypedChannel chan')) -> do
554616
mChan <- deRefWeak chan'
555617
-- If mChan is Nothing, the process has given up the read end of
556618
-- the channel and we simply ignore the incoming message
557619
forM_ mChan $ \chan -> atomically $
558620
-- We make sure the message is fully decoded when it is enqueued
559621
writeTQueue chan $! decode (BSL.fromChunks payload)
560-
go st
622+
go eventMon st
561623
Just (_, ToNode) -> do
562624
let ctrlMsg = decode . BSL.fromChunks $ payload
563625
writeChan ctrlChan $! ctrlMsg
564-
go st
626+
go eventMon st
565627
Just (src, Uninit) ->
566628
case decode (BSL.fromChunks payload) of
567629
ProcessIdentifier pid -> do
568630
let lpid = processLocalId pid
569631
mProc <- withValidLocalState node $ return . (^. localProcessWithId lpid)
570632
case mProc of
571633
Just proc ->
572-
go (incomingAt cid ^= Just (src, ToProc pid (processWeakQ proc)) $ st)
634+
go eventMon (incomingAt cid ^= Just (src, ToProc pid (processWeakQ proc)) $ st)
573635
Nothing ->
574636
-- incoming attempt to connect to unknown process - might
575637
-- be dead already
576-
go (incomingAt cid ^= Nothing $ st)
638+
go eventMon (incomingAt cid ^= Nothing $ st)
577639
SendPortIdentifier chId -> do
578640
let lcid = sendPortLocalId chId
579641
lpid = processLocalId (sendPortProcessId chId)
@@ -583,33 +645,34 @@ handleIncomingMessages node = go initConnectionState
583645
mChannel <- withMVar (processState proc) $ return . (^. typedChannelWithId lcid)
584646
case mChannel of
585647
Just channel ->
586-
go (incomingAt cid ^= Just (src, ToChan channel) $ st)
648+
go eventMon
649+
(incomingAt cid ^= Just (src, ToChan channel) $ st)
587650
Nothing ->
588-
invalidRequest cid st $
651+
invalidRequest eventMon cid st $
589652
"incoming attempt to connect to unknown channel of"
590653
++ " process " ++ show (sendPortProcessId chId)
591654
Nothing ->
592655
-- incoming attempt to connect to channel of unknown
593656
-- process - might be dead already
594-
go (incomingAt cid ^= Nothing $ st)
657+
go eventMon (incomingAt cid ^= Nothing $ st)
595658
NodeIdentifier nid ->
596659
if nid == localNodeId node
597-
then go (incomingAt cid ^= Just (src, ToNode) $ st)
598-
else invalidRequest cid st $
660+
then go eventMon (incomingAt cid ^= Just (src, ToNode) $ st)
661+
else invalidRequest eventMon cid st $
599662
"incoming attempt to connect to a different node -"
600663
++ " I'm " ++ show (localNodeId node)
601664
++ " but the remote peer wants to connect to "
602665
++ show nid
603666
Nothing ->
604-
invalidRequest cid st
667+
invalidRequest eventMon cid st
605668
"message received from an unknown connection"
606669
NT.ConnectionClosed cid ->
607670
case st ^. incomingAt cid of
608671
Nothing ->
609-
invalidRequest cid st "closed unknown connection"
672+
invalidRequest eventMon cid st "closed unknown connection"
610673
Just (src, _) -> do
611674
trace node (MxDisconnected cid src)
612-
go ( (incomingAt cid ^= Nothing)
675+
go eventMon ( (incomingAt cid ^= Nothing)
613676
. (incomingFrom src ^: Set.delete cid)
614677
$ st
615678
)
@@ -624,7 +687,7 @@ handleIncomingMessages node = go initConnectionState
624687
liftIO $ lookupWorker (localSendPool node) (NodeId theirAddr)
625688
>>= maybe (return ()) killThread
626689
closeImplicitReconnections node nid
627-
go ( (incomingFrom theirAddr ^= Set.empty)
690+
go eventMon ( (incomingFrom theirAddr ^= Set.empty)
628691
. (incoming ^: Map.filterWithKey (const . notLost))
629692
$ st
630693
)
@@ -639,8 +702,10 @@ handleIncomingMessages node = go initConnectionState
639702
-- and we just give up
640703
fail "Cloud Haskell fatal error: received unexpected multicast"
641704

642-
invalidRequest :: NT.ConnectionId -> ConnectionState -> String -> IO ()
643-
invalidRequest cid st msg = do
705+
invalidRequest :: EventMonitor NT.Event
706+
-> NT.ConnectionId
707+
-> ConnectionState -> String -> IO ()
708+
invalidRequest eventMon cid st msg = do
644709
-- TODO: We should treat this as a fatal error on the part of the remote
645710
-- node. That is, we should report the remote node as having died, and we
646711
-- should close incoming connections (this requires a Transport layer
@@ -649,7 +714,7 @@ handleIncomingMessages node = go initConnectionState
649714
++ " (" ++ msg ++ "): "
650715
, (Trace cid)
651716
]
652-
go ( incomingAt cid ^= Nothing
717+
go eventMon ( incomingAt cid ^= Nothing
653718
$ st
654719
)
655720

@@ -661,9 +726,21 @@ handleIncomingMessages node = go initConnectionState
661726
--------------------------------------------------------------------------------
662727

663728
runNodeController :: LocalNode -> IO ()
664-
runNodeController node =
665-
runReaderT (evalStateT (unNC nodeController) initNCState) node
666-
`Exception.catch` \(NodeClosedException _) -> return ()
729+
runNodeController node = do
730+
Exception.bracket
731+
(monitorThread 1000000 $ \ev ->
732+
hPutStrLn stderr $
733+
"nodeController blocked " ++ show (localNodeId node) ++
734+
" on " ++ show ev
735+
)
736+
eventMonitorStop $ \eventMon ->
737+
(runReaderT (evalStateT (unNC (nodeController eventMon)) initNCState) node
738+
`Exception.catch` \(NodeClosedException _) -> return ()
739+
) `Exception.catch` \e -> do
740+
hPutStrLn stderr $
741+
"nodeController failed: " ++
742+
show (localNodeId node, e :: SomeException)
743+
throwIO e
667744

668745
--------------------------------------------------------------------------------
669746
-- Internal data types --
@@ -772,11 +849,13 @@ withLocalTracer node act = act (localEventBus node)
772849
--------------------------------------------------------------------------------
773850

774851
-- [Unified: Table 7]
775-
nodeController :: NC ()
776-
nodeController = do
852+
nodeController :: EventMonitor NCMsg -> NC ()
853+
nodeController eventMon = do
777854
node <- ask
778855
forever' $ do
856+
liftIO $ eventMonitorHandledEvent eventMon
779857
msg <- liftIO $ readChan (localCtrlChan node)
858+
liftIO $ eventMonitorGotEvent eventMon msg
780859

781860
-- [Unified: Table 7, rule nc_forward]
782861
case destNid (ctrlMsgSignal msg) of

0 commit comments

Comments
 (0)