
Pulse Engine

Hybrid Python framework for building multi-tenant data products. Products pip install pulse-core-engine, declare a manifest, and get a full FastAPI app with OpenSearch, Athena, Celery, Prefect, and MCP — out of the box.

How It Works

┌──────────────────────────────────────────────────┐
│  Product (pip install pulse-core-engine)        │
│  manifest = ProductManifest(                     │
│    extractors=[...], preprocessor=..., ...       │
│  )                                               │
└──────────────┬───────────────────────────────────┘
               │
┌──────────────▼───────────────────────────────────┐
│  pulse-core-engine                               │
│  Base ABCs · Default implementations · App       │
│  factory · Storage connectors · Job lifecycle    │
│  · CLI · Testing utilities                       │
└──────────────┬───────────────────────────────────┘
               │
┌──────────────▼───────────────────────────────────┐
│  Shared Infrastructure                           │
│  Prefect · OpenSearch · Redis · PostgreSQL       │
└──────────────────────────────────────────────────┘

Products customize behaviour by:

  • Implementing BaseExtractor subclasses for data extraction
  • Overriding pipeline stages (preprocessor, core, postprocessor) or using defaults
  • Adding product-specific API routes, MCP tools, and Celery tasks
  • Declaring everything in a ProductManifest
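
The extension points above follow a standard ABC-subclassing pattern. The sketch below illustrates it with a stand-in base class; the real BaseExtractor lives in pulse_engine.extractor.base and its exact method names and signatures may differ:

```python
from abc import ABC, abstractmethod

# Stand-in for pulse_engine.extractor.base.BaseExtractor -- illustrative only;
# the real ABC's interface may differ.
class BaseExtractor(ABC):
    @abstractmethod
    def extract(self, source: str) -> list[dict]:
        """Pull raw records from an external source."""

class RssExtractor(BaseExtractor):
    """Hypothetical product-specific extractor."""
    def extract(self, source: str) -> list[dict]:
        # Real code would fetch and parse the feed; stubbed here.
        return [{"source": source, "title": "example item"}]

records = RssExtractor().extract("https://example.com/feed.xml")
print(records[0]["title"])  # example item
```

A class like RssExtractor would then be listed in the manifest's extractors field.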

Prerequisites

  • Python 3.11-3.12
  • Poetry
  • Docker & Docker Compose (for shared infrastructure)

Quick Start

For Engine Development

git clone <repo-url> && cd pulse-core-engine

# Install deps and pre-commit hooks
make install

# Copy env and configure
cp .env.example .env

# Run database migrations
make migrate

# Start the dev server
make run

For Building a New Product

# Install the engine
pip install pulse-core-engine

# Scaffold a new product
pulse init my-product
cd pulse-my-product

# Set up the product
make install
cp .env.example .env

# Validate and test
make validate
make test

# Run
make run

See docs/building-a-product.md for the full guide.

CLI

The pulse command is the primary interface:

Command                    Description
pulse init <name>          Scaffold a new product from template
pulse validate [module]    Validate a product manifest
pulse run                  Discover manifest and start FastAPI server
pulse run-worker           Discover manifest and start Celery worker
pulse run-mcp              Discover manifest and start MCP server

Product Manifest

Products declare their components via a ProductManifest:

from pulse_engine.registry import ProductManifest

manifest = ProductManifest(
    name="my-product",
    version="0.1.0",
    extractors=[MyExtractor],          # data extraction classes
    preprocessor=...,                   # ... = default, None = skip
    core_processor=...,                 # custom instance = override
    postprocessor=None,                 # skip postprocessing
    routers=[my_router],               # FastAPI routers
    mcp_tool_modules=["my_pkg.mcp"],   # MCP tool modules
    celery_task_modules=["my_pkg.tasks"],
    athena_database="my_db",
)

Products register via a pyproject.toml entry point:

[tool.poetry.plugins."pulse_engine.products"]
my_product = "pulse_my_product:manifest"

Project Structure

src/pulse_engine/
├── main.py                  # App factory (create_app)
├── config.py                # Settings (pydantic-settings)
├── registry.py              # ProductManifest, validation, discovery
├── worker.py                # Celery app factory
├── database.py              # SQLAlchemy async setup
├── dependencies.py          # FastAPI dependency injection
├── client.py                # PulseEngineClient (container → engine HTTP)
├── s3.py                    # S3Stage (NDJSON inter-stage data exchange)
├── chain_recovery.py        # Background task for stalled pipeline recovery
├── cli/
│   ├── main.py              # pulse CLI (typer)
│   └── templates/           # Cookiecutter product template
├── api/v1/
│   ├── router.py            # v1 router aggregation
│   └── health.py            # Health check endpoint
├── core/
│   ├── security.py          # JWT verification (Cognito + Job-scoped)
│   ├── job_token.py         # Job-scoped JWT issuance & verification
│   ├── scope.py             # require_scope() FastAPI dependency
│   ├── exceptions.py        # Exception hierarchy
│   ├── error_handlers.py    # Global error handlers
│   └── logging.py           # Structured logging (structlog)
├── middleware/
│   ├── request_id.py        # X-Request-ID middleware
│   ├── security_headers.py  # Defensive HTTP security headers (CSP, HSTS, etc.)
│   ├── rate_limit.py        # Sliding-window per-IP rate limiter (100 req/60 s)
│   └── tenant.py            # Dual-token middleware (Cognito + Job JWT)
├── deployment/
│   ├── models.py            # DeploymentModel ORM
│   ├── repository.py        # DeploymentRepository
│   ├── service.py           # DeploymentService
│   ├── router.py            # POST /api/v1/deployments
│   └── schemas.py           # Registration request/response
├── extractor/
│   ├── base.py              # BaseExtractor ABC
│   ├── models.py            # SQLAlchemy ORM: job_records
│   ├── stage_models.py      # SQLAlchemy ORM: job_stages
│   ├── repository.py        # JobRepository
│   ├── stage_repository.py  # StageRepository
│   ├── service.py           # JobService (stage-aware)
│   ├── router.py            # /api/v1/jobs/ endpoints
│   ├── schemas.py           # Pydantic models
│   └── orchestrator/
│       ├── base.py          # BaseOrchestratorAdapter ABC
│       ├── prefect.py       # PrefectAdapter (deployments + flow runs)
│       └── noop.py          # NoopAdapter
├── processor/
│   ├── base.py              # BasePreprocessor, BaseCoreProcessor,
│   │                        # BasePostprocessor ABCs
│   ├── pipeline.py          # Pluggable ProcessingPipeline
│   ├── router.py            # /api/v1/process/ endpoints
│   ├── schemas.py           # ProcessingContext, options
│   ├── defaults/            # Default stage implementations
│   ├── preprocessor/        # clean_html, normalize, detect_language
│   ├── core/                # chunking, NER, sentiment, topics
│   └── postprocessor/       # embeddings, dedup, quality scoring
├── storage/
│   ├── knowledge_base.py    # KnowledgeBaseService
│   ├── router.py            # /api/v1/kb/ endpoints
│   ├── schemas.py           # Document, SearchQuery, etc.
│   └── connectors/
│       ├── base.py          # BaseStorageConnector ABC
│       ├── opensearch.py    # OpenSearch connector
│       └── athena.py        # Athena connector
├── mcp/
│   ├── server.py            # FastMCP instance
│   ├── tools_kb.py          # KB tools
│   ├── tools_jobs.py        # Jobs tools
│   └── tools_processor.py   # Processor tools
├── services/
│   ├── bootstrap.py         # ServiceContainer, bootstrap_services()
│   └── opensearch.py        # OpenSearch client wrapper
└── testing/
    ├── fixtures.py          # Reusable pytest fixtures
    └── mocks.py             # MockStorageConnector, MockExtractor, etc.

infra/
├── docker-compose.yml       # Prefect, Redis, OpenSearch, PostgreSQL
└── terraform/               # AWS modules (networking, ECS, ECR, ALB)

tests/
├── unit/                    # Unit tests
│   ├── framework/           # Manifest, pipeline, base class tests
│   ├── processor/
│   ├── storage/
│   ├── deployment/          # Deployment registration tests
│   └── extractor/
└── integration/             # Integration tests
    ├── api/
    ├── mcp/
    └── pipelines/

Base Classes

Products extend these ABCs:

ABC                       Module                                     Purpose
BaseExtractor             pulse_engine.extractor.base                Data extraction from external sources
BasePreprocessor          pulse_engine.processor.base                Content cleaning and normalization
BaseCoreProcessor         pulse_engine.processor.base                Chunking, NER, sentiment, topics
BasePostprocessor         pulse_engine.processor.base                Embeddings, dedup, storage formatting
BaseStorageConnector      pulse_engine.storage.connectors.base       Custom storage backends
BaseOrchestratorAdapter   pulse_engine.extractor.orchestrator.base   Custom orchestrator integrations

Testing

make test             # full suite with coverage
make test-unit        # unit tests only
make test-integration # integration tests only

Products import engine test fixtures in their conftest.py:

from pulse_engine.testing.fixtures import *  # noqa: F401, F403

Available fixtures: mock_storage_connector, mock_orchestrator, mock_extractor, kb_service, processing_pipeline.

Code Quality

Pre-commit hooks run on every commit:

Hook                  What it checks
trailing-whitespace   No trailing whitespace
end-of-file-fixer     Files end with a newline
check-yaml            Valid YAML syntax
ruff                  Linting (auto-fix enabled)
ruff-format           Code formatting
mypy                  Strict static type-checking

make lint    # run all hooks manually

CI/CD

PR Checks (pr-checks.yml)

Runs on every pull request to dev, uat, prod:

  • lint — ruff check + format
  • typecheck — mypy strict
  • test — unit tests with coverage
  • trivy — vulnerability scan

Deploy (deploy.yml)

Runs on push to dev, uat, prod:

Branch   PyPI Target   Infrastructure
dev      TestPyPI      dev ECS cluster
uat      TestPyPI      uat ECS cluster
prod     PyPI          prod ECS cluster

The pipeline: test → publish to PyPI/TestPyPI → build Docker → push ECR → deploy ECS → wait for stability.

Required Secrets (per GitHub environment)

Secret               Description
AWS_ROLE_ARN         OIDC role for GitHub → AWS auth
ECR_REPOSITORY_URL   ECR repository URL
PYPI_TOKEN           PyPI API token (prod only)
TEST_PYPI_TOKEN      TestPyPI API token (dev/uat)

MCP Server

Exposes KB, Jobs, and Processor as MCP tools for AI agents:

pulse run-mcp

Products register additional tools via mcp_tool_modules in the manifest.

Environment Variables

Core

Variable                Required   Default       Description
APP_ENV                 No         development   Environment name (development, production, etc.)
APP_VERSION             No         0.1.0         Application version
LOG_LEVEL               No         INFO          Logging level
AWS_REGION              Yes        ap-south-1    AWS region
AWS_ACCESS_KEY_ID       No         —             AWS credentials (use IAM role in production)
AWS_SECRET_ACCESS_KEY   No         —             AWS credentials (use IAM role in production)

Authentication (Cognito)

Variable                    Required   Default   Description
COGNITO_USER_POOL_ID        Yes        —         Cognito User Pool ID
COGNITO_APP_CLIENT_ID       Yes        —         Cognito App Client ID
COGNITO_APP_CLIENT_SECRET   No         —         Client secret (required if app client has one)

OpenSearch

Variable                  Required   Default    Description
OPENSEARCH_URL            Yes        —          OpenSearch endpoint
OPENSEARCH_USERNAME       No         —          Basic-auth username (AWS managed domains)
OPENSEARCH_PASSWORD       No         —          Basic-auth password
OPENSEARCH_USE_SSL        No         true       Enable TLS
OPENSEARCH_VERIFY_CERTS   No         true       Verify TLS certificates
OPENSEARCH_INDEX_PREFIX   No         pulse_kb   Index name prefix per tenant
EMBEDDING_DIMENSION       No         1536       Vector dimension for kNN indexes
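
OPENSEARCH_INDEX_PREFIX and EMBEDDING_DIMENSION together suggest per-tenant k-NN indexes sized for the embedding model. As a sketch, the kind of OpenSearch index body these settings would drive might look like the following; the field names here are hypothetical and the engine's actual mapping may differ:

```python
def knn_index_body(dimension: int = 1536) -> dict:
    """Illustrative OpenSearch k-NN index body; field names are hypothetical."""
    return {
        "settings": {"index.knn": True},
        "mappings": {
            "properties": {
                "content": {"type": "text"},
                "embedding": {"type": "knn_vector", "dimension": dimension},
            }
        },
    }

# Index name = prefix + tenant, per OPENSEARCH_INDEX_PREFIX
index_name = "pulse_kb_tenant-dev-001"
body = knn_index_body(1536)
print(body["mappings"]["properties"]["embedding"]["dimension"])  # 1536
```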

Database & Cache

Variable                Required   Default                    Description
DATABASE_URL            Yes        —                          Async PostgreSQL DSN (postgresql+asyncpg://...)
REDIS_URL               No         redis://localhost:6379/0   Redis URL (enables Celery)
CELERY_BROKER_URL       No         —                          Celery broker (defaults to REDIS_URL)
CELERY_RESULT_BACKEND   No         —                          Celery result backend (defaults to REDIS_URL)

Athena

Variable                       Required   Default   Description
ATHENA_AWS_ACCESS_KEY_ID       No         —         Athena-specific AWS credentials
ATHENA_AWS_SECRET_ACCESS_KEY   No         —         Athena-specific AWS credentials
ATHENA_OUTPUT_LOCATION         Yes*       —         S3 URI for Athena query results
ATHENA_WORKGROUP               No         primary   Athena workgroup
ATHENA_QUERY_TIMEOUT_SECONDS   No         60        Athena query timeout

Orchestrator (Prefect)

Variable                                Required   Default                Description
PULSE_ORCHESTRATOR_BACKEND              No         none                   prefect or none
PREFECT_API_URL                         No         —                      Prefect API endpoint
PREFECT_API_KEY                         No         —                      Prefect Cloud API key
PREFECT_ECS_WORK_POOL_NAME              No         products-worker-pool   ECS work pool name
PREFECT_LAMBDA_WORK_POOL_NAME           No         lambda-worker-pool     Lambda work pool name
PREFECT_LAMBDA_FUNCTION_NAME_TEMPLATE   No         {product}-{stage}      Lambda function name pattern
PREFECT_K8S_WORK_POOL_NAME              No         k8s-worker-pool        Kubernetes work pool name
PREFECT_K8S_NAMESPACE                   No         pulse-jobs             Kubernetes namespace
PREFECT_K8S_DEFAULT_CPU                 No         500m                   Default CPU request
PREFECT_K8S_DEFAULT_MEMORY              No         1Gi                    Default memory request

LLM & Embeddings

Variable                       Required   Default                  Description
PULSE_LLM_PROVIDER             No         openai                   LLM provider
PULSE_LLM_MODEL                No         gpt-4o-mini              LLM model ID
PULSE_LLM_API_KEY              No         —                        LLM API key (also used as embedding fallback)
PULSE_LLM_TEMPERATURE          No         0.0                      LLM sampling temperature
PULSE_EMBEDDING_PROVIDER       No         openai                   Embedding provider
PULSE_OPENAI_EMBEDDING_MODEL   No         text-embedding-3-small   OpenAI embedding model
PULSE_OPENAI_API_KEY           No         —                        OpenAI API key (overrides PULSE_LLM_API_KEY)

Pipeline & Jobs

Variable                               Required   Default       Description
PULSE_ENGINE_URL                       No         —             Public URL containers use for callbacks
PULSE_JOB_TOKEN_SECRET                 No         —             HMAC secret for job-scoped JWTs
PULSE_S3_BUCKET                        No         —             S3 bucket for inter-stage NDJSON data
PULSE_CHAIN_GRACE_PERIOD_SECONDS       No         300           Seconds before chain recovery auto-triggers
PULSE_MAX_CONCURRENT_JOBS_PER_TENANT   No         10            Max concurrent jobs per tenant
PULSE_DEFAULT_CHUNK_SIZE               No         512           Default chunk token size
PULSE_DEFAULT_CHUNK_STRATEGY           No         token_count   Default chunking strategy
PULSE_DEDUP_SIMILARITY_THRESHOLD       No         0.95          Cosine similarity dedup threshold
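
PULSE_DEDUP_SIMILARITY_THRESHOLD implies the postprocessor drops documents whose embeddings are near-duplicates by cosine similarity. A pure-Python sketch of that idea (the engine's actual dedup implementation may differ):

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def dedup(vectors: list[list[float]], threshold: float = 0.95) -> list[int]:
    """Return indices of vectors kept after near-duplicate removal."""
    kept: list[int] = []
    for i, v in enumerate(vectors):
        # Keep a vector only if it is below the threshold against all kept ones.
        if all(cosine(v, vectors[j]) < threshold for j in kept):
            kept.append(i)
    return kept

vecs = [[1.0, 0.0], [0.999, 0.01], [0.0, 1.0]]
print(dedup(vecs))  # [0, 2] -- the second vector is a near-duplicate of the first
```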

MCP Server

Variable        Required   Default     Description
MCP_TRANSPORT   No         sse         Transport mode: sse or stdio
MCP_SSE_HOST    No         127.0.0.1   MCP SSE server host
MCP_SSE_PORT    No         8001        MCP SSE server port

API Authentication

All endpoints (except /api/v1/health and /api/v1/auth/login) require a JWT token.
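
Job-scoped tokens are HMAC-signed JWTs (see PULSE_JOB_TOKEN_SECRET). The following is a stdlib-only sketch of HS256 sign/verify to show the mechanism; it is not the engine's implementation, which lives in core/job_token.py and includes claims like expiry that are omitted here:

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    """Base64url-encode without padding, per the JWT convention."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign_job_token(payload: dict, secret: str) -> str:
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = b64url(json.dumps(payload).encode())
    signing_input = f"{header}.{body}".encode()
    sig = b64url(hmac.new(secret.encode(), signing_input, hashlib.sha256).digest())
    return f"{header}.{body}.{sig}"

def verify_job_token(token: str, secret: str) -> bool:
    header, body, sig = token.split(".")
    expected = b64url(
        hmac.new(secret.encode(), f"{header}.{body}".encode(), hashlib.sha256).digest()
    )
    # Constant-time comparison to avoid timing attacks.
    return hmac.compare_digest(sig, expected)

token = sign_job_token({"job_id": "job-123", "scope": "jobs:write"}, "secret")
print(verify_job_token(token, "secret"))  # True
```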

Rate limits — enforced per IP address:

  • Login (POST /api/v1/auth/login): 5 attempts per 60 seconds. Returns 429 with Retry-After: 60 on breach.
  • Global: 100 requests per 60 seconds across all endpoints. Responses include X-RateLimit-Limit and X-RateLimit-Remaining headers.

Note — Swagger UI (/docs) and ReDoc (/redoc) are disabled in production (APP_ENV=production).

Get a Token

curl -X POST https://api.dev.pulse.mananalabs.ai/api/v1/auth/login \
  -H "Content-Type: application/json" \
  -d '{"email": "dev@pulse-engine.com", "password": "PulseDev@2026"}'

Response:

{
  "id_token": "eyJ...",
  "access_token": "eyJ...",
  "refresh_token": "eyJ...",
  "expires_in": 3600,
  "token_type": "Bearer",
  "tenant_id": "tenant-dev-001",
  "email": "dev@pulse-engine.com"
}

Use the Token

Pass the id_token as a Bearer token in the Authorization header:

curl -H "Authorization: Bearer <id_token>" \
  https://api.dev.pulse.mananalabs.ai/api/v1/kb/stats

Tokens expire after 1 hour. Call the login endpoint again to get a new one.

Library Usage

Pulse Engine can also be used as a standalone library for content processing:

from pulse_engine.processor.core.topic_splitter import TopicSplitter

splitter = TopicSplitter(provider="openai", api_key="sk-...")
result = splitter.split([
    (1, "Hi there"),
    (2, "Let's discuss Q1 metrics"),
    (3, "Revenue grew 20%"),
])
print(result.topic_shifts)

See docs/pulse_engine_library.md for full documentation on the Topic Splitter, LLM configuration, and configurable embeddings.

OCR Module

Extract text from PDFs and images using Google Gemini's vision capabilities:

from pulse_engine.processor.ocr.gemini import GeminiOCRProvider
from pulse_engine.processor.schemas import OCRInput

# Create provider
provider = GeminiOCRProvider()

# Option 1: Extract from file path
ocr_input = OCRInput(
    file_path="/path/to/document.pdf",
    mime_type="application/pdf",
    prompt="Extract all text and structure as JSON",
    temperature=0.0,
    model="gemini-2.0-flash",
    api_key="your-gemini-api-key",
    max_output_tokens=4096,
)

# Option 2: Extract from bytes
ocr_input = OCRInput(
    file_bytes=open("document.pdf", "rb").read(),
    mime_type="application/pdf",
    prompt="Extract invoice line items as JSON",
    api_key="your-gemini-api-key",
)

# Extract
response = provider.extract(ocr_input)
print(response.text)  # Raw JSON response from Gemini

OCRInput parameters:

  • file_path or file_bytes: PDF/image content (provide one)
  • mime_type: application/pdf, image/png, image/jpeg, image/webp (default: application/pdf)
  • prompt: Instructions for extraction (e.g., "Extract all text as JSON")
  • temperature: LLM randomness, 0.0-1.0 (default: 0.1)
  • model: Gemini model ID (default: gemini-2.5-flash)
  • api_key: Google Gemini API key
  • max_output_tokens: Max response length (default: 65536)

Response: the provider returns the raw Gemini response object; its .text property contains the extraction result.
