
Pulse Engine

Hybrid Python framework for building multi-tenant data products. Products pip install pulse-engine, declare a manifest, and get a full FastAPI app with OpenSearch, Athena, Celery, Prefect, and MCP — out of the box.

How It Works

┌──────────────────────────────────────────────────┐
│  Product (pip install pulse-engine)              │
│  manifest = ProductManifest(                     │
│    extractors=[...], preprocessor=..., ...       │
│  )                                               │
└──────────────┬───────────────────────────────────┘
               │
┌──────────────▼───────────────────────────────────┐
│  pulse-engine                                    │
│  Base ABCs · Default implementations · App       │
│  factory · Storage connectors · Job lifecycle    │
│  · CLI · Testing utilities                       │
└──────────────┬───────────────────────────────────┘
               │
┌──────────────▼───────────────────────────────────┐
│  Shared Infrastructure                           │
│  Prefect · OpenSearch · Redis · PostgreSQL       │
└──────────────────────────────────────────────────┘

Products customize behaviour by:

  • Implementing BaseExtractor subclasses for data extraction
  • Overriding pipeline stages (preprocessor, core, postprocessor) or using defaults
  • Adding product-specific API routes, MCP tools, and Celery tasks
  • Declaring everything in a ProductManifest

Prerequisites

  • Python 3.11-3.12
  • Poetry
  • Docker & Docker Compose (for shared infrastructure)

Quick Start

For Engine Development

git clone <repo-url> && cd pulse-engine

# Install deps and pre-commit hooks
make install

# Copy env and configure
cp .env.example .env

# Run database migrations
make migrate

# Start the dev server
make run

For Building a New Product

# Install the engine
pip install pulse-engine

# Scaffold a new product
pulse init my-product
cd pulse-my-product

# Set up the product
make install
cp .env.example .env

# Validate and test
make validate
make test

# Run
make run

See docs/building-a-product.md for the full guide.

CLI

The pulse command is the primary interface:

| Command | Description |
|---------|-------------|
| pulse init <name> | Scaffold a new product from template |
| pulse validate [module] | Validate a product manifest |
| pulse run | Discover manifest and start FastAPI server |
| pulse run-worker | Discover manifest and start Celery worker |
| pulse run-mcp | Discover manifest and start MCP server |

Product Manifest

Products declare their components via a ProductManifest:

from pulse_engine.registry import ProductManifest

manifest = ProductManifest(
    name="my-product",
    version="0.1.0",
    extractors=[MyExtractor],          # data extraction classes
    preprocessor=...,                   # ... = default, None = skip
    core_processor=...,                 # custom instance = override
    postprocessor=None,                 # skip postprocessing
    routers=[my_router],               # FastAPI routers
    mcp_tool_modules=["my_pkg.mcp"],   # MCP tool modules
    celery_task_modules=["my_pkg.tasks"],
    athena_database="my_db",
)

Products register via a pyproject.toml entry point:

[tool.poetry.plugins."pulse_engine.products"]
my_product = "pulse_my_product:manifest"

Project Structure

src/pulse_engine/
├── main.py                  # App factory (create_app)
├── config.py                # Settings (pydantic-settings)
├── registry.py              # ProductManifest, validation, discovery
├── worker.py                # Celery app factory
├── database.py              # SQLAlchemy async setup
├── dependencies.py          # FastAPI dependency injection
├── client.py                # PulseEngineClient (container → engine HTTP)
├── s3.py                    # S3Stage (NDJSON inter-stage data exchange)
├── chain_recovery.py        # Background task for stalled pipeline recovery
├── cli/
│   ├── main.py              # pulse CLI (typer)
│   └── templates/           # Cookiecutter product template
├── api/v1/
│   ├── router.py            # v1 router aggregation
│   └── health.py            # Health check endpoint
├── core/
│   ├── security.py          # JWT verification (Cognito + Job-scoped)
│   ├── job_token.py         # Job-scoped JWT issuance & verification
│   ├── scope.py             # require_scope() FastAPI dependency
│   ├── exceptions.py        # Exception hierarchy
│   ├── error_handlers.py    # Global error handlers
│   └── logging.py           # Structured logging (structlog)
├── middleware/
│   ├── request_id.py        # X-Request-ID middleware
│   ├── security_headers.py  # Defensive HTTP security headers (CSP, HSTS, etc.)
│   ├── rate_limit.py        # Sliding-window per-IP rate limiter (100 req/60 s)
│   └── tenant.py            # Dual-token middleware (Cognito + Job JWT)
├── deployment/
│   ├── models.py            # DeploymentModel ORM
│   ├── repository.py        # DeploymentRepository
│   ├── service.py           # DeploymentService
│   ├── router.py            # POST /api/v1/deployments
│   └── schemas.py           # Registration request/response
├── extractor/
│   ├── base.py              # BaseExtractor ABC
│   ├── models.py            # SQLAlchemy ORM: job_records
│   ├── stage_models.py      # SQLAlchemy ORM: job_stages
│   ├── repository.py        # JobRepository
│   ├── stage_repository.py  # StageRepository
│   ├── service.py           # JobService (stage-aware)
│   ├── router.py            # /api/v1/jobs/ endpoints
│   ├── schemas.py           # Pydantic models
│   └── orchestrator/
│       ├── base.py          # BaseOrchestratorAdapter ABC
│       ├── prefect.py       # PrefectAdapter (deployments + flow runs)
│       └── noop.py          # NoopAdapter
├── processor/
│   ├── base.py              # BasePreprocessor, BaseCoreProcessor,
│   │                        # BasePostprocessor ABCs
│   ├── pipeline.py          # Pluggable ProcessingPipeline
│   ├── router.py            # /api/v1/process/ endpoints
│   ├── schemas.py           # ProcessingContext, options
│   ├── defaults/            # Default stage implementations
│   ├── preprocessor/        # clean_html, normalize, detect_language
│   ├── core/                # chunking, NER, sentiment, topics
│   └── postprocessor/       # embeddings, dedup, quality scoring
├── storage/
│   ├── knowledge_base.py    # KnowledgeBaseService
│   ├── router.py            # /api/v1/kb/ endpoints
│   ├── schemas.py           # Document, SearchQuery, etc.
│   └── connectors/
│       ├── base.py          # BaseStorageConnector ABC
│       ├── opensearch.py    # OpenSearch connector
│       └── athena.py        # Athena connector
├── mcp/
│   ├── server.py            # FastMCP instance
│   ├── tools_kb.py          # KB tools (6)
│   ├── tools_jobs.py        # Jobs tools (6)
│   ├── tools_processor.py   # Processor tools (5)
│   ├── tools_pipelines.py   # Pipeline tools (5)
│   └── tools_modules.py     # Module registry tools (3)
├── services/
│   ├── bootstrap.py         # ServiceContainer, bootstrap_services()
│   └── opensearch.py        # OpenSearch client wrapper
└── testing/
    ├── fixtures.py          # Reusable pytest fixtures
    └── mocks.py             # MockStorageConnector, MockExtractor, etc.

infra/
├── docker-compose.yml       # Prefect, Redis, OpenSearch, PostgreSQL
└── terraform/               # AWS modules (networking, ECS, ECR, ALB)

tests/
├── unit/                    # Unit tests
│   ├── framework/           # Manifest, pipeline, base class tests
│   ├── processor/
│   ├── storage/
│   ├── deployment/          # Deployment registration tests
│   └── extractor/
└── integration/             # Integration tests
    ├── api/
    ├── mcp/
    └── pipelines/

Base Classes

Products extend these ABCs:

| ABC | Module | Purpose |
|-----|--------|---------|
| BaseExtractor | pulse_engine.extractor.base | Data extraction from external sources |
| BasePreprocessor | pulse_engine.processor.base | Content cleaning and normalization |
| BaseCoreProcessor | pulse_engine.processor.base | Chunking, NER, sentiment, topics |
| BasePostprocessor | pulse_engine.processor.base | Embeddings, dedup, storage formatting |
| BaseStorageConnector | pulse_engine.storage.connectors.base | Custom storage backends |
| BaseOrchestratorAdapter | pulse_engine.extractor.orchestrator.base | Custom orchestrator integrations |
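As an illustration of the extension pattern, here is a sketch of a product-side extractor. The real BaseExtractor signature is not shown in this README, so the stand-in ABC below is an assumption that only mirrors the shape of the pattern:

```python
# Stand-in ABC: the real class is pulse_engine.extractor.base.BaseExtractor,
# whose exact method names and signatures may differ from this sketch.
from abc import ABC, abstractmethod


class BaseExtractor(ABC):
    @abstractmethod
    def extract(self, source: str) -> list[dict]:
        """Pull raw records from an external source."""


class RssExtractor(BaseExtractor):
    """Hypothetical product extractor; a real one would fetch and parse the feed."""

    def extract(self, source: str) -> list[dict]:
        # Fixed record keeps the sketch self-contained and runnable.
        return [{"source": source, "title": "example item"}]


records = RssExtractor().extract("https://example.com/feed.xml")
print(records)
```

The product would then list its extractor class in the manifest's extractors field.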

Testing

make test             # full suite with coverage
make test-unit        # unit tests only
make test-integration # integration tests only

Products import engine test fixtures in their conftest.py:

from pulse_engine.testing.fixtures import *  # noqa: F401, F403

Available fixtures: mock_storage_connector, mock_orchestrator, mock_extractor, kb_service, processing_pipeline.

Code Quality

Pre-commit hooks run on every commit:

| Hook | What it checks |
|------|----------------|
| trailing-whitespace | No trailing whitespace |
| end-of-file-fixer | Files end with a newline |
| check-yaml | Valid YAML syntax |
| ruff | Linting (auto-fix enabled) |
| ruff-format | Code formatting |
| mypy | Strict static type-checking |

make lint    # run all hooks manually

CI/CD

PR Checks (pr-checks.yml)

Runs on every pull request to dev, uat, prod:

  • lint — ruff check + format
  • typecheck — mypy strict
  • test — unit tests with coverage
  • trivy — vulnerability scan

Deploy (deploy.yml)

Runs on push to dev, uat, prod:

| Branch | PyPI | Target Infrastructure |
|--------|------|-----------------------|
| dev | PyPI (.dev suffix) | VM via docker-compose |
| uat | PyPI (.dev suffix) | VM via docker-compose |
| prod | PyPI (stable) | ECS cluster |

The pipeline: test → publish to PyPI → build Docker → push ECR → deploy → health check.

Terraform runs separately when infra/terraform/** changes: plan → apply → sync outputs to GitHub Secrets. The synced pipeline-infrastructure variables then flow into .env automatically on the next deploy.

Required Secrets (per GitHub environment)

| Secret | Description |
|--------|-------------|
| AWS_ROLE_ARN | OIDC role for GitHub → AWS auth |
| ECR_REPOSITORY_URL | ECR repository URL |
| PYPI_TOKEN | PyPI API token |

MCP Server

Exposes 25 tools for AI agents via the Model Context Protocol:

| Category | Tools |
|----------|-------|
| Jobs (6) | jobs_register, jobs_get, jobs_list, jobs_push_status, jobs_cancel, jobs_delete |
| Knowledge Base (6) | kb_store_documents, kb_retrieve_document, kb_search, kb_delete_document, kb_get_stats, kb_run_query |
| Processor (5) | process_pipeline, process_preprocess, process_analyze, process_postprocess, process_chunk |
| Pipelines (5) | pipelines_trigger, pipelines_status, pipelines_list, pipelines_cancel, pipelines_steps |
| Modules (3) | modules_register, modules_list, modules_delete |

Start the server with:

pulse run-mcp

Products register additional tools via mcp_tool_modules in the manifest.
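The registration mechanism presumably works by importing each module named in mcp_tool_modules, with tools registering themselves at import time. This stdlib-only sketch mimics that decorator pattern; it is not the engine's actual FastMCP API:

```python
# Toy tool registry: @tool plays the role that a FastMCP tool decorator
# would play inside a real mcp_tool_modules module.
from typing import Callable

TOOLS: dict[str, Callable] = {}


def tool(func: Callable) -> Callable:
    """Register a function under its own name when the module is imported."""
    TOOLS[func.__name__] = func
    return func


@tool
def my_product_lookup(query: str) -> str:
    """Hypothetical product-specific tool exposed to AI agents."""
    return f"results for {query!r}"


print(sorted(TOOLS))  # the registry now contains my_product_lookup
```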

Environment Variables

Core

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| APP_ENV | No | development | Environment name (development, production, etc.) |
| APP_VERSION | No | 0.1.0 | Application version |
| LOG_LEVEL | No | INFO | Logging level |
| AWS_REGION | Yes | ap-south-1 | AWS region |
| AWS_ACCESS_KEY_ID | No | | AWS credentials (use IAM role in production) |
| AWS_SECRET_ACCESS_KEY | No | | AWS credentials (use IAM role in production) |

Authentication (Cognito)

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| COGNITO_USER_POOL_ID | Yes | | Cognito User Pool ID |
| COGNITO_APP_CLIENT_ID | Yes | | Cognito App Client ID |
| COGNITO_APP_CLIENT_SECRET | No | | Client secret (required if the app client has one) |

OpenSearch

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| OPENSEARCH_URL | Yes | | OpenSearch endpoint |
| OPENSEARCH_USERNAME | No | | Basic-auth username (AWS managed domains) |
| OPENSEARCH_PASSWORD | No | | Basic-auth password |
| OPENSEARCH_USE_SSL | No | true | Enable TLS |
| OPENSEARCH_VERIFY_CERTS | No | true | Verify TLS certificates |
| OPENSEARCH_INDEX_PREFIX | No | pulse_kb | Per-tenant index name prefix |
| EMBEDDING_DIMENSION | No | 1536 | Vector dimension for kNN indexes |

Database & Cache

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| DATABASE_URL | Yes | | Async PostgreSQL DSN (postgresql+asyncpg://...) |
| REDIS_URL | No | redis://localhost:6379/0 | Redis URL (enables Celery) |
| CELERY_BROKER_URL | No | | Celery broker (defaults to REDIS_URL) |
| CELERY_RESULT_BACKEND | No | | Celery result backend (defaults to REDIS_URL) |
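For local development against the docker-compose stack, a .env fragment for this group might look like the following (host, port, and credentials are placeholders, not real values):

```
DATABASE_URL=postgresql+asyncpg://pulse:pulse@localhost:5432/pulse
REDIS_URL=redis://localhost:6379/0
```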

Athena

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| ATHENA_AWS_ACCESS_KEY_ID | No | | Athena-specific AWS credentials |
| ATHENA_AWS_SECRET_ACCESS_KEY | No | | Athena-specific AWS credentials |
| ATHENA_OUTPUT_LOCATION | Yes* | | S3 URI for Athena query results |
| ATHENA_WORKGROUP | No | primary | Athena workgroup |
| ATHENA_QUERY_TIMEOUT_SECONDS | No | 60 | Athena query timeout |

Orchestrator (Prefect)

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| PULSE_ORCHESTRATOR_BACKEND | No | none | prefect or none |
| PREFECT_API_URL | No | | Prefect API endpoint |
| PREFECT_API_KEY | No | | Prefect Cloud API key |
| PREFECT_ECS_WORK_POOL_NAME | No | products-worker-pool | ECS work pool name |
| PREFECT_LAMBDA_WORK_POOL_NAME | No | lambda-worker-pool | Lambda work pool name |
| PREFECT_LAMBDA_FUNCTION_NAME_TEMPLATE | No | {product}-{stage} | Lambda function name pattern |
| PREFECT_K8S_WORK_POOL_NAME | No | k8s-worker-pool | Kubernetes work pool name |
| PREFECT_K8S_NAMESPACE | No | pulse-jobs | Kubernetes namespace |
| PREFECT_K8S_DEFAULT_CPU | No | 500m | Default CPU request |
| PREFECT_K8S_DEFAULT_MEMORY | No | 1Gi | Default memory request |

LLM & Embeddings

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| PULSE_LLM_PROVIDER | No | openai | LLM provider |
| PULSE_LLM_MODEL | No | gpt-4o-mini | LLM model ID |
| PULSE_LLM_API_KEY | No | | LLM API key (also used as embedding fallback) |
| PULSE_LLM_TEMPERATURE | No | 0.0 | LLM sampling temperature |
| PULSE_EMBEDDING_PROVIDER | No | openai | Embedding provider |
| PULSE_OPENAI_EMBEDDING_MODEL | No | text-embedding-3-small | OpenAI embedding model |
| PULSE_OPENAI_API_KEY | No | | OpenAI API key (overrides PULSE_LLM_API_KEY) |

Pipeline & Jobs

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| PULSE_ENGINE_URL | No | | Public URL containers use for callbacks |
| PULSE_JOB_TOKEN_SECRET | No | | HMAC secret for job-scoped JWTs |
| PULSE_S3_BUCKET | No | | S3 bucket for inter-stage NDJSON data |
| PULSE_CHAIN_GRACE_PERIOD_SECONDS | No | 300 | Seconds before chain recovery auto-triggers |
| PULSE_MAX_CONCURRENT_JOBS_PER_TENANT | No | 10 | Max concurrent jobs per tenant |
| PULSE_DEFAULT_CHUNK_SIZE | No | 512 | Default chunk token size |
| PULSE_DEFAULT_CHUNK_STRATEGY | No | token_count | Default chunking strategy |
| PULSE_DEDUP_SIMILARITY_THRESHOLD | No | 0.95 | Cosine-similarity dedup threshold |
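PULSE_JOB_TOKEN_SECRET is described above as an HMAC secret for job-scoped JWTs. The engine's actual claim set and checks live in core/job_token.py and may differ; this stdlib-only sketch shows only the general HS256 sign-and-verify shape, with an invented job_id claim:

```python
# HS256 JWT sketch (stdlib only). Secret, job_id, and TTL are illustrative.
import base64
import hashlib
import hmac
import json
import time


def _b64(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def issue_job_token(secret: str, job_id: str, ttl: int = 300) -> str:
    header = _b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64(json.dumps({"job_id": job_id, "exp": int(time.time()) + ttl}).encode())
    signing_input = f"{header}.{payload}".encode()
    sig = _b64(hmac.new(secret.encode(), signing_input, hashlib.sha256).digest())
    return f"{header}.{payload}.{sig}"


def verify_job_token(secret: str, token: str) -> dict:
    header, payload, sig = token.split(".")
    signing_input = f"{header}.{payload}".encode()
    expected = _b64(hmac.new(secret.encode(), signing_input, hashlib.sha256).digest())
    if not hmac.compare_digest(sig, expected):
        raise ValueError("bad signature")
    claims = json.loads(base64.urlsafe_b64decode(payload + "=" * (-len(payload) % 4)))
    if claims["exp"] < time.time():
        raise ValueError("token expired")
    return claims


token = issue_job_token("dev-secret", "job-123")
print(verify_job_token("dev-secret", token)["job_id"])
```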

Pipeline Infrastructure (from Terraform)

These are auto-synced from Terraform outputs to GitHub Secrets, then written to .env at deploy time:

| Variable | Description |
|----------|-------------|
| PIPELINE_TASK_DEFINITION | ECS task definition family for pipeline steps |
| PIPELINE_CLUSTER_NAME | ECS cluster for pipeline step tasks |
| PIPELINE_EXECUTION_ROLE_ARN | ECS task execution role (ECR pull, logs, secrets) |
| PIPELINE_TASK_ROLE_ARN | ECS task role (S3, Lambda invoke, ECS dispatch) |
| PIPELINE_LOG_GROUP | CloudWatch log group for ECS pipeline steps |
| PIPELINE_SUBNETS | Comma-separated private subnet IDs |
| PIPELINE_SECURITY_GROUPS | Comma-separated security group IDs |
| LAMBDA_EXECUTION_ROLE_ARN | Lambda execution role for pipeline functions |
| LAMBDA_SUBNETS | Comma-separated subnet IDs for Lambda VPC config |
| LAMBDA_SECURITY_GROUPS | Comma-separated security group IDs for Lambda |
| LAMBDA_LOG_GROUP | CloudWatch log group for Lambda pipeline steps |

MCP Server

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| MCP_TRANSPORT | No | sse | Transport mode: sse or stdio |
| MCP_SSE_HOST | No | 127.0.0.1 | MCP SSE server host |
| MCP_SSE_PORT | No | 8001 | MCP SSE server port |

API Authentication

All endpoints (except /api/v1/health and /api/v1/auth/login) require a JWT token.

Rate limits — enforced per IP address:

  • Login (POST /api/v1/auth/login): 5 attempts per 60 seconds. Returns 429 with Retry-After: 60 on breach.
  • Global: 100 requests per 60 seconds across all endpoints. Responses include X-RateLimit-Limit and X-RateLimit-Remaining headers.
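The per-IP sliding window described above can be sketched in a few lines. The engine's actual middleware/rate_limit.py may differ; this limiter is illustrative only:

```python
# Sliding-window limiter (stdlib only); 100 requests per 60 s matches the
# documented global limit, scaled down here for the demo.
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    def __init__(self, limit: int = 100, window: float = 60.0):
        self.limit, self.window = limit, window
        self.hits: dict[str, deque[float]] = defaultdict(deque)

    def allow(self, ip: str) -> bool:
        now = time.monotonic()
        q = self.hits[ip]
        while q and now - q[0] >= self.window:  # drop hits outside the window
            q.popleft()
        if len(q) >= self.limit:
            return False  # caller responds 429 with Retry-After
        q.append(now)
        return True


limiter = SlidingWindowLimiter(limit=3, window=60)
print([limiter.allow("10.0.0.1") for _ in range(4)])  # → [True, True, True, False]
```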

Note — Swagger UI (/docs) and ReDoc (/redoc) are disabled in production (APP_ENV=production).

Get a Token

curl -X POST https://api.dev.pulse.mananalabs.ai/api/v1/auth/login \
  -H "Content-Type: application/json" \
  -d '{"email": "dev@pulse-engine.com", "password": "PulseDev@2026"}'

Response:

{
  "id_token": "eyJ...",
  "access_token": "eyJ...",
  "refresh_token": "eyJ...",
  "expires_in": 3600,
  "token_type": "Bearer",
  "tenant_id": "tenant-dev-001",
  "email": "dev@pulse-engine.com"
}

Use the Token

Pass the id_token as a Bearer token in the Authorization header:

curl -H "Authorization: Bearer <id_token>" \
  https://api.dev.pulse.mananalabs.ai/api/v1/kb/stats

Tokens expire after 1 hour. Call the login endpoint again to get a new one.
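Since tokens are valid for an hour, clients typically cache and reuse them rather than logging in per request. A stdlib-only sketch of that pattern follows; TokenCache and its login callable are hypothetical, not part of the engine:

```python
# Cache the id_token and re-login only near expiry.
import time


class TokenCache:
    def __init__(self, login, skew: float = 60.0):
        self.login = login   # callable returning the login response dict
        self.skew = skew     # refresh this many seconds before expiry
        self._token = None
        self._expires_at = 0.0

    def get(self) -> str:
        if self._token is None or time.time() >= self._expires_at - self.skew:
            resp = self.login()  # stands in for POST /api/v1/auth/login
            self._token = resp["id_token"]
            self._expires_at = time.time() + resp["expires_in"]
        return self._token


cache = TokenCache(lambda: {"id_token": "eyJ...", "expires_in": 3600})
headers = {"Authorization": f"Bearer {cache.get()}"}
print(headers["Authorization"])
```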


Library Usage

Pulse Engine can also be used as a standalone library for content processing:

from pulse_engine.processor.core.topic_splitter import TopicSplitter

splitter = TopicSplitter(provider="openai", api_key="sk-...")
result = splitter.split([
    (1, "Hi there"),
    (2, "Let's discuss Q1 metrics"),
    (3, "Revenue grew 20%"),
])
print(result.topic_shifts)

See docs/pulse_engine_library.md for full documentation on the Topic Splitter, LLM configuration, and configurable embeddings.

OCR Module

Extract text from PDFs and images using Google Gemini's vision capabilities:

from pulse_engine.processor.ocr.gemini import GeminiOCRProvider
from pulse_engine.processor.schemas import OCRInput

# Create provider
provider = GeminiOCRProvider()

# Option 1: Extract from file path
ocr_input = OCRInput(
    file_path="/path/to/document.pdf",
    mime_type="application/pdf",
    prompt="Extract all text and structure as JSON",
    temperature=0.0,
    model="gemini-2.0-flash",
    api_key="your-gemini-api-key",
    max_output_tokens=4096,
)

# Option 2: Extract from bytes (use a context manager so the file is closed)
with open("document.pdf", "rb") as f:
    ocr_input = OCRInput(
        file_bytes=f.read(),
        mime_type="application/pdf",
        prompt="Extract invoice line items as JSON",
        api_key="your-gemini-api-key",
    )

# Extract
response = provider.extract(ocr_input)
print(response.text)  # Raw JSON response from Gemini

OCRInput parameters:

  • file_path or file_bytes: PDF/image content (provide one)
  • mime_type: application/pdf, image/png, image/jpeg, image/webp (default: application/pdf)
  • prompt: Instructions for extraction (e.g., "Extract all text as JSON")
  • temperature: LLM randomness, 0.0-1.0 (default: 0.1)
  • model: Gemini model ID (default: gemini-2.5-flash)
  • api_key: Google Gemini API key
  • max_output_tokens: Max response length (default: 65536)

Response: the provider returns the raw Gemini response object; its .text property contains the extraction result.
