Skip to main content
Join the official 2019 Python Developers SurveyStart the survey!

Python 3.6+ library for submitting to AWS Batch interactively.

Project description

pendant

Testing Status codecov Documentation Build Status PyPi Release Python Versions MyPy Checked Code style: black

Python 3.6+ library for submitting to AWS Batch interactively.

❯ pip install pendant

Features:

  • Submit Batch jobs

Read the documentation at: pendant.readthedocs.io

End-to-end Example

The principle object for deploying jobs to AWS Batch is the Batch job definition. Every Batch job definition has a name, parameters, and some form of optional parameter validation.

>>> from pendant.aws.batch import BatchJob, JobDefinition
>>> from pendant.aws.s3 import S3Uri
>>> from pendant.aws.exception import S3ObjectNotFoundError

>>> class DemoJobDefinition(JobDefinition):
...     def __init__(self, input_object: S3Uri) -> None:
...         self.input_object = input_object
... 
...     @property
...     def name(self) -> str:
...         return 'demo-job'
... 
...     def validate(self) -> None:
...         if not self.input_object.object_exists():
...             raise S3ObjectNotFoundError(f'S3 object does not exist: {self.input_object}')

Let's instantiate the definition at a specific revision and validate it.

>>> definition = DemoJobDefinition(input_object=S3Uri('s3://bucket/object')).at_revision('6')
>>> definition.validate()
None

Validation is also performed when a job definition is wrapped by a BatchJob so the call to .validate() above was redundant. Wrapping a job definition into a Batch job is achieved with the following, but no useful work will happen until the job is submitted.

>>> job = BatchJob(definition)

Now we are ready to submit this job to AWS Batch! Submitting this Batch job is easy, and introspection can be performed immediately:

>>> response = job.submit(queue='prod')
>>> job.is_submitted()
True

When the job is in a RUNNING state we can access the job's Cloudwatch logs. The log events are returned as objects which have useful properties such as timestamp and message.

>>> for log_event in job.log_stream_events():
...     print(log_event)
LogEvent(timestamp="1543809952329", message="You have started up this demo job", ingestion_time="1543809957080")
LogEvent(timestamp="1543809955437", message="Configuration, we are loading from...", ingestion_time="1543809957080")
LogEvent(timestamp="1543809955437", message="Defaulting to approximate values", ingestion_time="1543809957080")
LogEvent(timestamp="1543809955437", message="Setting up logger, nothing to see here", ingestion_time="1543809957080")

And if we must, we can cancel the job as long as we provide a reason:

>>> response = job.terminate(reason='I was just testing!')

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for pendant, version 0.4.1
Filename, size File type Python version Upload date Hashes
Filename, size pendant-0.4.1.tar.gz (11.2 kB) File type Source Python version None Upload date Hashes View hashes

Supported by

Elastic Elastic Search Pingdom Pingdom Monitoring Google Google BigQuery Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN SignalFx SignalFx Supporter DigiCert DigiCert EV certificate StatusPage StatusPage Status page