About
crew lets users write custom launchers
for different types of workers that connect over the local network. The
crew.cluster
package already has plugins for traditional high-performance computing
schedulers (SLURM, SGE, LSF,
and PBS/TORQUE).
How it works
These launcher
plugins need not become part of the crew package itself.
You can write your plugin in a simple R script, or you write it in a
custom R package that depends on
crew. Published packages with launcher
plugins are powerful extensions that enhance crew for the
entire open-source community. See R
Packages by Hadley Wickham
and Jenny Bryan for how to
write an R package.
Scope
This vignette demonstrates how to write a crew launcher
plugin. It assumes prior familiarity with R6
classes and the computing platform of your plugin.
Implementation
To create your own launcher plugin, write an R6
subclass of crew_class_launcher
with a launch_worker()
method analogous the one in the local
process launcher. launch_worker() must accept the same
arguments as the local
process launch_worker() method, generate a call to crew_worker(),
and then submit a new job or process to run that call.
Network
Each worker that launches must be able to dial into the client over
the local network. The host argument of
crew_client() provides the local IP address, and the
port argument provides the TCP port. The controller helper
function (see below) should expose arguments host and
port in order to solve potential network problems like this
one.
By default, host is the local IP address.
crew assumes the local network is secure. Please take the
time to assess the network security risks of your computing environment.
Use at your own risk.
Example
The following is a custom custom launcher class whose workers are local R processes on Unix-like systems.
custom_launcher_class <- R6::R6Class(
classname = "custom_launcher_class",
inherit = crew::crew_class_launcher,
public = list(
launch_worker = function(call) {
bin <- file.path(R.home("bin"), "Rscript")
processx::process$new(
command = bin,
args = c(self$r_arguments, "-e", call),
cleanup = FALSE
)
}
)
)Inside launch_worker(), the
processx::process$new(command = bin, args = c(self$r_arguments, "-e", call))
line runs the crew_worker()
call in an external R process with the command line arguments from
r_arguments (supplied when the launcher is created). This
process runs in the background, connects back to crew and
mirai over the local network, and accepts the tasks you
push to the controller.
Every launch_worker() method must accept a
call argument. This argument is a text string with an R
function call to crew_worker().
launch_worker() must launch a worker that runs the R code
in call.
To see what the call argument will look like from inside
launch_worker(), create a new launcher and run the
call() method.
library(crew)
launcher <- crew_launcher_local()
launcher$start(url = "tcp://127.0.0.1:57000", profile = "example_profile")
launcher$call()
#> [1] "crew::crew_worker(settings = list(url = \"tcp://127.0.0.1:57000\", dispatcher = TRUE, asyncdial = FALSE, autoexit = 15L, cleanup = FALSE, output = TRUE, maxtasks = Inf, idletime = Inf, walltime = Inf, timerstart = 0L, tlscert = NULL, rs = NULL), controller = \"a28f357a\", options_metrics = crew::crew_options_metrics(path = NULL, seconds_interval = 5))"Batched launches
Some platforms support launching multiple workers from a single
system call. For example, clusters like SLURM and cloud services like
AWS Batch support job arrays. To leverage this feature in
crew, define a method called launch_workers()
(plural) instead of launch_worker() (singular). The former
supersedes the latter when it is user-defined.1 For example:
R6::R6Class(
classname = "slurm_launcher_class",
inherit = crew::crew_class_launcher,
public = list(
launch_workers = function(call, n) {
template <- c(
"#!/bin/bash",
"#SBATCH --array=1-%s",
"module load R",
"Rscript -e '%s'"
)
script <- tempfile()
writeLines(sprintf(template, n, call), script)
system2("sbatch", script, wait = FALSE)
}
)
)Above, call is the same as before: a call to
mirai::daemon() to run a single worker. n is
the number of crew workers (i.e. SLURM jobs) to launch in
the current round of auto-scaling. The body of the function creates a
job script for an array job, then submits the script to the cluster with
sbatch.
Controllers
It is useful to have a helper function that creates controllers with your custom launcher. It should:
- Accept all the same arguments as
crew_controller_local(). - Create a client object using
crew_client(). - Create a launcher object with the
new()method of your custom launcher class. - Create a new controller using
crew_controller(). - Scan the controller for obvious errors using the
validate()method of the controller.
Feel free to borrow from the crew_controller_local()
source code. For packages, you can use the
@inheritParams roxygen2 tag to
inherit the documentation of all the arguments instead of writing it by
hand. You may want to adjust the default arguments based on the
specifics of your platform, especially seconds_launch if
workers take a long time to launch.
#' @title Create a controller with the custom launcher.
#' @export
#' @description Create an `R6` object to submit tasks and
#' launch workers.
#' @inheritParams crew::crew_controller_local
crew_controller_custom <- function(
name = "custom controller name",
workers = 1L,
host = NULL,
port = NULL,
tls = crew::crew_tls(),
serialization = NULL,
profile = crew::crew_random_name(),
seconds_interval = 0.5,
seconds_timeout = 30,
seconds_launch = 30,
seconds_idle = Inf,
seconds_wall = Inf,
tasks_max = Inf,
tasks_timers = 0L,
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
r_arguments = NULL,
options_metrics = crew::crew_options_metrics(),
crashes_max = 5L,
backup = NULL
) {
client <- crew::crew_client(
host = host,
port = port,
tls = tls,
serialization = serialization,
profile = profile,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout
)
launcher <- custom_launcher_class$new(
name = name,
workers = workers,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout,
seconds_launch = seconds_launch,
seconds_idle = seconds_idle,
seconds_wall = seconds_wall,
tasks_max = tasks_max,
tasks_timers = tasks_timers,
tls = tls,
r_arguments = r_arguments,
options_metrics = options_metrics
)
controller <- crew::crew_controller(
client = client,
launcher = launcher,
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
crashes_max = crashes_max,
backup = backup
)
controller$validate()
controller
}Informal testing
Before you begin testing, please begin monitoring local processes and
remote jobs on your platform. In the case of the above crew
launcher which only creates local processes, it is sufficient to start
htop and filter for R
processes, or launch a new R session to monitor the process table from
ps::ps().
However, for more ambitious launchers that submit workers to e.g. AWS Batch, you may need to open
the CloudWatch
dashboard, then view the AWS billing dashboard after testing.
When you are ready to begin testing, try out the example in the README, but
use your your custom controller helper instead of crew_controller_local().
First, create and start a controller.
Try pushing a task that gets the local IP address and process ID of the worker instance.
controller$push(
name = "get worker IP address and process ID",
command = paste(nanonext::ip_addr()[1], ps::ps_pid())
)Wait for the task to complete and look at the result.
controller$wait()
result <- controller$pop()
result$result[[1]]
#> [1] "192.168.0.2 27336"Please use the result to verify that the task really ran on a worker
as intended. The process ID above should agree with the one from the
handle (except on
Windows because the actual R process may be different from the
Rscript.exe process created first).
controller$launcher$instances$handle[[1]]$get_pid()
#> [1] 27336In addition, please compare the worker IP address to the IP address of the local R session. Since our custom launcher creates local processes, the IP addresses are the same in this case. However, if the worker runs on a different computer (as with theSLURM or AWS Batch launcher) then the worker IP address should be different from the one you get from the local R session.
as.character(nanonext::ip_addr())[1]
#> "192.168.0.2"If you did not set any timeouts or task limits, the worker that ran the task should still be running. The other worker had no tasks, so it did not need to launch.
controller$launcher$instances$handle[[1]]$is_alive()
#> [1] TRUEWhen you are done, either close the local R session or terminate the controller manually.
controller$terminate()Finally, use the process monitoring interface of your computing
platform or operating system
(e.g. crew::crew_monitor_local() if using
crew_controller_local()) to verify that all
crew workers are terminated.
Load testing
If the informal testing succeeded, we recommend you scale up testing to more ambitious scenarios. As one example, you can test that your workers can auto-scale and quickly churn through a large number of tasks.
library(crew)
controller <- crew_controller_custom(
seconds_idle = 2L,
workers = 2L
)
controller$start()
# Push 100 tasks
for (index in seq_len(100L)) {
name <- paste0("task_", index)
controller$push(name = name, command = index, data = list(index = index))
message(paste("push", name))
}
# Wait for the tasks to complete.
controller$wait(mode = "all")
# Do the same for 100 more tasks.
for (index in (seq_len(100L) + 100L)) {
name <- paste0("task_", index)
controller$push(name = name, command = index, data = list(index = index))
message(paste("push", name))
}
controller$wait(mode = "all")
# Collect the results.
results <- controller$collect()
# Check the results
all(sort(unlist(results$result)) == seq_len(200L))
#> [1] TRUE
# View the controller summary.
controller$summary()
# Terminate the controller.
controller$terminate()
# Now outside crew, verify that all the
# crew workers successfully terminated.Managing workers
There are safeguards to make sure workers terminate when the
controller terminates or the parent R session exits. However, these
safeguards are based on network connections and operating system
signals, which are not guaranteed to work in all cases. It is the user’s
responsibility to monitor and manage crew workers. To make
the user’s job easier, it is good practice to implement job management
utilities to go along with your launcher plugin. The crew
ecosystem implements “monitor” objects such as crew_monitor_local()
and crew_monitor_aws_batch()
to help users list and terminate workers, as well as view logs.
The essence of the local monitor is copied below:
crew_monitor_local <- function() {
crew_class_monitor_local$new()
}
crew_class_monitor_local <- R6::R6Class(
classname = "crew_class_monitor_local",
cloneable = FALSE,
public = list(
workers = function() {
crew_monitor_pids(pattern = "crew::crew_worker")
},
terminate = function(pids) {
lapply(as.integer(pids), crew::crew_terminate_process)
}
)
)
crew_monitor_pids <- function(pattern) {
processes <- ps::ps()
commands <- map(
processes$ps_handle,
~tryCatch(ps::ps_cmdline(.x), error = function(condition) "")
)
filter <- grepl(pattern = pattern, x = as.character(commands), fixed = TRUE)
as.integer(sort(processes$pid[filter]))
}Example usage:
monitor <- crew_monitor_local()
monitor$workers()
#> [1] 57001 57002
monitor$terminate(pids = c(57001, 57002))
monitor$workers()
#> integer(0)