Skip to content

Commit 46f07e1

Browse files
author
Tim Watson
committed
DP-20 #resolve #comment Optimise local message passing
We skip the transport layer altogether for local send and write directly to the local NC control channel. Local sends do serialise the message (to ensure HNF) but we pass the unencoded data along with a fingerprint. Subsequent reads utilise unsafeCoerce if fingerprint comparison on the types succeeds.
1 parent 1ab91c9 commit 46f07e1

File tree

5 files changed

+209
-89
lines changed

5 files changed

+209
-89
lines changed

Makefile

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,36 @@
22

33
GHC ?= $(shell which ghc)
44
CABAL ?= $(shell which cabal)
5+
CABAL_DEV ?= $(shell which cabal-fav)
56

67
BASE_GIT := git://github.com/haskell-distributed
78
REPOS=$(shell cat REPOS | sed '/^$$/d')
89

910
.PHONY: all
10-
all: $(REPOS)
11+
all: dev-install
1112

12-
$(REPOS):
13-
git clone $(BASE_GIT)/$@.git
13+
.PHONY: dev-install
14+
ifneq (,$(CABAL_DEV))
15+
dev-install:
16+
$(CABAL_DEV) install
17+
else
18+
dev-install:
19+
$(error install cabal-dev to proceed)
20+
endif
1421

15-
.PHONY: install
16-
install: $(REPOS)
22+
.PHONY: ci
23+
ci: travis-install travis-test
24+
25+
.PHONY: travis-install
26+
travis-install: $(REPOS)
1727
$(CABAL) install --with-ghc=$(GHC) $(REPOS) --force-reinstalls
1828
$(CABAL) install
1929

20-
.PHONY: ci
21-
ci: install test
22-
23-
.PHONY: test
24-
test:
30+
.PHONY: travis-test
31+
travis-test:
2532
$(CABAL) configure --enable-tests
2633
$(CABAL) build
2734
$(CABAL) test --show-details=always
35+
36+
$(REPOS):
37+
git clone $(BASE_GIT)/$@.git

src/Control/Distributed/Process/Internal/Primitives.hs

Lines changed: 99 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ import Control.Distributed.Process.Internal.Types
163163
, ProcessInfo(..)
164164
, ProcessInfoNone(..)
165165
, createMessage
166+
, createUnencodedMessage
166167
, runLocalProcess
167168
, ImplicitReconnect(WithImplicitReconnect, NoImplicitReconnect)
168169
, LocalProcessState
@@ -181,6 +182,7 @@ import Control.Distributed.Process.Internal.WeakTQueue
181182
, readTQueue
182183
, mkWeakTQueue
183184
)
185+
import Unsafe.Coerce
184186

185187
--------------------------------------------------------------------------------
186188
-- Basic messaging --
@@ -192,11 +194,15 @@ send :: Serializable a => ProcessId -> a -> Process ()
192194
-- modify serializable to allow for stateful (IO) deserialization
193195
send them msg = do
194196
proc <- ask
195-
liftIO $ sendMessage (processNode proc)
196-
(ProcessIdentifier (processId proc))
197-
(ProcessIdentifier them)
198-
NoImplicitReconnect
199-
msg
197+
let node = localNodeId (processNode proc)
198+
destNode = (processNodeId them) in do
199+
case destNode == node of
200+
True -> sendLocal them msg
201+
False -> liftIO $ sendMessage (processNode proc)
202+
(ProcessIdentifier (processId proc))
203+
(ProcessIdentifier them)
204+
NoImplicitReconnect
205+
msg
200206

201207
-- | Wait for a message of a specific type
202208
expect :: forall a. Serializable a => Process a
@@ -234,11 +240,16 @@ newChan = do
234240
sendChan :: Serializable a => SendPort a -> a -> Process ()
235241
sendChan (SendPort cid) msg = do
236242
proc <- ask
237-
liftIO $ sendBinary (processNode proc)
238-
(ProcessIdentifier (processId proc))
239-
(SendPortIdentifier cid)
240-
NoImplicitReconnect
241-
msg
243+
let node = localNodeId (processNode proc)
244+
destNode = processNodeId (sendPortProcessId cid) in do
245+
case destNode == node of
246+
True -> sendChanLocal cid msg
247+
False -> do
248+
liftIO $ sendBinary (processNode proc)
249+
(ProcessIdentifier (processId proc))
250+
(SendPortIdentifier cid)
251+
NoImplicitReconnect
252+
msg
242253

243254
-- | Wait for a message on a typed channel
244255
receiveChan :: Serializable a => ReceivePort a -> Process a
@@ -315,15 +326,22 @@ match = matchIf (const True)
315326
-- | Match against any message of the right type that satisfies a predicate
316327
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
317328
matchIf c p = Match $ MatchMsg $ \msg ->
318-
case messageFingerprint msg == fingerprint (undefined :: a) of
319-
True | c decoded -> Just (p decoded)
320-
where
321-
decoded :: a
322-
-- Make sure the value is fully decoded so that we don't hang to
323-
-- bytestrings when the process calling 'matchIf' doesn't process
324-
-- the values immediately
325-
!decoded = decode (messageEncoding msg)
326-
_ -> Nothing
329+
case messageFingerprint msg == fingerprint (undefined :: a) of
330+
False -> Nothing
331+
True -> case msg of
332+
(UnencodedMessage _ m) ->
333+
let m' = unsafeCoerce m :: a in
334+
case (c m') of
335+
True -> Just (p m')
336+
False -> Nothing
337+
(EncodedMessage _ _) ->
338+
if (c decoded) then Just (p decoded) else Nothing
339+
where
340+
decoded :: a
341+
-- Make sure the value is fully decoded so that we don't hang to
342+
-- bytestrings when the process calling 'matchIf' doesn't process
343+
-- the values immediately
344+
!decoded = decode (messageEncoding msg)
327345

328346
-- | Match against any message, regardless of the underlying (contained) type
329347
matchMessage :: (Message -> Process Message) -> Match Message
@@ -340,11 +358,16 @@ matchMessageIf c p = Match $ MatchMsg $ \msg ->
340358
forward :: Message -> ProcessId -> Process ()
341359
forward msg them = do
342360
proc <- ask
343-
liftIO $ sendPayload (processNode proc)
344-
(ProcessIdentifier (processId proc))
345-
(ProcessIdentifier them)
346-
NoImplicitReconnect
347-
(messageToPayload msg)
361+
let node = localNodeId (processNode proc)
362+
destNode = (processNodeId them) in do
363+
case destNode == node of
364+
True -> sendCtrlMsg Nothing (LocalSend them msg)
365+
False -> liftIO $ sendPayload (processNode proc)
366+
(ProcessIdentifier (processId proc))
367+
(ProcessIdentifier them)
368+
NoImplicitReconnect
369+
(messageToPayload msg)
370+
348371

349372
-- | Wrap a 'Serializable' value in a 'Message'. Note that 'Message's are
350373
-- 'Serializable' - like the datum they contain - but remember that deserializing
@@ -357,11 +380,11 @@ forward msg them = do
357380
-- send self (wrapMessage "blah")
358381
-- Nothing <- expectTimeout 1000000 :: Process (Maybe String)
359382
-- (Just m) <- expectTimeout 1000000 :: Process (Maybe Message)
360-
-- "blah" <- unwrapMessage m :: Process (Maybe String)
383+
-- (Just "blah") <- unwrapMessage m :: Process (Maybe String)
361384
-- @
362385
--
363386
wrapMessage :: Serializable a => a -> Message
364-
wrapMessage = createMessage
387+
wrapMessage = createMessage
365388

366389
-- | Attempt to unwrap a raw 'Message'.
367390
-- If the type of the decoded message payload matches the expected type, the
@@ -377,13 +400,16 @@ wrapMessage = createMessage
377400
unwrapMessage :: forall a. Serializable a => Message -> Process (Maybe a)
378401
unwrapMessage msg =
379402
case messageFingerprint msg == fingerprint (undefined :: a) of
380-
True -> return (Just (decoded))
381-
where
382-
decoded :: a
383-
-- Make sure the value is fully decoded so that we don't hang to
384-
-- bytestrings when the calling process doesn't evaluate immediately
385-
!decoded = decode (messageEncoding msg)
386-
_ -> return Nothing
403+
False -> return Nothing
404+
True -> case msg of
405+
(UnencodedMessage _ m) ->
406+
let m' = unsafeCoerce m :: a
407+
in return (Just m')
408+
(EncodedMessage _ _) ->
409+
return (Just (decoded))
410+
where
411+
decoded :: a -- note [decoding]
412+
!decoded = decode (messageEncoding msg)
387413

388414
-- | Attempt to handle a raw 'Message'.
389415
-- If the type of the message matches the type of the first argument to
@@ -393,23 +419,28 @@ unwrapMessage msg =
393419
-- evaluation proceeds, the resulting value with be wrapped with @Just@.
394420
--
395421
-- Intended for use in `catchesExit` and `matchAny` primitives.
396-
--
422+
--
397423
handleMessage :: forall a b. (Serializable a)
398424
=> Message -> (a -> Process b) -> Process (Maybe b)
399425
handleMessage msg proc = do
400-
case messageFingerprint msg == fingerprint (undefined :: a) of
401-
True -> do { r <- proc (decoded :: a); return (Just r) }
402-
where
403-
decoded :: a
404-
!decoded = decode (messageEncoding msg)
405-
_ -> return Nothing
426+
case messageFingerprint msg == fingerprint (undefined :: a) of
427+
False -> return Nothing
428+
True -> case msg of
429+
(UnencodedMessage _ m) ->
430+
let m' = unsafeCoerce m :: a -- note [decoding]
431+
in do { r <- proc m'; return (Just r) }
432+
(EncodedMessage _ _) ->
433+
do { r <- proc (decoded :: a); return (Just r) }
434+
where
435+
decoded :: a -- note [decoding]
436+
!decoded = decode (messageEncoding msg)
406437

407438
-- | Match against an arbitrary message. 'matchAny' removes the first available
408439
-- message from the process mailbox. To handle arbitrary /raw/ messages once
409440
-- removed from the mailbox, see 'handleMessage' and 'unwrapMessage'.
410441
--
411442
matchAny :: forall b. (Message -> Process b) -> Match b
412-
matchAny p = Match $ MatchMsg $ \msg -> Just (p msg)
443+
matchAny p = Match $ MatchMsg $ \msg -> Just (p msg)
413444

414445
-- | Match against an arbitrary message. Intended for use with 'handleMessage'
415446
-- and 'unwrapMessage', this function /only/ removes a message from the process
@@ -425,15 +456,26 @@ matchAnyIf :: forall a b. (Serializable a)
425456
-> (Message -> Process b)
426457
-> Match b
427458
matchAnyIf c p = Match $ MatchMsg $ \msg ->
428-
case messageFingerprint msg == fingerprint (undefined :: a) of
429-
True | c decoded -> Just (p msg)
459+
case messageFingerprint msg == fingerprint (undefined :: a) of
460+
True | check -> Just (p msg)
430461
where
431-
decoded :: a
432-
-- Make sure the value is fully decoded so that we don't hang to
433-
-- bytestrings when the calling process doesn't evaluate immediately
462+
check :: Bool
463+
!check =
464+
case msg of
465+
(EncodedMessage _ _) -> c decoded
466+
(UnencodedMessage _ m') -> c (unsafeCoerce m')
467+
468+
decoded :: a -- note [decoding]
434469
!decoded = decode (messageEncoding msg)
435470
_ -> Nothing
436471

472+
{- note [decoding]
473+
For an EncodedMessage, we need to ensure the value is fully decoded so that
474+
we don't hang to bytestrings if the calling process doesn't evaluate
475+
immediately. For UnencodedMessage we know (because the fingerprint comparison
476+
succeeds) that unsafeCoerce will not fail.
477+
-}
478+
437479
-- | Remove any message from the queue
438480
matchUnknown :: Process b -> Match b
439481
matchUnknown p = Match $ MatchMsg (const (Just p))
@@ -993,6 +1035,17 @@ trace s = do
9931035
-- Auxiliary functions --
9941036
--------------------------------------------------------------------------------
9951037

1038+
sendLocal :: (Serializable a) => ProcessId -> a -> Process ()
1039+
sendLocal pid msg =
1040+
sendCtrlMsg Nothing $ LocalSend pid (createUnencodedMessage msg)
1041+
1042+
sendChanLocal :: (Serializable a) => SendPortId -> a -> Process ()
1043+
sendChanLocal spId msg =
1044+
-- we *must* fully serialize/encode the message here, because
1045+
-- attempting to use `unsafeCoerce' in the node controller
1046+
-- won't work since we know nothing about the required type
1047+
sendCtrlMsg Nothing $ LocalPortSend spId (createUnencodedMessage msg)
1048+
9961049
getMonitorRefFor :: Identifier -> Process MonitorRef
9971050
getMonitorRefFor ident = do
9981051
proc <- ask

src/Control/Distributed/Process/Internal/Types.hs

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ module Control.Distributed.Process.Internal.Types
3030
-- * Messages
3131
, Message(..)
3232
, createMessage
33+
, createUnencodedMessage
3334
, messageToPayload
3435
, payloadToMessage
3536
-- * Node controller user-visible data types
@@ -83,6 +84,7 @@ import qualified Data.ByteString.Lazy as BSL
8384
, toChunks
8485
, splitAt
8586
, fromChunks
87+
, length
8688
)
8789
import qualified Data.ByteString.Lazy.Internal as BSL (ByteString(..))
8890
import Data.Accessor (Accessor, accessor)
@@ -303,26 +305,40 @@ data ReceivePort a =
303305
--------------------------------------------------------------------------------
304306

305307
-- | Messages consist of their typeRep fingerprint and their encoding
306-
data Message = Message
308+
data Message =
309+
EncodedMessage
307310
{ messageFingerprint :: !Fingerprint
308311
, messageEncoding :: !BSL.ByteString
312+
} |
313+
forall a . Serializable a =>
314+
UnencodedMessage
315+
{
316+
messageFingerprint :: !Fingerprint
317+
, messagePayload :: !a
309318
}
310319
deriving (Typeable)
311320

312321
instance Show Message where
313-
show (Message fp enc) = show enc ++ " :: " ++ showFingerprint fp []
322+
show (EncodedMessage fp enc) = show enc ++ " :: " ++ showFingerprint fp []
323+
show (UnencodedMessage fp _) = "[unencoded message] :: " ++ (showFingerprint fp [])
314324

315325
-- | Turn any serialiable term into a message
316326
createMessage :: Serializable a => a -> Message
317-
createMessage a = Message (fingerprint a) (encode a)
327+
createMessage a = EncodedMessage (fingerprint a) (encode a)
328+
329+
-- | Turn any serializable term into an unencoded/local message
330+
createUnencodedMessage :: Serializable a => a -> Message
331+
createUnencodedMessage a =
332+
let encoded = encode a in BSL.length encoded `seq` UnencodedMessage (fingerprint a) a
318333

319334
-- | Serialize a message
320335
messageToPayload :: Message -> [BSS.ByteString]
321-
messageToPayload (Message fp enc) = encodeFingerprint fp : BSL.toChunks enc
336+
messageToPayload (EncodedMessage fp enc) = encodeFingerprint fp : BSL.toChunks enc
337+
messageToPayload (UnencodedMessage fp m) = messageToPayload ((EncodedMessage fp (encode m)))
322338

323339
-- | Deserialize a message
324340
payloadToMessage :: [BSS.ByteString] -> Message
325-
payloadToMessage payload = Message fp (copy msg)
341+
payloadToMessage payload = EncodedMessage fp (copy msg)
326342
where
327343
encFp :: BSL.ByteString
328344
msg :: BSL.ByteString
@@ -481,6 +497,8 @@ data ProcessSignal =
481497
| WhereIs !String
482498
| Register !String !NodeId !(Maybe ProcessId) !Bool -- Use 'Nothing' to unregister, use True to force reregister
483499
| NamedSend !String !Message
500+
| LocalSend !ProcessId !Message
501+
| LocalPortSend !SendPortId !Message
484502
| Kill !ProcessId !String
485503
| Exit !ProcessId !Message
486504
| GetInfo !ProcessId
@@ -523,18 +541,20 @@ instance Binary MonitorRef where
523541
get = MonitorRef <$> get <*> get
524542

525543
instance Binary ProcessSignal where
526-
put (Link pid) = putWord8 0 >> put pid
527-
put (Unlink pid) = putWord8 1 >> put pid
528-
put (Monitor ref) = putWord8 2 >> put ref
529-
put (Unmonitor ref) = putWord8 3 >> put ref
530-
put (Died who reason) = putWord8 4 >> put who >> put reason
531-
put (Spawn proc ref) = putWord8 5 >> put proc >> put ref
532-
put (WhereIs label) = putWord8 6 >> put label
544+
put (Link pid) = putWord8 0 >> put pid
545+
put (Unlink pid) = putWord8 1 >> put pid
546+
put (Monitor ref) = putWord8 2 >> put ref
547+
put (Unmonitor ref) = putWord8 3 >> put ref
548+
put (Died who reason) = putWord8 4 >> put who >> put reason
549+
put (Spawn proc ref) = putWord8 5 >> put proc >> put ref
550+
put (WhereIs label) = putWord8 6 >> put label
533551
put (Register label nid pid force) = putWord8 7 >> put label >> put nid >> put pid >> put force
534-
put (NamedSend label msg) = putWord8 8 >> put label >> put (messageToPayload msg)
535-
put (Kill pid reason) = putWord8 9 >> put pid >> put reason
536-
put (Exit pid reason) = putWord8 10 >> put pid >> put (messageToPayload reason)
537-
put (GetInfo about) = putWord8 30 >> put about
552+
put (NamedSend label msg) = putWord8 8 >> put label >> put (messageToPayload msg)
553+
put (Kill pid reason) = putWord8 9 >> put pid >> put reason
554+
put (Exit pid reason) = putWord8 10 >> put pid >> put (messageToPayload reason)
555+
put (LocalSend pid msg) = putWord8 11 >> put pid >> put (messageToPayload msg)
556+
put (LocalPortSend sid msg) = putWord8 12 >> put sid >> put (messageToPayload msg)
557+
put (GetInfo about) = putWord8 30 >> put about
538558
get = do
539559
header <- getWord8
540560
case header of
@@ -549,6 +569,8 @@ instance Binary ProcessSignal where
549569
8 -> NamedSend <$> get <*> (payloadToMessage <$> get)
550570
9 -> Kill <$> get <*> get
551571
10 -> Exit <$> get <*> (payloadToMessage <$> get)
572+
11 -> LocalSend <$> get <*> (payloadToMessage <$> get)
573+
12 -> LocalPortSend <$> get <*> (payloadToMessage <$> get)
552574
30 -> GetInfo <$> get
553575
_ -> fail "ProcessSignal.get: invalid"
554576

0 commit comments

Comments
 (0)