Skip to main content

Open-source AI-native order management system

Project description

KubeRiva OMS

The open-source, AI-native order management system — self-host the order routing stack that replaces any leading OMS.

PyPI CI Docker Build License Python GHCR GitHub Stars Discussions


Architecture · Discussions · Roadmap · Contributing


Demo GIF coming soontrack progress

What is KubeRiva OMS?

KubeRiva OMS is a production-grade, async-first order management system built for mid-market e-commerce teams that have outgrown Shopify's native fulfillment but can't justify pricing for any leading OMS provider.

  • AI-native sourcing — 7 strategies including AI_ADAPTIVE (Claude Haiku scores fulfillment nodes per order using historical delivery rates, cost, and backorder data) with a full outcome-labeling feedback loop
  • Multi-tenant by default — each organization gets its own isolated PostgreSQL data-plane database, provisioned automatically
  • Connector ecosystem — bidirectional Shopify and Amazon SP-API out of the box; pluggable framework for WooCommerce, Magento, FedEx, UPS, and more

Quick Start

Option A — pip / pipx (recommended)

Requires: Python 3.12+ and Docker Desktop

# pip
pip install kuberiva-oms

# or pipx (isolated, no environment pollution)
pipx install kuberiva-oms
kuberiva start --seed    # starts all services + loads demo data

That's it. KubeRiva OMS pulls the Docker images and starts all 9 services automatically.

Command Description
kuberiva start Start all services
kuberiva start --seed Start + load demo data
kuberiva start --open Start + open dashboard in browser
kuberiva stop Stop all services
kuberiva restart Restart all services
kuberiva restart api Restart a single service
kuberiva status Show running services and URLs
kuberiva logs View logs (all services)
kuberiva logs api --follow Stream API logs
kuberiva seed Load demo data into a running instance
kuberiva update Pull latest images and restart
kuberiva --version Show installed version

Option B — Docker Compose (manual)

Requires: Docker Desktop 4.x (or Docker Engine + Compose v2)

git clone https://github.com/KubeRiva/OMS.git
cd OMS
cp .env.example .env
docker compose up --build
# Seed demo data
docker compose exec api python scripts/seed.py

Service URL
Frontend dashboard http://localhost:3001
API (OpenAPI docs) http://localhost:8001/docs
Celery Flower http://localhost:5556

Default login: admin@example.com / admin123


Features

Order lifecycle 15-state machine: PENDING → CONFIRMED → SOURCING → SOURCED → PICKING → PACKING → READY_TO_SHIP → SHIPPED → DELIVERED + return/cancel paths
7 sourcing strategies DISTANCE_OPTIMAL, COST_OPTIMAL, STORE_NEAREST, INVENTORY_RESERVATION, LEAST_COST_SPLIT, AI_ADAPTIVE, AI_HYBRID
AI learning loop Hourly outcome labeling → nightly pattern discovery → daily A/B experiment evaluation → human-approved proposals
Multi-tenant Control-plane + per-tenant data-plane PostgreSQL; environment middleware resolves tenant from request header
Connectors Shopify (webhook + fulfillment push), Amazon SP-API (polling + fulfillment push)
Inventory Multi-node stock tracking, adjustments, transfers, reservations across warehouses / stores / dark stores
Webhooks HMAC-SHA256 signed, exponential backoff retry, delivery history
Search Elasticsearch full-text search across orders and products
Observability Prometheus metrics, structured JSON logs, Celery Flower, MongoDB audit trail for every order event
RBAC 3-tier: PLATFORM_OWNER > SUPERADMIN > USER

Connector Ecosystem

Connector Status Direction
Shopify ✅ Stable Webhook inbound + fulfillment push
Amazon SP-API ✅ Stable Polling inbound + fulfillment push
WooCommerce 🗺 Planned v0.2
Magento 🗺 Planned v0.2
BigCommerce 🗺 Planned
FedEx 🗺 Planned v0.3 Label generation + tracking
UPS 🗺 Planned v0.3 Label generation + tracking
DHL 🗺 Planned v0.3 Label generation + tracking

Want a connector that's not listed? Open a connector request.


Tech Stack

Layer Technology
API FastAPI 0.111, Python 3.12, Pydantic v2
Primary DB PostgreSQL 16 (asyncpg)
Document DB MongoDB 7.0 (Motor async)
Cache / Queue broker Redis 7.2 (aioredis)
Search Elasticsearch 8.12
Task queue Celery 5.4 — 7 queues
Frontend React 18, TypeScript, Vite, TailwindCSS, TanStack Query v5
AI Anthropic Claude Haiku (AI_ADAPTIVE sourcing)
Containers Docker Compose (9 services)

Contributing

We welcome PRs for bug fixes, new connectors, and features on the roadmap.

See CONTRIBUTING.md for setup instructions, style guide, and the connector build guide.

Ask questions in GitHub Discussions before starting a large PR.


License

Apache 2.0 — see LICENSE.


Full Documentation

The sections below cover the complete system internals.


Table of Contents

  1. Architecture Overview
  2. Directory Structure
  3. Tech Stack
  4. Data Models
  5. API Reference
  6. Sourcing Rules Engine
  7. AI-Native Architecture
  8. Fulfillment Pipeline
  9. Celery Workers
  10. Webhook System
  11. Connector System
  12. Configuration
  13. Running the System
  14. End-to-End Order Flow

1. Architecture Overview

System Architecture

graph TB
    subgraph clients[" Client Layer "]
        WEB["Web / Mobile / POS"]
        APIC["API Client"]
        MKT["Marketplace"]
    end

    subgraph ext[" External Connectors "]
        SHP["Shopify\nWebhook inbound · Fulfillment push"]
        AMZ["Amazon SP-API\nPolling inbound · Fulfillment push"]
    end

    subgraph app[" FastAPI Application — Python 3.12 "]
        MW["EnvironmentMiddleware\nResolves tenant from X-OMS-Environment header"]
        RT["18+ Routers\n/orders · /inventory · /nodes · /sourcing-rules\n/connectors · /architect · /ai · /webhooks"]
    end

    subgraph db[" Persistence Layer "]
        PG[("PostgreSQL 16\nper-tenant data plane\nOrders · Inventory · Nodes · Rules")]
        MG[("MongoDB 7\nAudit events · AI patterns\nSourcing outcomes")]
        RD[("Redis 7.2\nCache · Celery broker\nSession store")]
        ES[("Elasticsearch 8.12\nFull-text order\n& product search")]
    end

    subgraph workers[" Celery Workers — 7 Queues "]
        Q1["sourcing"]
        Q2["fulfillment"]
        Q3["carrier"]
        Q4["notifications"]
        Q5["webhooks"]
        Q6["connectors"]
        Q7["learning"]
    end

    clients -->|"HTTP / REST"| app
    ext -->|"Webhook POST"| app
    app --> PG & MG & RD & ES
    app -->|"enqueue tasks"| RD
    RD -->|"broker"| workers
    workers --> PG & MG
    Q6 --> ext
    Q7 -->|"label · discover · propose"| MG

AI-Native 5-Layer Architecture

graph TB
    L5["Layer 5 — Continuous Learning Loop\nNightly pattern discovery · Hourly outcome labeling · Daily A/B evaluation"]
    L4["Layer 4 — Meta-AI Self-Modification\nProposals to human approval to safe apply to rollback"]
    L3["Layer 3 — AI Architect UI\nProposals · Patterns · Experiments · Performance dashboards"]
    L2["Layer 2 — AI Sourcing Engine\nAI_ADAPTIVE: node scoring · confidence threshold · rule fallback"]
    L1["Layer 1 — Intelligence Data Foundation\nOutcome tracking · Pattern storage · Feature vectors"]

    L5 -->|"validated patterns"| L4
    L4 -->|"applied rules activate"| L2
    L3 -->|"surfaces insights from"| L1
    L2 -->|"produces outcomes into"| L1
    L1 -->|"cluster aggregation"| L5

    style L5 fill:#1e3a5f,color:#fff,stroke:#4a90d9
    style L4 fill:#1a4a2e,color:#fff,stroke:#4caf50
    style L3 fill:#2d1b4e,color:#fff,stroke:#9c27b0
    style L2 fill:#4a2800,color:#fff,stroke:#ff9800
    style L1 fill:#3a1a1a,color:#fff,stroke:#f44336

AI Sourcing Decision Flow

flowchart TD
    A([Order Received]) --> B[EnvironmentMiddleware resolves tenant]
    B --> C{Active sourcing rules exist?}
    C -->|No| DIST[DISTANCE_OPTIMAL fallback]
    C -->|Yes| D[Evaluate rules in priority order]
    D --> E{Rule matched?}
    E -->|No| DIST
    E -->|Yes| F{A/B experiment running?}
    F -->|Yes| G{traffic split check}
    G -->|strategy_b| STRAT_B[Strategy B]
    G -->|strategy_a| STRAT_A[Strategy A]
    F -->|No| STRAT_A
    STRAT_A & STRAT_B --> H{Strategy type}
    H -->|DISTANCE_OPTIMAL| SD[Haversine distance scoring]
    H -->|COST_OPTIMAL| SC[Carrier cost scoring]
    H -->|INVENTORY_RESERVATION| SI[Stock level scoring]
    H -->|LEAST_COST_SPLIT| SS[Multi-node split scoring]
    H -->|AI_ADAPTIVE| AI[AI Node Scoring\ndelivery · cost · backorder · return rates]
    AI --> AIC{samples >= 10 and score >= 0.4?}
    AIC -->|No| DIST
    AIC -->|Yes| ALLOC
    SD & SC & SI & SS & DIST --> ALLOC[FulfillmentAllocation created\nnode selected · inventory reserved]
    ALLOC --> PICK[Celery: PICKING]
    PICK --> PACK[Celery: PACKING]
    PACK --> SHIP[Celery: READY_TO_SHIP]
    SHIP --> PUSH[Connector push to Shopify or Amazon]
    PUSH --> LABEL[Learning worker labels outcome]
    LABEL --> PATTERN[(MongoDB pattern store)]

Order Lifecycle

stateDiagram-v2
    [*] --> PENDING
    PENDING --> CONFIRMED : confirm
    CONFIRMED --> SOURCING : source_order enqueued
    SOURCING --> SOURCED : node allocated
    SOURCING --> BACKORDERED : no stock available
    BACKORDERED --> SOURCING : retry after 30 min
    SOURCED --> PICKING : start_picking task
    PICKING --> PACKING : complete_picking task
    PACKING --> READY_TO_SHIP : complete_packing task
    READY_TO_SHIP --> SHIPPED : carrier label booked
    SHIPPED --> OUT_FOR_DELIVERY : tracking update
    OUT_FOR_DELIVERY --> DELIVERED : delivery confirmed
    DELIVERED --> RETURNED : return initiated
    RETURNED --> [*]
    PENDING --> CANCELLED : cancel
    CONFIRMED --> CANCELLED : cancel
    SOURCED --> CANCELLED : cancel
    CANCELLED --> [*]
    DELIVERED --> REFUNDED : refund issued
    REFUNDED --> [*]

Request Lifecycle

  1. Client POSTs to POST /orders — Pydantic v2 validates, written to PostgreSQL, indexed in Elasticsearch
  2. order.created event written to MongoDB audit log
  3. source_order task enqueued on Redis-backed sourcing Celery queue
  4. Sourcing engine evaluates rules → selects strategy → may invoke AI_ADAPTIVE node scoring
  5. FulfillmentAllocation created → pipeline flows: sourcing → fulfillment → carrier → notifications → webhooks
  6. learning worker labels delivery outcome → feeds pattern store → triggers nightly discovery

2. Directory Structure

OMS/
├── Dockerfile                      # Multi-stage Python 3.12 image
├── docker-compose.yml              # All 8 services
├── requirements.txt                # All Python dependencies
├── .env                            # Local environment variables
│
├── app/
│   ├── main.py                     # FastAPI app, lifespan, router registration
│   ├── config.py                   # Pydantic Settings (reads .env)
│   │
│   ├── database/
│   │   ├── postgres.py             # Async SQLAlchemy engine + session factory
│   │   ├── mongodb.py              # Motor async client + index creation
│   │   ├── redis_client.py         # aioredis pool + cache helpers
│   │   └── elasticsearch_client.py # Async ES client + index mapping creation
│   │
│   ├── models/postgres/
│   │   ├── order_models.py         # Order, OrderItem, FulfillmentAllocation, Shipment,
│   │   │                           #   WebhookEndpoint, WebhookEvent + all enums
│   │   ├── inventory_models.py     # InventoryItem, InventoryAdjustment, InventoryReservation
│   │   ├── node_models.py          # FulfillmentNode + NodeType/NodeStatus enums
│   │   ├── sourcing_rule_models.py # SourcingRule + SourcingStrategy/ConditionOperator enums
│   │   ├── connector_models.py     # Connector, ConnectorEvent + ConnectorType/Status/Direction enums
│   │   └── ai_models.py            # AIProposal, CustomAttributeDefinition, UIWidget,
│   │                               #   AIExperiment, SourcingOutcomeLabel + enums
│   │
│   ├── schemas/
│   │   ├── common.py               # PaginationParams, PaginatedResponse, MessageResponse
│   │   ├── orders.py               # OrderCreate/Update/Response, OrderItemCreate, etc.
│   │   ├── inventory.py            # InventoryItemCreate/Response, AdjustmentCreate, etc.
│   │   ├── nodes.py                # NodeCreate/Update/Response
│   │   ├── sourcing_rules.py       # SourcingRuleCreate/Response, SourcingCondition, SourcingResult
│   │   ├── search.py               # OrderSearchRequest/Response, SearchHit
│   │   ├── analytics.py            # DashboardSummary, ChannelBreakdown, etc.
│   │   ├── webhooks.py             # WebhookEndpointCreate/Response, WebhookEventResponse
│   │   └── connectors.py           # ConnectorCreate/Update/Response, ConnectorEventResponse
│   │
│   ├── routers/
│   │   ├── orders.py               # 9 endpoints — full order CRUD + status transitions
│   │   ├── inventory.py            # 8 endpoints — stock management + transfers
│   │   ├── nodes.py                # 6 endpoints — node CRUD + capacity
│   │   ├── sourcing_rules.py       # 7 endpoints — rule CRUD + manual evaluation
│   │   ├── search.py               # 3 endpoints — order + product full-text search
│   │   ├── analytics.py            # 3 endpoints — dashboard + volume + inventory summary
│   │   ├── webhooks.py             # 8 endpoints — endpoint + event management
│   │   ├── connectors.py           # 10 endpoints — CRUD + webhook receiver + event log
│   │   └── architect.py            # 20+ endpoints — proposals, patterns, experiments,
│   │                               #   node performance, AI sourcing comparison
│   │
│   ├── services/
│   │   ├── sourcing_engine.py      # Intelligence core (see §6) + AI_ADAPTIVE + experiment routing
│   │   ├── ai_sourcing.py          # AISourcingAdvisor — KubeAI node scoring
│   │   ├── pattern_discovery.py    # PatternDiscoveryService — nightly cluster aggregation + proposals
│   │   ├── schema_evolution.py     # SchemaEvolutionEngine — safe additive schema changes
│   │   ├── webhook.py              # HMAC-SHA256 signed delivery
│   │   └── connectors/
│   │       ├── __init__.py         # Package init
│   │       ├── base.py             # Abstract BaseConnector (validate, normalize, push)
│   │       ├── shopify.py          # Shopify bidirectional implementation
│   │       └── registry.py         # ConnectorType → class mapping
│   │
│   └── workers/
│       ├── celery_app.py           # Celery factory + 7 queues + beat schedule
│       ├── sourcing.py             # source_order task (writes sourcing_outcomes to MongoDB)
│       ├── fulfillment.py          # start_picking, complete_packing, reset_node_daily_counters
│       ├── carrier.py              # book_shipment, simulate_delivery, sync_all_tracking
│       ├── notifications.py        # Email/SMS notification tasks
│       ├── webhooks.py             # dispatch_webhook, retry_failed_webhooks
│       ├── connectors.py           # sync_fulfillment_to_connector task
│       └── learning.py             # label_sourcing_outcomes, discover_patterns,
│                                   #   update_node_performance, evaluate_ai_experiments
│
├── scripts/
│   └── seed.py                     # Seeds all 4 databases with realistic data
│
└── tests/
    └── test_imports.py             # 13 import + unit tests (all passing)

3. Tech Stack

Layer Technology Purpose
API framework FastAPI 0.111 + Uvicorn Async REST API, OpenAPI docs
ORM SQLAlchemy 2.0 (async) PostgreSQL ORM with asyncpg driver
Primary DB PostgreSQL 16 Orders, inventory, nodes, sourcing rules, AI proposals
Document DB MongoDB 7 (Motor) Event log, product catalog, sourcing outcomes, patterns
Cache / Queue Redis 7.2 Celery broker/backend, cache, rate-limiting
Search Elasticsearch 8.12 Full-text order and product search
Task queue Celery 5.4 + Flower Async pipeline workers, beat scheduler
AI / LLM KubeAI claude-haiku-4-5-20251001 (Anthropic) AI node scoring, NL → proposals
Validation Pydantic v2 Request/response schemas, settings
Containers Docker Compose All 8 services in one command
HMAC signing hashlib + hmac (stdlib) Webhook payload integrity
Geo math haversine (stdlib math) Sourcing distance calculations

4. Data Models

4.1 PostgreSQL Models

fulfillment_nodes

Column Type Description
id UUID PK Node identifier
code VARCHAR(50) UNIQUE Short code e.g. DC-EAST
name VARCHAR(200) Display name
node_type ENUM DISTRIBUTION_CENTER, RETAIL_STORE, DARK_STORE, WAREHOUSE, PICKUP_POINT
status ENUM ACTIVE, INACTIVE, MAINTENANCE, CLOSED
latitude/longitude FLOAT Geographic coordinates
can_ship/pickup/curbside/same_day BOOL Capability flags
daily_order_capacity INT Max orders per day
current_daily_orders INT Reset to 0 at midnight by Celery beat
shipping_cost_multiplier FLOAT Relative cost weight for sourcing

orders

Column Type Description
id UUID PK Order identifier
order_number VARCHAR(50) UNIQUE Human-readable e.g. ORD-20240101-ABC123
channel ENUM WEB, MOBILE, POS, API, MARKETPLACE
fulfillment_type ENUM SHIP_TO_HOME, STORE_PICKUP, SHIP_FROM_STORE, CURBSIDE_PICKUP, SAME_DAY_DELIVERY
status ENUM 15-state machine (see §7)
total_amount NUMERIC(12,2) Order total
shipping_latitude/longitude FLOAT Customer location for sourcing
pickup_node_id UUID FK For BOPIS/curbside orders
sourcing_rule_id UUID FK Which rule was applied

order_items

Line items linked to an order. Tracks quantity_fulfilled as allocations are shipped.

fulfillment_allocations

Bridges an order item to a specific fulfillment node. One order can have multiple allocations (split fulfillment). Tracks the full picking → packing → shipping timeline.

shipments

One per allocation (or per order for simple cases). Stores carrier, tracking number, label URL, and a JSON array of tracking events.

inventory_items

Per-node, per-SKU stock levels with three counters:

  • quantity_on_hand — physical stock
  • quantity_reserved — soft-reserved by active allocations
  • quantity_available = on_hand - reserved — what sourcing can use

inventory_adjustments

Immutable audit log of every stock change with before/after quantities.

sourcing_rules

Configurable rules evaluated in priority order (ascending). Each rule has:

  • conditions — JSON array of {field, operator, value} tuples
  • strategy — which algorithm to apply (DISTANCE_OPTIMAL, COST_OPTIMAL, STORE_NEAREST, INVENTORY_RESERVATION, LEAST_COST_SPLIT, AI_ADAPTIVE, AI_HYBRID)
  • allowed_node_types, required_capabilities — node filters
  • max_split_nodes, cost_weight, distance_weight — algorithm parameters

webhook_endpoints + webhook_events

Persistent HMAC webhook delivery with retry state machine.

ai_proposals

All AI-proposed system changes awaiting human review. Lifecycle: PENDING → APPROVED → APPLIED (or REJECTED / ROLLED_BACK). Nothing is applied without explicit admin approval.

Column Type Description
id UUID PK Proposal identifier
proposal_type VARCHAR sourcing_rule, custom_attribute, schema_migration, ui_widget, sourcing_experiment
title VARCHAR Short human-readable title
description TEXT Plain-language explanation
rationale TEXT Data evidence (scores, sample counts, improvement %)
confidence_score FLOAT AI confidence 0–1
proposal_data JSONB The exact change payload to apply
status VARCHAR pending, approved, rejected, applied, rolled_back
rollback_data JSONB Data needed to undo the applied change
generated_by VARCHAR Source: learning_worker/pattern_discovery, chat session ID, etc.

ai_experiments

A/B tests between two sourcing strategies. Traffic is split at the sourcing worker level (random assignment per order).

Column Type Description
id UUID PK Experiment identifier
name VARCHAR Display name
strategy_a VARCHAR Control strategy (e.g. DISTANCE_OPTIMAL)
strategy_b VARCHAR Treatment strategy (e.g. AI_ADAPTIVE)
traffic_split_pct FLOAT % of qualifying orders routed to strategy_b (1–50)
filter_conditions JSONB Which orders qualify (channel, fulfillment_type, region, amount range)
status VARCHAR running, paused, completed
winner VARCHAR Set when experiment concludes
results JSONB Computed per-arm outcome comparison

custom_attribute_definitions

Dynamic schema extensions — adds new fields to orders, products, nodes without DDL changes (uses existing metadata_ JSONB columns).

sourcing_outcome_labels

PostgreSQL mirror of labeled sourcing_outcomes documents for fast analytical queries.

4.2 MongoDB Collections

Collection Purpose
order_events Append-only audit trail for every order state change
product_catalog Rich product data (images, attributes, rich descriptions)
webhook_deliveries Delivery attempt history per event
notifications Email/SMS notification log
sourcing_outcomes Per-allocation sourcing decision snapshot + delivery outcome labels
sourcing_patterns Aggregated node performance per order-feature cluster (channel|region|amount|type)
node_performance_metrics Rolling 7-day and 30-day node stats (avg score, delivery hours, backorder rate)

sourcing_outcomes document example:

{
  "order_id": "uuid",
  "allocation_id": "uuid",
  "node_id": "uuid",
  "node_name": "DC-EAST",
  "sku": "SKU-WIDGET-A",
  "strategy_used": "AI_ADAPTIVE",
  "cluster_key": "WEB|NY|100-250|SHIP_TO_HOME",
  "channel": "WEB",
  "region": "NY",
  "amount_bucket": "100-250",
  "fulfillment_type": "SHIP_TO_HOME",
  "sourcing_score": 0.85,
  "predicted_cost": 8.50,
  "predicted_distance_miles": 13.5,
  "ai_score": 0.91,
  "ai_reasoning": "DC-EAST has 94% on-time delivery for NY in last 7 days",
  "experiment_id": "uuid-or-null",
  "sourced_at": "2024-03-10T14:22:00Z",
  "actual_delivery_hours": 24.5,
  "actual_cost": 9.10,
  "cost_variance_pct": 7.1,
  "was_backordered": false,
  "was_returned": false,
  "outcome_score": 0.92,
  "labeled_at": "2024-03-12T09:00:00Z"
}

Outcome score formula:

outcome_score = (
  0.4 × delivery_score     # 1.0 if ≤24h, 0.5 if ≤48h, 0.0 if >72h
  0.3 × cost_score          # 1.0 if variance ≤5%, 0.0 if >25%
  0.2 × (1 - backordered)   # 1.0 if no backorder
  0.1 × (1 - returned)      # 1.0 if not returned
)

Indexes: order_events is indexed on (order_id, timestamp) and event_type. product_catalog has a text index on name + description for full-text search. sourcing_outcomes indexed on (cluster_key, strategy_used, outcome_score) and (order_id, allocation_id).

4.3 Redis Key Schema

Key Pattern Type TTL Purpose
oms:version STRING 24h Current version
oms:stats HASH Aggregate counters
oms:active_strategies STRING 1h Cached strategy list
celery:* Various Celery broker/result state

4.4 Elasticsearch Indexes

oms_orders

Optimized for order search. Fields: order_number (keyword), customer_name (text), channel, status, fulfillment_type, total_amount, created_at, tags, nested line_items.

oms_products

Product catalog search. Fields: sku (keyword), name (text), description (text), category (keyword), price (float).


5. API Reference

All endpoints are documented at http://localhost:8001/docs (Swagger UI) and http://localhost:8001/redoc.

5.1 Orders Router (/orders)

Method Path Description
POST /orders/ Create new order (triggers sourcing)
GET /orders/ List orders with filters (status, channel, date range, email)
GET /orders/{order_id} Get single order with all relationships
GET /orders/number/{order_number} Get order by order number
PATCH /orders/{order_id}/status Transition order status
POST /orders/{order_id}/cancel Cancel an order
GET /orders/{order_id}/events Get MongoDB audit trail

Create Order payload example:

{
  "channel": "WEB",
  "fulfillment_type": "SHIP_TO_HOME",
  "customer_email": "alice@example.com",
  "customer_name": "Alice Smith",
  "line_items": [
    {
      "sku": "SKU-WIDGET-A",
      "product_name": "Premium Widget A",
      "quantity": 2,
      "unit_price": 29.99
    }
  ],
  "shipping_address": {
    "address1": "123 Main St",
    "city": "New York",
    "state": "NY",
    "postal_code": "10001",
    "latitude": 40.7484,
    "longitude": -73.9967
  }
}

5.2 Inventory Router (/inventory)

Method Path Description
POST /inventory/ Create inventory item for a node/SKU
GET /inventory/ List inventory (filter by node, SKU, low-stock)
GET /inventory/sku/{sku} All node stock for a specific SKU
GET /inventory/products Aggregated product list grouped by SKU (search, node, low-stock filters)
PATCH /inventory/products/{sku} Update product-level attributes for all nodes at once
GET /inventory/{item_id} Single inventory item
PATCH /inventory/{item_id} Update item metadata
POST /inventory/{item_id}/adjust Apply stock adjustment (reason must be valid enum: RECEIVED, RETURNED, DAMAGED, CYCLE_COUNT, CORRECTION, SOLD, etc.)
POST /inventory/check-availability Bulk availability check across all nodes
POST /inventory/transfer Transfer stock between nodes

Adjustment reasons (enum): RECEIVED, SOLD, RETURNED, DAMAGED, CYCLE_COUNT, TRANSFER_IN, TRANSFER_OUT, RESERVED, RESERVATION_RELEASED, CORRECTION

5.3 Fulfillment Nodes Router (/nodes)

Method Path Description
POST /nodes/ Register a new DC or store
GET /nodes/ List nodes (filter by type, status, capabilities)
GET /nodes/{node_id} Get node details
PATCH /nodes/{node_id} Update node configuration
DELETE /nodes/{node_id} Deactivate node (soft delete)
GET /nodes/{node_id}/capacity Get daily capacity utilization

5.4 Sourcing Rules Router (/sourcing-rules)

Method Path Description
POST /sourcing-rules/ Create new rule
GET /sourcing-rules/ List rules sorted by priority
GET /sourcing-rules/{rule_id} Get rule details
PATCH /sourcing-rules/{rule_id} Update rule
DELETE /sourcing-rules/{rule_id} Delete rule
POST /sourcing-rules/{rule_id}/toggle Enable/disable rule
POST /sourcing-rules/evaluate Manually run sourcing for an order

5.5 Search Router (/search)

Method Path Description
POST /search/orders Full-text order search with filters
GET /search/orders GET-style order search (query params)
POST /search/products Full-text product search

Supports: fuzzy matching, multi-field search, date/amount range filters, pagination, sort order.

5.6 Analytics Router (/analytics)

Method Path Description
GET /analytics/dashboard Full KPI dashboard summary
GET /analytics/orders/volume Daily order volume over N days
GET /analytics/inventory/summary Aggregate inventory health metrics

5.7 Webhooks Router (/webhooks)

Method Path Description
POST /webhooks/endpoints Register webhook endpoint
GET /webhooks/endpoints List endpoints
GET /webhooks/endpoints/{id} Get endpoint
PATCH /webhooks/endpoints/{id} Update endpoint
DELETE /webhooks/endpoints/{id} Delete endpoint
POST /webhooks/endpoints/{id}/test Send test event
GET /webhooks/events List delivery events
POST /webhooks/events/{id}/retry Retry failed event

5.8 Connectors Router (/connectors)

Superadmin-only CRUD for integration connectors (except the public webhook receiver).

Method Path Auth Description
POST /connectors/ Superadmin Create a new connector
GET /connectors/ Superadmin List connectors (filter by status)
GET /connectors/{id} Superadmin Get single connector
PATCH /connectors/{id} Superadmin Update connector config
DELETE /connectors/{id} Superadmin Delete connector
POST /connectors/{id}/toggle Superadmin Enable / disable connector
POST /connectors/{id}/test Superadmin Test API connection to the platform
GET /connectors/{id}/events Superadmin Paginated inbound/outbound event log
POST /connectors/generate-secret Superadmin Generate a secure webhook secret
POST /connectors/{id}/webhook Public HMAC-validated inbound webhook receiver

Sensitive config fields (access_token, webhook_secret, api_key, etc.) are always masked as *** in API responses.

5.9 AI Architect Router (/architect)

Superadmin-only. All endpoints require requireSuperadmin authentication.

Proposals

Method Path Description
GET /architect/proposals List proposals (filter by status, proposal_type)
GET /architect/proposals/{id} Get proposal detail with full rationale
POST /architect/proposals/{id}/approve Mark proposal as approved
POST /architect/proposals/{id}/reject Reject with reason
POST /architect/proposals/{id}/apply Execute an approved proposal (safe, additive only)
POST /architect/proposals/{id}/rollback Undo an applied proposal

Patterns & Performance

Method Path Description
GET /architect/patterns List discovered order-feature clusters with node rankings
GET /architect/node-performance Rolling node stats (?period_days=7 or 30)
GET /architect/ai-sourcing/performance AI vs rule-based outcome comparison

A/B Experiments

Method Path Description
GET /architect/experiments List experiments (filter by status)
POST /architect/experiments Create new experiment
POST /architect/experiments/{id}/pause Pause a running experiment
POST /architect/experiments/{id}/resume Resume a paused experiment
GET /architect/experiments/{id}/results Live per-arm outcome aggregation

6. Sourcing Rules Engine

File: app/services/sourcing_engine.py

The engine is the intelligence core. It runs every time an order needs to be sourced.

6.1 Processing Pipeline

Order
  │
  ▼
RuleSelector ──► Find highest-priority SourcingRule where ALL conditions match
  │
  ▼
NodeFilter ──► Apply: node type filter, capability filter, distance filter,
  │            capacity filter, excluded node list
  ▼
InventoryLoader ──► Load quantity_available per (node, SKU) in one query
  │
  ▼
NodeScorer ──► Compute normalized score per strategy
  │
  ▼
AllocationDecider ──► Single-node or split allocation
  │
  ▼
Persist ──► Write FulfillmentAllocation rows + reserve inventory

6.2 Seven Sourcing Strategies

DISTANCE_OPTIMAL

Score = distance_norm × 0.7 + inventory_norm × 0.3

Picks the node closest to the customer's shipping address. Uses Haversine great-circle distance. Falls back to split if no single node can fulfill all items.

COST_OPTIMAL

Score = cost_norm × cost_weight + distance_norm × distance_weight

Minimizes total estimated shipping cost (base_rate + per_km_rate × distance × node_multiplier). Weights are configurable per rule (default 50/50).

STORE_NEAREST

Identical scoring to DISTANCE_OPTIMAL but the node filter pre-restricts to RETAIL_STORE and DARK_STORE types. Used for same-day delivery and local fulfillment.

INVENTORY_RESERVATION

Score = inventory_norm × 0.8 + distance_norm × 0.2

Prefers nodes with the deepest available stock — reduces the chance of a reservation failing downstream. Useful for high-velocity SKUs.

LEAST_COST_SPLIT

Greedy algorithm that assigns each SKU to the cheapest eligible node:

  1. Sort SKUs by fulfillability (hardest first = fewest nodes with stock)
  2. For each SKU, iterate nodes in score order
  3. Allocate as much as possible from each node until the full quantity is covered
  4. Enforces max_split_nodes limit

AI_ADAPTIVE

Uses KubeAI claude-haiku-4-5-20251001 to score candidate nodes based on historical patterns and rolling performance data. KubeAI receives the order context (channel, region, amount, fulfillment type), the top-3 matching historical pattern clusters, 7-day node performance metrics, and a list of candidate nodes. It responds with a JSON array of {node_id, score, reason}.

Fallback to DISTANCE_OPTIMAL when:

  • Best matching pattern has < 10 samples
  • KubeAI API call fails or returns invalid JSON
  • Maximum AI score across all candidates < 0.4

Final score blend: 0.6 × ai_score + 0.4 × rule_score

AI_HYBRID

Identical to AI_ADAPTIVE but uses the blended rule_score more aggressively. Intended for transitional rollouts where full AI trust is not yet established.

6.3 Condition Operators

Operator Example
EQUALS channel == WEB
NOT_EQUALS channel != MARKETPLACE
GREATER_THAN total_amount > 200
LESS_THAN total_amount < 50
GREATER_THAN_OR_EQUAL total_amount >= 100
LESS_THAN_OR_EQUAL total_amount <= 500
IN shipping_state IN [NY, NJ, CT]
NOT_IN channel NOT IN [POS]
CONTAINS customer_email CONTAINS example.com
STARTS_WITH shipping_state STARTS_WITH N

6.4 Haversine Distance Formula

def haversine_km(lat1, lon1, lat2, lon2):
    R = 6371.0  # Earth radius km
    φ1, φ2 = radians(lat1), radians(lat2)
    Δφ = radians(lat2 - lat1)
    Δλ = radians(lon2 - lon1)
    a = sin(Δφ/2)**2 + cos(φ1)*cos(φ2)*sin(Δλ/2)**2
    return R * 2 * asin(sqrt(a))

Accuracy: ±0.5% vs actual road distance. Sufficient for DC-level sourcing decisions.


7. AI-Native Architecture

7.1 Overview

The AI layer is fully additive — it extends the existing rule engine without replacing it. All AI decisions are audited, all proposals require human approval, and every strategy has a deterministic fallback.

Design Principles:

  • Additive-only — no existing data or functionality is ever modified or deleted
  • Human-gated — proposals are created as PENDING; nothing applies without admin approval
  • Fallback-safe — every AI path falls back to DISTANCE_OPTIMAL on failure
  • Fully audited — every sourcing decision writes a sourcing_outcomes document
  • Evidence-backed — proposals include data rationale (sample counts, score improvements)

7.2 Intelligence Data Foundation

Every time an order is sourced, the sourcing Celery worker writes a sourcing_outcomes document to MongoDB capturing the full decision context: which strategy was used, AI score + reasoning, predicted cost and distance, and whether an A/B experiment was active.

When an order reaches DELIVERED, the label_sourcing_outcomes task (runs hourly) computes an outcome_score from actual delivery time, cost variance, backorder flag, and return flag. This creates a labeled training example.

Cluster key = channel|region|amount_bucket|fulfillment_type Example: WEB|NY|100-250|SHIP_TO_HOME

7.3 AI Sourcing (AI_ADAPTIVE)

AISourcingAdvisor (in app/services/ai_sourcing.py) is called by the sourcing engine when strategy == AI_ADAPTIVE. It:

  1. Extracts order features and computes the cluster key
  2. Finds the top-3 matching sourcing_patterns from MongoDB
  3. Loads rolling 7-day node_performance_metrics for each candidate
  4. Sends a structured prompt to KubeAI with order context + patterns + node metrics
  5. Parses the JSON response: [{node_id, score, reason}]
  6. Blends AI scores with rule-based scores: 0.6 × ai + 0.4 × rule
  7. Falls back to DISTANCE_OPTIMAL on any error or low-confidence result

7.4 Pattern Discovery

The discover_patterns Celery task runs nightly at 02:00 UTC via PatternDiscoveryService:

  1. Aggregates all labeled sourcing_outcomes by (cluster_key, node_id) using MongoDB $group
  2. Upserts sourcing_patterns collection with ranked node performance per cluster
  3. Runs strategy comparison: for each cluster, compares AI_ADAPTIVE vs DISTANCE_OPTIMAL avg outcome scores
  4. Creates a pending AIProposal when all thresholds are met:
    • ≥ 50 total labeled samples in cluster
    • ≥ 10 AI_ADAPTIVE samples
    • AI outperforms baseline by ≥ 10%

7.5 A/B Experiments

Admins create experiments via the Architect UI or API. The sourcing engine checks for matching running experiments before executing any strategy:

# In sourcing_engine._check_experiment():
if random.random() * 100 < exp.traffic_split_pct:
    strategy = exp.strategy_b   # treatment arm
else:
    strategy = exp.strategy_a   # control arm

The evaluate_ai_experiments task (runs daily at 03:00 UTC) computes per-arm outcome scores and declares a winner when both arms have ≥ 50 samples and the score difference ≥ 0.05.

7.6 Proposal Lifecycle

PENDING
  │ (admin clicks Approve)
  ▼
APPROVED
  │ (admin clicks Apply)
  ▼
APPLIED ──────────────────────► (admin clicks Rollback) ──► ROLLED_BACK

  ── from PENDING ──► (admin clicks Reject) ──► REJECTED

Apply operations are strictly additive:

Proposal Type Apply Action Rollback Action
sourcing_rule INSERT into sourcing_rules (is_active=False) DELETE by stored rule id
custom_attribute INSERT into custom_attribute_definitions Soft-delete (is_active=False)
schema_migration ALTER TABLE ADD COLUMN IF NOT EXISTS ... DEFAULT NULL ALTER TABLE DROP COLUMN
ui_widget INSERT into ui_widgets Soft-delete (is_active=False)
sourcing_experiment INSERT into ai_experiments UPDATE status='paused'

7.7 Architect UI

The /architect page (superadmin only) has four tabs:

Tab Content
Proposals Pending/approved/applied list; inline approve/reject/apply/rollback; rationale + proposal data preview
Patterns Discovered order-feature clusters; top-5 nodes per cluster with score bars
Experiments A/B test management; create/pause/resume; live per-arm outcome stats
Performance AI vs baseline outcome comparison; node performance table with 7d/30d toggle

8. Fulfillment Pipeline (Order Status State Machine)

Order Status State Machine

PENDING
  │ (order confirmed / payment authorized)
  ▼
CONFIRMED
  │ (sourcing engine runs)
  ▼
SOURCING ──► SOURCED
                │
                ▼
             PICKING
                │
                ▼
             PACKING
                │
                ▼
          READY_TO_SHIP
                │ (carrier booked)
                ▼
            SHIPPED
                │ (tracking events)
                ▼
        OUT_FOR_DELIVERY
                │
                ▼
           DELIVERED ◄─── PICKED_UP (for BOPIS)
                │
                ▼
            RETURNED ──► REFUNDED

  ── from any pre-shipped state ──► CANCELLED

Fulfillment Types

Type Description Required Node Capabilities
SHIP_TO_HOME Standard home delivery can_ship
STORE_PICKUP Buy online, pick up in store (BOPIS) can_pickup
SHIP_FROM_STORE Ship from retail store can_ship
CURBSIDE_PICKUP Drive-up pickup can_curbside
SAME_DAY_DELIVERY Same-day home delivery can_same_day

9. Celery Workers

7 Named Queues

Queue Worker Tasks
sourcing Sourcing Worker source_order — runs the full sourcing engine (writes sourcing_outcomes); retry_backordered_orders — retry orders stuck in backorder
fulfillment Fulfillment Worker start_picking, complete_packing, reset_node_daily_counters
carrier Carrier Worker book_shipment, simulate_delivery, sync_all_tracking
notifications Notifications Worker send_order_confirmation, send_shipment_notification, send_delivery_notification, send_cancellation_notification
webhooks Webhook Worker dispatch_webhook, retry_failed_webhooks, retry_webhook_event
connectors Connector Worker sync_fulfillment_to_connector — push shipment/tracking to external platforms; sync_order_cancel_to_connector — push cancellations; poll_amazon_orders — poll Amazon SP-API for new orders
learning Learning Worker label_sourcing_outcomes, discover_patterns, update_node_performance, evaluate_ai_experiments — low-priority; runs the AI continuous learning loop

Celery Beat Schedule

Task Schedule Description
reset_node_daily_counters Daily 00:00 UTC Reset current_daily_orders on all nodes
retry_failed_webhooks Every 5 minutes Retry FAILED webhook events due for retry
sync_all_tracking Every 15 minutes Sync carrier tracking for in-transit shipments
retry_backordered_orders Every 30 minutes Re-run sourcing for orders stuck in backorder
poll_amazon_orders Every 15 minutes Poll all active Amazon SP-API connectors for new Unshipped orders
label_sourcing_outcomes Every hour Compute outcome_score for DELIVERED orders; write labels to MongoDB + PostgreSQL
update_node_performance Every 4 hours Compute rolling 7d/30d stats per node from labeled outcomes
discover_patterns Daily 02:00 UTC Aggregate patterns, compare strategies, auto-generate AIProposals
evaluate_ai_experiments Daily 03:00 UTC Compute per-arm outcomes; declare winner when ≥50 samples per arm + score diff ≥0.05

Start workers

celery -A app.workers.celery_app worker \
  --loglevel=info \
  -Q sourcing,fulfillment,carrier,notifications,webhooks,connectors,learning \
  --concurrency=4

Monitor with Flower

http://localhost:5556

10. Webhook System

HMAC-SHA256 Signing

Every outbound webhook request is signed:

signature = HMAC-SHA256(secret, JSON.stringify(payload, sort_keys=True))
X-OMS-Signature: sha256={signature}
X-OMS-Timestamp: {unix_timestamp}
X-OMS-Event: order.shipped

To verify on the receiver side:

import hmac, hashlib, json

def verify_webhook(body: bytes, signature: str, secret: str) -> bool:
    expected = "sha256=" + hmac.new(
        secret.encode(), body, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)

Supported Event Types

  • order.created — new order accepted
  • order.confirmed — payment confirmed
  • order.sourced — fulfillment node(s) assigned
  • order.picking — items being picked
  • order.packed — items packed and ready
  • order.shipped — carrier label created, tracking available
  • order.delivered — delivery confirmed
  • order.cancelled — order cancelled
  • order.test — test ping

Retry Strategy

Failed deliveries are retried with exponential backoff:

Attempt Backoff
1 5 minutes
2 10 minutes
3 20 minutes
4+ ABANDONED

11. Connector System

The Connector System provides a pluggable integration framework for syncing the OMS with external platforms: e-commerce engines (Shopify, WooCommerce, Amazon), carriers (FedEx, UPS, DHL), and WMS/TMS systems.

Architecture

External Platform (e.g. Shopify)
      │ orders/create webhook
      ▼
POST /connectors/{id}/webhook  ← PUBLIC, HMAC-validated
      │
      ▼
ShopifyConnector.normalize_order() → creates OMS Order (channel=MARKETPLACE)
      │
      ▼ (when order status → SHIPPED)
Celery task: sync_fulfillment_to_connector (queue: connectors)
      │
      ▼
ShopifyConnector.push_fulfillment() → POST Shopify /orders/{id}/fulfillments
      │
      ▼
External Platform ← buyer notified with tracking info

Supported Platforms

Platform Type Status Direction
Shopify E-commerce Live Bidirectional
Amazon SP Marketplace Live Bidirectional (inbound polling + outbound fulfillment)
WooCommerce E-commerce Planned Bidirectional
Magento E-commerce Planned Bidirectional
BigCommerce E-commerce Planned Bidirectional
FedEx Carrier Planned Outbound
UPS Carrier Planned Outbound
DHL Carrier Planned Outbound
Custom Generic Available Configurable

Shopify Setup

  1. Create a connector via the Admin UI (/connectors) or API:
curl -X POST http://localhost:8001/connectors/ \
  -H "Authorization: Bearer {token}" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My Shopify Store",
    "connector_type": "SHOPIFY",
    "direction": "BIDIRECTIONAL",
    "config": {
      "shop_url": "mystore.myshopify.com",
      "access_token": "shpat_xxxxxxxxxxxx",
      "webhook_secret": "my-hmac-secret",
      "api_version": "2024-01"
    }
  }'
  1. Copy the webhook URL from the response: http://localhost:8001/connectors/{id}/webhook

  2. Register in Shopify Admin → Settings → Notifications → Webhooks:

    • Topic: Orders / Creation
    • URL: the webhook URL from step 2
    • Format: JSON
  3. Enable the connector via POST /connectors/{id}/toggle

  4. Test the connection via POST /connectors/{id}/test → returns shop name and plan

Inbound Order Flow (Shopify → OMS)

  1. Shopify fires POST /connectors/{id}/webhook on orders/create
  2. HMAC-SHA256 signature validated against X-Shopify-Hmac-Sha256 header
  3. Deduplication check: skip if OMS already has an Order with the same external_order_id + connector_id
  4. Order normalized from Shopify format to OMS format (channel=MARKETPLACE)
  5. Order created in PostgreSQL; connector_id stored on the order
  6. OMS sourcing engine runs automatically
  7. ConnectorEvent logged with direction=inbound, status=success

Outbound Fulfillment Flow (OMS → Shopify)

  1. OMS order status transitions to SHIPPED
  2. _trigger_connector_sync(order_id) enqueued on the connectors Celery queue
  3. sync_fulfillment_to_connector task runs asynchronously:
    • Loads order + latest shipment + connector config
    • Calls ShopifyConnector.push_fulfillment()POST /orders/{shopify_id}/fulfillments.json
    • Sets notify_customer=True, sends tracking number + carrier
  4. ConnectorEvent logged with direction=outbound, status=success or failed
  5. Connector stats updated: orders_synced, last_synced_at
  6. On failure: connector.status set to ERROR, error stored in last_error

Amazon SP-API Setup

Amazon uses polling rather than webhooks. The beat task poll_amazon_orders runs every 15 minutes.

  1. Create a connector via the Admin UI (/connectors) or API:
curl -X POST http://localhost:8001/connectors/ \
  -H "Authorization: Bearer {token}" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Amazon US",
    "connector_type": "AMAZON_SP",
    "direction": "BIDIRECTIONAL",
    "config": {
      "marketplace_id": "ATVPDKIKX0DER",
      "seller_id": "YOURSELLERID",
      "client_id": "amzn1.application-oa2-client.xxx",
      "client_secret": "xxxx",
      "refresh_token": "Atzr|xxxxx"
    }
  }'
  1. Enable the connector via POST /connectors/{id}/toggle

  2. The beat scheduler polls every 15 min for Unshipped and PartiallyShipped orders via the GetOrders SP-API endpoint

  3. Outbound fulfillment: when an OMS order status → SHIPPED, the connector pushes a confirmShipment call to Amazon SP-API automatically

Extending with a New Connector

  1. Add a new ConnectorType value to the ConnectorType enum in connector_models.py
  2. Create app/services/connectors/{platform}.py implementing BaseConnector:
from app.services.connectors.base import BaseConnector

class WooCommerceConnector(BaseConnector):
    def validate_webhook(self, headers: dict, raw_body: bytes) -> bool:
        # WooCommerce uses X-WC-Webhook-Signature
        ...

    def normalize_order(self, payload: dict) -> dict:
        # Map WooCommerce order JSON → OMS OrderCreate dict
        ...

    async def push_fulfillment(self, order, shipment) -> dict:
        # POST to WooCommerce REST API
        ...

    async def test_connection(self) -> dict:
        # GET /wp-json/wc/v3/system_status
        ...
  1. Register in app/services/connectors/registry.py:
_REGISTRY = {
    ConnectorType.SHOPIFY: ShopifyConnector,
    ConnectorType.WOOCOMMERCE: WooCommerceConnector,  # add here
}
  1. Add platform metadata to frontend/src/pages/Connectors.tsx PLATFORMS dict

Data Models

connectors table

Column Type Description
id UUID PK Connector identifier
name VARCHAR(200) Display name
connector_type ENUM SHOPIFY, WOOCOMMERCE, AMAZON_SP, etc.
direction ENUM INBOUND, OUTBOUND, BIDIRECTIONAL
status ENUM ACTIVE, INACTIVE, ERROR
config JSON Platform credentials (sensitive fields masked in API)
orders_received INT Count of inbound orders received
orders_synced INT Count of outbound fulfillments pushed
last_error TEXT Most recent error message
last_synced_at TIMESTAMP Last successful outbound sync

connector_events table

Column Type Description
id UUID PK Event identifier
connector_id UUID FK Parent connector
order_id UUID FK (nullable) Associated OMS order
external_order_id VARCHAR Platform's order ID
event_type VARCHAR order.received, fulfillment.pushed, error
direction VARCHAR inbound or outbound
status VARCHAR success or failed
payload JSON Raw inbound or outbound payload
response JSON Platform API response
error_message TEXT Error detail on failure

12. Configuration

All configuration is via environment variables (.env file for local dev):

Variable Default Description
DATABASE_URL postgresql+asyncpg://... Async PostgreSQL URL
SYNC_DATABASE_URL postgresql+psycopg2://... Sync URL for Celery workers
MONGODB_URL mongodb://... MongoDB connection string
MONGODB_DB oms_events MongoDB database name
REDIS_URL redis://:pass@... Redis URL (DB 0 for cache)
CELERY_BROKER_URL redis://:pass@.../1 Redis DB 1 for Celery broker
CELERY_RESULT_BACKEND redis://:pass@.../2 Redis DB 2 for results
ELASTICSEARCH_URL http://localhost:9200 Elasticsearch URL
SECRET_KEY JWT / signing key
WEBHOOK_SECRET Default HMAC signing secret
WEBHOOK_TIMEOUT_SECONDS 10 Per-request timeout
WEBHOOK_MAX_RETRIES 3 Max retry attempts
DEFAULT_SOURCING_STRATEGY DISTANCE_OPTIMAL Fallback strategy when no rule matches
MAX_SPLIT_NODES 3 Global max nodes for split fulfillment
ANTHROPIC_API_KEY Required for AI_ADAPTIVE / AI_HYBRID strategies and NL commands

13. Running the System

Prerequisites

  • Docker Desktop
  • Python 3.12+ (for local dev / testing)

Start all services

docker compose up -d --build

Services started:

Container Exposed port
PostgreSQL 5433
MongoDB 27018
Redis 6380
Elasticsearch 9200 (internal)
API 8001
Celery worker background
Celery beat background
Flower 5556
Frontend 3001

Seed all databases

docker compose exec api python scripts/seed.py

Seeds:

  • PostgreSQL: 8 fulfillment nodes, 64 inventory items, 5 sourcing rules, 1 webhook endpoint
  • MongoDB: 8 product catalog documents, 3 sample order events
  • Redis: version, stats, and cache warmup keys
  • Elasticsearch: 8 product documents, 3 sample order documents

API Documentation

Run tests

PYTHONPATH=. pytest tests/ -v

14. End-to-End Order Flow

Step 1: Create an order

curl -X POST http://localhost:8001/orders/ \
  -H "Content-Type: application/json" \
  -d '{
    "channel": "WEB",
    "fulfillment_type": "SHIP_TO_HOME",
    "customer_email": "customer@example.com",
    "customer_name": "John Doe",
    "line_items": [
      {
        "sku": "SKU-WIDGET-A",
        "product_name": "Premium Widget A",
        "quantity": 2,
        "unit_price": 29.99
      },
      {
        "sku": "SKU-GADGET-X",
        "product_name": "Gadget X Pro",
        "quantity": 1,
        "unit_price": 99.99
      }
    ],
    "shipping_address": {
      "address1": "456 Park Ave",
      "city": "New York",
      "state": "NY",
      "postal_code": "10022",
      "latitude": 40.7614,
      "longitude": -73.9776
    }
  }'

Response (HTTP 201):

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "order_number": "ORD-20240215-XK9M2A",
  "channel": "WEB",
  "status": "PENDING",
  ...
}

Step 2: Sourcing fires automatically

Within seconds, the Celery sourcing worker:

  1. Loads all active sourcing rules sorted by priority
  2. Evaluates the "Default — Distance Optimal" catch-all rule
  3. Loads all ACTIVE nodes with inventory for SKU-WIDGET-A and SKU-GADGET-X
  4. Computes haversine distance from (40.7614, -73.9776) to each node
  5. Scores nodes: STR-NYC-01 at (40.7484, -73.9967) → ~1.8km away, highest score
  6. Creates 2 FulfillmentAllocation rows (one per SKU) pointing to STR-NYC-01
  7. Reserves inventory: quantity_available -= quantity_allocated
  8. Order transitions to SOURCED

Step 3: Fulfillment pipeline

The fulfillment worker chain runs:

  • Picking (t+2s): Allocations → PICKING, order → PICKING
  • Packing (t+7s): Allocations → PACKED, order → PACKINGREADY_TO_SHIP

Step 4: Carrier booking

The carrier worker:

  • Selects a random carrier (UPS, FedEx, etc.) and service level
  • Generates a mock tracking number
  • Creates a Shipment record with label URL and estimated delivery
  • Updates order → SHIPPED, allocations → SHIPPED
  • Sends notifications + webhooks

Step 5: Delivery simulation

  • After 10 seconds, simulate_delivery fires
  • Adds tracking events (IN_TRANSIT → OUT_FOR_DELIVERY → DELIVERED)
  • Order → DELIVERED, shipment → DELIVERED
  • Webhook order.delivered fired to all subscribed endpoints

Step 6: Verify in search

curl -X POST http://localhost:8001/search/orders \
  -H "Content-Type: application/json" \
  -d '{"query": "John Doe", "status": "DELIVERED"}'

Step 7: Check audit trail

curl http://localhost:8001/orders/{order_id}/events

Returns chronological MongoDB events: order.created → order.sourced → order.shipped → order.delivered

Step 8: Analytics

curl "http://localhost:8001/analytics/dashboard?from_date=2024-01-01"

Returns: total orders, revenue, breakdown by channel/fulfillment type, top nodes, inventory alerts.


Seed Data Reference

Fulfillment Nodes (8 total)

Code Type City ship pickup curbside same_day Capacity
DC-EAST DC Edison NJ 2000/day
DC-WEST DC Los Angeles CA 2500/day
DC-MID DC Chicago IL 1800/day
STR-NYC-01 Store New York NY 300/day
STR-LA-01 Store Beverly Hills CA 250/day
STR-CHI-01 Store Chicago IL 200/day
STR-MIA-01 Store Miami Beach FL 150/day
DARK-SF-01 Dark San Francisco CA 500/day

Sourcing Rules (5 active)

Priority Name Strategy Conditions
10 Same-Day — West Coast STORE_NEAREST fulfillment_type = SAME_DAY_DELIVERY AND state IN [CA,WA,OR]
20 BOPIS / Curbside INVENTORY_RESERVATION fulfillment_type IN [STORE_PICKUP, CURBSIDE_PICKUP]
30 High-Value Orders COST_OPTIMAL total_amount > 200
40 Marketplace LEAST_COST_SPLIT channel = MARKETPLACE
100 Default DISTANCE_OPTIMAL (catch-all — no conditions)

Product SKUs (8 SKUs × 8 nodes = 64 inventory records)

SKU Name Price
SKU-WIDGET-A Premium Widget A $29.99
SKU-WIDGET-B Standard Widget B $19.99
SKU-GADGET-X Gadget X Pro $99.99
SKU-GADGET-Y Gadget Y Basic $49.99
SKU-GIZMO-1 Gizmo 1 $14.99
SKU-GIZMO-2 Gizmo 2 Deluxe $39.99
SKU-TOOL-Z Power Tool Z $149.99
SKU-ACCESSORY-1 Accessory Pack 1 $9.99

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kuberiva_oms-0.1.0.tar.gz (479.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kuberiva_oms-0.1.0-py3-none-any.whl (35.5 kB view details)

Uploaded Python 3

File details

Details for the file kuberiva_oms-0.1.0.tar.gz.

File metadata

  • Download URL: kuberiva_oms-0.1.0.tar.gz
  • Upload date:
  • Size: 479.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for kuberiva_oms-0.1.0.tar.gz
Algorithm Hash digest
SHA256 c55dab4b9231bacc16494b40d63749f9b3faa4e5acef20534271e4e6e4f59c8b
MD5 1e5756da35d183497ce579d093950212
BLAKE2b-256 efcb8acae4099285dd7ecd841cdf24a7b80396b38c7c1f0022c12a303c9379bf

See more details on using hashes here.

Provenance

The following attestation bundles were made for kuberiva_oms-0.1.0.tar.gz:

Publisher: pypi-publish.yml on KubeRiva/OMS

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file kuberiva_oms-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: kuberiva_oms-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 35.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for kuberiva_oms-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f83d192bdccdda79fba97c0d67821270633bb40183a8f4e055f6af3bb7a242c5
MD5 3066e086641a397cb99531da446797c1
BLAKE2b-256 d230274da9c359386eafae18a26b7b3b43fe0e2d30f0de8fd4a3b119570190fd

See more details on using hashes here.

Provenance

The following attestation bundles were made for kuberiva_oms-0.1.0-py3-none-any.whl:

Publisher: pypi-publish.yml on KubeRiva/OMS

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page