Pulse Engine
Hybrid Python framework for building multi-tenant data products. Products pip install pulse-engine, declare a manifest, and get a full FastAPI app with OpenSearch, Athena, Celery, Prefect, and MCP — out of the box.
How It Works
```
┌──────────────────────────────────────────────────┐
│ Product (pip install pulse-engine)               │
│   manifest = ProductManifest(                    │
│       extractors=[...], preprocessor=..., ...    │
│   )                                              │
└──────────────┬───────────────────────────────────┘
               │
┌──────────────▼───────────────────────────────────┐
│ pulse-engine                                     │
│   Base ABCs · Default implementations · App      │
│   factory · Storage connectors · Job lifecycle   │
│   · CLI · Testing utilities                      │
└──────────────┬───────────────────────────────────┘
               │
┌──────────────▼───────────────────────────────────┐
│ Shared Infrastructure                            │
│   Prefect · OpenSearch · Redis · PostgreSQL      │
└──────────────────────────────────────────────────┘
```
Products customize behaviour by:
- Implementing `BaseExtractor` subclasses for data extraction
- Overriding pipeline stages (preprocessor, core, postprocessor) or using defaults
- Adding product-specific API routes, MCP tools, and Celery tasks
- Declaring everything in a `ProductManifest`
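For example, a minimal extractor subclass might look like the following. This is an illustrative sketch only: the real `BaseExtractor` lives in `pulse_engine.extractor.base`, and its method names and signatures may differ.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterable


# Stand-in for pulse_engine.extractor.base.BaseExtractor (hypothetical API).
class BaseExtractor(ABC):
    @abstractmethod
    def extract(self, config: dict[str, Any]) -> Iterable[dict[str, Any]]:
        """Yield raw records for downstream pipeline stages."""


class RssExtractor(BaseExtractor):
    def extract(self, config: dict[str, Any]) -> Iterable[dict[str, Any]]:
        # A real extractor would fetch config["feed_url"]; a static record
        # keeps the sketch self-contained.
        return [{"source": config.get("feed_url", ""), "body": "hello"}]
```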
Prerequisites
- Python 3.11-3.12
- Poetry
- Docker & Docker Compose (for shared infrastructure)
Quick Start
For Engine Development
```bash
git clone <repo-url> && cd pulse-engine
# Install deps and pre-commit hooks
make install
# Copy env and configure
cp .env.example .env
# Run database migrations
make migrate
# Start the dev server
make run
```
For Building a New Product
```bash
# Install the engine
pip install pulse-engine
# Scaffold a new product
pulse init my-product
cd pulse-my-product
# Set up the product
make install
cp .env.example .env
# Validate and test
make validate
make test
# Run
make run
```
See docs/building-a-product.md for the full guide, including how to add time-based filters (e.g. "last 2 days", "last 5 years") via the config dict in the trigger payload.
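To illustrate, a filter string like "last 2 days" can be turned into a `timedelta` in a few lines of Python. This is a sketch only, not the engine's parser, and months/years are approximated here as 30/365 days:

```python
import re
from datetime import timedelta


def parse_time_filter(value: str) -> timedelta:
    """Parse filters like 'last 2 days' or 'last 5 years' into a timedelta."""
    match = re.fullmatch(r"last (\d+) (day|week|month|year)s?", value.strip().lower())
    if not match:
        raise ValueError(f"Unsupported time filter: {value!r}")
    count, unit = int(match.group(1)), match.group(2)
    # Approximations: a month is 30 days, a year 365.
    days_per_unit = {"day": 1, "week": 7, "month": 30, "year": 365}
    return timedelta(days=count * days_per_unit[unit])
```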
CLI
The pulse command is the primary interface:
| Command | Description |
|---|---|
| `pulse init <name>` | Scaffold a new product from template |
| `pulse validate [module]` | Validate a product manifest |
| `pulse run` | Discover manifest and start FastAPI server |
| `pulse run-worker` | Discover manifest and start Celery worker |
| `pulse run-mcp` | Discover manifest and start MCP server |
Product Manifest
Products declare their components via a ProductManifest:
```python
from pulse_engine.registry import ProductManifest

manifest = ProductManifest(
    name="my-product",
    version="0.1.0",
)
```
Products register via a pyproject.toml entry point:
```toml
[tool.poetry.plugins."pulse_engine.products"]
my_product = "pulse_my_product:manifest"
```
Project Structure
src/pulse_engine/
├── main.py # App factory (create_app)
├── config.py # Settings (pydantic-settings)
├── registry.py # ProductManifest, validation, discovery
├── worker.py # Celery app factory
├── database.py # SQLAlchemy async setup
├── dependencies.py # FastAPI dependency injection
├── client.py # PulseEngineClient (container → engine HTTP)
├── s3.py # S3Stage (NDJSON inter-stage data exchange)
├── chain_recovery.py # Background task for stalled pipeline recovery
├── cli/
│ ├── main.py # pulse CLI (typer)
│ └── templates/ # Cookiecutter product template
├── api/v1/
│ ├── router.py # v1 router aggregation
│ └── health.py # Health check endpoint
├── core/
│ ├── security.py # JWT verification (Cognito + Job-scoped)
│ ├── job_token.py # Job-scoped JWT issuance & verification
│ ├── scope.py # require_scope() FastAPI dependency
│ ├── exceptions.py # Exception hierarchy
│ ├── error_handlers.py # Global error handlers
│ └── logging.py # Structured logging (structlog)
├── middleware/
│ ├── request_id.py # X-Request-ID middleware
│ ├── security_headers.py # Defensive HTTP security headers (CSP, HSTS, etc.)
│ ├── rate_limit.py # Sliding-window per-IP rate limiter (100 req/60 s)
│ └── tenant.py # Dual-token middleware (Cognito + Job JWT)
├── deployment/
│ ├── models.py # DeploymentModel ORM
│ ├── repository.py # DeploymentRepository
│ ├── service.py # DeploymentService
│ ├── router.py # POST /api/v1/deployments
│ └── schemas.py # Registration request/response
├── extractor/
│ ├── base.py # BaseExtractor ABC
│ ├── models.py # SQLAlchemy ORM: job_records
│ ├── stage_models.py # SQLAlchemy ORM: job_stages
│ ├── repository.py # JobRepository
│ ├── stage_repository.py # StageRepository
│ ├── service.py # JobService (stage-aware)
│ ├── router.py # /api/v1/jobs/ endpoints
│ ├── schemas.py # Pydantic models
│ └── orchestrator/
│ ├── base.py # BaseOrchestratorAdapter ABC
│ ├── prefect.py # PrefectAdapter (deployments + flow runs)
│ └── noop.py # NoopAdapter
├── processor/
│ ├── base.py # BasePreprocessor, BaseCoreProcessor,
│ │ # BasePostprocessor ABCs
│ ├── pipeline.py # Pluggable ProcessingPipeline
│ ├── router.py # /api/v1/process/ endpoints
│ ├── schemas.py # ProcessingContext, options
│ ├── defaults/ # Default stage implementations
│ ├── preprocessor/ # clean_html, normalize, detect_language
│ ├── core/ # chunking, NER, sentiment, topics
│ └── postprocessor/ # embeddings, dedup, quality scoring
├── storage/
│ ├── knowledge_base.py # KnowledgeBaseService
│ ├── router.py # /api/v1/kb/ endpoints
│ ├── schemas.py # Document, SearchQuery, etc.
│ └── connectors/
│ ├── base.py # BaseStorageConnector ABC
│ ├── opensearch.py # OpenSearch connector
│ └── athena.py # Athena connector
├── mcp/
│ ├── server.py # FastMCP instance
│ ├── tools_kb.py # KB tools (6)
│ ├── tools_jobs.py # Jobs tools (6)
│ ├── tools_processor.py # Processor tools (5)
│ ├── tools_pipelines.py # Pipeline tools (5)
│ └── tools_modules.py # Module registry tools (3)
├── services/
│ ├── bootstrap.py # ServiceContainer, bootstrap_services()
│ └── opensearch.py # OpenSearch client wrapper
└── testing/
├── fixtures.py # Reusable pytest fixtures
└── mocks.py # MockStorageConnector, MockExtractor, etc.
infra/
├── docker-compose.yml # Prefect, Redis, OpenSearch, PostgreSQL
└── terraform/ # AWS modules (networking, ECS, ECR, ALB)
tests/
├── unit/ # Unit tests
│ ├── framework/ # Manifest, pipeline, base class tests
│ ├── processor/
│ ├── storage/
│ ├── deployment/ # Deployment registration tests
│ └── extractor/
└── integration/ # Integration tests
├── api/
├── mcp/
└── pipelines/
Code Quality
Pre-commit hooks run on every commit:
| Hook | What it checks |
|---|---|
| `trailing-whitespace` | No trailing whitespace |
| `end-of-file-fixer` | Files end with a newline |
| `check-yaml` | Valid YAML syntax |
| `ruff` | Linting (auto-fix enabled) |
| `ruff-format` | Code formatting |
| `mypy` | Strict static type-checking |
```bash
make lint  # run all hooks manually
```
CI/CD
PR Checks (pr-checks.yml)
Runs on every pull request to dev, uat, prod:
- lint — ruff check + format
- typecheck — mypy strict
- test — unit tests with coverage
- trivy — vulnerability scan
Deploy (deploy.yml)
Runs on push to dev, uat, prod:
| Branch | PyPI Target | Infrastructure |
|---|---|---|
| `dev` | PyPI (`.dev` suffix) | VM via docker-compose |
| `uat` | PyPI (`.dev` suffix) | VM via docker-compose |
| `prod` | PyPI (stable) | ECS cluster |
The pipeline: test → publish to PyPI → build Docker → push ECR → deploy → health check.
Terraform runs separately on infra/terraform/** changes: plan → apply → sync outputs to GitHub Secrets (pipeline infra vars auto-flow to .env on next deploy).
Required Secrets (per GitHub environment)
| Secret | Description |
|---|---|
| `AWS_ROLE_ARN` | OIDC role for GitHub → AWS auth |
| `ECR_REPOSITORY_URL` | ECR repository URL |
| `PYPI_TOKEN` | PyPI API token |
MCP Server
Exposes 25 tools for AI agents via the Model Context Protocol:
| Category | Tools |
|---|---|
| Jobs (6) | jobs_register, jobs_get, jobs_list, jobs_push_status, jobs_cancel, jobs_delete |
| Knowledge Base (6) | kb_store_documents, kb_retrieve_document, kb_search, kb_delete_document, kb_get_stats, kb_run_query |
| Processor (5) | process_pipeline, process_preprocess, process_analyze, process_postprocess, process_chunk |
| Pipelines (5) | pipelines_trigger, pipelines_status, pipelines_list, pipelines_cancel, pipelines_steps |
| Modules (3) | modules_register, modules_list, modules_delete |
```bash
pulse run-mcp
```
Products register additional tools via mcp_tool_modules in the manifest.
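As a rough sketch of what a product-side tool module could look like, the decorator and registry below are illustrative only, not the engine's FastMCP API:

```python
from typing import Any, Callable

# Hypothetical registry in the spirit of mcp_tool_modules; the engine's
# real registration goes through its FastMCP server instance.
TOOLS: dict[str, Callable[..., Any]] = {}


def mcp_tool(name: str) -> Callable[[Callable], Callable]:
    """Register a function under an MCP tool name."""
    def register(fn: Callable) -> Callable:
        TOOLS[name] = fn
        return fn
    return register


@mcp_tool("kb_word_count")
def kb_word_count(text: str) -> int:
    """Toy product-specific tool: count words in a document body."""
    return len(text.split())
```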
Environment Variables
Core
| Variable | Required | Default | Description |
|---|---|---|---|
| `APP_ENV` | No | `development` | Environment name (`development`, `production`, etc.) |
| `APP_VERSION` | No | `0.1.0` | Application version |
| `LOG_LEVEL` | No | `INFO` | Logging level |
| `AWS_REGION` | Yes | `ap-south-1` | AWS region |
| `AWS_ACCESS_KEY_ID` | No | — | AWS credentials (use IAM role in production) |
| `AWS_SECRET_ACCESS_KEY` | No | — | AWS credentials (use IAM role in production) |
Authentication (Cognito)
| Variable | Required | Default | Description |
|---|---|---|---|
| `COGNITO_USER_POOL_ID` | Yes | — | Cognito User Pool ID |
| `COGNITO_APP_CLIENT_ID` | Yes | — | Cognito App Client ID |
| `COGNITO_APP_CLIENT_SECRET` | No | — | Client secret (required if the app client has one) |
OpenSearch
| Variable | Required | Default | Description |
|---|---|---|---|
| `OPENSEARCH_URL` | Yes | — | OpenSearch endpoint |
| `OPENSEARCH_USERNAME` | No | — | Basic-auth username (AWS managed domains) |
| `OPENSEARCH_PASSWORD` | No | — | Basic-auth password |
| `OPENSEARCH_USE_SSL` | No | `true` | Enable TLS |
| `OPENSEARCH_VERIFY_CERTS` | No | `true` | Verify TLS certificates |
| `OPENSEARCH_INDEX_PREFIX` | No | `pulse_kb` | Index name prefix per tenant |
| `EMBEDDING_DIMENSION` | No | `1536` | Vector dimension for kNN indexes |
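For reference, an OpenSearch kNN index body consistent with these defaults might look like the following; the index name and document field names are illustrative, not the engine's actual schema:

```python
# Index body using OpenSearch's k-NN plugin conventions:
# "index.knn" enables k-NN, and "knn_vector" fields carry a dimension.
index_name = "pulse_kb_tenant-dev-001"  # hypothetical: prefix + tenant id
index_body = {
    "settings": {"index": {"knn": True}},
    "mappings": {
        "properties": {
            "embedding": {"type": "knn_vector", "dimension": 1536},
            "text": {"type": "text"},
        }
    },
}
```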
Database & Cache
| Variable | Required | Default | Description |
|---|---|---|---|
| `DATABASE_URL` | Yes | — | Async PostgreSQL DSN (`postgresql+asyncpg://...`) |
| `REDIS_URL` | No | `redis://localhost:6379/0` | Redis URL (enables Celery) |
| `CELERY_BROKER_URL` | No | — | Celery broker (defaults to `REDIS_URL`) |
| `CELERY_RESULT_BACKEND` | No | — | Celery result backend (defaults to `REDIS_URL`) |
Athena
| Variable | Required | Default | Description |
|---|---|---|---|
| `ATHENA_AWS_ACCESS_KEY_ID` | No | — | Athena-specific AWS credentials |
| `ATHENA_AWS_SECRET_ACCESS_KEY` | No | — | Athena-specific AWS credentials |
| `ATHENA_OUTPUT_LOCATION` | Yes* | — | S3 URI for Athena query results |
| `ATHENA_WORKGROUP` | No | `primary` | Athena workgroup |
| `ATHENA_QUERY_TIMEOUT_SECONDS` | No | `60` | Athena query timeout |
Orchestrator (Prefect)
| Variable | Required | Default | Description |
|---|---|---|---|
| `PULSE_ORCHESTRATOR_BACKEND` | No | `none` | `prefect` or `none` |
| `PREFECT_API_URL` | No | — | Prefect API endpoint |
| `PREFECT_API_KEY` | No | — | Prefect Cloud API key |
| `PREFECT_ECS_WORK_POOL_NAME` | No | `products-worker-pool` | ECS work pool name |
| `PREFECT_LAMBDA_WORK_POOL_NAME` | No | `lambda-worker-pool` | Lambda work pool name |
| `PREFECT_LAMBDA_FUNCTION_NAME_TEMPLATE` | No | `{product}-{stage}` | Lambda function name pattern |
| `PREFECT_K8S_WORK_POOL_NAME` | No | `k8s-worker-pool` | Kubernetes work pool name |
| `PREFECT_K8S_NAMESPACE` | No | `pulse-jobs` | Kubernetes namespace |
| `PREFECT_K8S_DEFAULT_CPU` | No | `500m` | Default CPU request |
| `PREFECT_K8S_DEFAULT_MEMORY` | No | `1Gi` | Default memory request |
LLM & Embeddings
| Variable | Required | Default | Description |
|---|---|---|---|
| `PULSE_LLM_PROVIDER` | No | `openai` | LLM provider |
| `PULSE_LLM_MODEL` | No | `gpt-4o-mini` | LLM model ID |
| `PULSE_LLM_API_KEY` | No | — | LLM API key (also used as embedding fallback) |
| `PULSE_LLM_TEMPERATURE` | No | `0.0` | LLM sampling temperature |
| `PULSE_EMBEDDING_PROVIDER` | No | `openai` | Embedding provider |
| `PULSE_OPENAI_EMBEDDING_MODEL` | No | `text-embedding-3-small` | OpenAI embedding model |
| `PULSE_OPENAI_API_KEY` | No | — | OpenAI API key (overrides `PULSE_LLM_API_KEY`) |
Pipeline & Jobs
| Variable | Required | Default | Description |
|---|---|---|---|
| `PULSE_ENGINE_URL` | No | — | Public URL containers use for callbacks |
| `PULSE_JOB_TOKEN_SECRET` | No | — | HMAC secret for job-scoped JWTs |
| `PULSE_S3_BUCKET` | No | — | S3 bucket for inter-stage NDJSON data |
| `PULSE_CHAIN_GRACE_PERIOD_SECONDS` | No | `300` | Seconds before chain recovery auto-triggers |
| `PULSE_MAX_CONCURRENT_JOBS_PER_TENANT` | No | `10` | Max concurrent jobs per tenant |
| `PULSE_DEFAULT_CHUNK_SIZE` | No | `512` | Default chunk token size |
| `PULSE_DEFAULT_CHUNK_STRATEGY` | No | `token_count` | Default chunking strategy |
| `PULSE_DEDUP_SIMILARITY_THRESHOLD` | No | `0.95` | Cosine similarity dedup threshold |
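The dedup threshold operates on cosine similarity between embeddings. A minimal sketch of the comparison (not the engine's implementation, which works on stored vectors in the postprocessor stage):

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def is_duplicate(a: list[float], b: list[float], threshold: float = 0.95) -> bool:
    # Mirrors PULSE_DEDUP_SIMILARITY_THRESHOLD semantics: chunks at or
    # above the threshold are treated as duplicates.
    return cosine_similarity(a, b) >= threshold
```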
Pipeline Infrastructure (from Terraform)
These are auto-synced from Terraform outputs to GitHub Secrets, then written to .env at deploy time:
| Variable | Description |
|---|---|
| `PIPELINE_TASK_DEFINITION` | ECS task definition family for pipeline steps |
| `PIPELINE_CLUSTER_NAME` | ECS cluster for pipeline step tasks |
| `PIPELINE_EXECUTION_ROLE_ARN` | ECS task execution role (ECR pull, logs, secrets) |
| `PIPELINE_TASK_ROLE_ARN` | ECS task role (S3, Lambda invoke, ECS dispatch) |
| `PIPELINE_LOG_GROUP` | CloudWatch log group for ECS pipeline steps |
| `PIPELINE_SUBNETS` | Comma-separated private subnet IDs |
| `PIPELINE_SECURITY_GROUPS` | Comma-separated security group IDs |
| `LAMBDA_EXECUTION_ROLE_ARN` | Lambda execution role for pipeline functions |
| `LAMBDA_SUBNETS` | Comma-separated subnet IDs for Lambda VPC config |
| `LAMBDA_SECURITY_GROUPS` | Comma-separated security group IDs for Lambda |
| `LAMBDA_LOG_GROUP` | CloudWatch log group for Lambda pipeline steps |
MCP Server
| Variable | Required | Default | Description |
|---|---|---|---|
| `MCP_TRANSPORT` | No | `sse` | Transport mode: `sse` or `stdio` |
| `MCP_SSE_HOST` | No | `127.0.0.1` | MCP SSE server host |
| `MCP_SSE_PORT` | No | `8001` | MCP SSE server port |
API Authentication
All endpoints (except /api/v1/health and /api/v1/auth/login) require a JWT token.
Rate limits — enforced per IP address:
- Login (`POST /api/v1/auth/login`): 5 attempts per 60 seconds. Returns `429` with `Retry-After: 60` on breach.
- Global: 100 requests per 60 seconds across all endpoints. Responses include `X-RateLimit-Limit` and `X-RateLimit-Remaining` headers.

Note — Swagger UI (`/docs`) and ReDoc (`/redoc`) are disabled in production (`APP_ENV=production`).
Get a Token
```bash
curl -X POST https://api.dev.pulse.mananalabs.ai/api/v1/auth/login \
  -H "Content-Type: application/json" \
  -d "{\"email\": \"$PULSE_AUTH_EMAIL\", \"password\": \"$PULSE_AUTH_PASSWORD\"}"
```
Response:
```json
{
  "id_token": "eyJ...",
  "access_token": "eyJ...",
  "refresh_token": "eyJ...",
  "expires_in": 3600,
  "token_type": "Bearer",
  "tenant_id": "tenant-dev-001",
  "email": "dev@pulse-engine.com"
}
```
Use the Token
Pass the id_token as a Bearer token in the Authorization header:
```bash
curl -H "Authorization: Bearer <id_token>" \
  https://api.dev.pulse.mananalabs.ai/api/v1/kb/stats
```
Tokens expire after 1 hour. Call the login endpoint again to get a new one.
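Rather than waiting for a 401, a client can inspect the token's standard `exp` claim locally to decide when to log in again. A small sketch (no signature verification; the 30-second leeway is our own choice, not part of the API):

```python
import base64
import json
import time


def token_expired(jwt_token: str, leeway: int = 30) -> bool:
    """Decode the JWT payload (middle segment) and compare its 'exp'
    claim against the clock, with a small safety margin."""
    payload_b64 = jwt_token.split(".")[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)  # restore base64 padding
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    return payload["exp"] <= time.time() + leeway
```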
API Documentation
- Swagger UI: https://api.dev.pulse.mananalabs.ai/docs
- ReDoc: https://api.dev.pulse.mananalabs.ai/redoc
Library Usage
Pulse Engine can also be used as a standalone library for content processing:
```python
from pulse_engine.processor.core.topic_splitter import TopicSplitter

splitter = TopicSplitter(provider="openai", api_key="sk-...")
result = splitter.split([
    (1, "Hi there"),
    (2, "Let's discuss Q1 metrics"),
    (3, "Revenue grew 20%"),
])
print(result.topic_shifts)
```
See docs/pulse_engine_library.md for full documentation on the Topic Splitter, LLM configuration, and configurable embeddings.
Further Reading
- Building a Product — step-by-step guide to creating a new product
- Design Decisions — architectural decisions and rationale
- Infrastructure — AWS deployment architecture
- Library Usage — topic splitting, LLM config, and embeddings