Temporal scheduled workflow worker for EWMA baseline updating and anomaly checking
Temporal EWMA Worker
Temporal worker package for scheduled EWMA baseline updates and cost anomaly detection.
Folder Structure
.
├── build/
│   └── Dockerfile
├── contracts/
│   ├── asyncapi/
│   │   └── v1.yaml
│   ├── changelog.md
│   └── workflows/
│       └── ewma_baseline_update.yaml
├── database/
│   ├── migrations/
│   │   ├── 0001_init.rollback.sql
│   │   └── 0001_init.sql
│   └── schema.lock
├── deploy/
│   └── docker/
│       └── docker-compose.yaml
├── feature-registry.yaml
├── pyproject.toml
├── README.md
├── scripts/
│   ├── deploy_docker.sh
│   ├── migrate.py
│   ├── migrate.sh
│   ├── run.sh
│   └── test.sh
├── src/
│   ├── features/
│   │   └── ewma_compute/
│   │       ├── index.py
│   │       └── service.py
│   ├── infra/
│   │   └── adapters/
│   │       ├── clickhouse/
│   │       │   └── clickhouse_adapter.py
│   │       ├── kafka/
│   │       │   └── kafka_alert_adapter.py
│   │       ├── postgres/
│   │       │   └── postgres_adapter.py
│   │       └── redis/
│   │           └── redis_adapter.py
│   ├── shared/
│   │   ├── contracts/
│   │   │   └── validator.py
│   │   ├── errors/
│   │   │   └── base.py
│   │   ├── ports/
│   │   │   ├── clickhouse_port.py
│   │   │   ├── postgres_port.py
│   │   │   └── redis_port.py
│   │   └── types/
│   │       └── ewma_types.py
│   └── worker/
│       ├── activities.py
│       ├── config.py
│       ├── index.py
│       ├── registry.py
│       └── workflows.py
├── tests/
│   ├── integration/
│   │   └── test_adapters.py
│   └── unit/
│       ├── test_config.py
│       ├── test_contract.py
│       ├── test_ewma_service.py
│       └── test_workflow.py
└── worker-registry.yaml
Work Execution & Decision Flow
The following detailed decision tree outlines how the hourly workflow updates baselines and flags anomalies, with justification for each design choice:
[Hourly Cron Trigger (0 * * * *)]
└── EwmaBaselineUpdate Workflow Starts
    │
    │  ► RATIONALE: The cron schedule fires the workflow at the top of every hour.
    │
    └── Activity: fetch_active_pairs()
        │
        │  ► RATIONALE: Scans ClickHouse log volumes for (service, model) pairs that were
        │    active in the last 7 days. This filters out historical combinations and
        │    focuses computation ONLY on active traffic to minimize cost and execution time.
        │
        └── Loop over active (service, model) pairs concurrently:
            │
            │  ► RATIONALE: The workflow fans out the per-pair activities concurrently
            │    instead of awaiting them one by one, so thousands of pairs can be
            │    evaluated in parallel without blocking.
            │
            ├── Activity: get_baseline(service, model, hour_of_week)
            │   │
            │   │  ► RATIONALE: Reads the current baseline record from PostgreSQL. PostgreSQL is
            │   │    used here because it provides ACID guarantees for stored baselines.
            │   │
            │   ├── Existing Baseline NOT found (Cold Start)
            │   │   ├── Activity: fetch_global_model_avg(model)
            │   │   │
            │   │   │  ► RATIONALE: With no history for this service/model pair, we seed the baseline
            │   │   │    using the global average cost for this specific model (e.g. gpt-4o) across
            │   │   │    all services. This prevents false-positive anomaly alerts during cold starts.
            │   │   │
            │   │   └── Seed EWMA baseline value = Global Model Average
            │   │
            │   └── Existing Baseline found (Warm Status)
            │       ├── Activity: fetch_cost_history(service, model, hour_of_week)
            │       │   │
            │       │   │  ► RATIONALE: Queries ClickHouse for the cost of the same hour_of_week (0-167)
            │       │   │    over the last 4 weeks. ClickHouse is selected here because column-oriented
            │       │   │    storage allows fast aggregation of historical logs.
            │       │   │
            │       │   └── Fetch last 4 occurrences from ClickHouse
            │       └── Compute EWMA baseline value using α=0.1:
            │           EWMA_new = (1 - α) * EWMA_prev + α * Cost_current
            │
            ├── Activity: fetch_current_cost_1h(service, model)
            │
            ├── Activity: upsert_baseline(EwmaRecord)
            │   │
            │   │  ► RATIONALE: Persists the calculated baseline to PostgreSQL as a durable audit trail.
            │   │
            │   └── Persist updated baseline to PostgreSQL
            │
            ├── Write updated EWMA value to Redis Cache
            │   │
            │   │  ► RATIONALE: Anomaly-detection gateways on the ingestion path need very low latency.
            │   │    Redis caches the calculated baseline under: ewma:cost:{service}:{model}:{hour_of_week}
            │   │
            │   └── Key: ewma:cost:{service}:{model}:{hour_of_week}
            │
            └── Decision: Is Cost_current > (3 * EWMA_baseline)?
                │
                ├── YES (Anomaly Detected)
                │   ├── Activity: fetch_cost_by_cluster_1h(service, model)
                │   │   │
                │   │   │  ► RATIONALE: If cost spikes, we query ClickHouse to break down the cost
                │   │   │    contributions by Kubernetes cluster/namespace to locate the root cause.
                │   │   │
                │   │   └── Get cluster drilldown metrics
                │   │
                │   └── Activity: publish_anomaly_alert(AnomalyPayload)
                │       │
                │       │  ► RATIONALE: Publishes to Kafka topic 'cost-anomaly-alerts'. Using Kafka
                │       │    decouples anomaly detection from notification delivery (Slack, pager).
                │       │
                │       └── Emit alert JSON to Kafka topic
                │
                └── NO (Normal State)
                    └── Do nothing
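The arithmetic in the flow above can be sketched in plain Python. This is a minimal illustration, not the code in src/features/ewma_compute/service.py: the function names, the PairResult type, the Monday-00:00-UTC convention for hour_of_week, the choice to compare the current cost against the freshly updated baseline, and the use of asyncio.gather as a stand-in for the Temporal fan-out are all assumptions made for the example.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

ALPHA = 0.1               # smoothing factor from the decision flow
ANOMALY_MULTIPLIER = 3.0  # alert when cost exceeds 3x the baseline


def hour_of_week(ts: datetime) -> int:
    """Map a timestamp to 0-167 (assumption: hour 0 is Monday 00:00 UTC)."""
    ts = ts.astimezone(timezone.utc)
    return ts.weekday() * 24 + ts.hour


def redis_key(service: str, model: str, how: int) -> str:
    """Build the cache key in the format given in the flow."""
    return f"ewma:cost:{service}:{model}:{how}"


def next_baseline(prev: Optional[float], current: float,
                  global_avg: float, alpha: float = ALPHA) -> float:
    """Cold start: seed with the global model average. Warm: EWMA update."""
    if prev is None:
        return global_avg
    return (1 - alpha) * prev + alpha * current


def is_anomaly(current: float, baseline: float,
               multiplier: float = ANOMALY_MULTIPLIER) -> bool:
    return current > multiplier * baseline


@dataclass
class PairResult:  # hypothetical result type for this sketch
    key: str
    baseline: float
    anomaly: bool


async def evaluate_pair(service: str, model: str, how: int,
                        prev: Optional[float], current: float,
                        global_avg: float) -> PairResult:
    baseline = next_baseline(prev, current, global_avg)
    # Assumption: the check uses the updated baseline, not the previous one.
    return PairResult(redis_key(service, model, how), baseline,
                      is_anomaly(current, baseline))


async def evaluate_all(pairs: list) -> list:
    """Evaluate every pair concurrently (asyncio stand-in for the fan-out)."""
    return await asyncio.gather(*(evaluate_pair(*p) for p in pairs))
```

With α=0.1, a previous baseline of 10.0 and a current cost of 20.0 give 0.9 * 10.0 + 0.1 * 20.0 = 11.0, so an alert would require a current cost above 33.0.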
Sequencing & Dependency Map
To run the worker successfully, you MUST spin up and configure dependencies in the following strict order:
[Step 1: Docker Containers] ---> [Step 2: Configuration] ---> [Step 3: DB Migrations] ---> [Step 4: Verification] ---> [Step 5: Start Worker]
- Step 1, Docker Containers: ClickHouse (8123), PostgreSQL (5432), Redis Cache (6379), Kafka & Zookeeper (9092), Temporal Server (7233)
- Step 2, Configuration: copy .env.example, then set hosts & ports
- Step 3, DB Migrations: ./scripts/migrate.sh (applies SQL schemas)
- Step 4, Verification: ./scripts/test.sh (ensures integrations and mock runs pass)
- Step 5, Start Worker: ./scripts/run.sh (starts polling the Temporal task queue)
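Before moving past Step 1, it can help to confirm that each container is actually listening. The following is a small sketch using only the standard library; the helper names are hypothetical, the ports are the ones listed above, and the default host of localhost is an assumption about a local Docker setup. A TCP connect only shows that the port is open, not that the service is ready to accept queries.

```python
import socket

# Ports taken from the dependency map; names are labels for reporting only.
DEPENDENCIES = [
    ("ClickHouse", 8123),
    ("PostgreSQL", 5432),
    ("Redis", 6379),
    ("Kafka", 9092),
    ("Temporal", 7233),
]


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def missing_dependencies(host: str = "localhost", deps=DEPENDENCIES) -> list:
    """Return the names of dependencies whose port is not reachable."""
    return [name for name, port in deps if not is_port_open(host, port)]
```

An empty list from missing_dependencies() suggests it is reasonable to continue with configuration and migrations.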
Setup & Running
Follow these steps to set up the local development environment and run the worker:
1. Prerequisites
Ensure you have the following installed:
- Python 3.11+
- Docker & Docker Compose
- Git
2. Configure Virtual Environment & Dependencies
Create a virtual environment and install the package along with development requirements:
# Create virtual environment
python3 -m venv .venv
# Activate virtual environment
source .venv/bin/activate
# Install package in editable mode with development dependencies
pip install -e ".[dev]"
3. Spin Up Infrastructure
Use the provided Docker Compose file to run ClickHouse, Postgres, Redis, Kafka, and Temporal locally:
docker compose -f deploy/docker/docker-compose.yaml up -d
4. Configure Environment Variables
Copy the template .env.example to .env and fill in custom connection strings if necessary:
cp .env.example .env
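As an illustration of how such settings might be consumed, here is a sketch of a config loader that reads connection settings from environment-style mappings and falls back to the default ports listed earlier. The variable names (CLICKHOUSE_PORT, POSTGRES_PORT, and so on), the WorkerConfig class, and the parse_env_text helper are hypothetical; the actual names are defined by .env.example and src/worker/config.py.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class WorkerConfig:  # hypothetical shape, for illustration only
    clickhouse_port: int
    postgres_port: int
    redis_port: int
    kafka_port: int
    temporal_address: str
    task_queue: str


def parse_env_text(text: str) -> dict:
    """Parse KEY=VALUE lines, skipping blank lines and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def load_config(env: Mapping[str, str] = os.environ) -> WorkerConfig:
    """Build a config from env values, defaulting to the documented ports."""
    return WorkerConfig(
        clickhouse_port=int(env.get("CLICKHOUSE_PORT", "8123")),
        postgres_port=int(env.get("POSTGRES_PORT", "5432")),
        redis_port=int(env.get("REDIS_PORT", "6379")),
        kafka_port=int(env.get("KAFKA_PORT", "9092")),
        temporal_address=env.get("TEMPORAL_ADDRESS", "localhost:7233"),
        task_queue=env.get("TASK_QUEUE", "ewma-tasks"),
    )
```

Passing a plain dict instead of os.environ keeps the loader easy to unit test.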
Database Migrations Guide
The database schema is managed via lightweight migration scripts tracked under database/migrations/ and verified using a schema.lock file.
How it Works
The migration status is tracked in database/schema.lock, which contains the active version tag (e.g. 0001 or 0000).
Apply Migrations (UP)
To apply pending database schemas, run:
./scripts/migrate.sh up
This runs 0001_init.sql against the configured PostgreSQL database and writes 0001 to schema.lock.
Rollback Migrations
To revert schemas and return to baseline state, run:
./scripts/migrate.sh rollback
This executes the rollback SQL scripts and sets the schema.lock version to 0000.
Creating a New Migration
- Add your SQL changes inside database/migrations/ using a sequential identifier (e.g., 0002_add_index.sql and 0002_add_index.rollback.sql).
- Update the transition mappings inside scripts/migrate.py to support applying and rolling back your new script file.
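The lock-file mechanism described above can be sketched as follows. This is not the contents of scripts/migrate.py: the MIGRATIONS table layout, the function names, and the run_sql callback (which stands in for executing a script against PostgreSQL) are assumptions chosen to show how a transition mapping and a version tag fit together.

```python
from pathlib import Path
from typing import Callable

# Hypothetical transition table:
# version -> (up script, rollback script, version to record after rollback)
MIGRATIONS = {
    "0001": ("0001_init.sql", "0001_init.rollback.sql", "0000"),
}


def read_version(lock: Path) -> str:
    """Read the active version tag; a missing lock file means baseline 0000."""
    return lock.read_text().strip() if lock.exists() else "0000"


def migrate(direction: str, lock: Path, run_sql: Callable[[str], None]) -> str:
    """Apply pending scripts ("up") or revert to baseline ("rollback")."""
    current = read_version(lock)
    if direction == "up":
        for version in sorted(MIGRATIONS):
            if version > current:
                run_sql(MIGRATIONS[version][0])
                lock.write_text(version)
                current = version
    elif direction == "rollback":
        for version in sorted(MIGRATIONS, reverse=True):
            if version <= current:
                _, down, previous = MIGRATIONS[version]
                run_sql(down)
                lock.write_text(previous)
                current = previous
    else:
        raise ValueError(f"unknown direction: {direction}")
    return current
```

Under this layout, adding 0002_add_index.sql would mean adding one more entry to the table with "0001" as its rollback target.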
Running Verification & Worker
1. Run Tests
Verify configuration, domain services, and workflow behavior using the test script:
./scripts/test.sh
2. Run Worker
Start the Temporal worker, which polls the ewma-tasks task queue:
./scripts/run.sh
Remote Management API (REST)
The worker now includes a FastAPI management layer (port 8000 in prod).
| Endpoint | Method | Description |
|---|---|---|
| /health | GET | Check worker status and config. |
| /trigger | POST | Trigger the EWMA baseline update workflow on-demand. |
Example Execution
curl -X POST http://localhost:8000/trigger \
-H "Content-Type: application/json" \
-d '{
"force_hour": 42
}'
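The same call can be made from Python with the standard library. This is a sketch mirroring the curl command above; the helper names are hypothetical, and the shape of the response body is not documented here, so only the HTTP status code is returned.

```python
import json
import urllib.request


def build_trigger_request(base_url: str, force_hour: int) -> urllib.request.Request:
    """Build the POST /trigger request with the JSON body from the curl example."""
    body = json.dumps({"force_hour": force_hour}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/trigger",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def trigger(base_url: str, force_hour: int, timeout: float = 10.0) -> int:
    """Send the request and return the HTTP status code."""
    request = build_trigger_request(base_url, force_hour)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

For example, trigger("http://localhost:8000", 42) sends the same payload as the curl command, provided the management API is running locally.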