Minix
Minix is a modular Python framework for building backend, AI, and data-driven applications. It provides a clean, layered architecture with built-in support for REST APIs, task scheduling, message queues, and machine learning workflows.
Table of Contents
- Key Features
- Architecture Overview
- Installation
- Core Concepts
- Configuration
- Extras
- Registry Usage
- CLI Commands
- License
- Contributing
- Authors
Key Features
- FastAPI Integration: Build high-performance REST APIs with automatic OpenAPI documentation
- Modular Architecture: Organize code into self-contained modules with entities, repositories, services, and controllers
- Multi-Database Support: Built-in connectors for MySQL, ClickHouse, Redis, and Qdrant (vector DB)
- Task Scheduling: Celery-powered background tasks with RedBeat scheduler for periodic jobs
- Kafka Consumers: Async Kafka message processing with aiokafka
- Object Storage: S3-compatible storage support via boto3
- ML Workflows: Optional MLflow integration for model versioning and deployment
- Dependency Registry: Singleton-based service container for clean dependency injection
- Environment Management: Configuration via .env files using dotenv
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│ Application │
├─────────────────────────────────────────────────────────────────┤
│ Bootstrap → Registers Connectors & Modules │
├─────────────────────────────────────────────────────────────────┤
│ Modules │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Controller│ │ Service │ │Repository│ │ Entity │ │
│ │ (API) │→│ (Logic) │→│ (Data) │→│ (Model) │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ Tasks (Celery) │ Consumers (Kafka) │ Models (MLflow) │
├─────────────────────────────────────────────────────────────────┤
│ Connectors │
│ SQL (MySQL/ClickHouse) │ Redis │ Qdrant │ Object Storage (S3) │
└─────────────────────────────────────────────────────────────────┘
Installation
Requirements
- Python 3.10 or higher
Install via pip
pip install minix
Install from source (development)
git clone <repository-url>
cd minix
pip install -e .
Core Concepts
Bootstrap
The bootstrap function initializes your application by registering connectors and modules:
from minix.core.bootstrap import bootstrap
bootstrap(
    modules=[Module1(), Module2()],
    connectors=[
        (sql_connector, None),          # Default connector
        (sql_connector_2, "analytics")  # Named connector with salt
    ]
)
Modules
Modules are self-contained units that group related functionality:
from minix.core.module.business_module import BusinessModule
class ProductModule(BusinessModule):
    def __init__(self):
        super().__init__("product")
        self.add_entity(ProductEntity)
        self.add_repository(ProductRepository)
        self.add_service(ProductService)
        self.add_controller(ProductController)
        self.add_periodic_task(SyncProductsTask)
        self.add_consumer(ProductEventConsumer)
Module Methods:
- add_entity(entity) - Register a data entity
- add_repository(repository, connector_salt) - Register a repository with an optional named connector
- add_service(service) - Register a service
- add_controller(controller) - Register an API controller
- add_task(task) - Register an async task
- add_periodic_task(periodic_task) - Register a scheduled task
- add_consumer(consumer) - Register a Kafka consumer
- add_model(model, config) - Register an ML model
Entities
Entities define your data models:
SQL Entity
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
from minix.core.entity import SqlEntity
class UserEntity(SqlEntity):
    __tablename__ = "users"

    name: Mapped[str] = mapped_column(String(100), nullable=False)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
SQL entities automatically include id, created_at, and updated_at columns.
Qdrant Entity (Vector DB)
from minix.core.entity import QdrantEntity
class DocumentEntity(QdrantEntity):
    content: str

    @staticmethod
    def collection() -> str:
        return "documents"
Redis Entity
from minix.core.entity import RedisEntity
class CacheEntity(RedisEntity):
    pass
Repositories
Repositories handle data access:
SQL Repository
from minix.core.repository import SqlRepository
class UserRepository(SqlRepository[UserEntity]):
    pass
Built-in methods:
- save(entity) - Save a single entity
- save_all(entities) - Save multiple entities
- save_bulk(entities, chunk_size) - Bulk insert with chunking
- get_all() - Get all entities
- get_by_id(id) - Get by ID
- get_by(**kwargs) - Filter by attributes
- update(entity) - Update an entity
- delete(entity) - Delete an entity
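To make the chunking behavior concrete, here is a minimal pure-Python sketch of what save_bulk(entities, chunk_size) does conceptually (splitting a batch into fixed-size chunks before inserting); this is an illustration, not the actual Minix implementation:

```python
def chunked(items, chunk_size):
    """Yield successive chunks of at most chunk_size items."""
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]

# 5 rows in chunks of 2 -> three insert batches
batches = list(chunked(list(range(5)), 2))  # [[0, 1], [2, 3], [4]]
```

Chunking keeps each INSERT statement bounded in size, which avoids oversized packets and long-running transactions on large batches.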
Qdrant Repository
from minix.core.repository import QdrantRepository
class DocumentRepository(QdrantRepository[DocumentEntity]):
    pass
Methods:
- insert(entities) - Insert vectors
- query(entity, top_k) - Similarity search
- delete(entity_ids) - Delete vectors
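Conceptually, query(entity, top_k) ranks stored vectors by similarity to the query vector and returns the best k matches. A pure-Python sketch of that ranking using cosine similarity (for clarity only; not how Qdrant computes it internally):

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

def top_k(query_vec, vectors, k):
    """vectors: mapping of id -> vector. Return the k most similar ids."""
    ranked = sorted(vectors, key=lambda vid: cosine(query_vec, vectors[vid]),
                    reverse=True)
    return ranked[:k]

# The closest vector to [1, 0] wins
top_k([1.0, 0.0], {"a": [1.0, 0.0], "b": [0.0, 1.0]}, 1)  # ["a"]
```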
Services
Services contain business logic:
from minix.core.service import Service
class UserService(Service[UserEntity]):
    def get_active_users(self):
        return self.repository.get_by(status="active")

    def create_user(self, name: str, email: str) -> UserEntity:
        user = UserEntity(name=name, email=email)
        return self.repository.save(user)
Controllers
Controllers define REST API endpoints:
from minix.core.controller import Controller
from minix.core.registry import Registry
class UserController(Controller):
    def get_prefix(self):
        return "/users"

    def define_routes(self):
        @self.router.get("/")
        def get_users():
            service = Registry().get(UserService)
            return service.get_repository().get_all()

        @self.router.post("/")
        def create_user(name: str, email: str):
            service = Registry().get(UserService)
            return service.create_user(name, email)
Connectors
SQL Connector (MySQL/ClickHouse)
from minix.core.connectors.sql_connector import SqlConnector, SqlConnectorConfig
config = SqlConnectorConfig(
    username="root",
    password="password",
    host="localhost",
    port=3306,
    database="mydb",
    driver="mysql",  # or "clickhouse"
    connect_timeout=30,
    pool_recycle=3600
)
connector = SqlConnector(config)
Qdrant Connector
from minix.core.connectors.qdrant_connector import QdrantConnector
connector = QdrantConnector(
    url="http://localhost:6333",
    api_key="your-api-key"
)
await connector.connect()
Object Storage Connector (S3-compatible)
from minix.core.connectors.object_storage_connector import ObjectStorageConnector
from minix.core.connectors.object_storage_connector.config import ObjectStorageConfig
config = ObjectStorageConfig(
    endpoint_url="https://s3.amazonaws.com",
    access_key="your-access-key",
    secret_key="your-secret-key",
    bucket_name="my-bucket"
)
connector = ObjectStorageConnector(config)
# Upload, download, delete files
await connector.upload_file(file_obj, "path/to/file.txt")
await connector.download_file("path/to/file.txt", local_file)
await connector.generate_presigned_url("path/to/file.txt", expiration=3600)
Tasks & Scheduling
Async Task
from minix.core.scheduler.task import Task
class ProcessOrderTask(Task):
    def get_name(self) -> str:
        return "process_order"

    def run(self, order_id: int):
        # Process the order
        pass
Periodic Task
from celery.schedules import crontab
from minix.core.scheduler.task import PeriodicTask
class DailyReportTask(PeriodicTask):
    def get_name(self) -> str:
        return "daily_report"

    def get_schedule(self) -> crontab:
        return crontab(hour=0, minute=0)  # Run at midnight

    def run(self):
        # Generate the daily report
        pass
Running Tasks Manually
from minix.core.registry import Registry
from minix.core.scheduler import Scheduler
scheduler = Registry().get(Scheduler)
scheduler.run_task(ProcessOrderTask(order_id=123))
Workflows (DAG Scheduling)
Minix supports defining and executing workflows as DAGs (Directed Acyclic Graphs) where:
- Each node is a Task
- Nodes can depend on other nodes
- Dependency results can be passed to downstream tasks
- The workflow engine guarantees each node runs at most once during a workflow execution
- If the DAG has multiple sink nodes, all sinks are executed
Creating a Workflow
from minix.core.registry import Registry
from minix.core.scheduler import Scheduler
from minix.core.scheduler.workflow import Workflow
scheduler = Registry().get(Scheduler)
wf = Workflow("order_pipeline")
t1 = wf.add(DAGTaskTest1(in_a="input_value"), node_id="task1")
t2 = wf.add(DAGTaskTest2(), node_id="task2", depends_on=[t1], consume_dependency_results=True)
t3 = wf.add(DAGTaskTest3(), node_id="task3", depends_on=[t2, t1], consume_dependency_results=True)
t4 = wf.add(DAGTaskTest4(), node_id="task4", depends_on=[t3], consume_dependency_results=True)
t5 = wf.add(DAGTaskTest5(), node_id="task5", depends_on=[t4], consume_dependency_results=True)
res = scheduler.run_workflow(wf)
Dependency Result Passing
For a node with consume_dependency_results=True:
- If it has one dependency, that dependency's result is passed as the first positional argument (arg0) to the task.
- If it has multiple dependencies, a list of dependency results is passed as arg0, in the same order as depends_on.
For a node with consume_dependency_results=False, no dependency result is injected.
This design lets tasks explicitly opt in or out of consuming upstream values, while still receiving their own constructor-provided args/kwargs.
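The injection rule above can be sketched in a few lines of pure Python (a hypothetical illustration of the behavior, not the actual Minix engine internals):

```python
def inject_dependency_results(run, dep_results, consume):
    """Call a task's run() per the arg0 injection rule.

    dep_results: upstream outputs in depends_on order.
    consume: the node's consume_dependency_results flag.
    """
    if not consume or not dep_results:
        return run()
    # One dependency -> its result directly; several -> a list, in order.
    arg0 = dep_results[0] if len(dep_results) == 1 else list(dep_results)
    return run(arg0)

inject_dependency_results(lambda x: x * 2, [3], True)        # 6
inject_dependency_results(lambda xs: sum(xs), [1, 2], True)  # 3
```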
Running the Entire Workflow
res = scheduler.run_workflow(wf)
print(res.id) # Celery task id
# output = res.get(timeout=...) # if you want to block and fetch the final result
Final result shape when running the whole workflow:
- If the workflow has one sink, the final result is that sink’s output.
- If the workflow has multiple sinks, the final result is a dictionary mapping sink node ids to outputs:
{ "task5": <result_of_task5>, "task4": <result_of_task4> }
Running a Specific Target Node
You can execute only the subgraph required to compute a particular node (useful for partial runs or debugging):
res = scheduler.run_workflow(wf, target_node_id="task3")
# output = res.get(timeout=...)
In this mode, only task3 and its dependency closure are executed, and the returned value corresponds to the output of the target node (task3).
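"Dependency closure" here means the target node plus every ancestor it transitively depends on. A small pure-Python sketch of computing that set (the deps mapping shape is an assumption for illustration; the real Workflow stores its graph differently):

```python
def dependency_closure(deps, target):
    """deps: node id -> list of node ids it depends on.
    Return the target and all of its transitive dependencies."""
    seen, stack = set(), [target]
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(deps.get(node, []))
    return seen

# Graph of the order_pipeline example above
deps = {"task2": ["task1"], "task3": ["task2", "task1"],
        "task4": ["task3"], "task5": ["task4"]}
dependency_closure(deps, "task3")  # {"task1", "task2", "task3"}
```

Targeting task3 therefore runs only three of the five nodes; task4 and task5 are never scheduled.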
Workflow Execution Guarantees
When executing a workflow:
- No duplicate execution: if a node is required by multiple downstream nodes, it runs once.
- Multiple sinks are supported: all sink nodes run when executing the whole workflow.
- Full-graph execution: when no target_node_id is provided, the workflow runs the entire DAG (not only the ancestry of a single sink).
Kafka Consumers
from minix.core.consumer import AsyncConsumer, AsyncConsumerConfig
class OrderEventConsumer(AsyncConsumer):
    def get_config(self) -> AsyncConsumerConfig:
        return AsyncConsumerConfig(
            name="order_consumer",
            topics=["orders"],
            group_id="order_service",
            bootstrap_servers=["localhost:9092"]
        )

    async def run(self, message: dict):
        # Process the message
        order_id = message.get("order_id")
        print(f"Processing order: {order_id}")
ML Models
Base Model
from minix.core.model import Model
class MyModel(Model):
    def get_model_name(self):
        return "my_model"

    def predict(self, model_input: dict):
        # Prediction logic
        pass

    def set_device(self, device: str):
        self.device = device
Embedding Model
from minix.core.model import EmbeddingModel
import numpy as np
class TextEmbedder(EmbeddingModel):
    def get_model_name(self):
        return "text_embedder"

    def embed(self, text: str) -> np.ndarray:
        # Return an embedding vector
        pass

    def embed_batch(self, texts: list[str]) -> np.ndarray:
        # Return a batch of embeddings
        pass

    def set_device(self, device: str):
        self.device = device
MLflow Model (requires ai extra)
from minix.core.model import MlflowModel
class MyMlflowModel(MlflowModel):
    def __init__(self):
        super().__init__(
            name="my_model",
            version=1,
            packages=["torch", "transformers"]
        )

    def get_model(self):
        # Return your model instance
        pass

    def predict(self, model_input):
        model = self.load_model()
        return model.predict(model_input)
Configuration
Minix uses environment variables for configuration. Create a .env file in your project root:
# Celery Configuration
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=db+mysql://root:password@localhost:3306/celery_results
# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
# MLflow Configuration (for AI extras)
MLFLOW_TRACKING_URL=http://localhost:5000
PYTHON_VERSION=3.10
Extras
AI Capabilities
Install with AI tools (PyTorch, MLflow, Qdrant):
pip install "minix[ai]"
ClickHouse Support
Install with ClickHouse support:
pip install "minix[clickhouse]"
Development Tools
Install development dependencies:
pip install "minix[dev]"
Install All Extras
pip install "minix[ai,clickhouse,dev]"
Registry Usage
The Registry is a singleton-based dependency container:
from minix.core.registry import Registry
# Register a service
Registry().register(MyService, MyService())
# Register with a salt (for multiple instances)
Registry().register(SqlConnector, connector, salt="analytics")
# Retrieve a service
service = Registry().get(MyService)
# Retrieve with salt
connector = Registry().get(SqlConnector, salt="analytics")
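The pattern behind this API can be illustrated with a minimal singleton container keyed by (type, salt) pairs. This is a sketch of the general pattern, assuming nothing about minix.core.registry internals:

```python
class MiniRegistry:
    """Illustrative singleton container keyed by (type, salt)."""
    _instance = None

    def __new__(cls):
        # Every MiniRegistry() call returns the same instance
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._store = {}
        return cls._instance

    def register(self, key, value, salt=None):
        self._store[(key, salt)] = value

    def get(self, key, salt=None):
        return self._store[(key, salt)]

MiniRegistry().register(str, "default")
MiniRegistry().register(str, "analytics-conn", salt="analytics")
MiniRegistry().get(str)                    # "default"
MiniRegistry().get(str, salt="analytics")  # "analytics-conn"
```

The salt lets you register several instances of the same type (for example, two SqlConnector objects pointing at different databases) without them overwriting each other.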
CLI Commands
# Initialize a new project
minix init <project_name>
# Show framework version
minix version
License
Minix is licensed under the MIT License. See the LICENSE file for more details.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Authors
AmirHossein Advari - amiradvari@gmail.com
Shirin Dehghani - shirin.dehghani1996@gmail.com
Parsa Mohammadpour - parsa.mohammadpour01@gmail.com