@@ -169,6 +169,7 @@ import Control.Distributed.Process.Internal.Types
169
169
, payloadToMessage
170
170
, messageToPayload
171
171
, createUnencodedMessage
172
+ , unsafeCreateUnencodedMessage
172
173
, runLocalProcess
173
174
, firstNonReservedProcessId
174
175
, ImplicitReconnect (WithImplicitReconnect ,NoImplicitReconnect )
@@ -198,7 +199,6 @@ import Control.Distributed.Process.Management.Internal.Types
198
199
import Control.Distributed.Process.Serializable (Serializable )
199
200
import Control.Distributed.Process.Internal.Messaging
200
201
( sendBinary
201
- , sendMessage
202
202
, sendPayload
203
203
, closeImplicitReconnections
204
204
, impliesDeathOf
@@ -661,6 +661,30 @@ instance Show ProcessKillException where
661
661
show (ProcessKillException pid reason) =
662
662
" killed-by=" ++ show pid ++ " ,reason=" ++ reason
663
663
664
+ ncSendToProcess :: ProcessId -> Message -> NC ()
665
+ ncSendToProcess pid msg = do
666
+ node <- ask
667
+ if processNodeId pid == localNodeId node
668
+ then ncEffectLocalSend node pid msg
669
+ else liftIO $ sendBinary node
670
+ (NodeIdentifier $ localNodeId node)
671
+ (NodeIdentifier $ processNodeId pid)
672
+ WithImplicitReconnect
673
+ NCMsg { ctrlMsgSender = NodeIdentifier $ localNodeId node
674
+ , ctrlMsgSignal = UnreliableSend (processLocalId pid) msg
675
+ }
676
+
677
+ ncSendToNode :: NodeId -> NCMsg -> NC ()
678
+ ncSendToNode to msg = do
679
+ node <- ask
680
+ liftIO $ if to == localNodeId node
681
+ then writeChan (localCtrlChan node) $! msg
682
+ else sendBinary node
683
+ (NodeIdentifier $ localNodeId node)
684
+ (NodeIdentifier to)
685
+ WithImplicitReconnect
686
+ msg
687
+
664
688
--------------------------------------------------------------------------------
665
689
-- Tracing/Debugging --
666
690
--------------------------------------------------------------------------------
@@ -703,11 +727,7 @@ nodeController = do
703
727
-- [Unified: Table 7, rule nc_forward]
704
728
case destNid (ctrlMsgSignal msg) of
705
729
Just nid' | nid' /= localNodeId node ->
706
- liftIO $ sendBinary node
707
- (ctrlMsgSender msg)
708
- (NodeIdentifier nid')
709
- WithImplicitReconnect
710
- msg
730
+ ncSendToNode nid' msg
711
731
_ ->
712
732
return ()
713
733
@@ -728,8 +748,8 @@ nodeController = do
728
748
ncEffectRegister from label atnode pid force
729
749
NCMsg (ProcessIdentifier from) (WhereIs label) ->
730
750
ncEffectWhereIs from label
731
- NCMsg ( ProcessIdentifier from) (NamedSend label msg') ->
732
- ncEffectNamedSend from label msg'
751
+ NCMsg _ (NamedSend label msg') ->
752
+ ncEffectNamedSend label msg'
733
753
NCMsg _ (UnreliableSend lpid msg') ->
734
754
ncEffectLocalSend node (ProcessId (localNodeId node) lpid) msg'
735
755
NCMsg _ (LocalSend to msg') ->
@@ -774,14 +794,10 @@ ncEffectMonitor from them mRef = do
774
794
-- TODO: this is the right sender according to the Unified semantics,
775
795
-- but perhaps having 'them' as the sender would make more sense
776
796
-- (see also: notifyDied)
777
- liftIO $ sendBinary node
778
- (NodeIdentifier $ localNodeId node)
779
- (NodeIdentifier $ processNodeId from)
780
- WithImplicitReconnect
781
- NCMsg
782
- { ctrlMsgSender = NodeIdentifier (localNodeId node)
783
- , ctrlMsgSignal = Died them DiedUnknownId
784
- }
797
+ ncSendToNode (processNodeId from) $ NCMsg
798
+ { ctrlMsgSender = NodeIdentifier (localNodeId node)
799
+ , ctrlMsgSignal = Died them DiedUnknownId
800
+ }
785
801
786
802
-- [Unified: Table 11]
787
803
ncEffectUnlink :: ProcessId -> Identifier -> NC ()
@@ -842,15 +858,10 @@ ncEffectDied ident reason = do
842
858
False -> return $ Just (pid,nidlist) )
843
859
modify' $ registeredOnNodes ^= (Map. fromList (catMaybes remaining))
844
860
where
845
- forwardNameDeath node nid =
846
- liftIO $ sendBinary node
847
- (NodeIdentifier $ localNodeId node)
848
- (NodeIdentifier $ nid)
849
- WithImplicitReconnect
850
- NCMsg
851
- { ctrlMsgSender = NodeIdentifier (localNodeId node)
852
- , ctrlMsgSignal = Died ident reason
853
- }
861
+ forwardNameDeath node nid = ncSendToNode nid
862
+ NCMsg { ctrlMsgSender = NodeIdentifier (localNodeId node)
863
+ , ctrlMsgSignal = Died ident reason
864
+ }
854
865
855
866
-- [Unified: Table 13]
856
867
ncEffectSpawn :: ProcessId -> Closure (Process () ) -> SpawnRef -> NC ()
@@ -864,11 +875,7 @@ ncEffectSpawn pid cProc ref = do
864
875
Right p -> p
865
876
node <- ask
866
877
pid' <- liftIO $ forkProcess node proc
867
- liftIO $ sendMessage node
868
- (NodeIdentifier (localNodeId node))
869
- (ProcessIdentifier pid)
870
- WithImplicitReconnect
871
- (DidSpawn ref pid')
878
+ ncSendToProcess pid $ unsafeCreateUnencodedMessage $ DidSpawn ref pid'
872
879
873
880
-- Unified semantics does not explicitly describe how to implement 'register',
874
881
-- but mentions it's "very similar to nsend" (Table 14)
@@ -893,11 +900,8 @@ ncEffectRegister from label atnode mPid reregistration = do
893
900
(Just p) -> liftIO $ trace node (MxRegistered p label)
894
901
Nothing -> liftIO $ trace node (MxUnRegistered (fromJust currentVal) label)
895
902
newVal <- gets (^. registeredHereFor label)
896
- liftIO $ sendMessage node
897
- (NodeIdentifier (localNodeId node))
898
- (ProcessIdentifier from)
899
- WithImplicitReconnect
900
- (RegisterReply label isOk newVal)
903
+ ncSendToProcess from $ unsafeCreateUnencodedMessage $
904
+ RegisterReply label isOk newVal
901
905
else let operation =
902
906
case reregistration of
903
907
True -> flip decList
@@ -927,39 +931,23 @@ ncEffectRegister from label atnode mPid reregistration = do
927
931
decList (x: xs) tag = x: decList xs tag
928
932
forward node to reg =
929
933
when (not $ isLocal node (NodeIdentifier to)) $
930
- liftIO $ sendBinary node
931
- (ProcessIdentifier from)
932
- (NodeIdentifier to)
933
- WithImplicitReconnect
934
- NCMsg
935
- { ctrlMsgSender = ProcessIdentifier from
936
- , ctrlMsgSignal = reg
937
- }
934
+ ncSendToNode to $ NCMsg { ctrlMsgSender = ProcessIdentifier from
935
+ , ctrlMsgSignal = reg
936
+ }
938
937
939
938
940
939
-- Unified semantics does not explicitly describe 'whereis'
941
940
ncEffectWhereIs :: ProcessId -> String -> NC ()
942
941
ncEffectWhereIs from label = do
943
- node <- ask
944
942
mPid <- gets (^. registeredHereFor label)
945
- liftIO $ sendMessage node
946
- (NodeIdentifier (localNodeId node))
947
- (ProcessIdentifier from)
948
- WithImplicitReconnect
949
- (WhereIsReply label mPid)
943
+ ncSendToProcess from $ unsafeCreateUnencodedMessage $ WhereIsReply label mPid
950
944
951
945
-- [Unified: Table 14]
952
- ncEffectNamedSend :: ProcessId -> String -> Message -> NC ()
953
- ncEffectNamedSend from label msg = do
946
+ ncEffectNamedSend :: String -> Message -> NC ()
947
+ ncEffectNamedSend label msg = do
954
948
mPid <- gets (^. registeredHereFor label)
955
- node <- ask
956
949
-- If mPid is Nothing, we just ignore the named send (as per Table 14)
957
- forM_ mPid $ \ pid ->
958
- liftIO $ sendPayload node
959
- (ProcessIdentifier from)
960
- (ProcessIdentifier pid)
961
- NoImplicitReconnect
962
- (messageToPayload msg)
950
+ forM_ mPid (`ncSendToProcess` msg)
963
951
964
952
-- [Issue #DP-20]
965
953
ncEffectLocalSend :: LocalNode -> ProcessId -> Message -> NC ()
@@ -1017,7 +1005,7 @@ ncEffectGetInfo from pid =
1017
1005
$ return . (^. localProcessWithId lpid)
1018
1006
case mProc of
1019
1007
Nothing -> dispatch (isLocal node (ProcessIdentifier from))
1020
- from node (ProcessInfoNone DiedUnknownId )
1008
+ from (ProcessInfoNone DiedUnknownId )
1021
1009
Just proc -> do
1022
1010
itsLinks <- gets (^. linksFor them)
1023
1011
itsMons <- gets (^. monitorsFor them)
@@ -1027,7 +1015,6 @@ ncEffectGetInfo from pid =
1027
1015
let reg = registeredNames registered
1028
1016
dispatch (isLocal node (ProcessIdentifier from))
1029
1017
from
1030
- node
1031
1018
ProcessInfo {
1032
1019
infoNode = (processNodeId pid)
1033
1020
, infoRegisteredNames = reg
@@ -1038,16 +1025,11 @@ ncEffectGetInfo from pid =
1038
1025
where dispatch :: (Serializable a , Show a )
1039
1026
=> Bool
1040
1027
-> ProcessId
1041
- -> LocalNode
1042
1028
-> a
1043
1029
-> NC ()
1044
- dispatch True dest _ pInfo = postAsMessage dest $ pInfo
1045
- dispatch False dest node pInfo = do
1046
- liftIO $ sendMessage node
1047
- (NodeIdentifier (localNodeId node))
1048
- (ProcessIdentifier dest)
1049
- WithImplicitReconnect
1050
- pInfo
1030
+ dispatch True dest pInfo = postAsMessage dest $ pInfo
1031
+ dispatch False dest pInfo =
1032
+ ncSendToProcess dest $ unsafeCreateUnencodedMessage pInfo
1051
1033
1052
1034
registeredNames = Map. foldlWithKey (\ ks k v -> if v == pid
1053
1035
then (k: ks)
@@ -1094,14 +1076,10 @@ notifyDied dest src reason mRef = do
1094
1076
throwException dest $ PortLinkException pid reason
1095
1077
(False , _, _) ->
1096
1078
-- The change in sender comes from [Unified: Table 10]
1097
- liftIO $ sendBinary node
1098
- (NodeIdentifier $ localNodeId node)
1099
- (NodeIdentifier $ processNodeId dest)
1100
- WithImplicitReconnect
1101
- NCMsg
1102
- { ctrlMsgSender = NodeIdentifier (localNodeId node)
1103
- , ctrlMsgSignal = Died src reason
1104
- }
1079
+ ncSendToNode (processNodeId dest) $ NCMsg
1080
+ { ctrlMsgSender = NodeIdentifier (localNodeId node)
1081
+ , ctrlMsgSignal = Died src reason
1082
+ }
1105
1083
1106
1084
-- | [Unified: Table 8]
1107
1085
destNid :: ProcessSignal -> Maybe NodeId
0 commit comments