Module Requester.Make

Parameters

module Hash : HASH
module Disk_table : DISK_TABLE with type key := Hash.t
module Memory_table : MEMORY_TABLE with type key := Hash.t
module Request : REQUEST with type key := Hash.t
module Probe : PROBE with type key := Hash.t and type value := Disk_table.value

Signature

include REQUESTER with type key = Hash.t with type value = Disk_table.value with type param = Probe.param
type t
type key = Hash.t

The index key

type value = Disk_table.value

The indexed data

type param = Probe.param

validation clue

val known : t -> key -> bool Lwt.t

know t k returns true iff the key is present in the memory table or the disk.

type Mavryk_base.TzPervasives.error +=
  1. | Missing_data of key
type Mavryk_base.TzPervasives.error +=
  1. | Canceled of key
type Mavryk_base.TzPervasives.error +=
  1. | Timeout of key

Return value if it is found in-memory, or else on disk. Otherwise fail with error Missing_data.

val read_opt : t -> key -> value option Lwt.t

Same as read but returns None if not found.

val inject : t -> key -> value -> bool Lwt.t

inject t k v returns false if k is already present in the memory table or in the disk, or has already been requested. Otherwise it updates the memory table and return true

fetch t ?peer ?timeout k param returns the value when it is known. It can fail with Timeout k if timeout is provided and the value isn't know before the timeout expires. It can fail with Cancel if the request is canceled.

The key is first looked up in memory, then on disk. If not present and not already requested, it schedules a request, and blocks until the requester is notified with notify. param is used to validate the notified value once it is received. (see also PROBE and notify).

Requests are re-sent via a 1.5 exponential back-off, with initial delay set to Request.initial_delay. If the function is called multiple time with the same key but with distinct peers, the internal scheduler randomly chooses the requested peer (at each retry).

val clear_or_cancel : t -> key -> unit

Remove the data from the memory table if present. Otherwise decrements the number of watcher to this data. If their is only one watcher any pending fetch promises are resolved with the error Canceled.

type store = Disk_table.store

The "disk" storage

type request_param = Request.param

Configuration parameter to the Request service

type notified_value = Probe.notified_value

type of values notified to the requester

val pending : t -> key -> bool

pending t k returns true iff the key status is pending

val watch : t -> (key * value) Lwt_stream.t * Lwt_watcher.stopper

Monitor all the fetched data. A given data will appear only once.

val notify : t -> Mavryk_base.TzPervasives.P2p_peer.Id.t -> key -> notified_value -> unit Lwt.t

notify t peer k nv notifies the requester that a value has been received for key k, from peer peer. nv is a *notified value*. The notified value is validated using Probe.probe, and the param provided at fetching time (see PROBE). If valid, the memory table is updated and all promises waiting on this key are resolved.

If the key is not pending the notification is ignored.

val memory_table_length : t -> int

memory_table_length t returns the number of keys either known or pending in the memory table of t

val pending_requests : t -> int

Returns the number of requests currently pending

val create : ?random_table:bool -> ?global_input:(key * value) Lwt_watcher.input -> request_param -> store -> t

create ?random_table ?global_input r s creates a requester. r is the configuration parameter passed to Request functions.

The value for random_table determines whether the underlying hashtable randomises its hash (see Stdlib.Hashtbl.create and specifically the documentation of the random parameter). The default depends on environment variables and Stdlib.Hashtbl.randomize has been called.

val shutdown : t -> unit Lwt.t