Skip to main content

Communication primitives and transport abstraction for WinterForge

Project description

winterforge-channels

Communication primitives and transport abstraction for WinterForge

Tests Python License

winterforge-channels provides transport-agnostic message routing with permission-controlled channels and pluggable delivery mechanisms.


Features

  • Transport Agnostic - Messages don't know HOW they're delivered
  • Permission Controlled - Channels enforce access via WinterForge's authorization system
  • Pluggable Transports - HTTP, WebSocket, Queue, Email (extensible)
  • Subscriber Agnostic - ANY Frag can subscribe (User, Bot, Service, System)
  • Thread Support - Message threading and semantic references
  • Conversation Tracking - Canonical conversation membership
  • CLI Interface - Complete command-line management

Installation

Via WinterForge Extras (Recommended)

pip install winterforge[channels]

Standalone

pip install winterforge-channels

Quick Start

Create a Channel

from winterforge_channels.primitives import Channel

channel = Channel()
channel.set_title("General Chat")
channel.set_alias('egress_transport', 'websocket')
await channel.save()

Send a Message

from winterforge_channels.primitives import Message
from winterforge.frags import Manifest

message = Message()
message.set_content("Hello, world!")
message.set_author_id(user.id)
await message.save()

# Send via channel
await message.send(Manifest([channel]))

Subscribe to Channel

# Subscribe user to channel
await channel.subscribe(user_id=42)

# Get all subscribers
subscribers = await channel.get_subscribers()

Thread Messages

# Create original message
msg1 = Message()
msg1.set_content("What's the status?")
msg1.set_author_id(alice.id)
await msg1.save()

# Reply to message
msg2 = Message()
msg2.set_content("All systems operational")
msg2.set_author_id(bob.id)
msg2.set_reply_to_id(msg1.id)
await msg2.save()

# Get thread
from winterforge_channels.registries import MessageRegistry

registry = MessageRegistry()
thread = await registry.get_thread(msg2.id)
# Returns: [msg1, msg2]

CLI Usage

# Create channel
winterforge channel create "Support Chat" --egress websocket

# Send message
winterforge channel send 1 "Hello!" --author-id 1

# Subscribe user to channel
winterforge channel subscribe 1 42

# List subscribers
winterforge channel list-subscribers 1

# Unsubscribe
winterforge channel unsubscribe 1 42

# List all channels
winterforge channel list

Architecture

Primitives

Message - Transport-agnostic communication unit

  • Content, author, content_type
  • Thread management (reply_to chains)
  • Semantic references
  • Conversation tracking
  • Transport-agnostic sending

Channel - Permission-controlled routing mechanism

  • Configurable ingress/egress transports
  • Subscriber management
  • Permission enforcement
  • Transport repository access

Subscription - Links subscribers to channels

  • Any Frag can be a subscriber
  • Clean subscribe/unsubscribe workflow
  • Query by subscriber or channel

Traits

messageable - Content and metadata

  • content: str
  • author_id: int
  • content_type: str (default: 'text/plain')

conversable - Canonical conversation tracking

  • conversation_id: Optional[int]
  • get_conversation() - Resolve conversation Frag

transportable - Send via channels

  • send_to_channels(channels) - Permission check + routing

subscribable - Subscriber management

  • get_subscribers() - Get all subscribers
  • subscribe(subscriber_id) - Add subscriber
  • unsubscribe(subscriber_id) - Remove subscriber

routable - Message routing

  • route_message(message) - Orchestrate delivery

Transport Plugins

HTTP Ingress - Receive messages via POST endpoint

  • Validates payload
  • Creates Message Frag
  • Returns message ID

HTTP Egress - Send to subscriber endpoints

  • POST to subscriber.endpoint
  • Delivery status tracking
  • Error handling

WebSocket Egress - Real-time delivery

  • Active connection management
  • Register/unregister connections
  • Send to connected subscribers only

Registries

MessageRegistry - Message queries

  • get_thread(message_id) - Reconstruct thread
  • get_for_conversation(conversation_id) - Filter by conversation

ChannelRegistry - Channel queries

  • Standard FragRegistry operations

SubscriptionRegistry - Subscription queries

  • Filter by subscriber or channel

Permissions

Channels enforce three permissions via WinterForge's authorization system:

  • channel.submit - Can send messages to channel
  • channel.subscribe - Can subscribe to channel
  • channel.manage - Can configure channel

Permission Setup

from winterforge.frags import Frag

# Create permission
perm = Frag(affinities=['permission'], traits=['titled', 'persistable'])
perm.set_title('channel.submit')
await perm.save()

# Create role with permission
role = Frag(
    affinities=['role'],
    traits=['titled', 'permissioned', 'persistable'],
)
role.set_title('sender')
role.add_permission(perm.id)
await role.save()

# Assign role to user
user.add_role(role.id)
await user.save()

# Permission check
can_submit = await channel.can_submit(user.id)

Transport Configuration

Channels can specify preferred transports via aliases:

channel.set_alias('ingress_transport', 'http')
channel.set_alias('egress_transport', 'websocket')
await channel.save()

Transport resolution uses first-match strategy:

  1. Channel-preferred transport (if available)
  2. First available transport

Message Threading

Messages support two types of relationships:

reply_to - Direct thread parent (single message)

msg2.set_reply_to_id(msg1.id)
parent = await msg2.get_reply_to()

references - Semantic links (multiple messages)

msg3.add_reference(msg1.id)
msg3.add_reference(msg2.id)
refs = await msg3.get_references()

Thread Traversal:

registry = MessageRegistry()
thread = await registry.get_thread(message.id)
# Returns Manifest with [root, ..., message]

Examples

Complete Workflow

from winterforge.frags import Frag, Manifest
from winterforge_channels.primitives import Message, Channel

# Create permission
perm = Frag(affinities=['permission'], traits=['titled', 'persistable'])
perm.set_title('channel.submit')
await perm.save()

# Create role
role = Frag(
    affinities=['role'],
    traits=['titled', 'permissioned', 'persistable'],
)
role.set_title('member')
role.add_permission(perm.id)
await role.save()

# Create user
alice = Frag(
    affinities=['user'],
    traits=['userable', 'authorizable', 'persistable'],
)
alice.set_username('alice')
alice.set_email('alice@example.com')
alice.add_role(role.id)
await alice.save()

# Create channel
general = Channel()
general.set_title("General Chat")
general.set_alias('egress_transport', 'websocket')
await general.save()

# Subscribe
await general.subscribe(alice.id)

# Send message
msg = Message()
msg.set_content("Hello everyone!")
msg.set_author_id(alice.id)
await msg.save()

results = await msg.send(Manifest([general]))

Multi-Channel Broadcasting

# Send to multiple channels at once
channels = Manifest([general, support, announcements])
results = await message.send(channels)

for result in results:
    print(f"Delivered: {result.delivered_count}")
    print(f"Failed: {result.failed_count}")

Conversation Tracking

# Create conversation
conversation = Frag(
    affinities=['conversation'],
    traits=['titled', 'persistable'],
)
conversation.set_title("Project Planning")
await conversation.save()

# Link messages to conversation
msg1.set_conversation_id(conversation.id)
msg2.set_conversation_id(conversation.id)

# Query conversation messages
registry = MessageRegistry()
messages = await registry.get_for_conversation(conversation.id)

Testing

# Run all tests
pytest tests/winterforge_channels/

# Run with coverage
pytest tests/winterforge_channels/ --cov=winterforge_channels

# Run specific test file
pytest tests/winterforge_channels/primitives/test_message.py

Current Status: 147 tests passing


Development

Creating Custom Transports

Implement IngressTransport and/or EgressTransport protocols:

from winterforge_channels.plugins.transports import (
    EgressTransport,
    TransportResult,
)

class EmailEgressTransport:
    """Send messages via email."""

    async def send(self, message, channel, subscribers):
        """Send email to subscribers."""
        delivered = 0
        failed = 0
        errors = []

        for subscriber in subscribers:
            try:
                await send_email(
                    to=subscriber.email,
                    subject=f"New message in {channel.title}",
                    body=message.content,
                )
                delivered += 1
            except Exception as e:
                errors.append(str(e))
                failed += 1

        return [TransportResult(
            success=(failed == 0),
            delivered_count=delivered,
            failed_count=failed,
            errors=errors,
        )]

    def is_available(self) -> bool:
        """Check if email transport is available."""
        return True

Register via decorator:

from winterforge.plugins.decorators import plugin

@plugin('email', type='egress_transport')
class EmailEgressTransport:
    ...

Architecture Principles

Composition Over Inheritance

  • Messages, Channels, Subscriptions use trait composition
  • No inheritance hierarchies

Transport Agnostic

  • Messages don't know HOW they're delivered
  • Channels handle routing

Permission Controlled

  • Leverages WinterForge's authorizable trait
  • Fine-grained access control

Subscriber Agnostic

  • ANY Frag can be a subscriber
  • No special requirements

Fluent Interfaces

  • Chainable methods throughout
  • Pythonic design

Pattern Consistency

  • Matches WinterForge core patterns
  • Repository/Manifest API alignment

API Reference

See getting_started.md for detailed documentation.


Contributing

Contributions welcome! Please ensure:

  • All tests pass
  • New features have tests
  • Code follows WinterForge mandates
  • Line length ≤80 characters

License

MIT License - see LICENSE file for details


Links


Built with ❄️ by the WinterForge team

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

winterforge_channels-1.0.0.tar.gz (14.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

winterforge_channels-1.0.0-py3-none-any.whl (6.5 kB view details)

Uploaded Python 3

File details

Details for the file winterforge_channels-1.0.0.tar.gz.

File metadata

  • Download URL: winterforge_channels-1.0.0.tar.gz
  • Upload date:
  • Size: 14.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for winterforge_channels-1.0.0.tar.gz
Algorithm Hash digest
SHA256 5411ee516298707249906ad2c695c5bc5253ac29e1ab073d6bfef63e377914f7
MD5 002fc12603e005b7d3c0bc23ee2cc235
BLAKE2b-256 e242aa4f4a264fa2118e5cb7b75170caaf87a7a106a5977b4a1867d472c64074

See more details on using hashes here.

File details

Details for the file winterforge_channels-1.0.0-py3-none-any.whl.

File metadata

File hashes

Hashes for winterforge_channels-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6d341f006db289e16756c550ab11d3c7c9e6d0fdedea7e746d2a34c1c38525e3
MD5 76140e24ef80a7dc52c4b4a09d38dd24
BLAKE2b-256 e37666da888b45bf25c4c897115ade49e7ee66d986c0e3c5d4286556d8e58e96

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page