32
32
-- the /normal/ strategy).
33
33
--
34
34
-- Use of the functions in this module can potentially change the runtime
35
- -- behaviour of your application. You have been warned!
35
+ -- behaviour of your application. In addition, messages passed between Cloud
36
+ -- Haskell processes are written to a tracing infrastructure on the local node,
37
+ -- to provide improved introspection and debugging facilities for complex actor
38
+ -- based systems. This module makes no attempt to force evaluation in these
39
+ -- cases either, thus evaluation problems in passed data structures could not
40
+ -- only crash your processes, but could also bring down critical internal
41
+ -- services on which the node relies to function correctly.
42
+ --
43
+ -- If you wish to repudiate such issues, you are advised to consider the use
44
+ -- of NFSerialisable in the distributed-process-extras package, which type
45
+ -- class brings NFData into scope along with Serializable, such that we can
46
+ -- force evaluation. Intended for use with modules such as this one, this
47
+ -- approach guarantees correct evaluatedness in terms of @NFData@. Please note
48
+ -- however, that we /cannot/ guarantee that an @NFData@ instance will behave the
49
+ -- same way as a @Binary@ one with regards evaluation, so it is still possible
50
+ -- to introduce unexpected behaviour by using /unsafe/ primitives in this way.
51
+ --
52
+ -- You have been warned!
36
53
--
37
54
-- This module is exported so that you can replace the use of Cloud Haskell's
38
55
-- /safe/ messaging primitives. If you want to use both variants, then you can
@@ -55,7 +72,12 @@ import Control.Distributed.Process.Internal.Messaging
55
72
, sendBinary
56
73
, sendCtrlMsg
57
74
)
58
-
75
+ import Control.Distributed.Process.Management.Internal.Types
76
+ ( MxEvent (.. )
77
+ )
78
+ import Control.Distributed.Process.Management.Internal.Trace.Types
79
+ ( traceEvent
80
+ )
59
81
import Control.Distributed.Process.Internal.Types
60
82
( ProcessId (.. )
61
83
, NodeId (.. )
@@ -79,34 +101,60 @@ import Control.Monad.Reader (ask)
79
101
80
102
-- | Named send to a process in the local registry (asynchronous)
81
103
nsend :: Serializable a => String -> a -> Process ()
82
- nsend label msg =
83
- sendCtrlMsg Nothing (NamedSend label (unsafeCreateUnencodedMessage msg))
104
+ nsend label msg = do
105
+ proc <- ask
106
+ let us = processId proc
107
+ let node = localNodeId (processNode proc )
108
+ let msg' = wrapMessage msg
109
+ -- see [note: tracing]
110
+ liftIO $ traceEvent (localEventBus (processNode proc ))
111
+ (MxSentToName label us msg')
112
+ sendCtrlMsg Nothing (NamedSend label msg')
113
+
114
+ -- [note: tracing]
115
+ -- In the remote case, we do not fire a trace event until after sending is
116
+ -- complete, since 'sendMessage' can block in the networking stack.
117
+ --
118
+ -- In addition, MxSent trace messages are dispatched by the node controller's
119
+ -- thread for local sends, which also covers local nsend, and local usend.
120
+ --
121
+ -- Also note that tracing writes to the local node's control channel, and this
122
+ -- module explicitly specifies to its clients that it does unsafe message
123
+ -- encoding. The same is true for the messages it puts onto the Management
124
+ -- event bus, however we do *not* want unevaluated thunks hitting the event
125
+ -- bus control thread. Hence the word /Unsafe/ in this module's name!
126
+ --
84
127
85
128
-- | Named send to a process in a remote registry (asynchronous)
86
129
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
87
130
nsendRemote nid label msg = do
88
131
proc <- ask
132
+ let us = processId proc
133
+ let node = processNode proc
89
134
if localNodeId (processNode proc ) == nid
90
135
then nsend label msg
91
- else sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))
136
+ else do -- see [note: tracing] NB: this is a remote call to another NC...
137
+ sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))
138
+ liftIO $ traceEvent (localEventBus node)
139
+ (MxSentToName label us (wrapMessage msg))
92
140
93
141
-- | Send a message
94
142
send :: Serializable a => ProcessId -> a -> Process ()
95
143
send them msg = do
96
144
proc <- ask
97
145
let node = localNodeId (processNode proc )
98
- destNode = (processNodeId them) in do
146
+ destNode = (processNodeId them)
147
+ us = (processId proc ) in do
99
148
case destNode == node of
100
149
True -> unsafeSendLocal them msg
101
- False -> liftIO $ sendMessage (processNode proc )
102
- (ProcessIdentifier (processId proc ))
103
- (ProcessIdentifier them)
104
- NoImplicitReconnect
105
- msg
106
- where
107
- unsafeSendLocal :: (Serializable a ) => ProcessId -> a -> Process ()
108
- unsafeSendLocal pid msg' =
109
- sendCtrlMsg Nothing $ LocalSend pid (unsafeCreateUnencodedMessage msg')
150
+ False -> liftIO $ do sendMessage (processNode proc )
151
+ (ProcessIdentifier (processId proc ))
152
+ (ProcessIdentifier them)
153
+ NoImplicitReconnect
154
+ msg
155
+ -- see [note: tracing]
156
+ liftIO $ traceEvent (localEventBus (processNode proc ))
157
+ (MxSent them us (wrapMessage msg))
110
158
111
159
-- | Send a message unreliably.
112
160
--
@@ -121,11 +169,20 @@ usend :: Serializable a => ProcessId -> a -> Process ()
121
169
usend them msg = do
122
170
proc <- ask
123
171
let there = processNodeId them
172
+ let (us, node) = (processId proc , processNode proc )
124
173
if localNodeId (processNode proc ) == there
125
- then sendCtrlMsg Nothing $
126
- LocalSend them (unsafeCreateUnencodedMessage msg)
127
- else sendCtrlMsg (Just there) $ UnreliableSend (processLocalId them)
128
- (createMessage msg)
174
+ then unsafeSendLocal them msg
175
+ else do sendCtrlMsg (Just there) $ UnreliableSend (processLocalId them)
176
+ (createMessage msg)
177
+ -- I would assert that is it *still* cheaper to not encode here...
178
+ liftIO $ traceEvent (localEventBus node)
179
+ (MxSent them us (wrapMessage msg))
180
+
181
+ unsafeSendLocal :: (Serializable a ) => ProcessId -> a -> Process ()
182
+ unsafeSendLocal pid msg =
183
+ let msg' = wrapMessage msg in do
184
+ sendCtrlMsg Nothing $ LocalSend pid msg'
185
+ -- see [note: tracing]
129
186
130
187
-- | Send a message on a typed channel
131
188
sendChan :: Serializable a => SendPort a -> a -> Process ()
@@ -144,7 +201,7 @@ sendChan (SendPort cid) msg = do
144
201
where
145
202
unsafeSendChanLocal :: (Serializable a ) => SendPortId -> a -> Process ()
146
203
unsafeSendChanLocal spId msg' =
147
- sendCtrlMsg Nothing $ LocalPortSend spId (unsafeCreateUnencodedMessage msg')
204
+ sendCtrlMsg Nothing $ LocalPortSend spId (wrapMessage msg')
148
205
149
206
-- | Create an unencoded @Message@ for any @Serializable@ type.
150
207
wrapMessage :: Serializable a => a -> Message
0 commit comments