R6
abstract class to build other subclasses
which launch and manage workers.
See also
Other launcher:
crew_launcher()
Active bindings
workers
Data frame of worker information.
name
Name of the launcher.
seconds_interval
See
crew_launcher()
.seconds_timeout
See
crew_launcher()
.seconds_launch
See
crew_launcher()
.seconds_idle
See
crew_launcher()
.seconds_wall
See
crew_launcher()
.tasks_max
See
crew_launcher()
.tasks_timers
See
crew_launcher()
.reset_globals
See
crew_launcher()
.reset_packages
See
crew_launcher()
.reset_options
See
crew_launcher()
.garbage_collection
See
crew_launcher()
.crashes_error
See
crew_launcher()
.tls
See
crew_launcher()
.processes
See
crew_launcher()
. asynchronously.r_arguments
See
crew_launcher()
.options_metrics
See
crew_launcher()
.async
A
crew_async()
object to run low-level launcher tasks asynchronously.throttle
A
crew_throttle()
object to throttle scaling.
Methods
Method new()
Launcher constructor.
Usage
crew_class_launcher$new(
name = NULL,
seconds_interval = NULL,
seconds_timeout = NULL,
seconds_launch = NULL,
seconds_idle = NULL,
seconds_wall = NULL,
seconds_exit = NULL,
tasks_max = NULL,
tasks_timers = NULL,
reset_globals = NULL,
reset_packages = NULL,
reset_options = NULL,
garbage_collection = NULL,
crashes_error = NULL,
launch_max = NULL,
tls = NULL,
processes = NULL,
r_arguments = NULL,
options_metrics = NULL
)
Arguments
name
See
crew_launcher()
.seconds_interval
See
crew_launcher()
.seconds_timeout
See
crew_launcher()
.seconds_launch
See
crew_launcher()
.seconds_idle
See
crew_launcher()
.seconds_wall
See
crew_launcher()
.seconds_exit
See
crew_launcher()
.tasks_max
See
crew_launcher()
.tasks_timers
See
crew_launcher()
.reset_globals
See
crew_launcher()
.reset_packages
See
crew_launcher()
.reset_options
See
crew_launcher()
.garbage_collection
See
crew_launcher()
.crashes_error
See
crew_launcher()
.launch_max
Deprecated.
tls
See
crew_launcher()
.processes
See
crew_launcher()
.r_arguments
See
crew_launcher()
.options_metrics
See
crew_launcher()
.
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}
Method settings()
List of arguments for mirai::daemon()
.
Returns
List of arguments for mirai::daemon()
.
Method call()
Create a call to crew_worker()
to
help create custom launchers.
Arguments
socket
Socket where the worker will receive tasks.
launcher
Character of length 1, name of the launcher.
worker
Positive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches.
instance
Character of length 1 to uniquely identify the instance of the worker.
Returns
Character of length 1 with a call to crew_worker()
.
Examples
launcher <- crew_launcher_local()
launcher$call(
socket = "ws://127.0.0.1:5000/3/cba033e58",
launcher = "launcher_a",
worker = 3L,
instance = "cba033e58"
)
Method start()
Start the launcher.
Method summary()
Summarize the workers.
Returns
NULL
if the launcher is not started. Otherwise, a tibble
with one row per crew
worker and the following columns:
worker
: integer index of the worker.launches
: number of times the worker was launched. Each launch occurs at a different websocket because the token at the end of the URL is rotated before each new launch.online
: logical vector, whether the current instance of each worker was actively connected to its NNG socket during the time of the last call totally()
.discovered
: logical vector, whether the current instance of each worker had connected to its NNG socket at some point (and then possibly disconnected) during the time of the last call totally()
.assigned
: cumulative number of tasks assigned, reported bymirai::daemons()
and summed over all completed instances of the worker. Does not reflect the activity of the currently running instance of the worker.complete
: cumulative number of tasks completed, reported bymirai::daemons()
and summed over all completed instances of the worker. Does not reflect the activity of the currently running instance of the worker.crashes
: number of consecutive times a worker launched without completing all its assigned tasks.
Method unlaunched()
Get indexes of unlaunched workers.
Method booting()
Get workers that may still be booting up.
Details
A worker is "booting" if its launch time is within the last
seconds_launch
seconds. seconds_launch
is a configurable grace
period when crew
allows a worker to start up and connect to the
mirai
dispatcher. The booting()
function does not know about the
actual worker connection status, it just knows about launch times,
so it may return TRUE
for workers that have already connected
and started doing tasks.
Method active()
Get active workers.
Method done()
Get done workers.
Method rotate()
Method forward()
Forward an asynchronous launch/termination error condition of a worker.
Arguments
index
Integer of length 1, index of the worker to inspect.
condition
Character of length 1 indicating what to do with an error if found.
"error"
to throw an error,"warning"
to throw a warning,"message"
to print a message, and"character"
to return a character vector of specific task-level error messages. The return value isNULL
if no error is found.
Method errors()
Collect and return the most recent error messages from asynchronous worker launching and termination.
Method scale()
Auto-scale workers out to meet the demand of tasks.
Arguments
demand
Number of unresolved tasks.
throttle
TRUE
to skip auto-scaling if it already happened within the lastseconds_interval
seconds.FALSE
to auto-scale every timescale()
is called. Throttling avoids overburdening themirai
dispatcher and other resources.
Method launch_worker()
Abstract worker launch method.
Arguments
call
Character of length 1 with a namespaced call to
crew_worker()
which will run in the worker and accept tasks.name
Character of length 1 with an informative worker name.
launcher
Character of length 1, name of the launcher.
worker
Positive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches. It is always between 1 and the maximum number of concurrent workers.
instance
Character of length 1 to uniquely identify the current instance of the worker a the index in the launcher.
Method crashes()
Return the number of consecutive times a worker launched without completing all its assigned tasks.
Arguments
index
Non-negative integer, index of the worker pointing to a row of the data frame output of the
summary()
method of the launcher.
Method terminate_worker()
Abstract worker termination method.
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}
## ------------------------------------------------
## Method `crew_class_launcher$new`
## ------------------------------------------------
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}
## ------------------------------------------------
## Method `crew_class_launcher$call`
## ------------------------------------------------
launcher <- crew_launcher_local()
launcher$call(
socket = "ws://127.0.0.1:5000/3/cba033e58",
launcher = "launcher_a",
worker = 3L,
instance = "cba033e58"
)
#> [1] "crew::crew_worker(settings = list(url = \"ws://127.0.0.1:5000/3/cba033e58\", asyncdial = FALSE, autoexit = 15L, cleanup = 1L, output = TRUE, maxtasks = Inf, idletime = Inf, walltime = Inf, timerstart = 0L, tls = NULL, rs = NULL), launcher = \"launcher_a\", worker = 3L, instance = \"cba033e58\", options_metrics = crew::crew_options_metrics(path = NULL, seconds_interval = 5))"