In computationally demanding analysis projects, statisticians and data scientists asynchronously deploy long-running tasks to distributed systems, ranging from traditional clusters to cloud services. The NNG-powered mirai R package is a sleek and sophisticated scheduler that efficiently processes these intense workloads. The crew package extends mirai with a unifying interface for third-party worker launchers. Inspiration also comes from packages future, rrq, clustermq, and batchtools.
For crew, it is recommended to use nanonext version 0.8.3.9010 or higher and mirai version 0.8.7.9013 or higher. If the latest CRAN releases are older, then you can install the development versions from R-universe.
install.packages("nanonext", repos = "https://shikokuchuo.r-universe.dev")
install.packages("mirai", repos = "https://shikokuchuo.r-universe.dev")
There are multiple ways to install crew itself, and both the latest release and the development version are available.
Type | Source | Command |
---|---|---|
Release | CRAN | install.packages("crew") |
Development | GitHub | remotes::install_github("wlandau/crew") |
Development | R-universe | install.packages("crew", repos = "https://wlandau.r-universe.dev") |
Please see https://wlandau.github.io/crew/ for documentation, including a full function reference and usage tutorial vignettes.
crew lets you write custom launchers for different types of workers that connect over the local network. This flexibility can extend crew to platforms like SLURM, Sun Grid Engine, AWS Batch, and Kubernetes. See the plugin vignette to learn how to create a launcher plugin. The following packages support ready-to-use plugins.
- crew.cluster: traditional high-performance computing clusters such as Sun Grid Engine (SGE).
First, create a controller object. Thanks to the powerful features in mirai, crew_controller_local() allows several ways to customize the way workers are launched and the conditions under which they time out. For example, arguments tasks_max and seconds_idle allow for a smooth continuum between fully persistent workers and fully transient workers.
library(crew)
controller <- crew_controller_local(seconds_idle = 10)
The start() method starts a local mirai client and dispatcher process to listen to workers that dial into websockets on the local network.
controller$start()
The summary() method shows metadata on workers and tasks. Use the columns argument to select a subset of columns in the output. The following table has one row per worker, and each column is a summary metric on the popped tasks (tasks which completed and were retrieved with pop()).
controller$summary(columns = starts_with("popped_"))
#> # A tibble: 2 × 4
#> popped_tasks popped_seconds popped_errors popped_warnings
#> <int> <dbl> <int> <int>
#> 1 0 0 0 0
#> 2 0 0 0 0
Use the push() method to submit a task.
controller$push(name = "get worker process ID", command = ps::ps_pid())
Use pop() to get the result of a completed task. pop() returns NULL if there is no result yet.
controller$pop()
#> NULL
Even if you submitted a task, crew may not have launched a worker yet. This is because crew uses its own version of throttling for robustness and efficiency. In other words, push() and pop() defer auto-scaling for a short window after the original request. For best results, either keep calling push() and pop(), or call wait().
The wait() method blocks the R session, repeatedly scales workers, and collects completed tasks.
controller$wait(mode = "all")
When a result is available, pop() will retrieve it.
out <- controller$pop()
The result is a monad with the result and its metadata. Even if the command of the task throws an error, it will still return the same kind of monad.
out
#> # A tibble: 1 × 11
#> name command result seconds seed error trace warni…¹ launc…² worker insta…³
#> <chr> <chr> <list> <dbl> <int> <chr> <chr> <chr> <chr> <int> <chr>
#> 1 get worker … ps::ps… <int> 0 1.56e8 NA NA NA 79e71c… 1 7686b2…
#> # … with abbreviated variable names ¹warnings, ²launcher, ³instance
The return value of the command is available in the result column. In our case, it is the process ID of the parallel worker that ran it, as reported by ps::ps_pid().
out$result[[1]] # process ID of the parallel worker reported by the task
#> [1] 69631
Since it ran on a parallel worker, it is different from the process ID of the local R session.
ps::ps_pid() # local R session process ID
#> [1] 69523
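The earlier remark that a failing command still yields the same kind of result object can be checked with a deliberately failing task. The sketch below is illustrative only: the separate controller name error_controller and the error message are assumptions chosen so the session above is left untouched, and the exact contents of the other columns may vary between crew versions.

```r
library(crew)

# Hypothetical second controller, kept separate from `controller` above.
error_controller <- crew_controller_local(seconds_idle = 10)
error_controller$start()

# Submit a command that is guaranteed to fail.
error_controller$push(
  name = "failing task",
  command = stop("illustrative failure")
)

# Block until the task has finished, then collect it as usual.
error_controller$wait(mode = "all")
failed <- error_controller$pop()
error_controller$terminate()

# The error column holds the captured message instead of NA.
failed$error
```

Checking the error column after each pop() is one way to detect failures without interrupting the rest of the workflow.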
Continue the above process of asynchronously submitting and collecting tasks until your workflow is complete. You may periodically inspect different columns from the summary() method.
controller$summary(columns = starts_with("tasks"))
#> # A tibble: 2 × 2
#> tasks_assigned tasks_complete
#> <int> <int>
#> 1 1 1
#> 2 0 0
controller$summary(columns = starts_with("popped"))
#> # A tibble: 2 × 4
#> popped_tasks popped_seconds popped_errors popped_warnings
#> <int> <dbl> <int> <int>
#> 1 1 0 0 0
#> 2 0 0 0 0
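The submit-and-collect cycle described above can be sketched as a simple polling loop. This is a minimal illustration under stated assumptions, not a prescribed pattern: the controller name loop_controller, the task names, the batch size, and the 0.1 second pause are all hypothetical choices, and only methods already shown in this document are used.

```r
library(crew)

# Hypothetical separate controller so the session above is unaffected.
loop_controller <- crew_controller_local(workers = 2, seconds_idle = 10)
loop_controller$start()

# Submit a small batch of illustrative tasks.
n_tasks <- 5
for (index in seq_len(n_tasks)) {
  loop_controller$push(
    name = paste0("task_", index),
    command = Sys.getpid()
  )
}

# Poll until every task has been collected. pop() returns NULL when no
# result is ready yet, so pause briefly before asking again.
results <- list()
while (length(results) < n_tasks) {
  out <- loop_controller$pop()
  if (is.null(out)) {
    Sys.sleep(0.1)
  } else {
    results[[length(results) + 1]] <- out
  }
}

loop_controller$terminate()

# Names of the collected tasks, in the order they were popped.
collected_names <- vapply(results, function(x) x$name, character(1))
```

Because tasks finish asynchronously, the order of collected_names need not match the order of submission.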
When you are done, terminate the controller to close any workers still running, close the mirai dispatcher process, and free the TCP port.
controller$terminate()
Adding more workers might speed up your workflow, but not always. Beyond a certain point, the efficiency gains will diminish, and the extra workers will have nothing to do. With proper configuration, you can find the right balance.
As mentioned above, the push(), pop(), and wait() methods of the controller launch new workers automatically in response to changing demand. By default, these workers stay running until you call controller$terminate(). However, you can customize the controller to scale down when circumstances allow, which helps avoid wasting resources.¹ The most useful arguments for down-scaling, in order of importance, are:
1. seconds_idle: automatically shut down a worker if it spends too long waiting for a task.
2. tasks_max: maximum number of tasks a worker can run before shutting down.
3. seconds_wall: soft wall time of a worker.
On the other hand, it is not always helpful to eagerly down-scale workers. Because the workload can fluctuate rapidly, some workers may quit and relaunch so often that it creates noticeable overhead.
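As a rough sketch of how the three down-scaling arguments might be combined, the controller below sets all of them at once. The specific values and the variable name scaled_controller are illustrative assumptions rather than recommendations; suitable settings depend on how long your tasks run and how costly it is to relaunch a worker on your platform.

```r
library(crew)

# Illustrative values only: tune them to your own workload.
scaled_controller <- crew_controller_local(
  workers = 2,          # at most two workers at a time
  seconds_idle = 30,    # a worker exits after 30 idle seconds
  tasks_max = 100,      # a worker exits after running 100 tasks
  seconds_wall = 3600   # soft wall time of one hour per worker
)
```

Creating the controller does not launch anything by itself; workers only appear after start() and the first submitted tasks.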
Fortunately, you can investigate auto-scaling and configuration issues empirically. Simply run your workflow and then look at the output from controller$summary().
controller <- crew_controller_local(workers = 10, seconds_idle = 3)
controller$start()
for (index in seq_len(1000)) {
  controller$push(command = TRUE)
}
controller$wait()
result <- "start collecting results"
while (!is.null(result)) {
  result <- controller$pop()
}
controller$summary(contains(c("worker_index", "worker_launches", "popped_tasks", "popped_seconds")))
#> # A tibble: 10 × 4
#> worker_index worker_launches popped_tasks popped_seconds
#> <int> <int> <int> <dbl>
#> 1 1 1 236 0.005
#> 2 2 1 110 0.004
#> 3 3 1 96 0.001
#> 4 4 1 81 0.005
#> 5 5 1 89 0.002
#> 6 6 1 77 0.003
#> 7 7 1 78 0.005
#> 8 8 1 75 0.001
#> 9 9 1 78 0.008
#> 10 10 1 80 0.002
Each worker only launched once and moved through tasks quickly, which is a good sign for a batch of 1000 instantaneous independent tasks. However, the first worker completed many more tasks than the others, even though all workers should share the load equally in this particular case. It would be helpful to see if the same workload runs just as fast with fewer workers.
The crew package has unavoidable risk. It is your responsibility as the user to safely use crew. Please read the final clause of the software license.
crew currently uses unencrypted TCP connections for transactions with workers inside a trusted local network. In a compromised network, an attacker can potentially access and exploit sensitive resources. It is your responsibility to assess the sensitivity and vulnerabilities of your computing environment and make sure your network is secure.
crew uses one TCP port per controller. TCP ports range from 0 to 65535, and only around 16000 of these ports are considered ephemeral or dynamic, so please be careful not to run too many controllers simultaneously if you are running R on a machine you share with other people (such as the login node of a computing cluster). If you are running a controller group, please add only a small number of controllers to the group. The terminate() method of the controller and crew_session_terminate() should free these ports again for other processes to use.
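To put the port numbers above in perspective, the short calculation below counts the ports in the range that IANA designates as dynamic or private (49152 to 65535). That range is an assumption brought in for illustration and is not stated in the text; operating systems often configure their own ephemeral range, so treat the figures as a rough guide.

```r
# All TCP port numbers, as stated in the text.
all_ports <- 0:65535

# Assumption: the IANA dynamic/private range. Your OS may use another range.
dynamic_ports <- 49152:65535

n_all <- length(all_ports)
n_dynamic <- length(dynamic_ports)

# Share of all ports that fall in the dynamic range.
share_dynamic <- n_dynamic / n_all
```

Since each controller holds one port for as long as it runs, the practical limit on a shared machine is set by how many of these ports other users and processes already occupy.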
The crew package launches external R processes:
- worker processes to run the tasks, and
- a mirai dispatcher process to schedule the tasks.
To the best of its ability, crew tries to only launch the processes it needs, and it relies on mirai to clean up these processes when the work is done. However, it is still possible that too many workers run concurrently, or that the workers or the mirai dispatcher run too long or hang. In large-scale workflows, these accidents can have egregious consequences. Depending on the launcher type, these consequences can range from overburdening your local machine or cluster, to incurring unexpectedly high costs on Amazon Web Services.
mirai robustly terminates workers as appropriate, but this safeguard cannot protect against the risk of a worker that gets stuck in a crashed state before it can even start R. To be absolutely sure that workers do not run indefinitely if something goes wrong, please learn how to find and terminate workers on the specific computing platform where they run. And if you are writing a custom launcher plugin, it is recommended (although not strictly required) to write a custom terminate_worker() method.
Workers may run on different computing platforms, depending on the type of launcher you choose. Each type of launcher connects to a different computing platform, and each platform has a different way of terminating workers. For example, the local process launcher creates R processes on your local machine, which you can find and terminate with ps::ps()/ps::ps_kill() or htop. For a SLURM launcher, you need squeue to find workers and scancel to terminate them. For an Amazon Web Services launcher, please use the AWS web console or CloudWatch.
Depending on user settings, crew workers may run for the entire length of the analysis pipeline, or they may exit if they idle too long or complete a certain number of tasks. crew re-launches workers if there are more unfinished tasks in the queue than active workers to run them at a given snapshot in time. This kind of auto-scaling does not dedicate any specific worker to any specific task, and it does not perform well when workers exit too quickly due to a small value of seconds_idle. Please set seconds_idle to a generous enough value for workers to accept work, and please use tasks_max to specify short-lived workers such as single-task transient workers (tasks_max = 1).
The mirai dispatcher is designed to gracefully exit when you call terminate() on the controller object or when you restart your R session. However, if you ever need to shut down the dispatcher manually, you can find the process ID using the controller object, then use ps::ps_kill() to terminate the process.
controller$router$dispatcher
#> [1] 86028
handle <- ps::ps_handle(pid = 86028L)
ps::ps_is_running(handle)
#> [1] TRUE
ps::ps_kill(handle)
ps::ps_is_running(handle)
#> [1] FALSE
- mirai: a powerful R framework for asynchronous tasks built on NNG. The purpose of crew is to extend mirai to different computing platforms for distributed workers.
- rrq: a task queue for R based on Redis.
- rrqueue: predecessor of rrq.
- clustermq: sends R function calls as jobs to computing clusters.
- future: a unified interface for asynchronous evaluation of single tasks and map-reduce calls on a wide variety of backend technologies.
- batchtools: tools for computation on batch systems.
- targets: a Make-like pipeline tool for R.
- later: delayed evaluation of synchronous tasks.
- promises: minimally-invasive asynchronous programming for a small number of tasks within Shiny apps.
- callr: initiates R processes from other R processes.
The crew package incorporates insightful ideas from the following people.
- mirai and nanonext and graciously accommodated the complicated and demanding feature requests that made crew possible.
- rrq.
- callr and wrote an edifying blog post on implementing task queues.
- workers prototype, an initial effort that led directly to the current implementation of crew. crew would not exist without Kirill’s insights about orchestration models for R processes.
- future package ecosystem demonstrates the incredible power of a consistent R interface on top of a varying collection of high-performance computing technologies.
- clustermq package supports efficient high-performance computing on traditional clusters, and it demonstrates the value of a central R6 object to manage an entire collection of persistent workers.
- paws R package is a powerful interface to Amazon Web Services, and the documentation clearly communicates the capabilities and limitations of AWS to R users.
- paws with David Kretch.
- lambdr package establishes a helpful pattern to submit and collect AWS Lambda jobs from R.
- googleCloudStorageR and googleCloudRunner, and he started the conversation around helping targets submit jobs to Google Cloud Run.
Please note that the crew project is released with a Contributor Code of Conduct. By contributing to this project, you agree to abide by its terms.
To cite package ‘crew’ in publications use:
  Landau WM (2023). _crew: A Distributed Worker Launcher Framework_. https://wlandau.github.io/crew/, https://github.com/wlandau/crew.
A BibTeX entry for LaTeX users is
  @Manual{,
    title = {crew: A Distributed Worker Launcher Framework},
    author = {William Michael Landau},
    year = {2023},
    note = {https://wlandau.github.io/crew/, https://github.com/wlandau/crew},
  }
¹ Automatic down-scaling also helps comply with wall time restrictions on shared computing clusters. See the arguments of crew_controller_local() for details.