@@ -39,6 +39,7 @@ module Control.Distributed.Process.Internal.Primitives
39
39
-- keep the exception constructor hidden, so that handling exit
40
40
-- reasons /must/ take place via the 'catchExit' family of primitives
41
41
, ProcessExitException ()
42
+ , exitSource
42
43
, getSelfPid
43
44
, getSelfNode
44
45
, ProcessInfo (.. )
@@ -96,11 +97,6 @@ module Control.Distributed.Process.Internal.Primitives
96
97
import Prelude hiding (catch )
97
98
#endif
98
99
99
- import Data.Binary (decode )
100
- import Data.Time.Clock (getCurrentTime )
101
- import Data.Time.Format (formatTime )
102
- import System.Locale (defaultTimeLocale )
103
- import System.Timeout (timeout )
104
100
import Control.Monad (when )
105
101
import Control.Monad.Reader (ask )
106
102
import Control.Monad.IO.Class (MonadIO , liftIO )
@@ -111,6 +107,7 @@ import Control.Distributed.Process.Internal.StrictMVar
111
107
( StrictMVar
112
108
, modifyMVar
113
109
, modifyMVar_
110
+ , withMVar
114
111
)
115
112
import Control.Concurrent.Chan (writeChan )
116
113
import Control.Concurrent.STM
@@ -124,13 +121,12 @@ import Control.Concurrent.STM
124
121
)
125
122
import Control.Distributed.Process.Internal.CQueue
126
123
( dequeue
124
+ , enqueue
127
125
, BlockSpec (.. )
128
126
, MatchOn (.. )
129
127
)
130
128
import Control.Distributed.Process.Serializable (Serializable , fingerprint )
131
- import Data.Accessor ((^.) , (^:) , (^=) )
132
129
import Control.Distributed.Static (Closure , Static )
133
- import Data.Rank1Typeable (Typeable )
134
130
import qualified Control.Distributed.Static as Static (unstatic , unclosure )
135
131
import Control.Distributed.Process.Internal.Types
136
132
( NodeId (.. )
@@ -149,10 +145,12 @@ import Control.Distributed.Process.Internal.Types
149
145
, ReceivePort (.. )
150
146
, channelCounter
151
147
, typedChannelWithId
148
+ , localProcessWithId
152
149
, TypedChannel (.. )
153
150
, SendPortId (.. )
154
151
, Identifier (.. )
155
152
, ProcessExitException (.. )
153
+ , exitSource
156
154
, DidUnmonitor (.. )
157
155
, DidUnlinkProcess (.. )
158
156
, DidUnlinkNode (.. )
@@ -166,7 +164,7 @@ import Control.Distributed.Process.Internal.Types
166
164
, createUnencodedMessage
167
165
, runLocalProcess
168
166
, ImplicitReconnect (WithImplicitReconnect , NoImplicitReconnect )
169
- , LocalProcessState
167
+ , LocalProcessState ( .. )
170
168
, LocalSendPortId
171
169
, messageToPayload
172
170
)
@@ -178,10 +176,21 @@ import Control.Distributed.Process.Internal.Messaging
178
176
)
179
177
import qualified Control.Distributed.Process.Internal.Trace as Trace
180
178
import Control.Distributed.Process.Internal.WeakTQueue
181
- ( newTQueueIO
179
+ ( TQueue
180
+ , newTQueueIO
182
181
, readTQueue
183
182
, mkWeakTQueue
183
+ , writeTQueue
184
184
)
185
+ import Data.Accessor ((^.) , (^:) , (^=) )
186
+ import Data.Binary (decode )
187
+ import Data.Foldable (forM_ )
188
+ import Data.Rank1Typeable (Typeable )
189
+ import Data.Time.Clock (getCurrentTime )
190
+ import Data.Time.Format (formatTime )
191
+ import System.Locale (defaultTimeLocale )
192
+ import System.Mem.Weak (deRefWeak )
193
+ import System.Timeout (timeout )
185
194
import Unsafe.Coerce
186
195
187
196
--------------------------------------------------------------------------------
@@ -1031,20 +1040,63 @@ trace s = do
1031
1040
node <- processNode <$> ask
1032
1041
liftIO $ Trace. trace (localTracer node) s
1033
1042
1043
+ --------------------------------------------------------------------------------
1044
+ -- Messages to local processes --
1045
+ --------------------------------------------------------------------------------
1046
+
1047
+ deliverToLocalProcess :: ProcessId -> Message -> Process ()
1048
+ deliverToLocalProcess pid msg = do
1049
+ withLocalProc pid $ \ p -> enqueue (processQueue p) msg
1050
+
1051
+ deliverToLocalPort :: SendPortId -> Message -> Process ()
1052
+ deliverToLocalPort spId msg =
1053
+ let pid = sendPortProcessId spId
1054
+ cid = sendPortLocalId spId
1055
+ in do
1056
+ withLocalProc pid $ \ proc -> do
1057
+ mChan <- withMVar (processState proc ) $ return . (^. typedChannelWithId cid)
1058
+ case mChan of
1059
+ -- in the unlikely event we know nothing about this channel id,
1060
+ -- see [note: missing recipients]
1061
+ Nothing -> return ()
1062
+ Just (TypedChannel chan') -> do
1063
+ -- If ch is Nothing, the process has given up the read end of
1064
+ -- the channel and we simply ignore the incoming message - this
1065
+ ch <- deRefWeak chan'
1066
+ forM_ ch $ \ chan -> deliverChan msg chan
1067
+ where deliverChan :: forall a . Message -> TQueue a -> IO ()
1068
+ deliverChan (UnencodedMessage _ raw) chan' =
1069
+ atomically $ writeTQueue chan' ((unsafeCoerce raw) :: a )
1070
+ deliverChan (EncodedMessage _ _) _ =
1071
+ -- this will /never/ happen
1072
+ error " invalid local channel delivery"
1073
+
1074
+ withLocalProc :: ProcessId -> (LocalProcess -> IO () ) -> Process ()
1075
+ withLocalProc pid p = do
1076
+ node <- processNode <$> ask
1077
+ liftIO $ do
1078
+ -- see [note: missing recipients]
1079
+ let lpid = processLocalId pid
1080
+ mProc <- withMVar (localState node) $ return . (^. localProcessWithId lpid)
1081
+ forM_ mProc p
1082
+
1083
+ -- [note: missing recipients]
1084
+ -- By [Unified: table 6, rule missing_process] messages to dead processes
1085
+ -- can silently be dropped
1086
+
1034
1087
--------------------------------------------------------------------------------
1035
1088
-- Auxiliary functions --
1036
1089
--------------------------------------------------------------------------------
1037
1090
1038
1091
sendLocal :: (Serializable a ) => ProcessId -> a -> Process ()
1039
1092
sendLocal pid msg =
1040
- sendCtrlMsg Nothing $ LocalSend pid (createUnencodedMessage msg)
1093
+ -- sendCtrlMsg Nothing $ LocalSend pid (createUnencodedMessage msg)
1094
+ deliverToLocalProcess pid (createUnencodedMessage msg)
1041
1095
1042
1096
sendChanLocal :: (Serializable a ) => SendPortId -> a -> Process ()
1043
1097
sendChanLocal spId msg =
1044
- -- we *must* fully serialize/encode the message here, because
1045
- -- attempting to use `unsafeCoerce' in the node controller
1046
- -- won't work since we know nothing about the required type
1047
- sendCtrlMsg Nothing $ LocalPortSend spId (createUnencodedMessage msg)
1098
+ -- sendCtrlMsg Nothing $ LocalPortSend spId (createUnencodedMessage msg)
1099
+ deliverToLocalPort spId (createUnencodedMessage msg)
1048
1100
1049
1101
getMonitorRefFor :: Identifier -> Process MonitorRef
1050
1102
getMonitorRefFor ident = do
0 commit comments