R6 class for controllers.
Details
See crew_controller().
See also
Other controller:
crew_controller()
Active bindings
profileCharacter string, compute profile of the controller.
clientClient object.
launcherLauncher object.
tasksA list of
mirai::mirai()task objects. The list of tasks is dynamically generated from an internal, dictionary, so it is not as fast as a simple lookup.reset_globalsSee
crew_controller(). since the controller was started.reset_packagesSee
crew_controller(). since the controller was started.reset_optionsSee
crew_controller(). since the controller was started.garbage_collectionSee
crew_controller(). since the controller was started.crashes_maxSee
crew_controller().backupSee
crew_controller().errorTibble of task results (with one result per row) from the last call to
map(error = "stop).looplaterloop if asynchronous auto-scaling is running,NULLotherwise.queue_resolvedQueue of resolved tasks.
queue_backlogQueue of explicitly backlogged tasks.
Methods
Method new()
mirai controller constructor.
Usage
crew_class_controller$new(
client = NULL,
launcher = NULL,
reset_globals = NULL,
reset_packages = NULL,
reset_options = NULL,
garbage_collection = NULL,
crashes_max = NULL,
backup = NULL
)Arguments
clientClient object. See
crew_controller().launcherLauncher object. See
crew_controller().reset_globalsSee
crew_controller().reset_packagesSee
crew_controller().reset_optionsSee
crew_controller().garbage_collectionSee
crew_controller().crashes_maxSee
crew_controller().backupSee
crew_controller().
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
launcher <- crew_launcher_local()
controller <- crew_controller(client = client, launcher = launcher)
controller$start()
controller$push(name = "task", command = sqrt(4))
controller$wait()
controller$pop()
controller$terminate()
}Method size()
Number of tasks in the controller.
Method empty()
Check if the controller is empty.
Arguments
controllersNot used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
A controller is empty if it has no mirai::mirai()
task objects in the controller.
There may still be other tasks running on the workers
of an empty controller, but those tasks were not submitted with
push() or collect(),
and they are not part of the controller task queue.
Method nonempty()
Check if the controller is nonempty.
Arguments
controllersNot used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
A controller is empty if it has no mirai::mirai()
task objects in the controller.
There may still be other tasks running on the workers
of an empty controller, but those tasks were not submitted with
push() or collect(),
and they are not part of the controller task queue.
Method resolved()
Cumulative number of resolved tasks.
Arguments
controllersNot used. Included to ensure the signature is compatible with the analogous method of controller groups.
Method unresolved()
Number of unresolved tasks.
Method saturated()
Check if the controller is saturated.
Arguments
collectDeprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttleDeprecated in version 0.5.0.9003 (2023-10-02). Not used.
controllerNot used. Included to ensure the signature is compatible with the analogous method of controller groups.
Method start()
Start the controller if it is not already started.
Method started()
Check whether the controller is started.
Method launch()
Launch one or more workers.
Method scale()
Auto-scale workers out to meet the demand of tasks.
Arguments
throttleTRUEto skip auto-scaling if it already happened within the last polling interval.FALSEto auto-scale every timescale()is called. Throttling avoids overburdening themiraidispatcher and other resources.controllersNot used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
The scale() method launches new workers to
run tasks if needed.
Method autoscale()
Run worker auto-scaling in a later loop
in polling intervals determined by exponential backoff.
Usage
crew_class_controller$autoscale(
loop = later::global_loop(),
controllers = NULL
)Method descale()
Terminate the auto-scaling loop started by
controller$autoscale().
Method crashes()
Report the number of consecutive crashes of a task.
Arguments
nameCharacter string, name of the task to check.
controllersNot used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
See the crashes_max argument of crew_controller().
Method push()
Push a task to the head of the task list.
Arguments
commandLanguage object with R code to run.
dataNamed list of local data objects in the evaluation environment.
globalsNamed list of objects to temporarily assign to the global environment for the task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the
reset_globalsargument ofcrew_controller_local().substituteLogical of length 1, whether to call
base::substitute()on the supplied value of thecommandargument. IfTRUE(default) thencommandis quoted literally as you write it, e.g.push(command = your_function_call()). IfFALSE, thencrewassumescommandis a language object and you are passing its value, e.g.push(command = quote(your_function_call())).substitute = TRUEis appropriate for interactive use, whereassubstitute = FALSEis meant for automated R programs that invokecrewcontrollers.seedInteger of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the
seedargument ofset.seed()if notNULL. Ifalgorithmandseedare bothNULL, then the random number generator defaults to the widely spaced worker-specific L'Ecuyer streams as supported bymirai::nextstream(). Seevignette("parallel", package = "parallel")for details.algorithmInteger of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the
kindargument ofRNGkind()if notNULL. Ifalgorithmandseedare bothNULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported bymirai::nextstream(). Seevignette("parallel", package = "parallel")for details.packagesCharacter vector of packages to load for the task.
libraryLibrary path to load the packages. See the
lib.locargument ofrequire().seconds_timeoutOptional task timeout passed to the
.timeoutargument ofmirai::mirai()(after converting to milliseconds).scaleLogical, whether to automatically call
scale()to auto-scale workers to meet the demand of the task load. Also see thethrottleargument.throttleTRUEto skip auto-scaling if it already happened within the last polling interval.FALSEto auto-scale every timescale()is called. Throttling avoids overburdening themiraidispatcher and other resources.nameCharacter string, name of the task. If
NULL, then a random name is generated automatically. The name of the task must not conflict with the name of another task pushed to the controller. Any previous task with the same name must first be popped before a new task with that name can be pushed.save_commandDeprecated on 2025-01-22 (
crewversion 0.10.2.9004) and no longer used.controllerNot used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
Invisibly return the mirai object of the pushed task.
This allows you to interact with the task directly, e.g.
to create a promise object with promises::as.promise().
Method walk()
Apply a single command to multiple inputs, and return control to the user without waiting for any task to complete.
Usage
crew_class_controller$walk(
command,
iterate,
data = list(),
globals = list(),
substitute = TRUE,
seed = NULL,
algorithm = NULL,
packages = character(0),
library = NULL,
seconds_timeout = NULL,
names = NULL,
save_command = NULL,
verbose = interactive(),
scale = TRUE,
throttle = TRUE,
controller = NULL
)Arguments
commandLanguage object with R code to run.
iterateNamed list of vectors or lists to iterate over. For example, to run function calls
f(x = 1, y = "a")andf(x = 2, y = "b"), setcommandtof(x, y), and setiteratetolist(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated asf(x = iterate$x[[1]], y = iterate$y[[1]])andf(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements ofiteratemust have the same length. If there are any name conflicts betweeniterateanddata,iteratetakes precedence.dataNamed list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globalsNamed list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the
reset_globalsargument ofcrew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.substituteLogical of length 1, whether to call
base::substitute()on the supplied value of thecommandargument. IfTRUE(default) thencommandis quoted literally as you write it, e.g.push(command = your_function_call()). IfFALSE, thencrewassumescommandis a language object and you are passing its value, e.g.push(command = quote(your_function_call())).substitute = TRUEis appropriate for interactive use, whereassubstitute = FALSEis meant for automated R programs that invokecrewcontrollers.seedInteger of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the
seedargument ofset.seed()if notNULL. Ifalgorithmandseedare bothNULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported bymirai::nextstream(). Seevignette("parallel", package = "parallel")for details.algorithmInteger of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the
kindargument ofRNGkind()if notNULL. Ifalgorithmandseedare bothNULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported bymirai::nextstream(). Seevignette("parallel", package = "parallel")for details.packagesCharacter vector of packages to load for the task.
libraryLibrary path to load the packages. See the
lib.locargument ofrequire().seconds_timeoutOptional task timeout passed to the
.timeoutargument ofmirai::mirai()(after converting to milliseconds).namesOptional character of length 1, name of the element of
iteratewith names for the tasks. Ifnamesis supplied, theniterate[[names]]must be a character vector.save_commandDeprecated on 2025-01-22 (
crewversion 0.10.2.9004). The command is always saved now.verboseLogical of length 1, whether to print to a progress bar when pushing tasks.
scaleLogical, whether to automatically scale workers to meet demand. See also the
throttleargument.throttleTRUEto skip auto-scaling if it already happened within the last polling interval.FALSEto auto-scale every timescale()is called. Throttling avoids overburdening themiraidispatcher and other resources.controllerNot used. Included to ensure the signature is compatible with the analogous method of controller groups.
Method map()
Apply a single command to multiple inputs, wait for all tasks to complete, and return the results of all tasks.
Usage
crew_class_controller$map(
command,
iterate,
data = list(),
globals = list(),
substitute = TRUE,
seed = NULL,
algorithm = NULL,
packages = character(0),
library = NULL,
seconds_interval = NULL,
seconds_timeout = NULL,
names = NULL,
save_command = NULL,
error = "stop",
warnings = TRUE,
verbose = interactive(),
scale = TRUE,
throttle = TRUE,
controller = NULL
)Arguments
commandLanguage object with R code to run.
iterateNamed list of vectors or lists to iterate over. For example, to run function calls
f(x = 1, y = "a")andf(x = 2, y = "b"), setcommandtof(x, y), and setiteratetolist(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated asf(x = iterate$x[[1]], y = iterate$y[[1]])andf(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements ofiteratemust have the same length. If there are any name conflicts betweeniterateanddata,iteratetakes precedence.dataNamed list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globalsNamed list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the
reset_globalsargument ofcrew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.substituteLogical of length 1, whether to call
base::substitute()on the supplied value of thecommandargument. IfTRUE(default) thencommandis quoted literally as you write it, e.g.push(command = your_function_call()). IfFALSE, thencrewassumescommandis a language object and you are passing its value, e.g.push(command = quote(your_function_call())).substitute = TRUEis appropriate for interactive use, whereassubstitute = FALSEis meant for automated R programs that invokecrewcontrollers.seedInteger of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the
seedargument ofset.seed()if notNULL. Ifalgorithmandseedare bothNULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported bymirai::nextstream(). Seevignette("parallel", package = "parallel")for details.algorithmInteger of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the
kindargument ofRNGkind()if notNULL. Ifalgorithmandseedare bothNULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported bymirai::nextstream(). Seevignette("parallel", package = "parallel")for details.packagesCharacter vector of packages to load for the task.
libraryLibrary path to load the packages. See the
lib.locargument ofrequire().seconds_intervalDeprecated on 2025-01-17 (
crewversion 0.10.2.9003). Instead, theseconds_intervalargument passed tocrew_controller_group()is used asseconds_maxin acrew_throttle()object which orchestrates exponential backoff.seconds_timeoutOptional task timeout passed to the
.timeoutargument ofmirai::mirai()(after converting to milliseconds).namesOptional character string, name of the element of
iteratewith names for the tasks. Ifnamesis supplied, theniterate[[names]]must be a character vector.save_commandDeprecated on 2025-01-22 (
crewversion 0.10.2.9004). The command is always saved now.errorCharacter of length 1, choice of action if a task was not successful. Possible values:
"stop": throw an error in the main R session instead of returning a value. In case of an error, the results from the last erroredmap()are in theerrorfield of the controller, e.g.controller_object$error. To reduce memory consumption, setcontroller_object$error <- NULLafter you are finished troubleshooting."warn": throw a warning. This allows the return value with all the error messages and tracebacks to be generated."silent": do nothing special. NOTE: the only kinds of errors considered here are errors at the R level. A crashed tasks will return a status of"crash"in the output and not trigger an error inmap()unlesscrashes_maxis reached.
warningsLogical of length 1, whether to throw a warning in the interactive session if at least one task encounters an error.
verboseLogical of length 1, whether to print to a progress bar as tasks resolve.
scaleLogical, whether to automatically scale workers to meet demand. See also the
throttleargument.throttleTRUEto skip auto-scaling if it already happened within the last polling interval.FALSEto auto-scale every timescale()is called. Throttling avoids overburdening themiraidispatcher and other resources.controllerNot used. Included to ensure the signature is compatible with the analogous method of controller groups.
Method pop()
Pop a completed task from the results data frame.
Usage
crew_class_controller$pop(
scale = TRUE,
collect = NULL,
throttle = TRUE,
error = NULL,
controllers = NULL
)Arguments
scaleLogical of length 1, whether to automatically call
scale()to auto-scale workers to meet the demand of the task load. Scaling up onpop()may be important for transient or nearly transient workers that tend to drop off quickly after doing little work. See also thethrottleargument.collectDeprecated in version 0.5.0.9003 (2023-10-02).
throttleTRUEto skip auto-scaling if it already happened within the last polling interval.FALSEto auto-scale every timescale()is called. Throttling avoids overburdening themiraidispatcher and other resources.errorNULLor character of length 1, choice of action if the popped task threw an error. Possible values:"stop": throw an error in the main R session instead of returning a value."warn": throw a warning.NULLor"silent": do not react to errors. NOTE: the only kinds of errors considered here are errors at the R level. A crashed tasks will return a status of"crash"in the output and not trigger an error inpop()unlesscrashes_maxis reached.
controllersNot used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
If there is no task to collect, return NULL. Otherwise,
return a one-row tibble with the following columns.
name: the task name.command: a character string with the R command.result: a list containing the return value of the R command.NAif the task failed.status: a character string."success"if the task succeeded,"cancel"if the task was canceled with thecancel()controller method,"crash"if the worker running the task exited before it could complete the task, or"error"for any other kind of error.error: the first 2048 characters of the error message if the task status is not"success",NAotherwise. Messages for crashes and cancellations are captured here alongside ordinary R-level errors.code: an integer code denoting the specific exit status:0for successful tasks,-1for tasks with an error in the R command of the task, and another positive integer with an NNG status code if there is an error at the NNG/nanonextlevel.nanonext::nng_error()can interpret these codes.trace: the first 2048 characters of the text of the traceback if the task threw an error,NAotherwise.warnings: the first 2048 characters. of the text of warning messages that the task may have generated,NAotherwise.seconds: number of seconds that the task ran.seed: the single integer originally supplied topush(),NAotherwise. The pseudo-random number generator state just prior to the task can be restored usingset.seed(seed = seed, kind = algorithm), whereseedandalgorithmare part of this output.algorithm: name of the pseudo-random number generator algorithm originally supplied topush(),NAotherwise. The pseudo-random number generator state just prior to the task can be restored usingset.seed(seed = seed, kind = algorithm), whereseedandalgorithmare part of this output.controller: name of thecrewcontroller where the task ran.worker: name of thecrewworker that ran the task.
Method collect()
Pop all available task results and return them in a tidy
tibble.
Usage
crew_class_controller$collect(
scale = TRUE,
throttle = TRUE,
error = NULL,
controllers = NULL
)Arguments
scaleLogical of length 1, whether to automatically call
scale()to auto-scale workers to meet the demand of the task load.throttleTRUEto skip auto-scaling if it already happened within the last polling interval.FALSEto auto-scale every timescale()is called. Throttling avoids overburdening themiraidispatcher and other resources.errorNULLor character of length 1, choice of action if the popped task threw an error. Possible values: *"stop": throw an error in the main R session instead of returning a value. *"warn": throw a warning. *NULLor"silent": do not react to errors. NOTE: the only kinds of errors considered here are errors at the R level. A crashed tasks will return a status of"crash"in the output and not trigger an error incollect()unlesscrashes_maxis reached.controllersNot used. Included to ensure the signature is compatible with the analogous method of controller groups.
Method wait()
Wait for tasks.
Usage
crew_class_controller$wait(
mode = "all",
seconds_interval = NULL,
seconds_timeout = Inf,
scale = TRUE,
throttle = TRUE,
controllers = NULL
)Arguments
modeCharacter string, name of the waiting condition.
wait(mode = "all")waits until all tasks in themiraicompute profile resolve, andwait(mode = "one")waits until at least one task is available topush()orcollect()from the controller. The former still works if the controller is not the only means of submitting tasks to the compute profile, whereas the latter assumes only the controller submits tasks.seconds_intervalDeprecated on 2025-01-17 (
crewversion 0.10.2.9003). Instead, theseconds_intervalargument passed tocrew_controller_group()is used asseconds_maxin acrew_throttle()object which orchestrates exponential backoff.seconds_timeoutTimeout length in seconds waiting for tasks.
scaleLogical, whether to automatically call
scale()to auto-scale workers to meet the demand of the task load. See also thethrottleargument.throttleTRUEto skip auto-scaling if it already happened within the last polling interval.FALSEto auto-scale every timescale()is called. Throttling avoids overburdening themiraidispatcher and other resources.controllersNot used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
The wait() method blocks the calling R session
until the condition in the mode argument is met.
During the wait, wait() iteratively auto-scales the workers.
Returns
A logical of length 1, invisibly.
wait(mode = "all") returns TRUE if all tasks in the mirai
compute profile have resolved (FALSE otherwise).
wait(mode = "one") returns TRUE if the controller is ready
to pop or collect at least one resolved task (FALSE otherwise).
wait(mode = "one") assumes all
tasks were submitted through the controller and not by other means.
Method push_backlog()
Push the name of a task to the backlog.
Method pop_backlog()
Pop the task names from the head of the backlog which can be pushed without saturating the controller.
Method summary()
Summarize the collected tasks of the controller.
Arguments
controllersNot used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
A data frame of cumulative summary statistics on the tasks
collected through pop() and collect().
It has one row and the following columns:
controller: name of the controller.seconds: total number of runtime in seconds.tasks: total number of tasks collected.success: total number of collected tasks that did not crash or error.error: total number of tasks with errors, either in the R code of the task or an NNG-level error that is not a cancellation or crash.crash: total number of crashed tasks (where the worker exited unexpectedly while it was running the task).cancel: total number of tasks interrupted with thecancel()controller method.warning: total number of tasks with one or more warnings.
Method cancel()
Cancel one or more tasks.
Usage
crew_class_controller$cancel(names = character(0L), all = FALSE)Method pids()
Deprecated on 2025-08-26 in crew version 1.2.1.9005.
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
launcher <- crew_launcher_local()
controller <- crew_controller(client = client, launcher = launcher)
controller$start()
controller$push(name = "task", command = sqrt(4))
controller$wait()
controller$pop()
controller$terminate()
}
## ------------------------------------------------
## Method `crew_class_controller$new`
## ------------------------------------------------
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
launcher <- crew_launcher_local()
controller <- crew_controller(client = client, launcher = launcher)
controller$start()
controller$push(name = "task", command = sqrt(4))
controller$wait()
controller$pop()
controller$terminate()
}