
In computationally demanding analysis projects, statisticians and data scientists asynchronously deploy long-running tasks to distributed systems, ranging from traditional clusters to cloud services. The NNG-powered mirai R package is a sleek and sophisticated scheduler that efficiently processes these intense workloads. The crew package extends mirai with a unifying interface for third-party worker launchers. Inspiration also comes from packages future, rrq, clustermq, and batchtools.

Installation

For crew, it is recommended to use nanonext version 0.8.3.9010 or higher and mirai version 0.8.7.9013 or higher. If the latest CRAN releases are older, then you can install the development versions from R-universe.

install.packages("nanonext", repos = "https://shikokuchuo.r-universe.dev")
install.packages("mirai", repos = "https://shikokuchuo.r-universe.dev")

There are multiple ways to install crew itself, and both the latest release and the development version are available.

Type         Source      Command
Release      CRAN        install.packages("crew")
Development  GitHub      remotes::install_github("wlandau/crew")
Development  R-universe  install.packages("crew", repos = "https://wlandau.r-universe.dev")

Documentation

Please see https://wlandau.github.io/crew/ for documentation, including a full function reference and usage tutorial vignettes.

Plugins

crew lets you write custom launchers for different types of workers that connect over the local network. This flexibility can extend crew to platforms like SLURM, Sun Grid Engine, AWS Batch, and Kubernetes. See the plugin vignette to learn how to create a launcher plugin. The following packages support ready-to-use plugins.

  • crew.cluster: traditional high-performance computing clusters such as Sun Grid Engine (SGE).

Usage

First, create a controller object. Thanks to the powerful features in mirai, crew_controller_local() allows several ways to customize the way workers are launched and the conditions under which they time out. For example, arguments tasks_max and seconds_idle allow for a smooth continuum between fully persistent workers and fully transient workers.

library(crew)
controller <- crew_controller_local(seconds_idle = 10)

The start() method starts a local mirai client and dispatcher process to listen to workers that dial into websockets on the local network.

controller$start()

The summary() method shows metadata on workers and tasks. Use the columns argument to select a subset of columns in the output. The following table has one row per worker, and each column is a summary metric on the popped tasks (tasks which completed and were retrieved with pop()).

controller$summary(columns = starts_with("popped_"))
#> # A tibble: 2 × 4
#>   popped_tasks popped_seconds popped_errors popped_warnings
#>          <int>          <dbl>         <int>           <int>
#> 1            0              0             0               0
#> 2            0              0             0               0

Use the push() method to submit a task.

controller$push(name = "get worker process ID", command = ps::ps_pid())

Use pop() to get the result of a completed task. pop() returns NULL if there is no result yet.

controller$pop()
#> NULL

Even if you submitted a task, crew may not have launched a worker yet. This is because crew uses its own version of throttling for robustness and efficiency. In other words, push() and pop() defer auto-scaling for a short window after the original request. For best results, either

  1. Continue to loop over frequent calls to push() and pop() (see the sketch after this list), or
  2. Call wait().
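Option 1 might look like the following sketch, which assumes the controller created above and simply polls until the task comes back.

# Sketch of option 1: poll with pop() until the result arrives.
result <- NULL
while (is.null(result)) {
  result <- controller$pop() # each call also triggers auto-scaling
  Sys.sleep(0.1)             # brief pause to avoid a tight busy-wait
}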

The wait() method blocks the R session, repeatedly scales workers, and collects completed tasks.

controller$wait(mode = "all")

When a result is available, pop() will retrieve it.

out <- controller$pop()

The result is a monad: a one-row tibble with the task's return value and its metadata. Even if the task's command throws an error, pop() still returns the same kind of monad, with the error message recorded in the error column.

out
#> # A tibble: 1 × 11
#>   name         command result seconds   seed error trace warni…¹ launc…² worker insta…³
#>   <chr>        <chr>   <list>   <dbl>  <int> <chr> <chr> <chr>   <chr>    <int> <chr>  
#> 1 get worker … ps::ps… <int>        0 1.56e8 NA    NA    NA      79e71c…      1 7686b2…
#> # … with abbreviated variable names ¹​warnings, ²​launcher, ³​instance

The return value of the command is available in the result column. In our case, it is the process ID of the parallel worker that ran it, as reported by ps::ps_pid().

out$result[[1]] # process ID of the parallel worker reported by the task
#> [1] 69631

Since it ran on a parallel worker, it is different from the process ID of the local R session.

ps::ps_pid() # local R session process ID
#> [1] 69523

Continue the above process of asynchronously submitting and collecting tasks until your workflow is complete. You may periodically inspect different columns from the summary() method.

controller$summary(columns = starts_with("tasks"))
#> # A tibble: 2 × 2
#>   tasks_assigned tasks_complete
#>            <int>          <int>
#> 1              1              1
#> 2              0              0
controller$summary(columns = starts_with("popped"))
#> # A tibble: 2 × 4
#>   popped_tasks popped_seconds popped_errors popped_warnings
#>          <int>          <dbl>         <int>           <int>
#> 1            1              0             0               0
#> 2            0              0             0               0

When you are done, terminate the controller to close any workers still running, close the mirai dispatcher process, and free the TCP port.

controller$terminate()

Efficiency and resources

Adding more workers might speed up your workflow, but not always. Beyond a certain point, the efficiency gains will diminish, and the extra workers will have nothing to do. With proper configuration, you can find the right balance.

As mentioned above, the push(), pop(), and wait() methods of the controller launch new workers automatically in response to changing demand. By default, these workers stay running until controller$terminate(). However, you can customize the controller to scale down when circumstances allow, which helps avoid wasting resources.1 The most useful arguments for down-scaling, in order of importance, are listed below, followed by a configuration sketch.

  1. seconds_idle: automatically shut down a worker if it spends too long waiting for a task.
  2. tasks_max: maximum number of tasks a worker can run before shutting down.
  3. seconds_wall: soft wall time of a worker.
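For example, a controller that uses all three arguments might look like the sketch below. The specific values are illustrative only, not recommendations.

controller <- crew_controller_local(
  workers = 4,          # maximum number of simultaneous workers
  seconds_idle = 10,    # a worker exits after 10 seconds without a task
  tasks_max = 100,      # a worker exits after completing 100 tasks
  seconds_wall = 3600   # soft wall time: a worker exits after about an hour
)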

On the other hand, it is not always helpful to eagerly down-scale workers. Because the workload can fluctuate rapidly, some workers may quit and relaunch so often that it creates noticeable overhead.

Fortunately, you can investigate auto-scaling and configuration issues empirically. Simply run your workflow and then look at the output from controller$summary().

controller <- crew_controller_local(workers = 10, seconds_idle = 3)
controller$start()
for (index in seq_len(1000)) {
  controller$push(command = TRUE)
}
controller$wait()
result <- "start collecting results"
while (!is.null(result)) {
  result <- controller$pop()
}
controller$summary(columns = contains(c("worker_index", "worker_launches", "popped_tasks", "popped_seconds")))
#> # A tibble: 10 × 4
#>    worker_index worker_launches popped_tasks popped_seconds
#>           <int>           <int>        <int>          <dbl>
#>  1            1               1          236          0.005
#>  2            2               1          110          0.004
#>  3            3               1           96          0.001
#>  4            4               1           81          0.005
#>  5            5               1           89          0.002
#>  6            6               1           77          0.003
#>  7            7               1           78          0.005
#>  8            8               1           75          0.001
#>  9            9               1           78          0.008
#> 10           10               1           80          0.002

Each worker launched only once and moved through its tasks quickly, which is a good sign for a batch of 1000 instantaneous independent tasks. However, the first worker completed far more tasks than the others, which suggests the workload did not really need all 10 workers. It would be helpful to see if the same workload runs just as fast with fewer workers.

Risks

The crew package has unavoidable risks. It is your responsibility as the user to use crew safely. Please read the final clause of the software license.

Security

crew currently uses unencrypted TCP connections for transactions with workers inside a trusted local network. In a compromised network, an attacker can potentially access and exploit sensitive resources. It is your responsibility to assess the sensitivity and vulnerabilities of your computing environment and make sure your network is secure.

Ports

crew uses one TCP port per controller. TCP ports range from 0 to 65535, and only around 16000 of them are considered ephemeral or dynamic, so please be careful not to run too many controllers simultaneously on a machine you share with other people (such as the login node of a computing cluster). If you use a controller group, please add only a small number of controllers to the group. The terminate() method of the controller and crew_session_terminate() should free these ports for other processes to use.
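For example, a small controller group like the hypothetical sketch below claims one TCP port per controller, so keep the group small on shared machines.

library(crew)
# Hypothetical sketch: each controller in the group uses its own TCP port.
group <- crew_controller_group(
  crew_controller_local(name = "a", workers = 2, seconds_idle = 10),
  crew_controller_local(name = "b", workers = 2, seconds_idle = 10)
)
group$terminate() # frees both ports when the work is done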

Zombies

The crew package launches external R processes:

  1. Worker processes to run tasks, possibly on different computers on the local network, and
  2. A local mirai dispatcher process to schedule the tasks.

To the best of its ability, crew tries to only launch the processes it needs, and it relies on mirai to clean up these processes when the work is done. However, sometimes it is still possible that too many workers may run concurrently, and it is still possible that either the workers or the mirai dispatcher may run too long or hang. In large-scale workflows, these accidents can have egregious consequences. Depending on the launcher type, these consequences can range from overburdening your local machine or cluster, to incurring unexpectedly high costs on Amazon Web Services.

Workers

mirai robustly terminates workers as appropriate, but this safeguard cannot protect against the risk of a worker that gets stuck in a crashed state before it can even start R. To be absolutely sure that workers do not run indefinitely if something goes wrong, please learn how to find and terminate workers on the specific computing platform where they run. And if you are writing a custom launcher plugin, it is recommended (although not strictly required) to write a custom terminate_worker() method.

Workers may run on different computing platforms, depending on the type of launcher you choose. Each type of launcher connects to a different computing platform, and each platform has a different way of terminating workers. For example, the local process launcher creates R processes on your local machine, which you can find and terminate with ps::ps()/ps::ps_kill() or htop. For a SLURM launcher, you need squeue to find workers and scancel to terminate them. For an Amazon Web Services launcher, please use the AWS web console or CloudWatch.
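As an illustration for the local process launcher only, the hypothetical sketch below uses the ps package to list candidate R processes before deciding by hand which ones to terminate. Adjust the filter for your own platform and setup.

library(ps)
processes <- ps::ps()                            # one row per running process
candidates <- processes[processes$name == "R", ] # narrow down to R processes
candidates[, c("pid", "name", "created")]        # inspect before acting
# After confirming that a PID really belongs to a stray crew worker:
# handle <- ps::ps_handle(pid = candidates$pid[1])
# ps::ps_kill(handle)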

Scheduling

Depending on user settings, crew workers may run for the entire length of the analysis pipeline, or they may exit if they idle too long or complete a certain number of tasks. crew re-launches workers whenever, at a given snapshot in time, there are more unfinished tasks in the queue than active workers to run them. This kind of auto-scaling does not dedicate any specific worker to any specific task, and it does not perform well when workers exit too quickly due to a small value of seconds_idle. Please set seconds_idle to a generous enough value for workers to accept work, and please use tasks_max to specify short-lived workers such as single-task transient workers (tasks_max = 1), as in the sketch below.
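As an illustration (the values are examples, not recommendations), a controller with fully transient single-task workers might be configured like this:

# Sketch: each worker runs exactly one task and then exits.
controller <- crew_controller_local(tasks_max = 1, seconds_idle = 30)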

Dispatcher

The mirai dispatcher is designed to gracefully exit when you call terminate() on the controller object or when you restart your R session. However, if you ever need to shut down the dispatcher manually, you can find the process ID using the controller object, then use ps::ps_kill() to terminate the process.

controller$router$dispatcher
#> [1] 86028
handle <- ps::ps_handle(pid = 86028L)
ps::ps_is_running(handle)
#> [1] TRUE
ps::ps_kill(handle)
ps::ps_is_running(handle)
#> [1] FALSE

Similar work

  • mirai: a powerful R framework for asynchronous tasks built on NNG. The purpose of crew is to extend mirai to different computing platforms for distributed workers.
  • rrq: a task queue for R based on Redis.
  • rrqueue: predecessor of rrq.
  • clustermq: sends R function calls as jobs to computing clusters.
  • future: a unified interface for asynchronous evaluation of single tasks and map-reduce calls on a wide variety of backend technologies.
  • batchtools: tools for computation on batch systems.
  • targets: a Make-like pipeline tool for R.
  • later: delayed evaluation of synchronous tasks.
  • promises: minimally-invasive asynchronous programming for a small number of tasks within Shiny apps.
  • callr: initiates R processes from other R processes.
  • High-performance computing CRAN task view.

Thanks

The crew package incorporates insightful ideas from the following people.

Code of Conduct

Please note that the crew project is released with a Contributor Code of Conduct. By contributing to this project, you agree to abide by its terms.

Citation

To cite package ‘crew’ in publications use:

  Landau WM (2023). _crew: A Distributed Worker Launcher Framework_.
  https://wlandau.github.io/crew/, https://github.com/wlandau/crew.

A BibTeX entry for LaTeX users is

  @Manual{,
    title = {crew: A Distributed Worker Launcher Framework},
    author = {William Michael Landau},
    year = {2023},
    note = {https://wlandau.github.io/crew/, https://github.com/wlandau/crew},
  }

  1. Automatic down-scaling also helps comply with wall time restrictions on shared computing clusters. See the arguments of crew_controller_local() for details.