Module Worker.GS

The gossipsub automaton that will be used by the worker.

The gossipsub automaton that will be used by the worker.

module Peer = C.GS.Peer

Module for peer

module Topic = C.GS.Topic

Module for topic

module Message_id = C.GS.Message_id

Module for message_id

module Message = C.GS.Message

Module for message

module Time = C.GS.Time

Module for time

module Span = C.GS.Span

Module for time duration

module Score = C.GS.Score

Module for peers scores

type message = Message.t
type span = Span.t
type state = C.GS.state

The state managed by the gossipsub automaton. The state is purely functional.

Limits of the gossipsub protocol.

Parameters of the gossipsub protocol.

The types of payloads for inputs to the gossipsub automaton.

type add_peer = C.GS.add_peer = {
  1. direct : bool;
  2. outbound : bool;
  3. peer : Peer.t;
}
type remove_peer = C.GS.remove_peer = {
  1. peer : Peer.t;
}
type ihave = C.GS.ihave = {
  1. peer : Peer.t;
  2. topic : Topic.t;
  3. message_ids : Message_id.t list;
}
type iwant = C.GS.iwant = {
  1. peer : Peer.t;
  2. message_ids : Message_id.t list;
}
type graft = C.GS.graft = {
  1. peer : Peer.t;
  2. topic : Topic.t;
}
type prune = C.GS.prune = {
  1. peer : Peer.t;
  2. topic : Topic.t;
  3. px : Peer.t Mavryk_base.TzPervasives.Seq.t;
  4. backoff : span;
}
type publish_message = C.GS.publish_message = {
  1. topic : Topic.t;
  2. message_id : Message_id.t;
  3. message : message;
}
type receive_message = C.GS.receive_message = {
  1. sender : Peer.t;
  2. topic : Topic.t;
  3. message_id : Message_id.t;
  4. message : message;
}
type join = C.GS.join = {
  1. topic : Topic.t;
}
type leave = C.GS.leave = {
  1. topic : Topic.t;
}
type subscribe = C.GS.subscribe = {
  1. topic : Topic.t;
  2. peer : Peer.t;
}
type unsubscribe = C.GS.unsubscribe = {
  1. topic : Topic.t;
  2. peer : Peer.t;
}
type set_application_score = C.GS.set_application_score = {
  1. peer : Peer.t;
  2. score : float;
}
type _ output = _ C.GS.output =
  1. | Ihave_from_peer_with_low_score : {
    1. score : Score.t;
    2. threshold : float;
    } -> [ `IHave ] output
    (*

    The peer who sent an IHave message has a score below threshold.

    *)
  2. | Too_many_recv_ihave_messages : {
    1. count : int;
    2. max : int;
    } -> [ `IHave ] output
    (*

    The peer sent us more than max IHave messages within two successive heartbeat calls.

    *)
  3. | Too_many_sent_iwant_messages : {
    1. count : int;
    2. max : int;
    } -> [ `IHave ] output
    (*

    We sent more than max IWant messages to this peer within two successive heartbeat calls.

    *)
  4. | Message_topic_not_tracked : [ `IHave ] output
    (*

    We received an IHave message for a topic we don't track.

    *)
  5. | Message_requested_message_ids : Message_id.t list -> [ `IHave ] output
    (*

    The message ids we want to request from the peer which sent us an IHave message. The implementation honors the max_sent_iwant_per_heartbeat limit.

    *)
  6. | Invalid_message_id : [ `IHave ] output
    (*

    A message id received via IHave message is invalid.

    *)
  7. | Iwant_from_peer_with_low_score : {
    1. score : Score.t;
    2. threshold : float;
    } -> [ `IWant ] output
    (*

    The peer who sent an IWant message has a score below threshold.

    *)
  8. | On_iwant_messages_to_route : {
    1. routed_message_ids : [ `Ignored | `Not_found | `Too_many_requests | `Message of message ] Message_id.Map.t;
    } -> [ `IWant ] output
    (*

    As an answer to an `IWant message, the automaton returns a map associating to each requested message_id either `Ignored if the peer is filtered out by peer_filter, `Not_found if the message is not found, `Too_many_requests if the message has been requested too many times, or `Message m if m is the message with the given id.

    *)
  9. | Peer_filtered : [ `Graft ] output
    (*

    The peer we attempt to graft has not been selected by peer_filter.

    *)
  10. | Unsubscribed_topic : [ `Graft ] output
    (*

    We didn't join the topic for which we are attempting to graft a peer.

    *)
  11. | Peer_already_in_mesh : [ `Graft ] output
    (*

    Attempting to graft a peer which has already been grafted.

    *)
  12. | Grafting_direct_peer : [ `Graft ] output
    (*

    Attempting to graft a direct peer.

    *)
  13. | Unexpected_grafting_peer : [ `Graft ] output
    (*

    The peer we attempt to graft is not known.

    *)
  14. | Grafting_peer_with_negative_score : [ `Graft ] output
    (*

    Attempting to graft a peer with a negative score.

    *)
  15. | Grafting_successfully : [ `Graft ] output
    (*

    Grafting the given peer for the provided topic succeeded.

    *)
  16. | Peer_backed_off : [ `Graft ] output
    (*

    We cannot graft the given peer because it is backed off.

    *)
  17. | Mesh_full : [ `Graft ] output
    (*

    Grafting a peer for a topic whose mesh has already sufficiently many peers.

    *)
  18. | Prune_topic_not_tracked : [ `Prune ] output
    (*

    Attempting to prune a peer for a non-tracked topic.

    *)
  19. | Peer_not_in_mesh : [ `Prune ] output
    (*

    Attempting to prune a peer which is not in the mesh.

    *)
  20. | Ignore_PX_score_too_low : Score.t -> [ `Prune ] output
    (*

    The given peer has been pruned for the given topic, but no alternative peers are returned because the peer's score is too low. The score of the peer is included in the return value.

    *)
  21. | No_PX : [ `Prune ] output
    (*

    The given peer has been pruned for the given topic. No alternative peers were provided in the prune message.

    *)
  22. | PX : Peer.Set.t -> [ `Prune ] output
    (*

    The given peer has been pruned for the given topic. The set of alternative peers provided in the prune message for that topic is returned.

    *)
  23. | Publish_message : {
    1. to_publish : Peer.Set.t;
    } -> [ `Publish_message ] output
    (*

    to_publish contains:

    • Direct peers for the message's topic;
    • The peers in the topic's mesh, if the local peer is subscribed to the topic. Otherwise, the peers in the topic's fanout.
    *)
  24. | Already_published : [ `Publish_message ] output
    (*

    Attempting to publish a message that has already been published.

    *)
  25. | Route_message : {
    1. to_route : Peer.Set.t;
    } -> [ `Receive_message ] output
    (*

    to_route contains:

    • Direct peers for the message's topic;
    • The peers in the topic's mesh minus the original sender of the message.
    *)
  26. | Already_received : [ `Receive_message ] output
    (*

    Received a message that has already been received before.

    *)
  27. | Not_subscribed : [ `Receive_message ] output
    (*

    Received a message from a remote peer for a topic we are not subscribed to (called "unknown topic" in the Go implementation).

    *)
  28. | Invalid_message : [ `Receive_message ] output
  29. | Unknown_validity : [ `Receive_message ] output
    (*

    Received a message that is invalid (Invalid_message), or whose validity is unknown (Unknown_validity).

    *)
  30. | Already_joined : [ `Join ] output
    (*

    Attempting to join a topic we already joined.

    *)
  31. | Joining_topic : {
    1. to_graft : Peer.Set.t;
    } -> [ `Join ] output
    (*

    When successfully joining a topic, the set of grafted peers for that topic is returned.

    *)
  32. | Not_joined : [ `Leave ] output
    (*

    Attempting to leave a topic which we didn't join or had already left.

    *)
  33. | Leaving_topic : {
    1. to_prune : Peer.Set.t;
    2. noPX_peers : Peer.Set.t;
    } -> [ `Leave ] output
    (*

    When successfully leaving a topic, the set of pruned peers for that topic is returned alongside a subset of those peers for which no alternative PX will be proposed.

    *)
  34. | Heartbeat : {
    1. to_graft : Topic.Set.t Peer.Map.t;
      (*

      The set of topics per peer that have been grafted.

      *)
    2. to_prune : Topic.Set.t Peer.Map.t;
      (*

      The set of topics per peer that have been pruned.

      *)
    3. noPX_peers : Peer.Set.t;
      (*

      Set of peers for which peer exchange (PX) will not be proposed.

      *)
    } -> [ `Heartbeat ] output
  35. | Peer_added : [ `Add_peer ] output
    (*

    The output returned when successfully adding a peer.

    *)
  36. | Peer_already_known : [ `Add_peer ] output
    (*

    The output returned when attempting to add a peer which is already known.

    *)
  37. | Removing_peer : [ `Remove_peer ] output
    (*

    The output returned when successfully removing a peer.

    *)
  38. | Subscribed : [ `Subscribe ] output
    (*

    The output returned once we successfully processed a subscribe request sent from a peer.

    *)
  39. | Subscribe_to_unknown_peer : [ `Subscribe ] output
    (*

    The output returned when we receive a subscribe message from a peer we don't know.

    *)
  40. | Unsubscribed : [ `Unsubscribe ] output
    (*

    The output returned once we successfully processed an unsubscribe request sent from a peer.

    *)
  41. | Unsubscribe_from_unknown_peer : [ `Unsubscribe ] output
    (*

    The output returned when we receive an unsubscribe message from a peer we don't know.

    *)
  42. | Set_application_score : [ `Set_application_score ] output
    (*

    The output returned when we set the application score of a peer.

    *)

Output produced by one of the actions below.

type 'a monad := state -> state * 'a output

A type alias for the state monad.

val make : Stdlib.Random.State.t -> limits -> parameters -> state

Initialise a state.

val add_peer : add_peer -> [ `Add_peer ] monad

add_peer { direct; outbound; peer } is called to notify gossipsub of a new connection. If direct is true, gossipsub always forwards messages to this peer. outbound is true if it is an outbound connection, that is, a connection initiated by the local (not the remote) peer. Note however that the notion of "outbound" connections can be refined, relaxed or redefined by the application layer to fit its own needs.

val remove_peer : remove_peer -> [ `Remove_peer ] monad

remove_peer { peer } notifies gossipsub that we are disconnected from a peer. Do note that the state still maintains information for this connection for retain_duration seconds.

val handle_subscribe : subscribe -> [ `Subscribe ] monad

handle_subscribe {topic; peer} handles a request from a remote peer informing us that it is subscribed to topic.

val handle_unsubscribe : unsubscribe -> [ `Unsubscribe ] monad

handle_unsubscribe {topic; peer} handles a request from a remote peer informing us that it unsubscribed from topic.

val handle_ihave : ihave -> [ `IHave ] monad

handle_ihave { peer; topic; message_ids } handles the gossip message IHave emitted by peer for topic with the message_ids.

val handle_iwant : iwant -> [ `IWant ] monad

handle_iwant { peer; message_ids } handles the gossip message IWant emitted by peer with the message_ids.

val handle_graft : graft -> [ `Graft ] monad

handle_graft { peer; topic } handles the gossip message Graft emitted by peer for topic. This action grafts a connection into a full connection, allowing the transmission of full messages for the given topic.

val handle_prune : prune -> [ `Prune ] monad

handle_prune { peer; topic; px; backoff } handles the gossip message Prune emitted by peer for topic. This action prunes a full connection. In that case, the remote peer can send a list of peers to connect to as well as a backoff time, which is a duration for which we cannot Graft this peer on this topic.

val handle_receive_message : receive_message -> [ `Receive_message ] monad

handle_receive_message { sender; topic; message_id; message } handles a message received from sender on the gossip network. The function returns a set of peers to which the (full) message will be directly forwarded.

val publish_message : publish_message -> [ `Publish_message ] monad

publish_message { topic; message_id; message } publishes a message on the gossip network from the local node. The function returns a set of peers to which the (full) message will be directly forwarded.

val heartbeat : [ `Heartbeat ] monad

heartbeat executes the heartbeat routine of the algorithm.

val join : join -> [ `Join ] monad

join { topic } handles a join to a new topic. On success, the function returns the set of peers that have been grafted to form the mesh of the joined topic.

val leave : leave -> [ `Leave ] monad

leave { topic } handles a leave from a topic. On success, the function returns the set of peers, forming the mesh, that have been pruned for that topic.

val set_application_score : set_application_score -> [ `Set_application_score ] monad

set_application_score {peer; score} handles setting the application score of peer. If the peer is not known, this does nothing.

val select_px_peers : state -> peer_to_prune:Peer.t -> Topic.t -> noPX_peers:Peer.Set.t -> Peer.t list

Select random peers for Peer eXchange. Note that this function is deterministic; however, it has side effects in that it updates the state's random state.

val select_gossip_messages : state -> ihave list

Select the gossip messages to be sent. These are IHave control messages referring to recently seen messages (that is, sent during the last history_gossip_length heartbeat ticks), to be sent to a random selection of peers. The message ids for a peer and a topic are also selected at random among the possible ones. At most max_sent_iwant_per_heartbeat message ids are sent.

The local peer will send gossip to at most gossip_factor * (total number of non-mesh/non-fanout peers), or degree_lazy random peers, whichever is greater.

Note that this function is deterministic; however, it has side effects in that it updates the state's random state.

val pp_add_peer : Stdlib.Format.formatter -> add_peer -> unit
val pp_remove_peer : Stdlib.Format.formatter -> remove_peer -> unit
val pp_ihave : Stdlib.Format.formatter -> ihave -> unit
val pp_iwant : Stdlib.Format.formatter -> iwant -> unit
val pp_graft : Stdlib.Format.formatter -> graft -> unit
val pp_prune : Stdlib.Format.formatter -> prune -> unit
val pp_receive_message : Stdlib.Format.formatter -> receive_message -> unit
val pp_publish_message : Stdlib.Format.formatter -> publish_message -> unit
val pp_join : Stdlib.Format.formatter -> join -> unit
val pp_leave : Stdlib.Format.formatter -> leave -> unit
val pp_subscribe : Stdlib.Format.formatter -> subscribe -> unit
val pp_unsubscribe : Stdlib.Format.formatter -> unsubscribe -> unit
val pp_set_application_score : Stdlib.Format.formatter -> set_application_score -> unit
val pp_output : Stdlib.Format.formatter -> 'a output -> unit
module Introspection = C.GS.Introspection