Skip to contents

R6 abstract class to build other subclasses which launch and manage workers.

See also

Other launcher: crew_launcher()

Active bindings

workers

Data frame of worker information.

name

Name of the launcher.

seconds_interval

See crew_launcher().

seconds_timeout

See crew_launcher().

seconds_launch

See crew_launcher().

seconds_idle

See crew_launcher().

seconds_wall

See crew_launcher().

tasks_max

See crew_launcher().

tasks_timers

See crew_launcher().

reset_globals

See crew_launcher().

reset_packages

See crew_launcher().

reset_options

See crew_launcher().

garbage_collection

See crew_launcher().

crashes_error

See crew_launcher().

tls

See crew_launcher().

processes

See crew_launcher(). asynchronously.

r_arguments

See crew_launcher().

options_metrics

See crew_launcher().

async

A crew_async() object to run low-level launcher tasks asynchronously.

throttle

A crew_throttle() object to throttle scaling.

Methods


Method new()

Launcher constructor.

Usage

crew_class_launcher$new(
  name = NULL,
  seconds_interval = NULL,
  seconds_timeout = NULL,
  seconds_launch = NULL,
  seconds_idle = NULL,
  seconds_wall = NULL,
  seconds_exit = NULL,
  tasks_max = NULL,
  tasks_timers = NULL,
  reset_globals = NULL,
  reset_packages = NULL,
  reset_options = NULL,
  garbage_collection = NULL,
  crashes_error = NULL,
  launch_max = NULL,
  tls = NULL,
  processes = NULL,
  r_arguments = NULL,
  options_metrics = NULL
)

Arguments

name

See crew_launcher().

seconds_interval

See crew_launcher().

seconds_timeout

See crew_launcher().

seconds_launch

See crew_launcher().

seconds_idle

See crew_launcher().

seconds_wall

See crew_launcher().

seconds_exit

See crew_launcher().

tasks_max

See crew_launcher().

tasks_timers

See crew_launcher().

reset_globals

See crew_launcher().

reset_packages

See crew_launcher().

reset_options

See crew_launcher().

garbage_collection

See crew_launcher().

crashes_error

See crew_launcher().

launch_max

Deprecated.

tls

See crew_launcher().

processes

See crew_launcher().

r_arguments

See crew_launcher().

options_metrics

See crew_launcher().

Returns

An R6 object with the launcher.

Examples

if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}


Method validate()

Validate the launcher.

Usage

crew_class_launcher$validate()

Returns

NULL (invisibly).


Method set_name()

Set the name of the launcher.

Usage

crew_class_launcher$set_name(name)

Arguments

name

Character of length 1, name to set for the launcher.

Returns

NULL (invisibly).


Method settings()

List of arguments for mirai::daemon().

Usage

crew_class_launcher$settings(socket)

Arguments

socket

Character of length 1, websocket address of the worker to launch.

Returns

List of arguments for mirai::daemon().


Method call()

Create a call to crew_worker() to help create custom launchers.

Usage

crew_class_launcher$call(socket, launcher, worker, instance)

Arguments

socket

Socket where the worker will receive tasks.

launcher

Character of length 1, name of the launcher.

worker

Positive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches.

instance

Character of length 1 to uniquely identify the instance of the worker.

Returns

Character of length 1 with a call to crew_worker().

Examples

launcher <- crew_launcher_local()
launcher$call(
  socket = "ws://127.0.0.1:5000/3/cba033e58",
  launcher = "launcher_a",
  worker = 3L,
  instance = "cba033e58"
)


Method start()

Start the launcher.

Usage

crew_class_launcher$start(sockets = NULL)

Arguments

sockets

For testing purposes only.

Details

Creates the workers data frame. Meant to be called once at the beginning of the launcher life cycle, after the client has started.

Returns

NULL (invisibly).


Method terminate()

Terminate the whole launcher, including all workers.

Usage

crew_class_launcher$terminate()

Returns

NULL (invisibly).


Method summary()

Summarize the workers.

Usage

crew_class_launcher$summary()

Returns

NULL if the launcher is not started. Otherwise, a tibble with one row per crew worker and the following columns:

  • worker: integer index of the worker.

  • launches: number of times the worker was launched. Each launch occurs at a different websocket because the token at the end of the URL is rotated before each new launch.

  • online: logical vector, whether the current instance of each worker was actively connected to its NNG socket during the time of the last call to tally().

  • discovered: logical vector, whether the current instance of each worker had connected to its NNG socket at some point (and then possibly disconnected) during the time of the last call to tally().

  • assigned: cumulative number of tasks assigned, reported by mirai::daemons() and summed over all completed instances of the worker. Does not reflect the activity of the currently running instance of the worker.

  • complete: cumulative number of tasks completed, reported by mirai::daemons() and summed over all completed instances of the worker. Does not reflect the activity of the currently running instance of the worker.

  • crashes: number of consecutive times a worker launched without completing all its assigned tasks.


Method tally()

Update the daemons-related columns of the internal workers data frame.

Usage

crew_class_launcher$tally(daemons = NULL)

Arguments

daemons

mirai daemons matrix. For testing only. Users should not set this.

Returns

NULL (invisibly).


Method unlaunched()

Get indexes of unlaunched workers.

Usage

crew_class_launcher$unlaunched(n = Inf)

Arguments

n

Maximum number of worker indexes to return.

Details

A worker is "unlaunched" if it has never connected to the current instance of its websocket. Once a worker launches with the launch() method, it is considered "launched" until it disconnects and its websocket is rotated with rotate().

Returns

Integer index of workers available for launch. The backlogged workers are listed first. A worker is backlogged if it is assigned more tasks than it completed.


Method booting()

Get workers that may still be booting up.

Usage

crew_class_launcher$booting()

Details

A worker is "booting" if its launch time is within the last seconds_launch seconds. seconds_launch is a configurable grace period when crew allows a worker to start up and connect to the mirai dispatcher. The booting() function does not know about the actual worker connection status, it just knows about launch times, so it may return TRUE for workers that have already connected and started doing tasks.


Method active()

Get active workers.

Usage

crew_class_launcher$active()

Details

A worker is "active" if its current instance is online and connected, or if it is within its booting time window and has never connected. In other words, "active" means online | (!discovered & booting).

Returns

Logical vector with TRUE for active workers and FALSE for inactive ones.


Method done()

Get done workers.

Usage

crew_class_launcher$done()

Details

A worker is "done" if it is launched and inactive. A worker is "launched" if launch() was called and the worker websocket has not been rotated since.

Returns

Integer index of inactive workers.


Method rotate()

Usage

crew_class_launcher$rotate()

Details

Rotate websockets at all unlaunched workers and throw an error if a worker launched at least crashes_error times in a row without completing all its assigned tasks.

Returns

NULL (invisibly).


Method launch()

Launch a worker.

Usage

crew_class_launcher$launch(index)

Arguments

index

Positive integer of length 1, index of the worker to launch.

Returns

NULL (invisibly).


Method forward()

Forward an asynchronous launch/termination error condition of a worker.

Usage

crew_class_launcher$forward(index, condition = "error")

Arguments

index

Integer of length 1, index of the worker to inspect.

condition

Character of length 1 indicating what to do with an error if found. "error" to throw an error, "warning" to throw a warning, "message" to print a message, and "character" to return a character vector of specific task-level error messages. The return value is NULL if no error is found.

Returns

Throw an error, throw a warning, or return a character string, depending on the condition argument.


Method errors()

Collect and return the most recent error messages from asynchronous worker launching and termination.

Usage

crew_class_launcher$errors()

Returns

Character vector of all the most recent error messages from asynchronous worker launching and termination. NULL if there are no errors.


Method wait()

Wait for any local asynchronous launch or termination tasks to complete.

Usage

crew_class_launcher$wait()

Details

Only relevant if processes is a positive integer.

Returns

NULL (invisibly).


Method scale()

Auto-scale workers out to meet the demand of tasks.

Usage

crew_class_launcher$scale(demand, throttle = TRUE)

Arguments

demand

Number of unresolved tasks.

throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

Returns

NULL (invisibly)


Method launch_worker()

Abstract worker launch method.

Usage

crew_class_launcher$launch_worker(call, name, launcher, worker, instance)

Arguments

call

Character of length 1 with a namespaced call to crew_worker() which will run in the worker and accept tasks.

name

Character of length 1 with an informative worker name.

launcher

Character of length 1, name of the launcher.

worker

Positive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches. It is always between 1 and the maximum number of concurrent workers.

instance

Character of length 1 to uniquely identify the current instance of the worker a the index in the launcher.

Details

Launcher plugins will overwrite this method.

Returns

A handle to mock the worker launch.


Method crashes()

Return the number of consecutive times a worker launched without completing all its assigned tasks.

Usage

crew_class_launcher$crashes(index)

Arguments

index

Non-negative integer, index of the worker pointing to a row of the data frame output of the summary() method of the launcher.

Returns

Non-negative integer, number of consecutive times a worker launched without completing all its assigned tasks.


Method terminate_worker()

Abstract worker termination method.

Usage

crew_class_launcher$terminate_worker(handle)

Arguments

handle

A handle object previously returned by launch_worker() which allows the termination of the worker.

Details

Launcher plugins will overwrite this method.

Returns

A handle to mock worker termination.


Method terminate_workers()

Terminate one or more workers.

Usage

crew_class_launcher$terminate_workers(index = NULL)

Arguments

index

Integer vector of the indexes of the workers to terminate. If NULL, all current workers are terminated.

Returns

NULL (invisibly).

Examples

if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}

## ------------------------------------------------
## Method `crew_class_launcher$new`
## ------------------------------------------------

if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}

## ------------------------------------------------
## Method `crew_class_launcher$call`
## ------------------------------------------------

launcher <- crew_launcher_local()
launcher$call(
  socket = "ws://127.0.0.1:5000/3/cba033e58",
  launcher = "launcher_a",
  worker = 3L,
  instance = "cba033e58"
)
#> [1] "crew::crew_worker(settings = list(url = \"ws://127.0.0.1:5000/3/cba033e58\", asyncdial = FALSE, autoexit = 15L, cleanup = 1L, output = TRUE, maxtasks = Inf, idletime = Inf, walltime = Inf, timerstart = 0L, tls = NULL, rs = NULL), launcher = \"launcher_a\", worker = 3L, instance = \"cba033e58\", options_metrics = crew::crew_options_metrics(path = NULL, seconds_interval = 5))"