Communication primitives and transport abstraction for WinterForge
Project description
winterforge-channels
Communication primitives and transport abstraction for WinterForge
winterforge-channels provides transport-agnostic message routing with permission-controlled channels and pluggable delivery mechanisms.
Features
- Transport Agnostic - Messages don't know HOW they're delivered
- Permission Controlled - Channels enforce access via WinterForge's authorization system
- Pluggable Transports - HTTP, WebSocket, Queue, Email (extensible)
- Subscriber Agnostic - ANY Frag can subscribe (User, Bot, Service, System)
- Thread Support - Message threading and semantic references
- Conversation Tracking - Canonical conversation membership
- CLI Interface - Complete command-line management
Installation
Via WinterForge Extras (Recommended)
pip install winterforge[channels]
Standalone
pip install winterforge-channels
Quick Start
Create a Channel
from winterforge_channels.primitives import Channel
channel = Channel()
channel.set_title("General Chat")
channel.set_alias('egress_transport', 'websocket')
await channel.save()
Send a Message
from winterforge_channels.primitives import Message
from winterforge.frags import Manifest
message = Message()
message.set_content("Hello, world!")
message.set_author_id(user.id)
await message.save()
# Send via channel
await message.send(Manifest([channel]))
Subscribe to Channel
# Subscribe user to channel
await channel.subscribe(user_id=42)
# Get all subscribers
subscribers = await channel.get_subscribers()
Thread Messages
# Create original message
msg1 = Message()
msg1.set_content("What's the status?")
msg1.set_author_id(alice.id)
await msg1.save()
# Reply to message
msg2 = Message()
msg2.set_content("All systems operational")
msg2.set_author_id(bob.id)
msg2.set_reply_to_id(msg1.id)
await msg2.save()
# Get thread
from winterforge_channels.registries import MessageRegistry
registry = MessageRegistry()
thread = await registry.get_thread(msg2.id)
# Returns: [msg1, msg2]
CLI Usage
# Create channel
winterforge channel create "Support Chat" --egress websocket
# Send message
winterforge channel send 1 "Hello!" --author-id 1
# Subscribe user to channel
winterforge channel subscribe 1 42
# List subscribers
winterforge channel list-subscribers 1
# Unsubscribe
winterforge channel unsubscribe 1 42
# List all channels
winterforge channel list
Architecture
Primitives
Message - Transport-agnostic communication unit
- Content, author, content_type
- Thread management (reply_to chains)
- Semantic references
- Conversation tracking
- Transport-agnostic sending
Channel - Permission-controlled routing mechanism
- Configurable ingress/egress transports
- Subscriber management
- Permission enforcement
- Transport repository access
Subscription - Links subscribers to channels
- Any Frag can be a subscriber
- Clean subscribe/unsubscribe workflow
- Query by subscriber or channel
Traits
messageable - Content and metadata
- content: str
- author_id: int
- content_type: str (default: 'text/plain')
conversable - Canonical conversation tracking
- conversation_id: Optional[int]
- get_conversation() - Resolve conversation Frag
transportable - Send via channels
- send_to_channels(channels) - Permission check + routing
subscribable - Subscriber management
- get_subscribers() - Get all subscribers
- subscribe(subscriber_id) - Add subscriber
- unsubscribe(subscriber_id) - Remove subscriber
routable - Message routing
- route_message(message) - Orchestrate delivery
Transport Plugins
HTTP Ingress - Receive messages via POST endpoint
- Validates payload
- Creates Message Frag
- Returns message ID
HTTP Egress - Send to subscriber endpoints
- POST to subscriber.endpoint
- Delivery status tracking
- Error handling
WebSocket Egress - Real-time delivery
- Active connection management
- Register/unregister connections
- Send to connected subscribers only
Registries
MessageRegistry - Message queries
- get_thread(message_id) - Reconstruct thread
- get_for_conversation(conversation_id) - Filter by conversation
ChannelRegistry - Channel queries
- Standard FragRegistry operations
SubscriptionRegistry - Subscription queries
- Filter by subscriber or channel
Permissions
Channels enforce three permissions via WinterForge's authorization system:
- channel.submit - Can send messages to channel
- channel.subscribe - Can subscribe to channel
- channel.manage - Can configure channel
Permission Setup
from winterforge.frags import Frag
# Create permission
perm = Frag(affinities=['permission'], traits=['titled', 'persistable'])
perm.set_title('channel.submit')
await perm.save()
# Create role with permission
role = Frag(
affinities=['role'],
traits=['titled', 'permissioned', 'persistable'],
)
role.set_title('sender')
role.add_permission(perm.id)
await role.save()
# Assign role to user
user.add_role(role.id)
await user.save()
# Permission check
can_submit = await channel.can_submit(user.id)
Transport Configuration
Channels can specify preferred transports via aliases:
channel.set_alias('ingress_transport', 'http')
channel.set_alias('egress_transport', 'websocket')
await channel.save()
Transport resolution uses first-match strategy:
- Channel-preferred transport (if available)
- First available transport
Message Threading
Messages support two types of relationships:
reply_to - Direct thread parent (single message)
msg2.set_reply_to_id(msg1.id)
parent = await msg2.get_reply_to()
references - Semantic links (multiple messages)
msg3.add_reference(msg1.id)
msg3.add_reference(msg2.id)
refs = await msg3.get_references()
Thread Traversal:
registry = MessageRegistry()
thread = await registry.get_thread(message.id)
# Returns Manifest with [root, ..., message]
Examples
Complete Workflow
from winterforge.frags import Frag, Manifest
from winterforge_channels.primitives import Message, Channel
# Create permission
perm = Frag(affinities=['permission'], traits=['titled', 'persistable'])
perm.set_title('channel.submit')
await perm.save()
# Create role
role = Frag(
affinities=['role'],
traits=['titled', 'permissioned', 'persistable'],
)
role.set_title('member')
role.add_permission(perm.id)
await role.save()
# Create user
alice = Frag(
affinities=['user'],
traits=['userable', 'authorizable', 'persistable'],
)
alice.set_username('alice')
alice.set_email('alice@example.com')
alice.add_role(role.id)
await alice.save()
# Create channel
general = Channel()
general.set_title("General Chat")
general.set_alias('egress_transport', 'websocket')
await general.save()
# Subscribe
await general.subscribe(alice.id)
# Send message
msg = Message()
msg.set_content("Hello everyone!")
msg.set_author_id(alice.id)
await msg.save()
results = await msg.send(Manifest([general]))
Multi-Channel Broadcasting
# Send to multiple channels at once
channels = Manifest([general, support, announcements])
results = await message.send(channels)
for result in results:
print(f"Delivered: {result.delivered_count}")
print(f"Failed: {result.failed_count}")
Conversation Tracking
# Create conversation
conversation = Frag(
affinities=['conversation'],
traits=['titled', 'persistable'],
)
conversation.set_title("Project Planning")
await conversation.save()
# Link messages to conversation
msg1.set_conversation_id(conversation.id)
msg2.set_conversation_id(conversation.id)
# Query conversation messages
registry = MessageRegistry()
messages = await registry.get_for_conversation(conversation.id)
Testing
# Run all tests
pytest tests/winterforge_channels/
# Run with coverage
pytest tests/winterforge_channels/ --cov=winterforge_channels
# Run specific test file
pytest tests/winterforge_channels/primitives/test_message.py
Current Status: 147 tests passing
Development
Creating Custom Transports
Implement IngressTransport and/or EgressTransport protocols:
from winterforge_channels.plugins.transports import (
EgressTransport,
TransportResult,
)
class EmailEgressTransport:
"""Send messages via email."""
async def send(self, message, channel, subscribers):
"""Send email to subscribers."""
delivered = 0
failed = 0
errors = []
for subscriber in subscribers:
try:
await send_email(
to=subscriber.email,
subject=f"New message in {channel.title}",
body=message.content,
)
delivered += 1
except Exception as e:
errors.append(str(e))
failed += 1
return [TransportResult(
success=(failed == 0),
delivered_count=delivered,
failed_count=failed,
errors=errors,
)]
def is_available(self) -> bool:
"""Check if email transport is available."""
return True
Register via decorator:
from winterforge.plugins.decorators import plugin
@plugin('email', type='egress_transport')
class EmailEgressTransport:
...
Architecture Principles
Composition Over Inheritance
- Messages, Channels, Subscriptions use trait composition
- No inheritance hierarchies
Transport Agnostic
- Messages don't know HOW they're delivered
- Channels handle routing
Permission Controlled
- Leverages WinterForge's authorizable trait
- Fine-grained access control
Subscriber Agnostic
- ANY Frag can be a subscriber
- No special requirements
Fluent Interfaces
- Chainable methods throughout
- Pythonic design
Pattern Consistency
- Matches WinterForge core patterns
- Repository/Manifest API alignment
API Reference
See getting_started.md for detailed documentation.
Contributing
Contributions welcome! Please ensure:
- All tests pass
- New features have tests
- Code follows WinterForge mandates
- Line length ≤80 characters
License
MIT License - see LICENSE file for details
Links
- WinterForge Core: https://github.com/yourusername/winterforge
- Documentation: https://winterforge-channels.readthedocs.io
- Issues: https://github.com/yourusername/winterforge-channels/issues
Built with ❄️ by the WinterForge team
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file winterforge_channels-1.0.0.tar.gz.
File metadata
- Download URL: winterforge_channels-1.0.0.tar.gz
- Upload date:
- Size: 14.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5411ee516298707249906ad2c695c5bc5253ac29e1ab073d6bfef63e377914f7
|
|
| MD5 |
002fc12603e005b7d3c0bc23ee2cc235
|
|
| BLAKE2b-256 |
e242aa4f4a264fa2118e5cb7b75170caaf87a7a106a5977b4a1867d472c64074
|
File details
Details for the file winterforge_channels-1.0.0-py3-none-any.whl.
File metadata
- Download URL: winterforge_channels-1.0.0-py3-none-any.whl
- Upload date:
- Size: 6.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6d341f006db289e16756c550ab11d3c7c9e6d0fdedea7e746d2a34c1c38525e3
|
|
| MD5 |
76140e24ef80a7dc52c4b4a09d38dd24
|
|
| BLAKE2b-256 |
e37666da888b45bf25c4c897115ade49e7ee66d986c0e3c5d4286556d8e58e96
|