Minimalistic library that simplifies the adoption of async/await (asyncio) programming style in a multithreaded application.

magic_foundation

Define two services and run them in the same run loop (single thread):

import asyncio
import logging

from magic_foundation import Service, ServiceStatus, ServiceContext, Main


class TestService(Service):

  def __init__(self, name:str):
    self.name = name

  async def initialize(self, ctx:ServiceContext):
    log.info(f"[{self.name}] initialize")

  async def run(self, ctx:ServiceContext):
    index = 0

    while self.status is ServiceStatus.running:
      log.info(f"[{self.name}] run {index}")
      index += 1
      await asyncio.sleep(1.0)

  async def terminate(self, ctx:ServiceContext):
    log.info(f"[{self.name}] terminate")


if __name__ == "__main__":
  log = logging.getLogger(__name__)
  logging.basicConfig(level=logging.INFO)
  logging.getLogger("magic_foundation").setLevel(logging.DEBUG)

  main = Main.instance()
  main.service_pools = {
    'main': [
      TestService(name="Service_1"),
      TestService(name="Service_2")
    ],
  }
  main.run()

To run the two services in different threads, define service_pools as follows:

  ...
  main = Main.instance()
  main.service_pools = {
    'th1': [
      TestService(name="Service_1")
    ],
    'th2': [
      TestService(name="Service_2")
    ],
  }
  main.run()
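
The idea of one run loop per pool can be sketched with the standard library alone. This is a minimal illustration, not magic_foundation's implementation: it assumes each pool maps to one thread with its own asyncio event loop, and the names worker, run_pool, and run_pools are hypothetical.

```python
import asyncio
import threading


async def worker(name: str, results: dict) -> None:
    # Yield once to the loop, then record which thread ran this coroutine.
    await asyncio.sleep(0)
    results[name] = threading.current_thread().name


def run_pool(names: list, results: dict) -> None:
    # asyncio.run creates a fresh event loop for the calling thread,
    # so every pool thread gets its own run loop.
    async def pool_main() -> None:
        await asyncio.gather(*(worker(n, results) for n in names))

    asyncio.run(pool_main())


def run_pools(pools: dict) -> dict:
    # Start one thread per pool, named after the pool, and wait for all.
    results: dict = {}
    threads = [
        threading.Thread(target=run_pool, args=(names, results), name=pool_name)
        for pool_name, names in pools.items()
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run_pools({"th1": ["Service_1"], "th2": ["Service_2"]}))
```

Services listed under the same key share a loop and interleave cooperatively; services under different keys run on separate threads.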

Communication

Communication between services follows the publish/subscribe paradigm.

Subscribe

  async def coro(data):
    log.info(f"[{self.name}] coro data:{data}")

  await ctx.subscribe(queue_name="q://my_queue", handler=coro)

Publish

  data = {
    "a": "A",
    "b": 1234 
  }

  await ctx.publish(queue_name="q://my_queue", data=data) 

Several services can subscribe to the same queue, and each of them receives every message published to it.
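
The fan-out behaviour can be illustrated with a small in-process bus. This is a sketch under stated assumptions, not the library's code: TinyBus and demo are hypothetical names, everything runs in a single event loop, and delivery across threads is not modelled.

```python
import asyncio


class TinyBus:
    """Illustrative in-process bus; not magic_foundation's implementation."""

    def __init__(self) -> None:
        self._handlers = {}  # queue name -> list of coroutine functions

    async def subscribe(self, queue_name, handler) -> None:
        self._handlers.setdefault(queue_name, []).append(handler)

    async def publish(self, queue_name, data) -> None:
        # Fan out: every handler registered on the queue gets the message.
        handlers = self._handlers.get(queue_name, [])
        await asyncio.gather(*(h(data) for h in handlers))


async def demo() -> list:
    bus = TinyBus()
    received = []

    async def service_1(data):
        received.append(("Service_1", data))

    async def service_2(data):
        received.append(("Service_2", data))

    await bus.subscribe("q://my_queue", service_1)
    await bus.subscribe("q://my_queue", service_2)
    await bus.publish("q://my_queue", {"a": "A", "b": 1234})
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```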

WebSocket Service

As an additional component, the library provides a built-in WebSocket service.

import json

from magic_foundation import Container, Service, ServiceStatus, ServiceContext, Main
from magic_foundation.websocket_service import WebSocketService

...

class TestService(Service):

  ...  

  async def run(self, ctx:ServiceContext):
    log.info(f"[{self.name}] run")

    async def handler(data):
      log.info(f"[{self.name}] handler inbound data:{data}")
      req = json.loads(data)
      res = json.dumps({"status": "OK", "timestamp": req["timestamp"]})
      await ctx.publish(queue_name="ws://outbound/client", data=res)

    await ctx.subscribe(queue_name="ws://inbound/client", handler=handler)

    ...

  async def terminate(self, ctx:ServiceContext):
    log.info(f"[{self.name}] terminate")
...


if __name__ == "__main__":
    main = Main.instance()

    main.service_pools = {
      "main" : [
        WebSocketService(host="localhost", port=8765),
        TestService(name="Consumer"), 
      ]
    }

    main.run()

The prefixes ws://inbound/ and ws://outbound/ refer to the WS endpoint (WebSocketService) and must be considered reserved.

The complete example is contained in the examples folder.

To try it, run python examples/websocket.py in a shell and open the file websocket.html in a browser.

Limitations

Right now only the requested path is used as the discriminant. Messages from different clients connected with the same path are routed to the same handler, and conversely one outbound message is sent to all clients connected to that path.
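
A small model makes the consequence of path-only routing concrete. This is a hypothetical sketch, not the WebSocketService code: PathRouter and its methods are invented for illustration, and it assumes the queue name is built by appending the request path to the ws://inbound/ prefix.

```python
from collections import defaultdict


class PathRouter:
    """Hypothetical model of path-only routing; not the library's code."""

    def __init__(self) -> None:
        self._clients = defaultdict(set)  # request path -> client ids

    def connect(self, path: str, client_id: str) -> None:
        self._clients[path].add(client_id)

    def inbound_queue(self, path: str) -> str:
        # Assumption: the client identity is not part of the queue name,
        # so every client on this path feeds the same handler.
        return f"ws://inbound/{path.lstrip('/')}"

    def outbound_targets(self, path: str) -> set:
        # One outbound message reaches every client on the path.
        return set(self._clients.get(path, ()))


if __name__ == "__main__":
    router = PathRouter()
    router.connect("/client", "browser_a")
    router.connect("/client", "browser_b")
    print(router.inbound_queue("/client"))
    print(router.outbound_targets("/client"))
```

Under this model, giving each client its own path is the only way to address it individually.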

Simple Logging Service

The included logging service is a simple way to dump JSON messages to a local file.

import asyncio

from magic_foundation import Container, Service, ServiceStatus, ServiceContext, Main
from magic_foundation.logging_service import LoggingService

...

file_path = "logging_out.log"

class TestService(Service):

  ...  

  async def run(self, ctx:ServiceContext):
    log.info(f"[{self.name}] run")

    index = 0

    # Publish one log message per second while the service is running.
    while self.status == ServiceStatus.running:
      index += 1
      data = {"cmd": "log", "index": index}

      await ctx.publish(queue_name=f"log://{file_path}", data=data)

      await asyncio.sleep(1.0)

  async def terminate(self, ctx:ServiceContext):
    log.info(f"[{self.name}] terminate")
...


if __name__ == "__main__":
    main = Main.instance()

    main.service_pools = {
      "background" : [
        LoggingService(file_path=file_path, flush_interval_sec=4.0)
      ],
      "main" : [
        TestService(name="Producer")
      ]
    }

    main.run()

The prefix log:// refers to the simple logging service (LoggingService) and must be considered reserved.

The complete example is contained in the examples folder.

To try it, run python examples/dump_to_file.py in a shell. A file called logging_out.log is created in the root of the project, and every 4 seconds the logging service flushes the collected messages to it.
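
The buffer-then-flush pattern described above can be sketched as follows. This is not the LoggingService implementation: BufferedJsonWriter and demo are hypothetical names, and writing one JSON object per line is an assumption about the file format.

```python
import asyncio
import json


class BufferedJsonWriter:
    """Illustrative buffered writer; not the LoggingService implementation."""

    def __init__(self, file_path: str, flush_interval_sec: float) -> None:
        self.file_path = file_path
        self.flush_interval_sec = flush_interval_sec
        self._buffer = []

    def add(self, data: dict) -> None:
        # Collect messages in memory until the next flush.
        self._buffer.append(json.dumps(data))

    def flush(self) -> int:
        if not self._buffer:
            return 0
        lines, self._buffer = self._buffer, []
        # Blocking file I/O; acceptable for a sketch with small buffers.
        with open(self.file_path, "a", encoding="utf-8") as fh:
            fh.write("\n".join(lines) + "\n")
        return len(lines)

    async def run(self, stop: asyncio.Event) -> None:
        # Flush on every interval, and once more when stop is set.
        while not stop.is_set():
            try:
                await asyncio.wait_for(stop.wait(), timeout=self.flush_interval_sec)
            except asyncio.TimeoutError:
                pass
            self.flush()


async def demo(path: str) -> None:
    writer = BufferedJsonWriter(path, flush_interval_sec=0.05)
    stop = asyncio.Event()
    task = asyncio.create_task(writer.run(stop))
    for index in range(1, 4):
        writer.add({"cmd": "log", "index": index})
    await asyncio.sleep(0.2)
    stop.set()
    await task


if __name__ == "__main__":
    asyncio.run(demo("logging_out.log"))
```

A longer interval means fewer writes but more messages lost if the process dies between flushes.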
