Skip to content

Commit 218f762

Browse files
committed
Initial stab at _not_ chewing CPU cycles
1 parent 283c287 commit 218f762

File tree

1 file changed

+44
-18
lines changed

1 file changed

+44
-18
lines changed

src/Control/Distributed/Process/Management.hs

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ import Control.Distributed.Process.Internal.Primitives
289289
, receiveTimeout
290290
, matchChan
291291
, matchAny
292+
, matchSTM
292293
, unwrapMessage
293294
, onException
294295
, register
@@ -505,8 +506,8 @@ mxAgentWithFinalize mxId initState handlers dtor = do
505506
-> TChan Message
506507
-> MxAgentState s
507508
-> Process ()
508-
runAgent eh hs r c s =
509-
runAgentWithFinalizer eh hs r c s
509+
runAgent eh hs cs c s =
510+
runAgentWithFinalizer eh hs cs c s
510511
`onException` runAgentFinalizer eh s
511512

512513
runAgentWithFinalizer :: MxAgent s ()
@@ -516,23 +517,33 @@ mxAgentWithFinalize mxId initState handlers dtor = do
516517
-> MxAgentState s
517518
-> Process ()
518519
runAgentWithFinalizer eh' hs' cs' c' s' = do
519-
(msg, selector') <- getNextInput cs' c'
520+
msg <- getNextInput cs' c'
520521
(action, state) <- runPipeline msg s' $ pipeline hs'
521522
case action of
522-
MxAgentReady -> runAgent eh' hs' selector' c' state
523+
MxAgentReady -> runAgent eh' hs' InputChan c' state
523524
MxAgentPrioritise priority -> runAgent eh' hs' priority c' state
524525
MxAgentDeactivate _ -> runAgentFinalizer eh' state
525526
MxAgentSkip -> error "IllegalState"
526527
-- MxAgentBecome h' -> runAgent h' c state
527528

528-
getNextInput sel chan = getNextInput' sel chan (10 :: Int)
529+
getNextInput sel chan =
530+
let stmRead = atomically . readTChan
531+
matches =
532+
case sel of
533+
Mailbox -> [ matchAny return
534+
, matchSTM (readTChan chan) return]
535+
InputChan -> [ matchSTM (readTChan chan) return
536+
, matchAny return]
537+
in getNextInput' matches 100
529538

530539
-- when reading inputs, we generally want to maintain a degree of
531540
-- fairness in choosing between the TChan and our mailbox, but to
532541
-- ultimately favour the TChan overall. We do this by flipping
533542
-- between the two (using the ChannelSelector) each time we call
534543
-- getNextInput - it returns the opposite ChannelSelector to the
535544
-- one which succeeded last time it was called.
545+
546+
-----------------------------------------------------------------------
536547
--
537548
-- This strategy works well, yet we wish to avoid blocking on one
538549
-- input if the other is empty/busy, so we begin by reading both
@@ -543,6 +554,8 @@ mxAgentWithFinalize mxId initState handlers dtor = do
543554
-- system resoures. Instead, we switch ten times, after which (if no
544555
-- data were obtained) we block on the event bus, since that is our
545556
-- main priority.
557+
-----------------------------------------------------------------------
558+
546559
--
547560
-- An agent can of course, choose to override which source should be
548561
-- checked first. We consider this a /hint/ rather than a dictat.
@@ -553,19 +566,32 @@ mxAgentWithFinalize mxId initState handlers dtor = do
553566
-- in which case we needn't worry too much about the overheads
554567
-- described thus far.
555568

556-
getNextInput' InputChan c' 0 = do
557-
m <- liftIO $ atomically $ readTChan c'
558-
return (m, Mailbox)
559-
getNextInput' InputChan c' n = do
560-
inputs <- liftIO $ atomically $ tryReadTChan c'
561-
case inputs of
562-
Nothing -> getNextInput' Mailbox c' (n - 1)
563-
Just m -> return (m, Mailbox)
564-
getNextInput' Mailbox c' n = do
565-
m <- receiveTimeout 0 [ matchAny return ]
566-
case m of
567-
Nothing -> getNextInput' InputChan c' (n - 1)
568-
Just msg -> return (msg, InputChan)
569+
-- TODO: use mersenne-random / (System.Random.MWC) to get a
570+
-- uniform distribution of values between 10..2000
571+
572+
getNextInput' ms n = do
573+
mIn <- receiveTimeout n ms
574+
case mIn of
575+
Nothing -> tryNextInput ms n
576+
Just msg -> return msg
577+
578+
tryNextInput ms n
579+
| n < 2000 = getNextInput' ms $ n * 2
580+
| otherwise = getNextInput' ms 100
581+
582+
-- getNextInput' c' 0 = do
583+
-- m <- liftIO $ atomically $ readTChan c'
584+
-- return (m, Mailbox)
585+
-- getNextInput' InputChan c' n = do
586+
-- inputs <- liftIO $ atomically $ tryReadTChan c'
587+
-- case inputs of
588+
-- Nothing -> getNextInput' Mailbox c' (n - 1)
589+
-- Just m -> return (m, Mailbox)
590+
-- getNextInput' Mailbox c' n = do
591+
-- m <- receiveTimeout 0 [ matchAny return ]
592+
-- case m of
593+
-- Nothing -> getNextInput' InputChan c' (n - 1)
594+
-- Just msg -> return (msg, InputChan)
569595

570596
runAgentFinalizer :: MxAgent s () -> MxAgentState s -> Process ()
571597
runAgentFinalizer f s = ST.runStateT (unAgent f) s >>= return . fst

0 commit comments

Comments
 (0)