Tools to design Job APIs on top of NATS

Project description

ChapsNats

A library of Python tools for working with NATS

Link to the PyPI package: https://pypi.org/project/chaps-nats/

Description of the project

This project is a library of Python tools for working with NATS. It is a collection of classes and functions for interacting with NATS servers and for building standardized APIs with FastAPI.

Installation

The library is designed to be used in a Python environment. It can be installed with pip (see the PyPI package):

pip install chaps_nats

Usage

The library provides simple helpers on top of the NATS Python client:

import asyncio

import chaps_nats

# Assumption: a NATS server is reachable at this URL (the original snippet
# obtained it from a test helper, self.nats_url())
NATS_URL = "nats://localhost:4222"


async def main():
    # Create a NATS context within a context manager
    # The cleanup is done automatically when the context manager exits
    # The NATS context is associated with an application id, which prefixes the
    # subjects / object stores to keep naming consistent on the NATS server
    async with chaps_nats.NatsContext(app_id="test", urls=[NATS_URL]) as nats_context:

        # Create an object store for the context
        store = await nats_context.js.create_object_store("test")

        # Put an object into the object store
        await store.put("toto", b"Hello World!")

        # Retrieve the object from the object store
        res = (await store.get("toto")).data.decode("utf-8")

        # Publish a message to a subject
        await nats_context.publish("test.subject", b"Hello World!")


asyncio.run(main())
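
The idea of prefixing names with an application id can be illustrated without a running server. The sketch below is not the scheme chaps_nats uses internally (that is not documented here); the helper names subject_for and bucket_for are hypothetical. It relies only on two general NATS conventions: subjects are made of tokens separated by ".", and bucket names are limited to letters, digits, "_" and "-".

```python
import re

# NATS bucket names may only contain letters, digits, "_" and "-"
_BUCKET_RE = re.compile(r"^[A-Za-z0-9_-]+$")


def subject_for(app_id: str, subject: str) -> str:
    """Hypothetical helper: namespace a subject under an application id."""
    if not app_id or not subject:
        raise ValueError("app_id and subject must be non-empty")
    # "." is the token separator in NATS subjects
    return f"{app_id}.{subject}"


def bucket_for(app_id: str, bucket: str) -> str:
    """Hypothetical helper: namespace an object store bucket name."""
    name = f"{app_id}_{bucket}"
    if not _BUCKET_RE.match(name):
        raise ValueError(f"invalid bucket name: {name!r}")
    return name


if __name__ == "__main__":
    print(subject_for("test", "test.subject"))
    print(bucket_for("test", "test"))
```

Namespacing in this way lets several applications share one NATS server without their subjects or buckets colliding.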

The library defines a job abstraction, NatsJob, which can be used to create a job that can be scheduled on a NATS server:

# Producer of a job

import asyncio

import chaps_nats

# Assumption: a NATS server is reachable at this URL (the original snippet
# obtained it from a test helper, self.nats_url())
NATS_URL = "nats://localhost:4222"


async def main():
    async with chaps_nats.NatsContext(app_id="_tuto", urls=[NATS_URL]) as nats_context:
        # Create a job
        job = await chaps_nats.NatsJob.create(
            nats_context=nats_context,
            create_if_not_exists=True,  # Create the job if it does not exist
        )

        assert await job.exists(), "The job should exist"

        # Query the current status of the job
        status = await job.status()

        # The job is associated with a specific object store
        await job.object_store.put("input", b"Hello World!")

        # Send a message to the NATS queue on a subject subscribed to by workers
        # The workers then process the job and update its status
        await job.start()


asyncio.run(main())

A NatsJob can be consumed by a worker that extends chaps_nats.workers.NatsJobWorkerProcess:

from typing import Dict

import rich

import chaps_nats
import chaps_nats.workers

class TestBasicWorker(chaps_nats.workers.NatsJobWorkerProcess):
    """
    A worker which runs in a separate Python process

    WARNING: Be careful what you store in the members of this class.
             Communication between processes is not trivial.
    """

    async def init_context(self, nats_context: chaps_nats.NatsContext) -> Dict:
        """
        Initialize the context (called when starting the process)

        Use this method to set up whatever the worker needs:
        load a model, connect to a database, initialize NATS subscribers and publishers, etc.
        """
        rich.print("[green]Initializing Context[/]")
        return {}

    async def process_job(self, job: chaps_nats.NatsJob, context: Dict) -> bool:
        """
        Process a job:
        interact with the object store and / or publish messages to the NATS server
        """
        rich.print(f"[green]Processing Job: {job.job_id}[/]")
        # Store the job id as the job's output
        await job.object_store.put("output", job.job_id.encode("utf-8"))
        return True
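
The logic of process_job can be checked without a NATS server by passing in stand-ins for the job. The sketch below is only an illustration: FakeObjectStore, FakeJob and run_once are hypothetical names, not part of chaps_nats, and they mimic only the two attributes the worker above touches (job_id and object_store.put).

```python
import asyncio
from typing import Dict


class FakeObjectStore:
    """Hypothetical in-memory stand-in for a job's object store."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}

    async def put(self, name: str, data: bytes) -> None:
        self.objects[name] = data


class FakeJob:
    """Hypothetical stand-in exposing only job_id and object_store."""

    def __init__(self, job_id: str) -> None:
        self.job_id = job_id
        self.object_store = FakeObjectStore()


async def process_job(job: FakeJob, context: Dict) -> bool:
    """Same body as TestBasicWorker.process_job, minus the logging."""
    await job.object_store.put("output", job.job_id.encode("utf-8"))
    return True


def run_once(job_id: str) -> Dict[str, bytes]:
    """Run process_job on a fake job and return what it stored."""
    job = FakeJob(job_id)
    ok = asyncio.run(process_job(job, context={}))
    assert ok, "process_job should report success"
    return job.object_store.objects


if __name__ == "__main__":
    print(run_once("job-42"))
```

Keeping the job logic dependent only on job_id and object_store makes it straightforward to exercise in plain unit tests before running it inside a worker process.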

Finally, the library provides a tool for designing standardized Job APIs with FastAPI; see Examples for more details.

You can run the examples against NATS using the Docker files provided:

# Launches the Docker image containing the NATS server
# and the example API located at Examples/basic_job_api/API.py
./launch_api.sh basic_job_api 8099

See the unit tests in the Tests directory for more details.
