Skip to content

Commit 23282e0

Browse files
author
Tim Watson
committed
Expose Message and broaden the scope of polymorphic expect
Remove AbstractMessage and replace its forward and maybeHandleMessage API with versions that operate directly on Message. Create Typeable and Binary instances for Message and export the type from Process.hs (though not the constructor). Provide functions for matching, wrapping and unwrapping Message.
1 parent ba2361c commit 23282e0

File tree

4 files changed

+168
-64
lines changed

4 files changed

+168
-64
lines changed

src/Control/Distributed/Process.hs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,16 @@ module Control.Distributed.Process
4242
, match
4343
, matchIf
4444
, matchUnknown
45-
, AbstractMessage(..)
4645
, matchAny
4746
, matchAnyIf
4847
, matchChan
48+
, Message
49+
, matchMessage
50+
, matchMessageIf
51+
, wrapMessage
52+
, unwrapMessage
53+
, handleMessage
54+
, forward
4955
-- * Process management
5056
, spawn
5157
, call
@@ -167,6 +173,7 @@ import Control.Distributed.Process.Internal.Types
167173
, WhereIsReply(..)
168174
, RegisterReply(..)
169175
, LocalProcess(processNode)
176+
, Message
170177
, nullProcessId
171178
)
172179
import Control.Distributed.Process.Serializable (Serializable, SerializableDict)
@@ -199,10 +206,15 @@ import Control.Distributed.Process.Internal.Primitives
199206
, match
200207
, matchIf
201208
, matchUnknown
202-
, AbstractMessage(..)
203209
, matchAny
204210
, matchAnyIf
205211
, matchChan
212+
, matchMessage
213+
, matchMessageIf
214+
, wrapMessage
215+
, unwrapMessage
216+
, handleMessage
217+
, forward
206218
-- Process management
207219
, terminate
208220
, ProcessTerminationException(..)

src/Control/Distributed/Process/Internal/Primitives.hs

Lines changed: 103 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@ module Control.Distributed.Process.Internal.Primitives
1919
, match
2020
, matchIf
2121
, matchUnknown
22-
, AbstractMessage(..)
2322
, matchAny
2423
, matchAnyIf
2524
, matchChan
25+
, matchMessage
26+
, matchMessageIf
27+
, wrapMessage
28+
, unwrapMessage
29+
, handleMessage
30+
, forward
2631
-- * Process management
2732
, terminate
2833
, ProcessTerminationException(..)
@@ -320,69 +325,115 @@ matchIf c p = Match $ MatchMsg $ \msg ->
320325
!decoded = decode (messageEncoding msg)
321326
_ -> Nothing
322327

323-
-- | Represents a received message and provides two basic operations on it.
324-
data AbstractMessage = AbstractMessage {
325-
forward :: ProcessId -> Process () -- ^ forward the message to @ProcessId@
326-
, maybeHandleMessage :: forall a b. (Serializable a)
327-
=> (a -> Process b) -> Process (Maybe b) {- ^ Handle the message.
328-
If the type of the message matches the type of the first argument to
329-
the supplied expression, then the expression will be evaluated against
330-
it. If this runtime type checking fails, then @Nothing@ will be returned
331-
to indicate the fact. If the check succeeds and evaluation proceeds
332-
however, the resulting value with be wrapped with @Just@.
333-
-}
334-
}
328+
-- | Match against any message, regardless of the underlying (contained) type
329+
matchMessage :: (Message -> Process Message) -> Match Message
330+
matchMessage p = Match $ MatchMsg $ \msg -> Just (p msg)
331+
332+
-- | Match against any message (regardless of underlying type) that satisfies a predicate
333+
matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message
334+
matchMessageIf c p = Match $ MatchMsg $ \msg ->
335+
case (c msg) of
336+
True -> Just (p msg)
337+
False -> Nothing
338+
339+
-- | Forward a raw 'Message' to the given 'ProcessId'.
340+
forward :: Message -> ProcessId -> Process ()
341+
forward msg them = do
342+
proc <- ask
343+
liftIO $ sendPayload (processNode proc)
344+
(ProcessIdentifier (processId proc))
345+
(ProcessIdentifier them)
346+
NoImplicitReconnect
347+
(messageToPayload msg)
348+
349+
-- | Wrap a 'Serializable' value in a 'Message'. Note that 'Message's are
350+
-- 'Serializable' - like the datum they contain - but remember that deserializing
351+
-- a 'Message' will yield a 'Message', not the type within it! To obtain the
352+
-- wrapped datum, use 'unwrapMessage' or 'handleMessage' with a specific type.
353+
--
354+
-- @
355+
-- do
356+
-- self <- getSelfPid
357+
-- send self (wrapMessage "blah")
358+
-- Nothing <- expectTimeout 1000000 :: Process (Maybe String)
359+
-- (Just m) <- expectTimeout 1000000 :: Process (Maybe Message)
360+
-- "blah" <- unwrapMessage m :: Process (Maybe String)
361+
-- @
362+
--
363+
wrapMessage :: Serializable a => a -> Message
364+
wrapMessage = createMessage
365+
366+
-- | Attempt to unwrap a raw 'Message'.
367+
-- If the type of the decoded message payload matches the expected type, the
368+
-- value will be returned with @Just@, otherwise @Nothing@ indicates the types
369+
-- do not match.
370+
--
371+
-- This expression, for example, will evaluate to @Nothing@
372+
-- > unwrapMessage (wrapMessage "foobar") :: Process (Maybe Int)
373+
--
374+
-- Whereas this expression, will yield @Just "foo"@
375+
-- > unwrapMessage (wrapMessage "foo") :: Process (Maybe String)
376+
--
377+
unwrapMessage :: forall a. Serializable a => Message -> Process (Maybe a)
378+
unwrapMessage msg =
379+
case messageFingerprint msg == fingerprint (undefined :: a) of
380+
True -> return (Just (decoded))
381+
where
382+
decoded :: a
383+
-- Make sure the value is fully decoded so that we don't hang to
384+
-- bytestrings when the calling process doesn't evaluate immediately
385+
!decoded = decode (messageEncoding msg)
386+
_ -> return Nothing
387+
388+
-- | Attempt to handle a raw 'Message'.
389+
-- If the type of the message matches the type of the first argument to
390+
-- the supplied expression, then the message will be decoded and the expression
391+
-- evaluated against its value. If this runtime type checking fails however,
392+
-- @Nothing@ will be returned to indicate the fact. If the check succeeds and
393+
-- evaluation proceeds, the resulting value with be wrapped with @Just@.
394+
--
395+
-- Intended for use in `catchesExit` and `matchAny` primitives.
396+
--
397+
handleMessage :: forall a b. (Serializable a)
398+
=> Message -> (a -> Process b) -> Process (Maybe b)
399+
handleMessage msg proc = do
400+
case messageFingerprint msg == fingerprint (undefined :: a) of
401+
True -> do { r <- proc (decoded :: a); return (Just r) }
402+
where
403+
decoded :: a
404+
!decoded = decode (messageEncoding msg)
405+
_ -> return Nothing
335406

336407
-- | Match against an arbitrary message. 'matchAny' removes the first available
337-
-- message from the process mailbox, and via the 'AbstractMessage' type,
338-
-- supports forwarding /or/ handling the message /if/ it is of the correct
339-
-- type. If /not/ of the right type, then the 'AbstractMessage'
340-
-- @maybeHandleMessage@ function will not evaluate the supplied expression,
341-
-- /but/ the message will still have been removed from the process mailbox!
342-
--
343-
matchAny :: forall b. (AbstractMessage -> Process b) -> Match b
344-
matchAny p = Match $ MatchMsg $ Just . p . abstract
345-
346-
-- | Match against an arbitrary message. 'matchAnyIf' will /only/ remove the
347-
-- message from the process mailbox, /if/ the supplied condition matches. The
348-
-- success (or failure) of runtime type checks in @maybeHandleMessage@ does not
349-
-- count here, i.e., if the condition evaluates to @True@ then the message will
408+
-- message from the process mailbox. To handle arbitrary /raw/ messages once
409+
-- removed from the mailbox, see 'handleMessage' and 'unwrapMessage'.
410+
--
411+
matchAny :: forall b. (Message -> Process b) -> Match b
412+
matchAny p = Match $ MatchMsg $ \msg -> Just (p msg)
413+
414+
-- | Match against an arbitrary message. Intended for use with 'handleMessage'
415+
-- and 'unwrapMessage', this function /only/ removes a message from the process
416+
-- mailbox, /if/ the supplied condition matches. The success (or failure) of
417+
-- runtime type checks deferred to @handleMessage@ and friends is irrelevant
418+
-- here, i.e., if the condition evaluates to @True@ then the message will
350419
-- be removed from the process mailbox and decoded, but that does /not/
351-
-- guarantee that an expression passed to @maybeHandleMessage@ will pass the
352-
-- runtime type checks and therefore be evaluated. If the types do not match
353-
-- up, then @maybeHandleMessage@ returns 'Nothing'.
420+
-- guarantee that an expression passed to @handleMessage@ will pass the
421+
-- runtime type checks and therefore be evaluated.
422+
--
354423
matchAnyIf :: forall a b. (Serializable a)
355424
=> (a -> Bool)
356-
-> (AbstractMessage -> Process b)
425+
-> (Message -> Process b)
357426
-> Match b
358427
matchAnyIf c p = Match $ MatchMsg $ \msg ->
359428
case messageFingerprint msg == fingerprint (undefined :: a) of
360-
True | c decoded -> Just (p (abstract msg))
429+
True | c decoded -> Just (p msg)
361430
where
362431
decoded :: a
363432
-- Make sure the value is fully decoded so that we don't hang to
364433
-- bytestrings when the calling process doesn't evaluate immediately
365434
!decoded = decode (messageEncoding msg)
366435
_ -> Nothing
367436

368-
abstract :: Message -> AbstractMessage
369-
abstract msg = AbstractMessage {
370-
forward = \them -> do
371-
proc <- ask
372-
liftIO $ sendPayload (processNode proc)
373-
(ProcessIdentifier (processId proc))
374-
(ProcessIdentifier them)
375-
NoImplicitReconnect
376-
(messageToPayload msg)
377-
, maybeHandleMessage = \(proc :: (a -> Process b)) -> do
378-
case messageFingerprint msg == fingerprint (undefined :: a) of
379-
True -> do { r <- proc (decoded :: a); return (Just r) }
380-
where
381-
decoded :: a
382-
!decoded = decode (messageEncoding msg)
383-
_ -> return Nothing
384-
}
385-
386437
-- | Remove any message from the queue
387438
matchUnknown :: Process b -> Match b
388439
matchUnknown p = Match $ MatchMsg (const (Just p))
@@ -461,16 +512,16 @@ catchExit act exitHandler = catch act handleExit
461512
--
462513
-- See 'maybeHandleMessage' and 'AsbtractMessage' for more details.
463514
catchesExit :: Process b
464-
-> [(ProcessId -> AbstractMessage -> (Process (Maybe b)))]
515+
-> [(ProcessId -> Message -> (Process (Maybe b)))]
465516
-> Process b
466517
catchesExit act handlers = catch act ((flip handleExit) handlers)
467518
where
468519
handleExit :: ProcessExitException
469-
-> [(ProcessId -> AbstractMessage -> Process (Maybe b))]
520+
-> [(ProcessId -> Message -> Process (Maybe b))]
470521
-> Process b
471522
handleExit ex [] = liftIO $ throwIO ex
472523
handleExit ex@(ProcessExitException from msg) (h:hs) = do
473-
r <- h from (abstract msg)
524+
r <- h from msg
474525
case r of
475526
Nothing -> handleExit ex hs
476527
Just p -> return p

src/Control/Distributed/Process/Internal/Types.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ data Message = Message
307307
{ messageFingerprint :: !Fingerprint
308308
, messageEncoding :: !BSL.ByteString
309309
}
310+
deriving (Typeable)
310311

311312
instance Show Message where
312313
show (Message fp enc) = show enc ++ " :: " ++ showFingerprint fp []
@@ -489,6 +490,10 @@ data ProcessSignal =
489490
-- Binary instances --
490491
--------------------------------------------------------------------------------
491492

493+
instance Binary Message where
494+
put msg = put $ messageToPayload msg
495+
get = payloadToMessage <$> get
496+
492497
instance Binary LocalProcessId where
493498
put lpid = put (lpidUnique lpid) >> put (lpidCounter lpid)
494499
get = LocalProcessId <$> get <*> get

tests/TestCH.hs

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import Control.Distributed.Process.Internal.Types
3232
, LocalNode(localEndPoint)
3333
, ProcessExitException(..)
3434
, nullProcessId
35+
, Message
3536
)
3637
import Control.Distributed.Process.Node
3738
import Control.Distributed.Process.Serializable (Serializable)
@@ -789,9 +790,9 @@ testMatchAnyHandle transport = do
789790
liftIO $ putMVar clientDone ()
790791

791792
takeMVar clientDone
792-
where maybeForward :: ProcessId -> AbstractMessage -> Process (Maybe ())
793+
where maybeForward :: ProcessId -> Message -> Process (Maybe ())
793794
maybeForward s msg =
794-
maybeHandleMessage msg (\m@(Add _ _ _) -> send s m)
795+
handleMessage msg (\m@(Add _ _ _) -> send s m)
795796

796797
testMatchAnyNoHandle :: NT.Transport -> Assertion
797798
testMatchAnyNoHandle transport = do
@@ -810,7 +811,7 @@ testMatchAnyNoHandle transport = do
810811
-- the match `AbstractMessage -> Process ()` will succeed!
811812
(\m -> do
812813
-- `String -> Process ()` does *not* match the input types however
813-
r <- (maybeHandleMessage m (\(_ :: String) -> die "NONSENSE" ))
814+
r <- (handleMessage m (\(_ :: String) -> die "NONSENSE" ))
814815
case r of
815816
Nothing -> return ()
816817
Just _ -> die "NONSENSE")
@@ -848,7 +849,7 @@ testMatchAnyIf transport = do
848849
echoServer <- forkProcess localNode $ forever $ do
849850
receiveWait [
850851
matchAnyIf (\(_ :: ProcessId, (s :: String)) -> s /= "bar")
851-
handleMessage
852+
tryHandleMessage
852853
]
853854
putMVar echoAddr echoServer
854855

@@ -868,11 +869,45 @@ testMatchAnyIf transport = do
868869
liftIO $ putMVar clientDone ()
869870

870871
takeMVar clientDone
871-
where handleMessage :: AbstractMessage -> Process (Maybe ())
872-
handleMessage msg =
873-
maybeHandleMessage msg (\(pid :: ProcessId, (m :: String))
872+
where tryHandleMessage :: Message -> Process (Maybe ())
873+
tryHandleMessage msg =
874+
handleMessage msg (\(pid :: ProcessId, (m :: String))
874875
-> do { send pid m; return () })
875876

877+
testMatchMessageWithUnwrap :: NT.Transport -> Assertion
878+
testMatchMessageWithUnwrap transport = do
879+
echoAddr <- newEmptyMVar
880+
clientDone <- newEmptyMVar
881+
882+
-- echo server
883+
forkIO $ do
884+
localNode <- newLocalNode transport initRemoteTable
885+
echoServer <- forkProcess localNode $ forever $ do
886+
msg <- receiveWait [
887+
matchMessage (\(m :: Message) -> do
888+
return m)
889+
]
890+
unwrapped <- unwrapMessage msg :: Process (Maybe (ProcessId, Message))
891+
case unwrapped of
892+
(Just (p, msg')) -> forward msg' p
893+
Nothing -> die "unable to unwrap the message"
894+
putMVar echoAddr echoServer
895+
896+
-- Client
897+
forkIO $ do
898+
localNode <- newLocalNode transport initRemoteTable
899+
server <- readMVar echoAddr
900+
901+
runProcess localNode $ do
902+
pid <- getSelfPid
903+
send server (pid, wrapMessage ("foo" :: String))
904+
"foo" <- expect
905+
send server (pid, wrapMessage ("baz" :: String))
906+
"baz" <- expect
907+
liftIO $ putMVar clientDone ()
908+
909+
takeMVar clientDone
910+
876911
-- Test 'receiveChanTimeout'
877912
testReceiveChanTimeout :: NT.Transport -> Assertion
878913
testReceiveChanTimeout transport = do
@@ -1018,9 +1053,9 @@ testCatchesExit transport = do
10181053
_ <- forkProcess localNode $ do
10191054
(die ("foobar", 123 :: Int))
10201055
`catchesExit` [
1021-
(\_ m -> maybeHandleMessage m (\(_ :: String) -> return ()))
1022-
, (\_ m -> maybeHandleMessage m (\(_ :: Maybe Int) -> return ()))
1023-
, (\_ m -> maybeHandleMessage m (\(_ :: String, _ :: Int)
1056+
(\_ m -> handleMessage m (\(_ :: String) -> return ()))
1057+
, (\_ m -> handleMessage m (\(_ :: Maybe Int) -> return ()))
1058+
, (\_ m -> handleMessage m (\(_ :: String, _ :: Int)
10241059
-> (liftIO $ putMVar done ()) >> return ()))
10251060
]
10261061

@@ -1135,6 +1170,7 @@ tests (transport, transportInternals) = [
11351170
, testCase "MatchAnyHandle" (testMatchAnyHandle transport)
11361171
, testCase "MatchAnyNoHandle" (testMatchAnyNoHandle transport)
11371172
, testCase "MatchAnyIf" (testMatchAnyIf transport)
1173+
, testCase "MatchMessageUnwrap" (testMatchMessageWithUnwrap transport)
11381174
, testCase "ReceiveChanTimeout" (testReceiveChanTimeout transport)
11391175
, testCase "ReceiveChanFeatures" (testReceiveChanFeatures transport)
11401176
, testCase "KillLocal" (testKillLocal transport)

0 commit comments

Comments
 (0)