A simple and fast API framework for Python
Project description
Jararaca Microservice Framework
Overview
Jararaca is a aio-first microservice framework that provides a set of tools to build and deploy microservices in a simple and clear way.
Features
Hexagonal Architecture
The framework is based on the hexagonal architecture, which allows you to separate the business logic from the infrastructure, making the code more testable and maintainable.
Dependency Injection
The framework uses the dependency injection pattern to manage the dependencies between the components of the application.
app = Microservice(
providers=[
ProviderSpec(
provide=Token(AuthConfig, "AUTH_CONFIG"),
use_value=AuthConfig(
secret="secret",
identity_refresh_token_expires_delta_seconds=60 * 60 * 24 * 30,
identity_token_expires_delta_seconds=60 * 60,
),
),
ProviderSpec(
provide=Token(AppConfig, "APP_CONFIG"),
use_factory=AppConfig.provider,
),
ProviderSpec(
provide=TokenBlackListService,
use_value=InMemoryTokenBlackListService(),
),
],
)
Web Server Port
The framework provides a web server that listens on a specific port and routes the requests to the appropriate handler. It uses FastAPI as the web framework.
@Delete("/{task_id}")
async def delete_task(self, task_id: TaskId) -> None:
await self.tasks_crud.delete_by_id(task_id)
await use_ws_manager().broadcast(("Task %s deleted" % task_id).encode())
Message Bus
The framework provides a topic-based message bus that allows you to send messages between the components of the application. It uses AIO Pika as the message broker worker and publisher.
@IncomingHandler("task")
async def process_task(self, message: Message[Identifiable[TaskSchema]]) -> None:
name = generate_random_name()
now = asyncio.get_event_loop().time()
print("Processing task: ", name)
task = message.payload()
print("Received task: ", task)
await asyncio.sleep(random.randint(1, 5))
await use_publisher().publish(task, topic="task")
then = asyncio.get_event_loop().time()
print("Task Finished: ", name, " Time: ", then - now)
Distributed Websocket
You can setup a room-based websocket server that allows you to send messages to a specific room or broadcast messages to all connected clients. All backend instances communicates with each other using a pub/sub mechanism (such as Redis).
@WebSocketEndpoint("/ws")
async def ws_endpoint(self, websocket: WebSocket) -> None:
await websocket.accept()
counter.increment()
await use_ws_manager().add_websocket(websocket)
await use_ws_manager().join(["tasks"], websocket)
await use_ws_manager().broadcast(
("New Connection (%d) from %s" % (counter.count, self.hostname)).encode()
)
print("New Connection (%d)" % counter.count)
while True:
try:
await websocket.receive_text()
except WebSocketDisconnect:
counter.decrement()
await use_ws_manager().remove_websocket(websocket)
await use_ws_manager().broadcast(
(
"Connection Closed (%d) from %s"
% (counter.count, self.hostname)
).encode()
)
print("Connection Closed (%d)" % counter.count)
break
Scheduled Routine
You can setup a scheduled routine that runs a specific task at a specific time or interval.
...
@ScheduledAction("* * * * * */3", allow_overlap=False)
async def scheduled_task(self) -> None:
print("Scheduled Task at ", asyncio.get_event_loop().time())
print("sleeping")
await asyncio.sleep(5)
await use_publisher().publish(
message=Identifiable(
id=uuid4(),
data=TaskSchema(name=generate_random_name()),
),
topic="task",
)
Observability
You can setup Observability Interceptors for logs, traces and metric collection with OpenTelemetry-based Protocols
class HelloService:
def __init__(
self,
hello_rpc: Annotated[HelloRPC, Token(HelloRPC, "HELLO_RPC")],
):
self.hello_rpc = hello_rpc
@TracedFunc("ping") # Decorator for tracing
async def ping(self) -> HelloResponse:
return await self.hello_rpc.ping()
@TracedFunc("hello-service")
async def hello(
self,
gather: bool,
) -> HelloResponse:
now = asyncio.get_event_loop().time()
if gather:
await asyncio.gather(*[self.random_await(a) for a in range(10)])
else:
for a in range(10):
await self.random_await(a)
return HelloResponse(
message="Elapsed time: {}".format(asyncio.get_event_loop().time() - now)
)
@TracedFunc("random-await")
async def random_await(self, index: int) -> None:
logger.info("Random await %s", index, extra={"index": index})
await asyncio.sleep(random.randint(1, 3))
logger.info("Random await %s done", index, extra={"index": index})
Installation
pip install jararaca
Usage
Create a Microservice
# app.py
from jararaca import Microservice, create_http_server, create_messagebus_worker
from jararaca.presentation.http_microservice import HttpMicroservice
app = Microservice(
providers=[
ProviderSpec(
provide=Token[AppConfig],
use_factory=AppConfig.provider,
)
],
controllers=[TasksController],
interceptors=[
AIOSqlAlchemySessionInterceptor(
AIOSQAConfig(
connection_name="default",
url="sqlite+aiosqlite:///db.sqlite3",
)
),
],
)
# App for specific Http Configuration Context
http_app = HttpMicroservice(app)
web_app = create_http_server(app)
Run as a Web Server
uvicorn app:web_app --reload
# or
jararaca server app:app
# or
jararaca server app:http_app
Run as a Message Bus Worker
jararaca worker app:app
Run as a scheduled routine
jararaca scheduler app:app
Generate Typescript intefaces from microservice app controllers
jararaca gen-tsi app.main:app app.ts
Documentation
Documentation is under construction here.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for jararaca-0.1.51-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | c2fd734d7a0037a08d7fd3e52f115b506f05811cff3b930b03edcbbe47dc5e53 |
|
MD5 | 795cda45d7ffa3ed102c248f495c5247 |
|
BLAKE2b-256 | 80510bf47208cbba7b47d2e0363bec3daa805d8e29abc0731d6f83d344b6f8cb |