R6 class for controller groups.
See also
Other controller_group:
crew_controller_group()
Active bindings
controllers: List of R6 controller objects.
relay: Relay object for event-driven programming on a downstream condition variable.
Methods
Method new()
Multi-controller constructor.
Usage
crew_class_controller_group$new(controllers = NULL, relay = NULL)
Arguments
controllers: List of R6 controller objects.
relay: Relay object for event-driven programming on a downstream condition variable.
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
persistent <- crew_controller_local(name = "persistent")
transient <- crew_controller_local(
name = "transient",
tasks_max = 1L
)
group <- crew_controller_group(persistent, transient)
group$start()
group$push(name = "task", command = sqrt(4), controller = "transient")
group$wait()
group$pop()
group$terminate()
}
Method empty()
See if the controllers are empty.
Method nonempty()
Check if the controller group is nonempty.
Method resolved()
Number of resolved mirai() tasks.
Method saturated()
Check if a controller is saturated.
Arguments
collect: Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle: Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
controller: Character vector of length 1 with the controller name. Set to NULL to select the default controller that push() would choose.
Method start()
Start one or more controllers.
Method launch()
Launch one or more workers on one or more controllers.
Method scale()
Automatically scale up the number of workers if needed in one or more controller objects.
Arguments
throttle: TRUE to skip auto-scaling if it already happened within the last polling interval. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controllers: Character vector of controller names. Set to NULL to select all controllers.
Details
See the scale() method in individual controller classes.
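The throttle argument described above can be pictured with a small base R sketch. It is only an illustration of "skip the action if it already ran within the last interval", not crew's implementation; make_throttled(), autoscale_once(), and the 60-second interval are hypothetical names and values chosen for the example.

```r
# Hypothetical helper, not part of crew: wraps an action so that it is
# skipped when it already ran within the last `seconds_interval` seconds.
make_throttled <- function(action, seconds_interval) {
  last_run <- -Inf
  function(throttle = TRUE) {
    now <- proc.time()[["elapsed"]]
    if (throttle && (now - last_run) < seconds_interval) {
      return(invisible(FALSE)) # Skipped: ran too recently.
    }
    last_run <<- now
    action()
    invisible(TRUE)
  }
}

calls <- 0L
autoscale_once <- make_throttled(
  action = function() calls <<- calls + 1L,
  seconds_interval = 60
)

autoscale_once()                 # Runs: nothing has happened yet.
autoscale_once()                 # Skipped: still inside the interval.
autoscale_once(throttle = FALSE) # Runs: throttling is switched off.
```

After these three calls, the counter calls has been incremented twice, once by the first call and once by the unthrottled call.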
Method autoscale()
Run worker auto-scaling in a later loop.
Usage
crew_class_controller_group$autoscale(
loop = later::current_loop(),
controllers = NULL
)
Method crashes()
Report the number of consecutive crashes of a task, summed over all selected controllers in the group.
Arguments
name: Character string, name of the task to check.
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
See the crashes_max argument of crew_controller().
Method push()
Push a task to the head of the task list.
Arguments
command: Language object with R code to run.
data: Named list of local data objects in the evaluation environment.
globals: Named list of objects to temporarily assign to the global environment for the task. See the reset_globals argument of crew_controller_local().
substitute: Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default), then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed: Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm: Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages: Character vector of packages to load for the task.
library: Library path to load the packages. See the lib.loc argument of require().
seconds_timeout: Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
scale: Logical, whether to automatically scale workers to meet demand. See the scale argument of the push() method of ordinary single controllers.
throttle: TRUE to skip auto-scaling if it already happened within the last polling interval. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
name: Character string, name of the task. If NULL, a random name is automatically generated. The task name must not conflict with an existing task in the controller where it is submitted. To reuse the name, wait for the existing task to finish, then either pop() or collect() it to remove it from its controller.
save_command: Deprecated on 2025-01-22 (crew version 0.10.2.9004).
controller: Character of length 1, name of the controller to submit the task. If NULL, the controller defaults to the first controller in the list.
Returns
Invisibly return the mirai object of the pushed task.
This allows you to interact with the task directly, e.g.
to create a promise object with promises::as.promise().
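The difference between substitute = TRUE and substitute = FALSE in the arguments of push() can be seen in plain R. The sketch below does not call crew; capture_command() is a hypothetical helper that only mimics the documented behavior of the substitute flag.

```r
# Hypothetical helper, not part of crew: decides whether to capture the
# code as written (substitute = TRUE) or to accept a ready-made language
# object passed by value (substitute = FALSE).
capture_command <- function(command, substitute = TRUE) {
  if (substitute) {
    command <- base::substitute(command)
  }
  command
}

# Interactive style: write the code directly.
interactive_style <- capture_command(sqrt(4))

# Programmatic style: build the language object first, then pass its value.
programmatic_style <- capture_command(quote(sqrt(4)), substitute = FALSE)

# Both routes yield the same unevaluated call, which can be evaluated later.
same_call <- identical(interactive_style, programmatic_style)
value <- eval(interactive_style)
```

Under these assumptions both styles produce the same call object, which is why the choice only matters for how the calling code is written.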
Method walk()
Apply a single command to multiple inputs, and return control to the user without waiting for any task to complete.
Usage
crew_class_controller_group$walk(
command,
iterate,
data = list(),
globals = list(),
substitute = TRUE,
seed = NULL,
algorithm = NULL,
packages = character(0),
library = NULL,
seconds_timeout = NULL,
names = NULL,
save_command = NULL,
verbose = interactive(),
scale = TRUE,
throttle = TRUE,
controller = NULL
)
Arguments
command: Language object with R code to run.
iterate: Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.
data: Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals: Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.
substitute: Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default), then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed: Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm: Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages: Character vector of packages to load for the task.
library: Library path to load the packages. See the lib.loc argument of require().
seconds_timeout: Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
names: Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.
save_command: Deprecated on 2025-01-22 (crew version 0.10.2.9004).
verbose: Logical of length 1, whether to print to a progress bar when pushing tasks.
scale: Logical, whether to automatically scale workers to meet demand. See also the throttle argument.
throttle: TRUE to skip auto-scaling if it already happened within the last polling interval. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controller: Character of length 1, name of the controller to submit the tasks. If NULL, the controller defaults to the first controller in the list.
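The way iterate and data combine, as described in the arguments above, can be sketched in base R without any workers. This is only a local illustration of the documented semantics, not how crew dispatches tasks; f(), the suffix element, and the deliberately conflicting y entry in data are hypothetical.

```r
# Hypothetical stand-in for a task function; not part of crew.
f <- function(x, y, suffix) paste0(x, y, suffix)

iterate <- list(x = c(1, 2), y = c("a", "b"))
data <- list(suffix = "!", y = "ignored") # `y` conflicts with iterate on purpose.
command <- quote(f(x, y, suffix))

# All elements of iterate must have the same length.
stopifnot(length(unique(lengths(iterate))) == 1L)

results <- lapply(seq_along(iterate[[1]]), function(index) {
  # Take the index-th element of every iterate entry with [[ ]].
  inputs <- lapply(iterate, function(values) values[[index]])
  # modifyList() lets iterate values override same-named data values,
  # while the remaining data values stay constant across iterations.
  eval(command, envir = modifyList(data, inputs))
})
```

Here each element of results corresponds to one call of f(), with x and y taken from iterate and suffix held constant from data.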
Method map()
Apply a single command to multiple inputs.
Usage
crew_class_controller_group$map(
command,
iterate,
data = list(),
globals = list(),
substitute = TRUE,
seed = NULL,
algorithm = NULL,
packages = character(0),
library = NULL,
seconds_interval = NULL,
seconds_timeout = NULL,
names = NULL,
save_command = NULL,
error = "stop",
warnings = TRUE,
verbose = interactive(),
scale = TRUE,
throttle = TRUE,
controller = NULL
)
Arguments
command: Language object with R code to run.
iterate: Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.
data: Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals: Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.
substitute: Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default), then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed: Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm: Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages: Character vector of packages to load for the task.
library: Library path to load the packages. See the lib.loc argument of require().
seconds_interval: Deprecated on 2025-01-17 (crew version 0.10.2.9003). Instead, the seconds_interval argument passed to crew_controller_group() is used as seconds_max in a crew_throttle() object which orchestrates exponential backoff.
seconds_timeout: Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
names: Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.
save_command: Deprecated on 2025-01-22 (crew version 0.10.2.9004).
error: Character vector of length 1, choice of action if a task has an error. Possible values:
  "stop": throw an error in the main R session instead of returning a value. In case of an error, the results from the last errored map() are in the error field of the controller, e.g. controller_object$error. To reduce memory consumption, set controller_object$error <- NULL after you are finished troubleshooting.
  "warn": throw a warning. This allows the return value with all the error messages and tracebacks to be generated.
  "silent": do nothing special.
warnings: Logical of length 1, whether to throw a warning in the interactive session if at least one task encounters an error.
verbose: Logical of length 1, whether to print progress messages.
scale: Logical, whether to automatically scale workers to meet demand. See also the throttle argument.
throttle: TRUE to skip auto-scaling if it already happened within the last polling interval. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controller: Character of length 1, name of the controller to submit the tasks. If NULL, the controller defaults to the first controller in the list.
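The seed argument above is documented as being passed to set.seed() when it is not NULL. A minimal base R sketch of why that makes a task's random draws reproducible follows; draw_task() is a hypothetical stand-in for a task body and does not involve crew or mirai.

```r
# Hypothetical stand-in for a task that uses random numbers.
# A non-NULL seed is handed to set.seed() before the draws happen.
draw_task <- function(seed = NULL) {
  if (!is.null(seed)) {
    set.seed(seed)
  }
  stats::runif(3)
}

# Two runs with the same seed produce the same draws.
first <- draw_task(seed = 2024L)
second <- draw_task(seed = 2024L)
reproducible <- identical(first, second)
```

When both seed and algorithm are NULL, the documentation above says the worker-specific L'Ecuyer streams are used instead, which this local sketch does not attempt to reproduce.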
Method pop()
Pop a completed task from the results data frame.
Usage
crew_class_controller_group$pop(
scale = TRUE,
collect = NULL,
throttle = TRUE,
error = NULL,
controllers = NULL
)
Arguments
scale: Logical, whether to automatically scale workers to meet demand. See the scale argument of the pop() method of ordinary single controllers.
collect: Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle: TRUE to skip auto-scaling if it already happened within the last polling interval. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
error: NULL or character of length 1, choice of action if the popped task threw an error. Possible values:
  "stop": throw an error in the main R session instead of returning a value.
  "warn": throw a warning.
  NULL or "silent": do not react to errors.
controllers: Character vector of controller names. Set to NULL to select all controllers.
Method collect()
Pop all available task results and return them in a tidy tibble.
Usage
crew_class_controller_group$collect(
scale = TRUE,
throttle = TRUE,
error = NULL,
controllers = NULL
)
Arguments
scale: Logical of length 1, whether to automatically call scale() to auto-scale workers to meet the demand of the task load.
throttle: TRUE to skip auto-scaling if it already happened within the last polling interval. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
error: NULL or character of length 1, choice of action if the popped task threw an error. Possible values:
  "stop": throw an error in the main R session instead of returning a value.
  "warn": throw a warning.
  NULL or "silent": do not react to errors.
controllers: Character vector of controller names. Set to NULL to select all controllers.
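The three documented choices for the error argument of pop() and collect() can be pictured with a small base R dispatcher. This is a sketch of the described behavior only, not crew's code; react_to_task_error() and the message text are hypothetical.

```r
# Hypothetical helper, not part of crew: reacts to a task's error message
# according to the documented values of the `error` argument.
react_to_task_error <- function(message, error = NULL) {
  action <- if (is.null(error)) "silent" else error
  switch(
    action,
    stop = stop(message, call. = FALSE),      # Throw an error in the session.
    warn = warning(message, call. = FALSE),   # Throw a warning instead.
    silent = invisible("ignored"),            # Do not react to the error.
    stop("unsupported value of error: ", action, call. = FALSE)
  )
}

stopped <- tryCatch(
  react_to_task_error("task failed", error = "stop"),
  error = function(condition) "error thrown"
)
warned <- tryCatch(
  react_to_task_error("task failed", error = "warn"),
  warning = function(condition) "warning thrown"
)
ignored <- react_to_task_error("task failed", error = NULL)
```

In this sketch NULL and "silent" take the same branch, mirroring the documentation's statement that both mean "do not react to errors".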
Method wait()
Wait for tasks.
Usage
crew_class_controller_group$wait(
mode = "all",
seconds_interval = NULL,
seconds_timeout = Inf,
scale = TRUE,
throttle = TRUE,
controllers = NULL
)
Arguments
mode: Character string, name of the waiting condition. wait(mode = "all") waits until all tasks in the mirai compute profile resolve, and wait(mode = "one") waits until at least one task is available to pop() or collect() from the controller. The former still works if the controller is not the only means of submitting tasks to the compute profile, whereas the latter assumes only the controller submits tasks.
seconds_interval: Deprecated on 2025-01-17 (crew version 0.10.2.9003). Instead, the seconds_interval argument passed to crew_controller_group() is used as seconds_max in a crew_throttle() object which orchestrates exponential backoff.
seconds_timeout: Timeout length in seconds waiting for results to become available.
scale: Logical of length 1, whether to call scale_later() on each selected controller to schedule auto-scaling. See the scale argument of the wait() method of ordinary single controllers.
throttle: TRUE to skip auto-scaling if it already happened within the last polling interval. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controllers: Character vector of controller names. Set to NULL to select all controllers.
Details
The wait() method blocks the calling R session
until the condition in the mode argument is met.
During the wait, wait() iteratively auto-scales the workers.
Returns
A logical of length 1, invisibly.
wait(mode = "all") returns TRUE if all tasks in the mirai
compute profile have resolved (FALSE otherwise).
wait(mode = "one") returns TRUE if the controller is ready
to pop or collect at least one resolved task (FALSE otherwise).
wait(mode = "one") assumes all
tasks were submitted through the controller and not by other means.
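The blocking behavior and logical return value of wait() can be approximated with a plain polling loop. The sketch below is an assumption-laden illustration, not crew's implementation: wait_until(), its seconds_min and seconds_max arguments, and the doubling backoff are hypothetical, loosely modeled on the exponential backoff mentioned for seconds_interval.

```r
# Hypothetical helper, not part of crew: polls `condition()` until it is
# TRUE or until `seconds_timeout` elapses, doubling the sleep interval
# after each unsuccessful check up to `seconds_max`.
wait_until <- function(
  condition,
  seconds_timeout = Inf,
  seconds_min = 0.001,
  seconds_max = 0.1
) {
  start <- proc.time()[["elapsed"]]
  interval <- seconds_min
  repeat {
    if (condition()) {
      return(invisible(TRUE)) # Condition met.
    }
    if (proc.time()[["elapsed"]] - start >= seconds_timeout) {
      return(invisible(FALSE)) # Timed out before the condition was met.
    }
    Sys.sleep(interval)
    interval <- min(2 * interval, seconds_max)
  }
}

# A condition that becomes TRUE after roughly 0.05 seconds.
deadline <- proc.time()[["elapsed"]] + 0.05
met <- wait_until(
  condition = function() proc.time()[["elapsed"]] >= deadline,
  seconds_timeout = 5
)

# A condition that never becomes TRUE, so the wait times out.
timed_out <- wait_until(condition = function() FALSE, seconds_timeout = 0.05)
```

As with wait(), the result is returned invisibly, so it has to be assigned or wrapped in print() to be inspected.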
Method push_backlog()
Push the name of a task to the backlog.
Method pop_backlog()
Pop the task names from the head of the backlog which can be pushed without saturating the controller.
Method summary()
Summarize the workers of one or more controllers.
Returns
A data frame of aggregated worker summary statistics
of all the selected controllers. It has one row per worker,
and the rows are grouped by controller.
See the documentation of the summary() method of the controller
class for specific information about the columns in the output.
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
persistent <- crew_controller_local(name = "persistent")
transient <- crew_controller_local(
name = "transient",
tasks_max = 1L
)
group <- crew_controller_group(persistent, transient)
group$start()
group$push(name = "task", command = sqrt(4), controller = "transient")
group$wait()
group$pop()
group$terminate()
}