Temporal scheduled workflow worker for EWMA baseline updating and anomaly checking

Temporal EWMA Worker

Temporal worker package for scheduled EWMA baseline updates and cost anomaly detection.


Folder Structure

.
├── build/
│   └── Dockerfile
├── contracts/
│   ├── asyncapi/
│   │   └── v1.yaml
│   ├── changelog.md
│   └── workflows/
│       └── ewma_baseline_update.yaml
├── database/
│   ├── migrations/
│   │   ├── 0001_init.rollback.sql
│   │   └── 0001_init.sql
│   └── schema.lock
├── deploy/
│   └── docker/
│       └── docker-compose.yaml
├── feature-registry.yaml
├── pyproject.toml
├── README.md
├── scripts/
│   ├── deploy_docker.sh
│   ├── migrate.py
│   ├── migrate.sh
│   ├── run.sh
│   └── test.sh
├── src/
│   ├── features/
│   │   └── ewma_compute/
│   │       ├── index.py
│   │       └── service.py
│   ├── infra/
│   │   └── adapters/
│   │       ├── clickhouse/
│   │       │   └── clickhouse_adapter.py
│   │       ├── kafka/
│   │       │   └── kafka_alert_adapter.py
│   │       ├── postgres/
│   │       │   └── postgres_adapter.py
│   │       └── redis/
│   │           └── redis_adapter.py
│   ├── shared/
│   │   ├── contracts/
│   │   │   └── validator.py
│   │   ├── errors/
│   │   │   └── base.py
│   │   ├── ports/
│   │   │   ├── clickhouse_port.py
│   │   │   ├── postgres_port.py
│   │   │   └── redis_port.py
│   │   └── types/
│   │       └── ewma_types.py
│   └── worker/
│       ├── activities.py
│       ├── config.py
│       ├── index.py
│       ├── registry.py
│       └── workflows.py
├── tests/
│   ├── integration/
│   │   └── test_adapters.py
│   └── unit/
│       ├── test_config.py
│       ├── test_contract.py
│       ├── test_ewma_service.py
│       └── test_workflow.py
└── worker-registry.yaml

Work Execution & Decision Flow

The following detailed decision tree outlines how the hourly workflow updates baselines and flags anomalies, with justification for each design choice:

[Hourly Cron Trigger (0 * * * *)]
└── EwmaBaselineUpdate Workflow Starts
    │
    │   ► RATIONALE: The cron expression 0 * * * * starts the workflow at the top of every hour.
    │
    └── Activity: fetch_active_pairs()
        │
        │   ► RATIONALE: Scans ClickHouse log volumes for active (service, model) pairs 
        │     in the last 7 days. This filters out millions of historical combinations, 
        │     focusing computation ONLY on active traffic to minimize cost and execution time.
        │
        └── Loop over active (service, model) pairs concurrently:
            │
            │   ► RATIONALE: The workflow starts the per-pair activities concurrently instead of
            │     awaiting them one at a time, so thousands of pairs can be evaluated in parallel.
            │
            ├── Activity: get_baseline(service, model, hour_of_week)
            │   │
            │   │   ► RATIONALE: Reads the current baseline record from PostgreSQL. PostgreSQL is
            │   │     used here because it provides ACID compliance for historical baselines.
            │   │
            │   ├── Existing Baseline NOT found (Cold Start)
            │   │   ├── Activity: fetch_global_model_avg(model)
            │   │   │
            │   │   │   ► RATIONALE: With no history for this service/model pair, we seed the baseline
            │   │   │     with the global average cost of the model (e.g. gpt-4o) across all services.
            │   │   │     This prevents false-positive anomaly alerts during cold starts.
            │   │   │
            │   │   └── Seed EWMA baseline value = Global Model Average
            │   │
            │   └── Existing Baseline found (Warm Status)
            │       ├── Activity: fetch_cost_history(service, model, hour_of_week)
            │       │   │
            │       │   │   ► RATIONALE: Queries ClickHouse for the cost of the same hour_of_week (0-167)
            │       │   │     over the last 4 weeks. ClickHouse is selected here because column-oriented 
            │       │   │     storage allows ultra-fast aggregation of historical logs.
            │       │   │
            │       │   └── Fetch last 4 occurrences from ClickHouse
            │       └── Compute EWMA baseline value using α=0.1:
            │           EWMA_new = (1 - α) * EWMA_prev + α * Cost_current
            │
            ├── Activity: fetch_current_cost_1h(service, model)
            │
            ├── Activity: upsert_baseline(EwmaRecord)
            │   │
            │   │   ► RATIONALE: Persists the calculated baseline to PostgreSQL as a durable audit trail.
            │   │
            │   └── Persist updated baseline to PostgreSQL
            │
            ├── Write updated EWMA value to Redis Cache
            │   │
            │   │   ► RATIONALE: Anomaly-detection gateways on the ingestion path need ultra-low latency. 
            │   │     Redis caches the calculated baseline under: ewma:cost:{service}:{model}:{hour_of_week}
            │   │
            │   └── Key: ewma:cost:{service}:{model}:{hour_of_week}
            │
            └── Decision: Is Cost_current > (3 * EWMA_baseline)?
                │
                ├── YES (Anomaly Detected)
                │   ├── Activity: fetch_cost_by_cluster_1h(service, model)
                │   │   │
                │   │   │   ► RATIONALE: If cost spikes, we query ClickHouse to break down the cost 
                │   │   │     contributions by Kubernetes cluster/namespace to locate the root cause.
                │   │   │
                │   │   └── Get cluster drilldown metrics
                │   │
                │   └── Activity: publish_anomaly_alert(AnomalyPayload)
                │       │
                │       │   ► RATIONALE: Publishes to Kafka topic 'cost-anomaly-alerts'. Using Kafka 
                │       │     decouples anomaly detection from notification delivery (Slack, pager).
                │       │
                │       └── Emit alert JSON to Kafka topic
                │
                └── NO (Normal State)
                    └── Do nothing
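
The per-pair logic in the tree above can be sketched in plain Python. This is a minimal illustration, not the package's actual code: every function name is hypothetical, ordinary coroutines stand in for Temporal activities, hour 0 is assumed to be Monday 00:00 UTC, and the anomaly check is assumed to compare against the freshly updated baseline.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

ALPHA = 0.1           # smoothing factor from the flow above
ANOMALY_FACTOR = 3.0  # alert when cost exceeds 3x the baseline


def hour_of_week(ts: datetime) -> int:
    """Map an aware timestamp to 0-167 (assumption: Monday 00:00 UTC is hour 0)."""
    ts = ts.astimezone(timezone.utc)
    return ts.weekday() * 24 + ts.hour


def redis_key(service: str, model: str, how: int) -> str:
    """Build the cache key in the format shown in the flow."""
    return f"ewma:cost:{service}:{model}:{how}"


def next_baseline(prev: Optional[float], current: float, global_avg: float) -> float:
    """Cold start seeds from the global model average; otherwise apply the EWMA update."""
    if prev is None:
        return global_avg
    return (1 - ALPHA) * prev + ALPHA * current


def is_anomaly(current: float, baseline: float) -> bool:
    return current > ANOMALY_FACTOR * baseline


@dataclass
class PairResult:
    key: str
    baseline: float
    anomaly: bool


async def process_pair(service: str, model: str, how: int,
                       prev: Optional[float], current: float,
                       global_avg: float) -> PairResult:
    # In a real worker each step here would be a Temporal activity call.
    baseline = next_baseline(prev, current, global_avg)
    return PairResult(redis_key(service, model, how), baseline,
                      is_anomaly(current, baseline))


async def run_all(pairs, how: int):
    # Fan out one coroutine per (service, model) pair and await them together.
    return await asyncio.gather(
        *(process_pair(s, m, how, prev, cur, avg) for s, m, prev, cur, avg in pairs)
    )
```

For example, asyncio.run(run_all([("checkout", "gpt-4o", 10.0, 100.0, 5.0)], 42)) yields a baseline of 19.0 and flags an anomaly, because 100 exceeds 3 * 19.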

Sequencing & Dependency Map

To run the worker successfully, you MUST spin up and configure dependencies in the following strict order:

[Step 1: Docker Containers] ---> [Step 2: Configuration] ---> [Step 3: DB Migrations] ---> [Step 4: Verification] ---> [Step 5: Start Worker]
  • ClickHouse (8123)              • Copy .env.example          • ./scripts/migrate.sh       • ./scripts/test.sh         • ./scripts/run.sh
  • PostgreSQL (5432)              • Set hosts & ports            (Applies SQL schemas)        (Ensures integrations       (Starts polling
  • Redis Cache (6379)                                                                          and mock runs pass)         Temporal task queue)
  • Kafka & Zookeeper (9092)
  • Temporal Server (7233)
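
Before moving past Step 1, it can help to confirm that each dependency is actually listening. The preflight check below is a hypothetical helper, not part of the package; it assumes the services run on localhost with the default ports listed above.

```python
import socket

# Default local ports from the dependency map above (host is an assumption).
DEPENDENCIES = [
    ("ClickHouse", 8123),
    ("PostgreSQL", 5432),
    ("Redis", 6379),
    ("Kafka", 9092),
    ("Temporal", 7233),
]


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def missing_dependencies(host: str = "localhost", deps=DEPENDENCIES) -> list[str]:
    """List the names of dependencies that are not accepting connections."""
    return [name for name, port in deps if not is_port_open(host, port)]
```

Calling missing_dependencies() returns an empty list once all five containers are up; any names it returns are the services to investigate before running migrations.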

Setup & Running

Follow these steps to set up the local development environment and run the worker:

1. Prerequisites

Ensure you have the following installed:

  • Python 3.11+
  • Docker & Docker Compose
  • Git

2. Configure Virtual Environment & Dependencies

Create a virtual environment and install the package along with development requirements:

# Create virtual environment
python3 -m venv .venv

# Activate virtual environment
source .venv/bin/activate

# Install package in editable mode with development dependencies
pip install -e ".[dev]"

3. Spin Up Infrastructure

Use the provided Docker Compose file to run ClickHouse, Postgres, Redis, Kafka, and Temporal locally:

docker compose -f deploy/docker/docker-compose.yaml up -d

4. Configure Environment Variables

Copy the template .env.example to .env and fill in custom connection strings if necessary:

cp .env.example .env
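
As a rough sketch of how such settings might be read at startup, the snippet below loads ports and the task queue name from environment variables with local defaults. The variable names (CLICKHOUSE_PORT and so on) and the WorkerConfig class are assumptions for illustration; check .env.example and src/worker/config.py for the names the package actually uses.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class WorkerConfig:
    clickhouse_port: int
    postgres_port: int
    redis_port: int
    kafka_port: int
    temporal_port: int
    task_queue: str


def load_config(env: Mapping[str, str] = os.environ) -> WorkerConfig:
    """Read settings from env, falling back to the local defaults in this README."""
    return WorkerConfig(
        clickhouse_port=int(env.get("CLICKHOUSE_PORT", "8123")),  # hypothetical name
        postgres_port=int(env.get("POSTGRES_PORT", "5432")),      # hypothetical name
        redis_port=int(env.get("REDIS_PORT", "6379")),            # hypothetical name
        kafka_port=int(env.get("KAFKA_PORT", "9092")),            # hypothetical name
        temporal_port=int(env.get("TEMPORAL_PORT", "7233")),      # hypothetical name
        task_queue=env.get("TEMPORAL_TASK_QUEUE", "ewma-tasks"),  # hypothetical name
    )
```

Passing a plain dict instead of os.environ keeps the loader easy to unit-test.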

Database Migrations Guide

The database schema is managed via lightweight migration scripts tracked under database/migrations/ and verified using a schema.lock file.

How it Works

The migration status is tracked in database/schema.lock, which holds the active version tag (e.g. 0001, or 0000 for the baseline state).

Apply Migrations (UP)

To apply pending database schemas, run:

./scripts/migrate.sh up

This runs 0001_init.sql against the configured PostgreSQL database and writes 0001 to schema.lock.

Rollback Migrations

To revert schemas and return to baseline state, run:

./scripts/migrate.sh rollback

This executes the rollback SQL scripts and sets the schema.lock version to 0000.

Creating a New Migration

  1. Add your SQL changes inside database/migrations/ using a sequential identifier (e.g., 0002_add_index.sql and 0002_add_index.rollback.sql).
  2. Update the transition mappings inside scripts/migrate.py to support applying and rolling back your new script file.
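
One way such transition mappings could be organised is shown below. This is a hypothetical sketch, not the contents of scripts/migrate.py: the MIGRATIONS table, the helper names, and the assumption that version tags are zero-padded four-digit strings are all illustrative.

```python
from pathlib import Path

# Hypothetical transition table: version tag -> (up script, rollback script).
MIGRATIONS = {
    "0001": ("0001_init.sql", "0001_init.rollback.sql"),
    "0002": ("0002_add_index.sql", "0002_add_index.rollback.sql"),
}
BASELINE = "0000"


def read_lock(lock: Path) -> str:
    """Return the version stored in the lock file, or the baseline if it is absent."""
    return lock.read_text().strip() if lock.exists() else BASELINE


def pending_up(current: str) -> list[str]:
    """Up scripts newer than the current version, oldest first."""
    return [MIGRATIONS[v][0] for v in sorted(MIGRATIONS) if v > current]


def pending_rollback(current: str) -> list[str]:
    """Rollback scripts needed to return to the baseline, newest first."""
    return [MIGRATIONS[v][1] for v in sorted(MIGRATIONS, reverse=True) if v <= current]
```

Zero-padded tags compare correctly as strings, which is why no integer parsing is needed here.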

Running Verification & Worker

1. Run Tests

Verify configuration, domain services, and workflow behavior using the test script:

./scripts/test.sh

2. Run Worker

Start the Temporal worker, which polls the ewma-tasks task queue:

./scripts/run.sh

Remote Management API (REST)

The worker includes a FastAPI management layer (port 8000 in production).

Endpoint    Method    Description
/health     GET       Check worker status and config.
/trigger    POST      Trigger the EWMA baseline update workflow on-demand.

Example Execution

curl -X POST http://localhost:8000/trigger \
  -H "Content-Type: application/json" \
  -d '{
    "force_hour": 42
  }'
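
The same call can be made from Python using only the standard library. The helper below is a sketch with a hypothetical name; it builds the request shown in the curl example, and the meaning of force_hour is taken from that example rather than from any documented schema.

```python
import json
import urllib.request


def build_trigger_request(base_url: str, force_hour: int) -> urllib.request.Request:
    """Build a POST /trigger request with the same JSON body as the curl example."""
    body = json.dumps({"force_hour": force_hour}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/trigger",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

To send it, pass the result to urllib.request.urlopen, for example urllib.request.urlopen(build_trigger_request("http://localhost:8000", 42)).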

Source Distribution

temporal_ewma_worker-0.1.2.tar.gz (16.2 kB)

Built Distribution

temporal_ewma_worker-0.1.2-py3-none-any.whl (16.8 kB)

File details

Details for the file temporal_ewma_worker-0.1.2.tar.gz.

File metadata

  • Download URL: temporal_ewma_worker-0.1.2.tar.gz
  • Upload date:
  • Size: 16.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for temporal_ewma_worker-0.1.2.tar.gz
Algorithm      Hash digest
SHA256         50dc5c04950345dd3753be2ed9c03cc70bf9452bb3e51d8e0eebf2c3249f344d
MD5            62c3c1f0036887a8dfd566584ccf2aae
BLAKE2b-256    66e030cf6b574044f09d4ec7ce944bb4ec8e258df285b81b0a30f564ced77e11

Provenance

The following attestation bundles were made for temporal_ewma_worker-0.1.2.tar.gz:

Publisher: publish-temporal-ewma-worker.yml on Chief-Strategist-J/llm-observability-platform

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file temporal_ewma_worker-0.1.2-py3-none-any.whl.

File hashes

Hashes for temporal_ewma_worker-0.1.2-py3-none-any.whl
Algorithm      Hash digest
SHA256         16e4774b27dc6943c84e0835bd8a2a8e93d4b37fd0637acbb2ecbd2eafab8370
MD5            eec3e70a1bd971c511eb2958e59be3e1
BLAKE2b-256    0cd6df95c56507ff98efa71182ca9361cc9e379aaed3f343953d5b4c7fca77b4

Provenance

The following attestation bundles were made for temporal_ewma_worker-0.1.2-py3-none-any.whl:

Publisher: publish-temporal-ewma-worker.yml on Chief-Strategist-J/llm-observability-platform