crew is a distributed computing framework with a centralized interface and auto-scaling. A crew controller is an object in R which accepts tasks, returns results, and launches workers. Workers can be local processes, jobs on traditional clusters such as SLURM, or jobs on cloud services such as AWS Batch, depending on the launcher plugin of the controller.

Tasks vs workers

A task is a piece of R code, such as an expression or a function call. A worker is a non-interactive R process that runs one or more tasks. When tasks run on workers, the local R session is free and responsive, and work gets done faster. For example, this vignette shows how crew and mirai work together to speed up Shiny apps.

How to use crew

First, create a controller object to manage tasks and workers.

library(crew)
controller <- crew_controller_local(
  name = "example",
  workers = 2,
  seconds_idle = 10
)

Next, start the controller to create the mirai client. Later, when you are done with the controller, call controller$terminate() to clean up your resources.

controller$start()

Use push() to submit a new task and pop() to return a completed task.

controller$push(name = "get pid", command = ps::ps_pid())

As a side effect, methods push(), pop(), and scale() also launch workers to run the tasks. If your controller uses transient workers and has a backlog of tasks, you may need to loop over pop() or scale() multiple times to make sure enough workers are always available.

controller$pop() # No workers started yet and the task is not done.
#> NULL

task <- controller$pop() # Worker started, task complete.
task
#> # A tibble: 1 × 12
#>   name    command result seconds  seed algorithm error trace warnings
#>   <chr>   <chr>   <list>   <dbl> <int> <chr>     <chr> <chr> <chr>
#> 1 get pid NA      <int>        0    NA NA        NA    NA    NA
#> # ℹ 3 more variables: launcher <chr>, worker <int>, instance <chr>
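The polling pattern described above can be sketched as a small loop. This is a minimal illustration rather than the recommended approach: the 0.1-second pause is an arbitrary choice, and `Sys.getpid()` stands in for `ps::ps_pid()` to avoid the extra dependency.

```r
library(crew)

controller <- crew_controller_local(workers = 1, seconds_idle = 10)
controller$start()
controller$push(name = "get pid", command = Sys.getpid())

# Poll until pop() returns something other than NULL.
# Each call to pop() also gives crew a chance to launch workers.
task <- NULL
while (is.null(task)) {
  task <- controller$pop()
  Sys.sleep(0.1) # arbitrary pause between polls (an assumption)
}

pid <- task$result[[1]] # process ID reported by the worker
controller$terminate()
```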

Alternatively, wait() is a loop that repeatedly checks tasks and launches workers until all tasks complete.

controller$wait(mode = "all")

The return value of the task is in the result column.

task$result[[1]] # return value of the task
#> [1] 69631

Here is the full list of output in the task object returned by pop().

  • name: the task name if given.
  • command: a character string with the R command if save_command was set to TRUE in push().
  • result: a list containing the return value of the R command.
  • seconds: number of seconds that the task ran.
  • seed: the single integer originally supplied to push(), NA if seed was supplied as NULL.
  • algorithm: name of the pseudo-random number generator algorithm originally supplied to push(), NA if algorithm was supplied as NULL.
  • error: the first 2048 characters of the error message if the task threw an error, NA otherwise.
  • trace: the first 2048 characters of the text of the traceback if the task threw an error, NA otherwise.
  • warnings: the first 2048 characters of the text of warning messages that the task may have generated, NA otherwise.
  • launcher: name of the crew launcher where the task ran.

If seed and algorithm are both non-missing in the output, then you can recover the pseudo-random number generator state of the task using set.seed(seed = seed, kind = algorithm). However, it is recommended to supply NULL to these arguments in push(), in which case you will observe NA in the outputs. With seed and algorithm both NULL, the random number generator defaults to the recommended widely spaced worker-specific L’Ecuyer streams supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
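As a rough base R illustration of the recovery step, the sketch below uses hypothetical values in place of task$seed and task$algorithm (a real task would supply its own). It only shows that resetting the generator with the same seed and kind reproduces the same draws.

```r
# Hypothetical stand-ins for task$seed and task$algorithm.
seed <- 123L
algorithm <- "Mersenne-Twister"

# Remember the current generator settings so they can be restored later.
old_kind <- RNGkind()

# Restore the generator state, then draw some numbers.
set.seed(seed = seed, kind = algorithm)
first <- runif(3)

# Restoring the same state again reproduces the same draws.
set.seed(seed = seed, kind = algorithm)
second <- runif(3)

identical(first, second)

# Put the original generator kinds back.
RNGkind(kind = old_kind[1], normal.kind = old_kind[2], sample.kind = old_kind[3])
```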

Synchronous functional programming

The map() method of the controller supports functional programming similar to purrr::map() and clustermq::Q(). The arguments of map() are mostly the same as those of push(), but there is a new iterate argument to define the inputs of individual tasks. map() submits a whole collection of tasks, auto-scales the workers, waits for all the tasks to finish, and returns the results in a tibble.

Below, map() submits one task to compute 1 + 2 + 5 + 6 and another task to compute 3 + 4 + 5 + 6. The lists and vectors inside iterate vary from task to task, while the elements of data and globals stay constant across tasks.

results <- controller$map(
  command = a + b + c + d,
  iterate = list(
    a = c(1, 3),
    b = c(2, 4)
  ),
  data = list(c = 5),
  globals = list(d = 6)
)

results
#> # A tibble: 2 × 12
#>   name  command result    seconds  seed algorithm error trace warnings
#>   <chr> <chr>   <list>      <dbl> <int> <chr>     <chr> <chr> <chr>
#> 1 1     NA      <dbl [1]>       0    NA NA        NA    NA    NA
#> 2 2     NA      <dbl [1]>       0    NA NA        NA    NA    NA
#> # ℹ 3 more variables: launcher <chr>, worker <int>, instance <chr>

as.numeric(results$result)
#> [1] 14 18
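To make the roles of iterate, data, and globals concrete, here is a sequential base R analogy. It does not use crew at all and is not how map() is implemented; it only mimics which values change from task to task and which stay fixed.

```r
# Inputs that vary from task to task.
iterate <- list(a = c(1, 3), b = c(2, 4))
# Inputs that stay constant (the roles of data and globals combined).
constants <- list(c = 5, d = 6)

n_tasks <- length(iterate$a)
sequential <- vapply(
  seq_len(n_tasks),
  function(i) {
    # Take the i-th element of each iterated input and add the constants.
    inputs <- c(lapply(iterate, function(x) x[[i]]), constants)
    eval(quote(a + b + c + d), envir = inputs)
  },
  FUN.VALUE = numeric(1)
)
sequential
#> [1] 14 18
```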

If at least one task in map() throws an error, the default behavior is to error out in the main session and not return the results. If that happens, the results are available in controller$error. To return the results instead of setting controller$error, regardless of error status, set error = "warn" or error = "silent" in map(). To conserve memory, consider setting controller$error <- NULL when you are done troubleshooting.
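A minimal sketch of the error = "silent" option follows. The failing command and its message are made up for illustration; the second task fails on purpose so that its row carries an error message while the first row stays NA.

```r
library(crew)

controller <- crew_controller_local(workers = 2, seconds_idle = 10)
controller$start()

# The task with x = 2 throws an error on purpose (illustrative only).
results <- controller$map(
  command = if (x > 1) stop("x is too large") else x,
  iterate = list(x = c(1, 2)),
  error = "silent" # return the results instead of erroring out
)

errors <- results$error # NA for the successful task
controller$terminate()
```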

Asynchronous functional programming

The walk() method is just like map(), but it does not wait for any tasks to complete. Instead, it returns control to the local R session immediately and lets you do other things while the tasks run in the background.

controller$walk(
  command = a + b + c + d,
  iterate = list(
    a = c(1, 3),
    b = c(2, 4)
  ),
  data = list(c = 5),
  globals = list(d = 6)
)

The collect() method pops all completed tasks. Put together, walk(), wait(mode = "all"), and collect() have the same overall effect as map().

controller$wait(mode = "all")

controller$collect()
#> # A tibble: 2 × 12
#>   name  command result    seconds  seed algorithm error trace warnings
#>   <chr> <chr>   <list>      <dbl> <int> <chr>     <chr> <chr> <chr>
#> 1 1     NA      <dbl [1]>       0    NA NA        NA    NA    NA
#> 2 2     NA      <dbl [1]>       0    NA NA        NA    NA    NA
#> # ℹ 3 more variables: launcher <chr>, worker <int>, instance <chr>

However, there are subtle differences between the synchronous and asynchronous functional programming methods:

  1. map() requires an empty controller to start with (no prior tasks). But with walk(), the controller can have any number of running or unpopped tasks beforehand.
  2. wait() does not show a progress bar because it would be misleading if there are a lot of prior tasks. Because map() requires the controller to be empty initially (see item 1), it can show a progress bar that correctly represents the amount of work left to do.
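The first difference can be illustrated with a short sketch, assuming a fresh local controller: a task is pushed before walk() is called, and collect() later returns the prior task together with the two walk() tasks. The task name "prior" is arbitrary.

```r
library(crew)

controller <- crew_controller_local(workers = 2, seconds_idle = 10)
controller$start()

# A prior task is already in the controller when walk() is called.
controller$push(name = "prior", command = Sys.getpid())
controller$walk(
  command = a + b,
  iterate = list(a = c(1, 3), b = c(2, 4))
)

controller$wait(mode = "all")
results <- controller$collect() # prior task plus the two walk() tasks
controller$terminate()
```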

Summaries

The controller summary shows how many tasks each worker ran, how many total seconds it spent running tasks, and how many tasks threw warnings and errors.

controller$summary()
#> # A tibble: 2 × 6
#>   controller worker tasks seconds errors warnings
#>   <chr>       <int> <int>   <dbl>  <int>    <int>
#> 1 example         1     2   0.001      0        0
#> 2 example         2     1   0          0        0

The launcher summary counts the number of times each worker was launched, and it shows the total number of assigned and completed tasks from all past terminated instances of each worker. In addition, it shows whether the current worker instance was actively connected (“online”) or had connected at some point during its life cycle (“discovered”) as of the last call to controller$launcher$tally().

controller$launcher$summary()
#> # A tibble: 2 × 6
#>   worker launches online discovered assigned complete
#>    <int>    <int> <lgl>  <lgl>         <int>    <int>
#> 1      1        2 TRUE   TRUE              0        0
#> 2      2        1 TRUE   TRUE              0        0

Finally, the client summary shows up-to-date worker status from mirai::daemons().

controller$client$summary()
#> # A tibble: 2 × 6
#>   worker online instances assigned complete socket
#>    <int> <lgl>      <int>    <int>    <int> <chr>
#> 1      1 FALSE          1        2        2 ws://10.0.0.32:58685/1/15e07250…
#> 2      2 FALSE          1        1        1 ws://10.0.0.32:58685/2/cb45b3d4…

Resources

crew utilizes background processes, and it may consume a lot of memory for big data workloads. A common cause of crashes is running out of computer memory. If you are running crew in a targets pipeline (as explained here in the targets user manual), consider setting storage = "worker" and retrieval = "worker" in tar_option_set() to minimize memory consumption of the local processes (see also the performance chapter).

As of crew version 0.9.5.9007, you can monitor resources by supplying a log file path to the log_resources argument of the controller. That way, normal controller methods write to the log as a side effect every seconds_interval seconds. The output file is a comma-separated values (CSV) file which can be read into R with readr::read_csv().

Each new observation in the log file comes from the resources() client method:

controller$client$resources()
#> # A tibble: 2 × 5
#>   name         pid status        rss time                   
#> * <chr>      <int> <chr>       <dbl> <chr>                  
#> 1 client      6821 running 319029248 2024-08-05 08:26:28 EDT
#> 2 dispatcher  7168 running  24985600 2024-08-05 08:26:28 EDT

rss stands for “resident set size”, and it is the total random access memory (RAM) consumed by the process, including shared libraries that may also be in use by different processes. pid is the process ID, and time is a character string with the time stamp of when the data was recorded. (Use as.POSIXct() to convert time to a date type, e.g. for visualization.)
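As a small base R sketch of that conversion, the data frame below is typed in by hand from the output above rather than read from a real log file. The time zone "America/New_York" is an assumption inferred from the EDT suffix, and the megabyte column is added only for readability.

```r
# Hand-copied rows from the resources() output shown above.
resources <- data.frame(
  name = c("client", "dispatcher"),
  pid = c(6821L, 7168L),
  status = c("running", "running"),
  rss = c(319029248, 24985600),
  time = c("2024-08-05 08:26:28 EDT", "2024-08-05 08:26:28 EDT"),
  stringsAsFactors = FALSE
)

# Parse the time stamps. The trailing "EDT" is ignored by the format,
# so the time zone is supplied explicitly (an assumption).
resources$time <- as.POSIXct(
  resources$time,
  format = "%Y-%m-%d %H:%M:%S",
  tz = "America/New_York"
)

# Convert resident set size from bytes to megabytes (1 MB = 1024^2 bytes).
resources$rss_mb <- resources$rss / 1024^2
resources[, c("name", "rss_mb", "time")]
```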

In the output above, "client" refers to the local R process running the controller, and "dispatcher" refers to the mirai dispatcher process, a special background R process which sends tasks to workers. If the output of resources() does not include the dispatcher, it means the dispatcher is not running. (Either there was a crash, possibly due to maxing out memory, or you did not start the controller yet.)

Workers are not included in the output of resources() because they may be running on different computers, and monitoring their resource consumption would require case-by-case plugin-specific approaches.

Termination

Call terminate() on the controller after you finish using it. terminate() tries to close the mirai dispatcher and any workers that may still be running. It is important to free up these resources.

controller$terminate()

The mirai dispatcher process should exit on its own, but if not, you can manually terminate the process with ps::ps_kill(p = controller$client$dispatcher) or call crew_clean() to terminate any dispatchers from current or previous R sessions.

crew_clean()
#> nothing to clean up
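One way to make sure terminate() always runs, even if a task fails, is to register it with on.exit() inside a function. The sketch below is one possible pattern, not a crew requirement, and run_with_controller() is a hypothetical helper name.

```r
library(crew)

# Hypothetical helper: run a batch of tasks and always clean up afterwards.
run_with_controller <- function(workers = 2) {
  controller <- crew_controller_local(workers = workers, seconds_idle = 10)
  controller$start()
  # Runs when the function exits, whether normally or because of an error.
  on.exit(controller$terminate(), add = TRUE)
  controller$map(
    command = x^2,
    iterate = list(x = c(1, 2, 3))
  )
}

results <- run_with_controller()
squares <- as.numeric(results$result)
```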

Monitoring local processes

A crew controller creates different types of local processes. These include:

  • Dispatchers: every controller has a special local process called a “dispatcher”. mirai needs this process to orchestrate tasks.
  • Workers: the R processes that crew launches to run tasks. These may be local processes as in the case of crew_controller_local(), or they may be processes on different computers if you are using a third-party launcher plugin like crew.cluster or crew.aws.batch.
  • Daemons: R processes created by mirai outside of crew to run tasks. Such processes may spawn automatically if you set the processes argument of e.g. crew.aws.batch::crew_controller_aws_batch() to a positive integer.

Usually these processes terminate themselves when the parent R session exits or the controller terminates, but under rare circumstances they may continue running. The “local monitor” in crew makes it easy to list and terminate any of these processes which may be running on your local computer. Example:

monitor <- crew_monitor_local()
monitor$dispatchers() # List PIDs of all local {mirai} dispatcher processes.
#> [1] 31215
monitor$daemons()
#> integer(0)
monitor$workers()
#> [1] 57001 57002
monitor$terminate(pid = c(57001, 57002))
monitor$workers()
#> integer(0)

crew_monitor_local() only manages processes running on your local computer. To manage crew workers running on different computers, such as SLURM or AWS Batch, please familiarize yourself with the given computing platform, and consider using the monitor objects in the relevant third-party plugin packages such as crew.cluster or crew.aws.batch. Example: https://wlandau.github.io/crew.aws.batch/index.html#job-management.

Tuning and auto-scaling

As explained above, push(), pop(), and wait() launch new workers to run tasks. The number of new workers depends on the number of tasks at the time. In addition, workers can shut themselves down as work completes. In other words, crew automatically raises and lowers the number of workers in response to fluctuations in the task workload.

The most useful arguments for down-scaling, in order of importance, are:

  1. seconds_idle: shut down a worker if it spends too long waiting for a task.
  2. tasks_max: shut down a worker after it completes a certain number of tasks.
  3. seconds_wall: soft wall time of a worker.

Please tune these arguments to achieve the desired balance for auto-scaling. The two extremes of auto-scaling are clustermq-like persistent workers and future-like transient workers, and each is problematic in its own way.

  1. Persistent workers: a persistent worker launches once, typically runs many tasks, and stays running for the entire lifetime of the controller. Persistent workers minimize overhead and quickly complete large numbers of short tasks. However, they risk spending too much time in an idle state if there are no tasks to run. Excessive idling wastes resources, which could impact your colleagues on a shared cluster or drive up costs on Amazon Web Services.
  2. Transient workers: a transient worker terminates as soon as it completes a single task. Each subsequent task requires a new transient worker to run it. Transient workers avoid excessive idling, but frequent worker launches cause significant overhead and slow down the computation as a whole.
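The two extremes, and one possible middle ground, might be configured as in the sketch below. The controller names and every numeric value are illustrative assumptions, not recommendations; suitable settings depend on the platform and the workload. Creating a controller does not launch any processes until start() is called.

```r
library(crew)

# Persistent workers: never shut down for idleness or task count.
persistent <- crew_controller_local(
  name = "persistent",
  workers = 2,
  seconds_idle = Inf,
  tasks_max = Inf
)

# Transient workers: each worker exits after a single task.
transient <- crew_controller_local(
  name = "transient",
  workers = 2,
  tasks_max = 1
)

# A middle ground (values are arbitrary): exit after 30 idle seconds,
# after 100 tasks, or after roughly one hour of wall time.
balanced <- crew_controller_local(
  name = "balanced",
  workers = 2,
  seconds_idle = 30,
  tasks_max = 100,
  seconds_wall = 3600
)
```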

Asynchronous management of workers

Some launchers support local processes to launch and terminate workers asynchronously. For example, a cloud-based launcher may need to make HTTP requests to launch and terminate workers on e.g. AWS Batch, and these time-consuming requests should happen in the background. Controllers that support this will have a processes argument to specify the number of local R processes to churn through worker launches and terminations. Set processes = NULL to disable async, which can be helpful for troubleshooting.