|
| 1 | +{- ProcessRing benchmarks. |
| 2 | +
|
| 3 | +To run the benchmarks, select a value for the ring size (sz) and |
| 4 | +the number of times to send a message around the ring |
| 5 | +-} |
| 6 | +import Control.Monad |
| 7 | +import Control.Distributed.Process hiding (catch) |
| 8 | +import Control.Distributed.Process.Node |
| 9 | +import Network.Transport.TCP (createTransport, defaultTCPParameters) |
| 10 | +import System.Environment |
| 11 | +import System.Console.GetOpt |
| 12 | + |
| 13 | +data Options = Options |
| 14 | + { optRingSize :: Int |
| 15 | + , optIterations :: Int |
| 16 | + , optForward :: Bool |
| 17 | + , optParallel :: Bool |
| 18 | + } deriving Show |
| 19 | + |
| 20 | +initialProcess :: Options -> Process () |
| 21 | +initialProcess op = |
| 22 | + let ringSz = optRingSize op |
| 23 | + msgCnt = optIterations op |
| 24 | + fwd = optForward op |
| 25 | + msg = ("foobar", "baz") |
| 26 | + in do |
| 27 | + self <- getSelfPid |
| 28 | + ring <- makeRing fwd ringSz self |
| 29 | + forM_ [1..msgCnt] (\_ -> send ring msg) |
| 30 | + collect msgCnt |
| 31 | + where relay' pid = do |
| 32 | + msg <- expect :: Process (String, String) |
| 33 | + send pid msg |
| 34 | + relay' pid |
| 35 | + |
| 36 | + forward' pid = |
| 37 | + receiveWait [ matchAny (\m -> forward m pid) ] >> forward' pid |
| 38 | + |
| 39 | + makeRing :: Bool -> Int -> ProcessId -> Process ProcessId |
| 40 | + makeRing !f !n !pid |
| 41 | + | n == 0 = go f pid |
| 42 | + | otherwise = go f pid >>= makeRing f (n - 1) |
| 43 | + |
| 44 | + go :: Bool -> ProcessId -> Process ProcessId |
| 45 | + go False next = spawnLocal $ relay' next |
| 46 | + go True next = spawnLocal $ forward' next |
| 47 | + |
| 48 | + collect :: Int -> Process () |
| 49 | + collect !n |
| 50 | + | n == 0 = return () |
| 51 | + | otherwise = do |
| 52 | + receiveWait [ |
| 53 | + matchIf (\(a, b) -> a == "foobar" && b == "baz") |
| 54 | + (\_ -> return ()) |
| 55 | + , matchAny (\_ -> error "unexpected input!") |
| 56 | + ] |
| 57 | + collect (n - 1) |
| 58 | + |
| 59 | +defaultOptions :: Options |
| 60 | +defaultOptions = Options |
| 61 | + { optRingSize = 10 |
| 62 | + , optIterations = 100 |
| 63 | + , optForward = False |
| 64 | + , optParallel = False |
| 65 | + } |
| 66 | + |
| 67 | +options :: [OptDescr (Options -> Options)] |
| 68 | +options = |
| 69 | + [ Option ['s'] ["ring-size"] (OptArg optSz "SIZE") "# of processes in ring" |
| 70 | + , Option ['i'] ["iterations"] (OptArg optMsgCnt "ITER") "# of times to send" |
| 71 | + , Option ['f'] ["forward"] |
| 72 | + (NoArg (\opts -> opts { optForward = True })) |
| 73 | + "use `forward' instead of send - default = False" |
| 74 | + , Option ['p'] ["parallel"] |
| 75 | + (NoArg (\opts -> opts { optForward = True })) |
| 76 | + "send in parallel and consume sequentially - default = False" |
| 77 | + ] |
| 78 | + |
| 79 | +optMsgCnt :: Maybe String -> Options -> Options |
| 80 | +optMsgCnt Nothing opts = opts |
| 81 | +optMsgCnt (Just c) opts = opts { optIterations = ((read c) :: Int) } |
| 82 | + |
| 83 | +optSz :: Maybe String -> Options -> Options |
| 84 | +optSz Nothing opts = opts |
| 85 | +optSz (Just s) opts = opts { optRingSize = ((read s) :: Int) } |
| 86 | + |
| 87 | +parseArgv :: [String] -> IO (Options, [String]) |
| 88 | +parseArgv argv = do |
| 89 | + pn <- getProgName |
| 90 | + case getOpt Permute options argv of |
| 91 | + (o,n,[] ) -> return (foldl (flip id) defaultOptions o, n) |
| 92 | + (_,_,errs) -> ioError (userError (concat errs ++ usageInfo (header pn) options)) |
| 93 | + where header pn' = "Usage: " ++ pn' ++ " [OPTION...]" |
| 94 | + |
| 95 | +main :: IO () |
| 96 | +main = do |
| 97 | + argv <- getArgs |
| 98 | + (opt, _) <- parseArgv argv |
| 99 | + putStrLn $ "options: " ++ (show opt) |
| 100 | + Right transport <- createTransport "127.0.0.1" "8090" defaultTCPParameters |
| 101 | + node <- newLocalNode transport initRemoteTable |
| 102 | + catch (runProcess node $ initialProcess opt) |
| 103 | + (\e -> putStrLn $ "ERROR: " ++ (show e)) |
| 104 | + |
0 commit comments