Skip to main content

Consumer/publisher loop for workers in an object recognition pipeline

Project description

catflow-worker

Consumer/publisher loop for workers in an object recognition pipeline

Setup

  • Install pre-commit in your virtualenv. Run pre-commit install after cloning this repository.

Develop

Install:

pip install --editable .[dev]

Configure:

export CATFLOW_S3_ENDPOINT_URL="your-endpoint-url"
export CATFLOW_AWS_ACCESS_KEY_ID="your-access-key-id"
export CATFLOW_AWS_SECRET_ACCESS_KEY="your-secret-access-key"
export CATFLOW_AWS_BUCKET_NAME="your-bucket-name"
export CATFLOW_AMQP_URL="amqp://username:password@hostname:port/"
export CATFLOW_AMQP_EXCHANGE="catflow"

Run the sample loop:

$ python -m catflow_worker
[*] Message received (ingest.video): da0b7b0a-c5e9-44dd-b67a-4fef9c5d86bd.mp4
[-] Content-Type binary/octet-stream
[-] Content-Length 92598
[*] Message received (detect.video): da0b7b0a-c5e9-44dd-b67a-4fef9c5d86bd.mp4
[-] Content-Type binary/octet-stream
[-] Content-Length 92598

Build

pip install build
python -m build

Test

pytest

Usage

Define an async handler function:

async def example_handler(
    msg: str, key: str, s3: Any, bucket: str
) -> Tuple[bool, Tuple[str, List[Any]]]:

The handler function

  • gets: a message, the key the message was sent to, an S3 client, the name of an S3 bucket to use
  • and returns: an error status, and a list of (key, message) tuples to publish

Then start it like:

worker = await Worker.create(handler, queue_name, topic_key)
await worker.work()

See catflow_worker/main.py for a working example.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

catflow-worker-0.1.8.tar.gz (11.6 kB view hashes)

Uploaded Source

Built Distribution

catflow_worker-0.1.8-py3-none-any.whl (7.6 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page