Skip to main content

Official Python SDK for Synap - High-Performance In-Memory Key-Value Store & Message Broker

Project description

Synap Python SDK

Official Python client library for Synap - High-Performance In-Memory Key-Value Store & Message Broker.

Features

  • 💾 Key-Value Store: Fast in-memory KV operations with TTL support
  • 📨 Message Queues: RabbitMQ-style queues with ACK/NACK
  • 📡 Event Streams: Kafka-style event streams with offset tracking
  • 🔔 Pub/Sub: Topic-based messaging with wildcards
  • StreamableHTTP Protocol: Unified endpoint for all operations
  • 🛡️ Type-Safe: Full type hints with mypy strict mode
  • 📦 Async/Await: Built on modern async patterns with httpx

Requirements

  • Python 3.11 or higher
  • Synap Server running

Installation

pip install synap-sdk

Quick Start

import asyncio
from synap_sdk import SynapClient, SynapConfig

async def main():
    # Create client
    config = SynapConfig.create("http://localhost:15500")
    async with SynapClient(config) as client:
        # Key-Value operations
        await client.kv.set("user:1", "John Doe")
        value = await client.kv.get("user:1")
        print(f"Value: {value}")

        # Queue operations
        await client.queue.create_queue("tasks")
        msg_id = await client.queue.publish("tasks", {"task": "process-video"}, priority=9)
        message = await client.queue.consume("tasks", "worker-1")
        
        if message:
            # Process message
            print(f"Received: {message.payload}")
            await client.queue.ack("tasks", message.id)

        # Event Stream
        await client.stream.create_room("chat-room-1")
        offset = await client.stream.publish("chat-room-1", "message", {
            "user": "alice",
            "text": "Hello!"
        })

        # Pub/Sub
        await client.pubsub.subscribe_topics("user-123", ["notifications.*"])
        delivered = await client.pubsub.publish("notifications.email", {
            "to": "user@example.com",
            "subject": "Welcome"
        })

asyncio.run(main())

Transports

Since v0.11.0 the SDK selects the transport via URL scheme — no extra arguments required:

URL scheme Default port When to use
synap:// 15501 ✅ Recommended default — MessagePack over persistent TCP, lowest latency, preserves int/float/bool/bytes.
resp3:// 6379 Redis-compatible text protocol — interop with existing Redis tooling.
http:// / https:// 15500 Original REST transport — full command coverage.

All commands (KV, Hash, List, Set, Sorted Set, Queue, Stream, Pub/Sub, Transactions, Scripts, Geo, HyperLogLog) are fully supported on every transport. Native transports raise UnsupportedCommandError instead of silently falling back to HTTP.

from synap_sdk import SynapClient, SynapConfig

# SynapRPC — recommended default
config = SynapConfig("synap://127.0.0.1:15501")
client = SynapClient(config)

# RESP3 — Redis-compatible
config = SynapConfig("resp3://127.0.0.1:6379")
client = SynapClient(config)

# HTTP — full REST access
config = SynapConfig("http://127.0.0.1:15500")
client = SynapClient(config)

Queue, stream and pub/sub over synap://:

import asyncio
from synap_sdk import SynapClient, SynapConfig

async def main():
    async with SynapClient(SynapConfig("synap://127.0.0.1:15501")) as client:
        # Queue round-trip
        await client.queue.create_queue("tasks", max_size=1000, message_ttl=60)
        msg_id = await client.queue.publish("tasks", {"job": "resize"}, priority=5)
        msg = await client.queue.consume("tasks", "worker-1")
        await client.queue.ack("tasks", msg.id)

        # Stream publish + read
        await client.stream.create_room("events")
        await client.stream.publish("events", "user.created", {"id": "u1"})
        events = await client.stream.read("events", offset=0)

        # Reactive pub/sub (server-push over dedicated TCP connection on synap://)
        async for msg in client.pubsub.observe(["news.*"]):
            print("got:", msg)
            break

API Reference

Configuration

from synap_sdk import SynapConfig

config = SynapConfig.create("http://localhost:15500") \
    .with_timeout(60) \
    .with_auth_token("your-token") \
    .with_max_retries(5)

async with SynapClient(config) as client:
    # Use client
    pass

Key-Value Store

# Set value with TTL
await client.kv.set("session:abc", session_data, ttl=3600)

# Get value
data = await client.kv.get("session:abc")

# Atomic operations
new_value = await client.kv.incr("counter", delta=1)
exists = await client.kv.exists("session:abc")

# Scan keys
keys = await client.kv.scan("user:*", limit=100)

# Get stats
stats = await client.kv.stats()

# Delete key
await client.kv.delete("session:abc")

Message Queues

# Create queue
await client.queue.create_queue("tasks", max_size=10000, message_ttl=3600)

# Publish message with priority
message_id = await client.queue.publish(
    "tasks",
    {"action": "encode", "file": "video.mp4"},
    priority=9,
    max_retries=3
)

# Consume and process
message = await client.queue.consume("tasks", "worker-1")
if message:
    try:
        # Process message
        await process_message(message.payload)
        await client.queue.ack("tasks", message.id)
    except Exception:
        await client.queue.nack("tasks", message.id)  # Requeue

# Get queue stats
stats = await client.queue.stats("tasks")
queues = await client.queue.list()

# Delete queue
await client.queue.delete_queue("tasks")

Event Streams

# Create stream room
await client.stream.create_room("events")

# Publish event
offset = await client.stream.publish("events", "user.created", {
    "userId": "123",
    "name": "Alice"
})

# Read events
events = await client.stream.read("events", offset=0, limit=100)
for evt in events:
    print(f"Event: {evt.event} at offset {evt.offset}")

# Get stream stats
stats = await client.stream.stats("events")
rooms = await client.stream.list_rooms()

# Delete room
await client.stream.delete_room("events")

Pub/Sub

# Subscribe to topics (supports wildcards)
await client.pubsub.subscribe_topics("subscriber-1", [
    "user.created",
    "notifications.*",
    "events.#"
])

# Publish message
delivered = await client.pubsub.publish("notifications.email", {
    "to": "user@example.com",
    "subject": "Welcome!"
})
print(f"Delivered to {delivered} subscribers")

# Unsubscribe
await client.pubsub.unsubscribe_topics("subscriber-1", ["notifications.*"])

# Get stats
stats = await client.pubsub.stats()

Async Context Manager

The client supports async context manager for automatic cleanup:

async with SynapClient(config) as client:
    await client.kv.set("key", "value")
    # Client automatically closed when exiting context

Or manual close:

client = SynapClient(config)
try:
    await client.kv.set("key", "value")
finally:
    await client.close()

Error Handling

from synap_sdk.exceptions import SynapException

try:
    await client.kv.set("key", "value")
except SynapException as e:
    print(f"Synap error: {e}")

Custom HTTP Client

You can provide your own httpx.AsyncClient for advanced scenarios:

import httpx
from synap_sdk import SynapClient, SynapConfig

http_client = httpx.AsyncClient(timeout=120)
config = SynapConfig.create("http://localhost:15500")

async with SynapClient(config, http_client) as client:
    # Use client
    pass

Type Hints

The SDK is fully typed and passes mypy strict mode:

mypy synap_sdk

Development

Install Development Dependencies

pip install -e ".[dev]"

Run Tests

pytest

Run Tests with Coverage

pytest --cov=synap_sdk --cov-report=html

Type Checking

mypy synap_sdk

Linting

ruff check synap_sdk

Formatting

ruff format synap_sdk

License

Apache License 2.0 - See LICENSE for details.

Contributing

Contributions are welcome! Please see CONTRIBUTING.md for details.

Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

synap_sdk-0.11.1.tar.gz (36.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

synap_sdk-0.11.1-py3-none-any.whl (46.0 kB view details)

Uploaded Python 3

File details

Details for the file synap_sdk-0.11.1.tar.gz.

File metadata

  • Download URL: synap_sdk-0.11.1.tar.gz
  • Upload date:
  • Size: 36.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes

Hashes for synap_sdk-0.11.1.tar.gz
Algorithm Hash digest
SHA256 1a19684058646cbf097f04ef9212bae86abf9a159f2f633ea8236f444275c058
MD5 2d7d2a967fc5972d669f8dc59d1b64c7
BLAKE2b-256 1a11f466909e2b3c856c55a2492622b08b836a3a1a6669c0ba4b5c4cb32571ad

See more details on using hashes here.

File details

Details for the file synap_sdk-0.11.1-py3-none-any.whl.

File metadata

  • Download URL: synap_sdk-0.11.1-py3-none-any.whl
  • Upload date:
  • Size: 46.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes

Hashes for synap_sdk-0.11.1-py3-none-any.whl
Algorithm Hash digest
SHA256 1713ca87048e60b1ded74b254fd848d83dcee987a7de47920a4df3fb9859a08c
MD5 b59a857630730f9e8eedef8eea80eedc
BLAKE2b-256 d84a19220fb2170de77f82d283de2e29debef15c8b2d4269fabe01f95a994d8e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page