Eventix

Eventix is a distributed, event-driven task scheduling and execution system built with Python and FastAPI. It lets you post events, define triggers that turn those events into tasks, and schedule asynchronous tasks with priorities, retry logic, and delayed or recurring execution.

Features

  • Event-Driven Architecture: Post events and automatically trigger tasks based on registered triggers
  • Priority-Based Task Scheduling: Tasks are executed based on priority and ETA (Estimated Time of Arrival)
  • Retry Logic: Automatic retry with configurable backoff for failed tasks
  • Task Uniqueness: Prevent duplicate tasks with unique keys
  • Namespace Support: Isolate tasks and events across different namespaces
  • MongoDB Backend: Built on MongoDB for persistent storage
  • RESTful API: FastAPI-based HTTP interface for event and task management
  • Worker System: Distributed worker support for task execution
  • Scheduled Tasks: Support for recurring tasks via cron-like scheduling
  • Task Relay: Forward tasks between different Eventix instances
  • Metrics & Monitoring: Built-in metrics endpoints for observability

Architecture

Eventix consists of three main components:

  1. API Server (main.py): FastAPI application that receives events, registers triggers, and manages tasks
  2. Worker (worker.py): Processes scheduled tasks from the queue
  3. Client: Applications that post events or schedule tasks
┌─────────────┐         ┌──────────────┐         ┌─────────────┐
│   Client    │────────▶│  API Server  │────────▶│  MongoDB    │
│ Application │         │  (FastAPI)   │         │  Backend    │
└─────────────┘         └──────────────┘         └─────────────┘
                               │                         ▲
                               │                         │
                               ▼                         │
                        ┌──────────────┐                 │
                        │   Worker(s)  │─────────────────┘
                        │  Task Exec   │
                        └──────────────┘

Installation

Prerequisites

  • Python 3.13+
  • MongoDB instance (local or remote)

Install with uv (recommended)

# Clone the repository
git clone https://github.com/yourusername/eventix.git
cd eventix

# Install dependencies with uv
uv sync

Install with pip

pip install eventix

Quick Start

1. Start MongoDB

docker run -d -p 27017:27017 mongo:latest

2. Configure Environment

Create a .env.local file based on .env.local.example:

cp .env.local.example .env.local

Edit .env.local:

MONGODB_URI=mongodb://localhost:27017/eventix
EVENTIX_URL=http://localhost:8000
EVENTIX_NAMESPACE=default
EVENTIX_API_PORT=8000

3. Start the API Server

python main.py

The API will be available at http://localhost:8000. Visit http://localhost:8000/docs for interactive API documentation.

4. Start a Worker

python worker.py

5. Schedule Your First Task

Create a simple task scheduler (example from demo_app/demo_scheduler.py):

from eventix.functions.core import task
from eventix.functions.task_scheduler import TaskScheduler

@task()
def mytask(data: str):
    print(f"Task executed with data: {data}")

if __name__ == "__main__":
    TaskScheduler.config({})
    mytask.delay(data="Hello, Eventix!")

Configuration

Environment Variables

Variable                          Description                         Default
MONGODB_URI                       MongoDB connection string           -
EVENTIX_URL                       URL of the Eventix API server       -
EVENTIX_NAMESPACE                 Default namespace for tasks/events  default
EVENTIX_API_PORT                  Port for the API server             8000
EVENTIX_BACKEND                   Backend type (mongodb/couchdb)      mongodb
EVENTIX_DELAY_TASKS               Enable delayed task execution       true
EVENTIX_SCHEDULE_ENABLED          Enable scheduled/recurring tasks    false
EVENTIX_TRIGGER_CONFIG_DIRECTORY  Directory for trigger YAML configs  -
EVENTIX_RELAY_CONFIG              Path to relay configuration YAML    relay.yaml
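
To make the defaults in the table concrete, here is a minimal sketch of reading a few of these variables from the environment. The `EventixSettings` class and `load_settings` function are hypothetical helpers written for illustration; they are not part of the eventix package, and the set of accepted boolean spellings is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class EventixSettings:
    """Hypothetical settings container (not part of eventix)."""
    mongodb_uri: Optional[str]
    eventix_url: Optional[str]
    namespace: str
    api_port: int
    backend: str
    delay_tasks: bool
    schedule_enabled: bool


def _as_bool(value: str) -> bool:
    # Assumption: common truthy spellings count as True, anything else as False.
    return value.strip().lower() in {"1", "true", "yes", "on"}


def load_settings(env: Optional[Mapping[str, str]] = None) -> EventixSettings:
    """Read the documented variables, falling back to the documented defaults."""
    env = os.environ if env is None else env
    return EventixSettings(
        mongodb_uri=env.get("MONGODB_URI"),        # no default in the table
        eventix_url=env.get("EVENTIX_URL"),        # no default in the table
        namespace=env.get("EVENTIX_NAMESPACE", "default"),
        api_port=int(env.get("EVENTIX_API_PORT", "8000")),
        backend=env.get("EVENTIX_BACKEND", "mongodb"),
        delay_tasks=_as_bool(env.get("EVENTIX_DELAY_TASKS", "true")),
        schedule_enabled=_as_bool(env.get("EVENTIX_SCHEDULE_ENABLED", "false")),
    )


settings = load_settings({"MONGODB_URI": "mongodb://localhost:27017/eventix"})
print(settings.namespace, settings.api_port, settings.schedule_enabled)
```

Passing a plain dict instead of `os.environ` keeps the sketch easy to try without touching your real environment.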

Core Concepts

Events

Events are notifications that something has occurred in your system. Events have:

  • name: Unique identifier for the event type
  • namespace: Logical grouping (default: "default")
  • payload: Data associated with the event
  • priority: Execution priority (lower = higher priority)
  • timestamp: When the event occurred
  • operator: Who/what triggered the event

Example event definition:

from pydantic import BaseModel

from eventix.pydantic.event import TEventixEvent

class TransactionStatusChangeEvent(TEventixEvent):
    # Typed payload carried by this event
    class Payload(BaseModel):
        full_tr_number: str
        status: str

    payload: Payload

Triggers

Triggers are event handlers that execute when specific events occur. They convert events into tasks.

Example trigger:

from eventix.pydantic.event import TEventixEventTrigger, TEventixEvent
from eventix.pydantic.task import TEventixTask

class AnnouncementEmailTrigger(TEventixEventTrigger):
    event_name: str = "TransactionStatusChangeEvent"
    namespace: str = "default"

    def execute(self, event: TEventixEvent) -> TEventixTask:
        # Check if event matches criteria
        if event.payload.get("status") != "ANNOUNCED":
            self.ignore()  # Skip this event

        # Return a task to be scheduled
        return TEventixTask(
            task="send_email",
            kwargs={"to": "user@example.com", "subject": "Announced!"}
        )

Register triggers:

from eventix.functions.event import event_register_trigger

event_register_trigger(AnnouncementEmailTrigger())

Triggers can also be loaded from YAML configuration files (see EVENTIX_TRIGGER_CONFIG_DIRECTORY under Configuration).
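
The idea that triggers convert events into tasks can be sketched without Eventix at all. The following is a simplified, in-memory illustration, not Eventix's implementation: `register_trigger`, `post_event`, and the dict-based events and tasks are hypothetical stand-ins, and returning `None` stands in for `self.ignore()`.

```python
from collections import defaultdict

# (namespace, event_name) -> list of handler callables
_triggers = defaultdict(list)


def register_trigger(namespace, event_name, handler):
    """Remember that `handler` should run for this event in this namespace."""
    _triggers[(namespace, event_name)].append(handler)


def post_event(event):
    """Run every matching handler and collect the tasks they return."""
    key = (event["namespace"], event["name"])
    tasks = []
    for handler in _triggers.get(key, []):
        task = handler(event)
        if task is not None:  # None stands in for "ignore this event"
            tasks.append(task)
    return tasks


def welcome_email(event):
    return {"task": "send_welcome_email",
            "kwargs": {"email": event["payload"]["email"]}}


def announced_only(event):
    # Skip events whose payload does not carry the expected status
    if event["payload"].get("status") != "ANNOUNCED":
        return None
    return {"task": "send_email", "kwargs": {"subject": "Announced!"}}


register_trigger("production", "UserRegistered", welcome_email)
register_trigger("production", "UserRegistered", announced_only)

scheduled = post_event({
    "name": "UserRegistered",
    "namespace": "production",
    "payload": {"user_id": "12345", "email": "user@example.com"},
})
# Same event name, different namespace: no trigger is registered there
other_namespace = post_event(
    {"name": "UserRegistered", "namespace": "default", "payload": {}}
)
print(scheduled, other_namespace)
```

Keying the registry on both namespace and event name is one simple way to get the isolation that namespaces are meant to provide.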

Tasks

Tasks are units of work to be executed by workers. Tasks have:

  • task: Function name to execute
  • args/kwargs: Function arguments
  • eta: Estimated time of arrival (when to execute)
  • priority: Execution order (lower = higher priority)
  • status: scheduled, processing, done, error, retry
  • unique_key: Optional key to prevent duplicate tasks
  • retry: Enable automatic retry on failure
  • max_retries: Maximum retry attempts

Example task creation:

from datetime import datetime, timedelta

from eventix.functions.core import task

@task()
def send_email(to: str, subject: str, body: str):
    # Send email logic
    print(f"Sending email to {to}: {subject}")

# Schedule immediately
send_email.delay(to="user@example.com", subject="Hello", body="World")

# Schedule for later
send_email.apply_async(
    kwargs={"to": "user@example.com", "subject": "Reminder", "body": "..."},
    eta=datetime.utcnow() + timedelta(hours=1)
)
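
How priority and eta interact when a worker picks its next task is not spelled out here. One plausible reading, shown as a sketch under that assumption, is: consider only tasks whose eta has passed, then prefer the lowest priority number, breaking ties by the earliest eta. `SketchTask` and `next_due_task` are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class SketchTask:
    """Hypothetical stand-in for a stored task record."""
    task: str
    priority: int
    eta: datetime


def next_due_task(tasks, now):
    """Pick the due task with the lowest priority number, then earliest eta."""
    due = [t for t in tasks if t.eta <= now]
    if not due:
        return None
    return min(due, key=lambda t: (t.priority, t.eta))


now = datetime(2025, 1, 1, 12, 0, 0)
queue = [
    SketchTask("low_priority_report", priority=10, eta=now - timedelta(minutes=5)),
    SketchTask("urgent_email", priority=0, eta=now - timedelta(minutes=1)),
    SketchTask("future_cleanup", priority=0, eta=now + timedelta(hours=1)),
]

chosen = next_due_task(queue, now)
print(chosen.task)  # urgent_email
```

`future_cleanup` shares the best priority but is not yet due, so it is skipped; `urgent_email` wins over the older report because lower numbers mean higher priority.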

Namespaces

Namespaces provide logical isolation for tasks and events, allowing you to:

  • Separate environments (dev, staging, production)
  • Isolate different applications or tenants
  • Apply different processing rules per namespace

Usage Examples

Example 1: Post an Event

from eventix.functions.event import event_post
from eventix.pydantic.event import TEventixEvent

event = TEventixEvent(
    name="UserRegistered",
    namespace="production",
    payload={"user_id": "12345", "email": "user@example.com"}
)

tasks = event_post(event)  # Returns list of scheduled tasks

Example 2: Define and Register a Trigger

from eventix.pydantic.event import TEventixEventTrigger
from eventix.pydantic.task import TEventixTask
from eventix.functions.event import event_register_trigger

class WelcomeEmailTrigger(TEventixEventTrigger):
    event_name: str = "UserRegistered"

    def execute(self, event):
        return TEventixTask(
            task="send_welcome_email",
            kwargs={"email": event.payload["email"]}
        )

event_register_trigger(WelcomeEmailTrigger())

Example 3: Define a Task with Retry Logic

from eventix.functions.core import task

@task(retry=True, max_retries=3)
def process_payment(order_id: str, amount: float):
    # Payment processing logic
    pass

process_payment.delay(order_id="ORD-123", amount=99.99)
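
To illustrate what retry with backoff means in general, here is a self-contained sketch. It is not Eventix's retry implementation: the exponential schedule, the `base` and `cap` values, and the helper names `backoff_delay` and `run_with_retries` are all assumptions chosen for the example.

```python
import random
import time


def backoff_delay(attempt, base=2.0, cap=300.0, jitter=0.0):
    """Seconds to wait before retry number `attempt` (0-based)."""
    delay = min(cap, base * (2 ** attempt))
    if jitter > 0:
        delay += random.uniform(0.0, jitter)
    return delay


def run_with_retries(func, max_retries=3, sleep=time.sleep):
    """Call func(); on failure wait and retry, up to max_retries retries."""
    attempt = 0
    while True:
        try:
            return func()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            sleep(backoff_delay(attempt))
            attempt += 1


# Demo: a function that fails twice, then succeeds.
calls = {"count": 0}


def flaky_payment():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("gateway timeout")
    return "charged"


waits = []  # record the delays instead of actually sleeping
result = run_with_retries(flaky_payment, max_retries=3, sleep=waits.append)
print(result, waits)
```

Injecting `sleep` as a parameter keeps the demo instant while still showing the delays that would be applied between attempts.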

Example 4: Task with Unique Key (Prevents Duplicates)

from eventix.functions.core import task

@task()
def generate_report(report_id: str):
    # Report generation logic
    pass

generate_report.apply_async(
    kwargs={"report_id": "RPT-001"},
    unique_key="report_RPT-001"  # Only one task with this key will be scheduled
)
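
The effect of a unique key can be pictured with a small in-memory model. This is only a sketch: `SketchQueue` is hypothetical, and it assumes that a second request with the same key leaves the first task in place. Whether Eventix keeps, updates, or replaces the existing task in that case is not stated here.

```python
class SketchQueue:
    """Hypothetical in-memory queue that deduplicates on unique_key."""

    def __init__(self):
        self._by_key = {}
        self.tasks = []

    def schedule(self, task, kwargs, unique_key=None):
        """Return (record, created); reuse the existing record on a key clash."""
        if unique_key is not None and unique_key in self._by_key:
            return self._by_key[unique_key], False
        record = {"task": task, "kwargs": kwargs, "unique_key": unique_key}
        self.tasks.append(record)
        if unique_key is not None:
            self._by_key[unique_key] = record
        return record, True


queue = SketchQueue()
first, created_first = queue.schedule(
    "generate_report", {"report_id": "RPT-001"}, unique_key="report_RPT-001"
)
second, created_second = queue.schedule(
    "generate_report", {"report_id": "RPT-001"}, unique_key="report_RPT-001"
)
print(created_first, created_second, len(queue.tasks))
```

Tasks scheduled without a `unique_key` are never deduplicated in this model, which matches the key being optional.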

API Documentation

Once the server is running, visit:

  • Swagger UI: http://localhost:8000/docs
  • ReDoc: http://localhost:8000/redoc

Key Endpoints

  • POST /event - Post a new event
  • GET /tasks - List tasks (with filtering)
  • GET /task/{task_id} - Get task details
  • POST /task - Create a new task
  • GET /metrics - System metrics
  • GET /namespaces - List available namespaces
  • GET /healthz - Health check endpoint
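
As a rough illustration of calling POST /event over HTTP, the sketch below builds a request with the standard library. The JSON field names mirror the event fields listed under Core Concepts, but the exact request schema is an assumption; check the Swagger UI at /docs for the authoritative shape. `build_event_request` is a hypothetical helper.

```python
import json
import urllib.request


def build_event_request(base_url, name, payload, namespace="default"):
    """Build (but do not send) a POST /event request."""
    body = json.dumps(
        {"name": name, "namespace": namespace, "payload": payload}
    ).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/event",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_event_request(
    "http://localhost:8000",
    name="UserRegistered",
    payload={"user_id": "12345", "email": "user@example.com"},
    namespace="production",
)
print(req.get_method(), req.full_url)

# To actually send it (requires a running API server):
# with urllib.request.urlopen(req) as response:
#     print(response.status, response.read())
```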

Worker Setup

Workers are responsible for executing scheduled tasks. You can run multiple workers for horizontal scaling.

Basic Worker (worker.py)

from eventix.functions.fastapi import init_backend
from eventix.functions.task_worker import TaskWorker
from eventixconfig import config

worker = TaskWorker(config)

if __name__ == "__main__":
    init_backend()
    worker.start(endless=True)

Worker Configuration (eventixconfig.py)

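# Define or import your task functions here, then map each task name
# to the function that implements it.
# send_email_function and process_payment_function are placeholders
# for your own functions.
config = {
    "send_email": send_email_function,
    "process_payment": process_payment_function,
    # ... more tasks
}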

Running Multiple Workers

# Terminal 1
python worker.py

# Terminal 2
python worker.py

# Terminal 3
python worker.py

Workers will automatically coordinate via MongoDB to prevent duplicate task execution.
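
The coordination idea can be sketched in memory: each worker atomically flips a task from scheduled to processing, so no two workers ever claim the same task. This is a simulation, not Eventix's code. The `InMemoryTaskStore` class is hypothetical and a lock plays the role of the database; with MongoDB, a single-document atomic update such as PyMongo's `find_one_and_update` is a common way to get the same effect, though whether Eventix uses it is an assumption.

```python
import threading


class InMemoryTaskStore:
    """Hypothetical store; the lock stands in for an atomic database update."""

    def __init__(self, task_ids):
        self._lock = threading.Lock()
        self._status = {tid: "scheduled" for tid in task_ids}
        self._owner = {}

    def claim_next(self, worker_id):
        """Atomically move one scheduled task to processing and return its id."""
        with self._lock:
            for tid, status in self._status.items():
                if status == "scheduled":
                    self._status[tid] = "processing"
                    self._owner[tid] = worker_id
                    return tid
        return None  # nothing left to claim


def worker_loop(store, worker_id, claimed):
    # Keep claiming until the store reports that no scheduled task remains
    while (tid := store.claim_next(worker_id)) is not None:
        claimed.append(tid)


store = InMemoryTaskStore(range(100))
claimed = []
threads = [
    threading.Thread(target=worker_loop, args=(store, f"worker-{i}", claimed))
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(claimed), len(set(claimed)))
```

Because the check and the status change happen under one lock, every task id appears in `claimed` exactly once regardless of how the three threads interleave.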

Development

Running Tests

pytest

Code Formatting

ruff format .

Linting

ruff check .

Docker Setup

Build and run with Docker:

docker-compose up

Or use the helper scripts:

python docker_build.py
python docker_run.py

Database

Eventix uses MongoDB as its primary backend for storing events, tasks, and triggers. The system automatically creates indexes for optimal query performance.

Collections

  • task: Stores all task records with status, priority, and execution details
  • event: Stores event history (if enabled)

License

[Add your license information here]

Contributing

Contributions are welcome! Please open an issue or submit a pull request.

Support

For questions or issues, please open an issue on GitHub.
