No project description provided
Project description
Arrlio [WIP]
Documentation (WIP)
Asyncio distributed task/workflow system with support for generators and graphs
Installation
pip install arrlio
Or, to use the latest development version:
pip install git+https://github.com/levsh/arrlio
Usage
Create a tasks file
# tasks.py
import io
import arrlio
import invoke
@arrlio.task
async def hello_world():
    """Return the greeting string "Hello World!"."""
    greeting = "Hello World!"
    return greeting
# task custom name
@arrlio.task(name="foo")
async def foo():
    """Log a greeting via ``arrlio.logger``; the task is given the name "foo"."""
    message = "Hello from task 'foo'!"
    arrlio.logger.info(message)
# exception example
@arrlio.task
async def exception():
    """Unconditionally raise ``ZeroDivisionError`` (error-handling demo)."""
    raise ZeroDivisionError()
# Arrlio supports generators and async generators
@arrlio.task
def xrange(count):
    """Yield every value of ``range(count)``, one at a time."""
    numbers = range(count)
    for number in numbers:
        yield number
@arrlio.task
async def add_one(value: str):
    """Convert *value* with ``int()`` and return the result plus one."""
    number = int(value)
    return number + 1
@arrlio.task
async def bash(cmd, stdin: str = None):
    """Run *cmd* with ``invoke.run`` and return the result's ``stdout``.

    *stdin* is wrapped in an ``io.StringIO`` and handed to the command as
    its input stream; a fresh ``io.StringIO`` is passed as the output stream.
    """
    streams = {
        "in_stream": io.StringIO(stdin),
        "out_stream": io.StringIO(),
    }
    return invoke.run(cmd, **streams).stdout
Create a main file and run it
import asyncio
import logging
import arrlio
import tasks
# Use the logger named "arrlio" and set its level to INFO.
logger = logging.getLogger("arrlio")
logger.setLevel("INFO")
# Backend module name passed to arrlio.Config in main().
BACKEND = "arrlio.backends.local"
# BACKEND = "arrlio.backends.rabbitmq"
async def main():
    """Send each example task through the app and log what comes back.

    Covers sending by task object, sending by task name, passing positional
    args, a task that raises, and a generator task.
    """
    config = arrlio.Config(backend={"module": BACKEND})
    app = arrlio.App(config)

    async with app:
        await app.consume_tasks()

        # Send using the task object itself.
        hello_result = await app.send_task(tasks.hello_world)
        logger.info(await hello_result.get())

        # Send using the task name.
        foo_result = await app.send_task("foo")
        logger.info(await foo_result.get())

        # Pass positional arguments to the task.
        add_result = await app.send_task(tasks.add_one, args=(1,))
        logger.info(await add_result.get())

        # A task that raises: anything raised inside the try is printed
        # and logged instead of aborting the example.
        try:
            failing_result = await app.send_task(tasks.exception)
            logger.info(await failing_result.get())
        except Exception as exc:
            print(f"\nThis is example exception for {app.backend}:\n")
            logger.exception(exc)
            print()

        # A generator task: collect the values by iterating asynchronously.
        stream = await app.send_task(tasks.xrange, args=(3,))
        results = [item async for item in stream]
        logger.info(results)  # -> [0, 1, 2]


if __name__ == "__main__":
    asyncio.run(main())
Arrlio supports graph execution
import asyncio
import logging
import arrlio
import tasks
# Use the logger named "arrlio" and set its level to INFO.
logger = logging.getLogger("arrlio")
logger.setLevel("INFO")
# Backend module name passed to arrlio.Config in main().
BACKEND = "arrlio.backends.local"
# BACKEND = "arrlio.backends.rabbitmq"
async def main():
    """Build the chain A -> B -> C of ``add_one`` nodes, run it, log results."""
    graph = arrlio.Graph("My Graph")

    # "A" is the root node; "B" and "C" are added without the root flag.
    graph.add_node("A", tasks.add_one, root=True)
    for node_name in ("B", "C"):
        graph.add_node(node_name, tasks.add_one)

    for source, target in (("A", "B"), ("B", "C")):
        graph.add_edge(source, target)

    # arrlio.plugins.events and arrlio.plugins.graphs
    # plugins are required
    required_plugins = [
        {"module": "arrlio.plugins.events"},
        {"module": "arrlio.plugins.graphs"},
    ]
    config = arrlio.Config(
        backend={"module": BACKEND},
        plugins=required_plugins,
    )
    app = arrlio.App(config)

    async with app:
        await app.consume_tasks()

        # Execute the graph with argument 0.
        node_results = await app.send_graph(graph, args=(0,))

        a_value = await node_results["A"].get()
        logger.info("A: %i", a_value)  # -> A: 1
        b_value = await node_results["B"].get()
        logger.info("B: %i", b_value)  # -> B: 2
        c_value = await node_results["C"].get()
        logger.info("C: %i", c_value)  # -> C: 3


if __name__ == "__main__":
    asyncio.run(main())
Another graph example
import asyncio
import logging
import arrlio
import tasks
# Use the logger named "arrlio" and set its level to INFO.
logger = logging.getLogger("arrlio")
logger.setLevel("INFO")
# Backend module name passed to arrlio.Config in main().
BACKEND = "arrlio.backends.local"
# BACKEND = "arrlio.backends.rabbitmq"
async def main():
    """Run a two-node graph of ``bash`` tasks (A -> B) and log B's result.

    Node B is registered with the command "wc -w"; the graph is started
    with an ``echo`` command as its argument.
    """
    graph = arrlio.Graph("My Graph")
    graph.add_node("A", tasks.bash, root=True)
    graph.add_node("B", tasks.bash, args=("wc -w",))
    graph.add_edge("A", "B")

    required_plugins = [
        {"module": "arrlio.plugins.events"},
        {"module": "arrlio.plugins.graphs"},
    ]
    config = arrlio.Config(
        backend={"module": BACKEND},
        plugins=required_plugins,
    )
    app = arrlio.App(config)

    async with app:
        await app.consume_tasks()

        graph_args = ('echo "Number of words in this sentence:"',)
        node_results = await app.send_graph(graph, args=graph_args)

        # Wait at most two seconds for node B's result.
        wc_output = await asyncio.wait_for(node_results["B"].get(), timeout=2)
        logger.info(wc_output)  # -> 6


if __name__ == "__main__":
    asyncio.run(main())
And more examples
poetry install
poetry run python examples/main.py
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
arrlio-0.24.0.tar.gz
(31.2 kB
view details)
Built Distribution
arrlio-0.24.0-py3-none-any.whl
(38.6 kB
view details)
File details
Details for the file arrlio-0.24.0.tar.gz.
File metadata
- Download URL: arrlio-0.24.0.tar.gz
- Upload date:
- Size: 31.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.6.1 CPython/3.11.5 Darwin/20.6.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 21d97401ea758ee67057f5e4e968d48c9cc8d0408aa66c421c6852f4be902576 |
| MD5 | 8cdf442b670314811e0657e464ad85f6 |
| BLAKE2b-256 | 13569b41d594715ee27c3b393a9c86e90ef089b6f43f4ff1c73d0af5f3e471fd |
File details
Details for the file arrlio-0.24.0-py3-none-any.whl.
File metadata
- Download URL: arrlio-0.24.0-py3-none-any.whl
- Upload date:
- Size: 38.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.6.1 CPython/3.11.5 Darwin/20.6.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2c5510efc3a87afddf9fdb760215a99b82ef08a315bd18a35dfa946ffda0c28a |
| MD5 | 555beb95fa314375076373d8936b345a |
| BLAKE2b-256 | 267341f40d2652a64125788ff101aae52942ff2f5716ac8f9752346db6b9aa62 |