@@ -141,33 +141,33 @@ resolve (MxForPid pid) = \msg -> send pid msg
141
141
startTableCoordinator :: Fork -> Process ()
142
142
startTableCoordinator fork = run Map. empty
143
143
where
144
-
145
- -- TODO: rethink this design. routing via the
146
- -- coordinator will be a publishing bottle neck,
147
- -- plus the code in 'fetch' really requires a
148
- -- pid to send to, so that'll not work via an
149
- -- intermediary anyway
150
-
151
144
run :: MxTables -> Process ()
152
145
run tables =
153
146
receiveWait [
154
- match ( \ ( MxAgentStart ch agent) ->
155
- lookupAgent tables agent >>= \ (p, t ) -> do
156
- sendChan ch p >> return t )
157
- , match (\ (agent, Get k sp ) -> do
147
+ -- note that this state change can race with MxAgentStart requests
148
+ match ( \ ( ProcessMonitorNotification _ pid _ ) -> do
149
+ return $ Map. filter ( /= pid) tables )
150
+ , match (\ (MxAgentStart ch agent ) -> do
158
151
lookupAgent tables agent >>= \ (p, t) -> do
159
- safeFetch p k >>= sendChan sp >> return t)
160
- , match $ handleRequest tables
152
+ sendChan ch p >> return t)
153
+ , match (\ req@ (agent, tReq :: MxTableRequest ) -> do
154
+ case tReq of
155
+ Get k sp -> do
156
+ lookupAgent tables agent >>= \ (p, t) -> do
157
+ safeFetch p k >>= sendChan sp >> return t
158
+ _ -> do
159
+ handleRequest tables req)
160
+ , matchAny (\ _ -> return tables) -- unrecognised messages are dropped
161
161
] >>= run
162
162
163
163
handleRequest :: MxTables
164
164
-> (MxAgentId , MxTableRequest )
165
165
-> Process MxTables
166
- handleRequest tables' (agent, req) =
167
- lookupAgent tables' agent >>= \ (p, t) -> do send p req >> return t
166
+ handleRequest tables' (agent, req) = do
167
+ lookupAgent tables' agent >>= \ (p, t) -> send p req >> return t
168
168
169
169
lookupAgent :: MxTables -> MxAgentId -> Process (ProcessId , MxTables )
170
- lookupAgent tables' agentId' =
170
+ lookupAgent tables' agentId' = do
171
171
case Map. lookup agentId' tables' of
172
172
Nothing -> launchNew agentId' tables'
173
173
Just p -> return (p, tables')
@@ -188,28 +188,31 @@ startTableCoordinator fork = run Map.empty
188
188
-- break an import cycle with Node.hs via the management
189
189
-- agent, management API and the tracing modules
190
190
them <- liftIO $ fork $ link us >> proc
191
- -- them <- spawnLocal nid $ link us >> proc
192
191
ref <- monitor them
193
192
return (them, ref)
194
193
195
194
tableHandler :: MxTableState -> Process ()
196
195
tableHandler state = do
197
196
ns <- receiveWait [
198
- match (\ Delete -> return Nothing )
199
- , match (\ Purge -> return $ Just $ (entries ^= Map. empty) $ state)
200
- , match (\ (Clear k) -> return $ Just $ (entries ^: Map. delete k) $ state)
201
- , match (\ (Set k v) -> return $ Just $ (entries ^: Map. insert k v) state)
202
- , match (\ (Get k c) -> getEntry k c state >> return (Just state))
197
+ match (handleTableRequest state)
198
+ , matchAny (\ _ -> return (Just state))
203
199
]
204
200
case ns of
205
201
Nothing -> return ()
206
202
Just s' -> tableHandler s'
203
+ where
204
+ handleTableRequest _ Delete = return Nothing
205
+ handleTableRequest st Purge = return $ Just $ (entries ^= Map. empty) $ st
206
+ handleTableRequest st (Clear k) = return $ Just $ (entries ^: Map. delete k) $ st
207
+ handleTableRequest st (Set k v) = return $ Just $ (entries ^: Map. insert k v) st
208
+ handleTableRequest st (Get k c) = getEntry k c st >> return (Just st)
207
209
208
210
getEntry :: String
209
211
-> SendPort (Maybe Message )
210
212
-> MxTableState
211
213
-> Process ()
212
- getEntry k m MxTableState {.. } = sendChan m =<< return (Map. lookup k _entries)
214
+ getEntry k m MxTableState {.. } = do
215
+ sendChan m =<< return (Map. lookup k _entries)
213
216
214
217
entries :: Accessor MxTableState (Map String Message )
215
218
entries = accessor _entries (\ ls st -> st { _entries = ls })
0 commit comments