everything to run a project in a distributed and serverless fashion

Algernon

everything you need to be serverless and amazing

overview

Algernon works in the AWS ecosystem, so if you are somewhere else, this isn't for you #sad_emoji_face

Everything in the Algernon world is broken down to the smallest unit of work, which we call a Task. A Task has a task_name and an optional callback. When you run a Task, you can supply it with task_kwargs, which will be the payload delivered to the Task code.
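As a rough illustration of the three pieces named above, a Task invocation can be pictured as a small JSON document. This is a minimal sketch: the helper build_task_message and the example task names are hypothetical, and the exact wire format Algernon uses may differ.

```python
import json


def build_task_message(task_name, task_kwargs=None, callback=None):
    """Build a JSON string describing one Task invocation (hypothetical helper)."""
    message = {"task_name": task_name, "task_kwargs": task_kwargs or {}}
    if callback is not None:
        # The callback is optional, so it is only included when requested.
        message["callback"] = callback
    return json.dumps(message)


# "resize_image" and "store_thumbnail" are made-up task names.
message = build_task_message("resize_image", {"image_id": 42}, callback="store_thumbnail")
```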

listeners and queues

Every Task is associated with an SQS queue and an SNS topic. The SNS topic serves as the listener: messages sent to it are forwarded to the corresponding queue for processing.
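Starting a Task then comes down to publishing a message to its SNS topic. The sketch below assumes the message is the JSON form of task_name and task_kwargs; send_task is a hypothetical helper, and the client is passed in so the function can be tried without AWS credentials.

```python
import json


def send_task(sns_client, topic_arn, task_name, task_kwargs=None):
    """Publish one Task invocation to the Task's SNS listener topic."""
    message = json.dumps({"task_name": task_name, "task_kwargs": task_kwargs or {}})
    # sns_client is expected to behave like a boto3 SNS client,
    # whose publish() accepts TopicArn and Message keyword arguments.
    return sns_client.publish(TopicArn=topic_arn, Message=message)
```

In a real deployment you would pass boto3.client("sns") and the ARN of the topic that belongs to the Task.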

workers

To simplify deployment, a single Lambda Function may serve multiple Tasks. In this case, you should use a handler capable of parsing out the task_name and routing the task accordingly. We commonly use:

from some_package import tasks

def handler(event, context):
    task_name = event['task_name']
    task_kwargs = event.get('task_kwargs', {})
    # a default of None lets us report unknown tasks ourselves,
    # instead of getattr raising AttributeError
    task_fn = getattr(tasks, task_name, None)
    if task_fn is None:
        raise RuntimeError(f'task: {task_name} is not registered with the system')
    results = task_fn(**task_kwargs)
    return results

The Lambda function is subscribed to the SQS queue, so as tasks are pushed into the queue from the listener, the workers automatically pick them up and start working.

idempotence

By default, workers pull messages from the queue in batches of ten. If one of the tasks in a batch fails, the entire batch remains in the queue, meaning all of those tasks will run again, including the ones that already succeeded. For this reason, you must be diligent about making your code idempotent: whether it runs once or a hundred times, the result should be the same (this includes entries to databases, additions to storage, etc.).
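The difference is easiest to see by delivering the same message several times. In this sketch a list and a dict stand in for real storage, and both function names are hypothetical: appending a row duplicates on every retry, while writing under a key derived from the task does not.

```python
def append_result(rows, task_id, result):
    """Not idempotent: every retry adds another copy of the row."""
    rows.append((task_id, result))


def upsert_result(table, task_id, result):
    """Idempotent: a retry overwrites the same key with the same value."""
    table[task_id] = result


rows, table = [], {}
for _ in range(3):  # simulate the same message being processed three times
    append_result(rows, "task-1", "done")
    upsert_result(table, "task-1", "done")
```

After the loop, rows holds three copies of the result while table still holds exactly one entry.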

callbacks

Once a task has run, the results can be sent to another Task, creating a chain. You specify the name of the next task under the callback key in the Task invocation.
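The chaining idea can be simulated locally without any AWS resources. This is only a sketch: run_chain and the two example tasks are hypothetical, and it assumes each task returns a dict that becomes the task_kwargs of the callback, which may not match how Algernon delivers results.

```python
def run_chain(registry, invocation):
    """Run a Task, then feed its results to the Task named under 'callback'."""
    results = None
    while invocation is not None:
        task_fn = registry[invocation["task_name"]]
        results = task_fn(**invocation.get("task_kwargs", {}))
        next_name = invocation.get("callback")
        # Assumption: the results become the payload of the next Task.
        invocation = {"task_name": next_name, "task_kwargs": results} if next_name else None
    return results


def fetch_order(order_id):
    return {"order_id": order_id, "total": 30}


def add_tax(order_id, total):
    return {"order_id": order_id, "total_with_tax": total + 3}


registry = {"fetch_order": fetch_order, "add_tax": add_tax}
final = run_chain(
    registry,
    {"task_name": "fetch_order", "task_kwargs": {"order_id": 7}, "callback": "add_tax"},
)
```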

context

When a Lambda function invokes Python code, it provides the handler with two positional arguments, event and context. We hijack the context, and use it to persist resources across a batch. The original AWS context is preserved under the key 'aws'. You can store your own information in this context dictionary, and it will be available throughout the life of the Task Worker.

Common uses of the context are storing database connections, retrieved credentials, or other things you don't want to repeat ten times when your workers pull a batch.
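A typical pattern is to create the expensive resource the first time it is needed and reuse it for every other message in the batch. In this sketch sqlite3 stands in for a real database, the key name db_connection is an arbitrary choice, and the context is assumed to behave like a plain dictionary.

```python
import sqlite3


def get_connection(context):
    """Return the connection cached in the context, creating it on first use."""
    if "db_connection" not in context:
        # sqlite3 is only a stand-in for whatever database the Task talks to.
        context["db_connection"] = sqlite3.connect(":memory:")
    return context["db_connection"]


def handler(event, context):
    connection = get_connection(context)
    value = event["task_kwargs"]["value"]
    # Echo the value through the database to show the connection works.
    return connection.execute("SELECT ?", (value,)).fetchone()[0]
```

Calling the handler twice with the same context opens only one connection.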

Objects and Utilities

the @queued decorator

This function decorator is applied to the Worker handler function. It allows you to code your handler as if it were being directly invoked by Lambda. The decorator takes care of parsing the batch messages from SQS and sending indicated callbacks.
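To give a feel for the work the decorator saves you, here is a simplified stand-in written from scratch. It is not Algernon's implementation: queued_sketch is a hypothetical name, it only unpacks the batch, and it does not send callbacks. It relies on the standard shape of an SQS-triggered Lambda event, where each entry in Records carries the message text under body, and SNS adds an envelope with the original text under Message unless raw message delivery is enabled.

```python
import functools
import json


def queued_sketch(handler):
    """Illustrative batch-unpacking decorator; not the library's code."""

    @functools.wraps(handler)
    def wrapper(event, context):
        results = []
        for record in event.get("Records", []):
            body = json.loads(record["body"])
            # Unwrap the SNS envelope when the queue received one.
            if "Message" in body:
                body = json.loads(body["Message"])
            results.append(handler(body, context))
        return results

    return wrapper


@queued_sketch
def handler(event, context):
    # Written as if Lambda had invoked it with a single Task message.
    return event["task_name"]
```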

the serializers

Algernon loves object-oriented programming, and one of our early struggles was in trying to get our objects from Task to Task. To accomplish this, we provide a base object (AlgObject), which has one required class method (parse_json). Objects which inherit from AlgObject can be sent across the wire in Task task_kwargs.

To serialize and rebuild AlgObjects, you can use the ajson utility included in this library.

from algernon import ajson, AlgObject


class DatabaseCredentials(AlgObject):
    def __init__(self, username, password, read_url, write_url):
        self._username = username
        self._password = password
        self._read_url = read_url
        self._write_url = write_url

    @classmethod
    def parse_json(cls, json_dict):
        return cls(json_dict['username'], json_dict['password'], json_dict['read_url'], json_dict['write_url'])


credentials = DatabaseCredentials('my_username', '31iteP@zzW0rd', 'some_db_url', 'some_other_db_url')
strung_credentials = ajson.dumps(credentials)

# send them to the next task

rebuilt_credentials = ajson.loads(strung_credentials)

The ajson utility also handles some common JSON problem children.

  • Python datetime objects
  • Tuples
  • Sets
  • Decimals
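To see why these types need help, here is how the same problem can be handled with only the standard json module. This is a sketch of the general technique, not ajson's actual wire format: the "_type" tag and both function names are hypothetical.

```python
import json
from datetime import datetime
from decimal import Decimal


def encode_special(value):
    """Fallback for json.dumps: tag values that JSON cannot represent natively."""
    if isinstance(value, datetime):
        return {"_type": "datetime", "value": value.isoformat()}
    if isinstance(value, Decimal):
        return {"_type": "decimal", "value": str(value)}
    if isinstance(value, set):
        return {"_type": "set", "value": list(value)}
    raise TypeError(f"cannot serialize {type(value).__name__}")


def decode_special(obj):
    """object_hook for json.loads: rebuild the tagged values."""
    kind = obj.get("_type")
    if kind == "datetime":
        return datetime.fromisoformat(obj["value"])
    if kind == "decimal":
        return Decimal(obj["value"])
    if kind == "set":
        return set(obj["value"])
    return obj


payload = {"when": datetime(2020, 1, 2, 3, 4, 5), "price": Decimal("9.99"), "tags": {"a", "b"}}
restored = json.loads(json.dumps(payload, default=encode_special), object_hook=decode_special)
```

Tuples are a separate case: the standard json module serializes them as lists without ever calling the fallback, so they come back as lists unless the serializer handles them explicitly.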

the rebuild_event function

To support modularity, the @queued decorator does not run messages through the ajson utility before sending them to the Task handler. This decision allows one Task Worker to handle messages meant for a different Task Worker, such as when routing or replaying messages. If you try to use the ajson utility on messages that contain AlgObjects from another Worker, and those AlgObjects are not imported into the Worker handler, your Worker will fail.

If you know that certain Tasks will only ever receive messages that can be rebuilt within the module, you can use the rebuild_event function to restore the AlgObjects in the message.

from algernon import queued, rebuild_event

@queued
def handler(event, context):
    event = rebuild_event(event)
    task_name = event['task_name']
    task_kwargs = event['task_kwargs']
    db_credentials = task_kwargs['db_credentials']

the @lambda_logged decorator

Lambda functions capture all logging activity and store it through the CloudWatch service. To help organize and search these logs, you can decorate your Task or Worker handler with the @lambda_logged decorator. Doing so clears all existing loggers, which we have found solves many logging problems with Lambda, and then sets up the root logger to include timestamp, function information, and request information with your logging. You can toggle debug-level logging on by setting the environment variable "DEBUG" to True. The decorator also activates the native Lambda connection to the X-Ray service: if you apply this decorator and then run your Worker, you can see the traces for your function, with their stats, under the AWS X-Ray service.

This decorator activates logging for the most common libraries (requests, SQLite, etc.). You can decorate your own functions directly to help improve the granularity of your tracing.

from aws_xray_sdk.core import xray_recorder

# capture() is called with the name to give the subsegment
@xray_recorder.capture('some_task')
def some_task(**kwargs):
    print(f'hey, did some work with {kwargs}')

We have found that the Python boto library and the native X-Ray library tend to produce chatter in the logs, so this decorator sets both of them to log level WARNING.
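The logging half of that behavior can be approximated with the standard logging module. This is a sketch under assumptions, not the decorator's code: configure_root_logger is a hypothetical name, the format string and the INFO default are guesses, and the X-Ray setup is left out entirely.

```python
import logging
import os


def configure_root_logger():
    """Reset the root logger and quiet the chattier AWS libraries."""
    root = logging.getLogger()
    # Drop handlers installed earlier so records are not emitted twice.
    for existing in list(root.handlers):
        root.removeHandler(existing)
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s %(funcName)s %(message)s")
    )
    root.addHandler(handler)
    # DEBUG=True in the environment switches on debug-level logging.
    debug = os.getenv("DEBUG", "").lower() == "true"
    root.setLevel(logging.DEBUG if debug else logging.INFO)
    # boto3, botocore and aws_xray_sdk are the logger names those libraries use.
    for noisy in ("boto3", "botocore", "aws_xray_sdk"):
        logging.getLogger(noisy).setLevel(logging.WARNING)
    return root
```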

combining decorators

You absolutely can use the @queued and @lambda_logged decorators together on the same handler.

StoredData

When passing information from function to function, as during a callback, the task_kwargs are not sent across the wire whole. To help handle large messages, the results of the function are uploaded to S3 and replaced with a pointer. When the message arrives at the next task, the information is automatically pulled from S3 and put back in place. You specify the bucket to send the information to by setting the "ALGERNON_BUCKET_NAME" environment variable. By default, StoredData objects are set with the prefix "cache". You can change this by setting the "CACHE_FOLDER_NAME" environment variable. We suggest you set up expiration lifecycle rules on the bucket you use for this purpose to keep costs down.
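The pointer swap can be pictured with a dictionary standing in for the S3 bucket. Everything here is an assumption made for illustration: the function names, the pointer layout, and the key naming are hypothetical, and only the "cache" default and the "CACHE_FOLDER_NAME" variable come from the description above.

```python
import json
import os
import uuid


def store_results(bucket, results):
    """Write results under the cache prefix and return a small pointer."""
    prefix = os.getenv("CACHE_FOLDER_NAME", "cache")
    key = f"{prefix}/{uuid.uuid4()}.json"
    # A dict stands in for the bucket; real code would upload to S3 here.
    bucket[key] = json.dumps(results)
    return {"pointer": key}


def resolve_pointer(bucket, pointer):
    """Fetch the stored results the pointer refers to."""
    return json.loads(bucket[pointer["pointer"]])


bucket = {}
pointer = store_results(bucket, {"rows": [1, 2, 3]})
restored = resolve_pointer(bucket, pointer)
```

Only the pointer travels in the message, so the payload size stays small no matter how large the results are.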
