Skip to main content

A modular Python framework for backend, AI, and data projects

Project description

Minix

Minix is a modular Python framework for building backend, AI, and data-driven applications. It provides a clean, layered architecture with built-in support for REST APIs, task scheduling, message queues, and machine learning workflows.

Python Version License


Table of Contents


Key Features

  • FastAPI Integration: Build high-performance REST APIs with automatic OpenAPI documentation
  • Modular Architecture: Organize code into self-contained modules with entities, repositories, services, and controllers
  • Multi-Database Support: Built-in connectors for MySQL, ClickHouse, Redis, and Qdrant (vector DB)
  • Task Scheduling: Celery-powered background tasks with RedBeat scheduler for periodic jobs
  • Kafka Consumers: Async Kafka message processing with aiokafka
  • Object Storage: S3-compatible storage support via boto3
  • ML Workflows: Optional MLflow integration for model versioning and deployment
  • Dependency Registry: Singleton-based service container for clean dependency injection
  • Environment Management: Configuration via .env files using dotenv

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                         Application                              │
├─────────────────────────────────────────────────────────────────┤
│  Bootstrap → Registers Connectors & Modules                      │
├─────────────────────────────────────────────────────────────────┤
│                          Modules                                 │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐            │
│  │Controller│ │ Service  │ │Repository│ │  Entity  │            │
│  │ (API)    │→│ (Logic)  │→│  (Data)  │→│ (Model)  │            │
│  └──────────┘ └──────────┘ └──────────┘ └──────────┘            │
├─────────────────────────────────────────────────────────────────┤
│  Tasks (Celery)  │  Consumers (Kafka)  │  Models (MLflow)       │
├─────────────────────────────────────────────────────────────────┤
│                        Connectors                                │
│  SQL (MySQL/ClickHouse) │ Redis │ Qdrant │ Object Storage (S3)  │
└─────────────────────────────────────────────────────────────────┘

Installation

Requirements

  • Python 3.10 or higher

Install via pip

pip install minix

Install from source (development)

git clone <repository-url>
cd minix
pip install -e .

Core Concepts

Bootstrap

The bootstrap function initializes your application by registering connectors and modules:

from minix.core.bootstrap import bootstrap

bootstrap(
    modules=[Module1(), Module2()],
    connectors=[
        (sql_connector, None),           # Default connector
        (sql_connector_2, "analytics")   # Named connector with salt
    ]
)

Modules

Modules are self-contained units that group related functionality:

from minix.core.module.business_module import BusinessModule

class ProductModule(BusinessModule):
    def __init__(self):
        super().__init__("product")
        self.add_entity(ProductEntity)
        self.add_repository(ProductRepository)
        self.add_service(ProductService)
        self.add_controller(ProductController)
        self.add_periodic_task(SyncProductsTask)
        self.add_consumer(ProductEventConsumer)

Module Methods:

  • add_entity(entity) - Register a data entity
  • add_repository(repository, connector_salt) - Register a repository with optional connector
  • add_service(service) - Register a service
  • add_controller(controller) - Register an API controller
  • add_task(task) - Register an async task
  • add_periodic_task(periodic_task) - Register a scheduled task
  • add_consumer(consumer) - Register a Kafka consumer
  • add_model(model, config) - Register an ML model

Entities

Entities define your data models:

SQL Entity

from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
from minix.core.entity import SqlEntity

class UserEntity(SqlEntity):
    __tablename__ = "users"

    name: Mapped[str] = mapped_column(String(100), nullable=False)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)

SQL entities automatically include id, created_at, and updated_at columns.

Qdrant Entity (Vector DB)

from minix.core.entity import QdrantEntity

class DocumentEntity(QdrantEntity):
    content: str

    @staticmethod
    def collection() -> str:
        return "documents"

Redis Entity

from minix.core.entity import RedisEntity

class CacheEntity(RedisEntity):
    pass

Repositories

Repositories handle data access:

SQL Repository

from minix.core.repository import SqlRepository

class UserRepository(SqlRepository[UserEntity]):
    pass

Built-in methods:

  • save(entity) - Save a single entity
  • save_all(entities) - Save multiple entities
  • save_bulk(entities, chunk_size) - Bulk insert with chunking
  • get_all() - Get all entities
  • get_by_id(id) - Get by ID
  • get_by(**kwargs) - Filter by attributes
  • update(entity) - Update an entity
  • delete(entity) - Delete an entity

Qdrant Repository

from minix.core.repository import QdrantRepository

class DocumentRepository(QdrantRepository[DocumentEntity]):
    pass

Methods:

  • insert(entities) - Insert vectors
  • query(entity, top_k) - Similarity search
  • delete(entity_ids) - Delete vectors

Services

Services contain business logic:

from minix.core.service import Service

class UserService(Service[UserEntity]):
    def get_active_users(self):
        return self.repository.get_by(status="active")

    def create_user(self, name: str, email: str) -> UserEntity:
        user = UserEntity(name=name, email=email)
        return self.repository.save(user)

Controllers

Controllers define REST API endpoints:

from minix.core.controller import Controller
from minix.core.registry import Registry

class UserController(Controller):
    def get_prefix(self):
        return "/users"

    def define_routes(self):
        @self.router.get("/")
        def get_users():
            service = Registry().get(UserService)
            return service.get_repository().get_all()

        @self.router.post("/")
        def create_user(name: str, email: str):
            service = Registry().get(UserService)
            return service.create_user(name, email)

Connectors

SQL Connector (MySQL/ClickHouse)

from minix.core.connectors.sql_connector import SqlConnector, SqlConnectorConfig

config = SqlConnectorConfig(
    username="root",
    password="password",
    host="localhost",
    port=3306,
    database="mydb",
    driver="mysql",  # or "clickhouse"
    connect_timeout=30,
    pool_recycle=3600
)
connector = SqlConnector(config)

Qdrant Connector

from minix.core.connectors.qdrant_connector import QdrantConnector

connector = QdrantConnector(
    url="http://localhost:6333",
    api_key="your-api-key"
)
await connector.connect()

Object Storage Connector (S3-compatible)

from minix.core.connectors.object_storage_connector import ObjectStorageConnector
from minix.core.connectors.object_storage_connector.config import ObjectStorageConfig

config = ObjectStorageConfig(
    endpoint_url="https://s3.amazonaws.com",
    access_key="your-access-key",
    secret_key="your-secret-key",
    bucket_name="my-bucket"
)
connector = ObjectStorageConnector(config)

# Upload, download, delete files
await connector.upload_file(file_obj, "path/to/file.txt")
await connector.download_file("path/to/file.txt", local_file)
await connector.generate_presigned_url("path/to/file.txt", expiration=3600)

Tasks & Scheduling

Async Task

from minix.core.scheduler.task import Task

class ProcessOrderTask(Task):
    def get_name(self) -> str:
        return "process_order"

    def run(self, order_id: int):
        # Process the order
        pass

Periodic Task

from celery.schedules import crontab
from minix.core.scheduler.task import PeriodicTask

class DailyReportTask(PeriodicTask):
    def get_name(self) -> str:
        return "daily_report"

    def get_schedule(self) -> crontab:
        return crontab(hour=0, minute=0)  # Run at midnight

    def run(self):
        # Generate daily report
        pass

Running Tasks Manually

from minix.core.registry import Registry
from minix.core.scheduler import Scheduler

scheduler = Registry().get(Scheduler)
scheduler.run_task(ProcessOrderTask(order_id=123))

Workflows (DAG Scheduling)

Minix supports defining and executing workflows as DAGs (Directed Acyclic Graphs) where:

  • Each node is a Task
  • Nodes can depend on other nodes
  • Dependency results can be passed to downstream tasks
  • The workflow engine guarantees each node runs at most once during a workflow execution
  • If the DAG has multiple sink nodes, all sinks are executed
Creating a Workflow
from minix.core.registry import Registry
from minix.core.scheduler import Scheduler
from minix.core.scheduler.workflow import Workflow

scheduler = Registry().get(Scheduler)

wf = Workflow("order_pipeline")

t1 = wf.add(DAGTaskTest1(in_a=aaaaa), node_id="task1")
t2 = wf.add(DAGTaskTest2(), node_id="task2", depends_on=[t1], consume_dependency_results=True)
t3 = wf.add(DAGTaskTest3(), node_id="task3", depends_on=[t2, t1], consume_dependency_results=True)
t4 = wf.add(DAGTaskTest4(), node_id="task4", depends_on=[t3], consume_dependency_results=True)

t5 = wf.add(DAGTaskTest5(), node_id="task5", depends_on=[t4], consume_dependency_results=True)

res = scheduler.run_workflow(wf)
Dependency Result Passing

For a node with consume_dependency_results=True:

  • If it has 1 dependency, that dependency result is passed as the first positional argument (arg0) to the task.
  • If it has multiple dependencies, a list of dependency results is passed as arg0 in the same order as depends_on.

For a node with consume_dependency_results=False, no dependency result is injected.

This design allows tasks to explicitly opt in/out of consuming upstream values, while still receiving their own constructor-provided args/kwargs.

Running the Entire Workflow
res = scheduler.run_workflow(wf)
print(res.id)  # Celery task id
# output = res.get(timeout=...)  # if you want to block and fetch the final result

Final result shape when running the whole workflow:

  • If the workflow has one sink, the final result is that sink’s output.
  • If the workflow has multiple sinks, the final result is a dictionary mapping sink node ids to outputs:
    {
      "task5": <result_of_task6>,
      "task4": <result_of_task9>
    }
    
Running a Specific Target Node

You can execute only the subgraph required to compute a particular node (useful for partial runs or debugging):

res = scheduler.run_workflow(wf, target_node_id="task9")
# output = res.get(timeout=...)

In this mode, only task9 and its dependency closure are executed, and the returned value corresponds to the output of the target node (task9).

Workflow Execution Guarantees

When executing a workflow:

  • No duplicate execution: if a node is required by multiple downstream nodes, it runs once.
  • Multiple sinks are supported: all sink nodes run when executing the whole workflow.
  • Full-graph execution: when no target_node_id is provided, the workflow runs the entire DAG (not only the ancestry of a single sink).

Kafka Consumers

from minix.core.consumer import AsyncConsumer, AsyncConsumerConfig

class OrderEventConsumer(AsyncConsumer):
    def get_config(self) -> AsyncConsumerConfig:
        return AsyncConsumerConfig(
            name="order_consumer",
            topics=["orders"],
            group_id="order_service",
            bootstrap_servers=["localhost:9092"]
        )

    async def run(self, message: dict):
        # Process the message
        order_id = message.get("order_id")
        print(f"Processing order: {order_id}")

ML Models

Base Model

from minix.core.model import Model

class MyModel(Model):
    def get_model_name(self):
        return "my_model"

    def predict(self, model_input: dict):
        # Prediction logic
        pass

    def set_device(self, device: str):
        self.device = device

Embedding Model

from minix.core.model import EmbeddingModel
import numpy as np

class TextEmbedder(EmbeddingModel):
    def get_model_name(self):
        return "text_embedder"

    def embed(self, text: str) -> np.ndarray:
        # Return embedding vector
        pass

    def embed_batch(self, texts: list[str]) -> np.ndarray:
        # Return batch of embeddings
        pass

    def set_device(self, device: str):
        self.device = device

MLflow Model (requires ai extra)

from minix.core.model import MlflowModel

class MyMlflowModel(MlflowModel):
    def __init__(self):
        super().__init__(
            name="my_model",
            version=1,
            packages=["torch", "transformers"]
        )

    def get_model(self):
        # Return your model instance
        pass

    def predict(self, model_input):
        model = self.load_model()
        return model.predict(model_input)

Configuration

Minix uses environment variables for configuration. Create a .env file in your project root:

# Celery Configuration
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=db+mysql://root:password@localhost:3306/celery_results

# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS=localhost:9092

# MLflow Configuration (for AI extras)
MLFLOW_TRACKING_URL=http://localhost:5000
PYTHON_VERSION=3.10

Extras

AI Capabilities

Install with AI tools (PyTorch, MLflow, Qdrant):

pip install "minix[ai]"

ClickHouse Support

Install with ClickHouse support:

pip install "minix[clickhouse]"

Development Tools

Install development dependencies:

pip install "minix[dev]"

Install All Extras

pip install "minix[ai,clickhouse,dev]"

Registry Usage

The Registry is a singleton-based dependency container:

from minix.core.registry import Registry

# Register a service
Registry().register(MyService, MyService())

# Register with a salt (for multiple instances)
Registry().register(SqlConnector, connector, salt="analytics")

# Retrieve a service
service = Registry().get(MyService)

# Retrieve with salt
connector = Registry().get(SqlConnector, salt="analytics")

CLI Commands

# Initialize a new project
minix init <project_name>

# Show framework version
minix version

License

Minix is licensed under the MIT License. See the LICENSE file for more details.


Contributing

Contributions are welcome! Please feel free to submit a Pull Request.


Author

AmirHossein Advari - amiradvari@gmail.com
Shirin Dehghani - shirin.dehghani1996@gmail.com
Parsa Mohammadpour - parsa.mohammadpour01@gmail.com

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

minix-0.1.52.tar.gz (32.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

minix-0.1.52-py3-none-any.whl (46.8 kB view details)

Uploaded Python 3

File details

Details for the file minix-0.1.52.tar.gz.

File metadata

  • Download URL: minix-0.1.52.tar.gz
  • Upload date:
  • Size: 32.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.0 CPython/3.13.7 Darwin/24.6.0

File hashes

Hashes for minix-0.1.52.tar.gz
Algorithm Hash digest
SHA256 bca9f34f190f1c73dc1214a410c0f6851b22546e2fc1c32102a7e819e442b912
MD5 6b306f90cb4c59bbfe2d8f84720fd318
BLAKE2b-256 f35869f3a7ee38b4bfc6fb0213284c72389ddb26972d38680bb5075ccc9add22

See more details on using hashes here.

File details

Details for the file minix-0.1.52-py3-none-any.whl.

File metadata

  • Download URL: minix-0.1.52-py3-none-any.whl
  • Upload date:
  • Size: 46.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.0 CPython/3.13.7 Darwin/24.6.0

File hashes

Hashes for minix-0.1.52-py3-none-any.whl
Algorithm Hash digest
SHA256 d59c646a78845e25d4e254c990a494045eabfaab613b2b1002519d53716dcf62
MD5 6c1a0b3c56ea5f7aa2ad17c54eba7913
BLAKE2b-256 75e570e2d7d51c04de119022ca2e98add5f4af35cf688986947ad951019fb0a6

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page