Skip to content

Commit 07c5422

Browse files
Stop using the transport for local communication.
The NC was still using the transport to communicate with itself or with local processes when using the primitives in C.D.P.I.Messaging. As a side effect, this patch also stops creating connections involving processes when the NC sends a message. Only the node-to-node connection is used by the NC to send messages.
1 parent deadc44 commit 07c5422

File tree

1 file changed

+54
-76
lines changed
  • src/Control/Distributed/Process

1 file changed

+54
-76
lines changed

src/Control/Distributed/Process/Node.hs

Lines changed: 54 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ import Control.Distributed.Process.Internal.Types
169169
, payloadToMessage
170170
, messageToPayload
171171
, createUnencodedMessage
172+
, unsafeCreateUnencodedMessage
172173
, runLocalProcess
173174
, firstNonReservedProcessId
174175
, ImplicitReconnect(WithImplicitReconnect,NoImplicitReconnect)
@@ -198,7 +199,6 @@ import Control.Distributed.Process.Management.Internal.Types
198199
import Control.Distributed.Process.Serializable (Serializable)
199200
import Control.Distributed.Process.Internal.Messaging
200201
( sendBinary
201-
, sendMessage
202202
, sendPayload
203203
, closeImplicitReconnections
204204
, impliesDeathOf
@@ -661,6 +661,30 @@ instance Show ProcessKillException where
661661
show (ProcessKillException pid reason) =
662662
"killed-by=" ++ show pid ++ ",reason=" ++ reason
663663

664+
ncSendToProcess :: ProcessId -> Message -> NC ()
665+
ncSendToProcess pid msg = do
666+
node <- ask
667+
if processNodeId pid == localNodeId node
668+
then ncEffectLocalSend node pid msg
669+
else liftIO $ sendBinary node
670+
(NodeIdentifier $ localNodeId node)
671+
(NodeIdentifier $ processNodeId pid)
672+
WithImplicitReconnect
673+
NCMsg { ctrlMsgSender = NodeIdentifier $ localNodeId node
674+
, ctrlMsgSignal = UnreliableSend (processLocalId pid) msg
675+
}
676+
677+
ncSendToNode :: NodeId -> NCMsg -> NC ()
678+
ncSendToNode to msg = do
679+
node <- ask
680+
liftIO $ if to == localNodeId node
681+
then writeChan (localCtrlChan node) $! msg
682+
else sendBinary node
683+
(NodeIdentifier $ localNodeId node)
684+
(NodeIdentifier to)
685+
WithImplicitReconnect
686+
msg
687+
664688
--------------------------------------------------------------------------------
665689
-- Tracing/Debugging --
666690
--------------------------------------------------------------------------------
@@ -703,11 +727,7 @@ nodeController = do
703727
-- [Unified: Table 7, rule nc_forward]
704728
case destNid (ctrlMsgSignal msg) of
705729
Just nid' | nid' /= localNodeId node ->
706-
liftIO $ sendBinary node
707-
(ctrlMsgSender msg)
708-
(NodeIdentifier nid')
709-
WithImplicitReconnect
710-
msg
730+
ncSendToNode nid' msg
711731
_ ->
712732
return ()
713733

@@ -728,8 +748,8 @@ nodeController = do
728748
ncEffectRegister from label atnode pid force
729749
NCMsg (ProcessIdentifier from) (WhereIs label) ->
730750
ncEffectWhereIs from label
731-
NCMsg (ProcessIdentifier from) (NamedSend label msg') ->
732-
ncEffectNamedSend from label msg'
751+
NCMsg _ (NamedSend label msg') ->
752+
ncEffectNamedSend label msg'
733753
NCMsg _ (UnreliableSend lpid msg') ->
734754
ncEffectLocalSend node (ProcessId (localNodeId node) lpid) msg'
735755
NCMsg _ (LocalSend to msg') ->
@@ -774,14 +794,10 @@ ncEffectMonitor from them mRef = do
774794
-- TODO: this is the right sender according to the Unified semantics,
775795
-- but perhaps having 'them' as the sender would make more sense
776796
-- (see also: notifyDied)
777-
liftIO $ sendBinary node
778-
(NodeIdentifier $ localNodeId node)
779-
(NodeIdentifier $ processNodeId from)
780-
WithImplicitReconnect
781-
NCMsg
782-
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
783-
, ctrlMsgSignal = Died them DiedUnknownId
784-
}
797+
ncSendToNode (processNodeId from) $ NCMsg
798+
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
799+
, ctrlMsgSignal = Died them DiedUnknownId
800+
}
785801

786802
-- [Unified: Table 11]
787803
ncEffectUnlink :: ProcessId -> Identifier -> NC ()
@@ -842,15 +858,10 @@ ncEffectDied ident reason = do
842858
False -> return $ Just (pid,nidlist) )
843859
modify' $ registeredOnNodes ^= (Map.fromList (catMaybes remaining))
844860
where
845-
forwardNameDeath node nid =
846-
liftIO $ sendBinary node
847-
(NodeIdentifier $ localNodeId node)
848-
(NodeIdentifier $ nid)
849-
WithImplicitReconnect
850-
NCMsg
851-
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
852-
, ctrlMsgSignal = Died ident reason
853-
}
861+
forwardNameDeath node nid = ncSendToNode nid
862+
NCMsg { ctrlMsgSender = NodeIdentifier (localNodeId node)
863+
, ctrlMsgSignal = Died ident reason
864+
}
854865

855866
-- [Unified: Table 13]
856867
ncEffectSpawn :: ProcessId -> Closure (Process ()) -> SpawnRef -> NC ()
@@ -864,11 +875,7 @@ ncEffectSpawn pid cProc ref = do
864875
Right p -> p
865876
node <- ask
866877
pid' <- liftIO $ forkProcess node proc
867-
liftIO $ sendMessage node
868-
(NodeIdentifier (localNodeId node))
869-
(ProcessIdentifier pid)
870-
WithImplicitReconnect
871-
(DidSpawn ref pid')
878+
ncSendToProcess pid $ unsafeCreateUnencodedMessage $ DidSpawn ref pid'
872879

873880
-- Unified semantics does not explicitly describe how to implement 'register',
874881
-- but mentions it's "very similar to nsend" (Table 14)
@@ -893,11 +900,8 @@ ncEffectRegister from label atnode mPid reregistration = do
893900
(Just p) -> liftIO $ trace node (MxRegistered p label)
894901
Nothing -> liftIO $ trace node (MxUnRegistered (fromJust currentVal) label)
895902
newVal <- gets (^. registeredHereFor label)
896-
liftIO $ sendMessage node
897-
(NodeIdentifier (localNodeId node))
898-
(ProcessIdentifier from)
899-
WithImplicitReconnect
900-
(RegisterReply label isOk newVal)
903+
ncSendToProcess from $ unsafeCreateUnencodedMessage $
904+
RegisterReply label isOk newVal
901905
else let operation =
902906
case reregistration of
903907
True -> flip decList
@@ -927,39 +931,23 @@ ncEffectRegister from label atnode mPid reregistration = do
927931
decList (x:xs) tag = x:decList xs tag
928932
forward node to reg =
929933
when (not $ isLocal node (NodeIdentifier to)) $
930-
liftIO $ sendBinary node
931-
(ProcessIdentifier from)
932-
(NodeIdentifier to)
933-
WithImplicitReconnect
934-
NCMsg
935-
{ ctrlMsgSender = ProcessIdentifier from
936-
, ctrlMsgSignal = reg
937-
}
934+
ncSendToNode to $ NCMsg { ctrlMsgSender = ProcessIdentifier from
935+
, ctrlMsgSignal = reg
936+
}
938937

939938

940939
-- Unified semantics does not explicitly describe 'whereis'
941940
ncEffectWhereIs :: ProcessId -> String -> NC ()
942941
ncEffectWhereIs from label = do
943-
node <- ask
944942
mPid <- gets (^. registeredHereFor label)
945-
liftIO $ sendMessage node
946-
(NodeIdentifier (localNodeId node))
947-
(ProcessIdentifier from)
948-
WithImplicitReconnect
949-
(WhereIsReply label mPid)
943+
ncSendToProcess from $ unsafeCreateUnencodedMessage $ WhereIsReply label mPid
950944

951945
-- [Unified: Table 14]
952-
ncEffectNamedSend :: ProcessId -> String -> Message -> NC ()
953-
ncEffectNamedSend from label msg = do
946+
ncEffectNamedSend :: String -> Message -> NC ()
947+
ncEffectNamedSend label msg = do
954948
mPid <- gets (^. registeredHereFor label)
955-
node <- ask
956949
-- If mPid is Nothing, we just ignore the named send (as per Table 14)
957-
forM_ mPid $ \pid ->
958-
liftIO $ sendPayload node
959-
(ProcessIdentifier from)
960-
(ProcessIdentifier pid)
961-
NoImplicitReconnect
962-
(messageToPayload msg)
950+
forM_ mPid (`ncSendToProcess` msg)
963951

964952
-- [Issue #DP-20]
965953
ncEffectLocalSend :: LocalNode -> ProcessId -> Message -> NC ()
@@ -1017,7 +1005,7 @@ ncEffectGetInfo from pid =
10171005
$ return . (^. localProcessWithId lpid)
10181006
case mProc of
10191007
Nothing -> dispatch (isLocal node (ProcessIdentifier from))
1020-
from node (ProcessInfoNone DiedUnknownId)
1008+
from (ProcessInfoNone DiedUnknownId)
10211009
Just proc -> do
10221010
itsLinks <- gets (^. linksFor them)
10231011
itsMons <- gets (^. monitorsFor them)
@@ -1027,7 +1015,6 @@ ncEffectGetInfo from pid =
10271015
let reg = registeredNames registered
10281016
dispatch (isLocal node (ProcessIdentifier from))
10291017
from
1030-
node
10311018
ProcessInfo {
10321019
infoNode = (processNodeId pid)
10331020
, infoRegisteredNames = reg
@@ -1038,16 +1025,11 @@ ncEffectGetInfo from pid =
10381025
where dispatch :: (Serializable a, Show a)
10391026
=> Bool
10401027
-> ProcessId
1041-
-> LocalNode
10421028
-> a
10431029
-> NC ()
1044-
dispatch True dest _ pInfo = postAsMessage dest $ pInfo
1045-
dispatch False dest node pInfo = do
1046-
liftIO $ sendMessage node
1047-
(NodeIdentifier (localNodeId node))
1048-
(ProcessIdentifier dest)
1049-
WithImplicitReconnect
1050-
pInfo
1030+
dispatch True dest pInfo = postAsMessage dest $ pInfo
1031+
dispatch False dest pInfo =
1032+
ncSendToProcess dest $ unsafeCreateUnencodedMessage pInfo
10511033

10521034
registeredNames = Map.foldlWithKey (\ks k v -> if v == pid
10531035
then (k:ks)
@@ -1094,14 +1076,10 @@ notifyDied dest src reason mRef = do
10941076
throwException dest $ PortLinkException pid reason
10951077
(False, _, _) ->
10961078
-- The change in sender comes from [Unified: Table 10]
1097-
liftIO $ sendBinary node
1098-
(NodeIdentifier $ localNodeId node)
1099-
(NodeIdentifier $ processNodeId dest)
1100-
WithImplicitReconnect
1101-
NCMsg
1102-
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
1103-
, ctrlMsgSignal = Died src reason
1104-
}
1079+
ncSendToNode (processNodeId dest) $ NCMsg
1080+
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
1081+
, ctrlMsgSignal = Died src reason
1082+
}
11051083

11061084
-- | [Unified: Table 8]
11071085
destNid :: ProcessSignal -> Maybe NodeId

0 commit comments

Comments
 (0)