Skip to content

Commit 905b487

Browse files
committed
Improve the management data/properties API
1 parent dcaeb3d commit 905b487

File tree

5 files changed

+192
-75
lines changed

5 files changed

+192
-75
lines changed

src/Control/Distributed/Process/Internal/Primitives.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ delegate pid p = do
553553
]
554554
delegate pid p
555555

556-
-- | A straight relay that simply forwards all messages to the supplied pid.
556+
-- | A straight relay that forwards all messages to the supplied pid.
557557
relay :: ProcessId -> Process ()
558558
relay !pid = receiveWait [ matchAny (\m -> forward m pid) ] >> relay pid
559559

src/Control/Distributed/Process/Management.hs

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,14 @@ module Control.Distributed.Process.Management
5555
, mxBroadcast
5656
, mxSetLocal
5757
, mxGetLocal
58-
, mxLift
58+
, liftMX
5959
-- * Mx Data API
6060
, mxPublish
6161
, mxSet
6262
, mxGet
63+
, mxClear
64+
, mxPurgeTable
65+
, mxDropTable
6366
) where
6467

6568
import Control.Applicative ((<$>))
@@ -74,7 +77,6 @@ import Control.Distributed.Process.Internal.Primitives
7477
, receiveWait
7578
, matchChan
7679
, unwrapMessage
77-
, whereis
7880
)
7981
import Control.Distributed.Process.Internal.Types
8082
( Process
@@ -133,14 +135,27 @@ mxPublish a k v = Table.set k v (Table.MxForAgent a)
133135
-- and attempts to force the value later on.
134136
--
135137
mxSet :: Serializable a => MxAgentId -> String -> a -> Process ()
136-
mxSet mxId key msg =
138+
mxSet mxId key msg = do
137139
Table.set key (unsafeCreateUnencodedMessage msg) (Table.MxForAgent mxId)
138140

139141
-- | Fetches a property from the management database for the given key.
140142
-- If the property is not set, or does not match the expected type when
141143
-- typechecked (at runtime), returns @Nothing@.
142144
mxGet :: Serializable a => MxAgentId -> String -> Process (Maybe a)
143-
mxGet mxId = Table.fetch (Table.MxForAgent mxId)
145+
mxGet = Table.fetch . Table.MxForAgent
146+
147+
-- | Clears a property from the management database using the given key.
148+
-- If the key does not exist in the database, this is a noop.
149+
mxClear :: MxAgentId -> String -> Process ()
150+
mxClear mxId key = Table.clear key (Table.MxForAgent mxId)
151+
152+
-- | Purges a table in the management database of all its stored properties.
153+
mxPurgeTable :: MxAgentId -> Process ()
154+
mxPurgeTable = Table.purge . Table.MxForAgent
155+
156+
-- | Deletes a table from the management database.
157+
mxDropTable :: MxAgentId -> Process ()
158+
mxDropTable = Table.delete . Table.MxForAgent
144159

145160
--------------------------------------------------------------------------------
146161
-- API for writing user defined management extensions (i.e., agents) --
@@ -152,7 +167,7 @@ mxGetId = ST.get >>= return . mxAgentId
152167
mxBroadcast :: (Serializable m) => m -> MxAgent s ()
153168
mxBroadcast msg = do
154169
state <- ST.get
155-
mxLift $ liftIO $ atomically $ do
170+
liftMX $ liftIO $ atomically $ do
156171
writeTChan (mxBus state) (unsafeCreateUnencodedMessage msg)
157172

158173
mxDeactivate :: forall s. String -> MxAgent s MxAction
@@ -161,8 +176,8 @@ mxDeactivate = return . MxAgentDeactivate
161176
mxReady :: forall s. MxAgent s MxAction
162177
mxReady = return MxAgentReady
163178

164-
mxLift :: Process a -> MxAgent s a
165-
mxLift p = MxAgent $ ST.lift p
179+
liftMX :: Process a -> MxAgent s a
180+
liftMX p = MxAgent $ ST.lift p
166181

167182
mxSetLocal :: s -> MxAgent s ()
168183
mxSetLocal s = ST.modify $ \st -> st { mxLocalState = s }
@@ -174,7 +189,7 @@ mxSink :: forall s m . (Serializable m)
174189
=> (m -> MxAgent s MxAction)
175190
-> MxSink s
176191
mxSink act msg = do
177-
msg' <- mxLift $ (unwrapMessage msg :: Process (Maybe m))
192+
msg' <- liftMX $ (unwrapMessage msg :: Process (Maybe m))
178193
case msg' of
179194
Nothing -> return Nothing
180195
Just m -> act m >>= return . Just
@@ -200,7 +215,11 @@ mxAgent mxId initState handlers = do
200215
start (sendTChan, recvTChan) = do
201216
(sp, rp) <- newChan
202217
nsend Table.mxTableCoordinator (MxAgentStart sp mxId)
218+
-- liftIO $ putStrLn $ "waiting on table coordinator " ++ Table.mxTableCoordinator
219+
-- p <- whereis Table.mxTableCoordinator
220+
-- liftIO $ putStrLn $ "registration == " ++ (show p)
203221
tablePid <- receiveWait [ matchChan rp (\(p :: ProcessId) -> return p) ]
222+
-- liftIO $ putStrLn "starting agent listener..."
204223
runAgent handlers recvTChan $ MxAgentState mxId sendTChan tablePid initState
205224

206225
runAgent hs c s = do
@@ -228,8 +247,3 @@ mxAgent mxId initState handlers = do
228247
Nothing -> runPipeline msg state next
229248
Just result -> return (result, state')
230249

231-
runAgentST :: MxAgentState s
232-
-> MxAgent s MxAction
233-
-> Process (MxAction, MxAgentState s)
234-
runAgentST state proc = ST.runStateT (unAgent proc) state
235-

src/Control/Distributed/Process/Management/Table.hs

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -141,33 +141,33 @@ resolve (MxForPid pid) = \msg -> send pid msg
141141
startTableCoordinator :: Fork -> Process ()
142142
startTableCoordinator fork = run Map.empty
143143
where
144-
145-
-- TODO: rethink this design. routing via the
146-
-- coordinator will be a publishing bottle neck,
147-
-- plus the code in 'fetch' really requires a
148-
-- pid to send to, so that'll not work via an
149-
-- intermediary anyway
150-
151144
run :: MxTables -> Process ()
152145
run tables =
153146
receiveWait [
154-
match (\(MxAgentStart ch agent) ->
155-
lookupAgent tables agent >>= \(p, t) -> do
156-
sendChan ch p >> return t)
157-
, match (\(agent, Get k sp) -> do
147+
-- note that this state change can race with MxAgentStart requests
148+
match (\(ProcessMonitorNotification _ pid _) -> do
149+
return $ Map.filter (/= pid) tables)
150+
, match (\(MxAgentStart ch agent) -> do
158151
lookupAgent tables agent >>= \(p, t) -> do
159-
safeFetch p k >>= sendChan sp >> return t)
160-
, match $ handleRequest tables
152+
sendChan ch p >> return t)
153+
, match (\req@(agent, tReq :: MxTableRequest) -> do
154+
case tReq of
155+
Get k sp -> do
156+
lookupAgent tables agent >>= \(p, t) -> do
157+
safeFetch p k >>= sendChan sp >> return t
158+
_ -> do
159+
handleRequest tables req)
160+
, matchAny (\_ -> return tables) -- unrecognised messages are dropped
161161
] >>= run
162162

163163
handleRequest :: MxTables
164164
-> (MxAgentId, MxTableRequest)
165165
-> Process MxTables
166-
handleRequest tables' (agent, req) =
167-
lookupAgent tables' agent >>= \(p, t) -> do send p req >> return t
166+
handleRequest tables' (agent, req) = do
167+
lookupAgent tables' agent >>= \(p, t) -> send p req >> return t
168168

169169
lookupAgent :: MxTables -> MxAgentId -> Process (ProcessId, MxTables)
170-
lookupAgent tables' agentId' =
170+
lookupAgent tables' agentId' = do
171171
case Map.lookup agentId' tables' of
172172
Nothing -> launchNew agentId' tables'
173173
Just p -> return (p, tables')
@@ -188,28 +188,31 @@ startTableCoordinator fork = run Map.empty
188188
-- break an import cycle with Node.hs via the management
189189
-- agent, management API and the tracing modules
190190
them <- liftIO $ fork $ link us >> proc
191-
-- them <- spawnLocal nid $ link us >> proc
192191
ref <- monitor them
193192
return (them, ref)
194193

195194
tableHandler :: MxTableState -> Process ()
196195
tableHandler state = do
197196
ns <- receiveWait [
198-
match (\Delete -> return Nothing)
199-
, match (\Purge -> return $ Just $ (entries ^= Map.empty) $ state)
200-
, match (\(Clear k) -> return $ Just $ (entries ^: Map.delete k) $ state)
201-
, match (\(Set k v) -> return $ Just $ (entries ^: Map.insert k v) state)
202-
, match (\(Get k c) -> getEntry k c state >> return (Just state))
197+
match (handleTableRequest state)
198+
, matchAny (\_ -> return (Just state))
203199
]
204200
case ns of
205201
Nothing -> return ()
206202
Just s' -> tableHandler s'
203+
where
204+
handleTableRequest _ Delete = return Nothing
205+
handleTableRequest st Purge = return $ Just $ (entries ^= Map.empty) $ st
206+
handleTableRequest st (Clear k) = return $ Just $ (entries ^: Map.delete k) $ st
207+
handleTableRequest st (Set k v) = return $ Just $ (entries ^: Map.insert k v) st
208+
handleTableRequest st (Get k c) = getEntry k c st >> return (Just st)
207209

208210
getEntry :: String
209211
-> SendPort (Maybe Message)
210212
-> MxTableState
211213
-> Process ()
212-
getEntry k m MxTableState{..} = sendChan m =<< return (Map.lookup k _entries)
214+
getEntry k m MxTableState{..} = do
215+
sendChan m =<< return (Map.lookup k _entries)
213216

214217
entries :: Accessor MxTableState (Map String Message)
215218
entries = accessor _entries (\ls st -> st { _entries = ls })

0 commit comments

Comments
 (0)