@@ -25,6 +25,7 @@ import System.IO (fixIO, hPutStrLn, stderr)
25
25
import System.Mem.Weak (Weak , deRefWeak )
26
26
import qualified Data.ByteString.Lazy as BSL (fromChunks )
27
27
import Data.Binary (decode )
28
+ import Data.Function (fix )
28
29
import Data.Map (Map )
29
30
import qualified Data.Map as Map
30
31
( empty
@@ -78,6 +79,8 @@ import qualified Control.Exception as Exception
78
79
, catch
79
80
, catches
80
81
, finally
82
+ , bracket
83
+ , mask_
81
84
)
82
85
import Control.Concurrent (forkIO , killThread )
83
86
import Control.Distributed.Process.Internal.BiMultiMap (BiMultiMap )
@@ -98,7 +101,11 @@ import Control.Distributed.Process.Internal.ThreadPool
98
101
, ThreadPool
99
102
)
100
103
import Control.Concurrent.Chan (newChan , writeChan , readChan )
101
- import qualified Control.Concurrent.MVar as MVar (newEmptyMVar , takeMVar )
104
+ import qualified Control.Concurrent.MVar as MVar
105
+ ( newEmptyMVar
106
+ , takeMVar
107
+ , putMVar
108
+ )
102
109
import Control.Concurrent.STM
103
110
( atomically
104
111
)
@@ -230,6 +237,7 @@ import Control.Monad.Catch (try)
230
237
import GHC.IO (IO (.. ), unsafeUnmask )
231
238
import GHC.Base ( maskAsyncExceptions # )
232
239
240
+ import System.Timeout
233
241
import Unsafe.Coerce
234
242
import Prelude
235
243
@@ -520,24 +528,78 @@ incomingFrom addr = aux >>> DAC.mapDefault Set.empty addr
520
528
where
521
529
aux = accessor _incomingFrom (\ fr st -> st { _incomingFrom = fr })
522
530
531
+ -- | And 'EventMonitor' monitors the handling of events reporting when an event
532
+ -- is not handled promptly. It monitors events which are handled in sequence.
533
+ data EventMonitor a = EventMonitor
534
+ { -- | Notifies that an event was received. Takes the event as argument.
535
+ eventMonitorGotEvent :: a -> IO ()
536
+ -- | Notifies that the last received event was handled.
537
+ --
538
+ -- Might be called even when there is no event, in which case it will have
539
+ -- no effect.
540
+ , eventMonitorHandledEvent :: IO ()
541
+ -- | Stops the event monitor.
542
+ , eventMonitorStop :: IO ()
543
+ }
544
+
545
+ -- | `eventMon <- monitorThread t action`
546
+ --
547
+ -- Creates an event monitor that executes `action` when an event hasn't been
548
+ -- handled withing `t` microseconds. `action` takes as argument the event that
549
+ -- wasn't handled timely.
550
+ --
551
+ monitorThread :: Int -> (a -> IO () ) -> IO (EventMonitor a )
552
+ monitorThread tmicrosecs action = do
553
+ mv <- MVar. newEmptyMVar
554
+ tid <- forkIO $ forever' $ do
555
+ -- loop until some event is reported
556
+ ev <- fix $ \ loop -> MVar. takeMVar mv >>= maybe loop return
557
+ -- wait until the event is handled
558
+ Exception. mask_ (timeout tmicrosecs $ MVar. takeMVar mv) >>=
559
+ maybe (do action ev -- report the unhandled event
560
+ -- wait for the slow event handler
561
+ void $ MVar. takeMVar mv
562
+ )
563
+ (const $ return () )
564
+ return EventMonitor
565
+ { eventMonitorGotEvent = MVar. putMVar mv . Just
566
+ , eventMonitorHandledEvent = MVar. putMVar mv Nothing
567
+ , eventMonitorStop = killThread tid
568
+ }
569
+
523
570
handleIncomingMessages :: LocalNode -> IO ()
524
- handleIncomingMessages node = go initConnectionState
525
- `Exception.catch` \ (NodeClosedException _) -> return ()
571
+ handleIncomingMessages node =
572
+ Exception. bracket
573
+ (monitorThread 1000000 $ \ ev ->
574
+ hPutStrLn stderr $
575
+ " handleIncomingMessages blocked " ++ show (localNodeId node) ++
576
+ " on " ++ show ev
577
+ )
578
+ eventMonitorStop $ \ eventMon ->
579
+ (go eventMon initConnectionState
580
+ `Exception.catch` \ (NodeClosedException _) -> return ()
581
+ ) `Exception.catch` \ e -> do
582
+ hPutStrLn stderr $
583
+ " handleIncomingMessages failed: " ++
584
+ show (localNodeId node, e :: SomeException )
585
+ throwIO e
526
586
where
527
- go :: ConnectionState -> IO ()
528
- go ! st = do
587
+ go :: EventMonitor NT. Event -> ConnectionState -> IO ()
588
+ go eventMon ! st = do
589
+ eventMonitorHandledEvent eventMon
529
590
event <- NT. receive endpoint
591
+ eventMonitorGotEvent eventMon event
530
592
case event of
531
593
NT. ConnectionOpened cid rel theirAddr ->
532
594
if rel == NT. ReliableOrdered
533
595
then
534
596
trace node (MxConnected cid theirAddr)
535
- >> go (
597
+ >> go eventMon (
536
598
(incomingAt cid ^= Just (theirAddr, Uninit ))
537
599
. (incomingFrom theirAddr ^: Set. insert cid)
538
600
$ st
539
601
)
540
- else invalidRequest cid st $
602
+ else invalidRequest eventMon cid st $
541
603
" attempt to connect with unsupported reliability " ++ show rel
542
604
NT. Received cid payload ->
543
605
case st ^. incomingAt cid of
@@ -549,31 +611,31 @@ handleIncomingMessages node = go initConnectionState
549
611
let msg = payloadToMessage payload
550
612
enqueue queue msg -- 'enqueue' is strict
551
613
trace node (MxReceived pid msg)
552
- go st
614
+ go eventMon st
553
615
Just (_, ToChan (TypedChannel chan')) -> do
554
616
mChan <- deRefWeak chan'
555
617
-- If mChan is Nothing, the process has given up the read end of
556
618
-- the channel and we simply ignore the incoming message
557
619
forM_ mChan $ \ chan -> atomically $
558
620
-- We make sure the message is fully decoded when it is enqueued
559
621
writeTQueue chan $! decode (BSL. fromChunks payload)
560
- go st
622
+ go eventMon st
561
623
Just (_, ToNode ) -> do
562
624
let ctrlMsg = decode . BSL. fromChunks $ payload
563
625
writeChan ctrlChan $! ctrlMsg
564
- go st
626
+ go eventMon st
565
627
Just (src, Uninit ) ->
566
628
case decode (BSL. fromChunks payload) of
567
629
ProcessIdentifier pid -> do
568
630
let lpid = processLocalId pid
569
631
mProc <- withValidLocalState node $ return . (^. localProcessWithId lpid)
570
632
case mProc of
571
633
Just proc ->
572
- go (incomingAt cid ^= Just (src, ToProc pid (processWeakQ proc )) $ st)
634
+ go eventMon (incomingAt cid ^= Just (src, ToProc pid (processWeakQ proc )) $ st)
573
635
Nothing ->
574
636
-- incoming attempt to connect to unknown process - might
575
637
-- be dead already
576
- go (incomingAt cid ^= Nothing $ st)
638
+ go eventMon (incomingAt cid ^= Nothing $ st)
577
639
SendPortIdentifier chId -> do
578
640
let lcid = sendPortLocalId chId
579
641
lpid = processLocalId (sendPortProcessId chId)
@@ -583,33 +645,34 @@ handleIncomingMessages node = go initConnectionState
583
645
mChannel <- withMVar (processState proc ) $ return . (^. typedChannelWithId lcid)
584
646
case mChannel of
585
647
Just channel ->
586
- go (incomingAt cid ^= Just (src, ToChan channel) $ st)
648
+ go eventMon
649
+ (incomingAt cid ^= Just (src, ToChan channel) $ st)
587
650
Nothing ->
588
- invalidRequest cid st $
651
+ invalidRequest eventMon cid st $
589
652
" incoming attempt to connect to unknown channel of"
590
653
++ " process " ++ show (sendPortProcessId chId)
591
654
Nothing ->
592
655
-- incoming attempt to connect to channel of unknown
593
656
-- process - might be dead already
594
- go (incomingAt cid ^= Nothing $ st)
657
+ go eventMon (incomingAt cid ^= Nothing $ st)
595
658
NodeIdentifier nid ->
596
659
if nid == localNodeId node
597
- then go (incomingAt cid ^= Just (src, ToNode ) $ st)
598
- else invalidRequest cid st $
660
+ then go eventMon (incomingAt cid ^= Just (src, ToNode ) $ st)
661
+ else invalidRequest eventMon cid st $
599
662
" incoming attempt to connect to a different node -"
600
663
++ " I'm " ++ show (localNodeId node)
601
664
++ " but the remote peer wants to connect to "
602
665
++ show nid
603
666
Nothing ->
604
- invalidRequest cid st
667
+ invalidRequest eventMon cid st
605
668
" message received from an unknown connection"
606
669
NT. ConnectionClosed cid ->
607
670
case st ^. incomingAt cid of
608
671
Nothing ->
609
- invalidRequest cid st " closed unknown connection"
672
+ invalidRequest eventMon cid st " closed unknown connection"
610
673
Just (src, _) -> do
611
674
trace node (MxDisconnected cid src)
612
- go ( (incomingAt cid ^= Nothing )
675
+ go eventMon ( (incomingAt cid ^= Nothing )
613
676
. (incomingFrom src ^: Set. delete cid)
614
677
$ st
615
678
)
@@ -624,7 +687,7 @@ handleIncomingMessages node = go initConnectionState
624
687
liftIO $ lookupWorker (localSendPool node) (NodeId theirAddr)
625
688
>>= maybe (return () ) killThread
626
689
closeImplicitReconnections node nid
627
- go ( (incomingFrom theirAddr ^= Set. empty)
690
+ go eventMon ( (incomingFrom theirAddr ^= Set. empty)
628
691
. (incoming ^: Map. filterWithKey (const . notLost))
629
692
$ st
630
693
)
@@ -639,8 +702,10 @@ handleIncomingMessages node = go initConnectionState
639
702
-- and we just give up
640
703
fail " Cloud Haskell fatal error: received unexpected multicast"
641
704
642
- invalidRequest :: NT. ConnectionId -> ConnectionState -> String -> IO ()
643
- invalidRequest cid st msg = do
705
+ invalidRequest :: EventMonitor NT. Event
706
+ -> NT. ConnectionId
707
+ -> ConnectionState -> String -> IO ()
708
+ invalidRequest eventMon cid st msg = do
644
709
-- TODO: We should treat this as a fatal error on the part of the remote
645
710
-- node. That is, we should report the remote node as having died, and we
646
711
-- should close incoming connections (this requires a Transport layer
@@ -649,7 +714,7 @@ handleIncomingMessages node = go initConnectionState
649
714
++ " (" ++ msg ++ " ): "
650
715
, (Trace cid)
651
716
]
652
- go ( incomingAt cid ^= Nothing
717
+ go eventMon ( incomingAt cid ^= Nothing
653
718
$ st
654
719
)
655
720
@@ -661,9 +726,21 @@ handleIncomingMessages node = go initConnectionState
661
726
--------------------------------------------------------------------------------
662
727
663
728
runNodeController :: LocalNode -> IO ()
664
- runNodeController node =
665
- runReaderT (evalStateT (unNC nodeController) initNCState) node
666
- `Exception.catch` \ (NodeClosedException _) -> return ()
729
+ runNodeController node = do
730
+ Exception. bracket
731
+ (monitorThread 1000000 $ \ ev ->
732
+ hPutStrLn stderr $
733
+ " nodeController blocked " ++ show (localNodeId node) ++
734
+ " on " ++ show ev
735
+ )
736
+ eventMonitorStop $ \ eventMon ->
737
+ (runReaderT (evalStateT (unNC (nodeController eventMon)) initNCState) node
738
+ `Exception.catch` \ (NodeClosedException _) -> return ()
739
+ ) `Exception.catch` \ e -> do
740
+ hPutStrLn stderr $
741
+ " nodeController failed: " ++
742
+ show (localNodeId node, e :: SomeException )
743
+ throwIO e
667
744
668
745
--------------------------------------------------------------------------------
669
746
-- Internal data types --
@@ -772,11 +849,13 @@ withLocalTracer node act = act (localEventBus node)
772
849
--------------------------------------------------------------------------------
773
850
774
851
-- [Unified: Table 7]
775
- nodeController :: NC ()
776
- nodeController = do
852
+ nodeController :: EventMonitor NCMsg -> NC ()
853
+ nodeController eventMon = do
777
854
node <- ask
778
855
forever' $ do
856
+ liftIO $ eventMonitorHandledEvent eventMon
779
857
msg <- liftIO $ readChan (localCtrlChan node)
858
+ liftIO $ eventMonitorGotEvent eventMon msg
780
859
781
860
-- [Unified: Table 7, rule nc_forward]
782
861
case destNid (ctrlMsgSignal msg) of
0 commit comments