Source code for cloudknot.aws.batch

import cloudknot.config
import cloudpickle
from datetime import datetime
import logging
import pickle
import time

from collections import namedtuple

from ..dockerimage import DEFAULT_PICKLE_PROTOCOL
from .base_classes import (
    NamedObject,
    clients,
    ResourceDoesNotExistException,
    ResourceClobberedException,
    BatchJobFailedError,
    CKTimeoutError,
    CloudknotInputError,
    get_s3_params,
)

__all__ = ["BatchJob"]
mod_logger = logging.getLogger(__name__)


def _exists_already(job_id):
    """
    Check if an AWS batch job exists already.

    If the batch job exists, return a namedtuple with the batch job info.
    Otherwise, return a namedtuple whose `exists` field is set to `False`;
    the remaining fields default to `None`.

    Parameters
    ----------
    job_id : string
        The AWS jobId of the batch job in question

    Returns
    -------
    namedtuple JobExists
        A namedtuple with fields
        ['exists', 'name', 'job_id', 'job_queue_arn', 'job_definition',
         'environment_variables', 'array_job']
    """
    # define a namedtuple for return value type
    JobExists = namedtuple(
        "JobExists",
        [
            "exists",
            "name",
            "job_id",
            "job_queue_arn",
            "job_definition",
            "environment_variables",
            "array_job",
        ],
    )
    # make all but the first value default to None
    JobExists.__new__.__defaults__ = (None,) * (len(JobExists._fields) - 1)

    response = clients["batch"].describe_jobs(jobs=[job_id])

    if response.get("jobs"):
        job = response.get("jobs")[0]
        name = job["jobName"]
        job_queue_arn = job["jobQueue"]
        job_def_arn = job["jobDefinition"]
        environment_variables = job["container"]["environment"]

        array_job = "arrayProperties" in job

        response = clients["batch"].describe_job_definitions(
            jobDefinitions=[job_def_arn]
        )
        job_def = response.get("jobDefinitions")[0]
        job_def_name = job_def["jobDefinitionName"]
        job_def_env = job_def["containerProperties"]["environment"]
        bucket_env = [e for e in job_def_env if e["name"] == "CLOUDKNOT_JOBS_S3_BUCKET"]
        output_bucket = bucket_env[0]["value"] if bucket_env else None
        job_def_retries = job_def["retryStrategy"]["attempts"]

        JobDef = namedtuple("JobDef", ["name", "arn", "output_bucket", "retries"])
        job_definition = JobDef(
            name=job_def_name,
            arn=job_def_arn,
            output_bucket=output_bucket,
            retries=job_def_retries,
        )

        mod_logger.info("Job {id:s} exists.".format(id=job_id))

        return JobExists(
            exists=True,
            name=name,
            job_id=job_id,
            job_queue_arn=job_queue_arn,
            job_definition=job_definition,
            environment_variables=environment_variables,
            array_job=array_job,
        )
    else:
        return JobExists(exists=False)

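# Illustrative sketch of how the JobExists namedtuple returned by
# _exists_already might be inspected. The job ID below is a hypothetical
# placeholder, and the call assumes valid AWS credentials and a configured
# Batch client:
#
#     job = _exists_already(job_id="hypothetical-job-id")
#     if job.exists:
#         print(job.name, job.job_queue_arn, job.job_definition.arn)
#     else:
#         print("No batch job with that ID")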

# noinspection PyPropertyAccess,PyAttributeOutsideInit
class BatchJob(NamedObject):
    """Class for defining an AWS Batch job."""

    def __init__(
        self,
        job_id=None,
        name=None,
        job_queue=None,
        job_definition=None,
        input_=None,
        starmap=False,
        environment_variables=None,
        array_job=True,
    ):
        """Initialize an AWS Batch Job object.

        If requesting information on a pre-existing job, `job_id` is
        required. Otherwise, `name`, `job_queue`, and `job_definition`
        are required to submit a new job.

        Parameters
        ----------
        job_id : string
            The AWS jobId, if requesting a job that already exists
        name : string
            Name of the job. Must satisfy regular expression pattern:
            [a-zA-Z][-a-zA-Z0-9]*
        job_queue : string
            Job queue ARN specifying the job queue to which this job
            will be submitted
        job_definition : namedtuple or object
            namedtuple specifying the job definition on which to base
            this job. Must contain fields 'name', 'arn',
            'output_bucket', and 'retries'
        input_ :
            The input to be pickled and sent to the batch job via S3
        starmap : bool
            If True, assume input is already grouped in tuples from a
            single iterable.
        environment_variables : list of dict
            list of key/value pairs representing environment variables
            sent to the container
        array_job : bool
            If True, this batch job will be an array job.
            Default: True
        """
        has_input = input_ is not None
        if not (job_id or all([name, job_queue, has_input, job_definition])):
            raise CloudknotInputError(
                "You must supply either job_id or (name, input_, "
                "job_queue, and job_definition)."
            )

        if job_id and any([name, job_queue, has_input, job_definition]):
            raise CloudknotInputError(
                "You may supply either job_id or (name, input_, job_queue, "
                "and job_definition), not both."
            )

        self._starmap = starmap

        if job_id:
            job = _exists_already(job_id=job_id)
            if not job.exists:
                raise ResourceDoesNotExistException(
                    "jobId {id:s} does not exist".format(id=job_id), job_id
                )

            super(BatchJob, self).__init__(name=job.name)
            self._job_queue_arn = job.job_queue_arn
            self._job_definition = job.job_definition
            self._environment_variables = job.environment_variables
            self._job_id = job.job_id
            self._array_job = job.array_job

            bucket = self._job_definition.output_bucket
            key = "/".join(
                [
                    "cloudknot.jobs",
                    self._job_definition.name,
                    self._job_id,
                    "input.pickle",
                ]
            )

            try:
                response = clients["s3"].get_object(Bucket=bucket, Key=key)
                self._input = pickle.loads(response.get("Body").read())
            except (
                clients["s3"].exceptions.NoSuchBucket,
                clients["s3"].exceptions.NoSuchKey,
            ):
                self._input = None

            self._section_name = self._get_section_name("batch-jobs")
            cloudknot.config.add_resource(self._section_name, self.job_id, self.name)

            mod_logger.info(
                "Retrieved pre-existing batch job {id:s}".format(id=self.job_id)
            )
        else:
            super(BatchJob, self).__init__(name=name)

            if not isinstance(job_queue, str):
                raise CloudknotInputError("job_queue must be a string.")
            self._job_queue_arn = job_queue

            if not all(
                [
                    hasattr(job_definition, "name"),
                    hasattr(job_definition, "arn"),
                    hasattr(job_definition, "output_bucket"),
                    hasattr(job_definition, "retries"),
                ]
            ):
                raise CloudknotInputError(
                    'job_definition must have attributes "name", "arn", '
                    '"output_bucket", and "retries".'
                )
            self._job_definition = job_definition

            if environment_variables:
                if not all(isinstance(s, dict) for s in environment_variables):
                    raise CloudknotInputError("env_vars must be a sequence of dicts")

                if not all(
                    set(d.keys()) == {"name", "value"} for d in environment_variables
                ):
                    raise CloudknotInputError(
                        'each dict in env_vars must have keys "name" and "value"'
                    )

                self._environment_variables = environment_variables
            else:
                self._environment_variables = None

            self._input = input_
            self._array_job = array_job
            self._job_id = self._create()

    @property
    def job_queue_arn(self):
        """ARN for the job queue to which this job will be submitted."""
        return self._job_queue_arn

    @property
    def job_definition(self):
        """
        Job definition on which to base this job.

        Has properties 'name', 'arn', 'output_bucket', and 'retries'.
        """
        return self._job_definition

    @property
    def environment_variables(self):
        """Key/value pairs for environment variables sent to the container."""
        return self._environment_variables

    @property
    def input(self):
        """The input to be pickled and sent to the batch job via S3."""
        return self._input

    @property
    def starmap(self):
        """Boolean flag to indicate whether input was 'pre-zipped'."""
        return self._starmap

    @property
    def array_job(self):
        """Boolean flag to indicate whether this is an array job."""
        return self._array_job

    @property
    def job_id(self):
        """The AWS job-ID for this job."""
        return self._job_id

    def _create(self):  # pragma: nocover
        """
        Create AWS batch job using instance parameters.

        Returns
        -------
        string
            job ID for the created batch job
        """
        # no coverage since actually submitting a batch job for
        # unit testing would be expensive
        bucket = self.job_definition.output_bucket
        sse = get_s3_params().sse
        pickled_input = cloudpickle.dumps(self.input, protocol=DEFAULT_PICKLE_PROTOCOL)

        command = [self.job_definition.output_bucket]
        if self.starmap:
            command = ["--starmap"] + command

        if sse:
            command = ["--sse", sse] + command

        if self.array_job:
            command = ["--arrayjob"] + command

        if self.environment_variables:
            container_overrides = {
                "environment": self.environment_variables,
                "command": command,
            }
        else:
            container_overrides = {"command": command}

        # We have to submit before uploading the input in order to get the
        # jobID first.
        if self.array_job:
            response = clients["batch"].submit_job(
                jobName=self.name,
                jobQueue=self.job_queue_arn,
                arrayProperties={"size": len(self.input)},
                jobDefinition=self.job_definition.arn,
                containerOverrides=container_overrides,
            )
        else:
            response = clients["batch"].submit_job(
                jobName=self.name,
                jobQueue=self.job_queue_arn,
                jobDefinition=self.job_definition.arn,
                containerOverrides=container_overrides,
            )

        job_id = response["jobId"]

        key = "/".join(
            ["cloudknot.jobs", self.job_definition.name, job_id, "input.pickle"]
        )

        # Upload the input pickle
        if sse:
            clients["s3"].put_object(
                Bucket=bucket, Body=pickled_input, Key=key, ServerSideEncryption=sse
            )
        else:
            clients["s3"].put_object(Bucket=bucket, Body=pickled_input, Key=key)

        # Add this job to the list of jobs in the config file
        self._section_name = self._get_section_name("batch-jobs")
        cloudknot.config.add_resource(self._section_name, job_id, self.name)

        mod_logger.info(
            "Submitted batch job {name:s} with jobID {job_id:s}".format(
                name=self.name, job_id=job_id
            )
        )

        return job_id

    @property
    def status(self):
        """
        Query AWS batch job status using instance parameter `self.job_id`.

        Returns
        -------
        status : dict
            dictionary with keys {status, statusReason, attempts} for
            this AWS batch job
        """
        if self.clobbered:
            raise ResourceClobberedException(
                "This batch job has already been clobbered.", self.job_id
            )

        self.check_profile_and_region()

        # Query the job_id
        response = clients["batch"].describe_jobs(jobs=[self.job_id])
        job = response.get("jobs")[0]

        # Return only a subset of the job dictionary
        keys = ["status", "statusReason", "attempts"]
        if self.array_job:
            keys.append("arrayProperties")

        status = {k: job.get(k) for k in keys}

        return status

    @property
    def log_urls(self):
        """
        Return the URLs of the batch job logs on AWS CloudWatch.

        Returns
        -------
        log_urls : list
            A list of log URLs, one for each attempt. If the job has
            not yet run, this will return an empty list.
        """
        attempts = sorted(self.status["attempts"], key=lambda a: a["startedAt"])
        log_stream_names = [a["container"].get("logStreamName") for a in attempts]

        def log_name2url(log_name):
            return (
                "https://console.aws.amazon.com/cloudwatch/home?region="
                "{region:s}#logEventViewer:group=/aws/batch/job;"
                "stream={log_name:s}".format(region=self.region, log_name=log_name)
            )

        log_urls = [log_name2url(log) for log in log_stream_names]

        return log_urls

    @property
    def done(self):
        """Return True if the job is done.

        In this case, "done" means that the job status is SUCCEEDED, or
        that it is FAILED and the job has exceeded the max number of
        retry attempts.
        """
        stat = self.status
        done = stat["status"] == "SUCCEEDED" or (
            stat["status"] == "FAILED"
            and len(stat["attempts"]) >= self.job_definition.retries
        )

        return done

    def _collect_array_job_result(self, idx=0):
        """Collect the result for a single array job element.

        Parameters
        ----------
        idx : int
            Index of the array job element to be retrieved. Default: 0

        Returns
        -------
        The array job element at index `idx`
        """
        bucket = self.job_definition.output_bucket

        # For array jobs, different child jobs may have had different
        # numbers of attempts. So we start at the highest possible attempt
        # number and retrieve the latest one.
        attempt = self.job_definition.retries
        result_retrieved = False

        while not result_retrieved and attempt >= 0:
            key = "/".join(
                [
                    "cloudknot.jobs",
                    self.job_definition.name,
                    self.job_id,
                    str(idx),
                    "{0:03d}".format(attempt),
                    "output.pickle",
                ]
            )

            try:
                response = clients["s3"].get_object(Bucket=bucket, Key=key)
                result_retrieved = True
            except clients["s3"].exceptions.NoSuchKey:
                attempt -= 1

        if not result_retrieved:
            raise CKTimeoutError(
                "Result not available in bucket {bucket:s} with key {key:s}"
                "".format(bucket=bucket, key=key)
            )

        return pickle.loads(response.get("Body").read())

    def result(self, timeout=None):
        """
        Return the result of the latest attempt.

        If the call hasn't yet completed, this method will wait up to
        `timeout` seconds. If the call hasn't completed in `timeout`
        seconds, a CKTimeoutError is raised. If the batch job is in
        FAILED status, a BatchJobFailedError is raised.

        Parameters
        ----------
        timeout : int or float
            timeout time in seconds. If timeout is not specified or
            None, there is no limit to the wait time.
            Default: None

        Returns
        -------
        result :
            The result of the AWS Batch job
        """
        # Set start time for timeout period
        start_time = datetime.now()

        def time_diff():
            return (datetime.now() - start_time).seconds

        while not self.done and (timeout is None or time_diff() < timeout):
            time.sleep(5)

        if not self.done:
            raise CKTimeoutError(self.job_id)

        status = self.status
        if status["status"] == "FAILED":
            raise BatchJobFailedError(self.job_id)
        else:
            if self.array_job:
                return [
                    self._collect_array_job_result(idx)
                    for idx in range(len(self.input))
                ]
            else:
                return self._collect_array_job_result()

    def terminate(self, reason):
        """
        Terminate AWS batch job using instance parameter `self.job_id`.

        terminate() combines the cancel and terminate AWS CLI commands.
        Jobs that are in the SUBMITTED, PENDING, or RUNNABLE state must
        be cancelled, while jobs that are in the STARTING or RUNNING
        state must be terminated.

        Parameters
        ----------
        reason : string
            A message to attach to the job that explains the reason for
            cancelling/terminating it. This message is returned by
            future DescribeJobs operations on the job. This message is
            also recorded in the AWS Batch activity logs.
        """
        if self.clobbered:
            raise ResourceClobberedException(
                "This batch job has already been clobbered.", self.job_id
            )

        self.check_profile_and_region()

        # Require the user to supply a reason for job termination
        if not isinstance(reason, str):
            raise CloudknotInputError("reason must be a string.")

        state = self.status["status"]

        if state in ["SUBMITTED", "PENDING", "RUNNABLE"]:
            clients["batch"].cancel_job(jobId=self.job_id, reason=reason)
            mod_logger.info(
                "Cancelled job {name:s} with jobID {job_id:s}".format(
                    name=self.name, job_id=self.job_id
                )
            )
        elif state in ["STARTING", "RUNNING"]:
            clients["batch"].terminate_job(jobId=self.job_id, reason=reason)
            mod_logger.info(
                "Terminated job {name:s} with jobID {job_id:s}".format(
                    name=self.name, job_id=self.job_id
                )
            )

    def clobber(self):
        """Kill a batch job and remove its info from the config file."""
        if self.clobbered:
            return

        self.check_profile_and_region()

        self.terminate(reason="Cloudknot job killed after calling BatchJob.clobber()")

        # Set the clobbered parameter to True,
        # preventing subsequent method calls
        self._clobbered = True

        # Remove this job from the list of jobs in the config file
        cloudknot.config.remove_resource(self._section_name, self.job_id)
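

if __name__ == "__main__":
    # Minimal usage sketch: submit a small array job and wait for its result.
    # The queue ARN, job-definition values, and bucket name below are
    # hypothetical placeholders; running this for real requires valid AWS
    # credentials and existing AWS Batch / S3 resources created by cloudknot.
    JobDef = namedtuple("JobDef", ["name", "arn", "output_bucket", "retries"])
    example_job_def = JobDef(
        name="example-job-definition",
        arn="arn:aws:batch:us-east-1:123456789012:job-definition/example:1",
        output_bucket="example-cloudknot-output-bucket",
        retries=3,
    )

    example_job = BatchJob(
        name="example-job",
        job_queue="arn:aws:batch:us-east-1:123456789012:job-queue/example-queue",
        job_definition=example_job_def,
        input_=[1, 2, 3],
        array_job=True,
    )

    # `status` returns a dict such as {"status": "RUNNABLE", ...};
    # `result` blocks (polling every 5 seconds) until the job finishes
    # or the timeout elapses.
    print(example_job.status)
    print(example_job.result(timeout=3600))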