Skip to main content

Tools to design Job APIs on top of NATS

Project description

ChapsNats

A library of python tools for working with NATS

Description of the project

This project is a library of python tools for working with NATS. It is a collection of classes and functions that can be used to interact with NATS servers, and constructing standardized APIs with FastAPI.

Installation

The library is designed to be used in a python environment. It can be installed using pip:

pip install .

Usage

The library provides simple functionalities on top of the NATS python client:

import chaps_nats

# Create a NATS context within a context manager
# The cleanup is done automatically when the context manager exits
# The nats context is associated with an application id which will prefix the
# subjects / object store to allow for more consistency in the NATS server
async with chaps_nats.NatsContext(app_id="test", urls=[self.nats_url()]) as nats_context:

    # Create an object store for the context
    store = await nats_context.js.create_object_store("test")

    # Push a message to the object store
    await store.put("toto", b"Hello World!")

    # Retrieve the object from the object store
    res = (await store.get("toto")).data.decode("utf-8")

    # Publish a message to a subject
    await nats_context.publish("test.subject", b"Hello World!")

The library defines an job abstraction NatsJob which can be used to create a job that can be scheduled on a NATS server:

# Producer of a Job

import chaps_nats

async with chaps_nats.NatsContext(app_id="_tuto", urls=[self.nats_url()]) as nats_context:
    # Creates a job and updates its status
    job = await chaps_nats.NatsJob.create(nats_context=nats_context,
        create_if_not_exists=True # Create the job if it does not exist
    )
    status = await job.status()

    # The job is associated with a specific object store
    await job.object_store.put("input", b"Hello World!")

    # Sends a message to the NATS Queue with a subject subscribed by workers
    # The workers will then process the job and update the status
    await job.start()

The NatsJob can be consumed by a worker, extending chaps_nats.workers.NatsJobWorkerProcess :

import chaps_nats
import chaps_nats.workers

class TestBasicWorker(chaps_nats.workers.NatsJobWorkerProcess):
    """
    A worker which runs in a separate python process

    /!\ Be careful what you put in the members of this class
        A communication between processes is not trivial.
    """

    async def init_context(self, nats_context: chaps_nats.NatsContext) -> Dict:
        """
        Initialize the context (called when starting the process)

        You can use this method to initialize the context of the worker
        Load a model, connect to a database, initialize NATS subscribers and publishers, etc.
        """
        rich.print("[green]Initializing Context[/]")
        return {}

    async def process_job(self, job: chaps_nats.NatsJob, context: Dict) -> bool:
        """
        Processes a job ->
        Interact with the object store, and / or publish messages to the NATS server
        """
        rich.print(f"[green]Processing Job: {job.job_id}")
        await job.object_store.put("output", job.job_id.encode("utf-8"))
        return True

Finally we provide a tool to design standardized Job APIs with FastAPI, see Examples for more details.

You can run the examples with nats using the docker files provided.

# Launches the docker image containing the nats server
# And the example API located at Examples/basic_job_api/API.py
./launch_api.sh basic_job_api 8099

You can look at unit tests in repertory Tests for more details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

chaps_nats-0.0.1.tar.gz (14.5 kB view details)

Uploaded Source

Built Distribution

chaps_nats-0.0.1-py3-none-any.whl (15.3 kB view details)

Uploaded Python 3

File details

Details for the file chaps_nats-0.0.1.tar.gz.

File metadata

  • Download URL: chaps_nats-0.0.1.tar.gz
  • Upload date:
  • Size: 14.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for chaps_nats-0.0.1.tar.gz
Algorithm Hash digest
SHA256 7adb559c0284d450a891983c4806e6acb0c7852ec7d81aa225a7775f942f51a6
MD5 445a0e811dd78719b5245748ff2591b3
BLAKE2b-256 66ecd3d1186aa3bb057b500f10c9fc8fc472e6e234fc5bae63536e48c0282fee

See more details on using hashes here.

File details

Details for the file chaps_nats-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: chaps_nats-0.0.1-py3-none-any.whl
  • Upload date:
  • Size: 15.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for chaps_nats-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 f31e08b99416c653b39979ee60fcf526fbebe5c8c1bea0d40c1d2f3f5fccc628
MD5 07db7f7875a4c19b5edb01b57d226577
BLAKE2b-256 7ee961fd9cc7e13d87f2874b4b5a1fe2a36aad4f21a130821c7096f8cae31a42

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page