@@ -289,6 +289,7 @@ import Control.Distributed.Process.Internal.Primitives
289
289
, receiveTimeout
290
290
, matchChan
291
291
, matchAny
292
+ , matchSTM
292
293
, unwrapMessage
293
294
, onException
294
295
, register
@@ -505,8 +506,8 @@ mxAgentWithFinalize mxId initState handlers dtor = do
505
506
-> TChan Message
506
507
-> MxAgentState s
507
508
-> Process ()
508
- runAgent eh hs r c s =
509
- runAgentWithFinalizer eh hs r c s
509
+ runAgent eh hs cs c s =
510
+ runAgentWithFinalizer eh hs cs c s
510
511
`onException` runAgentFinalizer eh s
511
512
512
513
runAgentWithFinalizer :: MxAgent s ()
@@ -516,23 +517,33 @@ mxAgentWithFinalize mxId initState handlers dtor = do
516
517
-> MxAgentState s
517
518
-> Process ()
518
519
runAgentWithFinalizer eh' hs' cs' c' s' = do
519
- ( msg, selector') <- getNextInput cs' c'
520
+ msg <- getNextInput cs' c'
520
521
(action, state) <- runPipeline msg s' $ pipeline hs'
521
522
case action of
522
- MxAgentReady -> runAgent eh' hs' selector' c' state
523
+ MxAgentReady -> runAgent eh' hs' InputChan c' state
523
524
MxAgentPrioritise priority -> runAgent eh' hs' priority c' state
524
525
MxAgentDeactivate _ -> runAgentFinalizer eh' state
525
526
MxAgentSkip -> error " IllegalState"
526
527
-- MxAgentBecome h' -> runAgent h' c state
527
528
528
- getNextInput sel chan = getNextInput' sel chan (10 :: Int )
529
+ getNextInput sel chan =
530
+ let stmRead = atomically . readTChan
531
+ matches =
532
+ case sel of
533
+ Mailbox -> [ matchAny return
534
+ , matchSTM (readTChan chan) return ]
535
+ InputChan -> [ matchSTM (readTChan chan) return
536
+ , matchAny return ]
537
+ in getNextInput' matches 100
529
538
530
539
-- when reading inputs, we generally want to maintain a degree of
531
540
-- fairness in choosing between the TChan and our mailbox, but to
532
541
-- ultimately favour the TChan overall. We do this by flipping
533
542
-- between the two (using the ChannelSelector) each time we call
534
543
-- getNextInput - it returns the opposite ChannelSelector to the
535
544
-- one which succeeded last time it was called.
545
+
546
+ -----------------------------------------------------------------------
536
547
--
537
548
-- This strategy works well, yet we wish to avoid blocking on one
538
549
-- input if the other is empty/busy, so we begin by reading both
@@ -543,6 +554,8 @@ mxAgentWithFinalize mxId initState handlers dtor = do
543
554
-- system resoures. Instead, we switch ten times, after which (if no
544
555
-- data were obtained) we block on the event bus, since that is our
545
556
-- main priority.
557
+ -----------------------------------------------------------------------
558
+
546
559
--
547
560
-- An agent can of course, choose to override which source should be
548
561
-- checked first. We consider this a /hint/ rather than a dictat.
@@ -553,19 +566,32 @@ mxAgentWithFinalize mxId initState handlers dtor = do
553
566
-- in which case we needn't worry too much about the overheads
554
567
-- described thus far.
555
568
556
- getNextInput' InputChan c' 0 = do
557
- m <- liftIO $ atomically $ readTChan c'
558
- return (m, Mailbox )
559
- getNextInput' InputChan c' n = do
560
- inputs <- liftIO $ atomically $ tryReadTChan c'
561
- case inputs of
562
- Nothing -> getNextInput' Mailbox c' (n - 1 )
563
- Just m -> return (m, Mailbox )
564
- getNextInput' Mailbox c' n = do
565
- m <- receiveTimeout 0 [ matchAny return ]
566
- case m of
567
- Nothing -> getNextInput' InputChan c' (n - 1 )
568
- Just msg -> return (msg, InputChan )
569
+ -- TODO: use mersenne-random / (System.Random.MWC) to get a
570
+ -- uniform distribution of values between 10..2000
571
+
572
+ getNextInput' ms n = do
573
+ mIn <- receiveTimeout n ms
574
+ case mIn of
575
+ Nothing -> tryNextInput ms n
576
+ Just msg -> return msg
577
+
578
+ tryNextInput ms n
579
+ | n < 2000 = getNextInput' ms $ n * 2
580
+ | otherwise = getNextInput' ms 100
581
+
582
+ -- getNextInput' c' 0 = do
583
+ -- m <- liftIO $ atomically $ readTChan c'
584
+ -- return (m, Mailbox)
585
+ -- getNextInput' InputChan c' n = do
586
+ -- inputs <- liftIO $ atomically $ tryReadTChan c'
587
+ -- case inputs of
588
+ -- Nothing -> getNextInput' Mailbox c' (n - 1)
589
+ -- Just m -> return (m, Mailbox)
590
+ -- getNextInput' Mailbox c' n = do
591
+ -- m <- receiveTimeout 0 [ matchAny return ]
592
+ -- case m of
593
+ -- Nothing -> getNextInput' InputChan c' (n - 1)
594
+ -- Just msg -> return (msg, InputChan)
569
595
570
596
runAgentFinalizer :: MxAgent s () -> MxAgentState s -> Process ()
571
597
runAgentFinalizer f s = ST. runStateT (unAgent f) s >>= return . fst
0 commit comments