Module Distributed_lwt.Make

Functor to create a module of type Distributed.Process given a message module M of type Distributed.Message_type and a custom logger module L of type CustomerLogger.

Parameters

Signature

exception Init_more_than_once
exception InvalidNode of Distributed.Node_id.t
exception Local_only_mode
type 'a t

The abstract monadic type representing a computation returning 'a.

type 'a io = 'a Lwt.t

Abstract type for monadic concurrent IO returning 'a.

type message_type = M.t

The abstract type representing the messages that will be sent between processes.

type 'a matcher_list

The abstract type representing a non-empty list of matchers to be used with receive function.

type monitor_ref

The abstract type representing a monitor_ref that is returned when a processes is monitored and can be used to unmonitor it.

type monitor_reason =
| Normal of Distributed.Process_id.t

Process terminated normally.

| Exception of Distributed.Process_id.t * exn

Process terminated with exception.

| UnkownNodeId of Distributed.Process_id.t * Distributed.Node_id.t

An operation failed because the remote node id is unknown.

| NoProcess of Distributed.Process_id.t

Attempted to monitor a process that does not exist.

Reason for process termination.

module Remote_config : sig ... end

The configuration of a node to be run as a remote node i.e., one that can both send an receive messages with other nodes.

module Local_config : sig ... end

The configuration of a node to be run as a local node i.e., one that can not send or receive messages with other nodes.

type node_config =
| Local of Local_config.t
| Remote of Remote_config.t

The configuration of a node. Can be one of node_config.Local or node_config.Remote.

val return : 'a -> 'a t

return v creates a computation returning v.

val (>>=) : 'a t -> ('a -> 'b t) -> 'b t

c >>= f is a computation which first waits for the computation c to terminate and then, if the computation succeeds, behaves as the application of function f to the return value of c. If the computation c fails, c >>= f also fails, with the same exception.

val fail : exn -> 'a t

fail e is a process that fails with the exception e.

val catch : (unit -> 'a t) -> (exn -> 'a t) -> 'a t

catch p f is a process that behaves as the process p () if this process succeeds. If the process p () fails with some exception, catch p f behaves as the application of f to this exception.

val spawn : ?⁠monitor:bool -> Distributed.Node_id.t -> (unit -> unit t) -> (Distributed.Process_id.t * monitor_ref option) t

spawn monitor name node_id process will spawn process on node_id returning the Distributed.Process_id.t associated with the newly spawned process. If monitor is true (default value is false) then the spawned process will also be monitored and the associated monitor_ref will be returned.

If node_id is an unknown node then InvalidNode exception is raised.

val case : (message_type -> (unit -> 'a t) option) -> 'a matcher_list

case match_fn will create a matcher_list which will use match_fn to match on potential messages. match_fn should return None to indicate no match or Some handler where handler is the function that should be called to handle the matching message.

val termination_case : (monitor_reason -> 'a t) -> 'a matcher_list

termination_case handler will create a matcher_list which can use used to match against termination_reason for a process that is being monitored. If this process is monitoring another process then providing this matcher in the list of matchers to receive will allow this process to act on the termination of the monitored process.

NOTE : when a remote process (i.e., one running on another node) raises an exception you will not be able to pattern match on the exception . This is a limitation of the Marshal OCaml module : " Values of extensible variant types, for example exceptions (of extensible type exn), returned by the unmarshaller should not be pattern-matched over through match ... with or try ... with, because unmarshalling does not preserve the information required for matching their constructors. Structural equalities with other extensible variant values does not work either. Most other uses such as Printexc.to_string, will still work as expected. "

See http://caml.inria.fr/pub/docs/manual-ocaml/libref/Marshal.html.

val (|.) : 'a matcher_list -> 'a matcher_list -> 'a matcher_list

a_matcher |. b_matcher is a matcher_list consiting of the matchers in a_matcher followed by the matchers in b_matcher.

val receive : ?⁠timeout_duration:float -> 'a matcher_list -> 'a option t

receive timeout matchers will wait for a message to be sent to this process which matches one of matchers provided in matchers. The first matching matcher in matchers will used process the matching message returning Some result where result is result of the matcher processing the matched message. All the other non-matching messages are left in the same order they came in.

If a time out is provided and no matching messages has arrived in the time out period then None will be returned.

If the matchers is empty then an Empty_matchers exception is raised.

val receive_loop : ?⁠timeout_duration:float -> bool matcher_list -> unit t

receive_loop timeout matchers is a convenience function which will loop until a matcher in matchers returns false.

val send : Distributed.Process_id.t -> message_type -> unit t

send process_id msg will send, asynchronously, message msg to the process with id process_id (possibly running on a remote node).

If process_id is resides on an unknown node then InvalidNode exception is raised.

If process_id is an unknown process but the node on which it resides is known then send will still succeed (i.e., will not raise any exceptions).

val (>!) : Distributed.Process_id.t -> message_type -> unit t

pid >! msg is equivalent to send pid msg. >! is an infix alias for send.

val broadcast : Distributed.Node_id.t -> message_type -> unit t

broadcast node_id msg will send, asynchronously, message msg to all the processes on node_id.

If node_id is an unknown node then InvalidNode exception is raised.

val monitor : Distributed.Process_id.t -> monitor_ref t

monitor pid will allows the calling process to monitor pid. When pid terminates (normally or abnormally) this monitoring process will receive a termination_reason message, which can be matched in receive using termination_matcher. A single process can be monitored my multiple processes.

If process_id is resides on an unknown node then InvalidNode exception is raised.

val unmonitor : monitor_ref -> unit t

unmonitor mref will cause this process to stop monitoring the process which is referenced by mref. If the current process is not monitoring the process referenced by mref then unmonitor is a no-op.

If process being unmonitored as indicated by monitor_ref is resides on an unknown node then InvalidNode exception is raised.

val get_self_pid : Distributed.Process_id.t t

get_self_pid process will return the process id associated with process.

val get_self_node : Distributed.Node_id.t t

get_self_node process will return the node id associated with process.

val get_remote_node : string -> Distributed.Node_id.t option t

get_remote_node node_name will return the node id associated with name, if there is no record of a node with name at this time then None is returned.

val get_remote_nodes : Distributed.Node_id.t list t

The list of all nodes currently active and inactive.

val add_remote_node : string -> int -> string -> Distributed.Node_id.t t

add_remote_node ip port name will connect to the remote node at ip:port with name name and add it to the current nodes list of connected remote nodes. The newly added node id is returned as the result. Adding a remote node that already exists is a no-op.

If the node is operating in local only mode then Local_only_mode is raised.

val remove_remote_node : Distributed.Node_id.t -> unit t

remove_remote_node node_id will remove node_id from the list of connected remote nodes.

If the node is operating in local only mode then Local_only_mode is raised.

val lift_io : 'a io -> 'a t

lift_io io lifts the io computation into the process.

val run_node : ?⁠process:(unit -> unit t) -> node_config -> unit io

run_node process node_monitor_fn node_config performs the necessary bootstrapping to start this node according to node_config. If provided, runs the initial process returning the resulting io.

If it's called more than once then an exception of Init_more_than_once is raised.