
Testcontainers module for AT Protocol PDS integration testing


testcontainers-atproto


Spin up ephemeral PDS instances in your Python test suite. Lexicon-agnostic — works with any application built on AT Protocol, not just Bluesky.


Installation

pip install testcontainers-atproto

Requires Python 3.10+ and a running Docker daemon.

Extras

Extra                             What it adds
testcontainers-atproto[firehose]  websockets, cbor2 for firehose subscription
testcontainers-atproto[sync]      cbor2 for CAR file parsing
testcontainers-atproto[sdk]       atproto (MarshalX SDK) for high-level record ops
testcontainers-atproto[oauth]     cryptography, PyJWT for OAuth DPoP flow testing
testcontainers-atproto[all]       All of the above

Quick start

from testcontainers_atproto import PDSContainer

with PDSContainer() as pds:
    account = pds.create_account("alice.test")
    print(pds.base_url)       # http://localhost:<port>
    print(account.did)         # did:plc:...
    print(account.handle)      # alice.test

A local PLC directory runs alongside the PDS on a shared Docker network — no public internet required. For Postgres-backed PLC parity with production, pass plc_mode="real":

with PDSContainer(plc_mode="real") as pds:
    account = pds.create_account("alice.test")

Record operations

with PDSContainer() as pds:
    alice = pds.create_account("alice.test")

    # Create
    ref = alice.create_record("app.bsky.feed.post", {
        "$type": "app.bsky.feed.post",
        "text": "hello from testcontainers",
        "createdAt": "2026-01-01T00:00:00Z",
    })

    # Read
    record = alice.get_record("app.bsky.feed.post", ref.rkey)

    # Update
    alice.put_record("app.bsky.feed.post", ref.rkey, {
        "$type": "app.bsky.feed.post",
        "text": "updated text",
        "createdAt": "2026-01-01T00:00:00Z",
    })

    # List & delete
    records = alice.list_records("app.bsky.feed.post")
    alice.delete_record("app.bsky.feed.post", ref.rkey)

Firehose subscription

Observe real-time repository events via the AT Protocol firehose:

from testcontainers_atproto import PDSContainer

with PDSContainer() as pds:
    alice = pds.create_account("alice.test")
    alice.create_record("app.bsky.feed.post", {
        "$type": "app.bsky.feed.post",
        "text": "hello firehose",
        "createdAt": "2026-01-01T00:00:00Z",
    })

    sub = pds.subscribe()
    events = sub.collect(count=10, timeout=5.0)

    commits = [e for e in events if e["header"].get("t") == "#commit"]
    print(commits[-1]["body"]["ops"])  # [{"action": "create", ...}]

Requires the firehose extra: pip install testcontainers-atproto[firehose]
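
When a test only cares about one collection, the filtering shown above can be wrapped in a small helper. The sketch below relies on the event shape from the example (a "header" dict with a "t" field and a "body" dict with "ops") and additionally assumes that each op carries a "path" of the form collection/rkey, as in the AT Protocol commit event schema. ops_for_collection is a hypothetical helper, not part of this library, and the sample events are synthetic.

```python
from typing import Any


def ops_for_collection(events: list[dict[str, Any]], collection: str) -> list[dict[str, Any]]:
    """Return commit ops whose path falls under the given collection NSID."""
    matched = []
    for event in events:
        # Skip anything that is not a commit frame (identity, account, ...).
        if event.get("header", {}).get("t") != "#commit":
            continue
        for op in event.get("body", {}).get("ops", []):
            # Assumed op shape: {"action": ..., "path": "<collection>/<rkey>"}.
            op_collection, _, _rkey = op.get("path", "").partition("/")
            if op_collection == collection:
                matched.append(op)
    return matched


# Synthetic events standing in for the output of sub.collect(...).
sample_events = [
    {"header": {"t": "#identity"}, "body": {}},
    {"header": {"t": "#commit"}, "body": {"ops": [
        {"action": "create", "path": "app.bsky.feed.post/3kabc"},
        {"action": "create", "path": "app.bsky.graph.follow/3kdef"},
    ]}},
]
post_ops = ops_for_collection(sample_events, "app.bsky.feed.post")
print([op["action"] for op in post_ops])
```

If the decoded ops in your version use a different key for the record path, only the op.get("path", "") lookup needs to change.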

Repo sync

Export repositories and retrieve blobs for relay and indexer testing:

with PDSContainer() as pds:
    alice = pds.create_account("alice.test")
    alice.create_record("app.bsky.feed.post", {
        "$type": "app.bsky.feed.post",
        "text": "sync test",
        "createdAt": "2026-01-01T00:00:00Z",
    })

    # Export full repository as CAR bytes
    car_bytes = alice.export_repo()

    # Parse the CAR to inspect blocks
    from testcontainers_atproto import parse_car
    car = parse_car(car_bytes)
    print(f"{len(car.blocks)} blocks, {len(car.roots)} roots")

    # Retrieve a specific blob
    blob_ref = alice.upload_blob(b"test data", "image/png")
    blob_data = alice.get_blob(blob_ref["ref"]["$link"])
    assert blob_data == b"test data"

CAR parsing requires the sync extra: pip install testcontainers-atproto[sync]

Declarative seeding

Describe the world, materialize it in one call — no boilerplate account/record setup:

from testcontainers_atproto import PDSContainer, Seed

with PDSContainer() as pds:
    world = (
        Seed(pds)
        .account("alice.test")
            .post("Hello from Alice")
            .post("Another post")
        .account("bob.test")
            .post("Bob's first post")
            .follow("alice.test")
            .like("alice.test", 0)   # like Alice's first post
        .apply()
    )

    alice = world.accounts["alice.test"]
    bob = world.accounts["bob.test"]
    assert len(world.records["alice.test"]) == 2

Placeholders let custom records reference other accounts' DIDs and records — resolved at apply() time:

with PDSContainer() as pds:
    world = (
        Seed(pds)
        .account("alice.test")
            .record("com.example.project", {
                "$type": "com.example.project",
                "name": "My Project",
            })
        .account("bob.test")
            .record("com.example.review", {
                "$type": "com.example.review",
                "reviewer": Seed.did("bob.test"),
                "project": Seed.ref("alice.test", 0),
            })
        .apply()
    )

Accounts can be revisited to interleave records (e.g. conversation threads):

world = (
    Seed(pds)
    .account("alice.test")
        .post("alice first")
    .account("bob.test")
        .post("bob replies")
    .account("alice.test")
        .post("alice continues")
    .apply()
)

Also available as a dict-based API for data-driven fixtures:

world = pds.seed({
    "accounts": [
        {"handle": "alice.test", "posts": ["Hello from Alice"]},
        {"handle": "bob.test", "follows": ["alice.test"]},
    ],
})
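
Because the dict form is plain data, a fixture can live in a JSON file and be loaded at test time. The sketch below assumes only the keys shown above ("accounts", "handle", "posts", "follows"). load_seed_spec is a hypothetical helper, and its check that every followed handle is declared is an illustrative guard, not documented library behavior.

```python
import json
from typing import Any


def load_seed_spec(text: str) -> dict[str, Any]:
    """Parse a JSON seed description and check that follows point at declared accounts."""
    spec = json.loads(text)
    handles = {account["handle"] for account in spec["accounts"]}
    for account in spec["accounts"]:
        for target in account.get("follows", []):
            if target not in handles:
                raise ValueError(
                    f"{account['handle']} follows undeclared handle {target}"
                )
    return spec


SEED_JSON = """
{
  "accounts": [
    {"handle": "alice.test", "posts": ["Hello from Alice"]},
    {"handle": "bob.test", "follows": ["alice.test"]}
  ]
}
"""
spec = load_seed_spec(SEED_JSON)
# spec now has the same shape as the dict literal above,
# so inside a test it could be handed to pds.seed(spec).
print([account["handle"] for account in spec["accounts"]])
```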

Federation testing

Test cross-PDS scenarios with two PDS instances sharing a PLC directory:

def test_cross_pds_resolution(pds_pair):
    pds_a, pds_b = pds_pair
    alice = pds_a.create_account("alice.test")
    bob = pds_b.create_account("bob.test")

    # Each PDS resolves its own handles
    assert pds_a.xrpc_get(
        "com.atproto.identity.resolveHandle",
        params={"handle": "alice.test"},
    )["did"] == alice.did

    # DIDs from both PDS instances are registered in the shared PLC
    assert alice.did != bob.did
    assert alice.did.startswith("did:plc:")
    assert bob.did.startswith("did:plc:")

The pds_pair fixture creates a shared Docker network and PLC directory. Handle resolution is local to each PDS; cross-PDS discovery uses DIDs resolved through the shared PLC.

Rate limit simulation

Test your client's 429-handling and backoff logic against real PDS rate limiting:

from testcontainers_atproto import CreateSession, PDSContainer

with PDSContainer(rate_limits=True) as pds:
    alice = pds.create_account("alice.test", password="s3cret")
    target = CreateSession(alice.handle, "s3cret")

    # Burn through the rate limit budget (30 calls for createSession)
    pds.exhaust_rate_limit_budget(target)

    # The next call triggers a 429
    import httpx
    resp = httpx.post(
        f"{pds.base_url}/xrpc/com.atproto.server.createSession",
        json={"identifier": alice.handle, "password": "s3cret"},
        timeout=10.0,
    )
    assert resp.status_code == 429
    assert resp.json()["error"] == "RateLimitExceeded"

When rate_limits=False (the default), rate limiting is disabled and no bypass key is generated. Internal library calls (account creation, seeding, etc.) always use a bypass header so they never consume rate limit budget.

For custom endpoints, subclass RateLimitTarget:

import httpx
from testcontainers_atproto import RateLimitTarget

class MyEndpoint(RateLimitTarget):
    nsid = "com.example.heavyEndpoint"

    def __call__(self, base_url):
        return httpx.post(f"{base_url}/xrpc/{self.nsid}", ...)

pds.exhaust_rate_limit_budget(MyEndpoint(), threshold=50)
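
The 429 responses produced this way are what a client's retry logic has to absorb. As a minimal sketch of the kind of code such a test would exercise, the helper below retries a request with exponential backoff while the status is 429. call_with_backoff, FakeResponse, and the delay values are all hypothetical and not part of this library. The only assumption about the response is a status_code attribute, which httpx responses provide.

```python
import time
from typing import Any, Callable


def call_with_backoff(
    send: Callable[[], Any],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call send() until it returns a non-429 response or attempts run out."""
    response = send()
    for attempt in range(1, max_attempts):
        if response.status_code != 429:
            break
        # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
        sleep(base_delay * 2 ** (attempt - 1))
        response = send()
    return response


class FakeResponse:
    """Stand-in for an HTTP response; only status_code is needed here."""

    def __init__(self, status_code: int) -> None:
        self.status_code = status_code


# Two 429s followed by a success, with sleeps recorded instead of performed.
replies = iter([FakeResponse(429), FakeResponse(429), FakeResponse(200)])
delays: list[float] = []
result = call_with_backoff(lambda: next(replies), sleep=delays.append)
print(result.status_code, delays)
```

In a test against the container, send would wrap the real request, for example a lambda around the httpx.post call from the rate limit example. Injecting sleep keeps the unit test of the backoff logic itself fast.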

OAuth DPoP authentication

Test OAuth client implementations end-to-end with DPoP (Demonstration of Proof-of-Possession) bound tokens:

from testcontainers_atproto import PDSContainer

with PDSContainer() as pds:
    alice = pds.create_account("alice.test", password="hunter2")

    # Full flow in one call — returns an OAuthClient + tokens
    client, tokens = pds.oauth_authenticate(alice)

    # Use DPoP-authenticated XRPC calls
    resp = client.xrpc_get(
        "com.atproto.repo.describeRepo",
        tokens.access_token,
        params={"repo": alice.did},
    )
    assert resp["handle"] == "alice.test"

    # Create records via OAuth
    client.xrpc_post(
        "com.atproto.repo.createRecord",
        tokens.access_token,
        data={
            "repo": alice.did,
            "collection": "app.bsky.feed.post",
            "record": {
                "$type": "app.bsky.feed.post",
                "text": "posted via OAuth DPoP",
                "createdAt": "2026-01-01T00:00:00Z",
            },
        },
    )

    # Token refresh
    new_tokens = client.refresh_tokens(tokens.refresh_token)

    # Token revocation
    client.revoke_token(new_tokens.access_token)

For step-by-step control over each phase of the flow:

from testcontainers_atproto import DPoPKey, PKCEChallenge, PDSContainer

with PDSContainer() as pds:
    alice = pds.create_account("alice.test", password="hunter2")
    client = pds.oauth_client()

    pkce = PKCEChallenge.generate()
    request_uri = client.pushed_authorization_request(pkce, login_hint="alice.test")
    code = client.authorize(request_uri, "alice.test", "hunter2")
    tokens = client.token_exchange(code, pkce)

Requires the oauth extra: pip install testcontainers-atproto[oauth]

Email verification

Test email verification and password reset flows with a local Mailpit SMTP server:

with PDSContainer(email_mode="capture") as pds:
    alice = pds.create_account("alice.test")

    # Request verification email
    alice.request_email_confirmation()

    # Retrieve it from Mailpit
    message = pds.await_email(alice.email)

    # Extract token and confirm (token format is PDS-version-dependent)
    token = extract_token(message)  # your extraction logic
    alice.confirm_email(token)

Password reset follows the same pattern:

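with PDSContainer(email_mode="capture") as pds:
    alice = pds.create_account("alice.test")

    alice.request_password_reset()
    message = pds.await_email(alice.email)
    token = extract_token(message)  # your extraction logic
    alice.reset_password(token, "new-password")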

When email_mode="none" (the default), email verification is bypassed and no Mailpit container is started.
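
The extract_token calls above are left to the caller because the token format depends on the PDS version. As one possible starting point, the sketch below pulls a token out of a plain-text email body with a regular expression. It assumes you have already obtained the body as a string (the shape of the object returned by await_email is not described here) and that the token looks like two five-character alphanumeric groups joined by a hyphen. Both the pattern and the sample body are assumptions to adjust for your setup.

```python
import re

# Assumed token shape: two groups of five alphanumerics joined by a hyphen.
# Adjust the pattern to whatever the PDS version under test actually sends.
TOKEN_PATTERN = re.compile(r"\b[A-Za-z0-9]{5}-[A-Za-z0-9]{5}\b")


def extract_token(body: str, pattern: re.Pattern[str] = TOKEN_PATTERN) -> str:
    """Return the first substring of body that matches the token pattern."""
    match = pattern.search(body)
    if match is None:
        raise ValueError("no token found in email body")
    return match.group(0)


# Synthetic body standing in for the text of a captured email.
sample_body = "Your confirmation code is ABCDE-12345. It expires soon."
print(extract_token(sample_body))
```

Raising on a missing match makes a format change in a new PDS image fail loudly in the test rather than passing an empty token along.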

Account lifecycle

Deactivate, reactivate, and delete accounts to test how your app handles state changes:

with PDSContainer() as pds:
    alice = pds.create_account("alice.test")

    # Deactivate — account becomes inaccessible
    alice.deactivate()

    # Re-activate — access restored
    alice.activate()

    # Check status
    status = alice.check_account_status()
    assert status["activated"] is True

Admin operations let you test moderation flows:

with PDSContainer() as pds:
    alice = pds.create_account("alice.test")

    # Takedown — blocks access
    pds.takedown(alice)

    # Restore — unblocks access
    pds.restore(alice)

    # Query status
    status = pds.get_subject_status(alice)

Account deletion requires email_mode="capture" to retrieve the deletion token:

with PDSContainer(email_mode="capture") as pds:
    alice = pds.create_account("alice.test", password="s3cret")

    # ... confirm email first ...

    alice.request_account_delete()
    message = pds.await_email(alice.email)
    token = extract_token(message)  # your extraction logic
    alice.delete_account("s3cret", token)

Error handling

XRPC failures raise XrpcError with structured fields:

from testcontainers_atproto import PDSContainer, XrpcError

with PDSContainer() as pds:
    try:
        pds.create_account("alice.invalid")
    except XrpcError as e:
        print(e.status_code)  # 400
        print(e.error)        # "InvalidHandle"
        print(e.message)      # human-readable detail

Pytest fixtures

After installing the package, these fixtures are available automatically via the pytest11 entry point:

Fixture     Scope     Description
pds         function  Fresh PDS instance per test
pds_module  module    Shared PDS instance within a test module
pds_pair    function  Two PDS instances for federation testing
pds_image   session   PDS image tag (override via ATP_PDS_IMAGE env var)

def test_create_account(pds):
    account = pds.create_account("bob.test")
    assert account.did.startswith("did:plc:")

Development

make venv                                       # Create virtual environment
source .testcontainers-atproto-3.12/bin/activate # Activate
make test                                        # Run tests
make test-all                                    # Run across all supported Python versions

Glossary

AT Protocol introduces many domain-specific terms. See docs/glossary.md for definitions of PDS, DID, PLC, XRPC, and other initialisms used in this project.


License

Apache-2.0. See LICENSE.
