A Python microservices framework with FastAPI-like interface for seamless inter-service communication
Project description
MicroAPI
A Python microservices framework that lets your services communicate as if calling regular Python functions. Built with a FastAPI-like interface, full Pydantic typing, and multiple transport backends.
Features
- FastAPI-like Interface — Decorate async functions to expose them as RPC methods with
@service.method - Full Type Safety — Pydantic schemas for validation + generated client code with complete type hints
- 5 Transport Backends — gRPC (custom h2), HTTP, WebSocket, Kafka, RabbitMQ
- 4 RPC Patterns — Unary, server streaming, client streaming, bidirectional streaming
- Auto Code Generation — Generate typed Python client libraries and
.protofiles automatically - Middleware & Dependencies — FastAPI-style middleware chain and
Depends()injection - CLI Tool —
microapi run,microapi generate,microapi init,microapi info - Hot Reload — Auto-restart on file changes during development
Installation
# Core
pip install microapi
# With specific transport
pip install microapi[grpc] # gRPC (h2-based)
pip install microapi[http] # HTTP (aiohttp)
pip install microapi[ws] # WebSocket
pip install microapi[kafka] # Apache Kafka
pip install microapi[rabbitmq] # RabbitMQ
# Everything
pip install microapi[all]
Quick Start
1. Define Your Service
# server/schemas.py
from microapi import Schema
class User(Schema):
username: str | None = None
firstname: str | None = None
age: int | None = None
class GetUserPayload(Schema):
user_id: int
fields: list[str] | None = None
# server/service.py
from microapi import Service, types
from schemas import GetUserPayload, User
service = Service("users")
@service.method
async def get_user(payload: GetUserPayload) -> User:
# Fetch from your database
return User(username="alice", firstname="Alice", age=30)
@service.method
async def list_users(payload: GetUserPayload) -> types.Streaming[User]:
users = await db.get_all_users()
for user in users:
yield User.model_validate(user)
2. Run the Server
# server/main.py
from microapi import MicroAPI
from microapi.transport.http import HTTPTransport
from service import service
app = MicroAPI()
app.add_service(service)
if __name__ == "__main__":
app.run(
transport=HTTPTransport(host="0.0.0.0", port=8080),
auto_generate_lib=True,
generated_lib_dir="lib",
)
Or via CLI:
microapi run server.main:app --transport http --port 8080 --generate-lib
3. Use the Generated Client
The auto_generate_lib=True flag creates a fully-typed Python library:
# client.py
import asyncio
from lib import users
from microapi.client.base import Connection
from microapi.transport.http import HTTPTransport
async def main():
transport = HTTPTransport(host="127.0.0.1", port=8080)
conn = Connection(transport.create_client())
async with conn:
# Call it like a regular function — fully typed!
user = await users.get_user(user_id=1)
print(user.username) # IDE autocomplete works!
asyncio.run(main())
RPC Method Patterns
MicroAPI supports all four RPC patterns, detected automatically from type hints:
Unary (Request → Response)
@service.method
async def get_user(payload: GetUserPayload) -> User:
return User(username="alice")
Server Streaming (Request → Stream of Responses)
@service.method
async def list_users(payload: ListPayload) -> types.Streaming[User]:
for user in await get_all():
yield user
Client Streaming (Stream of Requests → Response)
@service.method
async def add_users(stream: types.Stream[User]) -> None:
async for user in stream:
await save(user)
Bidirectional Streaming (Stream ↔ Stream)
@service.method(generated_name="create_return_user")
async def process_users(stream: types.Stream[User]) -> types.Streaming[User]:
async for user in stream:
created = await create(user)
yield created
Transports
gRPC (Custom HTTP/2)
Built from scratch using h2 — no grpcio dependency. Supports standard gRPC wire format with JSON payloads.
from microapi.transport.grpc import GRPCTransport
app.run(transport=GRPCTransport(host="0.0.0.0", port=50051))
Features:
- HTTP/2 with flow control
.protofile generation for cross-language interop- SSL/TLS support
HTTP
Standard HTTP/1.1 transport using aiohttp. Methods map to POST /{service}/{method}.
from microapi.transport.http import HTTPTransport
app.run(transport=HTTPTransport(host="0.0.0.0", port=8080))
Features:
- JSON request/response
- NDJSON for server streaming
- Health check at
GET /health
WebSocket
Persistent connection with multiplexed request/response messages.
from microapi.transport.websocket import WebSocketTransport
app.run(transport=WebSocketTransport(host="0.0.0.0", port=8765))
Features:
- Single connection, multiple concurrent requests
- Full streaming support
- Low latency
Kafka
Event-driven transport using Apache Kafka as the message broker.
from microapi.transport.kafka import KafkaTransport
app.run(transport=KafkaTransport(
bootstrap_servers="kafka:9092",
request_topic="my-requests",
response_topic="my-responses",
))
RabbitMQ
Queue-based transport using RabbitMQ.
from microapi.transport.rabbitmq import RabbitMQTransport
app.run(transport=RabbitMQTransport(
url="amqp://guest:guest@rabbitmq:5672/",
request_queue="my-requests",
))
Middleware
Middleware follows the FastAPI pattern — a chain of functions that wrap the request/response cycle:
from microapi import Middleware
from microapi.protocol import Request, Response
class AuthMiddleware(Middleware):
async def __call__(self, request: Request, call_next) -> Response:
token = request.metadata.get("authorization")
if not token:
return Response(error="Unauthorized", status_code=StatusCode.UNAUTHENTICATED)
return await call_next(request)
class LoggingMiddleware(Middleware):
async def __call__(self, request: Request, call_next) -> Response:
print(f"→ {request.service}.{request.method}")
response = await call_next(request)
print(f"← {response.status_code.name}")
return response
app = MicroAPI()
app.add_middleware(LoggingMiddleware()) # runs first
app.add_middleware(AuthMiddleware()) # runs second
Dependencies
FastAPI-style dependency injection with Depends():
from microapi import Depends, Service, Schema
async def get_database():
db = await create_connection()
return db
async def get_current_user():
return {"id": 1, "name": "admin"}
service = Service("items")
@service.method
async def get_items(
payload: ItemsPayload,
db = Depends(get_database),
user = Depends(get_current_user),
) -> ItemList:
items = await db.query(user["id"])
return ItemList(items=items)
Dependencies are:
- Resolved per-request
- Cached by default (same dependency reused within a request)
- Support both sync and async callables
Code Generation
Python Client Library
microapi generate server.main:app --output lib
Generates:
lib/types.py— Pydantic schemas as client modelslib/{service}.py— Typed async functions and stream classeslib/__init__.py— Convenience imports
Protobuf Files
microapi generate server.main:app --protos --protos-dir protos
Generates standard .proto files for cross-language gRPC interop:
syntax = "proto3";
package users;
message User {
optional string username = 1;
optional string firstname = 2;
optional int64 age = 3;
}
service UsersService {
rpc get_user(GetUserPayload) returns (User);
rpc list_users(ListPayload) returns (stream User);
}
CLI
# Run a server
microapi run server.main:app --transport http --port 8080 --reload
# Generate client library
microapi generate server.main:app --output lib --protos
# Show service info
microapi info server.main:app
# Scaffold a new project
microapi init myservice
# Print version
microapi version
Lifecycle Hooks
app = MicroAPI()
@app.on_startup
async def startup():
print("Server starting...")
await init_database()
@app.on_shutdown
async def shutdown():
print("Server stopping...")
await close_connections()
Or use a lifespan context manager:
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app):
# Startup
db = await connect_db()
yield
# Shutdown
await db.close()
app = MicroAPI(lifespan=lifespan)
Project Structure
microapi/
├── app.py # MicroAPI application class
├── service.py # Service & @method decorator
├── schema.py # Pydantic Schema base class
├── types.py # Stream, Streaming types
├── middleware.py # Middleware system
├── dependencies.py # Depends() injection
├── protocol.py # Wire protocol messages
├── serialization.py # JSON serialization (orjson)
├── routing.py # Request router/dispatcher
├── transport/
│ ├── base.py # Abstract transport interfaces
│ ├── grpc/ # Custom gRPC (h2-based)
│ ├── http/ # HTTP (aiohttp)
│ ├── websocket/ # WebSocket
│ ├── kafka/ # Apache Kafka
│ └── rabbitmq/ # RabbitMQ
├── client/ # Client SDK base classes
├── generator/ # Code generators (Python + Protobuf)
├── cli/ # CLI tool (typer + rich)
└── hot_reload.py # File watcher
Configuration
All transports accept simple configuration:
# gRPC with SSL
import ssl
ctx = ssl.create_default_context()
GRPCTransport(host="0.0.0.0", port=50051, ssl_context=ctx)
# HTTP
HTTPTransport(host="0.0.0.0", port=8080)
# WebSocket
WebSocketTransport(host="0.0.0.0", port=8765)
# Kafka
KafkaTransport(
bootstrap_servers="kafka1:9092,kafka2:9092",
request_topic="my-requests",
response_topic="my-responses",
group_id="my-service",
)
# RabbitMQ
RabbitMQTransport(
url="amqp://user:pass@rabbitmq:5672/vhost",
request_queue="my-requests",
response_queue="my-responses",
)
Development
# Clone
git clone https://github.com/BiqRed/MicroAPI.git
cd MicroAPI
# Install with dev dependencies
uv sync --all-extras
# Run tests
uv run pytest tests/ -v
# Lint
uv run ruff check microapi/
# Type check
uv run mypy microapi/
License
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file microapi-1.1.0.tar.gz.
File metadata
- Download URL: microapi-1.1.0.tar.gz
- Upload date:
- Size: 140.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7ada14319083e6e4ea639b35acc0e2904884a7feec1d61f57e15664875b18023
|
|
| MD5 |
ab4fe200c1e9b2e69aaf61f01baaac3d
|
|
| BLAKE2b-256 |
5b44e79fea2737e5ba78f6e3fccc70a19acb2d30091eb13a36c1476e998ca03b
|
Provenance
The following attestation bundles were made for microapi-1.1.0.tar.gz:
Publisher:
release.yml on BiqRed/MicroAPI
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
microapi-1.1.0.tar.gz -
Subject digest:
7ada14319083e6e4ea639b35acc0e2904884a7feec1d61f57e15664875b18023 - Sigstore transparency entry: 928408744
- Sigstore integration time:
-
Permalink:
BiqRed/MicroAPI@1e82ee34f5c2b4d964a4a9018fadad566c719195 -
Branch / Tag:
refs/tags/v1.1.0 - Owner: https://github.com/BiqRed
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@1e82ee34f5c2b4d964a4a9018fadad566c719195 -
Trigger Event:
push
-
Statement type:
File details
Details for the file microapi-1.1.0-py3-none-any.whl.
File metadata
- Download URL: microapi-1.1.0-py3-none-any.whl
- Upload date:
- Size: 56.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a298cb189f8fed60b105b5eebc8819cf560e3d052aa81b32f0112e35684cec88
|
|
| MD5 |
030982ddc784d2dfb4cf9f67ec5d1ffe
|
|
| BLAKE2b-256 |
7555a0658dd5982242450c82a928988d51c3613645486047f6a6527e434ee965
|
Provenance
The following attestation bundles were made for microapi-1.1.0-py3-none-any.whl:
Publisher:
release.yml on BiqRed/MicroAPI
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
microapi-1.1.0-py3-none-any.whl -
Subject digest:
a298cb189f8fed60b105b5eebc8819cf560e3d052aa81b32f0112e35684cec88 - Sigstore transparency entry: 928408745
- Sigstore integration time:
-
Permalink:
BiqRed/MicroAPI@1e82ee34f5c2b4d964a4a9018fadad566c719195 -
Branch / Tag:
refs/tags/v1.1.0 - Owner: https://github.com/BiqRed
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@1e82ee34f5c2b4d964a4a9018fadad566c719195 -
Trigger Event:
push
-
Statement type: