comotore - component engine library - simple actor model realisation
Project description
Comotore is a component engine library: a simple implementation of the actor model.
Examples
Client/server example
import asyncio
import logging
import sys
from comotore import (Engine, Component)
from comotore.logging.formatters import PlainFormatter
from comotore import runner
class Client(Component):
    """Example component that emits one request and logs every response to it.

    NOTE(review): ``Component.signal``, ``fly`` and ``logger`` are provided by
    comotore; their exact semantics are not visible in this example, so the
    comments below describe only how they are used here.
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments to ``Component`` and keep the optional ``name``."""
        super().__init__(**kwargs)
        self._name = kwargs.get("name")

    async def construct(self):
        """Build the request/response coroutine and hand it to ``self.fly``."""

        async def exchange():
            # The client's own name is used as the request payload.
            outgoing = "{name}".format(name=self._name)
            pending = await self.request(outgoing)

            async for reply in pending.response():
                headline = "Response {name}".format(name=self._name)
                self.logger["Client"].info(
                    headline, corr_id=pending.correlation_id, payload=reply
                )

            # Reached once the response iterator is exhausted.
            farewell = "Responses completed {name}".format(name=self._name)
            self.logger["Client"].info(farewell)
            await self.completed()

        self.fly(exchange())

    @Component.signal
    def request(self, data):
        """Signal carrying the request payload (body intentionally empty)."""

    @Component.signal
    def completed(self, _):
        """Signal emitted after all responses were logged (body intentionally empty)."""
class Server(Component):
    """Example component whose ``response`` handler answers a request with two strings."""

    def __init__(self, **kwargs):
        """Pass every keyword argument on to ``Component`` and keep the optional ``name``."""
        super().__init__(**kwargs)
        self._name = kwargs.get("name")

    async def response(self, request):
        """Log the request payload, then yield two numbered replies.

        Each reply is followed by a one-second ``asyncio.sleep``.
        """
        self.logger["Server"].info("Request", payload=request.payload)

        number = 0
        while number < 2:
            reply = "{name} {payload} {number}".format(
                name=self._name, payload=request.payload, number=number
            )
            yield reply
            await asyncio.sleep(1)
            number += 1
async def main():
    """Wire one client to two servers, print the engine, and await it.

    NOTE(review): ``start_waiting_for``, ``engine.call`` and ``engine.cast`` are
    comotore APIs; presumably they delay the client until both servers started
    and connect signals to handlers. Confirm against the library.
    """
    engine = Engine()
    engine.logger.setLevel(logging.INFO)

    # Log to stderr using comotore's plain formatter with "{"-style fields.
    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setFormatter(
        PlainFormatter(
            "{asctime} - {levelname}: [{tag}] hostname={hostname}, pid={process}, tid={thread}> {message} {data}",
            style="{",
        )
    )
    engine.logger.addHandler(stderr_handler)

    server_first = engine.component(Server, name="first")
    server_second = engine.component(Server, name="second")
    servers = (server_first, server_second)

    client = engine.component(
        Client,
        name="client_1",
        start_waiting_for=[server.started for server in servers],
    )

    # The client's request signal is wired to each server's response handler, in order.
    for server in servers:
        engine.call(client.request, server.response)
    engine.cast(client.completed, engine.avatar.quit)

    print(engine)
    print("-" * 40)
    await engine


# Script entry point: run the example through comotore's runner.
if __name__ == "__main__":
    runner.run(main())
Producer/consumer example
import asyncio
import logging
import uuid
import sys
from comotore import (Engine, Component)
from comotore.logging.formatters import PlainFormatter
from comotore import runner
class Producer(Component):
    """Example component that emits three ``produced`` signals, then ``completed``.

    NOTE(review): ``Component.signal`` and ``fly`` are provided by comotore; their
    exact semantics are not visible in this example.
    """

    async def construct(self):
        """Build the producing coroutine and hand it to ``self.fly``."""

        async def emit_batch():
            try:
                # Three rounds: emit a fresh UUID, log it, then pause two seconds.
                for _ in range(3):
                    uniq_id = uuid.uuid4()
                    await self.produced(uniq_id)
                    self.logger["Producer"].info("Produce", payload=uniq_id)
                    await asyncio.sleep(2)
                await self.completed()
            except asyncio.CancelledError:
                # A cancellation raised while producing ends the coroutine quietly.
                pass

        self.fly(emit_batch())

    @Component.signal
    def produced(self, uniq_id):
        """Signal carrying the produced identifier (body intentionally empty)."""

    @Component.signal
    def completed(self, _):
        """Signal emitted once production has finished (body intentionally empty)."""
class Consumer(Component):
    """Example component with two handlers that each log the payload of a signal."""

    async def consume1(self, signal):
        """Log the signal's payload under the message ``Consume 1``."""
        received = signal.payload
        self.logger["Consumer"].info("Consume 1", payload=received)

    async def consume2(self, signal):
        """Log the signal's payload under the message ``Consume 2``."""
        received = signal.payload
        self.logger["Consumer"].info("Consume 2", payload=received)
async def main():
    """Wire a producer to both consumer handlers, print the engine, and await it.

    NOTE(review): ``start_waiting_for`` and ``engine.cast`` are comotore APIs;
    presumably they delay the producer until the consumer started and connect
    signals to handlers. Confirm against the library.
    """
    engine = Engine()
    engine.logger.setLevel(logging.INFO)

    # Log to stderr using comotore's plain formatter with "{"-style fields.
    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setFormatter(
        PlainFormatter(
            "{asctime} - {levelname}: [{tag}] hostname={hostname}, pid={process}, tid={thread}> {message} {data}",
            style="{",
        )
    )
    engine.logger.addHandler(stderr_handler)

    consumer = engine.component(Consumer)
    producer = engine.component(Producer, start_waiting_for=[consumer.started])

    # Every produced signal is cast to both consumer handlers, in this order.
    for handler in (consumer.consume1, consumer.consume2):
        engine.cast(producer.produced, handler)
    engine.cast(producer.completed, engine.avatar.quit)

    print(engine)
    print("-" * 40)
    await engine


# Script entry point: run the example through comotore's runner.
if __name__ == "__main__":
    runner.run(main())
Invoke example
import asyncio
import logging
import sys
from comotore import (Engine, Component)
from comotore.logging.formatters import PlainFormatter
from comotore import runner
class Worker(Component):
    """Example component exposing one cast handler and one call handler.

    NOTE(review): ``Component.signal`` is provided by comotore; its exact
    semantics are not visible in this example.
    """

    @Component.signal
    def completed(self):
        """Signal emitted after the cast handler ran (body intentionally empty)."""

    async def cast_work(self, signal):
        """Log the signal's payload, then emit ``completed``."""
        received = signal.payload
        self.logger["Worker"].info("Cast work was invoked with", payload=received)
        await self.completed()

    async def call_work(self, signal):
        """Log the signal's payload, then yield two numbered responses.

        Each response is followed by a one-second ``asyncio.sleep``.
        """
        received = signal.payload
        self.logger["Worker"].info("Call work was invoked with", payload=received)

        number = 0
        while number < 2:
            yield "Call work response {number}".format(number=number)
            await asyncio.sleep(1)
            number += 1
async def invoker(eng, cast_target, call_target):
    """Invoke ``call_target`` and then ``cast_target`` on the engine after a delay.

    Waits two seconds, invokes ``call_target`` with a string payload, logs every
    response it yields, and finally invokes ``cast_target`` with a string payload.

    NOTE(review): ``invoke_call`` and ``invoke_cast`` are comotore engine APIs;
    their exact semantics are not visible in this example.
    """
    eng.logger["Invoker"].info("Invoke in 2 seconds")
    await asyncio.sleep(2)

    eng.logger["Invoker"].info("Invoke call")
    pending = await eng.invoke_call(call_target, payload="Some call data")
    async for reply in pending.response():
        eng.logger["Invoker"].info(reply)

    await eng.invoke_cast(cast_target, payload="Some cast data")
async def main():
    """Create a worker, schedule the invoker coroutine, print the engine, and await it.

    NOTE(review): ``engine.cast``, ``engine.avatar.quit`` and ``engine.loop`` are
    comotore APIs; presumably the cast connects the worker's ``completed`` signal
    to engine shutdown. Confirm against the library.
    """
    engine = Engine()
    engine.logger.setLevel(logging.INFO)

    # Log to stderr using comotore's plain formatter with "{"-style fields.
    handler = logging.StreamHandler(sys.stderr)
    formatter = PlainFormatter(
        "{asctime} - {levelname}: [{tag}] hostname={hostname}, pid={process}, tid={thread}> {message} {data}",
        style="{",
    )
    handler.setFormatter(formatter)
    engine.logger.addHandler(handler)

    worker = engine.component(Worker)
    engine.cast(worker.completed, engine.avatar.quit)

    print(engine)
    print("-" * 40)

    # Keep a strong reference to the scheduled task for as long as main() runs:
    # the event loop holds only weak references to tasks, so a task whose result
    # is discarded may be garbage-collected before it finishes.
    invoker_task = asyncio.ensure_future(  # noqa: F841 - held on purpose
        invoker(engine, worker.cast_work, worker.call_work), loop=engine.loop
    )
    await engine


# Script entry point: run the example through comotore's runner.
if __name__ == "__main__":
    runner.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
No source distribution files available for this release. See tutorial on generating distribution archives.
Built Distribution
comotore-19.12.2-py3-none-any.whl
(21.3 kB
view hashes)
Close
Hashes for comotore-19.12.2-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 6b3ce833386718b2e1a5c5113ccd188bd447259b6e22c874366685403a502953 |
|
MD5 | 25e10cfb8a4a121682ce27d25688b40a |
|
BLAKE2b-256 | a3e31359d0aff7541f8c51b59e6329db7f32c65e998baec9bf1bb01ee853c3ca |