Python SDK for aepipe — multi-tenant log ingestion and query

aepipe-sdk-python

Python SDK for aepipe, a multi-tenant log ingestion and query service built on Cloudflare Workers Analytics Engine.

Zero external dependencies. Python 3.10+.

Installation

pip install aepipe-sdk

Quick Start

from aepipe import Aepipe

client = Aepipe(
    base_url="https://aepipe.yourdomain.com",
    token="your-admin-token",
)
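
The Quick Start hardcodes the token for brevity. As a minimal sketch, both settings could be read from the environment instead. The variable names `AEPIPE_BASE_URL` and `AEPIPE_TOKEN` and the helper `load_settings` are hypothetical and not part of the SDK.

```python
import os


def load_settings(env=os.environ):
    """Return keyword arguments for Aepipe(...) read from environment variables.

    AEPIPE_BASE_URL and AEPIPE_TOKEN are hypothetical names chosen for this sketch.
    """
    names = ("AEPIPE_BASE_URL", "AEPIPE_TOKEN")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {"base_url": env["AEPIPE_BASE_URL"], "token": env["AEPIPE_TOKEN"]}


# Usage (requires aepipe-sdk):
#   from aepipe import Aepipe
#   client = Aepipe(**load_settings())
```

Failing fast on a missing variable keeps an empty token from reaching the server.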

API Reference

ingest(project, logstore, points) -> IngestResult

Write structured event points to Analytics Engine.

| Parameter | Type | Description |
| --- | --- | --- |
| project | str | Project name (^[a-zA-Z0-9_-]{1,64}$) |
| logstore | str | Logstore name (^[a-zA-Z0-9_-]{1,64}$) |
| points | list[DataPoint] | Event points (max 250 per call) |

from aepipe import DataPoint

result = client.ingest("myapp", "backend", [
    DataPoint(event="user_login"),
    DataPoint(event="api_error", level="error", blobs=["GET /api"], doubles=[1.23]),
    # With D1 payload — large data stored separately, linked via UUID
    DataPoint(
        event="unhandled_exception",
        level="error",
        payload={"stack": "Error: ...\n  at ...", "request": {"url": "/api/users"}},
        ttl=604800,  # 7 days
    ),
])
print(result.ok)       # True
print(result.written)  # 3
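
Because ingest() accepts at most 250 points per call, a longer list has to be split before sending. The helper below is one possible sketch; `chunked` is a hypothetical name and not something the SDK is known to provide.

```python
from itertools import islice

MAX_POINTS_PER_CALL = 250  # per-call limit stated in the parameter table


def chunked(items, size=MAX_POINTS_PER_CALL):
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while batch := list(islice(iterator, size)):
        yield batch


# Usage (requires aepipe-sdk and a configured client):
#   written = sum(client.ingest("myapp", "backend", batch).written
#                 for batch in chunked(points))
```

The same helper would apply to log(), which has the same per-call limit.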

log(project, logstore, logs) -> LogResult

Write raw log entries via Workers Observability.

| Parameter | Type | Description |
| --- | --- | --- |
| project | str | Project name |
| logstore | str | Logstore name |
| logs | list[LogEntry] | Log entries (max 250 per call) |

from aepipe import LogEntry

result = client.log("myapp", "backend", [
    LogEntry(message="server started"),
    LogEntry(message="connection timeout", level="error", extra={"ip": "1.2.3.4"}),
])
print(result.written)  # 2
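
Applications that already use the standard logging module may want to forward those records. The sketch below maps a LogRecord to the three LogEntry fields as a plain dict. The helper name `record_to_fields` is hypothetical, and it assumes the service accepts lowercase level names such as "warning" and "critical" in addition to the "info" and "error" values shown above.

```python
import logging


def record_to_fields(record):
    """Map a stdlib LogRecord to the three LogEntry fields as a plain dict."""
    return {
        "message": record.getMessage(),
        "level": record.levelname.lower(),  # assumption: lowercase names are accepted
        "extra": {"logger": record.name, "line": record.lineno},
    }


record = logging.LogRecord(
    name="app.db", level=logging.ERROR, pathname="db.py", lineno=42,
    msg="connection timeout after %ss", args=(30,), exc_info=None,
)
fields = record_to_fields(record)
# Usage (requires aepipe-sdk):
#   client.log("myapp", "backend", [LogEntry(**fields)])
```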

query(project, logstore, sql) -> QueryResult

Run a SQL query against Analytics Engine. Tenant filters are injected automatically.

| Parameter | Type | Description |
| --- | --- | --- |
| project | str | Project name |
| logstore | str | Logstore name |
| sql | str | SQL query (Analytics Engine dialect) |

result = client.query("myapp", "backend", "SELECT count() as cnt FROM aepipe")
print(result.data)

rawlog(project, logstore, *, limit=50, start=None, end=None) -> RawLogResult

Query raw Worker logs via the Cloudflare Telemetry API.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| project | str | | Project name |
| logstore | str | | Logstore name |
| limit | int | 50 | Max results (server caps at 200) |
| start | str or None | None | Start timestamp (ISO 8601) |
| end | str or None | None | End timestamp (ISO 8601) |

result = client.rawlog("myapp", "backend", limit=100, start="2025-01-01T00:00:00Z")
for entry in result.logs:
    print(f"[{entry.level}] {entry.timestamp} {entry.data}")
print(f"total: {result.count}")
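
The start and end parameters take ISO 8601 strings. One way to build them from datetime objects is sketched below. The helpers `iso_utc` and `window` are hypothetical, and the trailing-Z UTC form is assumed because it matches the example above.

```python
from datetime import datetime, timedelta, timezone


def iso_utc(moment):
    """Format a timezone-aware datetime as an ISO 8601 UTC string ending in Z."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def window(end, hours):
    """Return (start, end) strings covering the `hours` before `end`."""
    return iso_utc(end - timedelta(hours=hours)), iso_utc(end)


start, end = window(datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc), hours=6)
# Usage (requires aepipe-sdk):
#   client.rawlog("myapp", "backend", start=start, end=end)
```

Rejecting naive datetimes avoids silently treating local time as UTC.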

detail(project, logstore, ref_ids) -> DetailResult

Fetch extended payloads from D1 by ref_id. Requires a D1 binding on the server.

| Parameter | Type | Description |
| --- | --- | --- |
| project | str | Project name |
| logstore | str | Logstore name |
| ref_ids | list[str] | UUID references from AE blob5 (max 100) |

# 1. Query AE to get ref_ids
ae_result = client.query("myapp", "backend",
    "SELECT blob5 as ref_id, blob3 as event FROM aepipe WHERE blob5 != '' LIMIT 10"
)

# 2. Fetch full payloads from D1
ref_ids = [r["ref_id"] for r in ae_result.data.get("data", [])]
details = client.detail("myapp", "backend", ref_ids)
for entry in details.results:
    print(entry.ref_id, entry.payload)  # {"stack": "...", "request": {...}}
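
A query can return duplicate or empty ref_id values, and detail() accepts at most 100 references per call. The sketch below deduplicates the values and splits them into batches. It assumes rows are dicts with a "ref_id" key, as in the example above, and `ref_id_batches` is a hypothetical helper name.

```python
MAX_REF_IDS_PER_CALL = 100  # limit stated in the detail() parameter table


def ref_id_batches(rows, size=MAX_REF_IDS_PER_CALL):
    """Collect unique, non-empty ref_ids in first-seen order and split them into batches."""
    unique = list(dict.fromkeys(row["ref_id"] for row in rows if row.get("ref_id")))
    return [unique[i:i + size] for i in range(0, len(unique), size)]


rows = [{"ref_id": "a"}, {"ref_id": ""}, {"ref_id": "b"}, {"ref_id": "a"}, {"event": "x"}]
batches = ref_id_batches(rows)
# Usage (requires aepipe-sdk):
#   for batch in ref_id_batches(ae_result.data.get("data", [])):
#       details = client.detail("myapp", "backend", batch)
```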

list_projects() -> ListResult

List all projects that have written data.

result = client.list_projects()
print(result.items)  # ["myapp", "analytics", ...]

list_logstores(project) -> ListResult

List all logstores within a project.

result = client.list_logstores("myapp")
print(result.items)  # ["backend", "frontend", ...]
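
The two listing calls can be combined to enumerate every logstore per project. The sketch below relies only on the `items` field documented for ListResult. `logstore_map` is a hypothetical helper, and `StubClient` is a stand-in so the example runs without a server.

```python
from types import SimpleNamespace


def logstore_map(client):
    """Return {project: [logstore, ...]} using list_projects() and list_logstores()."""
    return {
        project: list(client.list_logstores(project).items)
        for project in client.list_projects().items
    }


class StubClient:
    """Stand-in for a real Aepipe client so the sketch runs offline."""

    _data = {"myapp": ["backend", "frontend"], "analytics": ["events"]}

    def list_projects(self):
        return SimpleNamespace(items=list(self._data))

    def list_logstores(self, project):
        return SimpleNamespace(items=self._data[project])


tenants = logstore_map(StubClient())
```

Against a real client this issues one request per project, so it suits occasional inventory checks rather than hot paths.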

Data Types

DataPoint

| Field | Type | Default | AE Mapping | Description |
| --- | --- | --- | --- | --- |
| event | str | required | blob3 | Event name (non-empty) |
| level | str | "info" | blob4 | Log level |
| blobs | list[str] | [] | blob6 – blob20 | String metadata for grouping/filtering (max 15 items) |
| doubles | list[float] | [] | double1 – double20 | Numeric metrics for aggregation (max 20 items) |
| payload | dict[str, Any] | None | D1 (via blob5 ref_id) | Extended data stored in D1. Requires a D1 binding on the server |
| ttl | int | 7776000 | | Payload TTL in seconds (default: 90 days) |
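
Since ttl is expressed in seconds, deriving it from a timedelta avoids hand-computed constants. A small sketch follows; `ttl_seconds` is a hypothetical helper name.

```python
from datetime import timedelta


def ttl_seconds(**duration):
    """Convert timedelta keyword arguments (days=, hours=, ...) to whole seconds."""
    return int(timedelta(**duration).total_seconds())


week = ttl_seconds(days=7)      # the value used in the ingest() example
default = ttl_seconds(days=90)  # the documented default
```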

blobs vs doubles — how to choose?

  • blobs (string): Used for grouping (GROUP BY) and filtering (WHERE). Not aggregatable. Examples: URL path, user ID, region, error message.
  • doubles (number): 64-bit IEEE 754 floats. Can be aggregated with SUM, AVG, QUANTILE, MIN, MAX. Examples: response time, request size, error count.
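
When writing SQL, the positions in blobs and doubles have to be translated to Analytics Engine column names. The sketch below assumes list positions map to columns in order (blobs[0] to blob6, doubles[0] to double1), which the mapping table suggests but does not state explicitly. The helper names and the path/response-time layout are hypothetical.

```python
FIRST_USER_BLOB = 6   # blob1..blob5 hold the system fields
MAX_USER_BLOBS = 15
MAX_DOUBLES = 20


def blob_column(index):
    """Column name for DataPoint.blobs[index], assuming positions map in order."""
    if not 0 <= index < MAX_USER_BLOBS:
        raise IndexError("blobs index out of range")
    return f"blob{FIRST_USER_BLOB + index}"


def double_column(index):
    """Column name for DataPoint.doubles[index], assuming positions map in order."""
    if not 0 <= index < MAX_DOUBLES:
        raise IndexError("doubles index out of range")
    return f"double{index + 1}"


# Hypothetical layout: blobs=[path, region], doubles=[response_ms]
sql = (
    f"SELECT {blob_column(0)} AS path, AVG({double_column(0)}) AS avg_ms "
    "FROM aepipe GROUP BY path"
)
```

Here the blob supplies the grouping key and the double supplies the aggregated metric, matching the split described above.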

Cloudflare Analytics Engine Limits

These are hard limits enforced by Cloudflare. Exceeding the blob size limit causes silent truncation: the write succeeds, but the excess data is dropped without any error, so the loss is easy to miss. The SDK checks these limits client-side and raises ValidationError before sending.

| Limit | Value | Notes |
| --- | --- | --- |
| Blobs per data point | 20 total | 5 used by system (project, logstore, event, level, ref_id), 15 available to user |
| Doubles per data point | 20 | All available to user |
| Total blob size per data point | 16 KB | Sum of all blob byte lengths (UTF-8), including system blobs. Exceeding = silent truncation |
| Index size | 96 bytes | Index = {project}/{logstore}, auto-managed |
| Data points per request | 250 | Per ingest() / log() call |
| Data retention | 3 months | Data auto-expires after 3 months |

Tip: If your data is large (e.g., full stack traces, request dumps), use the payload field instead of blobs. Payloads are stored in Cloudflare D1 with no 16 KB limit, linked to AE via a UUID in blob5.
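
To decide whether a value belongs in blobs or in payload, the byte budget can be estimated up front. The sketch below is not the SDK's own validation. It assumes "16 KB" means 16,384 bytes and that the five system blobs hold the raw project, logstore, event, level and ref_id strings. The helper names are hypothetical.

```python
MAX_TOTAL_BLOB_BYTES = 16 * 1024  # assumption: "16 KB" means 16,384 bytes
MAX_INDEX_BYTES = 96


def blob_bytes(values):
    """Total UTF-8 byte length of an iterable of strings."""
    return sum(len(value.encode("utf-8")) for value in values)


def fits_limits(project, logstore, event, level, blobs, ref_id=""):
    """Check the index size and the total blob size, counting the system blobs too."""
    index = f"{project}/{logstore}"
    system = [project, logstore, event, level, ref_id]
    return (
        blob_bytes([index]) <= MAX_INDEX_BYTES
        and blob_bytes(system) + blob_bytes(blobs) <= MAX_TOTAL_BLOB_BYTES
    )


small = fits_limits("myapp", "backend", "api_error", "error", ["GET /api"])
large = fits_limits("myapp", "backend", "api_error", "error", ["x" * 20000])
multibyte = blob_bytes(["héllo"])  # é occupies two bytes in UTF-8
```

Byte length rather than character count is what matters, so non-ASCII text uses the budget faster. Two 64-character names would also produce a 129-byte index, which is over the 96-byte index limit.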

See: Cloudflare Analytics Engine Limits

LogEntry

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| message | str | required | Log message (non-empty) |
| level | str | "info" | Log level |
| extra | dict[str, Any] | {} | Additional fields |

Result Types

| Type | Fields |
| --- | --- |
| IngestResult | ok: bool, written: int |
| LogResult | ok: bool, written: int |
| QueryResult | data: Any |
| DetailResult | results: list[DetailEntry] |
| DetailEntry | ref_id: str, payload: dict, created_at: int, expires_at: int |
| RawLogResult | logs: list[RawLogEntry], count: int |
| RawLogEntry | timestamp: str, level: str, data: Any |
| ListResult | items: list[str] |

Validation

All methods validate inputs before making network requests:

  • Name format: project and logstore names must match ^[a-zA-Z0-9_-]{1,64}$.
  • Batch size: ingest() and log() accept at most 250 items per call.

Invalid inputs raise ValidationError (subclass of ValueError).
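
The name rule can also be checked ahead of time, for example when names come from user input. This is a sketch of the same pattern, not the SDK's internal code, and `is_valid_name` is a hypothetical helper.

```python
import re

NAME_PATTERN = re.compile(r"[a-zA-Z0-9_-]{1,64}")


def is_valid_name(name):
    """True when `name` satisfies ^[a-zA-Z0-9_-]{1,64}$ in full."""
    # fullmatch avoids the Python quirk where `$` also matches before a trailing newline.
    return isinstance(name, str) and NAME_PATTERN.fullmatch(name) is not None
```

For instance, "myapp" and "my-app_2" pass, while "my app", an empty string, and a 65-character name do not.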

Error Handling

from aepipe import AepipeError, ValidationError

try:
    client.ingest("myapp", "backend", points)
except ValidationError as e:
    # Client-side validation failed
    print(f"invalid input: {e}")
except AepipeError as e:
    # Server returned an error
    print(f"API error {e.status}: {e.message}")

| Error Class | Parent | Fields | When |
| --- | --- | --- | --- |
| AepipeError | Exception | status, message | Server returns non-2xx |
| ValidationError | ValueError | message | Invalid client input |
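
Server errors are sometimes transient, while validation errors never are, so only the former are worth retrying. The sketch below retries with exponential backoff. It assumes `status` is an integer HTTP status code and treats 429 and 5xx as retryable, which is a choice made for this sketch rather than documented SDK behavior. `StubServerError` stands in for AepipeError so the example runs offline.

```python
import time


class StubServerError(Exception):
    """Stand-in exposing the `status` and `message` fields listed for AepipeError."""

    def __init__(self, status, message):
        super().__init__(message)
        self.status = status
        self.message = message


def call_with_retry(func, error_type, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call func(); retry on error_type with status 429 or 5xx, doubling the delay."""
    for attempt in range(attempts):
        try:
            return func()
        except error_type as exc:
            retryable = exc.status == 429 or 500 <= exc.status < 600
            if not retryable or attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)


# Offline demonstration: fail twice with 503, then succeed.
outcomes = iter([StubServerError(503, "busy"), StubServerError(503, "busy"), "ok"])
delays = []


def flaky():
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


result = call_with_retry(flaky, StubServerError, sleep=delays.append)
```

With the real SDK, the call would look like `call_with_retry(lambda: client.ingest("myapp", "backend", points), AepipeError)`. ValidationError is not caught here and propagates immediately.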

License

MIT
