Skip to main content

Tools to design Job APIs on top of NATS

Project description

ChapsNats

A library of python tools for working with NATS

Link to the pypi package: https://pypi.org/project/chaps-nats/

Description of the project

This project is a library of python tools for working with NATS. It is a collection of classes and functions that can be used to interact with NATS servers, and constructing standardized APIs with FastAPI.

Installation

The library is designed to be used in a python environment. It can be installed using pip (see the pypi package):

pip install chaps_nats

Usage

The library provides simple functionalities on top of the NATS python client:

import chaps_nats

# Create a NATS context within a context manager
# The cleanup is done automatically when the context manager exits
# The nats context is associated with an application id which will prefix the
# subjects / object store to allow for more consistency in the NATS server
async with chaps_nats.NatsContext(app_id="test", urls=[self.nats_url()]) as nats_context:

    # Create an object store for the context
    store = await nats_context.js.create_object_store("test")

    # Push a message to the object store
    await store.put("toto", b"Hello World!")

    # Retrieve the object from the object store
    res = (await store.get("toto")).data.decode("utf-8")

    # Publish a message to a subject
    await nats_context.publish("test.subject", b"Hello World!")

The library defines an job abstraction NatsJob which can be used to create a job that can be scheduled on a NATS server:

# Producer of a Job

import chaps_nats

async with chaps_nats.NatsContext(app_id="_tuto", urls=[self.nats_url()]) as nats_context:
    # Creates a job and updates its status
    job = await chaps_nats.NatsJob.create(nats_context=nats_context,
        create_if_not_exists=True # Create the job if it does not exist
    )

    assert await job.exists(), "The job should exist"

    status = await job.status()

    # The job is associated with a specific object store
    await job.object_store.put("input", b"Hello World!")

    # Sends a message to the NATS Queue with a subject subscribed by workers
    # The workers will then process the job and update the status
    await job.start()

The NatsJob can be consumed by a worker, extending chaps_nats.workers.NatsJobWorkerProcess :

import chaps_nats
import chaps_nats.workers

class TestBasicWorker(chaps_nats.workers.NatsJobWorkerProcess):
    """
    A worker which runs in a separate python process

    /!\ Be careful what you put in the members of this class
        A communication between processes is not trivial.
    """

    async def init_context(self, nats_context: chaps_nats.NatsContext) -> Dict:
        """
        Initialize the context (called when starting the process)

        You can use this method to initialize the context of the worker
        Load a model, connect to a database, initialize NATS subscribers and publishers, etc.
        """
        rich.print("[green]Initializing Context[/]")
        return {}

    async def process_job(self, job: chaps_nats.NatsJob, context: Dict) -> bool:
        """
        Processes a job ->
        Interact with the object store, and / or publish messages to the NATS server
        """
        rich.print(f"[green]Processing Job: {job.job_id}")
        await job.object_store.put("output", job.job_id.encode("utf-8"))
        return True

Finally we provide a tool to design standardized Job APIs with FastAPI, see Examples for more details.

You can run the examples with nats using the docker files provided.

# Launches the docker image containing the nats server
# And the example API located at Examples/basic_job_api/API.py
./launch_api.sh basic_job_api 8099

You can look at unit tests in repertory Tests for more details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

chaps_nats-0.0.4.tar.gz (14.5 kB view details)

Uploaded Source

Built Distribution

chaps_nats-0.0.4-py3-none-any.whl (15.3 kB view details)

Uploaded Python 3

File details

Details for the file chaps_nats-0.0.4.tar.gz.

File metadata

  • Download URL: chaps_nats-0.0.4.tar.gz
  • Upload date:
  • Size: 14.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for chaps_nats-0.0.4.tar.gz
Algorithm Hash digest
SHA256 44631432774acff916dc11801b826ee12c0a73564f19e65944bd7baa601f7015
MD5 ab0ee0d25087f07c6449ea34bc84b627
BLAKE2b-256 f6d7e4a9be49bab8de409e4b4e58ed13e6940216e35da5779014173c0d8d4987

See more details on using hashes here.

File details

Details for the file chaps_nats-0.0.4-py3-none-any.whl.

File metadata

  • Download URL: chaps_nats-0.0.4-py3-none-any.whl
  • Upload date:
  • Size: 15.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for chaps_nats-0.0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 010cbbf5a48274804c97b235baa199737cc923a343482032e4f773a29d196f29
MD5 2a1280335b9db2991e39c910b10a3e48
BLAKE2b-256 b5ead38ed5fbcbd2e0066581ed88e821c1a23b21a8e6f0340873acba643d4da8

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page