Skip to content

Commit 1ab91c9

Browse files
author
Tim Watson
committed
DP-57 DP-7 #resolve #comment merge branch expose-message into development
2 parents 75ca8db + 23282e0 commit 1ab91c9

File tree

4 files changed

+168
-64
lines changed

4 files changed

+168
-64
lines changed

src/Control/Distributed/Process.hs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,16 @@ module Control.Distributed.Process
4242
, match
4343
, matchIf
4444
, matchUnknown
45-
, AbstractMessage(..)
4645
, matchAny
4746
, matchAnyIf
4847
, matchChan
48+
, Message
49+
, matchMessage
50+
, matchMessageIf
51+
, wrapMessage
52+
, unwrapMessage
53+
, handleMessage
54+
, forward
4955
-- * Process management
5056
, spawn
5157
, call
@@ -167,6 +173,7 @@ import Control.Distributed.Process.Internal.Types
167173
, WhereIsReply(..)
168174
, RegisterReply(..)
169175
, LocalProcess(processNode)
176+
, Message
170177
, nullProcessId
171178
)
172179
import Control.Distributed.Process.Serializable (Serializable, SerializableDict)
@@ -199,10 +206,15 @@ import Control.Distributed.Process.Internal.Primitives
199206
, match
200207
, matchIf
201208
, matchUnknown
202-
, AbstractMessage(..)
203209
, matchAny
204210
, matchAnyIf
205211
, matchChan
212+
, matchMessage
213+
, matchMessageIf
214+
, wrapMessage
215+
, unwrapMessage
216+
, handleMessage
217+
, forward
206218
-- Process management
207219
, terminate
208220
, ProcessTerminationException(..)

src/Control/Distributed/Process/Internal/Primitives.hs

Lines changed: 103 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@ module Control.Distributed.Process.Internal.Primitives
1919
, match
2020
, matchIf
2121
, matchUnknown
22-
, AbstractMessage(..)
2322
, matchAny
2423
, matchAnyIf
2524
, matchChan
25+
, matchMessage
26+
, matchMessageIf
27+
, wrapMessage
28+
, unwrapMessage
29+
, handleMessage
30+
, forward
2631
-- * Process management
2732
, terminate
2833
, ProcessTerminationException(..)
@@ -320,69 +325,115 @@ matchIf c p = Match $ MatchMsg $ \msg ->
320325
!decoded = decode (messageEncoding msg)
321326
_ -> Nothing
322327

323-
-- | Represents a received message and provides two basic operations on it.
324-
data AbstractMessage = AbstractMessage {
325-
forward :: ProcessId -> Process () -- ^ forward the message to @ProcessId@
326-
, maybeHandleMessage :: forall a b. (Serializable a)
327-
=> (a -> Process b) -> Process (Maybe b) {- ^ Handle the message.
328-
If the type of the message matches the type of the first argument to
329-
the supplied expression, then the expression will be evaluated against
330-
it. If this runtime type checking fails, then @Nothing@ will be returned
331-
to indicate the fact. If the check succeeds and evaluation proceeds
332-
however, the resulting value with be wrapped with @Just@.
333-
-}
334-
}
328+
-- | Match against any message, regardless of the underlying (contained) type
329+
matchMessage :: (Message -> Process Message) -> Match Message
330+
matchMessage p = Match $ MatchMsg $ \msg -> Just (p msg)
331+
332+
-- | Match against any message (regardless of underlying type) that satisfies a predicate
333+
matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message
334+
matchMessageIf c p = Match $ MatchMsg $ \msg ->
335+
case (c msg) of
336+
True -> Just (p msg)
337+
False -> Nothing
338+
339+
-- | Forward a raw 'Message' to the given 'ProcessId'.
340+
forward :: Message -> ProcessId -> Process ()
341+
forward msg them = do
342+
proc <- ask
343+
liftIO $ sendPayload (processNode proc)
344+
(ProcessIdentifier (processId proc))
345+
(ProcessIdentifier them)
346+
NoImplicitReconnect
347+
(messageToPayload msg)
348+
349+
-- | Wrap a 'Serializable' value in a 'Message'. Note that 'Message's are
350+
-- 'Serializable' - like the datum they contain - but remember that deserializing
351+
-- a 'Message' will yield a 'Message', not the type within it! To obtain the
352+
-- wrapped datum, use 'unwrapMessage' or 'handleMessage' with a specific type.
353+
--
354+
-- @
355+
-- do
356+
-- self <- getSelfPid
357+
-- send self (wrapMessage "blah")
358+
-- Nothing <- expectTimeout 1000000 :: Process (Maybe String)
359+
-- (Just m) <- expectTimeout 1000000 :: Process (Maybe Message)
360+
-- "blah" <- unwrapMessage m :: Process (Maybe String)
361+
-- @
362+
--
363+
wrapMessage :: Serializable a => a -> Message
364+
wrapMessage = createMessage
365+
366+
-- | Attempt to unwrap a raw 'Message'.
367+
-- If the type of the decoded message payload matches the expected type, the
368+
-- value will be returned with @Just@, otherwise @Nothing@ indicates the types
369+
-- do not match.
370+
--
371+
-- This expression, for example, will evaluate to @Nothing@
372+
-- > unwrapMessage (wrapMessage "foobar") :: Process (Maybe Int)
373+
--
374+
-- Whereas this expression, will yield @Just "foo"@
375+
-- > unwrapMessage (wrapMessage "foo") :: Process (Maybe String)
376+
--
377+
unwrapMessage :: forall a. Serializable a => Message -> Process (Maybe a)
378+
unwrapMessage msg =
379+
case messageFingerprint msg == fingerprint (undefined :: a) of
380+
True -> return (Just (decoded))
381+
where
382+
decoded :: a
383+
-- Make sure the value is fully decoded so that we don't hang to
384+
-- bytestrings when the calling process doesn't evaluate immediately
385+
!decoded = decode (messageEncoding msg)
386+
_ -> return Nothing
387+
388+
-- | Attempt to handle a raw 'Message'.
389+
-- If the type of the message matches the type of the first argument to
390+
-- the supplied expression, then the message will be decoded and the expression
391+
-- evaluated against its value. If this runtime type checking fails however,
392+
-- @Nothing@ will be returned to indicate the fact. If the check succeeds and
393+
-- evaluation proceeds, the resulting value with be wrapped with @Just@.
394+
--
395+
-- Intended for use in `catchesExit` and `matchAny` primitives.
396+
--
397+
handleMessage :: forall a b. (Serializable a)
398+
=> Message -> (a -> Process b) -> Process (Maybe b)
399+
handleMessage msg proc = do
400+
case messageFingerprint msg == fingerprint (undefined :: a) of
401+
True -> do { r <- proc (decoded :: a); return (Just r) }
402+
where
403+
decoded :: a
404+
!decoded = decode (messageEncoding msg)
405+
_ -> return Nothing
335406

336407
-- | Match against an arbitrary message. 'matchAny' removes the first available
337-
-- message from the process mailbox, and via the 'AbstractMessage' type,
338-
-- supports forwarding /or/ handling the message /if/ it is of the correct
339-
-- type. If /not/ of the right type, then the 'AbstractMessage'
340-
-- @maybeHandleMessage@ function will not evaluate the supplied expression,
341-
-- /but/ the message will still have been removed from the process mailbox!
342-
--
343-
matchAny :: forall b. (AbstractMessage -> Process b) -> Match b
344-
matchAny p = Match $ MatchMsg $ Just . p . abstract
345-
346-
-- | Match against an arbitrary message. 'matchAnyIf' will /only/ remove the
347-
-- message from the process mailbox, /if/ the supplied condition matches. The
348-
-- success (or failure) of runtime type checks in @maybeHandleMessage@ does not
349-
-- count here, i.e., if the condition evaluates to @True@ then the message will
408+
-- message from the process mailbox. To handle arbitrary /raw/ messages once
409+
-- removed from the mailbox, see 'handleMessage' and 'unwrapMessage'.
410+
--
411+
matchAny :: forall b. (Message -> Process b) -> Match b
412+
matchAny p = Match $ MatchMsg $ \msg -> Just (p msg)
413+
414+
-- | Match against an arbitrary message. Intended for use with 'handleMessage'
415+
-- and 'unwrapMessage', this function /only/ removes a message from the process
416+
-- mailbox, /if/ the supplied condition matches. The success (or failure) of
417+
-- runtime type checks deferred to @handleMessage@ and friends is irrelevant
418+
-- here, i.e., if the condition evaluates to @True@ then the message will
350419
-- be removed from the process mailbox and decoded, but that does /not/
351-
-- guarantee that an expression passed to @maybeHandleMessage@ will pass the
352-
-- runtime type checks and therefore be evaluated. If the types do not match
353-
-- up, then @maybeHandleMessage@ returns 'Nothing'.
420+
-- guarantee that an expression passed to @handleMessage@ will pass the
421+
-- runtime type checks and therefore be evaluated.
422+
--
354423
matchAnyIf :: forall a b. (Serializable a)
355424
=> (a -> Bool)
356-
-> (AbstractMessage -> Process b)
425+
-> (Message -> Process b)
357426
-> Match b
358427
matchAnyIf c p = Match $ MatchMsg $ \msg ->
359428
case messageFingerprint msg == fingerprint (undefined :: a) of
360-
True | c decoded -> Just (p (abstract msg))
429+
True | c decoded -> Just (p msg)
361430
where
362431
decoded :: a
363432
-- Make sure the value is fully decoded so that we don't hang to
364433
-- bytestrings when the calling process doesn't evaluate immediately
365434
!decoded = decode (messageEncoding msg)
366435
_ -> Nothing
367436

368-
abstract :: Message -> AbstractMessage
369-
abstract msg = AbstractMessage {
370-
forward = \them -> do
371-
proc <- ask
372-
liftIO $ sendPayload (processNode proc)
373-
(ProcessIdentifier (processId proc))
374-
(ProcessIdentifier them)
375-
NoImplicitReconnect
376-
(messageToPayload msg)
377-
, maybeHandleMessage = \(proc :: (a -> Process b)) -> do
378-
case messageFingerprint msg == fingerprint (undefined :: a) of
379-
True -> do { r <- proc (decoded :: a); return (Just r) }
380-
where
381-
decoded :: a
382-
!decoded = decode (messageEncoding msg)
383-
_ -> return Nothing
384-
}
385-
386437
-- | Remove any message from the queue
387438
matchUnknown :: Process b -> Match b
388439
matchUnknown p = Match $ MatchMsg (const (Just p))
@@ -461,16 +512,16 @@ catchExit act exitHandler = catch act handleExit
461512
--
462513
-- See 'maybeHandleMessage' and 'AsbtractMessage' for more details.
463514
catchesExit :: Process b
464-
-> [(ProcessId -> AbstractMessage -> (Process (Maybe b)))]
515+
-> [(ProcessId -> Message -> (Process (Maybe b)))]
465516
-> Process b
466517
catchesExit act handlers = catch act ((flip handleExit) handlers)
467518
where
468519
handleExit :: ProcessExitException
469-
-> [(ProcessId -> AbstractMessage -> Process (Maybe b))]
520+
-> [(ProcessId -> Message -> Process (Maybe b))]
470521
-> Process b
471522
handleExit ex [] = liftIO $ throwIO ex
472523
handleExit ex@(ProcessExitException from msg) (h:hs) = do
473-
r <- h from (abstract msg)
524+
r <- h from msg
474525
case r of
475526
Nothing -> handleExit ex hs
476527
Just p -> return p

src/Control/Distributed/Process/Internal/Types.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ data Message = Message
307307
{ messageFingerprint :: !Fingerprint
308308
, messageEncoding :: !BSL.ByteString
309309
}
310+
deriving (Typeable)
310311

311312
instance Show Message where
312313
show (Message fp enc) = show enc ++ " :: " ++ showFingerprint fp []
@@ -489,6 +490,10 @@ data ProcessSignal =
489490
-- Binary instances --
490491
--------------------------------------------------------------------------------
491492

493+
instance Binary Message where
494+
put msg = put $ messageToPayload msg
495+
get = payloadToMessage <$> get
496+
492497
instance Binary LocalProcessId where
493498
put lpid = put (lpidUnique lpid) >> put (lpidCounter lpid)
494499
get = LocalProcessId <$> get <*> get

tests/TestCH.hs

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import Control.Distributed.Process.Internal.Types
3232
, LocalNode(localEndPoint)
3333
, ProcessExitException(..)
3434
, nullProcessId
35+
, Message
3536
)
3637
import Control.Distributed.Process.Node
3738
import Control.Distributed.Process.Serializable (Serializable)
@@ -789,9 +790,9 @@ testMatchAnyHandle transport = do
789790
liftIO $ putMVar clientDone ()
790791

791792
takeMVar clientDone
792-
where maybeForward :: ProcessId -> AbstractMessage -> Process (Maybe ())
793+
where maybeForward :: ProcessId -> Message -> Process (Maybe ())
793794
maybeForward s msg =
794-
maybeHandleMessage msg (\m@(Add _ _ _) -> send s m)
795+
handleMessage msg (\m@(Add _ _ _) -> send s m)
795796

796797
testMatchAnyNoHandle :: NT.Transport -> Assertion
797798
testMatchAnyNoHandle transport = do
@@ -810,7 +811,7 @@ testMatchAnyNoHandle transport = do
810811
-- the match `AbstractMessage -> Process ()` will succeed!
811812
(\m -> do
812813
-- `String -> Process ()` does *not* match the input types however
813-
r <- (maybeHandleMessage m (\(_ :: String) -> die "NONSENSE" ))
814+
r <- (handleMessage m (\(_ :: String) -> die "NONSENSE" ))
814815
case r of
815816
Nothing -> return ()
816817
Just _ -> die "NONSENSE")
@@ -848,7 +849,7 @@ testMatchAnyIf transport = do
848849
echoServer <- forkProcess localNode $ forever $ do
849850
receiveWait [
850851
matchAnyIf (\(_ :: ProcessId, (s :: String)) -> s /= "bar")
851-
handleMessage
852+
tryHandleMessage
852853
]
853854
putMVar echoAddr echoServer
854855

@@ -868,11 +869,45 @@ testMatchAnyIf transport = do
868869
liftIO $ putMVar clientDone ()
869870

870871
takeMVar clientDone
871-
where handleMessage :: AbstractMessage -> Process (Maybe ())
872-
handleMessage msg =
873-
maybeHandleMessage msg (\(pid :: ProcessId, (m :: String))
872+
where tryHandleMessage :: Message -> Process (Maybe ())
873+
tryHandleMessage msg =
874+
handleMessage msg (\(pid :: ProcessId, (m :: String))
874875
-> do { send pid m; return () })
875876

877+
testMatchMessageWithUnwrap :: NT.Transport -> Assertion
878+
testMatchMessageWithUnwrap transport = do
879+
echoAddr <- newEmptyMVar
880+
clientDone <- newEmptyMVar
881+
882+
-- echo server
883+
forkIO $ do
884+
localNode <- newLocalNode transport initRemoteTable
885+
echoServer <- forkProcess localNode $ forever $ do
886+
msg <- receiveWait [
887+
matchMessage (\(m :: Message) -> do
888+
return m)
889+
]
890+
unwrapped <- unwrapMessage msg :: Process (Maybe (ProcessId, Message))
891+
case unwrapped of
892+
(Just (p, msg')) -> forward msg' p
893+
Nothing -> die "unable to unwrap the message"
894+
putMVar echoAddr echoServer
895+
896+
-- Client
897+
forkIO $ do
898+
localNode <- newLocalNode transport initRemoteTable
899+
server <- readMVar echoAddr
900+
901+
runProcess localNode $ do
902+
pid <- getSelfPid
903+
send server (pid, wrapMessage ("foo" :: String))
904+
"foo" <- expect
905+
send server (pid, wrapMessage ("baz" :: String))
906+
"baz" <- expect
907+
liftIO $ putMVar clientDone ()
908+
909+
takeMVar clientDone
910+
876911
-- Test 'receiveChanTimeout'
877912
testReceiveChanTimeout :: NT.Transport -> Assertion
878913
testReceiveChanTimeout transport = do
@@ -1018,9 +1053,9 @@ testCatchesExit transport = do
10181053
_ <- forkProcess localNode $ do
10191054
(die ("foobar", 123 :: Int))
10201055
`catchesExit` [
1021-
(\_ m -> maybeHandleMessage m (\(_ :: String) -> return ()))
1022-
, (\_ m -> maybeHandleMessage m (\(_ :: Maybe Int) -> return ()))
1023-
, (\_ m -> maybeHandleMessage m (\(_ :: String, _ :: Int)
1056+
(\_ m -> handleMessage m (\(_ :: String) -> return ()))
1057+
, (\_ m -> handleMessage m (\(_ :: Maybe Int) -> return ()))
1058+
, (\_ m -> handleMessage m (\(_ :: String, _ :: Int)
10241059
-> (liftIO $ putMVar done ()) >> return ()))
10251060
]
10261061

@@ -1135,6 +1170,7 @@ tests (transport, transportInternals) = [
11351170
, testCase "MatchAnyHandle" (testMatchAnyHandle transport)
11361171
, testCase "MatchAnyNoHandle" (testMatchAnyNoHandle transport)
11371172
, testCase "MatchAnyIf" (testMatchAnyIf transport)
1173+
, testCase "MatchMessageUnwrap" (testMatchMessageWithUnwrap transport)
11381174
, testCase "ReceiveChanTimeout" (testReceiveChanTimeout transport)
11391175
, testCase "ReceiveChanFeatures" (testReceiveChanFeatures transport)
11401176
, testCase "KillLocal" (testKillLocal transport)

0 commit comments

Comments
 (0)