Anlyon Relay Python SDK
Official Python SDK for the Anlyon Relay API - Type-safe message scheduling and delivery.
Installation
pip install anlyon-relay
Quick Start
Synchronous Client
from anlyon_relay import Client, PublishMessageRequest
# Create a client
client = Client(api_key="your-api-key")
# Publish a message
result = client.messages.publish(
    PublishMessageRequest(
        url="https://example.com/webhook",
        body={"hello": "world"}
    )
)
print(f"Message ID: {result.data.message_id}")
# Close when done
client.close()
Asynchronous Client
import asyncio
from anlyon_relay import AsyncClient, PublishMessageRequest
async def main():
    async with AsyncClient(api_key="your-api-key") as client:
        result = await client.messages.publish(
            PublishMessageRequest(
                url="https://example.com/webhook",
                body={"hello": "world"}
            )
        )
        print(f"Message ID: {result.data.message_id}")
asyncio.run(main())
Features
- Type-safe: Full type hints and Pydantic models
- Sync & Async: Both synchronous and asynchronous clients
- Fluent Builders: Intuitive API for building requests
- Automatic Retries: Exponential backoff with jitter
- Webhook Verification: HMAC-SHA256 signature verification
Usage Examples
Fluent Message Builder
from anlyon_relay import Client
client = Client(api_key="your-api-key")
# Using the fluent builder
result = (
    client.messages.create_message()
    .to("https://example.com/webhook")
    .method("POST")
    .header("X-Custom-Header", "value")
    .body({"event": "user.created", "user_id": 123})
    .in_minutes(5)  # Schedule 5 minutes from now
    .max_retries(3)
    .send()
)
Scheduled Messages
from anlyon_relay import Client, PublishMessageRequest
client = Client(api_key="your-api-key")
# Schedule for a specific time
result = client.messages.publish(
    PublishMessageRequest(
        url="https://example.com/webhook",
        scheduled_for="2024-12-25T00:00:00Z"
    )
)
# Or use Unix timestamp
result = client.messages.publish(
    PublishMessageRequest(
        url="https://example.com/webhook",
        execute_at=1735084800  # Unix timestamp in seconds
    )
)
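The two scheduling fields above take an ISO 8601 string and a Unix timestamp in seconds. As a minimal sketch, both values can be derived from a timezone-aware datetime with the standard library alone. The helper names `to_iso_utc` and `to_unix_seconds` are hypothetical and are not part of the SDK.

```python
from datetime import datetime, timezone


def to_iso_utc(moment: datetime) -> str:
    """Format a timezone-aware datetime as an ISO 8601 UTC string ending in 'Z'."""
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def to_unix_seconds(moment: datetime) -> int:
    """Convert a timezone-aware datetime to whole seconds since the Unix epoch."""
    return int(moment.timestamp())


# Midnight UTC on 25 December 2024, the same instant used in the examples above
christmas = datetime(2024, 12, 25, tzinfo=timezone.utc)
scheduled_for = to_iso_utc(christmas)    # "2024-12-25T00:00:00Z"
execute_at = to_unix_seconds(christmas)  # 1735084800
```

Always pass timezone-aware datetimes to helpers like these. A naive datetime would be interpreted in the local timezone of the machine running the code.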
Cron Schedules
from anlyon_relay import Client, CreateScheduleRequest
client = Client(api_key="your-api-key")
# Create a schedule
result = client.schedules.create(
    CreateScheduleRequest(
        name="Daily Report",
        cron_expression="0 9 * * *",  # Every day at 9 AM
        timezone="America/New_York",
        url="https://example.com/generate-report"
    )
)
# Using the fluent builder
result = (
    client.schedules.create_schedule()
    .name("Hourly Check")
    .hourly()
    .to("https://example.com/health-check")
    .create()
)
URL Groups (Broadcasting)
from anlyon_relay import Client, CreateUrlGroupRequest, EndpointInput
client = Client(api_key="your-api-key")
# Create a group for broadcasting
result = client.url_groups.create(
    CreateUrlGroupRequest(
        name="Notification Services",
        endpoints=[
            EndpointInput(url="https://service1.com/webhook"),
            EndpointInput(url="https://service2.com/webhook"),
            EndpointInput(url="https://service3.com/webhook"),
        ]
    )
)
# Publish to all endpoints
from anlyon_relay import PublishGroupMessageRequest
client.url_groups.publish(
    result.data.id,
    PublishGroupMessageRequest(body={"event": "broadcast"})
)
Queues
from anlyon_relay import Client, CreateQueueRequest
client = Client(api_key="your-api-key")
# Create a queue with parallelism
result = client.queues.create(
    CreateQueueRequest(
        name="email-queue",
        parallelism=5  # Process 5 messages at a time
    )
)
# Publish to the queue
from anlyon_relay import PublishMessageRequest
client.messages.publish(
    PublishMessageRequest(
        url="https://example.com/process-email",
        queue="email-queue"
    )
)
Pagination
from anlyon_relay import Client
client = Client(api_key="your-api-key")
# Iterate through all messages
for message in client.messages.list_all():
    print(f"Message: {message.message_id} - {message.status}")
# Or collect to a list
all_messages = client.messages.list_all().to_list()
# Take first 10
first_10 = client.messages.list_all().take(10)
# Find specific message
pending = client.messages.list_all().find(
    lambda m: m.status == "pending"
)
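To illustrate how an iterator like `list_all()` can fetch pages lazily while still offering `to_list`, `take`, and `find`, here is a minimal sketch. It is not the SDK's implementation: the `PageIterator` class, the `fetch_page` callback, and the cursor scheme are all assumptions made for this example.

```python
from itertools import islice
from typing import Callable, Iterator, List, Optional, Tuple

# Hypothetical callback: takes a cursor (None for the first page) and returns
# (items, next_cursor), where next_cursor is None on the last page.
FetchPage = Callable[[Optional[str]], Tuple[List[str], Optional[str]]]


class PageIterator:
    """Hypothetical lazy paginator: requests a page only when it is needed."""

    def __init__(self, fetch_page: FetchPage):
        self._fetch_page = fetch_page

    def __iter__(self) -> Iterator[str]:
        cursor = None
        while True:
            items, cursor = self._fetch_page(cursor)
            yield from items
            if cursor is None:
                return

    def to_list(self) -> List[str]:
        return list(self)

    def take(self, count: int) -> List[str]:
        # islice stops pulling items early, so later pages are never fetched
        return list(islice(self, count))

    def find(self, predicate: Callable[[str], bool]) -> Optional[str]:
        return next((item for item in self if predicate(item)), None)


# Demo with three fake pages and a record of which cursors were requested
PAGES = {None: (["a", "b"], "p2"), "p2": (["c", "d"], "p3"), "p3": (["e"], None)}
calls = []


def fake_fetch(cursor):
    calls.append(cursor)
    return PAGES[cursor]


messages = PageIterator(fake_fetch)
first_three = messages.take(3)  # Only the first two pages are requested
```

The point of the lazy design is that `take` and `find` can stop early without downloading every page, while `to_list` walks all of them.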
Async Pagination
import asyncio
from anlyon_relay import AsyncClient
async def main():
    async with AsyncClient(api_key="your-api-key") as client:
        async for message in client.messages.list_all():
            print(f"Message: {message.message_id}")
        # Or collect to list
        all_messages = await client.messages.list_all().to_list()
asyncio.run(main())
Webhook Verification
# SignatureVerificationError is assumed to be exported from the package root
from anlyon_relay import Client, Receiver, ReceiverConfig, SignatureVerificationError
# `request` and `request_body` come from your web framework:
# the incoming request object and its raw, unparsed body
# Option 1: Using Client static method
result = Client.verify_webhook_signature(
    payload=request_body,
    signature_header=request.headers["x-anlyon-signature"],
    secret="your-webhook-secret"
)
if result.valid:
    # Process the webhook
    pass
# Option 2: Using Receiver class
receiver = Receiver(ReceiverConfig(signing_secret="your-webhook-secret"))
try:
    receiver.verify_or_throw(
        signature=request.headers["x-anlyon-signature"],
        body=request_body
    )
    # Signature is valid, process the webhook
except SignatureVerificationError as e:
    # Invalid signature
    pass
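For readers unfamiliar with HMAC-SHA256 verification, the general technique is to recompute the digest of the raw request body with the shared secret and compare it to the received signature in constant time. The sketch below shows that technique with the standard library only. It assumes the signature is a plain hex-encoded digest of the body; the actual format of the `x-anlyon-signature` header is not described here and may differ (for example, it could include a timestamp or prefix). The function names `sign_body` and `is_valid_signature` are hypothetical.

```python
import hashlib
import hmac


def sign_body(body: bytes, secret: str) -> str:
    """Compute a hex-encoded HMAC-SHA256 digest of the raw body."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def is_valid_signature(body: bytes, signature: str, secret: str) -> bool:
    """Recompute the digest and compare it to the received one in constant time."""
    expected = sign_body(body, secret)
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))


# Demo: sign a payload, then verify it
body = b'{"hello": "world"}'
good_signature = sign_body(body, "your-webhook-secret")
accepted = is_valid_signature(body, good_signature, "your-webhook-secret")
rejected = is_valid_signature(body + b" ", good_signature, "your-webhook-secret")
```

Verification must run against the raw bytes as received. Parsing the JSON and serializing it again can change whitespace or key order and invalidate the signature.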
Error Handling
from anlyon_relay import (
    Client,
    AnlyonError,
    RateLimitError,
    ValidationError,
    AuthenticationError,
)
client = Client(api_key="your-api-key")
try:
    result = client.messages.publish(...)
except RateLimitError as e:
    print(f"Rate limited, retry after {e.retry_after} seconds")
except ValidationError as e:
    print(f"Validation failed: {e.details}")
except AuthenticationError as e:
    print("Invalid API key")
except AnlyonError as e:
    print(f"API error: {e.code} - {e.message}")
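An error that exposes `retry_after`, as `RateLimitError` does above, lends itself to a wait-and-retry loop at the application level. The following is a minimal sketch of that pattern, not SDK code: `FakeRateLimitError` is a stand-in class defined here so the example runs on its own, and `call_with_retry` is a hypothetical helper. The SDK's built-in automatic retries may already cover this case.

```python
import time


class FakeRateLimitError(Exception):
    """Stand-in for an error that carries a `retry_after` value in seconds."""

    def __init__(self, retry_after: float):
        super().__init__(f"rate limited, retry after {retry_after}s")
        self.retry_after = retry_after


def call_with_retry(operation, max_attempts=3, sleep=time.sleep):
    """Run `operation`, pausing for `retry_after` seconds after each rate-limit error."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except FakeRateLimitError as exc:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the error to the caller
            sleep(exc.retry_after)


# Demo: an operation that is rate limited twice, then succeeds
outcomes = [FakeRateLimitError(1.5), FakeRateLimitError(3.0), "ok"]
waits = []


def flaky_operation():
    outcome = outcomes.pop(0)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


# Record the requested waits instead of actually sleeping
result = call_with_retry(flaky_operation, sleep=waits.append)
```

Passing `sleep` as a parameter keeps the helper testable, since the demo can record the waits without pausing.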
Custom Configuration
from anlyon_relay import Client, RetryConfig, LoggingConfig, LogLevel
client = Client(
    api_key="your-api-key",
    base_url="https://api.queue.anlyon.com",
    timeout=60.0,
    retry=RetryConfig(
        max_attempts=5,
        initial_delay=0.5,
        max_delay=30.0,
        backoff_multiplier=2.0,
        jitter=True,
    ),
    logging=LoggingConfig(
        enabled=True,
        level=LogLevel.DEBUG,
        redact_sensitive=True,
    ),
    default_headers={"X-Custom-Header": "value"},
)
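To give a feel for how retry settings such as `initial_delay`, `max_delay`, `backoff_multiplier`, and `jitter` usually interact, here is a minimal sketch of exponential backoff. It assumes the common "full jitter" strategy, where the wait is drawn uniformly between zero and the capped exponential delay. The SDK's exact formula is not documented here and may differ, and `backoff_delay` is a hypothetical helper.

```python
import random


def backoff_delay(
    attempt: int,
    initial_delay: float = 0.5,
    max_delay: float = 30.0,
    backoff_multiplier: float = 2.0,
    jitter: bool = True,
) -> float:
    """Return the wait in seconds before retry number `attempt` (1-based)."""
    # Exponential growth, capped at max_delay
    delay = min(max_delay, initial_delay * backoff_multiplier ** (attempt - 1))
    if jitter:
        # Full jitter: pick a random point between 0 and the capped delay
        delay = random.uniform(0.0, delay)
    return delay


# Without jitter the schedule is deterministic
schedule = [backoff_delay(n, jitter=False) for n in range(1, 9)]
# [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

Jitter spreads retries from many clients over time, so they do not all hit the API again at the same instant after an outage.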
API Reference
Resources
- client.messages - Publish and manage messages
- client.schedules - Create and manage cron schedules
- client.url_groups - Manage URL groups for broadcasting
- client.queues - Create and manage message queues
- client.dlq - Dead letter queue operations
- client.analytics - Usage analytics and metrics
- client.notifications - Notification management
- client.billing - Billing and usage information
Types
All request and response types are available as Pydantic models with full type hints.
License
MIT
Download files
Source Distribution
anlyon_relay-1.0.0.tar.gz (32.5 kB)
Built Distribution
File details
Details for the file anlyon_relay-1.0.0.tar.gz.
File metadata
- Download URL: anlyon_relay-1.0.0.tar.gz
- Upload date:
- Size: 32.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8d1d49732f0f2fad57ea8818195f72ce5183ff09b2021e5e68160f669f0e168a |
| MD5 | 215a2d99020044e62e8ac878f44344e3 |
| BLAKE2b-256 | 11cbccaf47feb769212f0fec160aac5fc44058db6b6015127f159fd6f30c9d8e |
File details
Details for the file anlyon_relay-1.0.0-py3-none-any.whl.
File metadata
- Download URL: anlyon_relay-1.0.0-py3-none-any.whl
- Upload date:
- Size: 44.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 3b76838354d9f5d2c3848456f138581ead391828212270755b89b2aa731ef22f |
| MD5 | 1cecb46705fcbc6000f8de9d396835d2 |
| BLAKE2b-256 | 3e1e7fcabbf8b18d1f3178160474d42a3db2799d4e4a6fc92f4c50ad618eac32 |