Skip to content

Commit 13a3746

Browse files
committed
work in progress
1 parent e0092ac commit 13a3746

File tree

4 files changed

+182
-58
lines changed

4 files changed

+182
-58
lines changed

distributed-process-tests/src/Control/Distributed/Process/Tests/Mx.hs

Lines changed: 90 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ import Network.Transport.Test (TestTransport(..))
77
import Control.Concurrent (threadDelay)
88
import Control.Distributed.Process
99
import Control.Distributed.Process.Node
10+
import qualified Control.Distributed.Process.UnsafePrimitives as Unsafe
11+
( send
12+
, nsend
13+
, nsendRemote
14+
, usend
15+
)
1016
import Control.Distributed.Process.Management
1117
( MxEvent(..)
1218
, MxAgentId(..)
@@ -175,31 +181,92 @@ testAgentEventHandling result = do
175181

176182
stash result $ seenAlive && seenDead
177183

184+
185+
testMxUnsafeSendEvents :: LocalNode -> Process ()
186+
testMxUnsafeSendEvents remoteNode = do
187+
188+
-- ensure that when a registered process dies, we get a notification that
189+
-- it has been unregistered as well as seeing the name get removed
190+
191+
let label1 = "aaaaa"
192+
let label2 = "bbbbb"
193+
let isValid l = l ==label1 || l == label2
194+
let agentLabel = "listener-agent"
195+
let delay = 1000000
196+
(regChan, regSink) <- newChan
197+
(unRegChan, unRegSink) <- newChan
198+
agent <- mxAgent (MxAgentId agentLabel) () [
199+
mxSink $ \ev -> do
200+
case ev of
201+
MxRegistered pid label
202+
| isValid label -> liftMX $ sendChan regChan (label, pid)
203+
MxUnRegistered pid label
204+
| isValid label -> liftMX $ sendChan unRegChan (label, pid)
205+
_ -> return ()
206+
mxReady
207+
]
208+
209+
(sp, rp) <- newChan
210+
liftIO $ forkProcess remoteNode $ do
211+
getSelfPid >>= sendChan sp
212+
expect :: Process ()
213+
214+
p1 <- receiveChan rp
215+
216+
register label1 p1
217+
reg1 <- receiveChanTimeout delay regSink
218+
reg1 `shouldBe` equalTo (Just (label1, p1))
219+
220+
register label2 p1
221+
reg2 <- receiveChanTimeout delay regSink
222+
reg2 `shouldBe` equalTo (Just (label2, p1))
223+
224+
n1 <- whereis label1
225+
n1 `shouldBe` equalTo (Just p1)
226+
227+
n2 <- whereis label2
228+
n2 `shouldBe` equalTo (Just p1)
229+
230+
kill p1 "goodbye"
231+
232+
unreg1 <- receiveChanTimeout delay unRegSink
233+
unreg2 <- receiveChanTimeout delay unRegSink
234+
235+
sort [unreg1, unreg2]
236+
`shouldBe` equalTo [Just (label1, p1), Just (label2, p1)]
237+
238+
kill agent "test-complete"
239+
178240
tests :: TestTransport -> IO [Test]
179241
tests TestTransport{..} = do
180242
node1 <- newLocalNode testTransport initRemoteTable
243+
node2 <- newLocalNode testTransport initRemoteTable
181244
return [
182-
testGroup "Mx Agents" [
183-
testCase "Event Handling"
184-
(delayedAssertion
185-
"expected True, but events where not as expected"
186-
node1 True testAgentEventHandling)
187-
, testCase "Inter-Agent Broadcast"
188-
(delayedAssertion
189-
"expected (), but no broadcast was received"
190-
node1 () testAgentBroadcast)
191-
, testCase "Agent Mailbox Handling"
192-
(delayedAssertion
193-
"expected (Just ()), but no regular (mailbox) input was handled"
194-
node1 (Just ()) testAgentMailboxHandling)
195-
, testCase "Agent Dual Input Handling"
196-
(delayedAssertion
197-
"expected sum = 15, but the result was Nothing"
198-
node1 (Just 15 :: Maybe Int) testAgentDualInput)
199-
, testCase "Agent Input Prioritisation"
200-
(delayedAssertion
201-
"expected [first, second, third, fourth, fifth], but result diverged"
202-
node1 (sort ["first", "second",
203-
"third", "fourth",
204-
"fifth"]) testAgentPrioritisation)
245+
testGroup "Mx Agents" [
246+
testCase "Event Handling"
247+
(delayedAssertion
248+
"expected True, but events where not as expected"
249+
node1 True testAgentEventHandling)
250+
, testCase "Inter-Agent Broadcast"
251+
(delayedAssertion
252+
"expected (), but no broadcast was received"
253+
node1 () testAgentBroadcast)
254+
, testCase "Agent Mailbox Handling"
255+
(delayedAssertion
256+
"expected (Just ()), but no regular (mailbox) input was handled"
257+
node1 (Just ()) testAgentMailboxHandling)
258+
, testCase "Agent Dual Input Handling"
259+
(delayedAssertion
260+
"expected sum = 15, but the result was Nothing"
261+
node1 (Just 15 :: Maybe Int) testAgentDualInput)
262+
, testCase "Agent Input Prioritisation"
263+
(delayedAssertion
264+
"expected [first, second, third, fourth, fifth], but result diverged"
265+
node1 (sort ["first", "second",
266+
"third", "fourth",
267+
"fifth"]) testAgentPrioritisation)
268+
testGroup "Mx Events" [
269+
testCase "Monitor Events"
270+
(runProcess node1 (testMxUnsafeSendEvents node2))
271+
]
205272
]]

src/Control/Distributed/Process/Internal/Primitives.hs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -482,15 +482,13 @@ forward msg them = do
482482
destNode = (processNodeId them) in do
483483
case destNode == nid of
484484
True -> sendCtrlMsg Nothing (LocalSend them msg)
485-
False -> liftIO $ sendPayload (processNode proc)
486-
(ProcessIdentifier (processId proc))
487-
(ProcessIdentifier them)
488-
NoImplicitReconnect
489-
(messageToPayload msg)
490-
-- We do not fire the trace event until after the sending is complete;
491-
-- In the remote case, 'sendMessage' can block in the networking stack.
492-
liftIO $ traceEvent (localEventBus node)
493-
(MxSent them us msg)
485+
False ->
486+
liftIO $ do
487+
sendPayload (processNode proc) (ProcessIdentifier (processId proc)) (ProcessIdentifier them) NoImplicitReconnect (messageToPayload msg)
488+
-- We do not fire the trace event until after the sending is complete;
489+
-- In this remote case, 'sendMessage' can block in the networking stack.
490+
liftIO $ traceEvent (localEventBus node)
491+
(MxSent them us msg)
494492

495493
-- | Forward a raw 'Message' to the given 'ProcessId'.
496494
--
@@ -506,12 +504,12 @@ uforward msg them = do
506504
destNode = (processNodeId them) in do
507505
case destNode == nid of
508506
True -> sendCtrlMsg Nothing (LocalSend them msg)
509-
False -> sendCtrlMsg (Just destNode) $ UnreliableSend (processLocalId them)
510-
msg
511-
-- We do not fire the trace event until after the sending is complete;
512-
-- In the remote case, 'sendCtrlMsg' can block in the networking stack.
513-
liftIO $ traceEvent (localEventBus node)
514-
(MxSent them us msg)
507+
False -> do
508+
sendCtrlMsg (Just destNode) $ UnreliableSend (processLocalId them) msg
509+
-- We do not fire the trace event until after the sending is complete;
510+
-- In the remote case, 'sendCtrlMsg' can block in the networking stack.
511+
liftIO $ traceEvent (localEventBus node)
512+
(MxSent them us msg)
515513

516514
-- | Wrap a 'Serializable' value in a 'Message'. Note that 'Message's are
517515
-- 'Serializable' - like the datum they contain - but also note, deserialising

src/Control/Distributed/Process/Management/Internal/Types.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ data MxEvent =
5656
-- ^ fired whenever a node /dies/ (i.e., the connection is broken/disconnected)
5757
| MxSent ProcessId ProcessId Message
5858
-- ^ fired whenever a message is sent from a local process
59+
| MxSentToName String ProcessId Message
60+
-- ^ fired whenever a named send occurs
5961
| MxReceived ProcessId Message
6062
-- ^ fired whenever a message is received by a local process
6163
| MxConnected ConnectionId EndPointAddress

src/Control/Distributed/Process/UnsafePrimitives.hs

Lines changed: 77 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,24 @@
3232
-- the /normal/ strategy).
3333
--
3434
-- Use of the functions in this module can potentially change the runtime
35-
-- behaviour of your application. You have been warned!
35+
-- behaviour of your application. In addition, messages passed between Cloud
36+
-- Haskell processes are written to a tracing infrastructure on the local node,
37+
-- to provide improved introspection and debugging facilities for complex actor
38+
-- based systems. This module makes no attempt to force evaluation in these
39+
-- cases either, thus evaluation problems in passed data structures could not
40+
-- only crash your processes, but could also bring down critical internal
41+
-- services on which the node relies to function correctly.
42+
--
43+
-- If you wish to repudiate such issues, you are advised to consider the use
44+
-- of NFSerialisable in the distributed-process-extras package, which type
45+
-- class brings NFData into scope along with Serializable, such that we can
46+
-- force evaluation. Intended for use with modules such as this one, this
47+
-- approach guarantees correct evaluatedness in terms of @NFData@. Please note
48+
-- however, that we /cannot/ guarantee that an @NFData@ instance will behave the
49+
-- same way as a @Binary@ one with regards evaluation, so it is still possible
50+
-- to introduce unexpected behaviour by using /unsafe/ primitives in this way.
51+
--
52+
-- You have been warned!
3653
--
3754
-- This module is exported so that you can replace the use of Cloud Haskell's
3855
-- /safe/ messaging primitives. If you want to use both variants, then you can
@@ -55,7 +72,12 @@ import Control.Distributed.Process.Internal.Messaging
5572
, sendBinary
5673
, sendCtrlMsg
5774
)
58-
75+
import Control.Distributed.Process.Management.Internal.Types
76+
( MxEvent(..)
77+
)
78+
import Control.Distributed.Process.Management.Internal.Trace.Types
79+
( traceEvent
80+
)
5981
import Control.Distributed.Process.Internal.Types
6082
( ProcessId(..)
6183
, NodeId(..)
@@ -79,34 +101,60 @@ import Control.Monad.Reader (ask)
79101

80102
-- | Named send to a process in the local registry (asynchronous)
81103
nsend :: Serializable a => String -> a -> Process ()
82-
nsend label msg =
83-
sendCtrlMsg Nothing (NamedSend label (unsafeCreateUnencodedMessage msg))
104+
nsend label msg = do
105+
proc <- ask
106+
let us = processId proc
107+
let node = localNodeId (processNode proc)
108+
let msg' = wrapMessage msg
109+
-- see [note: tracing]
110+
liftIO $ traceEvent (localEventBus (processNode proc))
111+
(MxSentToName label us msg')
112+
sendCtrlMsg Nothing (NamedSend label msg')
113+
114+
-- [note: tracing]
115+
-- In the remote case, we do not fire a trace event until after sending is
116+
-- complete, since 'sendMessage' can block in the networking stack.
117+
--
118+
-- In addition, MxSent trace messages are dispatched by the node controller's
119+
-- thread for local sends, which also covers local nsend, and local usend.
120+
--
121+
-- Also note that tracing writes to the local node's control channel, and this
122+
-- module explicitly specifies to its clients that it does unsafe message
123+
-- encoding. The same is true for the messages it puts onto the Management
124+
-- event bus, however we do *not* want unevaluated thunks hitting the event
125+
-- bus control thread. Hence the word /Unsafe/ in this module's name!
126+
--
84127

85128
-- | Named send to a process in a remote registry (asynchronous)
86129
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
87130
nsendRemote nid label msg = do
88131
proc <- ask
132+
let us = processId proc
133+
let node = processNode proc
89134
if localNodeId (processNode proc) == nid
90135
then nsend label msg
91-
else sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))
136+
else do -- see [note: tracing] NB: this is a remote call to another NC...
137+
sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))
138+
liftIO $ traceEvent (localEventBus node)
139+
(MxSentToName label us (wrapMessage msg))
92140

93141
-- | Send a message
94142
send :: Serializable a => ProcessId -> a -> Process ()
95143
send them msg = do
96144
proc <- ask
97145
let node = localNodeId (processNode proc)
98-
destNode = (processNodeId them) in do
146+
destNode = (processNodeId them)
147+
us = (processId proc) in do
99148
case destNode == node of
100149
True -> unsafeSendLocal them msg
101-
False -> liftIO $ sendMessage (processNode proc)
102-
(ProcessIdentifier (processId proc))
103-
(ProcessIdentifier them)
104-
NoImplicitReconnect
105-
msg
106-
where
107-
unsafeSendLocal :: (Serializable a) => ProcessId -> a -> Process ()
108-
unsafeSendLocal pid msg' =
109-
sendCtrlMsg Nothing $ LocalSend pid (unsafeCreateUnencodedMessage msg')
150+
False -> liftIO $ do sendMessage (processNode proc)
151+
(ProcessIdentifier (processId proc))
152+
(ProcessIdentifier them)
153+
NoImplicitReconnect
154+
msg
155+
-- see [note: tracing]
156+
liftIO $ traceEvent (localEventBus (processNode proc))
157+
(MxSent them us (wrapMessage msg))
110158

111159
-- | Send a message unreliably.
112160
--
@@ -121,11 +169,20 @@ usend :: Serializable a => ProcessId -> a -> Process ()
121169
usend them msg = do
122170
proc <- ask
123171
let there = processNodeId them
172+
let (us, node) = (processId proc, processNode proc)
124173
if localNodeId (processNode proc) == there
125-
then sendCtrlMsg Nothing $
126-
LocalSend them (unsafeCreateUnencodedMessage msg)
127-
else sendCtrlMsg (Just there) $ UnreliableSend (processLocalId them)
128-
(createMessage msg)
174+
then unsafeSendLocal them msg
175+
else do sendCtrlMsg (Just there) $ UnreliableSend (processLocalId them)
176+
(createMessage msg)
177+
-- I would assert that is it *still* cheaper to not encode here...
178+
liftIO $ traceEvent (localEventBus node)
179+
(MxSent them us (wrapMessage msg))
180+
181+
unsafeSendLocal :: (Serializable a) => ProcessId -> a -> Process ()
182+
unsafeSendLocal pid msg =
183+
let msg' = wrapMessage msg in do
184+
sendCtrlMsg Nothing $ LocalSend pid msg'
185+
-- see [note: tracing]
129186

130187
-- | Send a message on a typed channel
131188
sendChan :: Serializable a => SendPort a -> a -> Process ()
@@ -144,7 +201,7 @@ sendChan (SendPort cid) msg = do
144201
where
145202
unsafeSendChanLocal :: (Serializable a) => SendPortId -> a -> Process ()
146203
unsafeSendChanLocal spId msg' =
147-
sendCtrlMsg Nothing $ LocalPortSend spId (unsafeCreateUnencodedMessage msg')
204+
sendCtrlMsg Nothing $ LocalPortSend spId (wrapMessage msg')
148205

149206
-- | Create an unencoded @Message@ for any @Serializable@ type.
150207
wrapMessage :: Serializable a => a -> Message

0 commit comments

Comments
 (0)