

Classwork

This is a simple Python module to help distribute your tasks across multiple brokers as microservices.

Classwork uses NATS to manage communication between job schedulers and workers. NATS does most of the heavy lifting, which lets us keep the rest of the code simple and concise.

What's in a name?

So why ClassWork? Because workers are simply Python classes (the Class) whose methods become individual workers (the Students).
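To make the metaphor concrete, here is a minimal sketch of how the public async methods of a class could be mapped to task names of the form `<registered name>.<method>`. The helper `list_tasks` and the extra `multiply` method are hypothetical illustrations, not ClassWork's actual implementation.

```python
import inspect


class WorkerClass:
    async def add(self, a, b):
        return a + b

    async def multiply(self, a, b):
        return a * b

    def _helper(self):
        # Private and synchronous: not exposed as a task in this sketch
        return None


def list_tasks(name, worker):
    """Hypothetical helper: map public async methods to '<name>.<method>' task names."""
    tasks = {}
    for method_name, method in inspect.getmembers(worker, predicate=inspect.iscoroutinefunction):
        if method_name.startswith("_"):
            continue  # skip private methods
        tasks[f"{name}.{method_name}"] = method
    return tasks


print(sorted(list_tasks("my_worker", WorkerClass())))
# ['my_worker.add', 'my_worker.multiply']
```

Each method (student) becomes its own addressable task, while the class (the Class) groups them under one registered name.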

Get started

First, let us create a simple worker:

import asyncio
from classwork import ClassWork


class WorkerClass:
    # Your worker...
    # Notice that we expect workers to be async
    async def add(self, a, b):
        # the work!
        resp = a + b
        # simulate a long process
        await asyncio.sleep(3)
        # return results
        return resp


# this is our nats server url
nats_url = "nats://127.0.0.1:4222"
# initialize ClassWork
class_work = ClassWork(nats_url=nats_url)
# init worker class
worker = WorkerClass()

# Ok, let us register things with asyncio
# notice the method 'class_work.register' is async!
# the name "my_worker" is the prefix schedulers use in task names, e.g. "my_worker.add"
asyncio.run(class_work.register(name="my_worker", worker_class=worker))

This is all the code we need to set up the worker. It is important to note the following:

  1. Worker Class methods should be asynchronous
  2. class_work.register is also async
  3. You will need NATS running with JetStream enabled!
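Because worker methods are ordinary coroutines, you can exercise them locally before wiring them up to NATS. The sketch below awaits `WorkerClass.add` directly and runs two calls concurrently with `asyncio.gather`; no NATS server or ClassWork API is involved, and the shortened sleep is an assumption made only to keep the check fast.

```python
import asyncio


class WorkerClass:
    # Same shape as the worker above, with a shorter simulated delay
    async def add(self, a, b):
        resp = a + b
        await asyncio.sleep(0.01)  # stand-in for a long-running job
        return resp


async def main():
    worker = WorkerClass()
    # Await a single call, exactly like any other coroutine
    single = await worker.add(1, 2)
    # Because add() is async, several calls can overlap in one event loop
    many = await asyncio.gather(worker.add(1, 2), worker.add(3, 4))
    return single, many


single, many = asyncio.run(main())
print(single, many)  # 3 [3, 7]
```

This is also why the methods must be async: a blocking method would stall every other task sharing the same event loop.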

The Job Scheduler

Now we need to create the job scheduler. Below is the code we need:

import asyncio
from pprint import pprint
from classwork import ClassWork

# init ClassWork
nats_url = "nats://127.0.0.1:4222"
class_work = ClassWork(nats_url=nats_url)

# Our callback function
# This is where completed work gets reported
async def report_callback(report_card):
    print("We have a report!")
    pprint(report_card)


# We need this function to create our job schedules
async def schedules():
    # Assign a job
    await class_work.assign(
        # the task name
        task="my_worker.add",
        # arguments
        args=[1, 2],
        # the callback to report our results
        report_callback=report_callback,
    )


# Ok, let us create the schedules now
asyncio.run(schedules())

This code publishes a task to NATS. The job workers that are already listening (attentive students 😂) pick up the task, run it, and publish their reports, which NATS routes back to the scheduler (the teacher?).
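To illustrate this round trip without a running NATS server, here is a self-contained simulation in which two `asyncio.Queue` objects stand in for NATS. Every name in it (`fake_worker`, `fake_assign`, the message fields) is hypothetical: it mirrors the shape of the flow, not ClassWork's internals. Messages are encoded with plain `json`, which is why arguments must be JSON-serializable; ClassWork's own encoding may accept more types than plain `json` does.

```python
import asyncio
import json
import uuid

reports = []  # collects reports so they can be inspected afterwards


class WorkerClass:
    async def add(self, a, b):
        return a + b


async def fake_worker(bus_in, bus_out, name, worker):
    # Hypothetical worker: take one request off the "bus", run it, publish a report
    msg = json.loads(await bus_in.get())
    worker_name, method_name = msg["task"].split(".", 1)
    if worker_name != name:
        raise LookupError(f"no worker registered as {worker_name!r}")
    result = await getattr(worker, method_name)(*msg["args"])
    report = {"task": msg["task"], "req_id": msg["req_id"], "response": result}
    await bus_out.put(json.dumps(report))


async def fake_assign(bus_in, bus_out, task, args, report_callback):
    # Hypothetical scheduler: publish the task as JSON, then await its report
    request = {"task": task, "req_id": uuid.uuid4().hex[:11], "args": args}
    await bus_in.put(json.dumps(request))  # json.dumps raises TypeError for unserializable args
    report = json.loads(await bus_out.get())
    await report_callback(report)


async def report_callback(report_card):
    reports.append(report_card)
    print("We have a report!", report_card)


async def main():
    bus_in, bus_out = asyncio.Queue(), asyncio.Queue()  # stand-ins for NATS subjects
    worker_task = asyncio.create_task(fake_worker(bus_in, bus_out, "my_worker", WorkerClass()))
    await fake_assign(bus_in, bus_out, "my_worker.add", [1, 2], report_callback)
    await worker_task


asyncio.run(main())

# An arbitrary Python object has no JSON form, so it could never reach the worker
try:
    json.dumps({"task": "my_worker.add", "args": [object()]})
except TypeError as exc:
    print("Not JSON-serializable:", exc)
```

The scheduler and worker never call each other directly; they only share the message bus, which is the decoupling NATS provides in the real setup.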

Take note of the following:

  1. class_work.assign is async, so it must be awaited. Here we wrap it in an async function and run that with asyncio.run; you can also pass the class_work.assign(...) call to asyncio.run directly.
  2. Naming your task is very important. This naming convention is unashamedly borrowed from Moleculer. In this case, your task is "my_worker.add", which routes to any worker class registered with the name "my_worker" and to its method "add".
  3. Because all of this traffic is routed via NATS, your arguments must be JSON-serializable. Even though we use typ to handle edge cases such as sets, beware that there are limits to what you can pass as arguments.
  4. args can be passed as a list or a dict. A list is unpacked as *args and a dict as **kwargs.
  5. report_callback must be async. It is called with a report of your task. A report card 😊 looks like the one below:
We have a report!
{'task': 'my_worker.add',
 'duration': {'latency': {'request': '4 ms and 356 µs',
                          'response': '5 s and 4 ms'},
              'my_worker.add': '3 s and 1 ms'},
 'req_id': '8mYjJjM0kb5',
 'response': 3}
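The routing and argument rules in the list above can be sketched as a small dispatcher: split the task name on the first dot into a worker name and a method, then unpack a list as positional arguments or a dict as keyword arguments. `dispatch` and the `registry` dictionary are hypothetical illustrations, not part of ClassWork's API.

```python
import asyncio


class WorkerClass:
    async def add(self, a, b):
        return a + b


# Hypothetical registry: registered worker name -> worker instance
registry = {"my_worker": WorkerClass()}


async def dispatch(task, args):
    # "my_worker.add" -> ("my_worker", "add")
    worker_name, method_name = task.split(".", 1)
    method = getattr(registry[worker_name], method_name)
    if isinstance(args, dict):
        return await method(**args)  # dict -> **kwargs
    return await method(*args)  # list -> *args


async def main():
    positional = await dispatch("my_worker.add", [1, 2])
    keyword = await dispatch("my_worker.add", {"a": 1, "b": 2})
    return positional, keyword


positional, keyword = asyncio.run(main())
print(positional, keyword)  # 3 3

# A task prefix that does not match any registered name finds no worker
try:
    asyncio.run(dispatch("my-worker.add", [1, 2]))
except KeyError as exc:
    print("No worker registered under", exc)
```

The last call shows why the registered name and the task prefix must match exactly: "my-worker" and "my_worker" are different names.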

Report Explanation

  • task: the name of the task that was run.
  • duration: a high-precision (down to yoctoseconds) report of the time taken.
    • latency: the time taken to route your task request to the worker (request) and the task response back to the scheduler (response). Because the worker and scheduler are decoupled, latency may also include the time either side took to reach the NATS network, so it is not a pure measure of network latency.
    • my_worker.add: the time the worker spent running the task itself, keyed by the task name.
  • req_id: a unique ID assigned to each job.
  • response: the value returned by the worker.
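A `report_callback` can pull these fields straight out of the report dictionary. The sketch below assumes the report has exactly the shape of the sample above, including the assumption (suggested by the sample) that the worker's own run time is stored in `duration` under the task name. It feeds the sample report in directly rather than receiving it from ClassWork.

```python
import asyncio

summaries = []


async def report_callback(report_card):
    # Assumes the report layout shown in the sample output above
    task = report_card["task"]
    duration = report_card["duration"]
    summary = (
        f"{report_card['req_id']}: {task} -> {report_card['response']!r} "
        f"(worker: {duration[task]}, request latency: {duration['latency']['request']}, "
        f"response latency: {duration['latency']['response']})"
    )
    summaries.append(summary)
    print(summary)


sample_report = {
    "task": "my_worker.add",
    "duration": {
        "latency": {"request": "4 ms and 356 µs", "response": "5 s and 4 ms"},
        "my_worker.add": "3 s and 1 ms",
    },
    "req_id": "8mYjJjM0kb5",
    "response": 3,
}

asyncio.run(report_callback(sample_report))
```

Keeping req_id in the summary makes it easy to match a report to the job that produced it when many tasks are in flight.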

Try it

Sample code is available in scheduler.py and worker.py.



Download files

Download the file for your platform.

Source Distribution

classwork-0.0.7.tar.gz (7.9 kB)

Uploaded Source

Built Distribution


classwork-0.0.7-py3-none-any.whl (9.2 kB)

Uploaded Python 3

File details

Details for the file classwork-0.0.7.tar.gz.

File metadata

  • Download URL: classwork-0.0.7.tar.gz
  • Upload date:
  • Size: 7.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.8.10 Linux/5.15.0-73-generic

File hashes

Hashes for classwork-0.0.7.tar.gz
Algorithm Hash digest
SHA256 25f74c8a41830125e630a26b2313fbc5746e924dc533d7dee5e11bff62972d16
MD5 402225a8aae415f8b55d992237addef8
BLAKE2b-256 cac7775bb2a79ea4c16f3e9b3dc5ed40625c56d5dd72c78ac9db94858619bb74


File details

Details for the file classwork-0.0.7-py3-none-any.whl.

File metadata

  • Download URL: classwork-0.0.7-py3-none-any.whl
  • Upload date:
  • Size: 9.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.8.10 Linux/5.15.0-73-generic

File hashes

Hashes for classwork-0.0.7-py3-none-any.whl
Algorithm Hash digest
SHA256 d177153da451a6501c5ccd9ae0096fc3c478fc61b317e88886ef6a2de92e01b2
MD5 88c373ab93a1e5bf1d5c43653747c648
BLAKE2b-256 0c41cce65d9bcbba49130b062b82d74fe188b787b67e8adb92df78b62677e1d3

