posthook
The official Python client library for the Posthook API -- schedule webhooks and deliver them reliably.
Installation
pip install posthook-python
Requirements: Python 3.9+. Dependencies: httpx and websockets.
Quick Start
import posthook
client = posthook.Posthook("pk_...")
# Schedule a webhook 5 minutes from now
hook = client.hooks.schedule(
    path="/webhooks/user-created",
    post_in="5m",
    data={"userId": "123", "event": "user.created"},
)
print(hook.id) # UUID
print(hook.status) # "pending"
How It Works
Your Posthook project has a domain configured in the dashboard (e.g., webhook.example.com). When you schedule a hook, you specify a path (e.g., /webhooks/user-created). At the scheduled time, Posthook delivers the hook by POSTing to the full URL (https://webhook.example.com/webhooks/user-created) with your data payload and signature headers.
Authentication
You can find your API key under Project Settings in the Posthook dashboard. Pass it directly to the constructor, or set the POSTHOOK_API_KEY environment variable:
# Explicit API key
client = posthook.Posthook("pk_...")
# From environment variable
client = posthook.Posthook() # reads POSTHOOK_API_KEY
For webhook signature verification, also provide a signing key:
client = posthook.Posthook("pk_...", signing_key="ph_sk_...")
The signing key can also be set via the POSTHOOK_SIGNING_KEY environment variable.
Scheduling Hooks
Three mutually exclusive scheduling modes are available. You must provide exactly one of post_in, post_at, or post_at_local.
Relative delay (post_in)
Schedule after a relative delay. Accepts s (seconds), m (minutes), h (hours), or d (days):
hook = client.hooks.schedule(
    path="/webhooks/send-reminder",
    post_in="30m",
    data={"userId": "123"},
)
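To make the delay format concrete, here is a minimal sketch of how a string such as "30m" maps to a duration. parse_delay is a hypothetical helper written for illustration, not part of the SDK, and it assumes a single whole-number amount followed by one unit letter; the API may accept more than this.

```python
import re
from datetime import timedelta

# Hypothetical helper for illustration only; not part of the posthook SDK.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_delay(value: str) -> timedelta:
    """Convert a string such as "30m" into a timedelta."""
    match = re.fullmatch(r"(\d+)([smhd])", value)
    if match is None:
        raise ValueError(f"unsupported delay: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


print(parse_delay("30m"))  # 0:30:00
```

A check like this can catch a malformed delay locally before the request is sent.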
Absolute UTC time (post_at)
Schedule at an exact UTC time. Accepts datetime objects or RFC 3339 strings:
from datetime import datetime, timedelta, timezone
# Using a datetime object (automatically converted to UTC)
hook = client.hooks.schedule(
    path="/webhooks/send-reminder",
    post_at=datetime.now(timezone.utc) + timedelta(hours=1),
    data={"userId": "123"},
)
# Using an RFC 3339 string
hook = client.hooks.schedule(
    path="/webhooks/send-reminder",
    post_at="2026-06-15T10:00:00Z",
    data={"userId": "123"},
)
Local time with timezone (post_at_local)
Schedule at a local time. Posthook handles DST transitions automatically:
hook = client.hooks.schedule(
    path="/webhooks/daily-digest",
    post_at_local="2026-03-01T09:00:00",
    timezone="America/New_York",
    data={"userId": "123"},
)
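To see why local scheduling matters around a DST change, the sketch below uses only the standard library zoneinfo module (Python 3.9+) to show which UTC instant the same 09:00 wall-clock time corresponds to before and after the US spring transition on March 8, 2026. local_to_utc is a hypothetical helper; it illustrates the conversion and makes no claim about how Posthook performs it. On systems without an IANA time zone database, the tzdata package is needed.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def local_to_utc(local_iso: str, tz_name: str) -> datetime:
    """Interpret a naive ISO timestamp as wall-clock time in tz_name."""
    local = datetime.fromisoformat(local_iso).replace(tzinfo=ZoneInfo(tz_name))
    return local.astimezone(timezone.utc)


before = local_to_utc("2026-03-01T09:00:00", "America/New_York")  # EST, UTC-5
after = local_to_utc("2026-03-15T09:00:00", "America/New_York")   # EDT, UTC-4

print(before.isoformat())  # 2026-03-01T14:00:00+00:00
print(after.isoformat())   # 2026-03-15T13:00:00+00:00
```

A fixed post_at in UTC would drift by an hour relative to the recipient's clock across that boundary, which is the case post_at_local is meant to handle.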
Custom retry configuration
Override your project's default retry behavior for a specific hook:
hook = client.hooks.schedule(
    path="/webhooks/critical",
    post_in="1m",
    data={"orderId": "456"},
    retry_override=posthook.HookRetryOverride(
        min_retries=10,
        delay_secs=15,
        strategy="exponential",
        backoff_factor=2.0,
        max_delay_secs=3600,
        jitter=True,
    ),
)
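For a rough feel of what these numbers imply, the sketch below computes a retry schedule under a common exponential-backoff formula: delay_secs * backoff_factor ** n, capped at max_delay_secs. The formula is an assumption made for illustration, jitter is ignored, and backoff_delays is a hypothetical helper; the schedule Posthook actually applies may differ.

```python
def backoff_delays(attempts, delay_secs, backoff_factor, max_delay_secs):
    """Delay before each retry, assuming delay_secs * backoff_factor ** n, capped."""
    return [
        min(delay_secs * backoff_factor ** n, max_delay_secs)
        for n in range(attempts)
    ]


# Same values as the override above; jitter is left out of this sketch.
delays = backoff_delays(10, 15, 2.0, 3600)
for attempt, delay in enumerate(delays, start=1):
    print(f"retry {attempt}: wait {delay:.0f}s")
```

Under that assumption the delays double from 15 seconds up to 1920 seconds, and the ninth and tenth retries are capped at 3600 seconds.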
Managing Hooks
Get a hook
hook = client.hooks.get("hook-uuid")
List hooks
hooks = client.hooks.list(status=posthook.STATUS_FAILED, limit=50)
print(f"Found {len(hooks)} hooks")
All list parameters are optional:
| Parameter | Description |
|---|---|
| status | Filter by status: "pending", "retry", "completed", "failed" |
| limit | Max results per page |
| sort_by | Sort field (e.g., "createdAt", "postAt") |
| sort_order | "ASC" or "DESC" |
| post_at_before | Filter hooks scheduled before this time (ISO string) |
| post_at_after | Cursor: hooks scheduled after this time (ISO string) |
| created_at_before | Filter hooks created before this time (ISO string) |
| created_at_after | Filter hooks created after this time (ISO string) |
Cursor-based pagination
Use post_at_after as a cursor. After each page, advance it to the last hook's post_at:
limit = 100
cursor = None
while True:
    hooks = client.hooks.list(status="failed", limit=limit, post_at_after=cursor)
    for hook in hooks:
        print(hook.id, hook.failure_error)
    if len(hooks) < limit:
        break  # last page
    cursor = hooks[-1].post_at.isoformat()
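The same loop can be exercised without network access. The sketch below runs it against an in-memory stand-in: FakeHook, fake_list, and iter_all are hypothetical names invented for this example, and the stand-in assumes results come back oldest first and that post_at_after is a strict "greater than" filter. Neither assumption is confirmed here for the real API.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterator, List, Optional


@dataclass
class FakeHook:
    id: str
    post_at: datetime


START = datetime(2026, 1, 1, tzinfo=timezone.utc)
HOOKS = [FakeHook(f"hook-{i}", START + timedelta(minutes=i)) for i in range(250)]


def fake_list(limit: int, post_at_after: Optional[str] = None) -> List[FakeHook]:
    """Stand-in for client.hooks.list: hooks strictly after the cursor, oldest first."""
    after = datetime.fromisoformat(post_at_after) if post_at_after else None
    matches = [h for h in HOOKS if after is None or h.post_at > after]
    return matches[:limit]


def iter_all(list_page, limit: int = 100) -> Iterator[FakeHook]:
    """Yield every hook, advancing the cursor to the last post_at of each page."""
    cursor = None
    while True:
        page = list_page(limit=limit, post_at_after=cursor)
        yield from page
        if len(page) < limit:
            return  # a short page means there is nothing left
        cursor = page[-1].post_at.isoformat()


ids = [hook.id for hook in iter_all(fake_list)]
print(len(ids))  # 250
```

With a strict filter, hooks that share the exact post_at of the last item on a page would be skipped, so it is worth checking how the real endpoint treats ties if your hooks can share timestamps.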
Auto-paginating iterator (list_all)
For convenience, list_all yields every matching hook across all pages automatically:
for hook in client.hooks.list_all(status="failed"):
    process(hook)
The async client returns an async iterator:
async for hook in client.hooks.list_all(status="failed"):
    await process(hook)
Delete a hook
To cancel a pending hook, delete it before delivery. The call is idempotent: it returns None on both 200 (deleted) and 404 (already deleted):
client.hooks.delete("hook-uuid")
Bulk Operations
Three bulk operations are available, each supporting by-IDs or by-filter:
- Retry -- Re-attempts delivery for failed hooks
- Replay -- Re-delivers completed hooks (useful for reprocessing)
- Cancel -- Cancels pending hooks before delivery
By IDs
result = client.hooks.bulk.retry(["id-1", "id-2", "id-3"])
print(f"Retried {result.affected} hooks")
By filter
result = client.hooks.bulk.cancel_by_filter(
    start_time="2026-02-01T00:00:00Z",
    end_time="2026-02-22T00:00:00Z",
    limit=500,
    endpoint_key="/webhooks/deprecated",
)
print(f"Cancelled {result.affected} hooks")
All six methods:
# By IDs
client.hooks.bulk.retry(hook_ids)
client.hooks.bulk.replay(hook_ids)
client.hooks.bulk.cancel(hook_ids)
# By filter
client.hooks.bulk.retry_by_filter(start_time, end_time, limit, ...)
client.hooks.bulk.replay_by_filter(start_time, end_time, limit, ...)
client.hooks.bulk.cancel_by_filter(start_time, end_time, limit, ...)
Filter methods also accept optional endpoint_key and sequence_id keyword arguments.
Verifying Webhook Signatures
When Posthook delivers a hook to your endpoint, it includes signature headers for verification. Use parse_delivery to verify and parse the delivery.
Important: You must pass the raw request body (bytes or string), not a parsed JSON object.
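The reason the raw body matters can be shown with a generic HMAC-SHA256 signature. This sketch is not Posthook's actual signing scheme, and SECRET and sign are placeholders invented for the example. It only demonstrates that parsing and re-serializing JSON can change the bytes, which changes any signature computed over them.

```python
import hashlib
import hmac
import json

SECRET = b"example-signing-key"  # placeholder value, not a real key


def sign(body: bytes) -> str:
    """Generic HMAC-SHA256 over the exact bytes of the body."""
    return hmac.new(SECRET, body, hashlib.sha256).hexdigest()


# The sender signs the bytes it put on the wire (note the double space).
raw_body = b'{"userId": "123",  "event": "user.created"}'
signature = sign(raw_body)

# Parsing and re-serializing yields equivalent JSON but different bytes.
reserialized = json.dumps(json.loads(raw_body)).encode()

print(hmac.compare_digest(sign(raw_body), signature))      # True
print(hmac.compare_digest(sign(reserialized), signature))  # False
```

This is why the framework examples read the body with request.get_data(), request.body, or await request.body() rather than a parsed JSON accessor.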
Flask
from flask import Flask, request
import posthook

app = Flask(__name__)
client = posthook.Posthook("pk_...", signing_key="ph_sk_...")

@app.route("/webhooks/user-created", methods=["POST"])
def handle_webhook():
    try:
        delivery = client.signatures.parse_delivery(
            body=request.get_data(),
            headers=dict(request.headers),
        )
    except posthook.SignatureVerificationError:
        return "invalid signature", 401
    print(delivery.hook_id)    # from Posthook-Id header
    print(delivery.path)       # "/webhooks/user-created"
    print(delivery.data)       # your custom data payload
    print(delivery.post_at)    # when it was scheduled
    print(delivery.posted_at)  # when it was delivered
    return "", 200
Django
from django.http import HttpResponse
import posthook

client = posthook.Posthook("pk_...", signing_key="ph_sk_...")

def handle_webhook(request):
    try:
        delivery = client.signatures.parse_delivery(
            body=request.body,
            headers=dict(request.headers),
        )
    except posthook.SignatureVerificationError:
        return HttpResponse(status=401)
    print(delivery.hook_id)
    print(delivery.data)
    return HttpResponse(status=200)
FastAPI
from fastapi import FastAPI, Request, Response
import posthook

app = FastAPI()
client = posthook.Posthook("pk_...", signing_key="ph_sk_...")

@app.post("/webhooks/user-created")
async def handle_webhook(request: Request):
    body = await request.body()
    try:
        delivery = client.signatures.parse_delivery(
            body=body,
            headers=dict(request.headers),
        )
    except posthook.SignatureVerificationError:
        return Response(status_code=401)
    print(delivery.hook_id)
    print(delivery.data)
    return Response(status_code=200)
Custom tolerance
By default, signatures older than 5 minutes are rejected. You can override this:
delivery = client.signatures.parse_delivery(
    body=raw_body,
    headers=headers,
    tolerance=600,  # 10 minutes, in seconds
)
WebSocket Listener
The WebSocket listener receives hooks in real time without running an HTTP server. Use AsyncPosthook and call hooks.listen() with an async handler:
import asyncio
import posthook

async def main():
    async with posthook.AsyncPosthook("pk_...") as client:
        async def on_hook(delivery):
            print(f"Received: {delivery.hook_id} -> {delivery.path}")
            print(f"Data: {delivery.data}")
            return posthook.Result.ack()

        listener = await client.hooks.listen(
            on_hook,
            on_connected=lambda info: print(f"Connected: {info.project_name}"),
        )
        await listener.wait()  # Blocks until closed

asyncio.run(main())
Result types
Your handler must return a Result:
| Factory | Effect |
|---|---|
| Result.ack() | Processing complete: hook is marked as delivered immediately |
| Result.nack(error?) | Reject: triggers retry according to project settings |
| Result.accept(timeout) | Async: you have timeout seconds to call back via HTTP (see below) |
return posthook.Result.ack()
return posthook.Result.nack("processing failed")
return posthook.Result.accept(timeout=120)
Async processing with accept
Use accept when your handler needs more time than the 10-second ack window.
After returning accept, POST to the callback URLs on the delivery to report
the outcome:
async def on_hook(delivery):
    # Kick off background work, save the callback URLs
    await queue.enqueue("process", {
        "data": delivery.data,
        "ack_url": delivery.ack_url,
        "nack_url": delivery.nack_url,
    })
    return posthook.Result.accept(timeout=300)  # 5 minutes to call back

# Later, in the background worker (job is the payload enqueued above):
await posthook.async_ack(job["ack_url"])
# or on failure:
await posthook.async_nack(job["nack_url"], {"error": "failed"})
If neither URL is called before the deadline, the hook is retried.
Concurrency
By default, handlers run with unlimited concurrency (matching HTTP delivery behavior). Set max_concurrency to limit parallel handlers — deliveries that arrive while at capacity are nacked immediately so the server can retry them:
listener = await client.hooks.listen(on_hook, max_concurrency=10)
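The reject-when-full behavior described above can be pictured with a small asyncio sketch. CapacityGate is a hypothetical class written for illustration and is not how the SDK is implemented; the strings "ack" and "nack" stand in for Result values. It shows the difference between rejecting at capacity and queueing: excess deliveries get an immediate answer instead of waiting for a free slot.

```python
import asyncio


class CapacityGate:
    """Hypothetical: run at most max_concurrency handlers, reject the rest."""

    def __init__(self, max_concurrency: int) -> None:
        self.max_concurrency = max_concurrency
        self.active = 0

    async def dispatch(self, handler, delivery) -> str:
        if self.active >= self.max_concurrency:
            return "nack"  # at capacity: reject right away instead of queueing
        self.active += 1
        try:
            return await handler(delivery)
        finally:
            self.active -= 1


async def slow_handler(delivery) -> str:
    await asyncio.sleep(0.05)  # simulate work
    return "ack"


async def main():
    gate = CapacityGate(max_concurrency=2)
    return await asyncio.gather(*(gate.dispatch(slow_handler, n) for n in range(5)))


results = asyncio.run(main())
print(results)  # ['ack', 'ack', 'nack', 'nack', 'nack']
```

Five deliveries arrive at once with a limit of two, so the first two run and the other three are rejected for the server to retry later.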
Lifecycle callbacks
listener = await client.hooks.listen(
    on_hook,
    on_connected=lambda info: print(f"Connected: {info.connection_id}"),
    on_disconnected=lambda err: print(f"Disconnected: {err}"),
    on_reconnecting=lambda attempt: print(f"Reconnecting (attempt {attempt})"),
)
Stream API
For manual control over ack/nack, use hooks.stream() which returns an async iterator:
async with posthook.AsyncPosthook("pk_...") as client:
    async with await client.hooks.stream() as stream:
        async for delivery in stream:
            print(delivery.hook_id, delivery.data)
            if should_process(delivery):
                await stream.ack(delivery.hook_id)
            else:
                await stream.nack(delivery.hook_id, "not ready")
HTTP fallback
If your project has a domain configured, hooks are delivered via HTTP when no
WebSocket listener is connected. You can run both an HTTP endpoint and a
WebSocket listener — the server uses WebSocket when available and falls back to
HTTP automatically. Since both paths use the same Result type, you can share
your handler logic:
async def process_hook(delivery):
    await process_order(delivery.data)
    return posthook.Result.ack()

# HTTP delivery (ASGI endpoint); signatures comes from posthook.create_signatures("ph_sk_...")
app = signatures.asgi_handler(process_hook)
# WebSocket delivery (runs alongside)
listener = await client.hooks.listen(process_hook)
WebSocket delivery metadata
Deliveries received via WebSocket include a ws field with attempt info:
async def on_hook(delivery):
    if delivery.ws:
        print(f"Attempt {delivery.ws.attempt}/{delivery.ws.max_attempts}")
        if delivery.ws.forward_request:
            print(f"Original body: {delivery.ws.forward_request.body}")
    return posthook.Result.ack()
ASGI/WSGI Handlers
For quick integration without a full web framework, SignaturesService provides handler wrappers:
ASGI
import posthook
signatures = posthook.create_signatures("ph_sk_...")
async def on_hook(delivery):
    print(delivery.data)
    return posthook.Result.ack()
# Mount as an ASGI endpoint (e.g. with uvicorn)
app = signatures.asgi_handler(on_hook)
WSGI
import posthook
signatures = posthook.create_signatures("ph_sk_...")
def on_hook(delivery):
    print(delivery.data)
    return posthook.Result.ack()
# Mount as a WSGI endpoint (e.g. with gunicorn)
app = signatures.wsgi_handler(on_hook)
Async Hooks
When async hooks are enabled, parse_delivery() populates ack_url and nack_url on the delivery object. Return 202 from your handler and call back when processing completes.
FastAPI
from fastapi import FastAPI, Request, BackgroundTasks, HTTPException
from fastapi.responses import Response
import posthook

app = FastAPI()
client = posthook.Posthook("pk_...", signing_key="ph_sk_...")

async def process_and_ack(delivery):
    try:
        await process_video(delivery.data["video_id"])
        result = await posthook.async_ack(delivery.ack_url)
        print(f"Applied: {result.applied}")
    except Exception as e:
        await posthook.async_nack(delivery.nack_url, {"error": str(e)})

@app.post("/webhooks/process-video")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
    body = await request.body()
    try:
        delivery = client.signatures.parse_delivery(body=body, headers=dict(request.headers))
    except posthook.SignatureVerificationError:
        raise HTTPException(status_code=401)
    background_tasks.add_task(process_and_ack, delivery)
    return Response(status_code=202)
Callback functions
The SDK provides standalone callback functions -- pass the URL from the delivery object:
# Sync (Flask, Django, background workers)
result = posthook.ack(delivery.ack_url)
result = posthook.nack(delivery.nack_url, {"error": "processing failed"})
# Async (FastAPI, etc.)
result = await posthook.async_ack(delivery.ack_url)
result = await posthook.async_nack(delivery.nack_url, {"error": "processing failed"})
Both return a CallbackResult:
result = posthook.ack(delivery.ack_url)
print(result.applied) # True if state changed, False if already resolved
print(result.status) # "completed", "not_found", "conflict", etc.
ack() and nack() return normally for 200, 404, and 409 responses. They raise CallbackError for 401 (invalid token) and 410 (expired).
If processing happens in a separate worker, use the raw callback URLs instead:
queue.enqueue("transcode", {
    "video_id": delivery.data["video_id"],
    "ack_url": delivery.ack_url,
    "nack_url": delivery.nack_url,
})
Error Handling
All API errors extend PosthookError and can be caught with isinstance or except:
import posthook
try:
    hook = client.hooks.get("hook-id")
except posthook.RateLimitError:
    print("Rate limited, retry later")
except posthook.AuthenticationError:
    print("Invalid API key")
except posthook.NotFoundError:
    print("Hook not found")
except posthook.PosthookError as err:
    print(f"API error: {err.message} (status={err.status_code})")
| Error class | HTTP Status | Code |
|---|---|---|
| BadRequestError | 400 | bad_request |
| AuthenticationError | 401 | authentication_error |
| ForbiddenError | 403 | forbidden |
| NotFoundError | 404 | not_found |
| PayloadTooLargeError | 413 | payload_too_large |
| RateLimitError | 429 | rate_limit_exceeded |
| InternalServerError | 5xx | internal_error |
| PosthookConnectionError | -- | connection_error |
| SignatureVerificationError | -- | signature_verification_error |
Configuration
client = posthook.Posthook(
    "pk_...",
    base_url="https://api.staging.posthook.io",
    timeout=60,
    signing_key="ph_sk_...",
)
| Option | Description | Default |
|---|---|---|
| api_key | Your Posthook API key | POSTHOOK_API_KEY env var |
| base_url | Custom API base URL | https://api.posthook.io |
| timeout | Request timeout in seconds | 30 |
| signing_key | Signing key for webhook verification | POSTHOOK_SIGNING_KEY env var |
| http_client | Custom httpx.Client instance | -- |
Quota Info
After scheduling a hook, quota information is available on the returned Hook object:
hook = client.hooks.schedule(path="/test", post_in="5m")
if hook.quota:
    print(f"Limit: {hook.quota.limit}")
    print(f"Usage: {hook.quota.usage}")
    print(f"Remaining: {hook.quota.remaining}")
    print(f"Resets at: {hook.quota.resets_at}")
Async Client
The AsyncPosthook client provides an identical API -- just await each call:
import asyncio
import posthook

async def main():
    async with posthook.AsyncPosthook("pk_...") as client:
        hook = await client.hooks.schedule(path="/test", post_in="5m")
        print(hook.id)
        hooks = await client.hooks.list(status="pending")

asyncio.run(main())
Both the sync and async clients support context managers for automatic cleanup:
# Sync
with posthook.Posthook("pk_...") as client:
    hook = client.hooks.schedule(path="/test", post_in="5m")
# Async
async with posthook.AsyncPosthook("pk_...") as client:
    hook = await client.hooks.schedule(path="/test", post_in="5m")
You can also call close() / await close() manually if you prefer.
Debug Logging
The SDK logs all requests via Python's logging module under the "posthook" logger. Enable it to see request details:
import logging
logging.basicConfig(level=logging.DEBUG)
Example output:
DEBUG:posthook:POST /v1/hooks -> 200 (0.153s)
DEBUG:posthook:GET /v1/hooks -> 200 (0.089s)
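logging.basicConfig(level=logging.DEBUG) turns on debug output for every library in the process. If that is too noisy, one option is to raise the level only for the "posthook" logger named above, using nothing but the standard logging module:

```python
import logging

# Keep the rest of the application at WARNING...
logging.basicConfig(level=logging.WARNING)

# ...and enable DEBUG output only for the SDK's logger.
logging.getLogger("posthook").setLevel(logging.DEBUG)
```

Records from the "posthook" logger still propagate to the root handler installed by basicConfig, so they are printed even though the root level stays at WARNING.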
Advanced
Proxy support
Pass a custom httpx.Client configured with a proxy:
import httpx
import posthook
http_client = httpx.Client(proxy="http://proxy.example.com:8080")
client = posthook.Posthook("pk_...", http_client=http_client)
Custom CA certificates
import httpx
import posthook
http_client = httpx.Client(verify="/path/to/custom-ca-bundle.crt")
client = posthook.Posthook("pk_...", http_client=http_client)
Custom httpx client
For full control over HTTP behavior, provide your own httpx.Client (sync) or httpx.AsyncClient (async). The SDK will add its authentication headers automatically:
import httpx
import posthook
http_client = httpx.Client(
    timeout=60,
    verify=True,
    proxy="http://proxy.example.com:8080",
    limits=httpx.Limits(max_connections=20),
)
client = posthook.Posthook("pk_...", http_client=http_client)
When you provide a custom client, the SDK does not close it on client.close() -- you are responsible for its lifecycle.
Resources
- Documentation — guides, concepts, and patterns
- API Reference — endpoint specs and examples
- Quickstart — get started in under 2 minutes
- Pricing — free tier included
- Status — uptime and incident history
Requirements
- Python 3.9+
- httpx >= 0.25.0
- websockets >= 12.0 (for WebSocket listener/stream)