Skip to main content

A simple and fast API framework for Python

Project description

README.md

Jararaca Microservice Framework

Overview

Jararaca is a aio-first microservice framework that provides a set of tools to build and deploy microservices in a simple and clear way.

Features

Hexagonal Architecture

The framework is based on the hexagonal architecture, which allows you to separate the business logic from the infrastructure, making the code more testable and maintainable.

Dependency Injection

The framework uses the dependency injection pattern to manage the dependencies between the components of the application.

    app = Microservice(
        providers=[
            ProviderSpec(
                provide=Token(AuthConfig, "AUTH_CONFIG"),
                use_value=AuthConfig(
                    secret="secret",
                    identity_refresh_token_expires_delta_seconds=60 * 60 * 24 * 30,
                    identity_token_expires_delta_seconds=60 * 60,
                ),
            ),
            ProviderSpec(
                provide=Token(AppConfig, "APP_CONFIG"),
                use_factory=AppConfig.provider,
            ),
            ProviderSpec(
                provide=TokenBlackListService,
                use_value=InMemoryTokenBlackListService(),
            ),
        ],
    )

Web Server Port

The framework provides a web server that listens on a specific port and routes the requests to the appropriate handler. It uses FastAPI as the web framework.

    @Delete("/{task_id}")
    async def delete_task(self, task_id: TaskId) -> None:
        await self.tasks_crud.delete_by_id(task_id)

        await use_ws_manager().broadcast(("Task %s deleted" % task_id).encode())

Message Bus

The framework provides a topic-based message bus that allows you to send messages between the components of the application. It uses AIO Pika as the message broker worker and publisher.

    @IncomingHandler("task")
    async def process_task(self, message: Message[Identifiable[TaskSchema]]) -> None:
        name = generate_random_name()
        now = asyncio.get_event_loop().time()
        print("Processing task: ", name)

        task = message.payload()

        print("Received task: ", task)
        await asyncio.sleep(random.randint(1, 5))

        await use_publisher().publish(task, topic="task")

        then = asyncio.get_event_loop().time()
        print("Task Finished: ", name, " Time: ", then - now)

Distributed Websocket

You can setup a room-based websocket server that allows you to send messages to a specific room or broadcast messages to all connected clients. All backend instances communicates with each other using a pub/sub mechanism (such as Redis).

    @WebSocketEndpoint("/ws")
    async def ws_endpoint(self, websocket: WebSocket) -> None:
        await websocket.accept()
        counter.increment()
        await use_ws_manager().add_websocket(websocket)
        await use_ws_manager().join(["tasks"], websocket)
        await use_ws_manager().broadcast(
            ("New Connection (%d) from %s" % (counter.count, self.hostname)).encode()
        )

        print("New Connection (%d)" % counter.count)

        while True:
            try:
                await websocket.receive_text()
            except WebSocketDisconnect:
                counter.decrement()
                await use_ws_manager().remove_websocket(websocket)

                await use_ws_manager().broadcast(
                    (
                        "Connection Closed (%d) from %s"
                        % (counter.count, self.hostname)
                    ).encode()
                )
                print("Connection Closed (%d)" % counter.count)
                break

Scheduled Routine

You can setup a scheduled routine that runs a specific task at a specific time or interval.

...
    @ScheduledAction("* * * * * */3", allow_overlap=False)
    async def scheduled_task(self) -> None:
        print("Scheduled Task at ", asyncio.get_event_loop().time())

        print("sleeping")
        await asyncio.sleep(5)

        await use_publisher().publish(
            message=Identifiable(
                id=uuid4(),
                data=TaskSchema(name=generate_random_name()),
            ),
            topic="task",
        )

Observability

You can setup Observability Interceptors for logs, traces and metric collection with OpenTelemetry-based Protocols

class HelloService:
    def __init__(
        self,
        hello_rpc: Annotated[HelloRPC, Token(HelloRPC, "HELLO_RPC")],
    ):
        self.hello_rpc = hello_rpc

    @TracedFunc("ping") # Decorator for tracing
    async def ping(self) -> HelloResponse:
        return await self.hello_rpc.ping()

    @TracedFunc("hello-service")
    async def hello(
        self,
        gather: bool,
    ) -> HelloResponse:
        now = asyncio.get_event_loop().time()
        if gather:
            await asyncio.gather(*[self.random_await(a) for a in range(10)])
        else:
            for a in range(10):
                await self.random_await(a)
        return HelloResponse(
            message="Elapsed time: {}".format(asyncio.get_event_loop().time() - now)
        )

    @TracedFunc("random-await")
    async def random_await(self, index: int) -> None:
        logger.info("Random await %s", index, extra={"index": index})
        await asyncio.sleep(random.randint(1, 3))
        logger.info("Random await %s done", index, extra={"index": index})

Installation

pip install jararaca

Usage

Create a Microservice

# app.py

from jararaca import Microservice, create_http_server, create_messagebus_worker
from jararaca.presentation.http_microservice import HttpMicroservice

app = Microservice(
    providers=[
        ProviderSpec(
            provide=Token[AppConfig],
            use_factory=AppConfig.provider,
        )
    ],
    controllers=[TasksController],
    interceptors=[
        AIOSqlAlchemySessionInterceptor(
            AIOSQAConfig(
                connection_name="default",
                url="sqlite+aiosqlite:///db.sqlite3",
            )
        ),
    ],
)


# App for specific Http Configuration Context
http_app = HttpMicroservice(app)

web_app = create_http_server(app)

Run as a Web Server

uvicorn app:web_app --reload
# or
jararaca server app:app
# or
jararaca server app:http_app

Run as a Message Bus Worker

jararaca worker app:app

Run as a scheduled routine

jararaca scheduler app:app

Generate Typescript intefaces from microservice app controllers

jararaca gen-tsi app.main:app app.ts

Documentation

Documentation is under construction here.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

jararaca-0.1.52.tar.gz (311.2 kB view hashes)

Uploaded Source

Built Distribution

jararaca-0.1.52-py3-none-any.whl (50.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page