Skip to content

Commit 6deb13d

Browse files
Merge pull request haskell-distributed#263 from haskell-distributed/fd/more-unsafe-prims
Unsafe primitives for usend and nsendRemote.
2 parents deadc44 + bb52b3b commit 6deb13d

File tree

3 files changed

+51
-0
lines changed

3 files changed

+51
-0
lines changed

src/Control/Distributed/Process.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@ module Control.Distributed.Process
4040
, mergePortsRR
4141
-- * Unsafe messaging variants
4242
, unsafeSend
43+
, unsafeUSend
4344
, unsafeSendChan
4445
, unsafeNSend
46+
, unsafeNSendRemote
4547
, unsafeWrapMessage
4648
-- * Advanced messaging
4749
, Match
@@ -212,8 +214,10 @@ import Control.Distributed.Process.Internal.Primitives
212214
, mergePortsBiased
213215
, mergePortsRR
214216
, unsafeSend
217+
, unsafeUSend
215218
, unsafeSendChan
216219
, unsafeNSend
220+
, unsafeNSendRemote
217221
-- Advanced messaging
218222
, Match
219223
, receiveWait

src/Control/Distributed/Process/Internal/Primitives.hs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ module Control.Distributed.Process.Internal.Primitives
2323
, mergePortsRR
2424
-- * Unsafe messaging variants
2525
, unsafeSend
26+
, unsafeUSend
2627
, unsafeSendChan
2728
, unsafeNSend
29+
, unsafeNSendRemote
2830
-- * Advanced messaging
2931
, Match
3032
, receiveWait
@@ -279,6 +281,13 @@ usend them msg = do
279281
else sendCtrlMsg (Just there) $ UnreliableSend (processLocalId them)
280282
(createMessage msg)
281283

284+
-- | /Unsafe/ variant of 'usend'. This function makes /no/ attempt to serialize
285+
-- the message when the destination process resides on the same local
286+
-- node. Therefore, a local receiver would need to be prepared to cope with any
287+
-- errors resulting from evaluation of the message.
288+
unsafeUSend :: Serializable a => ProcessId -> a -> Process ()
289+
unsafeUSend = Unsafe.usend
290+
282291
-- | Wait for a message of a specific type
283292
expect :: forall a. Serializable a => Process a
284293
expect = receiveWait [match return]
@@ -1176,6 +1185,13 @@ nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
11761185
nsendRemote nid label msg =
11771186
sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))
11781187

1188+
-- | Named send to a process in a remote registry (asynchronous)
1189+
-- This function makes /no/ attempt to serialize and (in the case when the
1190+
-- destination process resides on the same local node) therefore ensure that
1191+
-- the payload is fully evaluated before it is delivered.
1192+
unsafeNSendRemote :: Serializable a => NodeId -> String -> a -> Process ()
1193+
unsafeNSendRemote = Unsafe.nsendRemote
1194+
11791195
--------------------------------------------------------------------------------
11801196
-- Closures --
11811197
--------------------------------------------------------------------------------

src/Control/Distributed/Process/UnsafePrimitives.hs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ module Control.Distributed.Process.UnsafePrimitives
4545
send
4646
, sendChan
4747
, nsend
48+
, nsendRemote
49+
, usend
4850
, wrapMessage
4951
) where
5052

@@ -56,6 +58,7 @@ import Control.Distributed.Process.Internal.Messaging
5658

5759
import Control.Distributed.Process.Internal.Types
5860
( ProcessId(..)
61+
, NodeId(..)
5962
, LocalNode(..)
6063
, LocalProcess(..)
6164
, Process(..)
@@ -65,6 +68,7 @@ import Control.Distributed.Process.Internal.Types
6568
, ImplicitReconnect(..)
6669
, SendPortId(..)
6770
, Message
71+
, createMessage
6872
, sendPortProcessId
6973
, unsafeCreateUnencodedMessage
7074
)
@@ -78,6 +82,14 @@ nsend :: Serializable a => String -> a -> Process ()
7882
nsend label msg =
7983
sendCtrlMsg Nothing (NamedSend label (unsafeCreateUnencodedMessage msg))
8084

85+
-- | Named send to a process in a remote registry (asynchronous)
86+
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
87+
nsendRemote nid label msg = do
88+
proc <- ask
89+
if localNodeId (processNode proc) == nid
90+
then nsend label msg
91+
else sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))
92+
8193
-- | Send a message
8294
send :: Serializable a => ProcessId -> a -> Process ()
8395
send them msg = do
@@ -96,6 +108,25 @@ send them msg = do
96108
unsafeSendLocal pid msg' =
97109
sendCtrlMsg Nothing $ LocalSend pid (unsafeCreateUnencodedMessage msg')
98110

111+
-- | Send a message unreliably.
112+
--
113+
-- Unlike 'send', this function is insensitive to 'reconnect'. It will
114+
-- try to send the message regardless of the history of connection failures
115+
-- between the nodes.
116+
--
117+
-- Message passing with 'usend' is ordered for a given sender and receiver
118+
-- if the messages arrive at all.
119+
--
120+
usend :: Serializable a => ProcessId -> a -> Process ()
121+
usend them msg = do
122+
proc <- ask
123+
let there = processNodeId them
124+
if localNodeId (processNode proc) == there
125+
then sendCtrlMsg Nothing $
126+
LocalSend them (unsafeCreateUnencodedMessage msg)
127+
else sendCtrlMsg (Just there) $ UnreliableSend (processLocalId them)
128+
(createMessage msg)
129+
99130
-- | Send a message on a typed channel
100131
sendChan :: Serializable a => SendPort a -> a -> Process ()
101132
sendChan (SendPort cid) msg = do

0 commit comments

Comments
 (0)