
In computationally demanding analysis projects, statisticians and data scientists asynchronously deploy long-running tasks to distributed systems, ranging from traditional clusters to cloud services. The crew.aws.batch package extends the mirai-powered ‘crew’ package with a worker launcher plugin for AWS Batch. Inspiration also comes from packages mirai, future, rrq, clustermq, and batchtools.

Installation

Type         Source       Command
Release      CRAN         install.packages("crew.aws.batch")
Development  GitHub       remotes::install_github("wlandau/crew.aws.batch")
Development  R-universe   install.packages("crew.aws.batch", repos = "https://wlandau.r-universe.dev")

Documentation

Please see https://wlandau.github.io/crew.aws.batch/ for documentation, including a full function reference and usage tutorial.

Prerequisites

crew.aws.batch launches AWS Batch jobs to run crew workers. This comes with a set of special requirements:

  1. Understand AWS Batch and its official documentation.
  2. Your job definitions must each have a Docker-compatible container image with R and crew.aws.batch installed. You may wish to inherit from an existing rocker image.
  3. At minimum, for the launcher plugin to work, your IAM policies need permission to submit and terminate jobs. To monitor jobs, your policies also need permission to list and describe jobs. In addition, managing job definitions as described below requires permission to register, deregister, and describe job definitions. To view CloudWatch logs, you need permission to get log events. An example policy sketch appears after this list.
  4. In the compute environment, the security group must permit all inbound and outbound TCP traffic within itself. The controller and the workers must run in this security group so they can communicate within the firewalled local network. If your security group ID is sg-00000 and belongs to VPC vpc-00000, then your inbound and outbound rules may look something like this:

# Inspect the security group to confirm the self-referencing inbound
# and outbound TCP rules.
client <- paws.compute::ec2()
groups <- client$describe_security_groups(GroupIds = "sg-00000")
str(groups$SecurityGroups[[1L]])
#> List of 8
#>  $ Description        : chr "Allow TCP traffic on ephemeral ports"
#>  $ GroupName          : chr "self-pointing-group"
#>  $ IpPermissions      :List of 1
#>   ..$ :List of 7
#>   .. ..$ FromPort        : num 1024
#>   .. ..$ IpProtocol      : chr "tcp"
#>   .. ..$ IpRanges        : list()
#>   .. ..$ Ipv6Ranges      : list()
#>   .. ..$ PrefixListIds   : list()
#>   .. ..$ ToPort          : num 65535
#>   .. ..$ UserIdGroupPairs:List of 1
#>   .. .. ..$ :List of 7
#>   .. .. .. ..$ Description           : chr "Accept traffic from other jobs in group."
#>   .. .. .. ..$ GroupId               : chr "sg-00000"
#>   .. .. .. ..$ GroupName             : chr(0)
#>   .. .. .. ..$ PeeringStatus         : chr(0)
#>   .. .. .. ..$ UserId                : chr "CENSORED"
#>   .. .. .. ..$ VpcId                 : chr(0)
#>   .. .. .. ..$ VpcPeeringConnectionId: chr(0)
#>  $ OwnerId            : chr "CENSORED"
#>  $ GroupId            : chr "sg-00000"
#>  $ IpPermissionsEgress:List of 1
#>   ..$ :List of 7
#>   .. ..$ FromPort        : num 1024
#>   .. ..$ IpProtocol      : chr "tcp"
#>   .. ..$ IpRanges        : list()
#>   .. ..$ Ipv6Ranges      : list()
#>   .. ..$ PrefixListIds   : list()
#>   .. ..$ ToPort          : num 65535
#>   .. ..$ UserIdGroupPairs:List of 1
#>   .. .. ..$ :List of 7
#>   .. .. .. ..$ Description           : chr "Allow traffic to other jobs in group."
#>   .. .. .. ..$ GroupId               : chr "sg-00000"
#>   .. .. .. ..$ GroupName             : chr(0)
#>   .. .. .. ..$ PeeringStatus         : chr(0)
#>   .. .. .. ..$ UserId                : chr "CENSORED"
#>   .. .. .. ..$ VpcId                 : chr(0)
#>   .. .. .. ..$ VpcPeeringConnectionId: chr(0)
#>  $ Tags               : list()
#>  $ VpcId              : chr "vpc-00000"
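
The IAM permissions in item 3 could be granted with a policy along the lines of the sketch below. This is only an illustration, not an official policy: the policy name is a placeholder, Resource = "*" is used for brevity, and your account may require a narrower scope or a slightly different set of actions.

# Sketch only: create a policy covering the AWS Batch and CloudWatch Logs
# actions that crew.aws.batch uses, then attach it to the relevant role or user.
iam <- paws::iam()
policy <- list(
  Version = "2012-10-17",
  Statement = list(
    list(
      Effect = "Allow",
      Action = list(
        "batch:SubmitJob",               # submit jobs (launcher plugin)
        "batch:TerminateJob",            # terminate jobs (launcher plugin)
        "batch:CancelJob",               # cancel queued jobs (monitor)
        "batch:ListJobs",                # list jobs (monitor)
        "batch:DescribeJobs",            # describe jobs (monitor)
        "batch:RegisterJobDefinition",   # manage job definitions
        "batch:DeregisterJobDefinition", # manage job definitions
        "batch:DescribeJobDefinitions",  # manage job definitions
        "logs:GetLogEvents"              # read CloudWatch logs
      ),
      Resource = "*" # placeholder: restrict this in practice
    )
  )
)
iam$create_policy(
  PolicyName = "crew-aws-batch-example", # placeholder name
  PolicyDocument = jsonlite::toJSON(policy, auto_unbox = TRUE)
)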

Managing job definitions

Before you can submit jobs, AWS Batch requires a job definition that describes the container image and resource requirements. You can create one through the AWS web console, the AWS command line interface (CLI), a software development kit (SDK) like the paws R package, or the job definition class in crew.aws.batch. With crew.aws.batch, first create a job definition object.

definition <- crew_definition_aws_batch(
  job_definition = "YOUR_JOB_DEFINITION_NAME",
  job_queue = "YOUR_JOB_QUEUE_NAME"
)

The job definition may or may not exist at this point. If it does not exist, you can register it with register(), a simple, limited-scope method which creates container-based job definitions with the "awslogs" log driver (for CloudWatch). Below, your container image can be as simple as a Docker Hub identifier (like "alpine:latest") or the full URI of an ECR image.

definition$register(
  image = "AWS_ACCOUNT_ID.dkr.ecr.AWS_REGION.amazonaws.com/ECR_REPOSITORY_NAME:IMAGE_TAG",
  platform_capabilities = "EC2",
  memory_units = "gigabytes",
  memory = 8,
  cpus = 2
)
#> # A tibble: 1 × 3
#>   name                     revision arn                                     
#>   <chr>                       <int> <chr>                                   
#> 1 YOUR_JOB_DEFINITION_NAME       81 arn:aws:batch:us-east-1:CENSORED:jo…

The describe() method shows information about current and past revisions of the job definition. Set active to TRUE to see just the active revisions.

definition$describe(active = TRUE)
#> # A tibble: 2 × 16
#>   name            arn   revision status type  scheduling_priority parameters
#>   <chr>           <chr>    <int> <chr>  <chr>               <dbl> <list>    
#> 1 YOUR_JOB_DEFIN… arn:…       82 active cont…                   3 <list [0]>
#> 2 YOUR_JOB_DEFIN… arn:…       81 active cont…                   3 <list [0]>
#> # ℹ 9 more variables: retry_strategy <list>, container_properties <list>,
#> #   timeout <list>, node_properties <list>, tags <list>,
#> #   propagate_tags <lgl>, platform_capabilities <chr>,
#> #   eks_properties <list>, container_orchestration_type <chr>

Use deregister() to deregister a revision of a job definition. If a revision number is not supplied, then it defaults to the greatest active revision number.

definition$deregister()
#> # A tibble: 1 × 16
#>   name            arn   revision status type  scheduling_priority parameters
#>   <chr>           <chr>    <int> <chr>  <chr>               <dbl> <list>    
#> 1 YOUR_JOB_DEFIN… arn:…       81 active cont…                   3 <list [0]>
#> # ℹ 9 more variables: retry_strategy <list>, container_properties <list>,
#> #   timeout <list>, node_properties <list>, tags <list>,
#> #   propagate_tags <lgl>, platform_capabilities <chr>,
#> #   eks_properties <list>, container_orchestration_type <chr>

Monitoring and terminating jobs

With crew.aws.batch, your crew controller automatically submits jobs to AWS Batch. These jobs may fail or linger for any number of reasons, which could impede work and increase costs. So before you use crew_controller_aws_batch(), please learn how to monitor and terminate AWS Batch jobs manually.

crew_monitor_aws_batch() defines a “monitor” to help you manually list, inspect, and terminate jobs. You will need to supply a job definition name and a job queue name.

monitor <- crew_monitor_aws_batch(
  job_definition = "YOUR_JOB_DEFINITION_NAME",
  job_queue = "YOUR_JOB_QUEUE_NAME"
)

You can submit individual AWS Batch jobs to test your computing environment.

job1 <- monitor$submit(name = "job1", command = c("echo", "hello\nworld"))
job2 <- monitor$submit(name = "job2", command = c("echo", "job\nsubmitted"))
job2
#> # A tibble: 1 × 3
#>   name  id                                   arn                       
#>   <chr> <chr>                                <chr>                     
#> 1 job2  c38d55ad-4a86-4371-9994-6ea8882f5726 arn:aws:batch:us-east-2:0…

Method status() checks the status of an individual job.

monitor$status(id = job2$id)
#> # A tibble: 1 × 8
#>   name  id                arn   status   reason created started stopped
#>   <chr> <chr>             <chr> <chr>    <chr>    <dbl>   <dbl>   <dbl>
#> 1 job2  c38d55ad-4a86-43… arn:… runnable NA     1.70e12      NA      NA

The jobs() method gets the status of all the jobs within the job queue and job definition you originally supplied to crew_monitor_aws_batch(). This may include many more jobs than the ones you submitted during the life cycle of the current monitor object.

monitor$jobs()
#> # A tibble: 2 × 8
#>   name  id                arn   status    reason created started stopped
#>   <chr> <chr>             <chr> <chr>     <chr>    <dbl>   <dbl>   <dbl>
#> 1 job1  653df636-ac74-43… arn:… succeeded Essen… 1.70e12 1.70e12 1.70e12
#> 2 job2  c38d55ad-4a86-43… arn:… runnable  NA     1.70e12      NA      NA

The job state can be "submitted", "pending", "runnable", "starting", "running", "succeeded", or "failed". The monitor has a method for each job state to get only the jobs with that state.

monitor$succeeded()
#> # A tibble: 1 × 8
#>   name  id                arn   status    reason created started stopped
#>   <chr> <chr>             <chr> <chr>     <chr>    <dbl>   <dbl>   <dbl>
#> 1 job1  653df636-ac74-43… arn:… succeeded NA     1.70e12 1.70e12 1.70e12

In addition, the active() method returns only jobs in the "submitted", "pending", "runnable", "starting", and "running" states, and the inactive() method returns only jobs in the "succeeded" and "failed" states.

monitor$inactive()
#> # A tibble: 1 × 8
#>   name  id                arn   status    reason created started stopped
#>   <chr> <chr>             <chr> <chr>     <chr>    <dbl>   <dbl>   <dbl>
#> 1 job1  653df636-ac74-43… arn:… succeeded NA     1.70e12 1.70e12 1.70e12

To terminate a job, use the terminate() method. This has the effect of both canceling and terminating the job, although you may not see the change right away if the job is currently "runnable". Manually terminated jobs are listed as failed.

monitor$terminate(id = job2$id)

To get the CloudWatch logs of a job, use the log() method. This method returns a tibble with the log messages and numeric timestamps.

log <- monitor$log(id = job1$id)
log
#> # A tibble: 2 × 3
#>   message     timestamp ingestion_time
#>   <chr>           <dbl>          <dbl>
#> 1 hello   1702068378163  1702068378245
#> 2 world   1702068378163  1702068378245

If the log messages are too long to conveniently view in the tibble, you can print them to your screen with cat() or writeLines().

writeLines(log$message)
#> hello
#> world

Using crew with AWS Batch workers

To start using crew.aws.batch in earnest, first create a controller object. Also supply the names of your job queue and job definition, as well as any optional flags and settings you may need. If you do not already have a job definition, the job definition object described above can help you create one.

library(crew.aws.batch)
controller <- crew_controller_aws_batch(
  name = "my_workflow", # for informative job names
  workers = 16,
  tasks_max = 2, # to avoid reaching wall time limits (if any exist)
  seconds_launch = 600, # to allow a 10-minute startup window
  seconds_idle = 60, # to release resources when they are not needed
  processes = NULL, # See the "Asynchronous worker management" section below.
  options_aws_batch = crew_options_aws_batch(
    job_definition = "YOUR_JOB_DEFINITION_NAME",
    job_queue = "YOUR_JOB_QUEUE_NAME",
    cpus = 2,
    gpus = 0,
    # Launch workers with 4 GB memory, then 8 GB if the worker crashes,
    # then 16 GB on all subsequent launches. Go back to 4 GB if the worker
    # completes all its tasks before exiting.
    memory = c(4, 8, 16),
    memory_units = "gigabytes"
  )
)
controller$start()

At this point, usage is exactly the same as basic crew. The push() method submits tasks and auto-scales AWS Batch workers to meet demand.

controller$push(name = "do work", command = do_work())

The pop() method retrieves available tasks.

controller$pop()
#> # A tibble: 1 × 11
#>   name         command result seconds   seed error trace warni…¹ launc…² worker insta…³
#>   <chr>        <chr>   <list>   <dbl>  <int> <chr> <chr> <chr>   <chr>    <int> <chr>  
#> 1 do work   … do_work… <int>        0 1.56e8 NA    NA    NA      79e71c…      1 7686b2…
#> # … with abbreviated variable names ¹​warnings, ²​launcher, ³​instance

Remember to terminate the controller when you are done.

controller$terminate()

Asynchronous worker management

HTTP requests to submit and terminate jobs may each take a second or two, and this overhead adds up when there are many workers. To run these requests asynchronously, set the processes argument of crew_controller_aws_batch() to the number of local mirai daemons you want to process the requests. These processes start with controller$start() and end with controller$terminate() or when your local R session ends. controller$launcher$async$errors() shows the most recent error messages generated on launch or termination for all workers.
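
For example, the sketch below (reusing the placeholder job definition and queue names from earlier) sends submission and termination requests to four local daemons and then checks for errors:

controller <- crew_controller_aws_batch(
  name = "my_workflow",
  workers = 16,
  processes = 4, # handle submit/terminate HTTP requests on 4 local mirai daemons
  options_aws_batch = crew_options_aws_batch(
    job_definition = "YOUR_JOB_DEFINITION_NAME",
    job_queue = "YOUR_JOB_QUEUE_NAME"
  )
)
controller$start()
# ... push() and pop() tasks as usual ...
controller$launcher$async$errors() # most recent launch/termination errors, if any
controller$terminate()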

Troubleshooting

Setting processes = NULL in crew_controller_aws_batch() disables asynchronous job management, so launch and termination errors surface immediately and are easier to see. You may also wish to set options(paws.log_level = 3L) to increase the verbosity of paws messages.
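
For example, a debugging session might begin as follows, again using the placeholder job definition and queue names from earlier:

options(paws.log_level = 3L) # verbose paws logging
controller <- crew_controller_aws_batch(
  name = "my_workflow",
  processes = NULL, # synchronous submission: errors surface immediately
  options_aws_batch = crew_options_aws_batch(
    job_definition = "YOUR_JOB_DEFINITION_NAME",
    job_queue = "YOUR_JOB_QUEUE_NAME"
  )
)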

Thanks

Code of Conduct

Please note that the crew project is released with a Contributor Code of Conduct. By contributing to this project, you agree to abide by its terms.

Citation

citation("crew.aws.batch")
To cite package 'crew.aws.batch' in publications use:

  Landau WM (????). _crew.aws.batch: A Crew Launcher Plugin for AWS
  Batch_. R package version 0.0.7,
  https://github.com/wlandau/crew.aws.batch,
  <https://wlandau.github.io/crew.aws.batch/>.

A BibTeX entry for LaTeX users is

  @Manual{,
    title = {crew.aws.batch: A Crew Launcher Plugin for AWS Batch},
    author = {William Michael Landau},
    note = {R package version 0.0.7, 
https://github.com/wlandau/crew.aws.batch},
    url = {https://wlandau.github.io/crew.aws.batch/},
  }