Skip to content

Commit 2cfa987

Browse files
author
Tim Watson
committed
Deliver intra-node messages directly to the destination process' mailbox
1 parent fb58b07 commit 2cfa987

File tree

7 files changed

+85
-31
lines changed

7 files changed

+85
-31
lines changed

distributed-process.cabal

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,19 @@ Library
5959
rank1dynamic >= 0.1 && < 0.2,
6060
syb >= 0.3 && < 0.4
6161
Exposed-modules: Control.Distributed.Process,
62-
Control.Distributed.Process.Serializable,
6362
Control.Distributed.Process.Closure,
6463
Control.Distributed.Process.Node,
65-
Control.Distributed.Process.Internal.Primitives,
64+
Control.Distributed.Process.Serializable
65+
Other-modules: Control.Distributed.Process.Internal.Closure.BuiltIn,
6666
Control.Distributed.Process.Internal.CQueue,
67-
Control.Distributed.Process.Internal.Types,
68-
Control.Distributed.Process.Internal.Trace,
69-
Control.Distributed.Process.Internal.Closure.BuiltIn,
7067
Control.Distributed.Process.Internal.Messaging,
68+
Control.Distributed.Process.Internal.Primitives,
69+
Control.Distributed.Process.Internal.StrictContainerAccessors,
7170
Control.Distributed.Process.Internal.StrictList,
7271
Control.Distributed.Process.Internal.StrictMVar,
72+
Control.Distributed.Process.Internal.Trace,
73+
Control.Distributed.Process.Internal.Types,
7374
Control.Distributed.Process.Internal.WeakTQueue
74-
Control.Distributed.Process.Internal.StrictContainerAccessors
7575
Extensions: RankNTypes,
7676
ScopedTypeVariables,
7777
FlexibleInstances,

src/Control/Distributed/Process.hs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ please see the distributed-process wiki page on github:
1515
module Control.Distributed.Process
1616
( -- * Basic types
1717
ProcessId
18-
, NodeId
18+
, NodeId(..)
1919
, Process
2020
, SendPortId
2121
, processNodeId
@@ -81,6 +81,8 @@ module Control.Distributed.Process
8181
, unmonitor
8282
, withMonitor
8383
, MonitorRef -- opaque
84+
, ProcessExitException()
85+
, exitSource
8486
, ProcessLinkException(..)
8587
, NodeLinkException(..)
8688
, PortLinkException(..)
@@ -127,6 +129,7 @@ module Control.Distributed.Process
127129
, spawnMonitor
128130
, spawnChannel
129131
, DidSpawn(..)
132+
, nullProcessId
130133
-- * Local versions of 'spawn'
131134
, spawnLocal
132135
, spawnChannelLocal
@@ -160,6 +163,8 @@ import Control.Distributed.Process.Internal.Types
160163
, ProcessMonitorNotification(..)
161164
, NodeMonitorNotification(..)
162165
, PortMonitorNotification(..)
166+
, ProcessExitException()
167+
, exitSource
163168
, ProcessLinkException(..)
164169
, NodeLinkException(..)
165170
, PortLinkException(..)

src/Control/Distributed/Process/Internal/Primitives.hs

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ module Control.Distributed.Process.Internal.Primitives
3939
-- keep the exception constructor hidden, so that handling exit
4040
-- reasons /must/ take place via the 'catchExit' family of primitives
4141
, ProcessExitException()
42+
, exitSource
4243
, getSelfPid
4344
, getSelfNode
4445
, ProcessInfo(..)
@@ -96,11 +97,6 @@ module Control.Distributed.Process.Internal.Primitives
9697
import Prelude hiding (catch)
9798
#endif
9899

99-
import Data.Binary (decode)
100-
import Data.Time.Clock (getCurrentTime)
101-
import Data.Time.Format (formatTime)
102-
import System.Locale (defaultTimeLocale)
103-
import System.Timeout (timeout)
104100
import Control.Monad (when)
105101
import Control.Monad.Reader (ask)
106102
import Control.Monad.IO.Class (MonadIO, liftIO)
@@ -111,6 +107,7 @@ import Control.Distributed.Process.Internal.StrictMVar
111107
( StrictMVar
112108
, modifyMVar
113109
, modifyMVar_
110+
, withMVar
114111
)
115112
import Control.Concurrent.Chan (writeChan)
116113
import Control.Concurrent.STM
@@ -124,13 +121,12 @@ import Control.Concurrent.STM
124121
)
125122
import Control.Distributed.Process.Internal.CQueue
126123
( dequeue
124+
, enqueue
127125
, BlockSpec(..)
128126
, MatchOn(..)
129127
)
130128
import Control.Distributed.Process.Serializable (Serializable, fingerprint)
131-
import Data.Accessor ((^.), (^:), (^=))
132129
import Control.Distributed.Static (Closure, Static)
133-
import Data.Rank1Typeable (Typeable)
134130
import qualified Control.Distributed.Static as Static (unstatic, unclosure)
135131
import Control.Distributed.Process.Internal.Types
136132
( NodeId(..)
@@ -149,10 +145,12 @@ import Control.Distributed.Process.Internal.Types
149145
, ReceivePort(..)
150146
, channelCounter
151147
, typedChannelWithId
148+
, localProcessWithId
152149
, TypedChannel(..)
153150
, SendPortId(..)
154151
, Identifier(..)
155152
, ProcessExitException(..)
153+
, exitSource
156154
, DidUnmonitor(..)
157155
, DidUnlinkProcess(..)
158156
, DidUnlinkNode(..)
@@ -166,7 +164,7 @@ import Control.Distributed.Process.Internal.Types
166164
, createUnencodedMessage
167165
, runLocalProcess
168166
, ImplicitReconnect(WithImplicitReconnect, NoImplicitReconnect)
169-
, LocalProcessState
167+
, LocalProcessState(..)
170168
, LocalSendPortId
171169
, messageToPayload
172170
)
@@ -178,10 +176,21 @@ import Control.Distributed.Process.Internal.Messaging
178176
)
179177
import qualified Control.Distributed.Process.Internal.Trace as Trace
180178
import Control.Distributed.Process.Internal.WeakTQueue
181-
( newTQueueIO
179+
( TQueue
180+
, newTQueueIO
182181
, readTQueue
183182
, mkWeakTQueue
183+
, writeTQueue
184184
)
185+
import Data.Accessor ((^.), (^:), (^=))
186+
import Data.Binary (decode)
187+
import Data.Foldable (forM_)
188+
import Data.Rank1Typeable (Typeable)
189+
import Data.Time.Clock (getCurrentTime)
190+
import Data.Time.Format (formatTime)
191+
import System.Locale (defaultTimeLocale)
192+
import System.Mem.Weak (deRefWeak)
193+
import System.Timeout (timeout)
185194
import Unsafe.Coerce
186195

187196
--------------------------------------------------------------------------------
@@ -1031,20 +1040,63 @@ trace s = do
10311040
node <- processNode <$> ask
10321041
liftIO $ Trace.trace (localTracer node) s
10331042

1043+
--------------------------------------------------------------------------------
1044+
-- Messages to local processes --
1045+
--------------------------------------------------------------------------------
1046+
1047+
deliverToLocalProcess :: ProcessId -> Message -> Process ()
1048+
deliverToLocalProcess pid msg = do
1049+
withLocalProc pid $ \p -> enqueue (processQueue p) msg
1050+
1051+
deliverToLocalPort :: SendPortId -> Message -> Process ()
1052+
deliverToLocalPort spId msg =
1053+
let pid = sendPortProcessId spId
1054+
cid = sendPortLocalId spId
1055+
in do
1056+
withLocalProc pid $ \proc -> do
1057+
mChan <- withMVar (processState proc) $ return . (^. typedChannelWithId cid)
1058+
case mChan of
1059+
-- in the unlikely event we know nothing about this channel id,
1060+
-- see [note: missing recipients]
1061+
Nothing -> return ()
1062+
Just (TypedChannel chan') -> do
1063+
-- If ch is Nothing, the process has given up the read end of
1064+
-- the channel and we simply ignore the incoming message - this
1065+
ch <- deRefWeak chan'
1066+
forM_ ch $ \chan -> deliverChan msg chan
1067+
where deliverChan :: forall a . Message -> TQueue a -> IO ()
1068+
deliverChan (UnencodedMessage _ raw) chan' =
1069+
atomically $ writeTQueue chan' ((unsafeCoerce raw) :: a)
1070+
deliverChan (EncodedMessage _ _) _ =
1071+
-- this will /never/ happen
1072+
error "invalid local channel delivery"
1073+
1074+
withLocalProc :: ProcessId -> (LocalProcess -> IO ()) -> Process ()
1075+
withLocalProc pid p = do
1076+
node <- processNode <$> ask
1077+
liftIO $ do
1078+
-- see [note: missing recipients]
1079+
let lpid = processLocalId pid
1080+
mProc <- withMVar (localState node) $ return . (^. localProcessWithId lpid)
1081+
forM_ mProc p
1082+
1083+
-- [note: missing recipients]
1084+
-- By [Unified: table 6, rule missing_process] messages to dead processes
1085+
-- can silently be dropped
1086+
10341087
--------------------------------------------------------------------------------
10351088
-- Auxiliary functions --
10361089
--------------------------------------------------------------------------------
10371090

10381091
sendLocal :: (Serializable a) => ProcessId -> a -> Process ()
10391092
sendLocal pid msg =
1040-
sendCtrlMsg Nothing $ LocalSend pid (createUnencodedMessage msg)
1093+
-- sendCtrlMsg Nothing $ LocalSend pid (createUnencodedMessage msg)
1094+
deliverToLocalProcess pid (createUnencodedMessage msg)
10411095

10421096
sendChanLocal :: (Serializable a) => SendPortId -> a -> Process ()
10431097
sendChanLocal spId msg =
1044-
-- we *must* fully serialize/encode the message here, because
1045-
-- attempting to use `unsafeCoerce' in the node controller
1046-
-- won't work since we know nothing about the required type
1047-
sendCtrlMsg Nothing $ LocalPortSend spId (createUnencodedMessage msg)
1098+
-- sendCtrlMsg Nothing $ LocalPortSend spId (createUnencodedMessage msg)
1099+
deliverToLocalPort spId (createUnencodedMessage msg)
10481100

10491101
getMonitorRefFor :: Identifier -> Process MonitorRef
10501102
getMonitorRefFor ident = do

src/Control/Distributed/Process/Internal/Types.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ module Control.Distributed.Process.Internal.Types
3939
, NodeMonitorNotification(..)
4040
, PortMonitorNotification(..)
4141
, ProcessExitException(..)
42+
, exitSource
4243
, ProcessLinkException(..)
4344
, NodeLinkException(..)
4445
, PortLinkException(..)
@@ -411,6 +412,9 @@ instance Exception ProcessExitException
411412
instance Show ProcessExitException where
412413
show (ProcessExitException pid _) = "exit-from=" ++ (show pid)
413414

415+
exitSource :: ProcessExitException -> ProcessId
416+
exitSource (ProcessExitException from _) = from
417+
414418
instance Exception ProcessLinkException
415419
instance Exception NodeLinkException
416420
instance Exception PortLinkException

src/Control/Distributed/Process/Node.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
-- | Local nodes
22
--
33
module Control.Distributed.Process.Node
4-
( LocalNode
4+
( LocalNode(localEndPoint)
55
, newLocalNode
66
, closeLocalNode
77
, forkProcess

tests/TestCH.hs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,6 @@ import Network.Transport.TCP
2727
, defaultTCPParameters
2828
)
2929
import Control.Distributed.Process
30-
import Control.Distributed.Process.Internal.Types
31-
( NodeId(nodeAddress)
32-
, LocalNode(localEndPoint)
33-
, ProcessExitException(..)
34-
, nullProcessId
35-
)
3630
import Control.Distributed.Process.Node
3731
import Control.Distributed.Process.Serializable (Serializable)
3832

@@ -1095,8 +1089,8 @@ testPrettyExit transport = do
10951089

10961090
_ <- forkProcess localNode $ do
10971091
(die "timeout")
1098-
`catch` \ex@(ProcessExitException from _) ->
1099-
let expected = "exit-from=" ++ (show from)
1092+
`catch` \(ex :: ProcessExitException) ->
1093+
let expected = "exit-from=" ++ (show (exitSource ex))
11001094
in do
11011095
True <- return $ (show ex) == expected
11021096
liftIO $ putMVar done ()

tests/TestClosure.hs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import Network.Socket (sClose)
2727
import Control.Distributed.Process
2828
import Control.Distributed.Process.Closure
2929
import Control.Distributed.Process.Node
30-
import Control.Distributed.Process.Internal.Types (NodeId(nodeAddress))
3130
import Control.Distributed.Static (staticLabel, staticClosure)
3231

3332
import Test.HUnit (Assertion)

0 commit comments

Comments
 (0)