Skip to content

Commit d9fcd8d

Browse files
author
Tim Watson
committed
Merge branch 'local-send' into development
2 parents 1ab91c9 + 46f07e1 commit d9fcd8d

File tree

5 files changed

+209
-89
lines changed

5 files changed

+209
-89
lines changed

Makefile

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,36 @@
22

33
GHC ?= $(shell which ghc)
44
CABAL ?= $(shell which cabal)
5+
CABAL_DEV ?= $(shell which cabal-fav)
56

67
BASE_GIT := git://github.com/haskell-distributed
78
REPOS=$(shell cat REPOS | sed '/^$$/d')
89

910
.PHONY: all
10-
all: $(REPOS)
11+
all: dev-install
1112

12-
$(REPOS):
13-
git clone $(BASE_GIT)/$@.git
13+
.PHONY: dev-install
14+
ifneq (,$(CABAL_DEV))
15+
dev-install:
16+
$(CABAL_DEV) install
17+
else
18+
dev-install:
19+
$(error install cabal-dev to proceed)
20+
endif
1421

15-
.PHONY: install
16-
install: $(REPOS)
22+
.PHONY: ci
23+
ci: travis-install travis-test
24+
25+
.PHONY: travis-install
26+
travis-install: $(REPOS)
1727
$(CABAL) install --with-ghc=$(GHC) $(REPOS) --force-reinstalls
1828
$(CABAL) install
1929

20-
.PHONY: ci
21-
ci: install test
22-
23-
.PHONY: test
24-
test:
30+
.PHONY: travis-test
31+
travis-test:
2532
$(CABAL) configure --enable-tests
2633
$(CABAL) build
2734
$(CABAL) test --show-details=always
35+
36+
$(REPOS):
37+
git clone $(BASE_GIT)/$@.git

src/Control/Distributed/Process/Internal/Primitives.hs

Lines changed: 99 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ import Control.Distributed.Process.Internal.Types
163163
, ProcessInfo(..)
164164
, ProcessInfoNone(..)
165165
, createMessage
166+
, createUnencodedMessage
166167
, runLocalProcess
167168
, ImplicitReconnect(WithImplicitReconnect, NoImplicitReconnect)
168169
, LocalProcessState
@@ -181,6 +182,7 @@ import Control.Distributed.Process.Internal.WeakTQueue
181182
, readTQueue
182183
, mkWeakTQueue
183184
)
185+
import Unsafe.Coerce
184186

185187
--------------------------------------------------------------------------------
186188
-- Basic messaging --
@@ -192,11 +194,15 @@ send :: Serializable a => ProcessId -> a -> Process ()
192194
-- modify serializable to allow for stateful (IO) deserialization
193195
send them msg = do
194196
proc <- ask
195-
liftIO $ sendMessage (processNode proc)
196-
(ProcessIdentifier (processId proc))
197-
(ProcessIdentifier them)
198-
NoImplicitReconnect
199-
msg
197+
let node = localNodeId (processNode proc)
198+
destNode = (processNodeId them) in do
199+
case destNode == node of
200+
True -> sendLocal them msg
201+
False -> liftIO $ sendMessage (processNode proc)
202+
(ProcessIdentifier (processId proc))
203+
(ProcessIdentifier them)
204+
NoImplicitReconnect
205+
msg
200206

201207
-- | Wait for a message of a specific type
202208
expect :: forall a. Serializable a => Process a
@@ -234,11 +240,16 @@ newChan = do
234240
sendChan :: Serializable a => SendPort a -> a -> Process ()
235241
sendChan (SendPort cid) msg = do
236242
proc <- ask
237-
liftIO $ sendBinary (processNode proc)
238-
(ProcessIdentifier (processId proc))
239-
(SendPortIdentifier cid)
240-
NoImplicitReconnect
241-
msg
243+
let node = localNodeId (processNode proc)
244+
destNode = processNodeId (sendPortProcessId cid) in do
245+
case destNode == node of
246+
True -> sendChanLocal cid msg
247+
False -> do
248+
liftIO $ sendBinary (processNode proc)
249+
(ProcessIdentifier (processId proc))
250+
(SendPortIdentifier cid)
251+
NoImplicitReconnect
252+
msg
242253

243254
-- | Wait for a message on a typed channel
244255
receiveChan :: Serializable a => ReceivePort a -> Process a
@@ -315,15 +326,22 @@ match = matchIf (const True)
315326
-- | Match against any message of the right type that satisfies a predicate
316327
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
317328
matchIf c p = Match $ MatchMsg $ \msg ->
318-
case messageFingerprint msg == fingerprint (undefined :: a) of
319-
True | c decoded -> Just (p decoded)
320-
where
321-
decoded :: a
322-
-- Make sure the value is fully decoded so that we don't hang to
323-
-- bytestrings when the process calling 'matchIf' doesn't process
324-
-- the values immediately
325-
!decoded = decode (messageEncoding msg)
326-
_ -> Nothing
329+
case messageFingerprint msg == fingerprint (undefined :: a) of
330+
False -> Nothing
331+
True -> case msg of
332+
(UnencodedMessage _ m) ->
333+
let m' = unsafeCoerce m :: a in
334+
case (c m') of
335+
True -> Just (p m')
336+
False -> Nothing
337+
(EncodedMessage _ _) ->
338+
if (c decoded) then Just (p decoded) else Nothing
339+
where
340+
decoded :: a
341+
-- Make sure the value is fully decoded so that we don't hang to
342+
-- bytestrings when the process calling 'matchIf' doesn't process
343+
-- the values immediately
344+
!decoded = decode (messageEncoding msg)
327345

328346
-- | Match against any message, regardless of the underlying (contained) type
329347
matchMessage :: (Message -> Process Message) -> Match Message
@@ -340,11 +358,16 @@ matchMessageIf c p = Match $ MatchMsg $ \msg ->
340358
forward :: Message -> ProcessId -> Process ()
341359
forward msg them = do
342360
proc <- ask
343-
liftIO $ sendPayload (processNode proc)
344-
(ProcessIdentifier (processId proc))
345-
(ProcessIdentifier them)
346-
NoImplicitReconnect
347-
(messageToPayload msg)
361+
let node = localNodeId (processNode proc)
362+
destNode = (processNodeId them) in do
363+
case destNode == node of
364+
True -> sendCtrlMsg Nothing (LocalSend them msg)
365+
False -> liftIO $ sendPayload (processNode proc)
366+
(ProcessIdentifier (processId proc))
367+
(ProcessIdentifier them)
368+
NoImplicitReconnect
369+
(messageToPayload msg)
370+
348371

349372
-- | Wrap a 'Serializable' value in a 'Message'. Note that 'Message's are
350373
-- 'Serializable' - like the datum they contain - but remember that deserializing
@@ -357,11 +380,11 @@ forward msg them = do
357380
-- send self (wrapMessage "blah")
358381
-- Nothing <- expectTimeout 1000000 :: Process (Maybe String)
359382
-- (Just m) <- expectTimeout 1000000 :: Process (Maybe Message)
360-
-- "blah" <- unwrapMessage m :: Process (Maybe String)
383+
-- (Just "blah") <- unwrapMessage m :: Process (Maybe String)
361384
-- @
362385
--
363386
wrapMessage :: Serializable a => a -> Message
364-
wrapMessage = createMessage
387+
wrapMessage = createMessage
365388

366389
-- | Attempt to unwrap a raw 'Message'.
367390
-- If the type of the decoded message payload matches the expected type, the
@@ -377,13 +400,16 @@ wrapMessage = createMessage
377400
unwrapMessage :: forall a. Serializable a => Message -> Process (Maybe a)
378401
unwrapMessage msg =
379402
case messageFingerprint msg == fingerprint (undefined :: a) of
380-
True -> return (Just (decoded))
381-
where
382-
decoded :: a
383-
-- Make sure the value is fully decoded so that we don't hang to
384-
-- bytestrings when the calling process doesn't evaluate immediately
385-
!decoded = decode (messageEncoding msg)
386-
_ -> return Nothing
403+
False -> return Nothing
404+
True -> case msg of
405+
(UnencodedMessage _ m) ->
406+
let m' = unsafeCoerce m :: a
407+
in return (Just m')
408+
(EncodedMessage _ _) ->
409+
return (Just (decoded))
410+
where
411+
decoded :: a -- note [decoding]
412+
!decoded = decode (messageEncoding msg)
387413

388414
-- | Attempt to handle a raw 'Message'.
389415
-- If the type of the message matches the type of the first argument to
@@ -393,23 +419,28 @@ unwrapMessage msg =
393419
-- evaluation proceeds, the resulting value with be wrapped with @Just@.
394420
--
395421
-- Intended for use in `catchesExit` and `matchAny` primitives.
396-
--
422+
--
397423
handleMessage :: forall a b. (Serializable a)
398424
=> Message -> (a -> Process b) -> Process (Maybe b)
399425
handleMessage msg proc = do
400-
case messageFingerprint msg == fingerprint (undefined :: a) of
401-
True -> do { r <- proc (decoded :: a); return (Just r) }
402-
where
403-
decoded :: a
404-
!decoded = decode (messageEncoding msg)
405-
_ -> return Nothing
426+
case messageFingerprint msg == fingerprint (undefined :: a) of
427+
False -> return Nothing
428+
True -> case msg of
429+
(UnencodedMessage _ m) ->
430+
let m' = unsafeCoerce m :: a -- note [decoding]
431+
in do { r <- proc m'; return (Just r) }
432+
(EncodedMessage _ _) ->
433+
do { r <- proc (decoded :: a); return (Just r) }
434+
where
435+
decoded :: a -- note [decoding]
436+
!decoded = decode (messageEncoding msg)
406437

407438
-- | Match against an arbitrary message. 'matchAny' removes the first available
408439
-- message from the process mailbox. To handle arbitrary /raw/ messages once
409440
-- removed from the mailbox, see 'handleMessage' and 'unwrapMessage'.
410441
--
411442
matchAny :: forall b. (Message -> Process b) -> Match b
412-
matchAny p = Match $ MatchMsg $ \msg -> Just (p msg)
443+
matchAny p = Match $ MatchMsg $ \msg -> Just (p msg)
413444

414445
-- | Match against an arbitrary message. Intended for use with 'handleMessage'
415446
-- and 'unwrapMessage', this function /only/ removes a message from the process
@@ -425,15 +456,26 @@ matchAnyIf :: forall a b. (Serializable a)
425456
-> (Message -> Process b)
426457
-> Match b
427458
matchAnyIf c p = Match $ MatchMsg $ \msg ->
428-
case messageFingerprint msg == fingerprint (undefined :: a) of
429-
True | c decoded -> Just (p msg)
459+
case messageFingerprint msg == fingerprint (undefined :: a) of
460+
True | check -> Just (p msg)
430461
where
431-
decoded :: a
432-
-- Make sure the value is fully decoded so that we don't hang to
433-
-- bytestrings when the calling process doesn't evaluate immediately
462+
check :: Bool
463+
!check =
464+
case msg of
465+
(EncodedMessage _ _) -> c decoded
466+
(UnencodedMessage _ m') -> c (unsafeCoerce m')
467+
468+
decoded :: a -- note [decoding]
434469
!decoded = decode (messageEncoding msg)
435470
_ -> Nothing
436471

472+
{- note [decoding]
473+
For an EncodedMessage, we need to ensure the value is fully decoded so that
474+
we don't hang to bytestrings if the calling process doesn't evaluate
475+
immediately. For UnencodedMessage we know (because the fingerprint comparison
476+
succeeds) that unsafeCoerce will not fail.
477+
-}
478+
437479
-- | Remove any message from the queue
438480
matchUnknown :: Process b -> Match b
439481
matchUnknown p = Match $ MatchMsg (const (Just p))
@@ -993,6 +1035,17 @@ trace s = do
9931035
-- Auxiliary functions --
9941036
--------------------------------------------------------------------------------
9951037

1038+
sendLocal :: (Serializable a) => ProcessId -> a -> Process ()
1039+
sendLocal pid msg =
1040+
sendCtrlMsg Nothing $ LocalSend pid (createUnencodedMessage msg)
1041+
1042+
sendChanLocal :: (Serializable a) => SendPortId -> a -> Process ()
1043+
sendChanLocal spId msg =
1044+
-- we *must* fully serialize/encode the message here, because
1045+
-- attempting to use `unsafeCoerce' in the node controller
1046+
-- won't work since we know nothing about the required type
1047+
sendCtrlMsg Nothing $ LocalPortSend spId (createUnencodedMessage msg)
1048+
9961049
getMonitorRefFor :: Identifier -> Process MonitorRef
9971050
getMonitorRefFor ident = do
9981051
proc <- ask

src/Control/Distributed/Process/Internal/Types.hs

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ module Control.Distributed.Process.Internal.Types
3030
-- * Messages
3131
, Message(..)
3232
, createMessage
33+
, createUnencodedMessage
3334
, messageToPayload
3435
, payloadToMessage
3536
-- * Node controller user-visible data types
@@ -83,6 +84,7 @@ import qualified Data.ByteString.Lazy as BSL
8384
, toChunks
8485
, splitAt
8586
, fromChunks
87+
, length
8688
)
8789
import qualified Data.ByteString.Lazy.Internal as BSL (ByteString(..))
8890
import Data.Accessor (Accessor, accessor)
@@ -303,26 +305,40 @@ data ReceivePort a =
303305
--------------------------------------------------------------------------------
304306

305307
-- | Messages consist of their typeRep fingerprint and their encoding
306-
data Message = Message
308+
data Message =
309+
EncodedMessage
307310
{ messageFingerprint :: !Fingerprint
308311
, messageEncoding :: !BSL.ByteString
312+
} |
313+
forall a . Serializable a =>
314+
UnencodedMessage
315+
{
316+
messageFingerprint :: !Fingerprint
317+
, messagePayload :: !a
309318
}
310319
deriving (Typeable)
311320

312321
instance Show Message where
313-
show (Message fp enc) = show enc ++ " :: " ++ showFingerprint fp []
322+
show (EncodedMessage fp enc) = show enc ++ " :: " ++ showFingerprint fp []
323+
show (UnencodedMessage fp _) = "[unencoded message] :: " ++ (showFingerprint fp [])
314324

315325
-- | Turn any serialiable term into a message
316326
createMessage :: Serializable a => a -> Message
317-
createMessage a = Message (fingerprint a) (encode a)
327+
createMessage a = EncodedMessage (fingerprint a) (encode a)
328+
329+
-- | Turn any serializable term into an unencoded/local message
330+
createUnencodedMessage :: Serializable a => a -> Message
331+
createUnencodedMessage a =
332+
let encoded = encode a in BSL.length encoded `seq` UnencodedMessage (fingerprint a) a
318333

319334
-- | Serialize a message
320335
messageToPayload :: Message -> [BSS.ByteString]
321-
messageToPayload (Message fp enc) = encodeFingerprint fp : BSL.toChunks enc
336+
messageToPayload (EncodedMessage fp enc) = encodeFingerprint fp : BSL.toChunks enc
337+
messageToPayload (UnencodedMessage fp m) = messageToPayload ((EncodedMessage fp (encode m)))
322338

323339
-- | Deserialize a message
324340
payloadToMessage :: [BSS.ByteString] -> Message
325-
payloadToMessage payload = Message fp (copy msg)
341+
payloadToMessage payload = EncodedMessage fp (copy msg)
326342
where
327343
encFp :: BSL.ByteString
328344
msg :: BSL.ByteString
@@ -481,6 +497,8 @@ data ProcessSignal =
481497
| WhereIs !String
482498
| Register !String !NodeId !(Maybe ProcessId) !Bool -- Use 'Nothing' to unregister, use True to force reregister
483499
| NamedSend !String !Message
500+
| LocalSend !ProcessId !Message
501+
| LocalPortSend !SendPortId !Message
484502
| Kill !ProcessId !String
485503
| Exit !ProcessId !Message
486504
| GetInfo !ProcessId
@@ -523,18 +541,20 @@ instance Binary MonitorRef where
523541
get = MonitorRef <$> get <*> get
524542

525543
instance Binary ProcessSignal where
526-
put (Link pid) = putWord8 0 >> put pid
527-
put (Unlink pid) = putWord8 1 >> put pid
528-
put (Monitor ref) = putWord8 2 >> put ref
529-
put (Unmonitor ref) = putWord8 3 >> put ref
530-
put (Died who reason) = putWord8 4 >> put who >> put reason
531-
put (Spawn proc ref) = putWord8 5 >> put proc >> put ref
532-
put (WhereIs label) = putWord8 6 >> put label
544+
put (Link pid) = putWord8 0 >> put pid
545+
put (Unlink pid) = putWord8 1 >> put pid
546+
put (Monitor ref) = putWord8 2 >> put ref
547+
put (Unmonitor ref) = putWord8 3 >> put ref
548+
put (Died who reason) = putWord8 4 >> put who >> put reason
549+
put (Spawn proc ref) = putWord8 5 >> put proc >> put ref
550+
put (WhereIs label) = putWord8 6 >> put label
533551
put (Register label nid pid force) = putWord8 7 >> put label >> put nid >> put pid >> put force
534-
put (NamedSend label msg) = putWord8 8 >> put label >> put (messageToPayload msg)
535-
put (Kill pid reason) = putWord8 9 >> put pid >> put reason
536-
put (Exit pid reason) = putWord8 10 >> put pid >> put (messageToPayload reason)
537-
put (GetInfo about) = putWord8 30 >> put about
552+
put (NamedSend label msg) = putWord8 8 >> put label >> put (messageToPayload msg)
553+
put (Kill pid reason) = putWord8 9 >> put pid >> put reason
554+
put (Exit pid reason) = putWord8 10 >> put pid >> put (messageToPayload reason)
555+
put (LocalSend pid msg) = putWord8 11 >> put pid >> put (messageToPayload msg)
556+
put (LocalPortSend sid msg) = putWord8 12 >> put sid >> put (messageToPayload msg)
557+
put (GetInfo about) = putWord8 30 >> put about
538558
get = do
539559
header <- getWord8
540560
case header of
@@ -549,6 +569,8 @@ instance Binary ProcessSignal where
549569
8 -> NamedSend <$> get <*> (payloadToMessage <$> get)
550570
9 -> Kill <$> get <*> get
551571
10 -> Exit <$> get <*> (payloadToMessage <$> get)
572+
11 -> LocalSend <$> get <*> (payloadToMessage <$> get)
573+
12 -> LocalPortSend <$> get <*> (payloadToMessage <$> get)
552574
30 -> GetInfo <$> get
553575
_ -> fail "ProcessSignal.get: invalid"
554576

0 commit comments

Comments
 (0)