Testcontainers module for AT Protocol PDS integration testing
Project description
testcontainers-atproto
Spin up ephemeral PDS instances in your Python test suite. Lexicon-agnostic — works with any application built on AT Protocol, not just Bluesky.
Installation
pip install testcontainers-atproto
Requires Python 3.10+ and a running Docker daemon.
Extras
| Extra | What it adds |
|---|---|
testcontainers-atproto[firehose] |
websockets, cbor2 for firehose subscription |
testcontainers-atproto[sync] |
cbor2 for CAR file parsing |
testcontainers-atproto[sdk] |
atproto (MarshalX SDK) for high-level record ops |
testcontainers-atproto[oauth] |
cryptography, PyJWT for OAuth DPoP flow testing |
testcontainers-atproto[all] |
All of the above |
Quick start
from testcontainers_atproto import PDSContainer
with PDSContainer() as pds:
account = pds.create_account("alice.test")
print(pds.base_url) # http://localhost:<port>
print(account.did) # did:plc:...
print(account.handle) # alice.test
A local PLC directory runs alongside the PDS on a shared Docker network — no public internet required. For Postgres-backed PLC parity with production, pass plc_mode="real":
with PDSContainer(plc_mode="real") as pds:
account = pds.create_account("alice.test")
Record operations
with PDSContainer() as pds:
alice = pds.create_account("alice.test")
# Create
ref = alice.create_record("app.bsky.feed.post", {
"$type": "app.bsky.feed.post",
"text": "hello from testcontainers",
"createdAt": "2026-01-01T00:00:00Z",
})
# Read
record = alice.get_record("app.bsky.feed.post", ref.rkey)
# Update
alice.put_record("app.bsky.feed.post", ref.rkey, {
"$type": "app.bsky.feed.post",
"text": "updated text",
"createdAt": "2026-01-01T00:00:00Z",
})
# List & delete
records = alice.list_records("app.bsky.feed.post")
alice.delete_record("app.bsky.feed.post", ref.rkey)
Firehose subscription
Observe real-time repository events via the AT Protocol firehose:
from testcontainers_atproto import PDSContainer
with PDSContainer() as pds:
alice = pds.create_account("alice.test")
alice.create_record("app.bsky.feed.post", {
"$type": "app.bsky.feed.post",
"text": "hello firehose",
"createdAt": "2026-01-01T00:00:00Z",
})
sub = pds.subscribe()
events = sub.collect(count=10, timeout=5.0)
commits = [e for e in events if e["header"].get("t") == "#commit"]
print(commits[-1]["body"]["ops"]) # [{"action": "create", ...}]
Requires the firehose extra: pip install testcontainers-atproto[firehose]
Repo sync
Export repositories and retrieve blobs for relay and indexer testing:
with PDSContainer() as pds:
alice = pds.create_account("alice.test")
alice.create_record("app.bsky.feed.post", {
"$type": "app.bsky.feed.post",
"text": "sync test",
"createdAt": "2026-01-01T00:00:00Z",
})
# Export full repository as CAR bytes
car_bytes = alice.export_repo()
# Parse the CAR to inspect blocks
from testcontainers_atproto import parse_car
car = parse_car(car_bytes)
print(f"{len(car.blocks)} blocks, {len(car.roots)} roots")
# Retrieve a specific blob
blob_ref = alice.upload_blob(b"test data", "image/png")
blob_data = alice.get_blob(blob_ref["ref"]["$link"])
assert blob_data == b"test data"
CAR parsing requires the sync extra: pip install testcontainers-atproto[sync]
Declarative seeding
Describe the world, materialize it in one call — no boilerplate account/record setup:
from testcontainers_atproto import PDSContainer, Seed
with PDSContainer() as pds:
world = (
Seed(pds)
.account("alice.test")
.post("Hello from Alice")
.post("Another post")
.account("bob.test")
.post("Bob's first post")
.follow("alice.test")
.like("alice.test", 0) # like Alice's first post
.apply()
)
alice = world.accounts["alice.test"]
bob = world.accounts["bob.test"]
assert len(world.records["alice.test"]) == 2
Placeholders let custom records reference other accounts' DIDs and records — resolved at apply() time:
with PDSContainer() as pds:
world = (
Seed(pds)
.account("alice.test")
.record("com.example.project", {
"$type": "com.example.project",
"name": "My Project",
})
.account("bob.test")
.record("com.example.review", {
"$type": "com.example.review",
"reviewer": Seed.did("bob.test"),
"project": Seed.ref("alice.test", 0),
})
.apply()
)
Accounts can be revisited to interleave records (e.g. conversation threads):
world = (
Seed(pds)
.account("alice.test")
.post("alice first")
.account("bob.test")
.post("bob replies")
.account("alice.test")
.post("alice continues")
.apply()
)
Also available as a dict-based API for data-driven fixtures:
world = pds.seed({
"accounts": [
{"handle": "alice.test", "posts": ["Hello from Alice"]},
{"handle": "bob.test", "follows": ["alice.test"]},
],
})
Federation testing
Test cross-PDS scenarios with two PDS instances sharing a PLC directory:
def test_cross_pds_resolution(pds_pair):
pds_a, pds_b = pds_pair
alice = pds_a.create_account("alice.test")
bob = pds_b.create_account("bob.test")
# Each PDS resolves its own handles
assert pds_a.xrpc_get(
"com.atproto.identity.resolveHandle",
params={"handle": "alice.test"},
)["did"] == alice.did
# DIDs from both PDS instances are registered in the shared PLC
assert alice.did != bob.did
assert alice.did.startswith("did:plc:")
assert bob.did.startswith("did:plc:")
The pds_pair fixture creates a shared Docker network and PLC directory. Handle resolution is local to each PDS; cross-PDS discovery uses DIDs resolved through the shared PLC.
Relay testing
Test relay aggregation with two PDS instances feeding a single relay:
def test_relay_aggregates_events(pds_relay):
pds_a, pds_b, relay = pds_relay
alice = pds_a.create_account("alice.test")
bob = pds_b.create_account("bob.test")
alice.create_record("app.bsky.feed.post", {
"$type": "app.bsky.feed.post",
"text": "hello from PDS-A",
"createdAt": "2026-01-01T00:00:00Z",
})
bob.create_record("app.bsky.feed.post", {
"$type": "app.bsky.feed.post",
"text": "hello from PDS-B",
"createdAt": "2026-01-01T00:00:00Z",
})
import time
time.sleep(2)
sub = relay.subscribe()
events = sub.collect(count=30, timeout=10.0)
commits = [
e for e in events
if e["header"].get("t") == "#commit" and e["body"].get("ops")
]
repos = {c["body"]["repo"] for c in commits}
assert alice.did in repos
assert bob.did in repos
The pds_relay fixture creates two PDS instances and a relay on a shared Docker network with a shared PLC directory. The relay crawls both PDS instances on startup, so events from either PDS appear on the relay's aggregated firehose.
Relay admin operations are also available:
def test_relay_admin(pds_relay):
pds_a, _pds_b, relay = pds_relay
# Health check
assert "version" in relay.health()
# List crawled hosts
hosts = relay.list_hosts()
hostnames = [h.get("hostname", "") for h in hosts]
assert any("pds-a" in h for h in hostnames)
Rate limit simulation
Test your client's 429-handling and backoff logic against real PDS rate limiting:
from testcontainers_atproto import CreateSession, PDSContainer
with PDSContainer(rate_limits=True) as pds:
alice = pds.create_account("alice.test", password="s3cret")
target = CreateSession(alice.handle, "s3cret")
# Burn through the rate limit budget (30 calls for createSession)
pds.exhaust_rate_limit_budget(target)
# The next call triggers a 429
import httpx
resp = httpx.post(
f"{pds.base_url}/xrpc/com.atproto.server.createSession",
json={"identifier": alice.handle, "password": "s3cret"},
timeout=10.0,
)
assert resp.status_code == 429
assert resp.json()["error"] == "RateLimitExceeded"
When rate_limits=False (the default), rate limiting is disabled and no bypass key is generated. Internal library calls (account creation, seeding, etc.) always use a bypass header so they never consume rate limit budget.
For custom endpoints, subclass RateLimitTarget:
from testcontainers_atproto import RateLimitTarget
class MyEndpoint(RateLimitTarget):
nsid = "com.example.heavyEndpoint"
def __call__(self, base_url):
return httpx.post(f"{base_url}/xrpc/{self.nsid}", ...)
pds.exhaust_rate_limit_budget(MyEndpoint(), threshold=50)
OAuth DPoP authentication
Test OAuth client implementations end-to-end with DPoP (Demonstration of Proof-of-Possession) bound tokens:
from testcontainers_atproto import PDSContainer
with PDSContainer() as pds:
alice = pds.create_account("alice.test", password="hunter2")
# Full flow in one call — returns an OAuthClient + tokens
client, tokens = pds.oauth_authenticate(alice)
# Use DPoP-authenticated XRPC calls
resp = client.xrpc_get(
"com.atproto.repo.describeRepo",
tokens.access_token,
params={"repo": alice.did},
)
assert resp["handle"] == "alice.test"
# Create records via OAuth
client.xrpc_post(
"com.atproto.repo.createRecord",
tokens.access_token,
data={
"repo": alice.did,
"collection": "app.bsky.feed.post",
"record": {
"$type": "app.bsky.feed.post",
"text": "posted via OAuth DPoP",
"createdAt": "2026-01-01T00:00:00Z",
},
},
)
# Token refresh
new_tokens = client.refresh_tokens(tokens.refresh_token)
# Token revocation
client.revoke_token(new_tokens.access_token)
For step-by-step control over each phase of the flow:
from testcontainers_atproto import DPoPKey, PKCEChallenge, PDSContainer
with PDSContainer() as pds:
alice = pds.create_account("alice.test", password="hunter2")
client = pds.oauth_client()
pkce = PKCEChallenge.generate()
request_uri = client.pushed_authorization_request(pkce, login_hint="alice.test")
code = client.authorize(request_uri, "alice.test", "hunter2")
tokens = client.token_exchange(code, pkce)
Requires the oauth extra: pip install testcontainers-atproto[oauth]
Email verification
Test email verification and password reset flows with a local Mailpit SMTP server:
with PDSContainer(email_mode="capture") as pds:
alice = pds.create_account("alice.test")
# Request verification email
alice.request_email_confirmation()
# Retrieve it from Mailpit
message = pds.await_email(alice.email)
# Extract token and confirm (token format is PDS-version-dependent)
token = extract_token(message) # your extraction logic
alice.confirm_email(token)
Password reset follows the same pattern:
alice.request_password_reset()
message = pds.await_email(alice.email)
token = extract_token(message)
alice.reset_password(token, "new-password")
When email_mode="none" (the default), email verification is bypassed and no Mailpit container is started.
Account lifecycle
Deactivate, reactivate, and delete accounts to test how your app handles state changes:
with PDSContainer() as pds:
alice = pds.create_account("alice.test")
# Deactivate — account becomes inaccessible
alice.deactivate()
# Re-activate — access restored
alice.activate()
# Check status
status = alice.check_account_status()
assert status["activated"] is True
Admin operations let you test moderation flows:
with PDSContainer() as pds:
alice = pds.create_account("alice.test")
# Takedown — blocks access
pds.takedown(alice)
# Restore — unblocks access
pds.restore(alice)
# Query status
status = pds.get_subject_status(alice)
Account deletion requires email_mode="capture" to retrieve the deletion token:
with PDSContainer(email_mode="capture") as pds:
alice = pds.create_account("alice.test", password="s3cret")
# ... confirm email first ...
alice.request_account_delete()
message = pds.await_email(alice.email)
token = extract_token(message) # your extraction logic
alice.delete_account("s3cret", token)
Error handling
XRPC failures raise XrpcError with structured fields:
from testcontainers_atproto import PDSContainer, XrpcError
with PDSContainer() as pds:
try:
pds.create_account("alice.invalid")
except XrpcError as e:
print(e.status_code) # 400
print(e.error) # "InvalidHandle"
print(e.message) # human-readable detail
Pytest fixtures
After installing the package, these fixtures are available automatically via the pytest11 entry point:
| Fixture | Scope | Description |
|---|---|---|
pds |
function | Fresh PDS instance per test |
pds_module |
module | Shared PDS instance within a test module |
pds_pair |
function | Two PDS instances for federation testing |
pds_relay |
function | Two PDS instances + relay for firehose aggregation testing |
pds_image |
session | PDS image tag (override via ATP_PDS_IMAGE env var) |
relay_image |
session | Relay image tag (override via ATP_RELAY_IMAGE env var) |
def test_create_account(pds):
account = pds.create_account("bob.test")
assert account.did.startswith("did:plc:")
Development
make venv # Create virtual environment
source .testcontainers-atproto-3.12/bin/activate # Activate
make test # Run tests
make test-all # Run across all supported Python versions
Glossary
AT Protocol introduces many domain-specific terms. See docs/glossary.md for definitions of PDS, DID, PLC, XRPC, and other initialisms used in this project.
License
Apache-2.0. See LICENSE.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file testcontainers_atproto-0.10.0.tar.gz.
File metadata
- Download URL: testcontainers_atproto-0.10.0.tar.gz
- Upload date:
- Size: 38.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
623394d1ddf205747740cf2e7dad62c64eafe1077aefd5b301481de797defd47
|
|
| MD5 |
05245a6844b50dd0a653f2509b965e5e
|
|
| BLAKE2b-256 |
ea584568d98996f448daace016e23e7f573efd222bfe966f338f6eaa925e9249
|
File details
Details for the file testcontainers_atproto-0.10.0-py3-none-any.whl.
File metadata
- Download URL: testcontainers_atproto-0.10.0-py3-none-any.whl
- Upload date:
- Size: 39.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
936e1881940d83441e3fd7ec4a615d1a991836f865d13629af861b2268e15e3d
|
|
| MD5 |
89da86845cb9f1751fd9ece8265cd751
|
|
| BLAKE2b-256 |
54d6488f4f01c58d39a33b3524ed46e4639db381f8d743212e5d09658764e7ce
|