Intercept and mock AWS, Azure, GCP, Kafka, RabbitMQ, MongoDB, Redis and HTTP calls locally — no application changes required

MockMesh

v0.1.0 Beta | BSL-1.1 Licensed

Run your application locally without touching external services -- no `if ENV == "local"` branches required.

MockMesh intercepts outbound calls to AWS, Azure, GCP, Kafka, RabbitMQ, MongoDB, Redis, SQL databases, and any HTTP endpoint, returning configurable mock responses automatically. Your application code stays completely clean.


The Problem

# Without MockMesh -- conditional logic pollutes production code
import boto3, json, os

def get_secret(name):
    if os.getenv("ENV") == "local":
        return {"username": "dev", "password": "dev"}   # hardcoded
    client = boto3.client("secretsmanager")
    return json.loads(client.get_secret_value(SecretId=name)["SecretString"])

The Solution

# With MockMesh -- one line, then your code runs identically everywhere
import mockmesh
mockmesh.initialize()

import boto3, json

def get_secret(name):   # no conditional -- works locally AND in production
    client = boto3.client("secretsmanager")
    return json.loads(client.get_secret_value(SecretId=name)["SecretString"])

Installation

pip install mockmesh

# Install only the providers you need:
pip install "mockmesh[aws]"
pip install "mockmesh[azure]"
pip install "mockmesh[aws,azure,kafka,rabbitmq,sql,mongodb,redis]"

Quick Start

import mockmesh
mockmesh.initialize()

import boto3, requests

s3 = boto3.client("s3", region_name="us-east-1")
s3.put_object(Bucket="my-bucket", Key="hello.txt", Body=b"Hello World")
obj = s3.get_object(Bucket="my-bucket", Key="hello.txt")
print(obj["Body"].read())   # b"Hello World"

r = requests.get("https://api.example.com/data")
print(r.json())   # {"mocked": True}

mockmesh.shutdown()

No Docker. No credentials. No configuration files. Writes persist in memory -- PutObject followed by GetObject returns what you put.


API Reference

mockmesh.initialize(**kwargs) -> MockMeshEngine

Activates all interceptors. Call this before importing service SDKs. Safe to call multiple times (returns the existing engine on subsequent calls).

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| config_path | str \| Path | None | Path to a JSON file with custom HTTP/AWS/Kafka/RabbitMQ rules |
| responses_path | str \| Path | None | Folder containing provider override files (aws.json, azure.json, etc.) |
| storage_path | str \| Path | ./.mockmesh | Where MockMesh stores persisted data (blobs, NoSQL docs, SQL tables) |
| fallback_mode | str | "mock" | What happens when an interceptor fails: "mock", "passthrough", or "error" |
| on_intercept_error | callable | None | Callback (provider, operation, exception) -> optional_response called on failure |
| console_log | bool | False | Print log output to stdout (by default, logs only go to .mockmesh/logs/mockmesh.log) |
| json_console | bool | True | If console_log is True, use JSON format (False = human-readable) |
| log_level | int | DEBUG | Python logging level for MockMesh's internal logger |
| plugins | list | None | Explicit list of plugin instances (bypasses auto-detection) |

engine = mockmesh.initialize(
    config_path="test_overrides.json",
    responses_path="fixtures/",
    storage_path="/tmp/mockmesh-test",
    fallback_mode="error",
    console_log=True,
)

mockmesh.shutdown()

Stops all interceptors and flushes storage. After this, SDK calls hit real services again.

mockmesh.engine(**kwargs) -> MockMeshEngine

Same parameters as initialize(), but returns a context manager instead of a singleton. Use this for scoped test runs:

with mockmesh.engine(fallback_mode="error") as mm:
    run_my_tests()
# interceptors are automatically cleaned up here
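
The difference between the singleton returned by initialize() and the scoped form returned by engine() can be illustrated with a small stand-in. This is only a sketch of the pattern: FakeEngine and the three functions below are hypothetical illustrations written for this example, not MockMesh internals.

```python
from contextlib import contextmanager


class FakeEngine:
    """Hypothetical stand-in for an engine; only tracks whether it is active."""

    def __init__(self, **options):
        self.options = options
        self.active = True

    def stop(self):
        self.active = False


_singleton = None  # module-level engine reused across initialize() calls


def initialize(**options):
    """Return the existing engine if one is running, otherwise create it."""
    global _singleton
    if _singleton is None:
        _singleton = FakeEngine(**options)
    return _singleton


def shutdown():
    """Stop and discard the singleton engine."""
    global _singleton
    if _singleton is not None:
        _singleton.stop()
        _singleton = None


@contextmanager
def engine(**options):
    """Scoped engine: always stopped on exit, even if the body raises."""
    scoped = FakeEngine(**options)
    try:
        yield scoped
    finally:
        scoped.stop()
```

The scoped form suits test suites because cleanup happens in a finally block and does not depend on a matching shutdown() call.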

mockmesh.register_handler(provider, operation, handler_fn)

Register a custom Python function for any provider + operation pair. The custom handler runs instead of the built-in interceptor logic.

mockmesh.initialize()

mockmesh.register_handler("aws", "s3.GetObject", lambda **kw: {
    "Body": b"custom content",
    "ContentType": "text/plain",
})
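
To make the "runs instead of the built-in interceptor logic" behavior concrete, here is a minimal sketch of a registry keyed by (provider, operation). The names _handlers, builtin_response, and dispatch are assumptions made for this illustration; how MockMesh stores and looks up handlers internally is not documented here.

```python
from typing import Any, Callable, Dict, Tuple

Handler = Callable[..., Any]

# Hypothetical registry: one custom handler per (provider, operation) pair.
_handlers: Dict[Tuple[str, str], Handler] = {}


def register_handler(provider: str, operation: str, handler_fn: Handler) -> None:
    """Store a custom handler, replacing any earlier one for the same pair."""
    _handlers[(provider, operation)] = handler_fn


def builtin_response(provider: str, operation: str, **kwargs: Any) -> dict:
    """Stand-in for the built-in interceptor logic."""
    return {"mocked": True, "provider": provider, "operation": operation}


def dispatch(provider: str, operation: str, **kwargs: Any) -> Any:
    """Prefer a registered custom handler; otherwise fall back to the built-in."""
    handler = _handlers.get((provider, operation))
    if handler is not None:
        return handler(**kwargs)
    return builtin_response(provider, operation, **kwargs)


register_handler("aws", "s3.GetObject", lambda **kw: {
    "Body": b"custom content",
    "ContentType": "text/plain",
})
```

With this registry in place, dispatch("aws", "s3.GetObject", Bucket="b", Key="k") returns the custom body, while any other operation still gets the stand-in built-in response.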

mockmesh.register_fallback(provider, fallback_fn)

Register a per-provider fallback function called when the interceptor fails:

mockmesh.register_fallback("aws", lambda prov, op, exc: {
    "error": str(exc), "provider": prov
})

MockMeshEngine Properties

The engine returned by initialize() or engine() exposes:

| Property | Type | Description |
| --- | --- | --- |
| engine.storage | StorageManager | Direct access to the storage layer for pre-seeding data |
| engine.active_providers | list[str] | Which providers were auto-detected and activated (e.g. ["http", "aws", "azure"]) |
| engine.fallback_config | FallbackConfig | Access to modify fallback behavior after initialization |
| engine.response_engine | ResponseEngine | The response resolution engine (advanced usage) |

Pre-seeding data via engine.storage

engine = mockmesh.initialize()

# S3
engine.storage.s3_put("my-bucket", "config.json", b'{"key": "value"}', "application/json")

# DynamoDB
engine.storage.dynamo_put("Users", {"id": {"S": "u-1"}, "name": {"S": "Alice"}})

# SQS
engine.storage.sqs_enqueue("MyQueue", "msg-1", '{"event": "order.created"}')

# Cosmos DB
engine.storage.cosmos_upsert("profiles", {"id": "u-1", "name": "Alice"})

# MongoDB
engine.storage.mongo_insert("mydb", "users", "u-1", {"name": "Alice", "age": 30})

# Redis
engine.storage.redis_set(0, "session:u-1", {"token": "abc123"})

# SSM Parameter Store
engine.storage.ssm_put("/app/db-host", "localhost")

# Secrets Manager
engine.storage.secret_put("prod/db-creds", '{"user":"admin","pass":"secret"}')

Fallback Modes

Control what happens when MockMesh encounters an operation it can't handle:

# "mock" (default) -- return a safe generic response, never block
mockmesh.initialize(fallback_mode="mock")

# "error" -- raise InterceptError immediately (recommended for CI)
mockmesh.initialize(fallback_mode="error")

# "passthrough" -- call the real service (useful for hybrid staging)
mockmesh.initialize(fallback_mode="passthrough")

Per-provider overrides

engine = mockmesh.initialize(fallback_mode="mock")

# Let HTTP calls hit real APIs, mock everything else
engine.fallback_config.provider_overrides["http"] = "passthrough"

# Fail loudly if an unknown AWS operation is hit
engine.fallback_config.provider_overrides["aws"] = "error"

Error callback

mockmesh.initialize(
    fallback_mode="mock",
    on_intercept_error=lambda provider, operation, exc:
        print(f"[MockMesh] {provider}/{operation} failed: {exc}"),
)
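
The interaction between the global mode, per-provider overrides, and the error callback can be sketched in plain Python. This is an illustration under stated assumptions, not MockMesh's actual code: handle_failure, resolve_mode, and call_real are hypothetical names, the InterceptError class is a local stand-in, and consulting the callback before the mode is an assumed ordering.

```python
from typing import Any, Callable, Dict, Optional


class InterceptError(Exception):
    """Local stand-in carrying the fields listed under Error Types."""

    def __init__(self, provider: str, operation: str, cause: Exception):
        super().__init__(f"{provider}/{operation} failed: {cause}")
        self.provider = provider
        self.operation = operation
        self.cause = cause


def resolve_mode(default_mode: str, overrides: Dict[str, str], provider: str) -> str:
    """A per-provider override wins over the global fallback mode."""
    return overrides.get(provider, default_mode)


def handle_failure(
    provider: str,
    operation: str,
    exc: Exception,
    *,
    default_mode: str = "mock",
    provider_overrides: Optional[Dict[str, str]] = None,
    on_intercept_error: Optional[Callable[[str, str, Exception], Any]] = None,
    call_real: Optional[Callable[[], Any]] = None,
) -> Any:
    # Assumption: the callback runs first, and a non-None return value
    # is used as the response.
    if on_intercept_error is not None:
        response = on_intercept_error(provider, operation, exc)
        if response is not None:
            return response

    mode = resolve_mode(default_mode, provider_overrides or {}, provider)
    if mode == "error":
        raise InterceptError(provider, operation, exc) from exc
    if mode == "passthrough":
        if call_real is None:
            raise ValueError("passthrough requires a way to call the real service")
        return call_real()
    if mode == "mock":
        return {"mocked": True}  # safe generic response
    raise ValueError(f"unknown fallback mode: {mode!r}")
```

A logging-only callback such as the print example above returns None, so in this sketch it falls through to the configured mode.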

Custom Response Overrides

JSON config file

Create a JSON file to control exactly what MockMesh returns:

{
  "http": {
    "rules": [
      {
        "match": { "url_contains": "stripe.com", "method": "POST" },
        "response": {
          "status": 200,
          "body": { "id": "pi_test_001", "status": "succeeded", "amount": 4999 }
        }
      },
      {
        "match": { "url": "https://api.sendgrid.com/v3/mail/send", "method": "POST" },
        "response": { "status": 202, "body": null }
      }
    ]
  },
  "aws": {
    "rules": [
      {
        "match": { "service": "secretsmanager", "operation": "GetSecretValue" },
        "response": {
          "status": 200,
          "body": {
            "SecretString": "{\"db_host\":\"localhost\",\"api_key\":\"sk-test-123\"}"
          }
        }
      }
    ]
  },
  "kafka": {
    "rules": [
      {
        "match": { "topic": "order-events" },
        "response": {
          "produce": { "error": null, "offset": 42 },
          "consume": { "value": "{\"order_id\":\"ORD-001\"}" }
        }
      }
    ]
  }
}

mockmesh.initialize(config_path="test_overrides.json")

Rules are evaluated top-to-bottom, first match wins. HTTP rules support url (exact), url_contains (substring), and method filtering. AWS rules match by service and operation.
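
The first-match-wins evaluation can be sketched in a few lines. This is an illustration of the matching semantics described above, not the library's implementation: rule_matches and first_matching_response are hypothetical names, case-insensitive method comparison is an assumption, and the third rule is added only to show that order matters.

```python
from typing import Any, Dict, List, Optional


def rule_matches(match: Dict[str, Any], url: str, method: str) -> bool:
    """Every filter present in `match` must pass; absent filters are ignored."""
    if "url" in match and match["url"] != url:                    # exact match
        return False
    if "url_contains" in match and match["url_contains"] not in url:  # substring
        return False
    # Assumption: HTTP methods are compared case-insensitively.
    if "method" in match and match["method"].upper() != method.upper():
        return False
    return True


def first_matching_response(
    rules: List[Dict[str, Any]], url: str, method: str
) -> Optional[Dict[str, Any]]:
    """Walk the rules top to bottom and return the first matching response."""
    for rule in rules:
        if rule_matches(rule.get("match", {}), url, method):
            return rule["response"]
    return None  # no rule matched; a default response would apply


rules = [
    {
        "match": {"url_contains": "stripe.com", "method": "POST"},
        "response": {"status": 200, "body": {"id": "pi_test_001", "status": "succeeded"}},
    },
    {
        "match": {"url": "https://api.sendgrid.com/v3/mail/send", "method": "POST"},
        "response": {"status": 202, "body": None},
    },
    {
        # Illustrative catch-all: only reached when the first rule does not match.
        "match": {"url_contains": "stripe.com"},
        "response": {"status": 404, "body": None},
    },
]
```

Because evaluation stops at the first hit, specific rules belong above broad ones: a POST to a stripe.com URL gets the 200 response, while a GET to the same URL falls through to the catch-all.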

Folder-based overrides

Drop provider-specific JSON files into a folder:

fixtures/
├── aws.json
├── azure.json
└── http.json

mockmesh.initialize(responses_path="fixtures/")

Or place them in .mockmesh/ at your project root -- they're picked up automatically with no arguments.


Supported Providers

Cloud

| Provider | Key Services | Operations |
| --- | --- | --- |
| AWS | S3, DynamoDB, SQS, SNS, Lambda, ECS, EKS, ECR, ElastiCache, SecretsManager, SSM, STS, IAM, CloudWatch, EventBridge, RDS/Aurora | 290+ |
| Azure | Blob, Cosmos DB, Service Bus, Key Vault, Event Hubs, Storage Queue, Table Storage, App Configuration, SQL, Redis, AKS, ACI | 80+ |
| GCP | GCS, Pub/Sub, Secret Manager, Firestore, BigQuery, Spanner, GKE, Cloud Run | 40+ |

Databases

| Provider | Drivers |
| --- | --- |
| Direct SQL | psycopg2, psycopg3, pymysql, mysql-connector, asyncpg, aiomysql |
| MongoDB | pymongo -- MongoClient, CRUD, queries ($eq, $gt, $in, $regex, $and, $or), cursors, aggregation |
| Redis | redis-py -- strings, hashes, lists, sets, pipelines, flushdb/flushall, multiple databases |
| AWS RDS/Aurora | 47 operations -- MySQL, PostgreSQL, MariaDB, Oracle, SQL Server, Aurora, Serverless v2 |

Messaging & HTTP

| Provider | Libraries |
| --- | --- |
| Kafka | confluent-kafka, kafka-python |
| RabbitMQ | pika BlockingConnection |
| HTTP | Any URL via requests and urllib |

Usage Examples

DynamoDB write and read

import mockmesh
mockmesh.initialize()

import boto3

ddb = boto3.client("dynamodb", region_name="us-east-1")
ddb.put_item(
    TableName="Users",
    Item={"id": {"S": "u-1"}, "name": {"S": "Alice"}, "age": {"N": "30"}}
)
resp = ddb.get_item(TableName="Users", Key={"id": {"S": "u-1"}})
print(resp["Item"]["name"]["S"])   # "Alice"

SQS round-trip

import mockmesh
mockmesh.initialize()   # initialize before importing service SDKs

import boto3, json

sqs = boto3.client("sqs", region_name="us-east-1")
sqs.send_message(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/123/MyQueue",
    MessageBody='{"order": 1}'
)
resp = sqs.receive_message(QueueUrl="https://sqs.us-east-1.amazonaws.com/123/MyQueue")
print(json.loads(resp["Messages"][0]["Body"]))   # {"order": 1}

Azure Cosmos DB

import mockmesh
mockmesh.initialize()

from azure.cosmos import CosmosClient

client = CosmosClient("https://mock.documents.azure.com:443/", credential="mock==")
container = client.get_database_client("mydb").get_container_client("items")
container.upsert_item({"id": "item-1", "name": "Widget", "price": 29.99})
item = container.read_item("item-1", partition_key="item-1")
print(item["name"])   # "Widget"

MongoDB

import mockmesh
mockmesh.initialize()

import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")
coll = client["mydb"]["users"]
coll.insert_one({"name": "Alice", "age": 30})
doc = coll.find_one({"name": "Alice"})
print(doc["age"])   # 30
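
The query operators listed for MongoDB under Supported Providers ($eq, $gt, $in, $regex, $and, $or) can be evaluated against an in-memory document with a small recursive matcher. The sketch below is a simplified illustration of how such filtering could work, not MockMesh's implementation; matches and _field_matches are hypothetical names, and nested-field paths and embedded-document equality are deliberately left out.

```python
import re
from typing import Any, Dict


def _field_matches(value: Any, condition: Any) -> bool:
    """Check one field value against a literal or an operator dict."""
    if not isinstance(condition, dict):
        return value == condition  # shorthand for {"$eq": condition}
    for op, operand in condition.items():
        if op == "$eq":
            ok = value == operand
        elif op == "$gt":
            ok = value is not None and value > operand
        elif op == "$in":
            ok = value in operand
        elif op == "$regex":
            ok = isinstance(value, str) and re.search(operand, value) is not None
        else:
            raise ValueError(f"unsupported operator: {op}")
        if not ok:
            return False
    return True


def matches(doc: Dict[str, Any], query: Dict[str, Any]) -> bool:
    """Return True if `doc` satisfies every clause in `query`."""
    for key, condition in query.items():
        if key == "$and":
            if not all(matches(doc, sub) for sub in condition):
                return False
        elif key == "$or":
            if not any(matches(doc, sub) for sub in condition):
                return False
        elif not _field_matches(doc.get(key), condition):
            return False
    return True


alice = {"name": "Alice", "age": 30}
adults_named_ali = {"$and": [{"age": {"$gt": 18}}, {"name": {"$regex": "^Ali"}}]}
found = matches(alice, adults_named_ali)
```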

Redis

import mockmesh
mockmesh.initialize()

import redis

r = redis.Redis(host="localhost", port=6379, db=0)
r.set("session:u-1", "token-abc")
print(r.get("session:u-1"))   # b"token-abc"

r.hset("user:u-1", mapping={"name": "Alice", "plan": "pro"})
print(r.hget("user:u-1", "plan"))   # b"pro"

Direct SQL (psycopg2)

import mockmesh
mockmesh.initialize()

import psycopg2

conn = psycopg2.connect(host="localhost", database="app", user="admin")
cur = conn.cursor()
cur.execute("CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT, email TEXT)")
cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)", ("Alice", "alice@co.io"))
conn.commit()

cur.execute("SELECT name FROM users WHERE name LIKE %s", ("Ali%",))
print(cur.fetchone()[0])   # "Alice"

Kafka

import mockmesh, json
mockmesh.initialize()

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("events", value=json.dumps({"type": "signup"}).encode())
producer.flush()

Multi-service workflow

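import mockmesh
# Uses test_overrides.json from "JSON config file" above so the Stripe
# response contains an "id"; without a matching rule, charge["id"] may be missing.
mockmesh.initialize(config_path="test_overrides.json")

import boto3, requests, json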

# HTTP call to Stripe
r = requests.post("https://api.stripe.com/v1/charges", json={"amount": 4999})
charge = r.json()

# Write to DynamoDB
ddb = boto3.client("dynamodb", region_name="us-east-1")
ddb.put_item(TableName="charges", Item={
    "id": {"S": charge["id"]}, "amount": {"N": "4999"}, "status": {"S": "succeeded"}
})

# Queue a notification
sqs = boto3.client("sqs", region_name="us-east-1")
sqs.send_message(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/123/receipts",
    MessageBody=json.dumps({"charge_id": charge["id"]})
)

Using with pytest

# conftest.py
import pytest
import mockmesh

@pytest.fixture(autouse=True)
def mock_services():
    with mockmesh.engine(fallback_mode="error") as mm:
        yield mm

Every test automatically gets full service mocking. fallback_mode="error" ensures unhandled operations fail loudly in CI.


Auto-Detection

MockMesh automatically detects which service libraries are installed and only activates the relevant interceptors. No configuration needed.

| Installed Package | Interceptor Activated |
| --- | --- |
| botocore | AWS (S3, DynamoDB, SQS, etc.) |
| azure.core | Azure (Blob, Cosmos, Key Vault, etc.) |
| requests | HTTP + GCP |
| confluent_kafka or kafka | Kafka |
| pika | RabbitMQ |
| psycopg2, pymysql, mysql.connector, asyncpg, aiomysql | Direct SQL |
| pymongo | MongoDB |
| redis | Redis |

Check what's active:

engine = mockmesh.initialize()
print(engine.active_providers)   # ["http", "aws", "azure", "kafka", "mongodb", "redis"]
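
Package detection of this kind is commonly done with the standard library's importlib.util.find_spec, which checks for a module without importing it. The sketch below shows that general technique. Whether MockMesh detects packages this way is an assumption, DETECTION_TABLE and the two functions are hypothetical names, and the "rabbitmq" and "sql" provider keys are guesses (the other keys follow the active_providers example above).

```python
import importlib.util
from typing import Dict, List, Tuple

# Module names come from the auto-detection table; provider keys are partly assumed.
DETECTION_TABLE: Dict[str, Tuple[str, ...]] = {
    "aws": ("botocore",),
    "azure": ("azure.core",),
    "http": ("requests",),
    "kafka": ("confluent_kafka", "kafka"),
    "rabbitmq": ("pika",),
    "sql": ("psycopg2", "pymysql", "mysql.connector", "asyncpg", "aiomysql"),
    "mongodb": ("pymongo",),
    "redis": ("redis",),
}


def is_installed(module_name: str) -> bool:
    """True if the module can be located. For a dotted name, find_spec
    imports the parent package and raises ModuleNotFoundError when the
    parent is missing, so that case is treated as 'not installed'."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        return False


def detect_providers(table: Dict[str, Tuple[str, ...]] = DETECTION_TABLE) -> List[str]:
    """Return the providers for which at least one listed module is present."""
    return [
        provider
        for provider, modules in table.items()
        if any(is_installed(name) for name in modules)
    ]
```

Calling detect_providers() in an environment with only requests and redis installed would return those two providers under this table.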

Error Types

from mockmesh import MockMeshError, InterceptError, UnknownOperationError

# MockMeshError      -- base exception for all MockMesh errors
# InterceptError     -- raised when an interceptor fails (in "error" fallback mode)
#   .provider        -- e.g. "aws"
#   .operation       -- e.g. "s3.GetObject"
#   .cause           -- the original exception
# UnknownOperationError -- raised for unrecognized operations

Logging

By default, MockMesh logs to .mockmesh/logs/mockmesh.log in JSON format. Nothing goes to stdout.

# Enable console output for debugging
mockmesh.initialize(console_log=True)

# Human-readable console output instead of JSON
mockmesh.initialize(console_log=True, json_console=False)
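
The two console flags map naturally onto Python's logging module: one decides whether a stream handler is attached, the other picks the formatter. The sketch below shows that pattern with a hand-rolled JSON formatter. JsonFormatter, build_logger, the JSON field names, and the logger name are assumptions for illustration, and the file handler for .mockmesh/logs/mockmesh.log is omitted to keep the example free of side effects.

```python
import json
import logging
import sys


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line (field names assumed)."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })


def build_logger(name="mockmesh_sketch", console_log=False, json_console=True,
                 level=logging.DEBUG, stream=None) -> logging.Logger:
    """Configure a logger the way the console_log / json_console flags suggest."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.handlers.clear()
    logger.propagate = False  # keep output away from the root logger
    if console_log:
        handler = logging.StreamHandler(stream if stream is not None else sys.stdout)
        if json_console:
            handler.setFormatter(JsonFormatter())
        else:
            handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
        logger.addHandler(handler)
    else:
        # Nothing goes to the console; NullHandler also suppresses
        # logging's last-resort stderr output.
        logger.addHandler(logging.NullHandler())
    return logger
```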

Built-in Default Configuration (_config)

Every built-in default JSON file includes a standardized _config section that centralizes provider-specific values like account IDs, regions, and project IDs. These values are referenced throughout response bodies using {_config.xxx} placeholders, which are resolved automatically at load time.

{
  "_meta": { "provider": "aws", "version": "1.0.0", "format": "map" },
  "_config": {
    "account_id": "123456789012",
    "region": "us-east-1",
    "mock_uuid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "rds": { ... }
  },
  "s3": {
    "GetObject": {
      "ETag": "\"mock-etag\"",
      "BucketArn": "arn:aws:s3:::{_config.account_id}:my-bucket"
    }
  }
}
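
Placeholder resolution of this kind amounts to a recursive walk over the loaded JSON that substitutes each {_config.xxx} token. The sketch below illustrates the idea on the example document. It is not the library's loader: resolve_placeholders and _lookup are hypothetical names, and support for dotted paths into nested _config values is an assumption.

```python
import re
from typing import Any, Dict

# Matches tokens such as {_config.account_id} or {_config.rds.engine}.
_PLACEHOLDER = re.compile(r"\{_config\.([A-Za-z0-9_.]+)\}")


def _lookup(config: Dict[str, Any], dotted_path: str) -> Any:
    """Follow a dotted path through nested dicts; raises KeyError if absent."""
    value: Any = config
    for part in dotted_path.split("."):
        value = value[part]
    return value


def resolve_placeholders(node: Any, config: Dict[str, Any]) -> Any:
    """Return a copy of `node` with every placeholder in every string replaced."""
    if isinstance(node, str):
        return _PLACEHOLDER.sub(lambda m: str(_lookup(config, m.group(1))), node)
    if isinstance(node, dict):
        return {key: resolve_placeholders(value, config) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve_placeholders(item, config) for item in node]
    return node  # numbers, booleans, and None pass through unchanged


document = {
    "_config": {"account_id": "123456789012", "region": "us-east-1"},
    "s3": {
        "GetObject": {
            "ETag": "\"mock-etag\"",
            "BucketArn": "arn:aws:s3:::{_config.account_id}:my-bucket",
        }
    },
}
resolved = resolve_placeholders(document, document["_config"])
```

After resolution, resolved["s3"]["GetObject"]["BucketArn"] holds the account ID from _config in place of the placeholder, and strings without placeholders are left as they were.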

Provider _config sections

| Provider | Key Config Values |
| --- | --- |
| AWS | account_id, region, mock_uuid, rds (engine templates) |
| Azure | region, subscription_id, resource_group, vault_host, auth (IMDS/token) |
| GCP | project_id, project_id_auth, project_number, auth (metadata/OAuth2) |
| SQL | connection (default_database, default_host, autocommit) |
| NoSQL | default_database |
| HTTP | {} (reserved) |
| Streaming | {} (reserved) |

When you provide folder-based overrides (responses_path), you can include your own _config section to override default values. For example, to change the AWS account ID and region across all responses:

{
  "_config": {
    "account_id": "999888777666",
    "region": "eu-west-1"
  }
}
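
Overriding only some _config keys implies a merge in which user values win and untouched defaults survive. A minimal sketch of such a merge follows. merge_config is a hypothetical helper, and merging nested sections key by key (rather than replacing them wholesale) is an assumption about the behavior.

```python
import copy
from typing import Any, Dict


def merge_config(defaults: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict where override values replace defaults.
    Nested dicts are merged recursively; the inputs are not modified."""
    merged = copy.deepcopy(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


default_config = {
    "account_id": "123456789012",
    "region": "us-east-1",
    "mock_uuid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
}
user_config = {"account_id": "999888777666", "region": "eu-west-1"}

effective = merge_config(default_config, user_config)
```

Here effective carries the overridden account_id and region while mock_uuid keeps its default value.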

License

BSL-1.1 (Business Source License) -- free for non-commercial use, commercial license required for production. See LICENSE for details.
