Skip to main content

No project description provided

Project description

Schematic Python Library

The Schematic Python Library provides convenient access to the Schematic API from applications written in Python.

The library includes type definitions for all request and response fields, and offers both synchronous and asynchronous clients powered by httpx.

Installation and Setup

  1. Add schematichq to your project's build file:
pip install schematichq
# or
poetry add schematichq
  1. Issue an API key for the appropriate environment using the Schematic app.

  2. Using this secret key, initialize a client in your application:

from schematic.client import Schematic

client = Schematic("YOUR_API_KEY")

Async Client

The SDK exports an async client for non-blocking API calls with automatic background event processing. The async client features lazy initialization - you can start using it immediately without manual setup.

Simple Usage (Lazy Initialization)

The easiest way to use the async client - just create and use it directly:

import asyncio
from schematic.client import AsyncSchematic

async def main():
    # Create client - no initialize() needed!
    client = AsyncSchematic("YOUR_API_KEY")
    
    # Use immediately - auto-initializes on first call
    is_enabled = await client.check_flag(
        "new-feature",
        company={"id": "company-123"},
        user={"id": "user-456"}
    )
    
    if is_enabled:
        print("New feature is enabled!")
    
    # Track usage
    await client.track(
        event="feature-used",
        company={"id": "company-123"},
        user={"id": "user-456"}
    )
    
    # Always shutdown when done
    await client.shutdown()

asyncio.run(main())

Context Manager (Recommended)

Use the async client as a context manager for automatic lifecycle management:

import asyncio
from schematic.client import AsyncSchematic

async def main():
    async with AsyncSchematic("YOUR_API_KEY") as client:
        # Client auto-initializes and will auto-shutdown
        
        is_enabled = await client.check_flag(
            "feature-flag",
            company={"id": "company-123"}
        )
        
        await client.identify(
            keys={"id": "company-123"},
            name="Acme Corp"
        )
        
        # Automatic cleanup on exit

asyncio.run(main())

Production Usage (Explicit Control)

For production applications that need precise control over initialization timing:

import asyncio
from schematic.client import AsyncSchematic

# Web application example
client = AsyncSchematic("YOUR_API_KEY")

async def startup():
    """Call during application startup"""
    await client.initialize()  # Start background tasks now
    print("Schematic client ready")

async def shutdown():
    """Call during application shutdown"""  
    await client.shutdown()   # Stop background tasks and flush events
    print("Schematic client stopped")

async def handle_request():
    """Handle individual requests"""
    # Client is already initialized - this will be fast
    is_enabled = await client.check_flag(
        "feature-flag",
        company={"id": "company-123"}
    )
    return {"feature_enabled": is_enabled}

Exception Handling

All errors thrown by the SDK will be subclasses of ApiError.

try:
    client.companies.get_company(
        company_id="company_id",
    )
except schematic.core.ApiError as e: # Handle all errors
  print(e.status_code)
  print(e.body)

Usage examples

A number of these examples use keys to identify companies and users. Learn more about keys here.

Sending identify events

Create or update users and companies using identify events.

from schematic import EventBodyIdentifyCompany
from schematic.client import Schematic

client = Schematic("YOUR_API_KEY")

client.identify(
    keys={
        "email": "wcoyote@acme.net",
        "user_id": "your-user-id",
    },
    company=EventBodyIdentifyCompany(
        keys={"id": "your-company-id"},
        name="Acme Widgets, Inc.",
        traits={
          "city": "Atlanta",
        },
    ),
    name="Wile E. Coyote",
    traits={
        "login_count": 24,
        "is_staff": false,
    },
)

This call is non-blocking and there is no response to check.

Sending track events

Track activity in your application using track events; these events can later be used to produce metrics for targeting.

from schematic.client import Schematic

client = Schematic("YOUR_API_KEY")

client.track(
    event="some-action",
    user={"user_id": "your-user-id"},
    company={"id": "your-company-id"},
)

Async client:

import asyncio
from schematic.client import AsyncSchematic

async def main():
    async with AsyncSchematic("YOUR_API_KEY") as client:
        await client.track(
            event="some-action",
            user={"user_id": "your-user-id"},
            company={"id": "your-company-id"},
        )

asyncio.run(main())

These calls are non-blocking and there is no response to check.

If you want to record large numbers of the same event at once, or perhaps measure usage in terms of a unit like tokens or memory, you can optionally specify a quantity for your event:

client.track(
    event="some-action",
    user={"user_id": "your-user-id"},
    company={"id": "your-company-id"},
    quantity=10,
)

Creating and updating companies

Although it is faster to create companies and users via identify events, if you need to handle a response, you can use the companies API to upsert companies. Because you use your own identifiers to identify companies, rather than a Schematic company ID, creating and updating companies are both done via the same upsert operation:

from schematic.client import Schematic

client = Schematic("YOUR_API_KEY")

client.companies.upsert_company(
    keys={"id": "your-company-id"},
    name="Acme Widgets, Inc.",
    traits={
        "city": "Atlanta",
        "high_score": 25,
        "is_active": true,
    },
)

You can define any number of company keys; these are used to address the company in the future, for example by updating the company's traits or checking a flag for the company.

You can also define any number of company traits; these can then be used as targeting parameters.

Creating and updating users

Similarly, you can upsert users using the Schematic API, as an alternative to using identify events. Because you use your own identifiers to identify users, rather than a Schematic user ID, creating and updating users are both done via the same upsert operation:

from schematic.client import Schematic

client = Schematic("YOUR_API_KEY")

client.companies.upsert_user(
    keys={
        "email": "wcoyote@acme.net",
        "user_id": "your-user-id",
    },
    name="Wile E. Coyote",
    traits={
        "city": "Atlanta",
        "high_score": 25,
        "is_active": true,
    },
    company={"id": "your-company-id"},
)

You can define any number of user keys; these are used to address the user in the future, for example by updating the user's traits or checking a flag for the user.

You can also define any number of user traits; these can then be used as targeting parameters.

Checking flags

When checking a flag, you'll provide keys for a company and/or keys for a user. You can also provide no keys at all, in which case you'll get the default value for the flag.

from schematic.client import Schematic

client = Schematic("YOUR_API_KEY")

client.check_flag(
    "some-flag-key",
    company={"id": "your-company-id"},
    user={"user_id": "your-user-id"},
)

DataStream

DataStream enables local flag evaluation by maintaining a WebSocket connection to Schematic and caching flag rules, company, and user data locally (or in a shared cache such as Redis). Flag checks are evaluated locally via a WASM rules engine, eliminating per-check network requests.

Async-only: DataStream and Replicator Mode are only available on the AsyncSchematic client. The synchronous Schematic client does not support either feature — use AsyncSchematic (shown in all examples below) if you need them.

Installation

DataStream requires additional dependencies for WebSocket connections and local flag evaluation. Install them with the datastream extra:

pip install 'schematichq[datastream]'
# or
poetry add schematichq -E datastream

To use the Redis-backed shared cache (see below), also install redis:

pip install 'schematichq[datastream]' redis

Key Features

  • Real-Time Updates: Automatically updates cached data when changes occur on the backend.
  • Local Flag Evaluation: Flag checks are evaluated locally via WASM, eliminating per-check network requests.
  • Configurable Caching: Supports in-memory caching (default) and custom AsyncCacheProvider implementations including a built-in Redis provider.

How to Enable DataStream

Set use_datastream=True on AsyncSchematicConfig:

import asyncio
from schematic.client import AsyncSchematic, AsyncSchematicConfig, DataStreamConfig

async def main():
    config = AsyncSchematicConfig(
        use_datastream=True,
        datastream=DataStreamConfig(
            cache_ttl=300_000,  # 5 minutes, in ms
        ),
    )

    async with AsyncSchematic("YOUR_API_KEY", config) as client:
        is_enabled = await client.check_flag(
            "some-flag-key",
            company={"id": "your-company-id"},
            user={"id": "your-user-id"},
        )

asyncio.run(main())

Configuration Options

All fields live on DataStreamConfig.

Option Type Default Description
cache_ttl Optional[int] 24 hours Cache TTL in milliseconds. None means no expiration.
company_cache AsyncCacheProvider in-memory Cache for full company records.
company_lookup_cache AsyncCacheProvider in-memory Cache mapping company keys → company IDs.
user_cache AsyncCacheProvider in-memory Cache for full user records.
user_lookup_cache AsyncCacheProvider in-memory Cache mapping user keys → user IDs.
flag_cache AsyncCacheProvider in-memory Cache for flag rules.
replicator_mode bool False Enable Replicator Mode (see below).
replicator_health_url Optional[str] http://localhost:8090/ready Replicator health check URL.
replicator_health_check Optional[int] 30000 Health check interval in milliseconds.

Using Redis as a Shared Cache

The SDK ships with a RedisCache provider built on redis.asyncio. Pass a Redis client into the cache slots on DataStreamConfig to share state across multiple processes:

import asyncio
import redis.asyncio as aioredis
from schematic.cache import RedisCache
from schematic.client import AsyncSchematic, AsyncSchematicConfig, DataStreamConfig

async def main():
    redis_client = aioredis.from_url("redis://localhost:6379")
    cache_ttl_ms = 60 * 60 * 1000  # 1 hour

    config = AsyncSchematicConfig(
        use_datastream=True,
        datastream=DataStreamConfig(
            cache_ttl=cache_ttl_ms,
            company_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms),
            company_lookup_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms),
            user_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms),
            user_lookup_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms),
            flag_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms),
        ),
    )

    async with AsyncSchematic("YOUR_API_KEY", config) as client:
        await client.check_flag("some-flag-key", company={"id": "your-company-id"})

asyncio.run(main())

RedisCache accepts a prefix argument (default "schematic") if you need to namespace keys — this must match the prefix used by any other SDKs or the replicator writing to the same Redis instance.

Replicator Mode

When running the schematic-datastream-replicator service, configure the client to operate in Replicator Mode. The replicator holds the single WebSocket connection to Schematic and populates a shared cache; SDK instances read from that cache and evaluate flags locally without opening their own WebSocket connections.

Replicator Mode requires a shared cache (e.g. Redis) so the SDK can read data written by the external replicator process. Configure the cache slots on DataStreamConfig exactly as in the Redis example above.

How to Enable Replicator Mode

import asyncio
import redis.asyncio as aioredis
from schematic.cache import RedisCache
from schematic.client import AsyncSchematic, AsyncSchematicConfig, DataStreamConfig

async def main():
    redis_client = aioredis.from_url("redis://localhost:6379")

    config = AsyncSchematicConfig(
        use_datastream=True,
        datastream=DataStreamConfig(
            replicator_mode=True,
            cache_ttl=None,  # Match the replicator's unlimited default
            company_cache=RedisCache(redis_client),
            company_lookup_cache=RedisCache(redis_client),
            user_cache=RedisCache(redis_client),
            user_lookup_cache=RedisCache(redis_client),
            flag_cache=RedisCache(redis_client),
        ),
    )

    async with AsyncSchematic("YOUR_API_KEY", config) as client:
        is_enabled = await client.check_flag(
            "some-flag-key",
            company={"id": "your-company-id"},
        )

asyncio.run(main())

Cache TTL Configuration

Set the SDK's cache_ttl to match the replicator's cache TTL. The replicator defaults to an unlimited cache TTL. If the SDK uses a shorter TTL (the default is 24 hours), locally updated cache entries (e.g. after track events) will be written back with the shorter TTL and eventually evicted from the shared cache, even though the replicator originally set them with no expiration.

If you have configured a custom cache TTL on the replicator, use the same value here.

Advanced Configuration

The client automatically configures sensible defaults for Replicator Mode, but you can customize the health check endpoint and interval:

config = AsyncSchematicConfig(
    use_datastream=True,
    datastream=DataStreamConfig(
        replicator_mode=True,
        cache_ttl=None,
        replicator_health_url="http://my-replicator:8090/ready",
        replicator_health_check=60_000,  # 60 seconds, in ms
        # ... shared cache providers
    ),
)

Default Configuration

  • Replicator Health URL: http://localhost:8090/ready
  • Health Check Interval: 30 seconds
  • Cache TTL: 24 hours (SDK default; should be set to match the replicator's TTL, which defaults to unlimited)

When running in Replicator Mode, the client will:

  • Skip establishing WebSocket connections
  • Periodically check if the replicator service is ready
  • Use cached data populated by the external replicator service
  • Fall back to direct API calls if the replicator is not available

Webhook Verification

Schematic can send webhooks to notify your application of events. To ensure the security of these webhooks, Schematic signs each request using HMAC-SHA256. The Python SDK provides utility functions to verify these signatures.

Verifying Webhook Signatures

When your application receives a webhook request from Schematic, you should verify its signature to ensure it's authentic. The SDK provides simple functions to verify webhook signatures. Here's how to use them in different frameworks:

Flask

from flask import Flask, request, jsonify
from schematic.webhook_utils import verify_webhook_signature, WebhookSignatureError

app = Flask(__name__)

@app.route('/webhooks/schematic', methods=['POST'])
def schematic_webhook():
    try:
        # Each webhook has a distinct secret; you can access this via the Schematic app
        webhook_secret = "your-webhook-secret"

        # Verify the webhook signature
        verify_webhook_signature(request, webhook_secret)

        # Process the webhook payload
        data = request.json
        print(f"Webhook verified: {data}")

        return "", 200
    except WebhookSignatureError as e:
        print(f"Webhook verification failed: {str(e)}")
        return jsonify({"error": str(e)}), 400
    except Exception as e:
        print(f"Error processing webhook: {str(e)}")
        return jsonify({"error": "Internal server error"}), 500

if __name__ == '__main__':
    app.run(port=3000)

Django

from django.http import JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from schematic.webhook_utils import verify_webhook_signature, WebhookSignatureError

@csrf_exempt
def schematic_webhook(request):
    if request.method != 'POST':
        return HttpResponse(status=405)

    try:
        # Each webhook has a distinct secret; you can access this via the Schematic app
        webhook_secret = "your-webhook-secret"

        # Verify the webhook signature
        verify_webhook_signature(request, webhook_secret)

        # Process the webhook payload
        data = request.json
        print(f"Webhook verified: {data}")

        return HttpResponse(status=200)
    except WebhookSignatureError as e:
        print(f"Webhook verification failed: {str(e)}")
        return JsonResponse({"error": str(e)}, status=400)
    except Exception as e:
        print(f"Error processing webhook: {str(e)}")
        return JsonResponse({"error": "Internal server error"}, status=500)

FastAPI

from fastapi import FastAPI, Request, Response, HTTPException, Depends
from schematic.webhook_utils import verify_webhook_signature, WebhookSignatureError

app = FastAPI()

async def verify_signature(request: Request):
    # Each webhook has a distinct secret; you can access this via the Schematic app
    webhook_secret = "your-webhook-secret"

    try:
        # Get the raw body
        body = await request.body()

        # Verify the webhook signature
        verify_webhook_signature(request, webhook_secret, body)
    except WebhookSignatureError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/webhooks/schematic")
async def schematic_webhook(request: Request, _: None = Depends(verify_signature)):
    # Process the webhook payload
    data = await request.json()
    print(f"Webhook verified: {data}")

    return Response(status_code=200)

Verifying Signatures Manually

If you need to verify a webhook signature outside of the context of a web request, you can use the verify_signature function:

from schematic.webhook_utils import verify_signature, WebhookSignatureError

def verify_webhook_manually(body: str, signature: str, timestamp: str, secret: str):
    try:
        # Verify the signature
        verify_signature(body, signature, timestamp, secret)
        return True
    except WebhookSignatureError as e:
        print(f"Webhook verification failed: {str(e)}")
        return False

Advanced

Flag Check Options

By default, the client will do some local caching for flag checks. If you would like to change this behavior, you can do so using an initialization option to specify the max size of the cache (in terms of number of entries) and the max age of the cache (in milliseconds):

from schematic.client import LocalCache, Schematic

cache_size = 100
cache_ttl = 1000  # in milliseconds
config = SchematicConfig(
    cache_providers=[LocalCache[bool](cache_size, cache_ttl)],
)
client = Schematic("YOUR_API_KEY", config)

You can also disable local caching entirely; bear in mind that, in this case, every flag check will result in a network request:

from schematic.client import Schematic

config = SchematicConfig(cache_providers=[])
client = Schematic("YOUR_API_KEY", config)

You may want to specify default flag values for your application, which will be used if there is a service interruption or if the client is running in offline mode (see below):

from schematic.client import Schematic

config = SchematicConfig(flag_defaults={"some-flag-key": True})
client = Schematic("YOUR_API_KEY", config)

Offline Mode

In development or testing environments, you may want to avoid making network requests to the Schematic API. You can run Schematic in offline mode by specifying the offline option; in this case, it does not matter what API key you specify:

from schematic.client import Schematic

config = SchematicConfig(offline=True)
client = Schematic("", config)

Offline mode works well with flag defaults:

from schematic.client import Schematic

config = SchematicConfig(
    flag_defaults={"some-flag-key": True},
    offline=True,
)
client = Schematic("", config)
client.check_flag("some-flag-key") # Returns True

Timeouts

By default, requests time out after 60 seconds. You can configure this with a timeout option at the client or request level.

from schematic.client import Schematic

client = Schematic(
    # All timeouts are 20 seconds
    timeout=20.0,
)

# Override timeout for a specific method
client.companies.get_company(..., {
    timeout_in_seconds=20.0
})

Retries

The SDK is instrumented with automatic retries with exponential backoff. A request will be retried as long as the request is deemed retriable and the number of retry attempts has not grown larger than the configured retry limit (default: 2).

A request is deemed retriable when any of the following HTTP status codes is returned:

  • 408 (Timeout)
  • 429 (Too Many Requests)
  • 5XX (Internal Server Errors)

Use the max_retries request option to configure this behavior.

# Override timeout for a specific method
client.companies.get_company(..., {
    max_retries=1 # Only retry once on failure
})

Custom HTTP client

You can override the httpx client to customize it for your use-case. Some common use-cases include support for proxies and transports.

import httpx

from schematic.client import Schematic

client = Schematic(
    http_client=httpx.Client(
        proxies="http://my.test.proxy.example.com",
        transport=httpx.HTTPTransport(local_address="0.0.0.0"),
    ),
)

Beta Status

This SDK is in Preview, and there may be breaking changes between versions without a major version update.

To ensure a reproducible environment (and minimize risk of breaking changes), we recommend pinning a specific package version.

Contributing

While we value open-source contributions to this SDK, this library is generated programmatically. Additions made directly to this library would have to be moved over to our generation code, otherwise they would be overwritten upon the next generated release. Feel free to open a PR as a proof of concept, but know that we will not be able to merge it as-is. We suggest opening an issue first to discuss with us!

On the other hand, contributions to the README are always very welcome!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

schematichq-1.2.0.tar.gz (453.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

schematichq-1.2.0-py3-none-any.whl (828.3 kB view details)

Uploaded Python 3

File details

Details for the file schematichq-1.2.0.tar.gz.

File metadata

  • Download URL: schematichq-1.2.0.tar.gz
  • Upload date:
  • Size: 453.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.9.25 Linux/6.17.0-1010-azure

File hashes

Hashes for schematichq-1.2.0.tar.gz
Algorithm Hash digest
SHA256 34fbeff6df689d81d0e4a96a9dc8360c2fabce32c9d2523aa13ab75e61aae9ec
MD5 d76e7b31e27b1a0c92039c24dac4f385
BLAKE2b-256 66d5fd7e86681a92927e13059f707691da7b98de1825d1dbd2cfb30df3a30fcd

See more details on using hashes here.

File details

Details for the file schematichq-1.2.0-py3-none-any.whl.

File metadata

  • Download URL: schematichq-1.2.0-py3-none-any.whl
  • Upload date:
  • Size: 828.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.9.25 Linux/6.17.0-1010-azure

File hashes

Hashes for schematichq-1.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6ecfa58aecdc31903da684b506e05231abab9320a660284c119b2719f8699c0a
MD5 18fee3fd26f8d3bc0f82999ae5855e08
BLAKE2b-256 cb7c7d954a022abc8970f1f8185c95e134ee1a617162a81571b8fa75b6d0c690

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page