Module Super.D

include Distributed.Process with type 'a io = 'a Lwt.t
exception Init_more_than_once

Exception that is raised if run_node is called more than once.

exception InvalidNode of Distributed.Node_id.t

Exception that is raised when spawn, broadcast, or monitor is called with an invalid node, or when send is called with a process that resides on an unknown node.

exception Local_only_mode

Exception that is raised when add_remote_node or remove_remote_node is called on a node that is operating in local only mode.

type 'a t

The abstract monadic type representing a computation returning 'a.

type 'a io = 'a Lwt.t

Abstract type for monadic concurrent IO returning 'a.

type message_type

The abstract type representing the messages that will be sent between processes.

type 'a matcher_list

The abstract type representing a non-empty list of matchers to be used with the receive function.

type monitor_ref

The abstract type representing a monitor_ref that is returned when a process is monitored and can be used to unmonitor it.

type monitor_reason =
  | Normal of Distributed.Process_id.t
    (* Process terminated normally. *)
  | Exception of Distributed.Process_id.t * exn
    (* Process terminated with exception. *)
  | UnkownNodeId of Distributed.Process_id.t * Distributed.Node_id.t
    (* An operation failed because the remote node id is unknown. *)
  | NoProcess of Distributed.Process_id.t
    (* Attempted to monitor a process that does not exist. *)

Reason for process termination.

module Remote_config : sig ... end

The configuration of a node to be run as a remote node, i.e., one that can both send and receive messages with other nodes.

module Local_config : sig ... end

The configuration of a node to be run as a local node, i.e., one that cannot exchange messages with other nodes.

type node_config =
  | Local of Local_config.t
  | Remote of Remote_config.t

The configuration of a node. Can be one of node_config.Local or node_config.Remote.

val return : 'a -> 'a t

return v creates a computation returning v.

val (>>=) : 'a t -> ('a -> 'b t) -> 'b t

c >>= f is a computation which first waits for the computation c to terminate and then, if the computation succeeds, behaves as the application of function f to the return value of c. If the computation c fails, c >>= f also fails, with the same exception.
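A minimal sketch of chaining computations with return and >>=. To stay self-contained it is written as a functor over just these two values rather than against this module directly. The names MONAD, Sketch, Id and add_then_double are hypothetical, and Id is a trivial stand-in used only to run the example, not this module's implementation of 'a t.

```ocaml
(* The two values documented above, restated as a signature. *)
module type MONAD = sig
  type 'a t
  val return : 'a -> 'a t
  val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t
end

(* Code written against the signature works with any implementation of it. *)
module Sketch (P : MONAD) = struct
  open P

  (* Add two values, then double the sum, as a chain of two computations. *)
  let add_then_double x y =
    return (x + y) >>= fun sum ->
    return (2 * sum)
end

(* Stand-in implementation (assumption): a computation is just its value. *)
module Id = struct
  type 'a t = 'a
  let return v = v
  let ( >>= ) c f = f c
end

module S = Sketch (Id)

let answer : int = S.add_then_double 3 4
```

With a real node, the same add_then_double body would be built from this module's return and >>= and run inside a process.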

type proc_rep =
  | Fun of unit -> unit t
  | Registered of string
val register : string -> (Distributed.Process_id.t -> unit -> unit t) -> unit t
val fail : exn -> 'a t

fail e is a process that fails with the exception e.

val catch : (unit -> 'a t) -> (exn -> 'a t) -> 'a t

catch p f is a process that behaves as the process p () if this process succeeds. If the process p () fails with some exception, catch p f behaves as the application of f to this exception.
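The following sketch shows one way fail and catch might be combined: a computation that fails on bad input, wrapped in a handler that recovers from one exception and re-raises the rest. It is written as a functor so that it compiles on its own. FAILING, Sketch, R, Negative_input, checked and checked_or_zero are hypothetical names, and R is a result-based stand-in, not this module's implementation.

```ocaml
(* The values documented above, restated as a signature. *)
module type FAILING = sig
  type 'a t
  val return : 'a -> 'a t
  val fail : exn -> 'a t
  val catch : (unit -> 'a t) -> (exn -> 'a t) -> 'a t
end

module Sketch (P : FAILING) = struct
  open P

  exception Negative_input of int

  (* Fails for negative inputs, succeeds otherwise. *)
  let checked n = if n < 0 then fail (Negative_input n) else return n

  (* Recover from Negative_input with a default; propagate anything else. *)
  let checked_or_zero n =
    catch
      (fun () -> checked n)
      (function
        | Negative_input _ -> return 0
        | e -> fail e)
end

(* Stand-in implementation (assumption): a computation is a result value. *)
module R = struct
  type 'a t = ('a, exn) result
  let return v = Ok v
  let fail e = Error e
  let catch p f = match p () with Ok v -> Ok v | Error e -> f e
end

module S = Sketch (R)

let recovered = S.checked_or_zero (-5)
let untouched = S.checked_or_zero 7
```

Matching on the exception constructor works here because the exception is raised locally, in the same program that handles it.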

spawn monitor name node_id process will spawn process on node_id returning the Process_id.t associated with the newly spawned process. If monitor is true (default value is false) then the spawned process will also be monitored and the associated monitor_ref will be returned.

If node_id is an unknown node then InvalidNode exception is raised.

val case : (message_type -> (unit -> 'a t) option) -> 'a matcher_list

case match_fn will create a matcher_list which will use match_fn to match on potential messages. match_fn should return None to indicate no match or Some handler where handler is the function that should be called to handle the matching message.

val termination_case : (monitor_reason -> 'a t) -> 'a matcher_list

termination_case handler will create a matcher_list which can be used to match against the monitor_reason of a process that is being monitored. If this process is monitoring another process then providing this matcher in the list of matchers to receive will allow this process to act on the termination of the monitored process.

NOTE: when a remote process (i.e., one running on another node) raises an exception, you will not be able to pattern match on the exception. This is a limitation of the OCaml Marshal module: "Values of extensible variant types, for example exceptions (of extensible type exn), returned by the unmarshaller should not be pattern-matched over through match ... with or try ... with, because unmarshalling does not preserve the information required for matching their constructors. Structural equalities with other extensible variant values does not work either. Most other uses such as Printexc.to_string, will still work as expected."

See http://caml.inria.fr/pub/docs/manual-ocaml/libref/Marshal.html.

val (|.) : 'a matcher_list -> 'a matcher_list -> 'a matcher_list

a_matcher |. b_matcher is a matcher_list consisting of the matchers in a_matcher followed by the matchers in b_matcher.

val receive : ?timeout_duration:float -> 'a matcher_list -> 'a option t

receive ~timeout_duration matchers will wait for a message sent to this process that matches one of the matchers in matchers. The first matcher in matchers that matches will be used to process the message, returning Some result, where result is the result of that matcher processing the matched message. All non-matching messages are left in the order they arrived.

If a timeout is provided and no matching message has arrived within the timeout period then None will be returned.

If matchers is empty then an Empty_matchers exception is raised.
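The sketch below illustrates how case, |. and receive fit together: two matchers are combined, and messages that neither matcher accepts stay queued. It assumes a hypothetical message type msg and is written as a functor so that it compiles on its own. Mock is a deliberately simplified single-process stand-in (a mutable list as mailbox, no real timeout), not this module's implementation; RECEIVING, Sketch, matchers and next_reply are likewise hypothetical names.

```ocaml
(* Hypothetical message type; in practice this is the node's message_type. *)
type msg = Ping | Add of int * int | Other of string

(* The values documented above, restated as a signature. *)
module type RECEIVING = sig
  type 'a t
  type message_type
  type 'a matcher_list
  val return : 'a -> 'a t
  val case : (message_type -> (unit -> 'a t) option) -> 'a matcher_list
  val ( |. ) : 'a matcher_list -> 'a matcher_list -> 'a matcher_list
  val receive : ?timeout_duration:float -> 'a matcher_list -> 'a option t
end

module Sketch (P : RECEIVING with type message_type = msg) = struct
  open P

  (* Two matchers, tried in order; Other is accepted by neither. *)
  let matchers =
    case (function Ping -> Some (fun () -> return "pong") | _ -> None)
    |. case (function
         | Add (a, b) -> Some (fun () -> return (string_of_int (a + b)))
         | _ -> None)

  (* Wait up to half a second for a message that one matcher accepts. *)
  let next_reply () = receive ~timeout_duration:0.5 matchers
end

(* Stand-in implementation (assumption): the mailbox is a mutable list and
   the timeout is ignored, so an unmatched mailbox yields None at once. *)
module Mock = struct
  type 'a t = 'a
  type message_type = msg
  type 'a matcher_list = (message_type -> (unit -> 'a t) option) list
  let mailbox : msg list ref = ref []
  let return v = v
  let case f = [ f ]
  let ( |. ) a b = a @ b
  let receive ?timeout_duration:_ matchers =
    (* Find the oldest message some matcher accepts; keep the others in order. *)
    let rec scan skipped = function
      | [] -> None
      | m :: rest -> (
          match List.find_map (fun f -> f m) matchers with
          | Some handler ->
              mailbox := List.rev_append skipped rest;
              Some (handler ())
          | None -> scan (m :: skipped) rest)
    in
    scan [] !mailbox
end

module S = Sketch (Mock)

let () = Mock.mailbox := [ Other "noise"; Add (2, 3); Ping ]
let first = S.next_reply ()   (* skips Other, handles Add *)
let second = S.next_reply ()  (* handles Ping *)
let third = S.next_reply ()   (* only Other is left, so no match *)
```

In the stand-in, as in the description above, the unmatched Other message remains queued after all three calls.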

val receive_loop : ?timeout_duration:float -> bool matcher_list -> unit t

receive_loop timeout matchers is a convenience function which will loop until a matcher in matchers returns false.
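As a rough illustration of the "loop until a matcher returns false" convention, the sketch below accumulates a running total until a stop message arrives. The message type msg and the names LOOPING, Sketch, Mock, total and accumulate are hypothetical. Mock is a simplified stand-in that ends the loop on an empty mailbox and drops unmatched messages, which is an assumption made only to keep the example runnable, not a description of this module's behaviour.

```ocaml
(* Hypothetical message type for the example. *)
type msg = Incr of int | Stop

(* The values used by the example, restated as a signature. *)
module type LOOPING = sig
  type 'a t
  type message_type
  type 'a matcher_list
  val return : 'a -> 'a t
  val case : (message_type -> (unit -> 'a t) option) -> 'a matcher_list
  val receive_loop : ?timeout_duration:float -> bool matcher_list -> unit t
end

module Sketch (P : LOOPING with type message_type = msg) = struct
  open P

  let total = ref 0

  (* Handlers return true to keep looping and false to stop. *)
  let accumulate () =
    receive_loop
      (case (function
         | Incr n -> Some (fun () -> total := !total + n; return true)
         | Stop -> Some (fun () -> return false)))
end

(* Stand-in implementation (assumption): a mutable list as mailbox. *)
module Mock = struct
  type 'a t = 'a
  type message_type = msg
  type 'a matcher_list = (message_type -> (unit -> 'a t) option) list
  let mailbox : msg list ref = ref []
  let return v = v
  let case f = [ f ]
  let receive_loop ?timeout_duration:_ matchers =
    let rec loop () =
      match !mailbox with
      | [] -> () (* mock only: an empty mailbox ends the loop *)
      | m :: rest -> (
          mailbox := rest;
          match List.find_map (fun f -> f m) matchers with
          | Some handler -> if handler () then loop ()
          | None -> loop () (* mock only: unmatched messages are dropped *))
    in
    loop ()
end

module S = Sketch (Mock)

let () = Mock.mailbox := [ Incr 2; Incr 5; Stop; Incr 100 ]
let () = S.accumulate ()
```

The message queued after Stop is never handled, because the Stop handler returned false and ended the loop.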

send process_id msg will send, asynchronously, message msg to the process with id process_id (possibly running on a remote node).

If process_id resides on an unknown node then an InvalidNode exception is raised.

If process_id is an unknown process but the node on which it resides is known then send will still succeed (i.e., will not raise any exceptions).

pid >! msg is equivalent to send pid msg. >! is an infix alias for send.

val broadcast : Distributed.Node_id.t -> message_type -> unit t

broadcast node_id msg will send, asynchronously, message msg to all the processes on node_id.

If node_id is an unknown node then InvalidNode exception is raised.

monitor pid allows the calling process to monitor pid. When pid terminates (normally or abnormally), the monitoring process will receive a monitor_reason message, which can be matched in receive using termination_case. A single process can be monitored by multiple processes.

If pid resides on an unknown node then an InvalidNode exception is raised.

val unmonitor : monitor_ref -> unit t

unmonitor mref will cause this process to stop monitoring the process which is referenced by mref. If the current process is not monitoring the process referenced by mref then unmonitor is a no-op.

If the process being unmonitored, as indicated by mref, resides on an unknown node then an InvalidNode exception is raised.

val get_self_pid : Distributed.Process_id.t t

get_self_pid returns the process id of the calling process.

val get_self_node : Distributed.Node_id.t t

get_self_node returns the id of the node on which the calling process is running.

val get_remote_node : string -> Distributed.Node_id.t option t

get_remote_node node_name will return the node id associated with node_name. If there is no record of a node with that name at this time then None is returned.

val get_remote_nodes : Distributed.Node_id.t list t

The list of all nodes currently active and inactive.

val add_remote_node : string -> int -> string -> Distributed.Node_id.t t

add_remote_node ip port name will connect to the remote node at ip:port with name name and add it to the current node's list of connected remote nodes. The newly added node id is returned as the result. Adding a remote node that already exists is a no-op.

If the node is operating in local only mode then Local_only_mode is raised.

val remove_remote_node : Distributed.Node_id.t -> unit t

remove_remote_node node_id will remove node_id from the list of connected remote nodes.

If the node is operating in local only mode then Local_only_mode is raised.

val lift_io : 'a io -> 'a t

lift_io io lifts the io computation into the process.

val run_node : ?process:(unit -> unit t) -> node_config -> unit io

run_node ?process node_config performs the necessary bootstrapping to start this node according to node_config. If process is provided, it is run as the initial process and the resulting io is returned.

If it's called more than once then an exception of Init_more_than_once is raised.

module M : Communication.Distributed_wrapper.Enriched_message_type with type 'a step = 'a Msg.step with type 'a request = 'a Msg.request with type 'a reply = 'a Msg.reply

Additional monadic interface

val let* : 'a t -> ('a -> 'b t) -> 'b t
val let+ : 'a t -> ('a -> 'b) -> 'b t
val mapM : ('a -> 'b t) -> 'a list -> 'b list t
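The binding operators and mapM above have no description, so the following is only a sketch of how values with these signatures are commonly defined in terms of return and >>=. It is an assumption, not this module's actual code. MONAD, Sketch, Opt and squares are hypothetical names, Opt is an option-based stand-in used to run the example, and binding operators require OCaml 4.08 or later.

```ocaml
module type MONAD = sig
  type 'a t
  val return : 'a -> 'a t
  val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t
end

module Sketch (P : MONAD) = struct
  open P

  (* Plausible definitions (assumption): let* is bind, let+ is map. *)
  let ( let* ) = ( >>= )
  let ( let+ ) c f = c >>= fun x -> return (f x)

  (* Apply f to each element from left to right and collect the results. *)
  let rec mapM f = function
    | [] -> return []
    | x :: xs ->
        let* y = f x in
        let+ ys = mapM f xs in
        y :: ys

  let squares l = mapM (fun x -> return (x * x)) l
end

(* Stand-in implementation (assumption): computations are options. *)
module Opt = struct
  type 'a t = 'a option
  let return v = Some v
  let ( >>= ) = Option.bind
end

module S = Sketch (Opt)

let squared = S.squares [ 1; 2; 3 ]
```

Written this way, let* sequences two computations, while let+ applies a plain function to the result of one.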
val dmap : pids:Distributed.Process_id.t list -> request:('a -> index:int -> 'step M.request) -> reply:('step M.reply -> (unit -> 'b t) option) -> 'a list -> 'b list t

dmap ~pids ~request ~reply l builds a request from each element of l by applying request, sends the requests to the worker processes pids, and waits to receive a valid reply from each worker.

val handle_request : Distributed.Process_id.t -> step:'step M.step -> handler:('step M.request -> (unit -> ('step M.reply * 'b) t) option) -> 'b t

handle_request master_pid ~step ~handler waits to receive a request for the given step, processes it through handler, and sends the reply to master_pid. The handler might also return some additional data ('b) that isn't meant to be sent back to the master, but rather kept by the worker for future computation.