Module MakeGroup.MakeWorker

Parameters

Signature

module Name = Name
module Request = Request
module Types = Types
type 'kind t

A handle to a specific worker, parameterized by the type of internal message buffer.

type 'kind table

A handle to a table of workers.

type 'a queue

Internal buffer kinds used as parameters to t.

and bounded
and infinite
type dropbox
type 'a message_error =
  1. | Closed of Mavryk_base.TzPervasives.error list option
  2. | Request_error of 'a
  3. | Any of exn

An error returned when waiting for a message pushed to the worker. Closed errs is returned if the worker is terminated or has crashed. If the worker is terminated, errs is an empty list. Request_error err is returned if the request failed with an error. Any exn is returned if the request failed with an exception.

type _ buffer_kind =
  1. | Queue : infinite queue buffer_kind
  2. | Bounded : {
    1. size : int;
    } -> bounded queue buffer_kind
  3. | Dropbox : {
    1. merge : dropbox t -> any_request -> any_request option -> any_request option;
    } -> dropbox buffer_kind

Supported kinds of internal buffers.

and any_request =
  1. | Any_request : (_, _) Request.t -> any_request
val create_table : 'kind buffer_kind -> 'kind table

Create a table of workers.

module type HANDLERS = sig ... end

The callback handlers specific to each worker instance.

val launch : 'kind table -> ?timeout:Mavryk_base.Time.System.Span.t -> Name.t -> Types.parameters -> (module HANDLERS with type launch_error = 'launch_error and type self = 'kind t) -> ('kind t, 'launch_error) Stdlib.result Lwt.t

Creates a new worker instance. Parameter queue_size not passed means unlimited queue.

val shutdown : _ t -> unit Lwt.t

Triggers a worker termination and waits for its completion. Cannot be called from within the handlers.

module type BOX = sig ... end

The following interface are common elements of multiple modules below. They are used to minimize repetition.

module type QUEUE = sig ... end
module Dropbox : sig ... end
module Queue : sig ... end
val canceler : _ t -> Lwt_canceler.t

Exports the canceler to allow cancellation of other tasks when this worker is shutdown or when it dies.

val trigger_shutdown : _ t -> unit

Triggers a worker termination.

val state : _ t -> Types.state

Access the internal state, once initialized.

val with_state : _ t -> (Types.state -> (unit, 'request_error) Stdlib.result Lwt.t) -> (unit, 'request_error) Stdlib.result Lwt.t

with_state w f calls f on the current state of worker w if it was intialized and not closed or crashed, otherwise returns immediately.

val pending_requests : _ queue t -> (Mavryk_base.Time.System.t * Request.view) list

Introspect the message queue, gives the times requests were pushed.

Get the running status of a worker.

val current_request : _ t -> (Mavryk_base.Time.System.t * Mavryk_base.Time.System.t * Request.view) option

Get the request being treated by a worker. Gives the time the request was pushed, and the time its treatment started.

val list : 'a table -> (Name.t * 'a t) list

Lists the running workers in this group.

val find_opt : 'a table -> Name.t -> 'a t option

find_opt table n is Some worker if the worker is in the table and has name n.