Skip to content

Commit 2e682fb

Browse files
committed
Implement reconnect. Use it to clean up in 'call'
1 parent 19953e1 commit 2e682fb

File tree

5 files changed

+72
-1
lines changed

5 files changed

+72
-1
lines changed

distributed-process/src/Control/Distributed/Process.hs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ module Control.Distributed.Process
9898
-- * Local versions of 'spawn'
9999
, spawnLocal
100100
, spawnChannelLocal
101+
-- * Reconnecting
102+
, reconnect
103+
, reconnectNode
104+
, reconnectPort
101105
) where
102106

103107
#if ! MIN_VERSION_base(4,6,0)
@@ -206,6 +210,10 @@ import Control.Distributed.Process.Internal.Primitives
206210
-- Auxiliary API
207211
, expectTimeout
208212
, spawnAsync
213+
-- Reconnecting
214+
, reconnect
215+
, reconnectNode
216+
, reconnectPort
209217
)
210218
import Control.Distributed.Process.Serializable (Serializable)
211219
import Control.Distributed.Process.Node (forkProcess)
@@ -333,7 +341,7 @@ spawnMonitor nid proc = do
333341
call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a
334342
call dict nid proc = do
335343
us <- getSelfPid
336-
(_, mRef) <- spawnMonitor nid (proc `bindCP` cpSend dict us)
344+
(pid, mRef) <- spawnMonitor nid (proc `bindCP` cpSend dict us)
337345
-- We are guaranteed to receive the reply before the monitor notification
338346
-- (if a reply is sent at all)
339347
-- NOTE: This might not be true if we switch to unreliable delivery.
@@ -349,6 +357,8 @@ call dict nid proc = do
349357
[ matchIf (\(ProcessMonitorNotification ref _ _) -> ref == mRef)
350358
(\(ProcessMonitorNotification {}) -> return ())
351359
]
360+
-- Clean up connection to pid
361+
reconnect pid
352362
return a
353363
Left err ->
354364
fail $ "call: remote process died: " ++ show err

distributed-process/src/Control/Distributed/Process/Internal/Node.hs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module Control.Distributed.Process.Internal.Node
33
sendPayload
44
, sendBinary
55
, sendMessage
6+
, reconnect
67
) where
78

89
import Data.Accessor ((^.), (^=))
@@ -90,3 +91,8 @@ connBetween node from to = do
9091
Nothing -> setupConnBetween node from to
9192
where
9293
nodeState = localState node
94+
95+
reconnect :: LocalNode -> Identifier -> Identifier -> IO ()
96+
reconnect node from to =
97+
modifyMVar_ (localState node) $ return .
98+
(localConnectionBetween from to ^= Nothing)

distributed-process/src/Control/Distributed/Process/Internal/Primitives.hs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ module Control.Distributed.Process.Internal.Primitives
5959
, unlinkPort
6060
, monitorNode
6161
, monitorPort
62+
-- * Reconnecting
63+
, reconnect
64+
, reconnectNode
65+
, reconnectPort
6266
) where
6367

6468
#if ! MIN_VERSION_base(4,6,0)
@@ -506,6 +510,44 @@ unClosure closure = do
506510
Left err -> fail $ "Could not resolve closure: " ++ err
507511
Right x -> return x
508512

513+
--------------------------------------------------------------------------------
514+
-- Reconnecting --
515+
--------------------------------------------------------------------------------
516+
517+
-- | Cloud Haskell provides the illusion of connection-less, reliable, ordered
518+
-- message passing. However, when network connections get disrupted this
519+
-- illusion cannot always be maintained. Once a network connection breaks (even
520+
-- temporarily) no further communication on that connection will be possible.
521+
-- For example, if process A sends a message to process B, and A is then
522+
-- notified (by monitor notification) that it got disconnected from B, A will
523+
-- not be able to send any further messages to B, /unless/ A explicitly
524+
-- indicates that it is acceptable to attempt to reconnect to B using the
525+
-- Cloud Haskell 'reconnect' primitive.
526+
--
527+
-- Importantly, when A calls 'reconnect' it acknowledges that some messages to
528+
-- B might have been lost. For instance, if A sends messages m1 and m2 to B,
529+
-- then receives a monitor notification that its connection to B has been lost,
530+
-- calls 'reconnect' and then sends m3, it is possible that B will receive m1
531+
-- and m3 but not m2.
532+
--
533+
-- Note that 'reconnect' does not mean /reconnect now/ but rather /it is okay
534+
-- to attempt to reconnect on the next send/. In particular, if no further
535+
-- communication attempts are made to B then A can use reconnect to clean up
536+
-- its connection to B.
537+
reconnect :: ProcessId -> Process ()
538+
reconnect =
539+
sendCtrlMsg Nothing . Reconnect . ProcessIdentifier
540+
541+
-- | Reconnect to a node. See 'reconnect' for more information.
542+
reconnectNode :: NodeId -> Process ()
543+
reconnectNode =
544+
sendCtrlMsg Nothing . Reconnect . NodeIdentifier
545+
546+
-- | Reconnect to a sendport. See 'reconnect' for more information.
547+
reconnectPort :: SendPort a -> Process ()
548+
reconnectPort =
549+
sendCtrlMsg Nothing . Reconnect . SendPortIdentifier . sendPortId
550+
509551
--------------------------------------------------------------------------------
510552
-- Auxiliary functions --
511553
--------------------------------------------------------------------------------

distributed-process/src/Control/Distributed/Process/Internal/Types.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,7 @@ data ProcessSignal =
385385
| WhereIs String
386386
| Register String (Maybe ProcessId) -- Nothing to unregister
387387
| NamedSend String Message
388+
| Reconnect Identifier
388389
deriving Show
389390

390391
--------------------------------------------------------------------------------
@@ -429,6 +430,7 @@ instance Binary ProcessSignal where
429430
put (WhereIs label) = putWord8 6 >> put label
430431
put (Register label pid) = putWord8 7 >> put label >> put pid
431432
put (NamedSend label msg) = putWord8 8 >> put label >> put (messageToPayload msg)
433+
put (Reconnect dest) = putWord8 9 >> put dest
432434
get = do
433435
header <- getWord8
434436
case header of
@@ -441,6 +443,7 @@ instance Binary ProcessSignal where
441443
6 -> WhereIs <$> get
442444
7 -> Register <$> get <*> get
443445
8 -> NamedSend <$> get <*> (payloadToMessage <$> get)
446+
9 -> Reconnect <$> get
444447
_ -> fail "ProcessSignal.get: invalid"
445448

446449
instance Binary DiedReason where

distributed-process/src/Control/Distributed/Process/Node.hs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ import Control.Distributed.Process.Internal.Node
121121
( sendBinary
122122
, sendMessage
123123
, sendPayload
124+
, reconnect
124125
)
125126
import Control.Distributed.Process.Internal.Primitives (expect, register, finally)
126127
import qualified Control.Distributed.Process.Internal.Closure.BuiltIn as BuiltIn (remoteTable)
@@ -434,6 +435,8 @@ nodeController = do
434435
ncEffectWhereIs from label
435436
NCMsg from (NamedSend label msg') ->
436437
ncEffectNamedSend from label msg'
438+
NCMsg from (Reconnect to) ->
439+
ncEffectReconnect from to
437440
unexpected ->
438441
error $ "nodeController: unexpected message " ++ show unexpected
439442

@@ -557,6 +560,12 @@ ncEffectNamedSend from label msg = do
557560
(ProcessIdentifier pid)
558561
(messageToPayload msg)
559562

563+
-- Reconnecting
564+
ncEffectReconnect :: Identifier -> Identifier -> NC ()
565+
ncEffectReconnect from to = do
566+
node <- ask
567+
liftIO $ reconnect node from to
568+
560569
--------------------------------------------------------------------------------
561570
-- Auxiliary --
562571
--------------------------------------------------------------------------------
@@ -602,6 +611,7 @@ destNid (Spawn _ _) = Nothing
602611
destNid (Register _ _) = Nothing
603612
destNid (WhereIs _) = Nothing
604613
destNid (NamedSend _ _) = Nothing
614+
destNid (Reconnect _) = Nothing
605615
-- We don't need to forward 'Died' signals; if monitoring/linking is setup,
606616
-- then when a local process dies the monitoring/linking machinery will take
607617
-- care of notifying remote nodes

0 commit comments

Comments
 (0)