Skip to content

Commit 0514f3c

Browse files
committed
Fix MxSend trace messages for unsafe primitives
1 parent d2b5d82 commit 0514f3c

File tree

3 files changed

+155
-92
lines changed

3 files changed

+155
-92
lines changed

distributed-process-tests/src/Control/Distributed/Process/Tests/Internal/Utils.hs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -74,12 +74,13 @@ import Control.Concurrent
7474
import Control.Concurrent.MVar
7575
( putMVar
7676
)
77-
import Control.Distributed.Process
77+
import Control.Distributed.Process hiding (finally, catch)
7878
import Control.Distributed.Process.Node
7979
import Control.Distributed.Process.Serializable()
8080

8181
import Control.Exception (AsyncException(ThreadKilled), SomeException)
8282
import Control.Monad (forever, void)
83+
import Control.Monad.Catch (finally, catch)
8384
import Control.Monad.STM (atomically)
8485
import Control.Rematch hiding (match)
8586
import Control.Rematch.Run

distributed-process-tests/src/Control/Distributed/Process/Tests/Mx.hs

Lines changed: 125 additions & 61 deletions
Original file line number | Diff line number | Diff line change
@@ -3,8 +3,9 @@ module Control.Distributed.Process.Tests.Mx (tests) where
33

44
import Control.Distributed.Process.Tests.Internal.Utils
55
import Network.Transport.Test (TestTransport(..))
6-
7-
import Control.Distributed.Process
6+
import Control.Exception (SomeException)
7+
import Control.Distributed.Process hiding (bracket, finally, try)
8+
import Control.Distributed.Process.Internal.Types (ProcessExitException(..))
89
import Control.Distributed.Process.Node
910
import qualified Control.Distributed.Process.UnsafePrimitives as Unsafe
1011
( send
@@ -28,12 +29,13 @@ import Control.Distributed.Process.Management
2829
, mxBroadcast
2930
)
3031
import Control.Monad (void, unless)
32+
import Control.Monad.Catch(finally, bracket, try)
3133
import Control.Rematch (equalTo)
3234
import Data.Binary
3335
import Data.List (find, sort)
3436
import Data.Maybe (isJust, isNothing)
3537
import Data.Typeable
36-
import GHC.Generics
38+
import GHC.Generics hiding (from)
3739
#if ! MIN_VERSION_base(4,6,0)
3840
import Prelude hiding (catch, log)
3941
#endif
@@ -57,7 +59,7 @@ awaitExit pid =
5759
(\_ -> return ())
5860
]
5961
where
60-
withMonitorRef pid = bracket (monitor pid) unmonitor
62+
withMonitorRef p = bracket (monitor p) unmonitor
6163

6264
testAgentBroadcast :: TestResult () -> Process ()
6365
testAgentBroadcast result = do
@@ -319,101 +321,163 @@ testMxRegMon remoteNode result = do
319321
ensure :: Process () -> Process () -> Process ()
320322
ensure = flip finally
321323

322-
323-
testMxUnsafeSendEvents :: LocalNode -> Process ()
324-
testMxUnsafeSendEvents remoteNode = do
325-
326-
-- ensure that when a registered process dies, we get a notification that
327-
-- it has been unregistered as well as seeing the name get removed
328-
329-
let label1 = "aaaaa"
330-
let label2 = "bbbbb"
331-
let isValid l = l ==label1 || l == label2
332-
let agentLabel = "listener-agent"
333-
let delay = 1000000
334-
(regChan, regSink) <- newChan
335-
(unRegChan, unRegSink) <- newChan
336-
agent <- mxAgent (MxAgentId agentLabel) () [
324+
type SendTest = ProcessId -> ReceivePort MxEvent -> Process Bool
325+
326+
testNSend :: (String -> () -> Process ()) -> Maybe LocalNode -> TestResult Bool -> Process ()
327+
testNSend op n r = testMxSend n r $ \p1 sink -> do
328+
let delay = 5000000
329+
let label = "testMxSend"
330+
331+
register label p1
332+
reg1 <- receiveChanTimeout delay sink
333+
case reg1 of
334+
Just (MxRegistered pd lb)
335+
| pd == p1 && lb == label -> return ()
336+
_ -> die $ "Reg-Failed: " ++ show reg1
337+
338+
op label ()
339+
340+
us <- getSelfPid
341+
sent <- receiveChanTimeout delay sink
342+
case sent of
343+
Just (MxSentToName lb by _)
344+
| by == us && lb == label -> return True
345+
_ -> die $ "Send-Failed: " ++ show sent
346+
347+
testSend :: (ProcessId -> () -> Process ()) -> Maybe LocalNode -> TestResult Bool -> Process ()
348+
testSend op n r = testMxSend n r $ \p1 sink -> do
349+
-- initiate a send
350+
op p1 ()
351+
352+
-- verify the management event
353+
us <- getSelfPid
354+
sent <- receiveChanTimeout 5000000 sink
355+
case sent of
356+
Just (MxSent pidTo pidFrom _)
357+
| pidTo == p1 && pidFrom == us -> return True
358+
_ -> return False
359+
360+
testMxSend :: Maybe LocalNode -> TestResult Bool -> SendTest -> Process ()
361+
testMxSend mNode result test = do
362+
us <- getSelfPid
363+
(chan, sink) <- newChan
364+
agent <- mxAgent (MxAgentId $ agentLabel us) () [
337365
mxSink $ \ev -> do
338366
case ev of
339-
MxRegistered pid label
340-
| isValid label -> liftMX $ sendChan regChan (label, pid)
341-
MxUnRegistered pid label
342-
| isValid label -> liftMX $ sendChan unRegChan (label, pid)
367+
m@(MxSent _ fromPid _)
368+
| fromPid == us -> liftMX $ sendChan chan m
369+
m@(MxSentToName _ fromPid _)
370+
| fromPid == us -> liftMX $ sendChan chan m
371+
m@(MxRegistered _ name)
372+
| name == label -> liftMX $ sendChan chan m
343373
_ -> return ()
344374
mxReady
345375
]
346376

347377
(sp, rp) <- newChan
348-
liftIO $ forkProcess remoteNode $ do
349-
getSelfPid >>= sendChan sp
350-
expect :: Process ()
378+
case mNode of
379+
Nothing -> void $ spawnLocal (proc sp)
380+
Just remoteNode -> void $ liftIO $ forkProcess remoteNode $ proc sp
351381

352382
p1 <- receiveChan rp
383+
res <- try (test p1 sink)
384+
case res of
385+
Left (ProcessExitException _ m) -> (liftIO $ putStrLn $ "SomeException-" ++ show m) >> stash result False
386+
Right tr -> stash result tr
387+
kill agent "bye"
388+
kill p1 "bye"
353389

354-
register label1 p1
355-
reg1 <- receiveChanTimeout delay regSink
356-
reg1 `shouldBe` equalTo (Just (label1, p1))
357-
358-
register label2 p1
359-
reg2 <- receiveChanTimeout delay regSink
360-
reg2 `shouldBe` equalTo (Just (label2, p1))
361-
362-
n1 <- whereis label1
363-
n1 `shouldBe` equalTo (Just p1)
364-
365-
n2 <- whereis label2
366-
n2 `shouldBe` equalTo (Just p1)
367-
368-
kill p1 "goodbye"
369-
370-
unreg1 <- receiveChanTimeout delay unRegSink
371-
unreg2 <- receiveChanTimeout delay unRegSink
372-
373-
sort [unreg1, unreg2]
374-
`shouldBe` equalTo [Just (label1, p1), Just (label2, p1)]
375-
376-
kill agent "test-complete"
390+
where
391+
label = "testMxSend"
392+
agentLabel s = "mx-unsafe-check-agent-" ++ show s
393+
proc sp' = getSelfPid >>= sendChan sp' >> expect :: Process ()
377394

378395
tests :: TestTransport -> IO [Test]
379396
tests TestTransport{..} = do
380397
node1 <- newLocalNode testTransport initRemoteTable
381398
node2 <- newLocalNode testTransport initRemoteTable
399+
let nid = localNodeId node2
382400
return [
383-
testGroup "Mx Agents" [
384-
testCase "Event Handling"
401+
testGroup "MxAgents" [
402+
testCase "EventHandling"
385403
(delayedAssertion
386404
"expected True, but events where not as expected"
387405
node1 True testAgentEventHandling)
388-
, testCase "Inter-Agent Broadcast"
406+
, testCase "InterAgentBroadcast"
389407
(delayedAssertion
390408
"expected (), but no broadcast was received"
391409
node1 () testAgentBroadcast)
392-
, testCase "Agent Mailbox Handling"
410+
, testCase "AgentMailboxHandling"
393411
(delayedAssertion
394412
"expected (Just ()), but no regular (mailbox) input was handled"
395413
node1 (Just ()) testAgentMailboxHandling)
396-
, testCase "Agent Dual Input Handling"
414+
, testCase "AgentDualInputHandling"
397415
(delayedAssertion
398416
"expected sum = 15, but the result was Nothing"
399417
node1 (Just 15 :: Maybe Int) testAgentDualInput)
400-
, testCase "Agent Input Prioritisation"
418+
, testCase "AgentInputPrioritisation"
401419
(delayedAssertion
402420
"expected [first, second, third, fourth, fifth], but result diverged"
403421
node1 (sort ["first", "second",
404422
"third", "fourth",
405423
"fifth"]) testAgentPrioritisation)
406424
]
407-
, testGroup "Mx Events" [
408-
testCase "Name Registration Events"
425+
, testGroup "MxEvents" [
426+
testCase "NameRegistrationEvents"
409427
(delayedAssertion
410428
"expected registration events to map to the correct ProcessId"
411429
node1 () testMxRegEvents)
412-
, testCase "Post Death Name UnRegistration Events"
430+
, testCase "PostDeathNameUnRegistrationEvents"
413431
(delayedAssertion
414432
"expected process deaths to result in unregistration events"
415433
node1 () (testMxRegMon node2))
416-
, testCase "Monitor Events"
417-
(runProcess node1 (testMxUnsafeSendEvents node2))
434+
, testGroup "SentEvents" [
435+
testGroup "RemoteTargets" [
436+
testCase "Unsafe.nsend"
437+
(delayedAssertion "expected mx events failed"
438+
node1 True (testNSend Unsafe.nsend $ Just node2))
439+
, testCase "Unsafe.nsendRemote"
440+
(delayedAssertion "expected mx events failed"
441+
node1 True (testNSend (Unsafe.nsendRemote nid) $ Just node2))
442+
, testCase "Unsafe.send"
443+
(delayedAssertion "expected mx events failed"
444+
node1 True (testSend Unsafe.send $ Just node2))
445+
, testCase "Unsafe.usend"
446+
(delayedAssertion "expected mx events failed"
447+
node1 True (testSend Unsafe.usend $ Just node2))
448+
, testCase "nsend"
449+
(delayedAssertion "expected mx events failed"
450+
node1 True (testNSend nsend $ Just node2))
451+
, testCase "nsendRemote"
452+
(delayedAssertion "expected mx events failed"
453+
node1 True (testNSend (nsendRemote nid) $ Just node2))
454+
, testCase "send"
455+
(delayedAssertion "expected mx events failed"
456+
node1 True (testSend send $ Just node2))
457+
, testCase "usend"
458+
(delayedAssertion "expected mx events failed"
459+
node1 True (testSend usend $ Just node2))
460+
]
461+
, testGroup "LocalTargets" [
462+
testCase "Unsafe.nsend"
463+
(delayedAssertion "expected mx events failed"
464+
node1 True (testNSend Unsafe.nsend Nothing))
465+
, testCase "Unsafe.send"
466+
(delayedAssertion "expected mx events failed"
467+
node1 True (testSend Unsafe.send Nothing))
468+
, testCase "Unsafe.usend"
469+
(delayedAssertion "expected mx events failed"
470+
node1 True (testSend Unsafe.usend Nothing))
471+
, testCase "nsend"
472+
(delayedAssertion "expected mx events failed"
473+
node1 True (testNSend nsend Nothing))
474+
, testCase "send"
475+
(delayedAssertion "expected mx events failed"
476+
node1 True (testSend send Nothing))
477+
, testCase "usend"
478+
(delayedAssertion "expected mx events failed"
479+
node1 True (testSend usend Nothing))
480+
]
481+
]
482+
]
418483
]
419-
]

src/Control/Distributed/Process/UnsafePrimitives.hs

Lines changed: 28 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -113,10 +113,12 @@ nsend label msg = do
113113

114114
-- [note: tracing]
115115
-- In the remote case, we do not fire a trace event until after sending is
116-
-- complete, since 'sendMessage' can block in the networking stack.
116+
-- complete, since 'sendMessage' can block in the networking stack. For locally
117+
-- registered processes, we trace before sending the NC a control message, since
118+
-- we cannot guarantee ordering otherwise.
117119
--
118120
-- In addition, MxSent trace messages are dispatched by the node controller's
119-
-- thread for local sends, which also covers local nsend, and local usend.
121+
-- thread for local sends, which also covers usend.
120122
--
121123
-- Also note that tracing writes to the local node's control channel, and this
122124
-- module explicitly specifies to its clients that it does unsafe message
@@ -133,28 +135,31 @@ nsendRemote nid label msg = do
133135
let node = processNode proc
134136
if localNodeId (processNode proc) == nid
135137
then nsend label msg
136-
else do -- see [note: tracing] NB: this is a remote call to another NC...
137-
sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))
138-
liftIO $ traceEvent (localEventBus node)
139-
(MxSentToName label us (wrapMessage msg))
138+
else
139+
let lbl = label ++ "@" ++ show nid in do
140+
-- see [note: tracing] NB: this is a remote call to another NC...
141+
liftIO $ traceEvent (localEventBus node)
142+
(MxSentToName label us (wrapMessage msg))
143+
sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))
140144

141145
-- | Send a message
142146
send :: Serializable a => ProcessId -> a -> Process ()
143147
send them msg = do
144148
proc <- ask
145149
let node = localNodeId (processNode proc)
146150
destNode = (processNodeId them)
147-
us = (processId proc) in do
148-
case destNode == node of
149-
True -> unsafeSendLocal them msg
150-
False -> liftIO $ do sendMessage (processNode proc)
151-
(ProcessIdentifier (processId proc))
152-
(ProcessIdentifier them)
153-
NoImplicitReconnect
154-
msg
155-
-- see [note: tracing]
156-
liftIO $ traceEvent (localEventBus (processNode proc))
157-
(MxSent them us (wrapMessage msg))
151+
us = (processId proc)
152+
msg' = wrapMessage msg in do
153+
-- see [note: tracing]
154+
liftIO $ traceEvent (localEventBus (processNode proc))
155+
(MxSent them us msg')
156+
if destNode == node
157+
then sendCtrlMsg Nothing $ LocalSend them msg'
158+
else liftIO $ sendMessage (processNode proc)
159+
(ProcessIdentifier (processId proc))
160+
(ProcessIdentifier them)
161+
NoImplicitReconnect
162+
msg
158163

159164
-- | Send a message unreliably.
160165
--
@@ -170,19 +175,12 @@ usend them msg = do
170175
proc <- ask
171176
let there = processNodeId them
172177
let (us, node) = (processId proc, processNode proc)
178+
let msg' = wrapMessage msg
179+
liftIO $ traceEvent (localEventBus node) (MxSent them us msg')
173180
if localNodeId (processNode proc) == there
174-
then unsafeSendLocal them msg
175-
else do sendCtrlMsg (Just there) $ UnreliableSend (processLocalId them)
176-
(createMessage msg)
177-
-- I would assert that is it *still* cheaper to not encode here...
178-
liftIO $ traceEvent (localEventBus node)
179-
(MxSent them us (wrapMessage msg))
180-
181-
unsafeSendLocal :: (Serializable a) => ProcessId -> a -> Process ()
182-
unsafeSendLocal pid msg =
183-
let msg' = wrapMessage msg in do
184-
sendCtrlMsg Nothing $ LocalSend pid msg'
185-
-- see [note: tracing]
181+
then sendCtrlMsg Nothing $ LocalSend them msg'
182+
else sendCtrlMsg (Just there) $ UnreliableSend (processLocalId them)
183+
(createMessage msg)
186184

187185
-- | Send a message on a typed channel
188186
sendChan :: Serializable a => SendPort a -> a -> Process ()
@@ -192,7 +190,7 @@ sendChan (SendPort cid) msg = do
192190
destNode = processNodeId (sendPortProcessId cid) in do
193191
case destNode == node of
194192
True -> unsafeSendChanLocal cid msg
195-
False -> do
193+
False ->
196194
liftIO $ sendBinary (processNode proc)
197195
(ProcessIdentifier (processId proc))
198196
(SendPortIdentifier cid)

0 commit comments

Comments
 (0)