R6
class for controllers.
Details
See crew_controller().
See also
Other controller:
crew_controller()
Active bindings
client
Router object.
launcher
Launcher object.
tasks
A list of mirai::mirai() task objects.
pushed
Number of tasks pushed since the controller was started.
popped
Number of tasks popped since the controller was started.
error
Tibble of task results (with one result per row) from the last call to map(error = "stop").
backlog
Character vector of explicitly backlogged tasks.
autoscaling
TRUE or FALSE, whether async later-based auto-scaling is currently running.
Methods
Method new()
mirai controller constructor.
Usage
crew_class_controller$new(client = NULL, launcher = NULL)
Arguments
client
Router object. See crew_controller().
launcher
Launcher object. See crew_controller().
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  launcher <- crew_launcher_local()
  controller <- crew_controller(client = client, launcher = launcher)
  controller$start()
  controller$push(name = "task", command = sqrt(4))
  controller$wait()
  controller$pop()
  controller$terminate()
}
Method empty()
Check if the controller is empty.
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Method nonempty()
Check if the controller is nonempty.
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Method resolved()
Number of resolved mirai() tasks.
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Method unresolved()
Number of unresolved mirai() tasks.
Method unpopped()
Number of resolved mirai() tasks available via pop().
Method saturated()
Check if the controller is saturated.
Arguments
collect
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
A controller is saturated if the number of unresolved tasks is greater than or equal to the maximum number of workers. In other words, in a saturated controller, every available worker has a task. You can still push tasks to a saturated controller, but tools that use crew such as targets may choose not to.
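The saturation rule above amounts to a single comparison. The helper below is a hypothetical base R sketch (is_saturated() is not part of crew), assuming the number of unresolved tasks and the maximum number of workers are already known:

```r
# Hypothetical helper, not part of crew: TRUE when the number of
# unresolved tasks has reached the maximum number of workers.
is_saturated <- function(unresolved, workers) {
  unresolved >= workers
}

is_saturated(unresolved = 2, workers = 4) # FALSE
is_saturated(unresolved = 4, workers = 4) # TRUE
```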
Method start()
Start the controller if it is not already started.
Method started()
Check whether the controller is started.
Method launch()
Launch one or more workers.
Arguments
n
Number of workers to try to launch. The actual number launched is capped so that no more than "workers" workers are running at a given time, where "workers" is an argument of crew_controller(). The actual cap is the "workers" argument minus the number of connected workers minus the number of starting workers. A "connected" worker has an active websocket connection to the mirai client, and "starting" means that the worker was launched at most seconds_start seconds ago, where seconds_start is also an argument of crew_controller().
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
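The cap on launched workers can be worked through as plain arithmetic. This is a base R sketch with a hypothetical helper name (launch_count() is not part of crew); clamping the result at zero is an assumption of the sketch, not something stated above:

```r
# Hypothetical helper, not part of crew.
# cap = workers - connected - starting, as described for the n argument.
launch_count <- function(n, workers, connected, starting) {
  cap <- workers - connected - starting
  # Assumption: never report a negative number of launches.
  max(0, min(n, cap))
}

# Request 5 workers with a maximum of 4, 1 connected and 1 starting:
launch_count(n = 5, workers = 4, connected = 1, starting = 1) # 2
```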
Method scale()
Auto-scale workers out to meet the demand of tasks.
Arguments
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
The scale() method re-launches all inactive backlogged workers, then any additional inactive workers needed to accommodate the demand of unresolved tasks. A worker is "backlogged" if it was assigned more tasks than it has completed so far.
Methods push(), pop(), and wait() already invoke scale() if the scale argument is TRUE.
For finer control of the number of workers launched, call launch() on the controller with the exact desired number of workers.
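The throttling behavior described for the throttle argument can be illustrated with a small closure. This is only a base R sketch of the general idea, not crew's implementation; make_throttle() and the now argument are hypothetical names introduced so the example is deterministic:

```r
# Hypothetical sketch: returns a function that reports whether an
# action (such as auto-scaling) should run now, or be skipped because
# it already ran within the last seconds_interval seconds.
make_throttle <- function(seconds_interval) {
  last <- -Inf
  function(now = as.numeric(Sys.time())) {
    if (now - last < seconds_interval) {
      return(FALSE) # too soon: skip
    }
    last <<- now
    TRUE
  }
}

should_scale <- make_throttle(seconds_interval = 0.5)
should_scale(now = 10)   # TRUE: first call
should_scale(now = 10.2) # FALSE: only 0.2 seconds since the last run
should_scale(now = 10.6) # TRUE: 0.6 seconds since the last run
```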
Method autoscale()
Run worker auto-scaling in a private later loop every controller$client$seconds_interval seconds.
Method descale()
Terminate the auto-scaling loop started by controller$autoscale().
Method push()
Push a task to the head of the task list.
Arguments
command
Language object with R code to run.
data
Named list of local data objects in the evaluation environment.
globals
Named list of objects to temporarily assign to the global environment for the task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local().
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
scale
Logical, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. Also see the throttle argument.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
name
Optional name of the task. Must be a character string or NA.
save_command
Logical of length 1. If TRUE, the controller deparses the command and returns it with the output on pop(). If FALSE (default), the controller skips this step to increase speed.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
Invisibly return the mirai object of the pushed task. This allows you to interact with the task directly, e.g. to create a promise object with promises::as.promise().
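The difference between substitute = TRUE and substitute = FALSE can be seen in base R alone. The function below is a hypothetical stand-in (capture_command() is not part of crew) that mimics how a command argument might be captured under each setting:

```r
# Hypothetical stand-in for the command-capturing step.
capture_command <- function(command, substitute = TRUE) {
  if (substitute) {
    # Quote the expression exactly as the caller typed it.
    command <- base::substitute(command)
  }
  # Otherwise, command is assumed to already be a language object.
  command
}

# Interactive style: write the code directly.
a <- capture_command(sqrt(4))

# Programmatic style: pass an already-quoted language object.
b <- capture_command(quote(sqrt(4)), substitute = FALSE)

identical(a, b) # TRUE: both are the unevaluated call sqrt(4)
eval(a)         # 2
```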
Method walk()
Apply a single command to multiple inputs, and return control to the user without waiting for any task to complete.
Arguments
command
Language object with R code to run.
iterate
Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.
data
Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals
Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
names
Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.
save_command
Logical of length 1, whether to store a text string version of the R command in the output.
scale
Logical, whether to automatically scale workers to meet demand. See also the throttle argument.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
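The way iterate and data combine can be mimicked locally in base R. The loop below is only a sketch of the evaluation rule described for the iterate argument (one element of each iterate entry per task, with iterate taking precedence over data on name conflicts); it runs sequentially in the current session and does not use crew at all:

```r
# Sequential base R imitation of the iterate/data rule (no crew involved).
command <- quote(paste(prefix, x, y))
iterate <- list(x = c(1, 2), y = c("a", "b"))
data <- list(prefix = "task", y = "ignored") # y conflicts with iterate

results <- lapply(seq_along(iterate[[1]]), function(i) {
  # Take the i-th element of every entry in iterate.
  slice <- lapply(iterate, function(values) values[[i]])
  # Entries of slice override entries of data with the same name.
  values <- modifyList(data, slice)
  eval(command, envir = values, enclos = baseenv())
})

results[[1]] # "task 1 a"
results[[2]] # "task 2 b"
```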
Method map()
Apply a single command to multiple inputs, wait for all tasks to complete, and return the results of all tasks.
Usage
crew_class_controller$map(
  command,
  iterate,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_interval = 0.5,
  seconds_timeout = NULL,
  names = NULL,
  save_command = FALSE,
  error = "stop",
  warnings = TRUE,
  verbose = interactive(),
  scale = TRUE,
  throttle = TRUE,
  controller = NULL
)
Arguments
command
Language object with R code to run.
iterate
Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.
data
Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals
Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_interval
Number of seconds to wait between auto-scaling operations while waiting for tasks to complete.
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
names
Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.
save_command
Logical of length 1, whether to store a text string version of the R command in the output.
error
Character of length 1, choice of action if a task has an error. Possible values:
"stop": throw an error in the main R session instead of returning a value. In case of an error, the results from the last errored map() are in the error field of the controller, e.g. controller_object$error. To reduce memory consumption, set controller_object$error <- NULL after you are finished troubleshooting.
"warn": throw a warning. This allows the return value with all the error messages and tracebacks to be generated.
"silent": do nothing special.
warnings
Logical of length 1, whether to throw a warning in the interactive session if at least one task encounters an error.
verbose
Logical of length 1, whether to print progress messages.
scale
Logical, whether to automatically scale workers to meet demand. See also the throttle argument.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
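The three values of the error argument follow a common R pattern for choosing how loudly to react. The function below is a hypothetical base R sketch of that pattern (handle_errors() and its message text are not part of crew); it takes a vector of per-task error messages where NA means the task succeeded:

```r
# Hypothetical sketch of "stop" / "warn" / "silent" handling.
handle_errors <- function(errors, error = c("stop", "warn", "silent")) {
  error <- match.arg(error)
  failed <- !is.na(errors)
  if (any(failed)) {
    msg <- paste("tasks with errors:", sum(failed))
    if (error == "stop") stop(msg, call. = FALSE)
    if (error == "warn") warning(msg, call. = FALSE)
  }
  # With "warn" or "silent", the caller still receives a value.
  invisible(failed)
}

task_errors <- c(NA, "boom", NA)
handle_errors(task_errors, error = "silent") # returns invisibly, no signal
```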
Method pop()
Pop a completed task from the results data frame.
Usage
crew_class_controller$pop(
  scale = TRUE,
  collect = NULL,
  throttle = TRUE,
  error = NULL,
  controllers = NULL
)
Arguments
scale
Logical of length 1, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. Scaling up on pop() may be important for transient or nearly transient workers that tend to drop off quickly after doing little work. See also the throttle argument.
collect
Deprecated in version 0.5.0.9003 (2023-10-02).
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
error
NULL or character of length 1, choice of action if the popped task threw an error. Possible values:
"stop": throw an error in the main R session instead of returning a value.
"warn": throw a warning.
NULL or "silent": do not react to errors.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
If there is no task to collect, return NULL. Otherwise, return a one-row tibble with the following columns.
name: the task name if given.
command: a character string with the R command if save_command was set to TRUE in push().
result: a list containing the return value of the R command.
seconds: number of seconds that the task ran.
seed: the single integer originally supplied to push(), NA otherwise. The pseudo-random number generator state just prior to the task can be restored using set.seed(seed = seed, kind = algorithm), where seed and algorithm are part of this output.
algorithm: name of the pseudo-random number generator algorithm originally supplied to push(), NA otherwise. The pseudo-random number generator state just prior to the task can be restored using set.seed(seed = seed, kind = algorithm), where seed and algorithm are part of this output.
status: a character string. "success" if the task did not throw an error, "cancel" if the task was canceled with the cancel() controller method, or "error" if the task threw an error.
code: an integer code denoting the specific exit status: 0 for successful tasks, 1 for tasks with an error in the R command of the task, and another positive integer with an NNG status code if there is an error at the NNG/nanonext level.
error: the first 2048 characters of the error message if the task threw an error, NA otherwise.
trace: the first 2048 characters of the text of the traceback if the task threw an error, NA otherwise.
warnings: the first 2048 characters of the text of warning messages that the task may have generated, NA otherwise.
launcher: name of the crew launcher where the task ran.
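Restoring the generator state with set.seed(seed = seed, kind = algorithm) can be tried in base R. In this sketch the values of seed and algorithm are made-up stand-ins for the corresponding columns of a pop() result:

```r
# Stand-in values; in practice these would come from the seed and
# algorithm columns of the tibble returned by pop().
seed <- 123L
algorithm <- "Mersenne-Twister"

set.seed(seed = seed, kind = algorithm)
first <- runif(3)

# Restoring the same state reproduces the same random draws.
set.seed(seed = seed, kind = algorithm)
second <- runif(3)

identical(first, second) # TRUE
```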
Method collect()
Pop all available task results and return them in a tidy tibble.
Usage
crew_class_controller$collect(
  scale = TRUE,
  throttle = TRUE,
  error = NULL,
  controllers = NULL
)
Arguments
scale
Logical of length 1, whether to automatically call scale() to auto-scale workers to meet the demand of the task load.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
error
NULL or character of length 1, choice of action if the popped task threw an error. Possible values:
"stop": throw an error in the main R session instead of returning a value.
"warn": throw a warning.
NULL or "silent": do not react to errors.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Method promise()
Create a promises::promise() object to asynchronously pop or collect one or more tasks.
Usage
crew_class_controller$promise(
  mode = "one",
  seconds_interval = 0.1,
  scale = NULL,
  throttle = NULL,
  controllers = NULL
)
Arguments
mode
Character of length 1, what kind of promise to create. mode must be "one" or "all". Details:
If mode is "one", then the promise is fulfilled (or rejected) when at least one task is resolved and available to pop(). When that happens, pop() runs asynchronously, pops a result off the task list, and returns a value. If the task succeeded, then the promise is fulfilled and its value is the result of pop() (a one-row tibble with the result and metadata). If the task threw an error, the error message of the task is forwarded to any error callbacks registered with the promise.
If mode is "all", then the promise is fulfilled (or rejected) when there are no unresolved tasks left in the controller. (Be careful: this condition is trivially met in the moment if the controller is empty and you have not submitted any tasks, so it is best to create this kind of promise only after you submit tasks.) When there are no unresolved tasks left, collect() runs asynchronously, pops all available results off the task list, and returns a value. If every task succeeded, then the promise is fulfilled and its value is the result of collect() (a tibble with one row per task result). If any of the tasks threw an error, then the first error message detected is forwarded to any error callbacks registered with the promise.
seconds_interval
Positive numeric of length 1, delay in the later::later() polling interval to asynchronously check if the promise can be resolved.
scale
Deprecated on 2024-04-10 (version 0.9.1.9003) and no longer used. Now, promise() always turns on auto-scaling in a private later loop (if not already activated).
throttle
Deprecated on 2024-04-10 (version 0.9.1.9003) and no longer used. Now, promise() always turns on auto-scaling in a private later loop (if not already activated).
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
Please be aware that pop() or collect() will happen asynchronously at some unpredictable time after the promise object is created, even if your local R process appears to be doing something completely different. This behavior is highly desirable in a Shiny reactive context, but please be careful as it may be surprising in other situations.
Returns
A promises::promise() object whose eventual value will be a tibble with results from one or more popped tasks. If mode = "one", only one task is popped and returned (one row). If mode = "all", then all the tasks are returned in a tibble with one row per task (or NULL is returned if there are no tasks to pop).
Method wait()
Wait for tasks.
Usage
crew_class_controller$wait(
  mode = "all",
  seconds_interval = 0.5,
  seconds_timeout = Inf,
  scale = TRUE,
  throttle = TRUE,
  controllers = NULL
)
Arguments
mode
Character of length 1: "all" to wait for all tasks to complete, "one" to wait for a single task to complete.
seconds_interval
Number of seconds between interruptions of the wait, which allow workers to be scaled up as needed.
seconds_timeout
Timeout length in seconds waiting for tasks.
scale
Logical, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. See also the throttle argument.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
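The interplay of seconds_interval and seconds_timeout resembles an ordinary polling loop. The function below is a generic base R sketch, not crew's implementation: wait_until() and its done callback are hypothetical, and the TRUE/FALSE return value is a choice made for the sketch.

```r
# Hypothetical polling loop: check a condition, pause for
# seconds_interval between checks, give up after seconds_timeout.
wait_until <- function(done, seconds_interval = 0.5, seconds_timeout = Inf) {
  start <- proc.time()[["elapsed"]]
  while (!done()) {
    if (proc.time()[["elapsed"]] - start >= seconds_timeout) {
      return(FALSE) # timed out before the condition was met
    }
    # A real controller could use this pause to auto-scale workers.
    Sys.sleep(seconds_interval)
  }
  TRUE
}

# Example condition that becomes TRUE on the third check.
checks <- 0
finished <- function() {
  checks <<- checks + 1
  checks >= 3
}
wait_until(finished, seconds_interval = 0.01, seconds_timeout = 5) # TRUE
```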
Method push_backlog()
Push the name of a task to the backlog.
Method pop_backlog()
Pop the task names from the head of the backlog which can be pushed without saturating the controller.
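One way to read "can be pushed without saturating the controller" is that the number of names popped is limited by the remaining capacity, i.e. the maximum number of workers minus the number of unresolved tasks. The sketch below encodes that reading in base R; pop_backlog_sketch() is a hypothetical function and the capacity formula is an assumption derived from the definition of saturation, not a statement of how crew computes it:

```r
# Hypothetical sketch: take names from the head of the backlog up to
# the remaining capacity (workers minus unresolved tasks).
pop_backlog_sketch <- function(backlog, workers, unresolved) {
  n <- max(0, workers - unresolved)
  index <- seq_along(backlog)
  list(
    popped = backlog[index <= n],
    remaining = backlog[index > n]
  )
}

out <- pop_backlog_sketch(c("a", "b", "c"), workers = 4, unresolved = 2)
out$popped    # "a" "b"
out$remaining # "c"
```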
Method summary()
Summarize the workers and tasks of the controller.
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
A data frame of summary statistics on the workers and tasks. It has one row per worker websocket and the following columns:
controller: name of the controller.
worker: integer index of the worker.
tasks: number of tasks which were completed by a worker at the websocket and then returned by calling pop() on the controller object.
seconds: total runtime, in seconds, of all the tasks that ran on a worker connected to this websocket and then were retrieved by calling pop() on the controller object.
errors: total number of tasks which ran on a worker at the websocket, encountered an error in R, and were then retrieved with pop().
warnings: total number of tasks which ran on a worker at the websocket, encountered one or more warnings in R, and were then retrieved with pop(). Note: warnings is actually the number of tasks, not the number of warnings. (A task could throw more than one warning.)
Method cancel()
Cancel one or more tasks.
Usage
crew_class_controller$cancel(names = character(0L), all = FALSE)
Method pids()
Get the process IDs of the local process and the mirai dispatcher (if started).
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  launcher <- crew_launcher_local()
  controller <- crew_controller(client = client, launcher = launcher)
  controller$start()
  controller$push(name = "task", command = sqrt(4))
  controller$wait()
  controller$pop()
  controller$terminate()
}