@@ -163,6 +163,7 @@ import Control.Distributed.Process.Internal.Types
163
163
, ProcessInfo (.. )
164
164
, ProcessInfoNone (.. )
165
165
, createMessage
166
+ , createUnencodedMessage
166
167
, runLocalProcess
167
168
, ImplicitReconnect (WithImplicitReconnect , NoImplicitReconnect )
168
169
, LocalProcessState
@@ -181,6 +182,7 @@ import Control.Distributed.Process.Internal.WeakTQueue
181
182
, readTQueue
182
183
, mkWeakTQueue
183
184
)
185
+ import Unsafe.Coerce
184
186
185
187
--------------------------------------------------------------------------------
186
188
-- Basic messaging --
@@ -192,11 +194,15 @@ send :: Serializable a => ProcessId -> a -> Process ()
192
194
-- modify serializable to allow for stateful (IO) deserialization
193
195
send them msg = do
194
196
proc <- ask
195
- liftIO $ sendMessage (processNode proc )
196
- (ProcessIdentifier (processId proc ))
197
- (ProcessIdentifier them)
198
- NoImplicitReconnect
199
- msg
197
+ let node = localNodeId (processNode proc )
198
+ destNode = (processNodeId them) in do
199
+ case destNode == node of
200
+ True -> sendLocal them msg
201
+ False -> liftIO $ sendMessage (processNode proc )
202
+ (ProcessIdentifier (processId proc ))
203
+ (ProcessIdentifier them)
204
+ NoImplicitReconnect
205
+ msg
200
206
201
207
-- | Wait for a message of a specific type
202
208
expect :: forall a . Serializable a => Process a
@@ -234,11 +240,16 @@ newChan = do
234
240
sendChan :: Serializable a => SendPort a -> a -> Process ()
235
241
sendChan (SendPort cid) msg = do
236
242
proc <- ask
237
- liftIO $ sendBinary (processNode proc )
238
- (ProcessIdentifier (processId proc ))
239
- (SendPortIdentifier cid)
240
- NoImplicitReconnect
241
- msg
243
+ let node = localNodeId (processNode proc )
244
+ destNode = processNodeId (sendPortProcessId cid) in do
245
+ case destNode == node of
246
+ True -> sendChanLocal cid msg
247
+ False -> do
248
+ liftIO $ sendBinary (processNode proc )
249
+ (ProcessIdentifier (processId proc ))
250
+ (SendPortIdentifier cid)
251
+ NoImplicitReconnect
252
+ msg
242
253
243
254
-- | Wait for a message on a typed channel
244
255
receiveChan :: Serializable a => ReceivePort a -> Process a
@@ -315,15 +326,22 @@ match = matchIf (const True)
315
326
-- | Match against any message of the right type that satisfies a predicate
316
327
matchIf :: forall a b . Serializable a => (a -> Bool ) -> (a -> Process b ) -> Match b
317
328
matchIf c p = Match $ MatchMsg $ \ msg ->
318
- case messageFingerprint msg == fingerprint (undefined :: a ) of
319
- True | c decoded -> Just (p decoded)
320
- where
321
- decoded :: a
322
- -- Make sure the value is fully decoded so that we don't hang to
323
- -- bytestrings when the process calling 'matchIf' doesn't process
324
- -- the values immediately
325
- ! decoded = decode (messageEncoding msg)
326
- _ -> Nothing
329
+ case messageFingerprint msg == fingerprint (undefined :: a ) of
330
+ False -> Nothing
331
+ True -> case msg of
332
+ (UnencodedMessage _ m) ->
333
+ let m' = unsafeCoerce m :: a in
334
+ case (c m') of
335
+ True -> Just (p m')
336
+ False -> Nothing
337
+ (EncodedMessage _ _) ->
338
+ if (c decoded) then Just (p decoded) else Nothing
339
+ where
340
+ decoded :: a
341
+ -- Make sure the value is fully decoded so that we don't hang to
342
+ -- bytestrings when the process calling 'matchIf' doesn't process
343
+ -- the values immediately
344
+ ! decoded = decode (messageEncoding msg)
327
345
328
346
-- | Match against any message, regardless of the underlying (contained) type
329
347
matchMessage :: (Message -> Process Message ) -> Match Message
@@ -340,11 +358,16 @@ matchMessageIf c p = Match $ MatchMsg $ \msg ->
340
358
forward :: Message -> ProcessId -> Process ()
341
359
forward msg them = do
342
360
proc <- ask
343
- liftIO $ sendPayload (processNode proc )
344
- (ProcessIdentifier (processId proc ))
345
- (ProcessIdentifier them)
346
- NoImplicitReconnect
347
- (messageToPayload msg)
361
+ let node = localNodeId (processNode proc )
362
+ destNode = (processNodeId them) in do
363
+ case destNode == node of
364
+ True -> sendCtrlMsg Nothing (LocalSend them msg)
365
+ False -> liftIO $ sendPayload (processNode proc )
366
+ (ProcessIdentifier (processId proc ))
367
+ (ProcessIdentifier them)
368
+ NoImplicitReconnect
369
+ (messageToPayload msg)
370
+
348
371
349
372
-- | Wrap a 'Serializable' value in a 'Message'. Note that 'Message's are
350
373
-- 'Serializable' - like the datum they contain - but remember that deserializing
@@ -357,11 +380,11 @@ forward msg them = do
357
380
-- send self (wrapMessage "blah")
358
381
-- Nothing <- expectTimeout 1000000 :: Process (Maybe String)
359
382
-- (Just m) <- expectTimeout 1000000 :: Process (Maybe Message)
360
- -- "blah" <- unwrapMessage m :: Process (Maybe String)
383
+ -- (Just "blah") <- unwrapMessage m :: Process (Maybe String)
361
384
-- @
362
385
--
363
386
wrapMessage :: Serializable a => a -> Message
364
- wrapMessage = createMessage
387
+ wrapMessage = createMessage
365
388
366
389
-- | Attempt to unwrap a raw 'Message'.
367
390
-- If the type of the decoded message payload matches the expected type, the
@@ -377,13 +400,16 @@ wrapMessage = createMessage
377
400
unwrapMessage :: forall a . Serializable a => Message -> Process (Maybe a )
378
401
unwrapMessage msg =
379
402
case messageFingerprint msg == fingerprint (undefined :: a ) of
380
- True -> return (Just (decoded))
381
- where
382
- decoded :: a
383
- -- Make sure the value is fully decoded so that we don't hang to
384
- -- bytestrings when the calling process doesn't evaluate immediately
385
- ! decoded = decode (messageEncoding msg)
386
- _ -> return Nothing
403
+ False -> return Nothing
404
+ True -> case msg of
405
+ (UnencodedMessage _ m) ->
406
+ let m' = unsafeCoerce m :: a
407
+ in return (Just m')
408
+ (EncodedMessage _ _) ->
409
+ return (Just (decoded))
410
+ where
411
+ decoded :: a -- note [decoding]
412
+ ! decoded = decode (messageEncoding msg)
387
413
388
414
-- | Attempt to handle a raw 'Message'.
389
415
-- If the type of the message matches the type of the first argument to
@@ -393,23 +419,28 @@ unwrapMessage msg =
393
419
-- evaluation proceeds, the resulting value with be wrapped with @Just@.
394
420
--
395
421
-- Intended for use in `catchesExit` and `matchAny` primitives.
396
- --
422
+ --
397
423
handleMessage :: forall a b . (Serializable a )
398
424
=> Message -> (a -> Process b ) -> Process (Maybe b )
399
425
handleMessage msg proc = do
400
- case messageFingerprint msg == fingerprint (undefined :: a ) of
401
- True -> do { r <- proc (decoded :: a ); return (Just r) }
402
- where
403
- decoded :: a
404
- ! decoded = decode (messageEncoding msg)
405
- _ -> return Nothing
426
+ case messageFingerprint msg == fingerprint (undefined :: a ) of
427
+ False -> return Nothing
428
+ True -> case msg of
429
+ (UnencodedMessage _ m) ->
430
+ let m' = unsafeCoerce m :: a -- note [decoding]
431
+ in do { r <- proc m'; return (Just r) }
432
+ (EncodedMessage _ _) ->
433
+ do { r <- proc (decoded :: a ); return (Just r) }
434
+ where
435
+ decoded :: a -- note [decoding]
436
+ ! decoded = decode (messageEncoding msg)
406
437
407
438
-- | Match against an arbitrary message. 'matchAny' removes the first available
408
439
-- message from the process mailbox. To handle arbitrary /raw/ messages once
409
440
-- removed from the mailbox, see 'handleMessage' and 'unwrapMessage'.
410
441
--
411
442
matchAny :: forall b . (Message -> Process b ) -> Match b
412
- matchAny p = Match $ MatchMsg $ \ msg -> Just (p msg)
443
+ matchAny p = Match $ MatchMsg $ \ msg -> Just (p msg)
413
444
414
445
-- | Match against an arbitrary message. Intended for use with 'handleMessage'
415
446
-- and 'unwrapMessage', this function /only/ removes a message from the process
@@ -425,15 +456,26 @@ matchAnyIf :: forall a b. (Serializable a)
425
456
-> (Message -> Process b )
426
457
-> Match b
427
458
matchAnyIf c p = Match $ MatchMsg $ \ msg ->
428
- case messageFingerprint msg == fingerprint (undefined :: a ) of
429
- True | c decoded -> Just (p msg)
459
+ case messageFingerprint msg == fingerprint (undefined :: a ) of
460
+ True | check -> Just (p msg)
430
461
where
431
- decoded :: a
432
- -- Make sure the value is fully decoded so that we don't hang to
433
- -- bytestrings when the calling process doesn't evaluate immediately
462
+ check :: Bool
463
+ ! check =
464
+ case msg of
465
+ (EncodedMessage _ _) -> c decoded
466
+ (UnencodedMessage _ m') -> c (unsafeCoerce m')
467
+
468
+ decoded :: a -- note [decoding]
434
469
! decoded = decode (messageEncoding msg)
435
470
_ -> Nothing
436
471
472
+ {- note [decoding]
473
+ For an EncodedMessage, we need to ensure the value is fully decoded so that
474
+ we don't hang to bytestrings if the calling process doesn't evaluate
475
+ immediately. For UnencodedMessage we know (because the fingerprint comparison
476
+ succeeds) that unsafeCoerce will not fail.
477
+ -}
478
+
437
479
-- | Remove any message from the queue
438
480
matchUnknown :: Process b -> Match b
439
481
matchUnknown p = Match $ MatchMsg (const (Just p))
@@ -993,6 +1035,17 @@ trace s = do
993
1035
-- Auxiliary functions --
994
1036
--------------------------------------------------------------------------------
995
1037
1038
+ sendLocal :: (Serializable a ) => ProcessId -> a -> Process ()
1039
+ sendLocal pid msg =
1040
+ sendCtrlMsg Nothing $ LocalSend pid (createUnencodedMessage msg)
1041
+
1042
+ sendChanLocal :: (Serializable a ) => SendPortId -> a -> Process ()
1043
+ sendChanLocal spId msg =
1044
+ -- we *must* fully serialize/encode the message here, because
1045
+ -- attempting to use `unsafeCoerce' in the node controller
1046
+ -- won't work since we know nothing about the required type
1047
+ sendCtrlMsg Nothing $ LocalPortSend spId (createUnencodedMessage msg)
1048
+
996
1049
getMonitorRefFor :: Identifier -> Process MonitorRef
997
1050
getMonitorRefFor ident = do
998
1051
proc <- ask
0 commit comments