No project description provided
Project description
Eventix
Eventix is a distributed event-driven task scheduling and execution system built with Python and FastAPI. It enables you to register events, define triggers that respond to those events, and schedule asynchronous tasks with priorities, retry logic, and flexible execution.
Table of Contents
- Features
- Architecture
- Installation
- Quick Start
- Configuration
- Core Concepts
- Usage Examples
- API Documentation
- Worker Setup
- Development
- License
Features
- Event-Driven Architecture: Post events and automatically trigger tasks based on registered triggers
- Priority-Based Task Scheduling: Tasks are executed based on priority and ETA (Estimated Time of Arrival)
- Retry Logic: Automatic retry with configurable backoff for failed tasks
- Task Uniqueness: Prevent duplicate tasks with unique keys
- Namespace Support: Isolate tasks and events across different namespaces
- MongoDB Backend: Built on MongoDB for persistent storage
- RESTful API: FastAPI-based HTTP interface for event and task management
- Worker System: Distributed worker support for task execution
- Scheduled Tasks: Support for recurring tasks via cron-like scheduling
- Task Relay: Forward tasks between different Eventix instances
- Metrics & Monitoring: Built-in metrics endpoints for observability
Architecture
Eventix consists of three main components:
- API Server (
main.py): FastAPI application that receives events, registers triggers, and manages tasks - Worker (
worker.py): Processes scheduled tasks from the queue - Client: Applications that post events or schedule tasks
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Client │────────▶│ API Server │────────▶│ MongoDB │
│ Application │ │ (FastAPI) │ │ Backend │
└─────────────┘ └──────────────┘ └─────────────┘
│ ▲
│ │
▼ │
┌──────────────┐ │
│ Worker(s) │─────────────────┘
│ Task Exec │
└──────────────┘
Installation
Prerequisites
- Python 3.13+
- MongoDB instance (local or remote)
Install with uv (recommended)
# Clone the repository
git clone https://github.com/yourusername/eventix.git
cd eventix
# Install dependencies with uv
uv sync
Install with pip
pip install eventix
Quick Start
1. Start MongoDB
docker run -d -p 27017:27017 mongo:latest
2. Configure Environment
Create a .env.local file based on .env.local.example:
cp .env.local.example .env.local
Edit .env.local:
MONGODB_URI=mongodb://localhost:27017/eventix
EVENTIX_URL=http://localhost:8000
EVENTIX_NAMESPACE=default
EVENTIX_API_PORT=8000
3. Start the API Server
python main.py
The API will be available at http://localhost:8000. Visit http://localhost:8000/docs for interactive API documentation.
4. Start a Worker
python worker.py
5. Schedule Your First Task
Create a simple task scheduler (example from demo_app/demo_scheduler.py):
from eventix.functions.core import task
from eventix.functions.task_scheduler import TaskScheduler
@task()
def mytask(data: str):
print(f"Task executed with data: {data}")
if __name__ == "__main__":
TaskScheduler.config({})
mytask.delay(data="Hello, Eventix!")
Configuration
Environment Variables
| Variable | Description | Default |
|---|---|---|
MONGODB_URI |
MongoDB connection string | - |
EVENTIX_URL |
URL of the Eventix API server | - |
EVENTIX_NAMESPACE |
Default namespace for tasks/events | default |
EVENTIX_API_PORT |
Port for the API server | 8000 |
EVENTIX_BACKEND |
Backend type (mongodb/couchdb) | mongodb |
EVENTIX_DELAY_TASKS |
Enable delayed task execution | true |
EVENTIX_SCHEDULE_ENABLED |
Enable scheduled/recurring tasks | false |
EVENTIX_TRIGGER_CONFIG_DIRECTORY |
Directory for trigger YAML configs | - |
EVENTIX_RELAY_CONFIG |
Path to relay configuration YAML | relay.yaml |
Core Concepts
Events
Events are notifications that something has occurred in your system. Events have:
- name: Unique identifier for the event type
- namespace: Logical grouping (default: "default")
- payload: Data associated with the event
- priority: Execution priority (lower = higher priority)
- timestamp: When the event occurred
- operator: Who/what triggered the event
Example event definition:
from eventix.pydantic.event import TEventixEvent
class TransactionStatusChangeEvent(TEventixEvent):
class Payload(BaseModel):
full_tr_number: str
status: str
payload: Payload
Triggers
Triggers are event handlers that execute when specific events occur. They convert events into tasks.
Example trigger:
from eventix.pydantic.event import TEventixEventTrigger, TEventixEvent
from eventix.pydantic.task import TEventixTask
class AnnouncementEmailTrigger(TEventixEventTrigger):
event_name: str = "TransactionStatusChangeEvent"
namespace: str = "default"
def execute(self, event: TEventixEvent) -> TEventixTask:
# Check if event matches criteria
if event.payload.get("status") != "ANNOUNCED":
self.ignore() # Skip this event
# Return a task to be scheduled
return TEventixTask(
task="send_email",
kwargs={"to": "user@example.com", "subject": "Announced!"}
)
Register triggers:
from eventix.functions.event import event_register_trigger
event_register_trigger(AnnouncementEmailTrigger())
Or load from YAML configuration files.
Tasks
Tasks are units of work to be executed by workers. Tasks have:
- task: Function name to execute
- args/kwargs: Function arguments
- eta: Estimated time of arrival (when to execute)
- priority: Execution order (lower = higher priority)
- status: scheduled, processing, done, error, retry
- unique_key: Optional key to prevent duplicate tasks
- retry: Enable automatic retry on failure
- max_retries: Maximum retry attempts
Example task creation:
from eventix.functions.core import task
@task()
def send_email(to: str, subject: str, body: str):
# Send email logic
print(f"Sending email to {to}: {subject}")
# Schedule immediately
send_email.delay(to="user@example.com", subject="Hello", body="World")
# Schedule for later
from datetime import datetime, timedelta
send_email.apply_async(
kwargs={"to": "user@example.com", "subject": "Reminder", "body": "..."},
eta=datetime.utcnow() + timedelta(hours=1)
)
Namespaces
Namespaces provide logical isolation for tasks and events, allowing you to:
- Separate environments (dev, staging, production)
- Isolate different applications or tenants
- Apply different processing rules per namespace
Usage Examples
Example 1: Post an Event
from eventix.functions.event import event_post
from eventix.pydantic.event import TEventixEvent
event = TEventixEvent(
name="UserRegistered",
namespace="production",
payload={"user_id": "12345", "email": "user@example.com"}
)
tasks = event_post(event) # Returns list of scheduled tasks
Example 2: Define and Register a Trigger
from eventix.pydantic.event import TEventixEventTrigger
from eventix.pydantic.task import TEventixTask
from eventix.functions.event import event_register_trigger
class WelcomeEmailTrigger(TEventixEventTrigger):
event_name = "UserRegistered"
def execute(self, event):
return TEventixTask(
task="send_welcome_email",
kwargs={"email": event.payload["email"]}
)
event_register_trigger(WelcomeEmailTrigger())
Example 3: Define a Task with Retry Logic
from eventix.functions.core import task
@task(retry=True, max_retries=3)
def process_payment(order_id: str, amount: float):
# Payment processing logic
pass
process_payment.delay(order_id="ORD-123", amount=99.99)
Example 4: Task with Unique Key (Prevents Duplicates)
@task()
def generate_report(report_id: str):
# Report generation logic
pass
generate_report.apply_async(
kwargs={"report_id": "RPT-001"},
unique_key="report_RPT-001" # Only one task with this key will be scheduled
)
API Documentation
Once the server is running, visit:
- Swagger UI:
http://localhost:8000/docs - ReDoc:
http://localhost:8000/redoc
Key Endpoints
POST /event- Post a new eventGET /tasks- List tasks (with filtering)GET /task/{task_id}- Get task detailsPOST /task- Create a new taskGET /metrics- System metricsGET /namespaces- List available namespacesGET /healthz- Health check endpoint
Worker Setup
Workers are responsible for executing scheduled tasks. You can run multiple workers for horizontal scaling.
Basic Worker (worker.py)
from eventix.functions.fastapi import init_backend
from eventix.functions.task_worker import TaskWorker
from eventixconfig import config
worker = TaskWorker(config)
if __name__ == "__main__":
init_backend()
worker.start(endless=True)
Worker Configuration (eventixconfig.py)
# Define your task functions here
config = {
"send_email": send_email_function,
"process_payment": process_payment_function,
# ... more tasks
}
Running Multiple Workers
# Terminal 1
python worker.py
# Terminal 2
python worker.py
# Terminal 3
python worker.py
Workers will automatically coordinate via MongoDB to prevent duplicate task execution.
Development
Running Tests
pytest
Code Formatting
ruff format .
Linting
ruff check .
Docker Setup
Build and run with Docker:
docker-compose up
Or use the helper scripts:
python docker_build.py
python docker_run.py
Database
Eventix uses MongoDB as its primary backend for storing events, tasks, and triggers. The system automatically creates indexes for optimal query performance.
Collections
- task: Stores all task records with status, priority, and execution details
- event: Stores event history (if enabled)
License
[Add your license information here]
Contributing
Contributions are welcome! Please open an issue or submit a pull request.
Support
For questions or issues, please open an issue on GitHub.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file eventix-4.3.3.tar.gz.
File metadata
- Download URL: eventix-4.3.3.tar.gz
- Upload date:
- Size: 134.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.24 {"installer":{"name":"uv","version":"0.9.24","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"25.10","id":"questing","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8921ea59902a9cd43fc3e0ba987234aceb62094a518746ffbb9fbeb1ac4aa1e7
|
|
| MD5 |
42024389edf520ca825878db9f26df71
|
|
| BLAKE2b-256 |
a522f48b6e9995728defb76c47bb1d6ada3a34960ad9a6f33281d3d25f3c326d
|
File details
Details for the file eventix-4.3.3-py3-none-any.whl.
File metadata
- Download URL: eventix-4.3.3-py3-none-any.whl
- Upload date:
- Size: 37.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.24 {"installer":{"name":"uv","version":"0.9.24","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"25.10","id":"questing","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b086beddafa5605fc553a670dcf1d938dc07be1e43beaeb94925dfe4e548e6fb
|
|
| MD5 |
8a2407fd0806c59ebbbf246e4eb90d01
|
|
| BLAKE2b-256 |
ceb1591e6ef8329202c0f80f4939ef902fae8b5b158a660ab60e6c361b49ec0c
|