Skip to main content

No project description provided

Project description

Arrlio [WIP]

Documentation (WIP)

Asyncio distributed task/workflow system with support for generators and graphs

tests coverage License: MIT

Installation

pip install arrlio

Or, to install the latest development version:

pip install git+https://github.com/levsh/arrlio

Usage

Create a tasks file

# tasks.py

import io

import arrlio
import invoke

@arrlio.task
async def hello_world():
    """Return the greeting string ``"Hello World!"``."""
    return "Hello World!"

# task custom name
@arrlio.task(name="foo")
async def foo():
    """Log a greeting via ``arrlio.logger``; declared with the task name ``"foo"``."""
    arrlio.logger.info("Hello from task 'foo'!")

# exception example
@arrlio.task
async def exception():
    """Always raise ``ZeroDivisionError``; used as the failing-task example."""
    raise ZeroDivisionError

# Arrlio supports generators and async generators
@arrlio.task
def xrange(count):
    """Yield the integers ``0`` through ``count - 1`` in order."""
    yield from range(count)

@arrlio.task
async def add_one(value: str):
    """Convert *value* to ``int`` and return that number plus one."""
    number = int(value)
    return number + 1


@arrlio.task
async def bash(cmd, stdin: str = None):
    """Run *cmd* through ``invoke.run`` and return the result's ``stdout``.

    *stdin* is wrapped in an ``io.StringIO`` and handed to the command as its
    input stream; a fresh ``io.StringIO`` is passed as the output stream.
    """
    completed = invoke.run(
        cmd,
        in_stream=io.StringIO(stdin),
        out_stream=io.StringIO(),
    )
    return completed.stdout

Create a main file and run it

import asyncio
import logging

import arrlio
import tasks

# Fetch the "arrlio" logger and set its threshold to INFO.
logger = logging.getLogger("arrlio")
logger.setLevel("INFO")

# Backend module passed to ``arrlio.Config`` in ``main``; swap in the
# commented-out line below to select the RabbitMQ backend instead.
BACKEND = "arrlio.backends.local"
# BACKEND = "arrlio.backends.rabbitmq"

async def main():
    """Send each example task from ``tasks`` once and log what comes back."""
    config = arrlio.Config(backend={"module": BACKEND})
    app = arrlio.App(config)

    async with app:
        await app.consume_tasks()

        # Send a task by passing the task object itself.
        hello_result = await app.send_task(tasks.hello_world)
        logger.info(await hello_result.get())

        # Send a task by its name.
        foo_result = await app.send_task("foo")
        logger.info(await foo_result.get())

        # Positional task arguments are passed through ``args``.
        add_result = await app.send_task(tasks.add_one, args=(1,))
        logger.info(await add_result.get())

        # Failing task: anything raised while sending or fetching the
        # result is caught here and logged with its traceback.
        try:
            failing_result = await app.send_task(tasks.exception)
            logger.info(await failing_result.get())
        except Exception as exc:
            print(f"\nThis is example exception for {app.backend}:\n")
            logger.exception(exc)
            print()

        # Generator task: the returned object is iterated asynchronously.
        stream = await app.send_task(tasks.xrange, args=(3,))
        results = [item async for item in stream]
        logger.info(results)  # -> [0, 1, 2]


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

Arrlio supports graph execution

import asyncio
import logging

import arrlio
import tasks

# Fetch the "arrlio" logger and set its threshold to INFO.
logger = logging.getLogger("arrlio")
logger.setLevel("INFO")

# Backend module passed to ``arrlio.Config`` in ``main``; swap in the
# commented-out line below to select the RabbitMQ backend instead.
BACKEND = "arrlio.backends.local"
# BACKEND = "arrlio.backends.rabbitmq"


async def main():
    """Build a three-node graph of ``tasks.add_one`` and execute it.

    Nodes ``A``, ``B`` and ``C`` are linked by the edges ``A -> B`` and
    ``B -> C``, with ``A`` marked as the root.  The graph is sent with the
    argument ``0`` and the result of every node is logged.
    """
    graph = arrlio.Graph("My Graph")
    graph.add_node("A", tasks.add_one, root=True)
    graph.add_node("B", tasks.add_one)
    graph.add_node("C", tasks.add_one)
    graph.add_edge("A", "B")
    graph.add_edge("B", "C")

    # The arrlio.plugins.events and arrlio.plugins.graphs
    # plugins are required.
    app = arrlio.App(
        arrlio.Config(
            backend={"module": BACKEND},
            plugins=[
                {"module": "arrlio.plugins.events"},
                {"module": "arrlio.plugins.graphs"},
            ],
        )
    )

    async with app:
        await app.consume_tasks()

        # Execute the graph with argument 0.
        ars = await app.send_graph(graph, args=(0,))
        logger.info("A: %i", await ars["A"].get())  # -> A: 1
        logger.info("B: %i", await ars["B"].get())  # -> B: 2
        logger.info("C: %i", await ars["C"].get())  # -> C: 3


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

Another graph example

import asyncio
import logging

import arrlio
import tasks

# Fetch the "arrlio" logger and set its threshold to INFO.
logger = logging.getLogger("arrlio")
logger.setLevel("INFO")

# Backend module passed to ``arrlio.Config`` in ``main``; swap in the
# commented-out line below to select the RabbitMQ backend instead.
BACKEND = "arrlio.backends.local"
# BACKEND = "arrlio.backends.rabbitmq"


async def main():
    """Run a two-node graph of ``tasks.bash`` and log node ``B``'s result.

    Root node ``A`` and node ``B`` (declared with ``args=("wc -w",)``) are
    joined by the edge ``A -> B``.  The graph is sent with an ``echo``
    command as its argument, and the result of ``B`` is awaited for at most
    two seconds.
    """
    pipeline = arrlio.Graph("My Graph")
    pipeline.add_node("A", tasks.bash, root=True)
    pipeline.add_node("B", tasks.bash, args=("wc -w",))
    pipeline.add_edge("A", "B")

    plugins = [
        {"module": "arrlio.plugins.events"},
        {"module": "arrlio.plugins.graphs"},
    ]
    config = arrlio.Config(backend={"module": BACKEND}, plugins=plugins)
    app = arrlio.App(config)

    async with app:
        await app.consume_tasks()

        command = 'echo "Number of words in this sentence:"'
        node_results = await app.send_graph(pipeline, args=(command,))
        word_count = await asyncio.wait_for(node_results["B"].get(), timeout=2)
        logger.info(word_count)  # -> 6


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

And more examples

poetry install
poetry run python examples/main.py

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

arrlio-0.24.0.tar.gz (31.2 kB view details)

Uploaded Source

Built Distribution

arrlio-0.24.0-py3-none-any.whl (38.6 kB view details)

Uploaded Python 3

File details

Details for the file arrlio-0.24.0.tar.gz.

File metadata

  • Download URL: arrlio-0.24.0.tar.gz
  • Upload date:
  • Size: 31.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.6.1 CPython/3.11.5 Darwin/20.6.0

File hashes

Hashes for arrlio-0.24.0.tar.gz
Algorithm Hash digest
SHA256 21d97401ea758ee67057f5e4e968d48c9cc8d0408aa66c421c6852f4be902576
MD5 8cdf442b670314811e0657e464ad85f6
BLAKE2b-256 13569b41d594715ee27c3b393a9c86e90ef089b6f43f4ff1c73d0af5f3e471fd

See more details on using hashes here.

File details

Details for the file arrlio-0.24.0-py3-none-any.whl.

File metadata

  • Download URL: arrlio-0.24.0-py3-none-any.whl
  • Upload date:
  • Size: 38.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.6.1 CPython/3.11.5 Darwin/20.6.0

File hashes

Hashes for arrlio-0.24.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2c5510efc3a87afddf9fdb760215a99b82ef08a315bd18a35dfa946ffda0c28a
MD5 555beb95fa314375076373d8936b345a
BLAKE2b-256 267341f40d2652a64125788ff101aae52942ff2f5716ac8f9752346db6b9aa62

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page