Skip to content

Commit 2c9691b

Browse files
committed
Prevent registered names from leaking
1 parent 629a6fa commit 2c9691b

File tree

5 files changed

+252
-28
lines changed

5 files changed

+252
-28
lines changed

distributed-process-tests/src/Control/Distributed/Process/Tests/CH.hs

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,11 @@ import Control.Distributed.Process.Internal.Types
3737
, createUnencodedMessage
3838
)
3939
import Control.Distributed.Process.Node
40+
import Control.Distributed.Process.Debug
41+
import Control.Distributed.Process.Management.Internal.Types
4042
import Control.Distributed.Process.Serializable (Serializable)
4143

42-
import Test.HUnit (Assertion, assertFailure)
44+
import Test.HUnit (Assertion, assertFailure, assertBool)
4345
import Test.Framework (Test, testGroup)
4446
import Test.Framework.Providers.HUnit (testCase)
4547
import Control.Rematch hiding (match)
@@ -1369,6 +1371,113 @@ testExitRemote TestTransport{..} = do
13691371
takeMVar supervisedDone
13701372
takeMVar supervisorDone
13711373

1374+
testRegistryMonitoring :: TestTransport -> Assertion
1375+
testRegistryMonitoring TestTransport{..} = do
1376+
localNode <- newLocalNode testTransport initRemoteTable
1377+
remoteNode <- newLocalNode testTransport initRemoteTable
1378+
return ()
1379+
1380+
-- Local process. Test if local process will be removed from
1381+
-- registry when it dies.
1382+
box <- newEmptyMVar
1383+
runProcess localNode $ do
1384+
pid <- spawnLocal $ do
1385+
expect
1386+
register "test" pid
1387+
tpid <- whereis "test"
1388+
if tpid == Just pid
1389+
then do _ <- monitor pid
1390+
send pid ()
1391+
ProcessMonitorNotification{} <- expect
1392+
tpid1 <- whereis "test"
1393+
liftIO $ putMVar box (Nothing == tpid1)
1394+
else liftIO $ putMVar box False
1395+
1396+
takeMVar box >>= assertBool "expected local process to not be registered"
1397+
return ()
1398+
1399+
-- Remote process. Test if remote process entry is removed
1400+
-- from registry when process dies.
1401+
remote1 <- testRemote remoteNode
1402+
runProcess localNode $
1403+
let waitpoll = do
1404+
w <- whereis "test" :: Process (Maybe ProcessId)
1405+
forM_ w (const waitpoll)
1406+
in do register "test" remote1
1407+
send remote1 ()
1408+
waitpoll
1409+
return ()
1410+
return ()
1411+
1412+
-- Many labels. Test if all labels associated with process
1413+
-- are removed from registry when it dies.
1414+
remote2 <- testRemote remoteNode
1415+
runProcess localNode $
1416+
let waitpoll = do
1417+
w1 <- whereis "test-3" :: Process (Maybe ProcessId)
1418+
w2 <- whereis "test-4" :: Process (Maybe ProcessId)
1419+
forM_ (w1 <|> w2) (const waitpoll)
1420+
in do register "test-3" remote2
1421+
register "test-4" remote2
1422+
send remote2 ()
1423+
waitpoll
1424+
return ()
1425+
1426+
{- XXX: waiting including patch for nsend for remote process
1427+
remote3 <- testRemote remoteNode
1428+
remote4 <- testRemote remoteNode
1429+
-- test many labels
1430+
runProcess localNode $ do
1431+
register "test-3" remote3
1432+
reregister "test-3" remote4
1433+
send remote3 ()
1434+
liftIO $ threadDelay 50000 -- XXX: racy
1435+
monitor remote4
1436+
nsend "test-3" ()
1437+
ProcessMonitorNotification{} <- expect
1438+
return ()
1439+
-}
1440+
1441+
-- Test registerRemoteAsync properties. Add a local process to
1442+
-- remote registry and checks that it is removed
1443+
-- when the process dies.
1444+
remote5 <- testRemote remoteNode
1445+
runProcess localNode $ do
1446+
registerRemoteAsync (localNodeId remoteNode) "test" remote5
1447+
receiveWait [
1448+
match (\(RegisterReply _ True _) -> return ())
1449+
] >>= send remote5
1450+
let waitpoll = do
1451+
whereisRemoteAsync (localNodeId remoteNode) "test"
1452+
receiveWait [
1453+
match (\(WhereIsReply _ mr) -> forM_ mr (const waitpoll))
1454+
]
1455+
waitpoll
1456+
1457+
-- Add remote process to remote registry and checks if
1458+
-- entry is removed then process is dead.
1459+
remote6 <- testRemote localNode
1460+
runProcess localNode $ do
1461+
registerRemoteAsync (localNodeId remoteNode) "test" remote6
1462+
receiveWait [
1463+
match (\(RegisterReply _ True _) -> return ())
1464+
] >>= send remote6
1465+
let waitpoll = do
1466+
whereisRemoteAsync (localNodeId remoteNode) "test"
1467+
receiveWait [
1468+
match (\(WhereIsReply _ mr) -> forM_ mr (const waitpoll))
1469+
]
1470+
waitpoll
1471+
where
1472+
testRemote node = do
1473+
-- test many labels
1474+
pidBox <- newEmptyMVar
1475+
forkProcess node $ do
1476+
us <- getSelfPid
1477+
liftIO $ putMVar pidBox us
1478+
expect :: Process ()
1479+
takeMVar pidBox
1480+
13721481
testUnsafeSend :: TestTransport -> Assertion
13731482
testUnsafeSend TestTransport{..} = do
13741483
serverAddr <- newEmptyMVar
@@ -1579,6 +1688,7 @@ tests testtrans = return [
15791688
, testCase "MaskRestoreScope" (testMaskRestoreScope testtrans)
15801689
, testCase "ExitLocal" (testExitLocal testtrans)
15811690
, testCase "ExitRemote" (testExitRemote testtrans)
1691+
, testCase "TestRegistryMonitor" (testRegistryMonitoring testtrans)
15821692
, testCase "TextCallLocal" (testCallLocal testtrans)
15831693
-- Unsafe Primitives
15841694
, testCase "TestUnsafeSend" (testUnsafeSend testtrans)

distributed-process-tests/src/Control/Distributed/Process/Tests/Mx.hs

Lines changed: 77 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import Control.Distributed.Process.Management
2424
, mxGetId
2525
)
2626
import Control.Monad (void)
27+
import Control.Rematch (equalTo)
2728
import Data.Binary
2829
import Data.List (find, sort)
2930
import Data.Maybe (isJust)
@@ -175,31 +176,83 @@ testAgentEventHandling result = do
175176

176177
stash result $ seenAlive && seenDead
177178

179+
testMxMonitorEvents :: Process ()
180+
testMxMonitorEvents = do
181+
182+
{- This test only deals with the local case, to ensure that we are being
183+
notified in the expected order - the remote cases related to the
184+
behaviour of the node controller are contained in the CH test suite. -}
185+
186+
let label = "testMxMonitorEvents"
187+
let agentLabel = "listener-agent"
188+
let delay = 1000000
189+
(regChan, regSink) <- newChan
190+
(unRegChan, unRegSink) <- newChan
191+
agent <- mxAgent (MxAgentId agentLabel) () [
192+
mxSink $ \ev -> do
193+
case ev of
194+
MxRegistered pid label
195+
| label /= agentLabel -> liftMX $ sendChan regChan (label, pid)
196+
MxUnRegistered pid label
197+
| label /= agentLabel -> liftMX $ sendChan unRegChan (label, pid)
198+
_ -> return ()
199+
mxReady
200+
]
201+
202+
p1 <- spawnLocal expect
203+
p2 <- spawnLocal expect
204+
205+
register label p1
206+
reg1 <- receiveChanTimeout delay regSink
207+
reg1 `shouldBe` equalTo (Just (label, p1))
208+
209+
unregister label
210+
unreg1 <- receiveChanTimeout delay unRegSink
211+
unreg1 `shouldBe` equalTo (Just (label, p1))
212+
213+
register label p2
214+
reg2 <- receiveChanTimeout delay regSink
215+
reg2 `shouldBe` equalTo (Just (label, p2))
216+
217+
reregister label p1
218+
unreg2 <- receiveChanTimeout delay unRegSink
219+
unreg2 `shouldBe` equalTo (Just (label, p2))
220+
221+
reg3 <- receiveChanTimeout delay regSink
222+
reg3 `shouldBe` equalTo (Just (label, p1))
223+
224+
mapM_ (flip kill $ "test-complete") [agent, p1, p2]
225+
178226
tests :: TestTransport -> IO [Test]
179227
tests TestTransport{..} = do
180228
node1 <- newLocalNode testTransport initRemoteTable
181229
return [
182-
testGroup "Mx Agents" [
183-
testCase "Event Handling"
184-
(delayedAssertion
185-
"expected True, but events where not as expected"
186-
node1 True testAgentEventHandling)
187-
, testCase "Inter-Agent Broadcast"
188-
(delayedAssertion
189-
"expected (), but no broadcast was received"
190-
node1 () testAgentBroadcast)
191-
, testCase "Agent Mailbox Handling"
192-
(delayedAssertion
193-
"expected (Just ()), but no regular (mailbox) input was handled"
194-
node1 (Just ()) testAgentMailboxHandling)
195-
, testCase "Agent Dual Input Handling"
196-
(delayedAssertion
197-
"expected sum = 15, but the result was Nothing"
198-
node1 (Just 15 :: Maybe Int) testAgentDualInput)
199-
, testCase "Agent Input Prioritisation"
200-
(delayedAssertion
201-
"expected [first, second, third, fourth, fifth], but result diverged"
202-
node1 (sort ["first", "second",
203-
"third", "fourth",
204-
"fifth"]) testAgentPrioritisation)
205-
]]
230+
testGroup "Mx Agents" [
231+
testCase "Event Handling"
232+
(delayedAssertion
233+
"expected True, but events where not as expected"
234+
node1 True testAgentEventHandling)
235+
, testCase "Inter-Agent Broadcast"
236+
(delayedAssertion
237+
"expected (), but no broadcast was received"
238+
node1 () testAgentBroadcast)
239+
, testCase "Agent Mailbox Handling"
240+
(delayedAssertion
241+
"expected (Just ()), but no regular (mailbox) input was handled"
242+
node1 (Just ()) testAgentMailboxHandling)
243+
, testCase "Agent Dual Input Handling"
244+
(delayedAssertion
245+
"expected sum = 15, but the result was Nothing"
246+
node1 (Just 15 :: Maybe Int) testAgentDualInput)
247+
, testCase "Agent Input Prioritisation"
248+
(delayedAssertion
249+
"expected [first, second, third, fourth, fifth], but result diverged"
250+
node1 (sort ["first", "second",
251+
"third", "fourth",
252+
"fifth"]) testAgentPrioritisation)
253+
]
254+
, testGroup "Mx Events" [
255+
testCase "Monitor Events"
256+
(runProcess node1 testMxMonitorEvents)
257+
]
258+
]

src/Control/Distributed/Process/Internal/Primitives.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ module Control.Distributed.Process.Internal.Primitives
7676
, unlink
7777
, monitor
7878
, unmonitor
79+
, unmonitorAsync
7980
, withMonitor
8081
, withMonitor_
8182
-- * Logging

src/Control/Distributed/Process/Management/Internal/Types.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import qualified Control.Monad.State as ST
3030
( MonadState
3131
, StateT
3232
)
33+
import Control.Monad.Fix (MonadFix)
3334
import Data.Binary
3435
import Data.Typeable (Typeable)
3536
import GHC.Generics
@@ -112,6 +113,7 @@ newtype MxAgent s a =
112113
} deriving ( Functor
113114
, Monad
114115
, MonadIO
116+
, MonadFix
115117
, ST.MonadState (MxAgentState s)
116118
, Typeable
117119
, Applicative

src/Control/Distributed/Process/Node.hs

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,13 @@ import Data.Binary (decode)
2828
import Data.Map (Map)
2929
import qualified Data.Map as Map
3030
( empty
31+
, delete
3132
, toList
3233
, fromList
3334
, filter
35+
, insert
36+
, lookup
37+
, insertLookupWithKey
3438
, partitionWithKey
3539
, elems
3640
, size
@@ -183,6 +187,17 @@ import Control.Distributed.Process.Internal.Types
183187
import Control.Distributed.Process.Management.Internal.Agent
184188
( mxAgentController
185189
)
190+
import Control.Distributed.Process.Management
191+
( mxAgent
192+
, mxSink
193+
, liftMX
194+
, mxUpdateLocal
195+
, mxGetLocal
196+
, mxSetLocal
197+
, mxReady
198+
, MxEvent(..)
199+
, MxAgentId(..)
200+
)
186201
import qualified Control.Distributed.Process.Management.Internal.Trace.Remote as Trace
187202
( remoteTable
188203
)
@@ -195,9 +210,6 @@ import Control.Distributed.Process.Management.Internal.Trace.Types
195210
, traceLogFmt
196211
, enableTrace
197212
)
198-
import Control.Distributed.Process.Management.Internal.Types
199-
( MxEvent(..)
200-
)
201213
import Control.Distributed.Process.Serializable (Serializable)
202214
import Control.Distributed.Process.Internal.Messaging
203215
( sendBinary
@@ -210,6 +222,9 @@ import Control.Distributed.Process.Internal.Primitives
210222
, match
211223
, sendChan
212224
, unwrapMessage
225+
, monitor
226+
, unmonitorAsync
227+
, getSelfNode
213228
, SayMessage(..)
214229
)
215230
import Control.Distributed.Process.Internal.Types (SendPort, Tracer(..))
@@ -310,6 +325,49 @@ startDefaultTracer node' = do
310325

311326
-- TODO: we need a better mechanism for defining and registering services
312327

328+
registryMonitorAgentId :: MxAgentId
329+
registryMonitorAgentId = MxAgentId "service.registry.monitoring"
330+
331+
-- note [registry monitoring agent]
332+
-- this agent listens for 'MxRegistered' and 'MxUnRegistered' events and tracks
333+
-- all remote 'ProcessId's that are stored in the registry.
334+
--
335+
-- When a remote process is registered, the agent starts monitoring it until it
336+
-- is unregistered or the monitor notification arrives.
337+
--
338+
-- The agent keeps the amount of labels associated with each registered remote
339+
-- process. This is necessary so the process is unmonitored only when it has
340+
-- been unregistered from all of the labels.
341+
--
342+
343+
registryMonitorAgent :: Process ProcessId
344+
registryMonitorAgent = do
345+
nid <- getSelfNode
346+
-- For each process the map associates the 'MonitorRef' used to monitor it and
347+
-- the amount of labels associated with it.
348+
mxAgent registryMonitorAgentId (Map.empty :: Map String MonitorRef)
349+
[ mxSink $ \(ProcessMonitorNotification mref _ _) -> do
350+
mxUpdateLocal $ Map.filter (== mref)
351+
mxReady
352+
, mxSink $ \ev -> do
353+
case ev of
354+
MxRegistered pid label
355+
| processNodeId pid /= nid -> do
356+
mref <- liftMX $ monitor pid
357+
mxUpdateLocal (Map.insert label mref)
358+
MxUnRegistered pid label
359+
| processNodeId pid /= nid -> do
360+
mrefs <- mxGetLocal
361+
forM_ (label `Map.lookup` mrefs) $ \mref -> do
362+
liftMX $ unmonitorAsync mref
363+
mxUpdateLocal (Map.delete label)
364+
_ -> return ()
365+
mxReady
366+
-- remove async answers from mailbox
367+
, mxSink $ \RegisterReply{} -> mxReady
368+
, mxSink $ \DidUnmonitor{} -> mxReady
369+
]
370+
313371
-- | Start and register the service processes on a node
314372
startServiceProcesses :: LocalNode -> IO ()
315373
startServiceProcesses node = do

0 commit comments

Comments
 (0)