Skip to content

Commit 32d3bf4

Browse files
committed
Support spawn (as well as call)
1 parent b0fb243 commit 32d3bf4

File tree

4 files changed

+121
-49
lines changed

4 files changed

+121
-49
lines changed

distributed-process-azure/distributed-process-azure.cabal

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ Library
3131
network-transport-tcp >= 0.2 && < 0.3,
3232
optparse-applicative >= 0.2 && < 0.4,
3333
transformers >= 0.3 && < 0.4,
34-
certificate == 1.2.3
34+
certificate == 1.2.3,
35+
unix >= 2.5 && < 2.6
3536
Exposed-modules: Control.Distributed.Process.Backend.Azure,
3637
Control.Distributed.Process.Backend.Azure.GenericMain
3738
Extensions: ViewPatterns,

distributed-process-azure/src/Control/Distributed/Process/Backend/Azure.hs

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import System.Environment.Executable (getExecutablePath)
1717
import Data.Binary (encode, decode)
1818
import Data.Digest.Pure.MD5 (md5, MD5Digest)
1919
import qualified Data.ByteString.Char8 as BSSC (pack)
20-
import qualified Data.ByteString.Lazy as BSL (ByteString, readFile, putStr)
20+
import qualified Data.ByteString.Lazy as BSL (ByteString, readFile, putStr, writeFile)
2121
import Data.Typeable (Typeable)
2222
import Control.Applicative ((<$>))
2323
import Control.Monad (void)
@@ -102,6 +102,9 @@ data Backend = Backend {
102102
, checkMD5 :: VirtualMachine -> IO Bool
103103
-- | @runOnVM dict vm port p@ starts a CH node on port 'port' and runs 'p'
104104
, callOnVM :: forall a. Serializable a => Static (SerializableDict a) -> VirtualMachine -> String -> Closure (Process a) -> IO a
105+
-- | Create a new CH node and run the specified process in the background.
106+
-- The CH node will exit when the process exists.
107+
, spawnOnVM :: VirtualMachine -> String -> Closure (Process ()) -> IO ()
105108
}
106109

107110
data AzureParameters = AzureParameters {
@@ -150,6 +153,7 @@ initializeBackend params = do
150153
, copyToVM = apiCopyToVM params
151154
, checkMD5 = apiCheckMD5 params
152155
, callOnVM = apiCallOnVM params
156+
, spawnOnVM = apiSpawnOnVM params
153157
}
154158

155159
-- | Start a CH node on the given virtual machine
@@ -161,21 +165,35 @@ apiCopyToVM params vm =
161165
-- | Call a process on a VM
162166
apiCallOnVM :: Serializable a => AzureParameters -> Static (SerializableDict a) -> VirtualMachine -> String -> Closure (Process a) -> IO a
163167
apiCallOnVM params dict vm port proc =
164-
withSSH2 params vm $ \s -> do
165-
let exe = "PATH=. " ++ azureSshRemotePath params
166-
++ " onvm run "
167-
++ " --host " ++ vmIpAddress vm
168-
++ " --port " ++ port
169-
++ " 2>&1"
170-
(_, r) <- SSH.withChannelBy (SSH.openChannelSession s) id $ \ch -> do
171-
SSH.channelExecute ch exe
172-
_cnt <- SSH.writeAllChannel ch (encode proc')
173-
SSH.channelSendEOF ch
174-
SSH.readAllChannel ch
175-
return (decode r)
176-
where
177-
proc' :: Closure (Process ())
178-
proc' = proc `cpBind` cpEncodeToStdout dict
168+
withSSH2 params vm $ \s -> do
169+
let exe = "PATH=. " ++ azureSshRemotePath params
170+
++ " onvm run "
171+
++ " --host " ++ vmIpAddress vm
172+
++ " --port " ++ port
173+
++ " 2>&1"
174+
(_, r) <- SSH.withChannelBy (SSH.openChannelSession s) id $ \ch -> do
175+
SSH.channelExecute ch exe
176+
_cnt <- SSH.writeAllChannel ch (encode $ proc `cpBind` cpEncodeToStdout dict)
177+
SSH.channelSendEOF ch
178+
SSH.readAllChannel ch
179+
return (decode r)
180+
181+
apiSpawnOnVM :: AzureParameters -> VirtualMachine -> String -> Closure (Process ()) -> IO ()
182+
apiSpawnOnVM params vm port proc =
183+
withSSH2 params vm $ \s -> do
184+
let exe = "PATH=. " ++ azureSshRemotePath params
185+
++ " onvm run "
186+
++ " --host " ++ vmIpAddress vm
187+
++ " --port " ++ port
188+
++ " --background "
189+
++ " 2>&1"
190+
BSL.writeFile "closure" (encode proc)
191+
r <- SSH.withChannelBy (SSH.openChannelSession s) id $ \ch -> do
192+
SSH.channelExecute ch exe
193+
_cnt <- SSH.writeAllChannel ch (encode proc)
194+
SSH.channelSendEOF ch
195+
SSH.readAllChannel ch
196+
print r
179197

180198
-- | Check the MD5 hash of the executable on the remote machine
181199
apiCheckMD5 :: AzureParameters -> VirtualMachine -> IO Bool
Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
{-# LANGUAGE TemplateHaskell #-}
22

33
import Control.Monad.IO.Class (liftIO)
4-
import Control.Distributed.Process (Process, ProcessId, getSelfPid)
4+
import Control.Concurrent (threadDelay)
5+
import Control.Distributed.Process (Process, ProcessId, getSelfPid, Closure)
56
import Control.Distributed.Process.Closure (remotable, mkClosure, sdictProcessId)
67
import Control.Distributed.Process.Backend.Azure.GenericMain
78
( genericMain
@@ -13,9 +14,24 @@ getPid () = do
1314
liftIO $ appendFile "Log" "getPid did run"
1415
getSelfPid
1516

16-
remotable ['getPid]
17+
logN :: Int -> Process ()
18+
logN 0 =
19+
liftIO $ appendFile "Log" "logN done\n"
20+
logN n = do
21+
liftIO $ do
22+
appendFile "Log" $ "logN " ++ show n ++ "\n"
23+
threadDelay 1000000
24+
logN (n - 1)
1725

18-
main = genericMain __remoteTable $ \cmd ->
19-
case cmd of
20-
"hello" -> return $ ProcessPair ($(mkClosure 'getPid) ()) print sdictProcessId
21-
_ -> error "unknown command"
26+
remotable ['getPid, 'logN]
27+
28+
main :: IO ()
29+
main = genericMain __remoteTable callable spawnable
30+
where
31+
callable :: String -> IO (ProcessPair ())
32+
callable "getPid" = return $ ProcessPair ($(mkClosure 'getPid) ()) print sdictProcessId
33+
callable _ = error "spawnable: unknown"
34+
35+
spawnable :: String -> IO (Closure (Process ()))
36+
spawnable "logN" = return $ $(mkClosure 'logN) (10 :: Int)
37+
spawnable _ = error "callable: unknown"

distributed-process-azure/src/Control/Distributed/Process/Backend/Azure/GenericMain.hs

Lines changed: 63 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,20 @@ import System.IO
1010
( hFlush
1111
, stdout
1212
, stdin
13+
, stderr
1314
, hSetBinaryMode
15+
, hClose
1416
)
1517
import Data.Binary (decode)
16-
import qualified Data.ByteString.Lazy as BSL (ByteString, getContents)
17-
import Control.Monad (unless, forM, forM_, join)
18-
import Control.Exception (throwIO, SomeException)
18+
import qualified Data.ByteString.Lazy as BSL (getContents, length)
19+
import Control.Monad (unless, forM, forM_, join, void)
20+
import Control.Exception (throwIO, SomeException, evaluate)
1921
import Control.Applicative ((<$>), (<*>), (<|>))
2022
import Control.Monad.IO.Class (liftIO)
2123

24+
-- Posix
25+
import System.Posix.Process (forkProcess, createSession)
26+
2227
-- SSH
2328
import qualified Network.SSH.Client.LibSSH2.Foreign as SSH
2429
( initialize
@@ -64,7 +69,7 @@ import Control.Distributed.Process.Backend.Azure
6469
, cloudServices
6570
, CloudService(cloudServiceName, cloudServiceVMs)
6671
, VirtualMachine(vmName)
67-
, Backend(copyToVM, checkMD5, callOnVM)
72+
, Backend(copyToVM, checkMD5, callOnVM, spawnOnVM)
6873
)
6974
import qualified Control.Distributed.Process.Backend.Azure as Azure (remoteTable)
7075

@@ -78,10 +83,11 @@ data ProcessPair b = forall a. Serializable a => ProcessPair {
7883
, ppairDict :: Static (SerializableDict a)
7984
}
8085

81-
genericMain :: (RemoteTable -> RemoteTable) -- ^ Standard CH remote table
82-
-> (String -> IO (ProcessPair ())) -- ^ Closures to support in 'run'
86+
genericMain :: (RemoteTable -> RemoteTable) -- ^ Standard CH remote table
87+
-> (String -> IO (ProcessPair ())) -- ^ Closures to support in 'run'
88+
-> (String -> IO (Closure (Process ()))) -- ^ Closures to support in @run --background@
8389
-> IO ()
84-
genericMain remoteTable cmds = do
90+
genericMain remoteTable callable spawnable = do
8591
_ <- SSH.initialize True
8692
cmd <- execParser opts
8793
case cmd of
@@ -110,20 +116,31 @@ genericMain remoteTable cmds = do
110116
if and matches
111117
then exitSuccess
112118
else exitFailure
113-
RunOn {} -> do
114-
procPair <- cmds (closureId cmd)
119+
RunOn {} | background cmd -> do
120+
rProc <- spawnable (closureId cmd)
121+
params <- azureParameters (azureOptions cmd) (Just (sshOptions cmd))
122+
backend <- initializeBackend params
123+
css <- cloudServices backend
124+
forM_ (findTarget (target cmd) css) $ \vm -> do
125+
putStr (vmName vm ++ ": ") >> hFlush stdout
126+
spawnOnVM backend vm (remotePort cmd) rProc
127+
putStrLn "OK"
128+
RunOn {} {- not (background cmd) -} -> do
129+
procPair <- callable (closureId cmd)
115130
params <- azureParameters (azureOptions cmd) (Just (sshOptions cmd))
116131
backend <- initializeBackend params
117132
css <- cloudServices backend
118133
forM_ (findTarget (target cmd) css) $ \vm -> do
119134
putStr (vmName vm ++ ": ") >> hFlush stdout
120135
case procPair of
121-
ProcessPair rProc lProc dict ->
122-
callOnVM backend dict vm (remotePort cmd) rProc >>= lProc
136+
ProcessPair rProc lProc dict -> do
137+
result <- callOnVM backend dict vm (remotePort cmd) rProc
138+
lProc result
123139
OnVmCommand (vmCmd@OnVmRun {}) ->
124140
onVmRun (remoteTable . Azure.remoteTable $ initRemoteTable)
125141
(onVmIP vmCmd)
126142
(onVmPort vmCmd)
143+
(onVmBackground vmCmd)
127144
SSH.exit
128145
where
129146
opts = info (helper <*> commandParser)
@@ -152,19 +169,35 @@ azureParameters opts (Just sshOpts) = do
152169
azureSshUserName = remoteUser sshOpts
153170
}
154171

155-
onVmRun :: RemoteTable -> String -> String -> IO ()
156-
onVmRun rtable host port = do
157-
hSetBinaryMode stdin True
158-
hSetBinaryMode stdout True
159-
proc <- BSL.getContents :: IO BSL.ByteString
160-
mTransport <- createTransport host port defaultTCPParameters
161-
case mTransport of
162-
Left err -> throwIO err
163-
Right transport -> do
164-
node <- newLocalNode transport rtable
165-
runProcess node $
166-
catch (join . unClosure . decode $ proc)
167-
(\e -> liftIO (print (e :: SomeException) >> throwIO e))
172+
onVmRun :: RemoteTable -> String -> String -> Bool -> IO ()
173+
onVmRun rtable host port bg = do
174+
hSetBinaryMode stdin True
175+
hSetBinaryMode stdout True
176+
procEnc <- BSL.getContents
177+
-- Force evaluation (so that we can safely close stdin)
178+
_length <- evaluate (BSL.length procEnc)
179+
let proc = decode procEnc
180+
if bg
181+
then do
182+
hClose stdin
183+
hClose stdout
184+
hClose stderr
185+
void . forkProcess $ do
186+
void createSession
187+
startCH proc
188+
else
189+
startCH proc
190+
where
191+
startCH :: Closure (Process ()) -> IO ()
192+
startCH proc = do
193+
mTransport <- createTransport host port defaultTCPParameters
194+
case mTransport of
195+
Left err -> throwIO err
196+
Right transport -> do
197+
node <- newLocalNode transport rtable
198+
runProcess node $
199+
catch (join . unClosure $ proc)
200+
(\e -> liftIO (print (e :: SomeException) >> throwIO e))
168201

169202
--------------------------------------------------------------------------------
170203
-- Command line options --
@@ -217,8 +250,9 @@ data Command =
217250

218251
data OnVmCommand =
219252
OnVmRun {
220-
onVmIP :: String
221-
, onVmPort :: String
253+
onVmIP :: String
254+
, onVmPort :: String
255+
, onVmBackground :: Bool
222256
}
223257
deriving Show
224258

@@ -317,6 +351,9 @@ onVmRunParser = OnVmRun
317351
& metavar "PORT"
318352
& help "port number"
319353
)
354+
<*> switch ( long "background"
355+
& help "Run the process in the background"
356+
)
320357

321358
onVmCommandParser :: Parser Command
322359
onVmCommandParser = OnVmCommand <$> subparser

0 commit comments

Comments
 (0)