Skip to content

Commit 2ffa2bc

Browse files
committed
Allow Management Agents to utilise their mailbox
Previously, management agents only processed messages delivered over a broadcast TChan, duplicated from the primary management event bus. To facilitate sending messages to a specific agent (and not the entire event processing subsystem) and reduce load on other agents (and the event bus itself), we perform a non-blocking read on the agent’s own mailbox whenever its input channel is empty.
1 parent 83a2e8c commit 2ffa2bc

File tree

1 file changed

+16
-2
lines changed

1 file changed

+16
-2
lines changed

src/Control/Distributed/Process/Management.hs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,19 @@ module Control.Distributed.Process.Management
6969
import Control.Applicative ((<$>))
7070
import Control.Concurrent.STM (atomically)
7171
import Control.Concurrent.STM.TChan
72-
( readTChan
72+
( tryReadTChan
7373
, writeTChan
7474
)
7575
import Control.Distributed.Process.Internal.Primitives
7676
( newChan
7777
, nsend
7878
, receiveWait
79+
, receiveTimeout
7980
, matchChan
81+
, matchAny
8082
, unwrapMessage
8183
, onException
84+
, register
8285
)
8386
import Control.Distributed.Process.Internal.Types
8487
( Process
@@ -222,6 +225,7 @@ mxAgentWithFinalize :: MxAgentId
222225
mxAgentWithFinalize mxId initState handlers dtor = do
223226
node <- processNode <$> ask
224227
pid <- liftIO $ mxNew (localEventBus node) $ start
228+
register (agentId mxId) pid
225229
return pid
226230
where
227231
start (sendTChan, recvTChan) = do
@@ -238,13 +242,23 @@ mxAgentWithFinalize mxId initState handlers dtor = do
238242
runAgentWithFinalizer eh hs c s `onException` runAgentFinalizer eh s
239243

240244
runAgentWithFinalizer eh' hs' c' s' = do
241-
msg <- (liftIO $ atomically $ readTChan c')
245+
msg <- getNextMessage c'
242246
(action, state) <- runPipeline msg s' $ pipeline hs'
243247
case action of
244248
MxAgentReady -> runAgent eh' hs' c' state
245249
MxAgentDeactivate _ -> runAgentFinalizer eh' state
246250
-- MxAgentBecome h' -> runAgent h' c state
247251

252+
getNextMessage tch = do
253+
inputs <- liftIO $ atomically $ tryReadTChan tch
254+
case inputs of
255+
Nothing -> do
256+
m <- receiveTimeout 0 [ matchAny return ]
257+
case m of
258+
Nothing -> getNextMessage tch
259+
Just msg -> return msg
260+
Just m -> return m
261+
248262
runAgentFinalizer :: MxAgent s () -> MxAgentState s -> Process ()
249263
runAgentFinalizer f s = ST.runStateT (unAgent f) s >>= return . fst
250264

0 commit comments

Comments
 (0)