In computationally demanding analysis projects, statisticians and data scientists asynchronously deploy long-running tasks to distributed systems, ranging from traditional clusters to cloud services. The crew.aws.batch
package extends the mirai
-powered ‘crew’ package with a worker launcher plugin for AWS Batch. Inspiration also comes from packages mirai
, future
, rrq
, clustermq
, and batchtools
.
Installation
Type | Source | Command |
---|---|---|
Release | CRAN | install.packages("crew.aws.batch") |
Development | GitHub | remotes::install_github("wlandau/crew.aws.batch") |
Development | R-universe | install.packages("crew.aws.batch", repos = "https://wlandau.r-universe.dev") |
Documentation
Please see https://wlandau.github.io/crew.aws.batch/ for documentation, including a full function reference and usage tutorial.
Prerequisites
crew.aws.batch
launches AWS Batch jobs to run crew
workers. This comes with a set of special requirements:
- Understand AWS Batch and its official documentation.
- Your job definitions must each have Docker-compatible container image with R and
crew.aws.batch
installed. You may wish to inherit from an existing rocker image. - At minimum, for the launcher plugin to work, your IAM policies need permission to submit and terminate jobs. To appropriately monitor jobs, your policies also need permission to list and describe jobs. In addition, managing job definitions as described below requires permission to register, deregister, and describe job definitions. To view CloudWatch logs, you need permission to get log events.
- In the compute environment, the security group must permit all inbound and outbound TCP traffic within itself.1 The controller and the workers must run in this security group so they can communicate within the firewalled local network.2 If your security group ID is
sg-00000
and belongs to VPCvpc-00000
, then your inbound and outbound rules may look something like this:
client <- paws.compute::ec2()
groups <- client$describe_security_groups(GroupIds = "sg-00000")
str(groups$SecurityGroups[[1L]])
#> List of 8
#> $ Description : chr "Allow TCP traffic on ephemeral ports"
#> $ GroupName : chr "self-pointing-group"
#> $ IpPermissions :List of 1
#> ..$ :List of 7
#> .. ..$ FromPort : num 1024
#> .. ..$ IpProtocol : chr "tcp"
#> .. ..$ IpRanges : list()
#> .. ..$ Ipv6Ranges : list()
#> .. ..$ PrefixListIds : list()
#> .. ..$ ToPort : num 65535
#> .. ..$ UserIdGroupPairs:List of 1
#> .. .. ..$ :List of 7
#> .. .. .. ..$ Description : chr "Accept traffic from other jobs in group."
#> .. .. .. ..$ GroupId : chr "sg-00000"
#> .. .. .. ..$ GroupName : chr(0)
#> .. .. .. ..$ PeeringStatus : chr(0)
#> .. .. .. ..$ UserId : chr "CENSORED"
#> .. .. .. ..$ VpcId : chr(0)
#> .. .. .. ..$ VpcPeeringConnectionId: chr(0)
#> $ OwnerId : chr "CENSORED"
#> $ GroupId : chr "sg-00000"
#> $ IpPermissionsEgress:List of 1
#> ..$ :List of 7
#> .. ..$ FromPort : num 1024
#> .. ..$ IpProtocol : chr "tcp"
#> .. ..$ IpRanges : list()
#> .. ..$ Ipv6Ranges : list()
#> .. ..$ PrefixListIds : list()
#> .. ..$ ToPort : num 65535
#> .. ..$ UserIdGroupPairs:List of 1
#> .. .. ..$ :List of 7
#> .. .. .. ..$ Description : chr "Allow traffic to other jobs in group."
#> .. .. .. ..$ GroupId : chr "sg-00000"
#> .. .. .. ..$ GroupName : chr(0)
#> .. .. .. ..$ PeeringStatus : chr(0)
#> .. .. .. ..$ UserId : chr "CENSORED"
#> .. .. .. ..$ VpcId : chr(0)
#> .. .. .. ..$ VpcPeeringConnectionId: chr(0)
#> $ Tags : list()
#> $ VpcId : chr "vpc-00000"
Managing job definitions
Before submitting jobs, AWS Batch requires a job definition to describe the container image and resource requirements. You can do this through the AWS web console, the AWS command line interface (CLI), a software development kit (SDK) like the paws
R package, or the job definition class in crew.aws.batch
. For crew.aws.batch
, first create a job definition object.
definition <- crew_definition_aws_batch(
job_definition = "YOUR_JOB_DEFINITION_NAME",
job_queue = "YOUR_JOB_QUEUE_NAME"
)
The job definition may or may not exist at this point. If it does not exist, you can register with register()
, an oversimplified limited-scope method which creates container-based job definitions with the "awslogs"
log driver (for CloudWatch).3 Below, your container image can be as simple as a Docker Hub identifier (like "alpine:latest:
) or a full URI of an ECR image.4
definition$register(
image = "AWS_ACCOUNT_ID.dkr.ecr.AWS_REGION.amazonaws.com/ECR_REPOSITORY_NAME:IMAGE_TAG",
platform_capabilities = "EC2",
memory_units = "gigabytes",
memory = 8,
cpus = 2
)
#> # A tibble: 1 × 3
#> name revision arn
#> <chr> <int> <chr>
#> 1 YOUR_JOB_DEFINITION_NAME 81 arn:aws:batch:us-east-1:CENSORED:jo…
The describe()
method shows information about current and past revisions of the job definition. Set active
to TRUE
to see just the active revisions.
definition$describe(active = TRUE)
#> # A tibble: 2 × 16
#> name arn revision status type scheduling_priority parameters
#> <chr> <chr> <int> <chr> <chr> <dbl> <list>
#> 1 YOUR_JOB_DEFIN… arn:… 82 active cont… 3 <list [0]>
#> 2 YOUR_JOB_DEFIN… arn:… 81 active cont… 3 <list [0]>
#> # ℹ 9 more variables: retry_strategy <list>, container_properties <list>,
#> # timeout <list>, node_properties <list>, tags <list>,
#> # propagate_tags <lgl>, platform_capabilities <chr>,
#> # eks_properties <list>, container_orchestration_type <chr>
Use deregister()
to deregister a revision of a job definition. If a revision number is not supplied, then it defaults to the greatest active revision number.
definition$deregister()
#> # A tibble: 1 × 16
#> name arn revision status type scheduling_priority parameters
#> <chr> <chr> <int> <chr> <chr> <dbl> <list>
#> 1 YOUR_JOB_DEFIN… arn:… 81 active cont… 3 <list [0]>
#> # ℹ 9 more variables: retry_strategy <list>, container_properties <list>,
#> # timeout <list>, node_properties <list>, tags <list>,
#> # propagate_tags <lgl>, platform_capabilities <chr>,
#> # eks_properties <list>, container_orchestration_type <chr>
Monitoring and terminating jobs
With crew.aws.batch
, your crew
controller automatically submits jobs to AWS Batch. These jobs may fail or linger for any number of reasons, which could impede work and increase costs. So before you use crew_controller_aws_batch()
, please learn how to monitor and terminate AWS Batch jobs manually.
crew_monitor_aws_batch()
defines a “monitor” to help you manually list, inspect, and terminate jobs. You will need to supply a job definition name and a job queue name.
monitor <- crew_monitor_aws_batch(
job_definition = "YOUR_JOB_DEFINITION_NAME",
job_queue = "YOUR_JOB_QUEUE_NAME"
)
You can submit individual AWS Batch jobs to test your computing environment.
job1 <- monitor$submit(name = "job1", command = c("echo", "hello\nworld"))
job2 <- monitor$submit(name = "job2", command = c("echo", "job\nsubmitted"))
job2
#> # A tibble: 1 × 3
#> name id arn
#> <chr> <chr> <chr>
#> 1 job2 c38d55ad-4a86-4371-9994-6ea8882f5726 arn:aws:batch:us-east-2:0…
Method status()
checks the status of an individual job.
monitor$status(id = job2$id)
#> # A tibble: 1 × 8
#> name id arn status reason created started stopped
#> <chr> <chr> <chr> <chr> <chr> <dbl> <dbl> <dbl>
#> 1 job2 c38d55ad-4a86-43… arn:… runnable NA 1.70e12 NA NA
The jobs()
method gets the status of all the jobs within the job queue and job definition you originally supplied to crew_monitor_aws_batch()
. This may include many more jobs than the ones you submitted during the life cycle of the current monitor
object.
monitor$jobs()
#> # A tibble: 2 × 8
#> name id arn status reason created started stopped
#> <chr> <chr> <chr> <chr> <chr> <dbl> <dbl> <dbl>
#> 1 job1 653df636-ac74-43… arn:… succeeded Essen… 1.70e12 1.70e12 1.70e12
#> 2 job2 c38d55ad-4a86-43… arn:… runnable NA 1.70e12 NA NA
The job state can be "submitted"
, "pending"
, "runnable"
, "starting"
, "running"
, "succeeded"
, or "failed"
. The monitor has a method for each job state to get only the jobs with that state.
monitor$succeeded()
#> # A tibble: 1 × 8
#> name id arn status reason created started stopped
#> <chr> <chr> <chr> <chr> <chr> <dbl> <dbl> <dbl>
#> 1 job1 653df636-ac74-43… arn:… succeeded NA 1.70e12 1.70e12 1.70e12
In addition, there is an active()
method for just states "submitted"
, "pending"
, "runnable"
, "starting"
, and "running"
, and there is an inactive()
method for just the "succeeded"
and "failed"
states.
monitor$inactive()
#> # A tibble: 1 × 8
#> name id arn status reason created started stopped
#> <chr> <chr> <chr> <chr> <chr> <dbl> <dbl> <dbl>
#> 1 job1 653df636-ac74-43… arn:… succeeded NA 1.70e12 1.70e12 1.70e12
To terminate a job, use the terminate()
method. This has the effect of both canceling and terminating the job, although you may not see the change right away if the job is currently "runnable"
. Manually terminated jobs are listed as failed.
monitor$terminate(id = job2$id)
To get the CloudWatch logs of a job, use the log()
method. This method returns a tibble
with the log messages and numeric timestamps.
log <- monitor$log(id = job1$id)
log
#> # A tibble: 2 × 3
#> message timestamp ingestion_time
#> <chr> <dbl> <dbl>
#> 1 hello 1702068378163 1702068378245
#> 2 world 1702068378163 1702068378245
If the log messages are too long to conveniently view in the tibble
, you can print them to your screen with cat()
or writeLines()
.
writeLines(log$message)
#> hello
#> world
Using crew
with AWS Batch workers
To start using crew.aws.batch
in earnest, first create a controller object. Also supply the names of your job queue and job definition, as well as any optional flags and settings you may need. If you do not already have a job definition, the “monitor” object above can help you create one (see above).
library(crew.aws.batch)
controller <- crew_controller_aws_batch(
name = "my_workflow", # for informative job names
workers = 16,
tasks_max = 2, # to avoid reaching wall time limits (if any exist)
seconds_launch = 600, # to allow a 10-minute startup window
seconds_idle = 60, # to release resources when they are not needed
processes = NULL, # See the "Asynchronous worker management" section below.
options_aws_batch = crew_options_aws_batch(
job_definition = "YOUR_JOB_DEFINITION_NAME",
job_queue = "YOUR_JOB_QUEUE_NAME",
cpus = 2,
gpus = 0,
# Launch workers with 4 GB memory, then 8 GB if the worker crashes,
# then 16 GB on all subsequent launches. Go back to 4 GB if the worker
# completes all its tasks before exiting.
memory = c(4, 8, 16),
memory_units = "gigabytes"
)
)
controller$start()
At this point, usage is exactly the same as basic crew
. The push()
method submits tasks and auto-scales AWS Batch workers to meet demand.
controller$push(name = "do work", command = do_work())
The pop()
method retrieves available tasks.
controller$pop()
#> # A tibble: 1 × 11
#> name command result seconds seed error trace warni…¹ launc…² worker insta…³
#> <chr> <chr> <list> <dbl> <int> <chr> <chr> <chr> <chr> <int> <chr>
#> 1 do work … do_work… <int> 0 1.56e8 NA NA NA 79e71c… 1 7686b2…
#> # … with abbreviated variable names ¹warnings, ²launcher, ³instance
Remember to terminate the controller when you are done.
controller$terminate()
Asynchronous worker management
HTTP requests to submit and terminate jobs may take up to 1 or 2 seconds, and this overhead may be burdensome if there are many workers. To run these requests asynchronously, set the processes
argument of crew_controller_aws_batch()
to the number of local mirai
daemons you want to process the requests. These processes will start on controller$start()
and end on controller$terminate()
or when your local R session ends. controller$launcher$async$errors()
shows the most recent error messages generated on launch or termination for all workers.
Troubleshooting
processes = NULL
disables async and makes launch/termination errors immediate and easier to see. You may also wish to set options(paws.log_level = 3L)
to increase the verbosity of paws
messages.
Thanks
-
Charlie Gao created
mirai
andnanonext
and graciously accommodated the complicated and demanding feature requests that madecrew
and its ecosystem possible. - Thanks to Henrik Bengtsson, David Kretch, Adam Banker, and Michael Schubert for edifying conversations about cloud computing in R.
Code of Conduct
Please note that the crew
project is released with a Contributor Code of Conduct. By contributing to this project, you agree to abide by its terms.
Citation
citation("crew.aws.batch")
To cite package 'crew.aws.batch' in publications use:
Landau WM (????). _crew.aws.batch: A Crew Launcher Plugin for AWS
Batch_. R package version 0.0.7,
https://github.com/wlandau/crew.aws.batch,
<https://wlandau.github.io/crew.aws.batch/>.
A BibTeX entry for LaTeX users is
@Manual{,
title = {crew.aws.batch: A Crew Launcher Plugin for AWS Batch},
author = {William Michael Landau},
note = {R package version 0.0.7,
https://github.com/wlandau/crew.aws.batch},
url = {https://wlandau.github.io/crew.aws.batch/},
}