catflow-worker

Consumer/publisher loop for workers in an object recognition pipeline

Setup

  • Install pre-commit in your virtualenv. Run pre-commit install after cloning this repository.

Develop

Install:

pip install --editable ".[dev]"

Configure:

export CATFLOW_S3_ENDPOINT_URL="your-endpoint-url"
export CATFLOW_AWS_ACCESS_KEY_ID="your-access-key-id"
export CATFLOW_AWS_SECRET_ACCESS_KEY="your-secret-access-key"
export CATFLOW_AWS_BUCKET_NAME="your-bucket-name"
export CATFLOW_AMQP_URL="amqp://username:password@hostname:port/"
export CATFLOW_AMQP_EXCHANGE="catflow"
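Checking these variables before starting a worker can surface a missing setting early. The following is a minimal sketch, not the package's own loader: the names CatflowSettings and load_settings are hypothetical, and only the environment variable names come from the list above.

```python
import os
from dataclasses import dataclass
from typing import Mapping

# Environment variable names as listed in the Configure step.
REQUIRED_VARS = (
    "CATFLOW_S3_ENDPOINT_URL",
    "CATFLOW_AWS_ACCESS_KEY_ID",
    "CATFLOW_AWS_SECRET_ACCESS_KEY",
    "CATFLOW_AWS_BUCKET_NAME",
    "CATFLOW_AMQP_URL",
    "CATFLOW_AMQP_EXCHANGE",
)


@dataclass(frozen=True)
class CatflowSettings:
    """Hypothetical container for the settings; not part of catflow-worker."""

    s3_endpoint_url: str
    aws_access_key_id: str
    aws_secret_access_key: str
    aws_bucket_name: str
    amqp_url: str
    amqp_exchange: str


def load_settings(env: Mapping[str, str] = os.environ) -> CatflowSettings:
    """Read all required variables, failing with one message if any are unset."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return CatflowSettings(
        s3_endpoint_url=env["CATFLOW_S3_ENDPOINT_URL"],
        aws_access_key_id=env["CATFLOW_AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=env["CATFLOW_AWS_SECRET_ACCESS_KEY"],
        aws_bucket_name=env["CATFLOW_AWS_BUCKET_NAME"],
        amqp_url=env["CATFLOW_AMQP_URL"],
        amqp_exchange=env["CATFLOW_AMQP_EXCHANGE"],
    )
```

Calling load_settings() with no arguments reads from the current process environment. Passing a plain dict instead makes the check easy to exercise in tests.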

Run the sample loop:

$ python -m catflow_worker
[*] Message received (ingest.video): da0b7b0a-c5e9-44dd-b67a-4fef9c5d86bd.mp4
[-] Content-Type binary/octet-stream
[-] Content-Length 92598
[*] Message received (detect.video): da0b7b0a-c5e9-44dd-b67a-4fef9c5d86bd.mp4
[-] Content-Type binary/octet-stream
[-] Content-Length 92598

Build

pip install build
python -m build

Test

pytest

Usage

Define an async handler function:

from typing import Any, List, Tuple

async def example_handler(
    msg: str, key: str, s3: Any, bucket: str
) -> Tuple[bool, Tuple[str, List[Any]]]:
    ...  # handler body goes here

The handler function:

  • receives a message, the key the message was sent to, an S3 client, and the name of the S3 bucket to use
  • returns an error status and a list of (key, message) tuples to publish
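A handler matching the description above might look like the following sketch. It rests on several assumptions that the text does not confirm: the message is treated as an S3 object key (as the sample output suggests), the S3 client is assumed to be an async boto3-style client exposing head_object, True is assumed to mean success, and the published key "detect.video" is only an illustration. The return annotation follows the prose description, a list of (key, message) tuples.

```python
from typing import Any, List, Tuple


async def describe_object_handler(
    msg: str, key: str, s3: Any, bucket: str
) -> Tuple[bool, List[Tuple[str, str]]]:
    """Print basic metadata for the object named in msg, then republish it."""
    try:
        # Assumption: s3 is an async boto3-style client (for example aioboto3).
        head = await s3.head_object(Bucket=bucket, Key=msg)
    except Exception as exc:
        print(f"[!] Could not read {msg}: {exc}")
        # Assumption: False signals failure; nothing is published.
        return False, []

    print(f"[*] Message received ({key}): {msg}")
    print(f"[-] Content-Type {head['ContentType']}")
    print(f"[-] Content-Length {head['ContentLength']}")

    # Hypothetical downstream key; choose whatever the next stage listens on.
    return True, [("detect.video", msg)]
```

Because the S3 client is passed in as an argument, the handler can be tested with a small fake object that implements head_object, without touching real storage.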

Then start it like:

worker = await Worker.create(handler, queue_name, topic_key)
await worker.work()

See catflow_worker/main.py for a working example.
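Since both start-up calls use await, they have to run inside a coroutine. The sketch below wraps them in one, under stated assumptions: Worker is assumed to be importable from the top level of catflow_worker, the queue name "example-queue" is made up, and the topic key "ingest.video" is borrowed from the sample output above. The handler here does nothing and publishes nothing.

```python
from typing import Any, List, Tuple


async def noop_handler(
    msg: str, key: str, s3: Any, bucket: str
) -> Tuple[bool, List[Tuple[str, Any]]]:
    """Placeholder handler: report success (assumed to be True), publish nothing."""
    return True, []


async def main() -> None:
    # Assumption: Worker is exported by the catflow_worker package.
    # Imported here so this module loads even where the package is absent.
    from catflow_worker import Worker

    # "example-queue" is a hypothetical queue name.
    worker = await Worker.create(noop_handler, "example-queue", "ingest.video")
    await worker.work()
```

To start the worker, call asyncio.run(main()) from a script entry point, with the CATFLOW_* environment variables set and the AMQP broker and S3 endpoint reachable.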
