Minimalistic library that simplifies the adoption of async/await (asyncio) programming style in a multithreaded application.

magic_foundation

Define two services and run them in the same run loop (single thread):

import asyncio
import logging

from magic_foundation import Service, ServiceStatus, ServiceContext, Main

# Module-level logger, so the service methods can reach it
# even when this module is imported rather than run as a script.
log = logging.getLogger(__name__)


class TestService(Service):

  def __init__(self, name:str):
    self.name = name

  async def initialize(self, ctx:ServiceContext):
    log.info(f"[{self.name}] initialize")

  async def run(self, ctx:ServiceContext):
    index = 0

    # Log one line per second for as long as the service is running.
    while self.status is ServiceStatus.running:
      log.info(f"[{self.name}] run {index}")
      index += 1
      await asyncio.sleep(1.0)

  async def terminate(self, ctx:ServiceContext):
    log.info(f"[{self.name}] terminate")


if __name__ == "__main__":
  logging.basicConfig(level=logging.INFO)
  logging.getLogger("magic_foundation").setLevel(logging.DEBUG)

  main = Main.instance()
  main.service_pools = {
    'main': [
      TestService(name="Service_1"),
      TestService(name="Service_2")
    ],
  }
  main.run()

To run the two services in different threads, define service_pools as follows:

  ...
  main = Main.instance()
  main.service_pools = {
    'th1': [
      TestService(name="Service_1")
    ],
    'th2': [
      TestService(name="Service_2")
    ],
  }
  main.run()
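
To make the difference between the two layouts concrete, here is a minimal standard-library sketch of the general idea of one event loop per thread. It does not use magic_foundation and is not how the library is implemented; run_pools, run_pool and worker are hypothetical names used only for illustration.

```python
import asyncio
import threading


async def worker(name: str, ticks: int, results: list) -> None:
    # Stand-in for a service's run() coroutine: record which thread ran it.
    for index in range(ticks):
        results.append((name, index, threading.current_thread().name))
        await asyncio.sleep(0)


def run_pool(names: list, ticks: int, results: list) -> None:
    # asyncio.run() creates a fresh event loop owned by the calling thread,
    # so all workers of one pool share that loop.
    async def main() -> None:
        await asyncio.gather(*(worker(n, ticks, results) for n in names))

    asyncio.run(main())


def run_pools(pools: dict, ticks: int = 2) -> list:
    # One thread (and therefore one event loop) per pool name.
    results: list = []
    threads = [
        threading.Thread(target=run_pool, name=pool, args=(names, ticks, results))
        for pool, names in pools.items()
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results


if __name__ == "__main__":
    for entry in run_pools({"th1": ["Service_1"], "th2": ["Service_2"]}):
        print(entry)
```

Passing a single pool with both names, for example run_pools({"main": ["Service_1", "Service_2"]}), runs both workers on one loop in one thread, which mirrors the first layout.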

Communication

Communication between services follows the publish/subscribe paradigm.

Subscribe

  async def coro(data):
    log.info(f"[{self.name}] coro data:{data}")

  await ctx.subscribe(queue_name="q://my_queue", handler=coro)

Publish

  data = {
    "a": "A",
    "b": 1234 
  }

  await ctx.publish(queue_name="q://my_queue", data=data) 

Several services can subscribe to the same queue; each subscriber receives every message published to it.
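
The fan-out behaviour described above can be illustrated with a small self-contained sketch. TinyBus is a hypothetical in-process stand-in written for this example; it only mimics the subscribe/publish call shape shown in this README and says nothing about how the library delivers messages internally or across threads.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Any], Awaitable[None]]


class TinyBus:
    """Hypothetical in-process bus, for illustration only."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}

    async def subscribe(self, queue_name: str, handler: Handler) -> None:
        # Several handlers may register on the same queue name.
        self._handlers.setdefault(queue_name, []).append(handler)

    async def publish(self, queue_name: str, data: Any) -> None:
        # Fan out: every handler registered on the queue gets the message.
        handlers = self._handlers.get(queue_name, [])
        await asyncio.gather(*(handler(data) for handler in handlers))


async def demo() -> list:
    bus = TinyBus()
    received = []

    async def service_1(data: Any) -> None:
        received.append(("Service_1", data))

    async def service_2(data: Any) -> None:
        received.append(("Service_2", data))

    await bus.subscribe(queue_name="q://my_queue", handler=service_1)
    await bus.subscribe(queue_name="q://my_queue", handler=service_2)
    await bus.publish(queue_name="q://my_queue", data={"a": "A", "b": 1234})
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```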

WebSocket Service

As an additional component, the library provides a built-in WebSocket service.

import json

from magic_foundation import Container, Service, ServiceStatus, ServiceContext, Main
from magic_foundation.websocket_service import WebSocketService

...

class TestService(Service):

  ...  

  async def run(self, ctx:ServiceContext):
    log.info(f"[{self.name}] run")

    async def handler(data):
      log.info(f"[{self.name}] handler inbound data:{data}")
      req = json.loads(data)
      res = json.dumps({"status": "OK", "timestamp": req["timestamp"]})
      await ctx.publish(queue_name="ws://outbound/client", data=res)

    await ctx.subscribe(queue_name="ws://inbound/client", handler=handler)

    ...

  async def terminate(self, ctx:ServiceContext):
    log.info(f"[{self.name}] terminate")
...


if __name__ == "__main__":
    main = Main.instance()

    main.service_pools = {
      "main" : [
        WebSocketService(host="localhost", port=8765),
        TestService(name="Consumer"), 
      ]
    }

    main.run()

The prefixes ws://inbound/ and ws://outbound/ refer to the WS endpoint (WebSocketService) and must be considered reserved.

The complete example is contained in the examples folder.

To try it, run python examples/websocket.py in a shell and open the file websocket.html in a browser.

Limitations

Right now only the requested path is used as a discriminant: messages from different clients connected to the same path are routed to the same handler and, conversely, one outbound message is sent to every client connected to that path.
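
The consequence of routing by path alone can be sketched without any network code. PathRouter and FakeClient below are hypothetical names invented for this illustration, and the assumption that the path is the final segment of the queue name (for example "client") is mine; this is not the WebSocketService implementation.

```python
import asyncio
from typing import Dict, List


class FakeClient:
    """Stand-in for a connected WebSocket client."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.sent: List[str] = []

    async def send(self, message: str) -> None:
        self.sent.append(message)


class PathRouter:
    """Groups clients by requested path only; client identity is ignored."""

    def __init__(self) -> None:
        self._clients: Dict[str, List[FakeClient]] = {}

    def connect(self, path: str, client: FakeClient) -> None:
        self._clients.setdefault(path, []).append(client)

    async def send_outbound(self, path: str, message: str) -> int:
        # Every client on the path receives the message; returns the count.
        clients = self._clients.get(path, [])
        for client in clients:
            await client.send(message)
        return len(clients)


async def demo() -> dict:
    router = PathRouter()
    alice, bob, carol = FakeClient("alice"), FakeClient("bob"), FakeClient("carol")
    router.connect("client", alice)
    router.connect("client", bob)
    router.connect("other", carol)
    # A reply intended for alice alone also reaches bob: same path.
    await router.send_outbound("client", '{"status": "OK"}')
    return {c.name: c.sent for c in (alice, bob, carol)}


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If per-client replies are needed, one workaround under this limitation is to have each client connect on its own path.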

Simple Logging Service

The included logging service is a simple way to dump JSON messages to a local file.

import asyncio

from magic_foundation import Container, Service, ServiceStatus, ServiceContext, Main
from magic_foundation.logging_service import LoggingService

...

file_path = "logging_out.log"

class TestService(Service):

  ...  

  async def run(self, ctx:ServiceContext):
    log.info(f"[{self.name}] run")

    ...

    index = 0

    # Publish one message per second to the logging queue while running.
    while self.status == ServiceStatus.running:
      index += 1
      data = {"cmd": "log", "index": index}

      await ctx.publish(queue_name=f"log://{file_path}", data=data)

      await asyncio.sleep(1.0)

  async def terminate(self, ctx:ServiceContext):
    log.info(f"[{self.name}] terminate")
...


if __name__ == "__main__":
    main = Main.instance()

    main.service_pools = {
      "background" : [
        LoggingService(file_path=file_path, flush_interval_sec=4.0)
      ],
      "main" : [
        TestService(name="Producer")
      ]
    }

    main.run()

The prefix log:// refers to the simple logging service (LoggingService) and must be considered reserved.
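
Since this README names three reserved prefixes (ws://inbound/, ws://outbound/ and log://), application code may want to guard its own queue names against them. The helper below is a hypothetical convenience and not part of the library; whether magic_foundation performs any such check itself is not stated here.

```python
# Prefixes this README describes as reserved for the built-in services.
RESERVED_PREFIXES = ("ws://inbound/", "ws://outbound/", "log://")


def is_reserved(queue_name: str) -> bool:
    # str.startswith accepts a tuple and matches any of its entries.
    return queue_name.startswith(RESERVED_PREFIXES)


if __name__ == "__main__":
    for name in ("q://my_queue", "ws://inbound/client", "log://logging_out.log"):
        print(name, is_reserved(name))
```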

The complete example is contained in the examples folder.

To try it, run python examples/dump_to_file.py in a shell. A file called logging_out.log is created in the root of the project, and every 4 seconds the logging service flushes the collected messages to it.
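
The collect-then-flush behaviour can be pictured with a short standard-library sketch. BufferedJsonWriter is a hypothetical class written for this illustration, and the one-JSON-object-per-line output format is an assumption; the actual file format and internals of LoggingService are not described in this README.

```python
import asyncio
import json
import os
import tempfile
from typing import List


class BufferedJsonWriter:
    """Collects messages in memory and appends them to a file in batches."""

    def __init__(self, file_path: str, flush_interval_sec: float) -> None:
        self.file_path = file_path
        self.flush_interval_sec = flush_interval_sec
        self._buffer: List[str] = []

    def collect(self, data: dict) -> None:
        self._buffer.append(json.dumps(data))

    def flush(self) -> int:
        # Append everything collected so far; return how many lines were written.
        if not self._buffer:
            return 0
        with open(self.file_path, "a", encoding="utf-8") as handle:
            handle.write("\n".join(self._buffer) + "\n")
        count = len(self._buffer)
        self._buffer.clear()
        return count

    async def run(self, rounds: int) -> None:
        # Wake up once per interval and flush whatever has accumulated.
        for _ in range(rounds):
            await asyncio.sleep(self.flush_interval_sec)
            self.flush()


def demo() -> list:
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "logging_out.log")
        writer = BufferedJsonWriter(path, flush_interval_sec=0.01)
        for index in range(1, 4):
            writer.collect({"cmd": "log", "index": index})
        asyncio.run(writer.run(rounds=1))
        with open(path, encoding="utf-8") as handle:
            return [json.loads(line) for line in handle]


if __name__ == "__main__":
    print(demo())
```

Batching writes this way trades a small delay, at most one flush interval, for fewer file operations.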
