Intercept and mock AWS, Azure, GCP, Kafka, RabbitMQ, MongoDB, Redis and HTTP calls locally — no application changes required
MockMesh
v0.1.0 Beta | BSL-1.1 Licensed
Run your application locally without touching external services -- no if ENV == "local" checks required.
MockMesh intercepts outbound calls to AWS, Azure, GCP, Kafka, RabbitMQ, MongoDB, Redis, SQL databases, and any HTTP endpoint, returning configurable mock responses automatically. Your application code stays completely clean.
The Problem
# Without MockMesh -- conditional logic pollutes production code
import boto3, json, os

def get_secret(name):
    if os.getenv("ENV") == "local":
        return {"username": "dev", "password": "dev"}  # hardcoded
    client = boto3.client("secretsmanager")
    return json.loads(client.get_secret_value(SecretId=name)["SecretString"])
The Solution
# With MockMesh -- one line, then your code runs identically everywhere
import mockmesh
mockmesh.initialize()
import boto3, json
def get_secret(name):  # no conditional -- works locally AND in production
    client = boto3.client("secretsmanager")
    return json.loads(client.get_secret_value(SecretId=name)["SecretString"])
Installation
pip install mockmesh
# Install only the providers you need:
pip install "mockmesh[aws]"
pip install "mockmesh[azure]"
pip install "mockmesh[aws,azure,kafka,rabbitmq,sql,mongodb,redis]"
Quick Start
import mockmesh
mockmesh.initialize()
import boto3, requests
s3 = boto3.client("s3", region_name="us-east-1")
s3.put_object(Bucket="my-bucket", Key="hello.txt", Body=b"Hello World")
obj = s3.get_object(Bucket="my-bucket", Key="hello.txt")
print(obj["Body"].read()) # b"Hello World"
r = requests.get("https://api.example.com/data")
print(r.json()) # {"mocked": True}
mockmesh.shutdown()
No Docker. No credentials. No configuration files. Writes persist in memory -- PutObject followed by GetObject returns what you put.
API Reference
mockmesh.initialize(**kwargs) -> MockMeshEngine
Activates all interceptors. Call this before importing service SDKs. Safe to call multiple times (returns the existing engine on subsequent calls).
| Parameter | Type | Default | Description |
|---|---|---|---|
| config_path | str or Path | None | Path to a JSON file with custom HTTP/AWS/Kafka/RabbitMQ rules |
| responses_path | str or Path | None | Folder containing provider override files (aws.json, azure.json, etc.) |
| storage_path | str or Path | ./.mockmesh | Where MockMesh stores persisted data (blobs, NoSQL docs, SQL tables) |
| fallback_mode | str | "mock" | What happens when an interceptor fails: "mock", "passthrough", or "error" |
| on_intercept_error | callable | None | Callback (provider, operation, exception) -> optional_response called on failure |
| console_log | bool | False | Print log output to stdout (by default, logs only go to .mockmesh/logs/mockmesh.log) |
| json_console | bool | True | If console_log is True, use JSON format (False = human-readable) |
| log_level | int | DEBUG | Python logging level for MockMesh's internal logger |
| plugins | list | None | Explicit list of plugin instances (bypasses auto-detection) |
engine = mockmesh.initialize(
    config_path="test_overrides.json",
    responses_path="fixtures/",
    storage_path="/tmp/mockmesh-test",
    fallback_mode="error",
    console_log=True,
)
mockmesh.shutdown()
Stops all interceptors and flushes storage. After this, SDK calls hit real services again.
mockmesh.engine(**kwargs) -> MockMeshEngine
Same parameters as initialize(), but returns a context manager instead of a singleton. Use this for scoped test runs:
with mockmesh.engine(fallback_mode="error") as mm:
    run_my_tests()
# interceptors are automatically cleaned up here
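How the scoped cleanup works internally is not described here. As a rough illustration of the general pattern only (not MockMesh's implementation), a context manager can swap an attribute on entry and restore it on exit, even if the body raises. FakeSdk and patched are hypothetical names used for this sketch:

```python
import contextlib


class FakeSdk:
    """Hypothetical stand-in for a real service client class."""

    def fetch(self):
        return "real response"


@contextlib.contextmanager
def patched(target, name, replacement):
    """Temporarily replace target.<name>, restoring the original on exit."""
    original = getattr(target, name)
    setattr(target, name, replacement)
    try:
        yield
    finally:
        # Runs on normal exit and on exceptions, so the patch never leaks.
        setattr(target, name, original)


with patched(FakeSdk, "fetch", lambda self: "mock response"):
    inside = FakeSdk().fetch()   # patched behaviour inside the block

outside = FakeSdk().fetch()      # original behaviour restored afterwards
```

The try/finally is what makes the scope safe to use in test fixtures: a failing test cannot leave the patch installed for the next one.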
mockmesh.register_handler(provider, operation, handler_fn)
Register a custom Python function for any provider + operation pair. The custom handler runs instead of the built-in interceptor logic.
mockmesh.initialize()
mockmesh.register_handler("aws", "s3.GetObject", lambda **kw: {
    "Body": b"custom content",
    "ContentType": "text/plain",
})
mockmesh.register_fallback(provider, fallback_fn)
Register a per-provider fallback function called when the interceptor fails:
mockmesh.register_fallback("aws", lambda prov, op, exc: {
    "error": str(exc), "provider": prov
})
MockMeshEngine Properties
The engine returned by initialize() or engine() exposes:
| Property | Type | Description |
|---|---|---|
| engine.storage | StorageManager | Direct access to the storage layer for pre-seeding data |
| engine.active_providers | list[str] | Which providers were auto-detected and activated (e.g. ["http", "aws", "azure"]) |
| engine.fallback_config | FallbackConfig | Access to modify fallback behavior after initialization |
| engine.response_engine | ResponseEngine | The response resolution engine (advanced usage) |
Pre-seeding data via engine.storage
engine = mockmesh.initialize()
# S3
engine.storage.s3_put("my-bucket", "config.json", b'{"key": "value"}', "application/json")
# DynamoDB
engine.storage.dynamo_put("Users", {"id": {"S": "u-1"}, "name": {"S": "Alice"}})
# SQS
engine.storage.sqs_enqueue("MyQueue", "msg-1", '{"event": "order.created"}')
# Cosmos DB
engine.storage.cosmos_upsert("profiles", {"id": "u-1", "name": "Alice"})
# MongoDB
engine.storage.mongo_insert("mydb", "users", "u-1", {"name": "Alice", "age": 30})
# Redis
engine.storage.redis_set(0, "session:u-1", {"token": "abc123"})
# SSM Parameter Store
engine.storage.ssm_put("/app/db-host", "localhost")
# Secrets Manager
engine.storage.secret_put("prod/db-creds", '{"user":"admin","pass":"secret"}')
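The DynamoDB seed above uses DynamoDB's typed attribute format ({"S": ...}, {"N": ...}), which is tedious to write by hand. A small helper can build it from plain Python values. This is a minimal sketch; to_dynamo and to_dynamo_item are hypothetical names and are not part of MockMesh:

```python
from decimal import Decimal


def to_dynamo(value):
    """Convert a plain Python value to DynamoDB's typed attribute form."""
    if isinstance(value, bool):          # bool first: bool is a subclass of int
        return {"BOOL": value}
    if value is None:
        return {"NULL": True}
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, (int, float, Decimal)):
        return {"N": str(value)}         # numbers are carried as strings
    if isinstance(value, bytes):
        return {"B": value}
    if isinstance(value, list):
        return {"L": [to_dynamo(v) for v in value]}
    if isinstance(value, dict):
        return {"M": {k: to_dynamo(v) for k, v in value.items()}}
    raise TypeError(f"unsupported type: {type(value).__name__}")


def to_dynamo_item(record):
    """Convert a flat or nested dict into a DynamoDB item."""
    return {key: to_dynamo(val) for key, val in record.items()}


item = to_dynamo_item({"id": "u-1", "name": "Alice", "age": 30})
```

The resulting item has the same shape as the one passed to engine.storage.dynamo_put in the seeding example.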
Fallback Modes
Control what happens when MockMesh encounters an operation it can't handle:
# "mock" (default) -- return a safe generic response, never block
mockmesh.initialize(fallback_mode="mock")
# "error" -- raise InterceptError immediately (recommended for CI)
mockmesh.initialize(fallback_mode="error")
# "passthrough" -- call the real service (useful for hybrid staging)
mockmesh.initialize(fallback_mode="passthrough")
Per-provider overrides
engine = mockmesh.initialize(fallback_mode="mock")
# Let HTTP calls hit real APIs, mock everything else
engine.fallback_config.provider_overrides["http"] = "passthrough"
# Fail loudly if an unknown AWS operation is hit
engine.fallback_config.provider_overrides["aws"] = "error"
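To make the interaction between the global mode and per-provider overrides concrete, here is a minimal sketch of one plausible resolution order: an override, if present, wins over the default. This is not MockMesh's code; FallbackError, effective_mode, and apply_fallback are hypothetical names, and the {"mocked": True} response is a placeholder:

```python
VALID_MODES = {"mock", "passthrough", "error"}


class FallbackError(Exception):
    """Hypothetical stand-in for the error raised in "error" mode."""


def effective_mode(provider, default_mode, provider_overrides):
    """A per-provider override wins over the global default."""
    mode = provider_overrides.get(provider, default_mode)
    if mode not in VALID_MODES:
        raise ValueError(f"unknown fallback mode: {mode!r}")
    return mode


def apply_fallback(mode, provider, operation, exc, call_real):
    """Decide what to return (or raise) after an interceptor failure."""
    if mode == "error":
        raise FallbackError(f"{provider}/{operation} failed: {exc}") from exc
    if mode == "passthrough":
        return call_real()       # hand the call to the real service
    return {"mocked": True}      # "mock": placeholder generic response


overrides = {"http": "passthrough", "aws": "error"}
http_mode = effective_mode("http", "mock", overrides)    # override applies
redis_mode = effective_mode("redis", "mock", overrides)  # falls back to default
```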
Error callback
mockmesh.initialize(
    fallback_mode="mock",
    on_intercept_error=lambda provider, operation, exc:
        print(f"[MockMesh] {provider}/{operation} failed: {exc}"),
)
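In tests it can be more useful to record failures than to print them. The sketch below is a callable that matches the documented (provider, operation, exception) signature and collects every failure for later assertions. InterceptFailureLog is a hypothetical name, and treating a None return as "no replacement response" is an assumption based on the optional_response wording in the parameter table:

```python
class InterceptFailureLog:
    """Collects interceptor failures so a test can assert on them later."""

    def __init__(self):
        self.failures = []

    def __call__(self, provider, operation, exc):
        self.failures.append((provider, operation, exc))
        return None  # assumption: None means "no replacement response"


failure_log = InterceptFailureLog()
# Assumed wiring: mockmesh.initialize(on_intercept_error=failure_log)

# Simulate what a failing interceptor would report to the callback.
result = failure_log("aws", "s3.GetObject", KeyError("missing-key"))
```

A test can then assert that failure_log.failures is empty at teardown to catch silently mocked errors.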
Custom Response Overrides
JSON config file
Create a JSON file to control exactly what MockMesh returns:
{
  "http": {
    "rules": [
      {
        "match": { "url_contains": "stripe.com", "method": "POST" },
        "response": {
          "status": 200,
          "body": { "id": "pi_test_001", "status": "succeeded", "amount": 4999 }
        }
      },
      {
        "match": { "url": "https://api.sendgrid.com/v3/mail/send", "method": "POST" },
        "response": { "status": 202, "body": null }
      }
    ]
  },
  "aws": {
    "rules": [
      {
        "match": { "service": "secretsmanager", "operation": "GetSecretValue" },
        "response": {
          "status": 200,
          "body": {
            "SecretString": "{\"db_host\":\"localhost\",\"api_key\":\"sk-test-123\"}"
          }
        }
      }
    ]
  },
  "kafka": {
    "rules": [
      {
        "match": { "topic": "order-events" },
        "response": {
          "produce": { "error": null, "offset": 42 },
          "consume": { "value": "{\"order_id\":\"ORD-001\"}" }
        }
      }
    ]
  }
}
mockmesh.initialize(config_path="test_overrides.json")
Rules are evaluated top-to-bottom, first match wins. HTTP rules support url (exact), url_contains (substring), and method filtering. AWS rules match by service and operation.
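Because the first matching rule wins, rule order matters: a broad url_contains rule placed first will shadow a more specific rule below it. The following is a minimal sketch of first-match evaluation for HTTP rules, not MockMesh's actual matcher; rule_matches and first_match are hypothetical names, and the case-insensitive method comparison is an assumption:

```python
def rule_matches(match, method, url):
    """Return True if every filter present in the rule's match block passes."""
    if "method" in match and match["method"].upper() != method.upper():
        return False
    if "url" in match and match["url"] != url:              # exact match
        return False
    if "url_contains" in match and match["url_contains"] not in url:
        return False                                        # substring match
    return True


def first_match(rules, method, url):
    """Scan rules top-to-bottom and return the first matching response."""
    for rule in rules:
        if rule_matches(rule["match"], method, url):
            return rule["response"]
    return None  # no rule matched; a default response would apply


rules = [
    {"match": {"url_contains": "stripe.com", "method": "POST"},
     "response": {"status": 200, "body": {"id": "pi_test_001"}}},
    {"match": {"url": "https://api.stripe.com/v1/charges", "method": "POST"},
     "response": {"status": 402, "body": {"error": "card_declined"}}},
]

# The broad first rule shadows the exact-URL rule beneath it.
hit = first_match(rules, "POST", "https://api.stripe.com/v1/charges")
miss = first_match(rules, "GET", "https://api.stripe.com/v1/charges")
```

Placing exact url rules above url_contains rules avoids this kind of shadowing.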
Folder-based overrides
Drop provider-specific JSON files into a folder:
fixtures/
├── aws.json
├── azure.json
└── http.json
mockmesh.initialize(responses_path="fixtures/")
Or place them in .mockmesh/ at your project root -- they're picked up automatically with no arguments.
Supported Providers
Cloud
| Provider | Key Services | Operations |
|---|---|---|
| AWS | S3, DynamoDB, SQS, SNS, Lambda, ECS, EKS, ECR, ElastiCache, SecretsManager, SSM, STS, IAM, CloudWatch, EventBridge, RDS/Aurora | 290+ |
| Azure | Blob, Cosmos DB, Service Bus, Key Vault, Event Hubs, Storage Queue, Table Storage, App Configuration, SQL, Redis, AKS, ACI | 80+ |
| GCP | GCS, Pub/Sub, Secret Manager, Firestore, BigQuery, Spanner, GKE, Cloud Run | 40+ |
Databases
| Provider | Drivers |
|---|---|
| Direct SQL | psycopg2, psycopg3, pymysql, mysql-connector, asyncpg, aiomysql |
| MongoDB | pymongo -- MongoClient, CRUD, queries ($eq, $gt, $in, $regex, $and, $or), cursors, aggregation |
| Redis | redis-py -- strings, hashes, lists, sets, pipelines, flushdb/flushall, multiple databases |
| AWS RDS/Aurora | 47 operations -- MySQL, PostgreSQL, MariaDB, Oracle, SQL Server, Aurora, Serverless v2 |
Messaging & HTTP
| Provider | Libraries |
|---|---|
| Kafka | confluent-kafka, kafka-python |
| RabbitMQ | pika BlockingConnection |
| HTTP | Any URL via requests and urllib |
Usage Examples
DynamoDB write and read
import mockmesh
mockmesh.initialize()
import boto3
ddb = boto3.client("dynamodb", region_name="us-east-1")
ddb.put_item(
    TableName="Users",
    Item={"id": {"S": "u-1"}, "name": {"S": "Alice"}, "age": {"N": "30"}}
)
resp = ddb.get_item(TableName="Users", Key={"id": {"S": "u-1"}})
print(resp["Item"]["name"]["S"]) # "Alice"
SQS round-trip
import json
import mockmesh
mockmesh.initialize()  # initialize before importing the SDK
import boto3
sqs = boto3.client("sqs", region_name="us-east-1")
sqs.send_message(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/123/MyQueue",
    MessageBody='{"order": 1}'
)
resp = sqs.receive_message(QueueUrl="https://sqs.us-east-1.amazonaws.com/123/MyQueue")
print(json.loads(resp["Messages"][0]["Body"]))  # {"order": 1}
Azure Cosmos DB
import mockmesh
mockmesh.initialize()
from azure.cosmos import CosmosClient
client = CosmosClient("https://mock.documents.azure.com:443/", credential="mock==")
container = client.get_database_client("mydb").get_container_client("items")
container.upsert_item({"id": "item-1", "name": "Widget", "price": 29.99})
item = container.read_item("item-1", partition_key="item-1")
print(item["name"]) # "Widget"
MongoDB
import mockmesh
mockmesh.initialize()
import pymongo
client = pymongo.MongoClient("mongodb://localhost:27017")
coll = client["mydb"]["users"]
coll.insert_one({"name": "Alice", "age": 30})
doc = coll.find_one({"name": "Alice"})
print(doc["age"]) # 30
Redis
import mockmesh
mockmesh.initialize()
import redis
r = redis.Redis(host="localhost", port=6379, db=0)
r.set("session:u-1", "token-abc")
print(r.get("session:u-1")) # b"token-abc"
r.hset("user:u-1", mapping={"name": "Alice", "plan": "pro"})
print(r.hget("user:u-1", "plan")) # b"pro"
Direct SQL (psycopg2)
import mockmesh
mockmesh.initialize()
import psycopg2
conn = psycopg2.connect(host="localhost", database="app", user="admin")
cur = conn.cursor()
cur.execute("CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT, email TEXT)")
cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)", ("Alice", "alice@co.io"))
conn.commit()
cur.execute("SELECT name FROM users WHERE name LIKE %s", ("Ali%",))
print(cur.fetchone()[0]) # "Alice"
Kafka
import mockmesh, json
mockmesh.initialize()
from confluent_kafka import Producer, Consumer
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("events", value=json.dumps({"type": "signup"}).encode())
producer.flush()
Multi-service workflow
import json
import mockmesh
mockmesh.initialize()  # initialize before importing the SDKs
import boto3, requests
# HTTP call to Stripe (assumes an HTTP rule whose response body includes "id",
# such as the stripe.com rule in the JSON config example)
r = requests.post("https://api.stripe.com/v1/charges", json={"amount": 4999})
charge = r.json()
# Write to DynamoDB
ddb = boto3.client("dynamodb", region_name="us-east-1")
ddb.put_item(TableName="charges", Item={
    "id": {"S": charge["id"]}, "amount": {"N": "4999"}, "status": {"S": "succeeded"}
})
# Queue a notification
sqs = boto3.client("sqs", region_name="us-east-1")
sqs.send_message(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/123/receipts",
    MessageBody=json.dumps({"charge_id": charge["id"]})
)
Using with pytest
# conftest.py
import pytest
import mockmesh
@pytest.fixture(autouse=True)
def mock_services():
    with mockmesh.engine(fallback_mode="error") as mm:
        yield mm
Every test automatically gets full service mocking. fallback_mode="error" ensures unhandled operations fail loudly in CI.
Auto-Detection
MockMesh automatically detects which service libraries are installed and only activates the relevant interceptors. No configuration needed.
| Installed Package | Interceptor Activated |
|---|---|
| botocore | AWS (S3, DynamoDB, SQS, etc.) |
| azure.core | Azure (Blob, Cosmos, Key Vault, etc.) |
| requests | HTTP + GCP |
| confluent_kafka or kafka | Kafka |
| pika | RabbitMQ |
| psycopg2, pymysql, mysql.connector, asyncpg, aiomysql | Direct SQL |
| pymongo | MongoDB |
| redis | Redis |
Check what's active:
engine = mockmesh.initialize()
print(engine.active_providers) # ["http", "aws", "azure", "kafka", "mongodb", "redis"]
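Detection of this kind can be done without importing the target packages, using importlib.util.find_spec from the standard library. The sketch below shows the idea under that assumption; it is not MockMesh's detector. The module names come from the table above, while the provider keys "gcp", "rabbitmq", and "sql" are guesses based on the install extras:

```python
import importlib.util

# Provider -> modules whose presence activates it (any one is enough).
DETECTORS = {
    "aws": ["botocore"],
    "azure": ["azure.core"],
    "http": ["requests"],
    "gcp": ["requests"],
    "kafka": ["confluent_kafka", "kafka"],
    "rabbitmq": ["pika"],
    "sql": ["psycopg2", "pymysql", "mysql.connector", "asyncpg", "aiomysql"],
    "mongodb": ["pymongo"],
    "redis": ["redis"],
}


def is_installed(module_name):
    """Return True if the module can be located on the import path."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # Dotted names raise if the parent package itself is missing.
        return False


def detect_providers():
    """List providers with at least one of their modules installed."""
    return [
        provider
        for provider, modules in DETECTORS.items()
        if any(is_installed(m) for m in modules)
    ]


detected = detect_providers()
```

Running the same check in your own environment is a quick way to see why a provider is missing from engine.active_providers.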
Error Types
from mockmesh import MockMeshError, InterceptError, UnknownOperationError
# MockMeshError -- base exception for all MockMesh errors
# InterceptError -- raised when an interceptor fails (in "error" fallback mode)
#     .provider -- e.g. "aws"
#     .operation -- e.g. "s3.GetObject"
#     .cause -- the original exception
# UnknownOperationError -- raised for unrecognized operations
Logging
By default, MockMesh logs to .mockmesh/logs/mockmesh.log in JSON format. Nothing goes to stdout.
# Enable console output for debugging
mockmesh.initialize(console_log=True)
# Human-readable console output instead of JSON
mockmesh.initialize(console_log=True, json_console=False)
Built-in Default Configuration (_config)
Every built-in default JSON file includes a standardized _config section that centralizes provider-specific values like account IDs, regions, and project IDs. These values are referenced throughout response bodies using {_config.xxx} placeholders, which are resolved automatically at load time.
{
  "_meta": { "provider": "aws", "version": "1.0.0", "format": "map" },
  "_config": {
    "account_id": "123456789012",
    "region": "us-east-1",
    "mock_uuid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "rds": { ... }
  },
  "s3": {
    "GetObject": {
      "ETag": "\"mock-etag\"",
      "BucketArn": "arn:aws:s3:::{_config.account_id}:my-bucket"
    }
  }
}
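To illustrate what resolving {_config.xxx} placeholders at load time could look like, here is a minimal sketch that walks a loaded document and substitutes values from its _config section. It is an assumption about the mechanism, not MockMesh's loader; lookup and resolve are hypothetical names, and dotted paths for nested values are assumed:

```python
import re

PLACEHOLDER = re.compile(r"\{_config\.([A-Za-z0-9_.]+)\}")


def lookup(config, dotted_path):
    """Follow a dotted path such as "rds.engine" through nested dicts."""
    node = config
    for part in dotted_path.split("."):
        node = node[part]
    return node


def resolve(node, config):
    """Recursively replace {_config.xxx} placeholders in all string values."""
    if isinstance(node, str):
        return PLACEHOLDER.sub(lambda m: str(lookup(config, m.group(1))), node)
    if isinstance(node, dict):
        return {key: resolve(val, config) for key, val in node.items()}
    if isinstance(node, list):
        return [resolve(val, config) for val in node]
    return node  # numbers, booleans, None pass through unchanged


doc = {
    "_config": {"account_id": "123456789012", "region": "us-east-1"},
    "s3": {
        "GetObject": {
            "ETag": "\"mock-etag\"",
            "BucketArn": "arn:aws:s3:::{_config.account_id}:my-bucket",
        }
    },
}

resolved = resolve(doc["s3"], doc["_config"])
```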
Provider _config sections
| Provider | Key Config Values |
|---|---|
| AWS | account_id, region, mock_uuid, rds (engine templates) |
| Azure | region, subscription_id, resource_group, vault_host, auth (IMDS/token) |
| GCP | project_id, project_id_auth, project_number, auth (metadata/OAuth2) |
| SQL | connection (default_database, default_host, autocommit) |
| NoSQL | default_database |
| HTTP | {} (reserved) |
| Streaming | {} (reserved) |
When you provide folder-based overrides (responses_path), you can include your own _config section to override default values. For example, to change the AWS account ID and region across all responses:
{
  "_config": {
    "account_id": "999888777666",
    "region": "eu-west-1"
  }
}
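One way to picture an override like this is as a merge of your _config on top of the built-in defaults, where keys you omit keep their default values. Whether MockMesh merges nested sections recursively or replaces them wholesale is not stated here; the sketch below assumes a recursive merge, and merge_config is a hypothetical name:

```python
def merge_config(defaults, overrides):
    """Return defaults updated with overrides, merging nested dicts."""
    merged = dict(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)  # recurse
        else:
            merged[key] = value                             # replace
    return merged


defaults = {
    "account_id": "123456789012",
    "region": "us-east-1",
    "mock_uuid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
}
user_config = {"account_id": "999888777666", "region": "eu-west-1"}

effective = merge_config(defaults, user_config)
```

Here effective carries the overridden account ID and region while mock_uuid keeps its default value.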
License
BSL-1.1 (Business Source License) -- free for non-commercial use, commercial license required for production. See LICENSE for details.