Apiphany

Apiphany is a declarative API orchestration engine. It enables developers to define complex API workflows—including authentication, pagination, chained requests, rate limiting, state tracking, and response extraction—through a fully validated configuration model. Powered by httpx and asyncio, Apiphany executes API workflows concurrently and seamlessly unpacks deeply nested JSON blobs into structured Pandas DataFrames or SQL tables.

Features

  • Asynchronous Concurrency: Built entirely on httpx and asyncio, the engine can keep hundreds of requests in flight at once with low memory overhead.
  • Strict Rate Limiting: Built-in aiolimiter safely paces outgoing requests to ensure you never exceed API quotas, strictly adhering to your configured requests_per_second.
  • Smart Throttling: Dynamic rate-limit header parsing (e.g., x-ratelimit-reset) helps avoid 429 errors by pausing until the reset window opens.
  • Automated OAuth2: Automatically intercepts 401 Unauthorized responses, locks the thread, performs a token exchange, and retries the failed request seamlessly.
  • Chained Requests: Feed the extracted outputs of one API call as parameters into another dynamically (e.g., fetch Users -> Posts -> Comments).
  • Deep Extraction: Native integration with json_extract_pandas to extract, unpack, and normalize nested JSON responses into clean DataFrames.
  • Incremental State Tracking: Native state.json watermarking. Works locally or via S3 (s3://bucket/state.json) to persist the latest timestamps or IDs fetched.
  • Secrets Management: Dynamically fetch client credentials or API keys directly from AWS Secrets Manager using ARNs.
  • Circuit Breakers: System resilience mechanism that automatically opens the circuit and fails fast if downstream servers continuously return 5xx errors, protecting your pipeline.
  • Cloud-Agnostic Logging: Emits pure JSON logs using python-json-logger natively compatible with AWS CloudWatch and Datadog.

Requirements

  • Python 3.8+
  • httpx
  • aiolimiter
  • pydantic
  • python-json-logger

(Optional, depending on cloud provider and export target)

  • boto3 (for S3 exports, state management, and AWS Secrets Manager)
  • pandas & sqlalchemy (for SQL database exports)

Installation & Usage

Install the core package:

pip install apiphany

To install with AWS capabilities (S3 exports and AWS Secrets Manager) or SQL export capabilities:

pip install "apiphany[aws]"
pip install "apiphany[sql]"

Python API

Import APIOrchestrator directly into your data pipeline, Lambda function, or test suite:

import asyncio
from apiphany import APIOrchestrator

# 1. Initialize the Client
client = APIOrchestrator(
    config_file="apiphany_config.json", 
    entity_name="jsonplaceholder"
)

# 2. Execute an API asynchronously!
# output_format can be "raw" (list of JSON dicts) or "flattened" (Pandas DataFrame)
df = asyncio.run(client.execute(
    api_identifier="get_users", 
    output_format="flattened"
))

print(df.head())

Configuration Schema Details

Apiphany operates entirely off a declarative JSON configuration file (apiphany_config.json). Every attribute is strictly validated at runtime by pydantic.

1. Root Entity Structure

The top level defines logical groups of APIs (Entities), global credentials, and rate limit rules.

  • entity_name (string): The logical name of the API provider (e.g., "stripe", "salesforce").
  • post_process (string): (Optional) Filepath to a custom Python script to mutate raw JSON before extraction.
  • certificates (dict): (Optional) Must contain "cert" and "key" paths. Supports local or s3:// URIs for mTLS.
  • client_credentials (dict): Global dictionary for Secrets. Supports templating ({{variable}}). If you provide "secrets_manager_arn", Apiphany resolves it directly from AWS Secrets Manager.
  • retry_config (dict): Contains total_retries (default: 3) and requests_per_second (default: 10).
  • api_list (list): The core array of APISchema endpoint objects.

Example:

{
    "api_config": [
        {
            "entity_name": "stripe",
            "client_credentials": {
                "secrets_manager_arn": "arn:aws:secretsmanager:us-east-1:123:secret:stripe-keys"
            },
            "retry_config": {
                "total_retries": 5,
                "requests_per_second": 20
            },
            "api_list": [ ... ]
        }
    ]
}
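
To make the nesting concrete, here is a minimal sketch that parses a config in the shape shown above and picks one entity by its entity_name. The helper find_entity is hypothetical and is not part of Apiphany, which performs its own pydantic validation; only the key names and the documented retry_config defaults come from this page.

```python
import json

# A trimmed config in the shape shown above (api_list left empty for brevity).
CONFIG_TEXT = """
{
    "api_config": [
        {
            "entity_name": "stripe",
            "retry_config": {"total_retries": 5, "requests_per_second": 20},
            "api_list": []
        }
    ]
}
"""


def find_entity(config: dict, entity_name: str) -> dict:
    """Hypothetical helper: return the entity block whose entity_name matches."""
    for entity in config.get("api_config", []):
        if entity.get("entity_name") == entity_name:
            return entity
    raise KeyError(f"entity {entity_name!r} not found in api_config")


config = json.loads(CONFIG_TEXT)
stripe = find_entity(config, "stripe")

# Start from the documented defaults, then apply whatever retry_config overrides.
retry = {"total_retries": 3, "requests_per_second": 10, **stripe.get("retry_config", {})}
print(retry)
```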

2. API Object (api_list)

Each object inside api_list defines a single executable endpoint.

  • api_identifier (string): Unique ID used to execute the endpoint via .execute(api_identifier="my_api").
  • api_name (string): Human-readable description.
  • method (string): HTTP Method ("GET", "POST", "PUT", etc.).
  • url (string): The URL. Supports templating from parent outputs (e.g., https://api.com/users/{{id}}).
  • auth_type (string): Authentication type: "None", "Bearer", "Basic", or "APIKey".
  • api_key_name (string): If auth_type is "APIKey", the header key name (default: "x-api-key").
  • headers (dict): Static HTTP headers to inject.
  • query_params (dict): Static URL parameters to inject. Supports templating.
  • payload (dict): JSON body payload for POST/PUT requests.
  • graphql_query (string): If supplied, forces a POST request and wraps the string as {"query": "..."}. Supports templating.

Example:

{
    "api_identifier": "create_user",
    "api_name": "Create a new User",
    "method": "POST",
    "url": "https://api.example.com/users",
    "auth_type": "Bearer",
    "headers": {
        "Content-Type": "application/json"
    },
    "payload": {
        "name": "Jane",
        "role": "admin"
    }
}
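
The {{variable}} templating used in url, query_params, and graphql_query can be pictured as a simple placeholder substitution. The sketch below is an assumption about the general idea, not Apiphany's actual resolver; render_template and its error behavior are hypothetical.

```python
import re

# Matches {{name}} placeholders, allowing optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_template(text: str, values: dict) -> str:
    """Hypothetical helper: replace each {{name}} with str(values[name])."""

    def substitute(match):
        name = match.group(1)
        if name not in values:
            # Failing loudly is a design choice of this sketch, not documented behavior.
            raise KeyError(f"no value supplied for placeholder {name!r}")
        return str(values[name])

    return PLACEHOLDER.sub(substitute, text)


url = render_template("https://api.com/users/{{id}}", {"id": 42})
print(url)
```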

3. Pagination Configuration

Apiphany automatically scrolls through pages until the payload is empty.

  • type (string): "page_based", "offset_based", or "cursor_based".
  • page_key (string): Parameter name for page number (default: "page").
  • size_key (string): Parameter name for page size (default: "limit").
  • page_size (int): Number of records to request per page (default: 100).
  • offset_key (string): Parameter name for offset (default: "offset").
  • limit_key (string): Parameter name for limit (default: "limit").
  • limit_value (int): Offset increment amount (default: 100).
  • stop_condition (string): Defines when to stop. "no_data" stops when returned array length < limit.

Example (Offset Based):

"pagination": {
    "type": "offset_based",
    "offset_key": "start",
    "limit_key": "limit",
    "limit_value": 500
}
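
The loop behind an offset-based configuration like this one can be sketched as follows. This is an illustration under assumptions, not the library's code: paginate_offset and fake_fetch are hypothetical names, and the stop rule shown is the "no_data" behavior described above (stop when a page returns fewer rows than the limit).

```python
from typing import Callable, Dict, List


def paginate_offset(
    fetch_page: Callable[[Dict[str, int]], List[dict]],
    offset_key: str = "offset",
    limit_key: str = "limit",
    limit_value: int = 100,
) -> List[dict]:
    """Collect records page by page until a short (or empty) page arrives."""
    records: List[dict] = []
    offset = 0
    while True:
        page = fetch_page({offset_key: offset, limit_key: limit_value})
        records.extend(page)
        # "no_data"-style stop: fewer rows than requested means this was the last page.
        if len(page) < limit_value:
            return records
        offset += limit_value


# Stand-in for an HTTP call: serves slices of 1,234 fake rows.
FAKE_ROWS = [{"id": i} for i in range(1234)]


def fake_fetch(params: Dict[str, int]) -> List[dict]:
    start = params["start"]
    return FAKE_ROWS[start:start + params["limit"]]


rows = paginate_offset(fake_fetch, offset_key="start", limit_key="limit", limit_value=500)
print(len(rows))
```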

cursor_based Pagination

For APIs that return a dynamic cursor or token for the next page.

"pagination": {
    "type": "cursor_based",
    "cursor_path": "meta.pagination.next_token",
    "cursor_query_key": "continuation_token"
}

  • cursor_path: The dot-notation path inside the API's raw JSON response to find the next cursor (e.g., "paging.cursors.after").
  • cursor_query_key: The URL query parameter where the extracted cursor should be injected on the subsequent request (default: "cursor").
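
As a rough sketch of how cursor_path and cursor_query_key interact, the code below resolves a dot-notation path in each response and feeds the result into the next request. The helpers get_by_path and paginate_cursor, the fake pages, and the rule of stopping on a missing or empty cursor are assumptions made for illustration.

```python
from typing import Any, Callable, Dict, List, Optional


def get_by_path(data: Any, dotted_path: str) -> Optional[Any]:
    """Walk a dot-notation path through nested dicts; None if any key is missing."""
    current = data
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def paginate_cursor(
    fetch_page: Callable[[Dict[str, str]], dict],
    cursor_path: str,
    cursor_query_key: str = "cursor",
) -> List[dict]:
    """Request pages until the response no longer carries a cursor."""
    responses: List[dict] = []
    params: Dict[str, str] = {}
    while True:
        body = fetch_page(params)
        responses.append(body)
        cursor = get_by_path(body, cursor_path)
        if not cursor:  # assumption: a missing or empty cursor ends the scroll
            return responses
        params = {cursor_query_key: cursor}


# Stand-in server: two pages keyed by the continuation token sent by the client.
PAGES = {
    None: {"data": [1, 2], "meta": {"pagination": {"next_token": "abc"}}},
    "abc": {"data": [3], "meta": {"pagination": {"next_token": None}}},
}


def fake_fetch(params: Dict[str, str]) -> dict:
    return PAGES[params.get("continuation_token")]


responses = paginate_cursor(fake_fetch, "meta.pagination.next_token", "continuation_token")
all_items = [item for body in responses for item in body["data"]]
print(all_items)
```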

4. Chained Requests

Dynamically feeds the output of this API into another child API.

  • child_api_identifier (string): The api_identifier of the next endpoint to trigger.
  • key_mapping (dict): Maps parent JSON keys to child URL template parameters (e.g., {"id": "userId"}).
  • max_concurrent_requests (int): Maximum number of concurrent requests to the child API (default: 5).
  • batch_size (int): If > 1, combines parent values into comma-separated lists (e.g. 1,2,3,4,5).

Example (Batched Child Resolution):

"chained_request": {
    "child_api_identifier": "get_user_posts",
    "key_mapping": {
        "id": "userIds"
    },
    "max_concurrent_requests": 10,
    "batch_size": 20
}

If 100 IDs are fetched, Apiphany groups them into 5 batches of 20 and issues 5 concurrent requests (within the limit of 10), injecting ?userIds=1,2,3...20 into the first child URL.
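
The batching arithmetic and the concurrency cap can be sketched with the standard library alone. Everything here is illustrative: make_batches, call_child, and run_children are hypothetical names, and asyncio.Semaphore is one plausible way to bound concurrency, not necessarily what Apiphany uses internally.

```python
import asyncio
from typing import List, Sequence


def make_batches(values: Sequence, batch_size: int) -> List[str]:
    """Group values into comma-separated strings of at most batch_size items."""
    return [
        ",".join(str(v) for v in values[i:i + batch_size])
        for i in range(0, len(values), batch_size)
    ]


async def call_child(batch: str, limiter: asyncio.Semaphore) -> str:
    """Stand-in for the child request; returns the query string it would send."""
    async with limiter:
        await asyncio.sleep(0)  # placeholder for the real HTTP round trip
        return f"?userIds={batch}"


async def run_children(ids: Sequence[int], batch_size: int, max_concurrent: int) -> List[str]:
    limiter = asyncio.Semaphore(max_concurrent)  # created inside the running loop
    batches = make_batches(ids, batch_size)
    results = await asyncio.gather(*(call_child(b, limiter) for b in batches))
    return list(results)


queries = asyncio.run(run_children(list(range(1, 101)), batch_size=20, max_concurrent=10))
print(len(queries), queries[0])
```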


5. Data Extraction (extractor_config)

Define how nested API JSON responses should be flattened into Pandas DataFrames. Powered by json_extract_pandas.

  • response_data_extractor (string): Target JSON path to pluck the core array from the root response (e.g. "data.results").
  • json_extract_config.record_path (list): Path to the nested array within the row to explode into multiple rows (e.g., ["line_items"]).
  • json_extract_config.meta (list): Fields to duplicate across every exploded row (e.g., ["transaction_id", ["user", "name"]]).
  • json_extract_config.errors (string): How to handle missing keys ("raise" or "ignore").

Example:

"extractor_config": {
    "response_data_extractor": "data.results",
    "json_extract_config": {
        "record_path": [
            "line_items"
        ],
        "meta": [
            "transaction_id",
            "timestamp",
            ["user", "name"]
        ]
    }
}

The above will flatten an array of line_items while ensuring transaction_id, timestamp, and user.name are attached as columns to every item's row.
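
To show what record_path and meta do to a single response row, here is a pure-Python sketch of the explode-and-attach step. It does not call json_extract_pandas and makes no claim about that library's API or column naming; explode_records is hypothetical, the dotted column name user.name follows the description above, and missing keys raise KeyError, roughly matching errors set to "raise".

```python
from typing import Any, Dict, List, Sequence, Union

MetaPath = Union[str, Sequence[str]]


def explode_records(
    rows: List[Dict[str, Any]],
    record_path: Sequence[str],
    meta: Sequence[MetaPath],
) -> List[Dict[str, Any]]:
    """Emit one flat dict per nested item, copying the meta fields onto each."""
    flat: List[Dict[str, Any]] = []
    for row in rows:
        # Resolve the shared meta values once per parent row.
        shared: Dict[str, Any] = {}
        for path in meta:
            keys = [path] if isinstance(path, str) else list(path)
            value: Any = row
            for key in keys:
                value = value[key]
            shared[".".join(keys)] = value
        # Walk record_path down to the nested array that should be exploded.
        nested: Any = row
        for key in record_path:
            nested = nested[key]
        for item in nested:
            flat.append({**item, **shared})
    return flat


rows = [
    {
        "transaction_id": "t1",
        "timestamp": "2024-01-01T00:00:00Z",
        "user": {"name": "Jane"},
        "line_items": [{"sku": "A", "qty": 1}, {"sku": "B", "qty": 2}],
    }
]
flat = explode_records(rows, ["line_items"], ["transaction_id", "timestamp", ["user", "name"]])
for record in flat:
    print(record)
```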


6. Export Configuration

Defines where Apiphany automatically saves the final dataset when execution finishes.

  • save_output (bool): Master toggle to enable saving (default: true).
  • output_type (string): "flattened" (Pandas CSV/JSON/SQL) or "raw" (Pure JSON dicts).
  • target.type (string): Destination: "file" or "sql".
  • target.location (string): Local filepath ("out.csv"), S3 URI ("s3://bucket/out.csv"), or SQL URI ("sqlite:///db.sqlite").
  • target.table_name (string): (SQL Only) Name of the target database table.
  • target.if_exists (string): (SQL Only) Behavior if table exists ("append", "replace", "fail").

Example (Export to SQL):

"export_config": {
    "save_output": true,
    "output_type": "flattened",
    "target": {
        "type": "sql",
        "location": "sqlite:///my_db.db",
        "table_name": "transactions",
        "if_exists": "append"
    }
}

7. State Tracking (Incremental Syncs)

Saves the highest watermark detected in a payload to skip historical downloads on the next run.

  • incremental_key (string): The column to evaluate for the highest watermark (e.g., "updated_at", "id").

Example:

"state_tracking": {
    "incremental_key": "updated_at"
}

When you run Apiphany, it scans the DataFrame for the highest updated_at and saves it. On your next run, you can inject it directly into the URL by putting {{state_updated_at}} in your query_params.
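
The watermark logic can be approximated as "keep the maximum value ever seen for incremental_key". The sketch below assumes a flat JSON state file keyed by the incremental key and timestamps in a single ISO-8601 format (so string comparison orders them correctly); the real layout of state.json is not documented here, and update_watermark is a hypothetical helper.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional


def update_watermark(state_path: Path, rows: List[Dict[str, Any]], incremental_key: str) -> Optional[Any]:
    """Persist the highest incremental_key value seen so far and return it."""
    state = json.loads(state_path.read_text()) if state_path.exists() else {}
    candidates = [row[incremental_key] for row in rows if incremental_key in row]
    previous = state.get(incremental_key)
    if previous is not None:
        candidates.append(previous)  # never move the watermark backwards
    if candidates:
        state[incremental_key] = max(candidates)
        state_path.write_text(json.dumps(state))
    return state.get(incremental_key)


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "state.json"
    first = update_watermark(
        path,
        [{"updated_at": "2024-01-05T00:00:00Z"}, {"updated_at": "2024-03-01T00:00:00Z"}],
        "updated_at",
    )
    # A later run that only sees older rows leaves the stored watermark untouched.
    second = update_watermark(path, [{"updated_at": "2024-02-01T00:00:00Z"}], "updated_at")

print(first, second)
```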


8. Advanced Resilience & Security

Apiphany includes several mechanisms that help long-running extractions survive rate limits, expired tokens, and failing upstream servers.

Smart Throttling (rate_limit_config)

Dynamically parses rate limit headers and pauses the asynchronous loop until the reset window opens, so that requests avoid 429 Too Many Requests errors.

"rate_limit_config": {
    "remaining_header": "x-ratelimit-remaining",
    "reset_header": "x-ratelimit-reset"
}
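
The pause calculation behind header-driven throttling might look like the sketch below. It assumes the reset header carries a Unix timestamp in seconds; some APIs send a number of seconds remaining instead, and this page does not say which form Apiphany expects. seconds_to_wait is a hypothetical helper.

```python
import time
from typing import Mapping, Optional


def seconds_to_wait(
    headers: Mapping[str, str],
    remaining_header: str = "x-ratelimit-remaining",
    reset_header: str = "x-ratelimit-reset",
    now: Optional[float] = None,
) -> float:
    """Return how long to pause before the next request (0.0 if quota remains)."""
    remaining = headers.get(remaining_header)
    reset = headers.get(reset_header)
    if remaining is None or reset is None:
        return 0.0  # the API did not send rate-limit headers
    if int(remaining) > 0:
        return 0.0  # quota left in the current window
    current = time.time() if now is None else now
    # ASSUMPTION: the reset header is an absolute Unix timestamp in seconds.
    return max(0.0, float(reset) - current)


exhausted = {"x-ratelimit-remaining": "0", "x-ratelimit-reset": "1030"}
print(seconds_to_wait(exhausted, now=1000.0))
```

In an asyncio pipeline the returned value would typically be passed to asyncio.sleep before the next request is sent.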

Automated OAuth2 Exchange (oauth2_config)

Automatically intercepts 401 Unauthorized responses, fetches a new token, and seamlessly retries the failed request. Note: Ensure auth_type is set to "Bearer".

"oauth2_config": {
    "token_url": "https://api.example.com/oauth/token",
    "client_id_key": "client_id",
    "client_secret_key": "client_secret",
    "grant_type": "client_credentials"
}
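
The intercept, refresh, and retry flow can be sketched with a lock that lets only one task perform the token exchange while the others wait and then reuse the new token. This is a simulation under assumptions: TokenManager, fake_api, and request_with_refresh are hypothetical, and no real HTTP call or OAuth2 exchange takes place.

```python
import asyncio
from typing import List, Tuple


class TokenManager:
    """Holds the current bearer token and refreshes it at most once per expiry."""

    def __init__(self) -> None:
        self.token = "stale"
        self.refresh_count = 0
        self._lock = asyncio.Lock()

    async def refresh(self, rejected_token: str) -> None:
        async with self._lock:
            # Another task may have refreshed while this one waited for the lock.
            if self.token != rejected_token:
                return
            await asyncio.sleep(0)  # placeholder for the POST to token_url
            self.refresh_count += 1
            self.token = f"fresh-{self.refresh_count}"


async def fake_api(token: str) -> int:
    """Stand-in server: rejects the stale token, accepts anything else."""
    await asyncio.sleep(0)
    return 401 if token == "stale" else 200


async def request_with_refresh(tokens: TokenManager) -> int:
    used = tokens.token
    status = await fake_api(used)
    if status == 401:
        await tokens.refresh(used)
        status = await fake_api(tokens.token)  # retry once with the new token
    return status


async def main() -> Tuple[List[int], int]:
    tokens = TokenManager()  # built inside the running loop
    statuses = await asyncio.gather(*(request_with_refresh(tokens) for _ in range(5)))
    return list(statuses), tokens.refresh_count


statuses, refresh_count = asyncio.run(main())
print(statuses, refresh_count)
```

Passing the rejected token into refresh is what keeps five simultaneous 401 responses from triggering five token exchanges.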

Circuit Breakers (circuit_breaker)

Tracks consecutive server failures (5xx). If the threshold is crossed, the circuit "opens" and prevents any further requests from being made for the timeout duration, raising a CircuitBreakerOpenException.

"circuit_breaker": {
    "failure_threshold": 10,
    "recovery_timeout_seconds": 300
}
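
A circuit breaker with these two settings can be sketched as a small state holder. The class below is an illustration, not Apiphany's implementation: it defines its own stand-in CircuitBreakerOpenException, treats reaching the threshold as opening the circuit, and lets one trial request through once the timeout has elapsed. The injectable clock is only there to make the example deterministic.

```python
import time
from typing import Callable, Optional


class CircuitBreakerOpenException(Exception):
    """Stand-in for the exception raised while the circuit is open."""


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 10,
        recovery_timeout_seconds: float = 300,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout_seconds = recovery_timeout_seconds
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def before_request(self) -> None:
        """Fail fast while open; allow a trial request once the timeout elapses."""
        if self._opened_at is None:
            return
        if self._clock() - self._opened_at < self.recovery_timeout_seconds:
            raise CircuitBreakerOpenException("circuit is open")
        self._opened_at = None
        self._failures = 0

    def record_response(self, status_code: int) -> None:
        """Count consecutive 5xx responses; any other status resets the count."""
        if 500 <= status_code < 600:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
        else:
            self._failures = 0


now = [0.0]  # fake clock so the example does not need to sleep
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout_seconds=300, clock=lambda: now[0])
for _ in range(3):
    breaker.before_request()
    breaker.record_response(503)

try:
    breaker.before_request()
    blocked = False
except CircuitBreakerOpenException:
    blocked = True

now[0] = 301.0  # jump past the recovery timeout
breaker.before_request()  # no exception: the circuit has closed again
print(blocked)
```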

Full End-to-End Example Config

Here is a complete example demonstrating how Apiphany can fetch a list of Users, automatically extract the id from each user, and concurrently fetch all the Posts authored by those users using batched Chained Requests.

{
    "api_config": [
        {
            "entity_name": "jsonplaceholder",
            "client_credentials": {},
            "retry_config": {
                "total_retries": 3,
                "requests_per_second": 10
            },
            "api_list": [
                {
                    "api_identifier": "get_users",
                    "api_name": "Fetch Users",
                    "method": "GET",
                    "url": "https://jsonplaceholder.typicode.com/users",
                    "auth_type": "None",
                    "chained_request": {
                        "child_api_identifier": "get_user_posts",
                        "key_mapping": {
                            "id": "userId"
                        },
                        "batch_size": 5,
                        "max_concurrent_requests": 10
                    },
                    "export_config": {
                        "save_output": true,
                        "output_type": "flattened",
                        "target": {
                            "type": "file",
                            "location": "users.csv"
                        }
                    }
                },
                {
                    "api_identifier": "get_user_posts",
                    "api_name": "Fetch Posts for User",
                    "method": "GET",
                    "url": "https://jsonplaceholder.typicode.com/posts",
                    "query_params": {
                        "userId": "{{userId}}"
                    },
                    "auth_type": "None",
                    "export_config": {
                        "save_output": true,
                        "output_type": "flattened",
                        "target": {
                            "type": "file",
                            "location": "posts.csv"
                        }
                    }
                }
            ]
        }
    ]
}
