High-performance message relay system for real-time message delivery via WebSocket and long-polling
Project description
Pulsar Relay
A message relay system for real-time message delivery to clients via WebSocket and long-polling connections.
Features
- Multi-Protocol Support: WebSocket and HTTP long-polling
- Topic-Based Routing: Subscribe to specific message topics
- Two-Tier Storage: In-memory hot tier + Valkey persistent tier with AOF/RDB
- Simple Architecture: No external database required, just Valkey + application
Quick Start
Prerequisites
- Valkey (or Redis 7+) with AOF/RDB persistence enabled
- Docker (optional)
Installation
# Clone the repository
git clone https://github.com/mvdbeek/pulsar-relay.git
cd pulsar-relay
# Set up configuration
cp config.example.yaml config.yaml
# Edit config.yaml with your settings
# Start Valkey (if not already running)
docker run -d -p 6379:6379 -v valkey-data:/data valkey/valkey --appendonly yes
export PULSAR_STORAGE_BACKEND=valkey
export PULSAR_VALKEY_HOST=valkey.example.com
export PULSAR_JWT_SECRET_KEY=your-secure-secret-key
# Start the server (port and workers controlled by uvicorn)
uvicorn app.main:app --host 0.0.0.0 --port 9000 --workers 4
Docker Deployment
For production deployments, use the official Docker image which includes both Valkey and Pulsar Relay:
# Pull the latest image
docker pull ghcr.io/mvdbeek/pulsar-relay:latest
# Run with environment variables
docker run -d \
--name pulsar-relay \
-p 8080:8080 \
-e PULSAR_JWT_SECRET_KEY=your-secure-secret-key \
-e PULSAR_LOG_LEVEL=INFO \
-v pulsar-data:/var/lib/valkey \
ghcr.io/mvdbeek/pulsar-relay:latest
The Docker image:
- Runs both Valkey and Pulsar Relay using supervisor
- Exposes port 8080 for the API
- Persists Valkey data to
/var/lib/valkey - Includes health checks for automatic container recovery
Available environment variables:
PULSAR_JWT_SECRET_KEY- Secret key for JWT token signing (required)PULSAR_LOG_LEVEL- Log level (default: INFO)PULSAR_STORAGE_BACKEND- Storage backend (default: valkey)PULSAR_VALKEY_HOST- Valkey host (default: localhost)PULSAR_VALKEY_PORT- Valkey port (default: 6379)
Usage
Authentication
Pulsar Relay uses JWT-based authentication. Before sending or receiving messages, you need to obtain an access token.
Creating the First Admin User
On first deployment, you'll need to create an admin user directly in the storage backend. For development, you can use the test fixture function:
# create_admin.py
import asyncio
from app.auth.storage import InMemoryUserStorage
from app.auth.models import UserCreate
async def create_admin():
storage = InMemoryUserStorage()
user_data = UserCreate(
username="admin",
email="admin@example.com",
password="your-secure-password",
permissions=["admin", "read", "write"]
)
user = await storage.create_user(user_data)
print(f"Created admin user: {user.username}")
asyncio.run(create_admin())
For production deployments with Valkey storage, contact your system administrator to create the initial admin user.
Logging In
Once you have user credentials, obtain a JWT token:
# Login to get access token
curl -X POST http://localhost:8080/auth/login \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=admin&password=your-secure-password"
# Response:
# {
# "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
# "token_type": "bearer",
# "expires_in": 3600
# }
Tokens expire after 1 hour by default. Save the access_token value to use in subsequent requests.
Creating Additional Users (Admin Only)
Admins can create new users via the API:
curl -X POST http://localhost:8080/auth/register \
-H "Authorization: Bearer YOUR_ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"username": "producer1",
"email": "producer@example.com",
"password": "secure-password",
"permissions": ["write"]
}'
Available permissions:
admin: Full access to all operations including user managementread: Can subscribe to and receive messageswrite: Can publish messages to topics- Multiple permissions can be granted to a single user
Sending Messages (Producers)
# Send a single message
curl -X POST http://localhost:8080/api/v1/messages \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"topic": "notifications",
"payload": {
"user_id": 123,
"message": "Hello, World!"
},
"ttl": 3600
}'
Note: Requires write permission. Topic access is controlled by topic ownership and permissions.
Receiving Messages (Consumers)
WebSocket Client
// Use the access token from login response
const token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."; // Your JWT token
const ws = new WebSocket(`ws://localhost:8080/ws?token=${token}`);
ws.onopen = () => {
// Subscribe to topics
ws.send(JSON.stringify({
type: 'subscribe',
topics: ['notifications', 'alerts']
}));
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'subscribed') {
console.log('Subscribed to:', data.topics);
console.log('Session ID:', data.session_id);
}
if (data.type === 'message') {
console.log('Received:', data.payload);
// Acknowledge receipt (optional - for delivery tracking)
ws.send(JSON.stringify({
type: 'ack',
message_id: data.message_id
}));
}
if (data.type === 'error') {
console.error('Error:', data.message);
}
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
// Token may have expired - re-authenticate and reconnect
};
Note: Requires read permission. You can only subscribe to topics you have access to (owned topics, public topics, or topics you've been granted access to).
Long-Polling Client
// Use the access token from login response
const token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."; // Your JWT token
// Track last seen message IDs for each topic
const lastSeenIds = {};
async function poll() {
try {
const response = await fetch(
'http://localhost:8080/messages/poll',
{
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
topics: ['notifications', 'alerts'],
since: lastSeenIds, // Catch up on any missed messages
timeout: 30
})
}
);
if (response.status === 401) {
console.error('Token expired - please re-authenticate');
return;
}
const data = await response.json();
if (data.messages && data.messages.length > 0) {
data.messages.forEach(msg => {
console.log('Received:', msg.payload);
// Track the last message ID for each topic
lastSeenIds[msg.topic] = msg.message_id;
});
}
// Continue polling
poll();
} catch (error) {
console.error('Polling error:', error);
// Retry after delay
setTimeout(poll, 5000);
}
}
poll();
Note: Requires read permission and appropriate topic access. The since parameter automatically handles message acknowledgment by tracking which messages you've already received.
Topic Management
Topics control access to message streams. Users with write permission can create topics, and topic owners can manage access.
Creating a Topic
curl -X POST http://localhost:8080/api/v1/topics \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"topic_name": "notifications",
"is_public": false,
"description": "User notification messages"
}'
Granting Topic Access
Topic owners can grant access to other users:
# Grant access by username
curl -X POST http://localhost:8080/api/v1/topics/notifications/permissions \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"username": "consumer1"
}'
Listing Your Topics
curl -X GET http://localhost:8080/api/v1/topics \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN"
Making a Topic Public
Public topics can be read by any authenticated user with read permission:
curl -X PUT http://localhost:8080/api/v1/topics/notifications \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"is_public": true
}'
Architecture
See ARCHITECTURE.md for detailed architecture documentation.
High-Level Overview
Producers → Ingestion API → Storage (Memory + Valkey Streams) → WebSocket/Long-Poll → Clients
Key Components
- Ingestion Layer: REST API for message submission
- Storage Layer: Two-tier storage (hot in-memory + persistent Valkey)
- Connection Manager: Tracks active client connections
- Delivery Layer: WebSocket and long-polling servers
Configuration
Configuration is managed via YAML file or environment variables:
server:
http_port: 8080
read_timeout: 30s
write_timeout: 30s
valkey:
host: localhost
port: 6379
password: ""
db: 0
pool_size: 100
# Persistence settings (configured in valkey.conf)
# appendonly yes
# appendfsync everysec
storage:
persistent_tier_retention: 24h # Valkey streams retention
max_messages_per_topic: 1000000 # Trim streams at this count
API Reference
See API.md for complete API documentation.
Authentication API
POST /auth/login- Authenticate and obtain JWT token (OAuth2 compatible)POST /auth/register- Create new user (admin only)GET /auth/me- Get current user informationGET /auth/users- List all users (admin only)PATCH /auth/users/{user_id}- Update user (admin only)DELETE /auth/users/{user_id}- Delete user (admin only)
Topic Management API
POST /api/v1/topics- Create a new topicGET /api/v1/topics- List topics accessible to current userGET /api/v1/topics/{topic_name}- Get topic detailsPUT /api/v1/topics/{topic_name}- Update topic metadata (owner only)DELETE /api/v1/topics/{topic_name}- Delete topic (owner only)POST /api/v1/topics/{topic_name}/permissions- Grant user access to topic (owner only)GET /api/v1/topics/{topic_name}/permissions- List topic permissions (owner only)DELETE /api/v1/topics/{topic_name}/permissions/{user_id}- Revoke user access (owner only)GET /api/v1/topics/stats- Topic statistics (admin only)
Producer API
POST /api/v1/messages- Send a single messagePOST /api/v1/messages/bulk- Send multiple messages
Consumer API
GET /ws- WebSocket connection endpoint (query param:token)POST /messages/poll- Long-polling endpoint for message retrievalGET /messages/poll/stats- Poll client statistics
Management API
GET /health- Health check endpoint (no auth required)GET /ready- Readiness check endpoint (no auth required)GET /metrics- Prometheus metrics
Performance
Benchmarks
Valkey Tuning
Key configuration for optimal performance:
# valkey.conf
maxmemory 8gb
maxmemory-policy allkeys-lru
appendonly yes
appendfsync everysec # Balance between durability and performance
save 900 1
save 300 10
save 60 10000
Security
Authentication & Authorization
- JWT Authentication: All API endpoints (except health/ready) require valid JWT tokens
- Token Expiration: Access tokens expire after 1 hour (configurable)
- Password Hashing: User passwords are securely hashed using industry-standard algorithms
- Permission-Based Access Control:
admin: Full system access including user and topic managementread: Subscribe to and receive messages from accessible topicswrite: Publish messages to accessible topics
- Topic-Level Security:
- Topics are owned by the user who creates them
- Owners can grant access to specific users
- Public topics are readable by any authenticated user
- Private topics are only accessible to the owner and explicitly granted users
Best Practices
- Store JWT secret key securely via environment variables
- Use strong passwords (minimum 8 characters)
- Rotate admin passwords regularly
- Grant minimum required permissions to users
- Use HTTPS in production to protect tokens in transit
- Implement rate limiting at the reverse proxy level
Development
Local Development
# Create virtual environment
python3 -m venv venv
source venv/bin/activate
# Install development dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Run linting
tox -e lint,mypy
Creating a Release
For maintainers: See RELEASE.md for detailed instructions on creating a new release.
Quick summary:
- Update version in
pyproject.toml - Commit and push changes
- Create and push a version tag:
git tag v0.2.0 && git push origin v0.2.0 - GitHub Actions will automatically publish to PyPI and Docker registries
Contributing
Contributions are welcome! Please read CONTRIBUTING.md for guidelines.
- Fork the repository
- Create a feature branch
- Make your changes
- Write tests
- Submit a pull request
License
MIT License - see LICENSE for details.
Support
- Documentation: docs/
- Issues: GitHub Issues
- Discussions: GitHub Discussions
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pulsar_relay-0.1.0.tar.gz.
File metadata
- Download URL: pulsar_relay-0.1.0.tar.gz
- Upload date:
- Size: 24.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b056637d837e21ede00b05365a218af45f127fc88395aa5841abf097a0321e29
|
|
| MD5 |
2a725a3b24de700b95cde11098915854
|
|
| BLAKE2b-256 |
90c652fd59d8381f533681889f82b41bb5dd6d987571463b9db8831ab57ae924
|
Provenance
The following attestation bundles were made for pulsar_relay-0.1.0.tar.gz:
Publisher:
release.yml on mvdbeek/pulsar-relay
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pulsar_relay-0.1.0.tar.gz -
Subject digest:
b056637d837e21ede00b05365a218af45f127fc88395aa5841abf097a0321e29 - Sigstore transparency entry: 649436464
- Sigstore integration time:
-
Permalink:
mvdbeek/pulsar-relay@35b7e5a6dfd9ce5c7d271cc1fa077065a832c604 -
Branch / Tag:
refs/tags/v0.1.0 - Owner: https://github.com/mvdbeek
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@35b7e5a6dfd9ce5c7d271cc1fa077065a832c604 -
Trigger Event:
push
-
Statement type:
File details
Details for the file pulsar_relay-0.1.0-py3-none-any.whl.
File metadata
- Download URL: pulsar_relay-0.1.0-py3-none-any.whl
- Upload date:
- Size: 14.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ad45ee4981d11a009b4c2006018e8c2bda75d190f8e3345021b08ca504d324ab
|
|
| MD5 |
660e168a943bb14f361850d70d6e1188
|
|
| BLAKE2b-256 |
f0bc2d04932261bc0b24061c37747290b6fb6a83c4ac3d1332714227b7779323
|
Provenance
The following attestation bundles were made for pulsar_relay-0.1.0-py3-none-any.whl:
Publisher:
release.yml on mvdbeek/pulsar-relay
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pulsar_relay-0.1.0-py3-none-any.whl -
Subject digest:
ad45ee4981d11a009b4c2006018e8c2bda75d190f8e3345021b08ca504d324ab - Sigstore transparency entry: 649436480
- Sigstore integration time:
-
Permalink:
mvdbeek/pulsar-relay@35b7e5a6dfd9ce5c7d271cc1fa077065a832c604 -
Branch / Tag:
refs/tags/v0.1.0 - Owner: https://github.com/mvdbeek
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@35b7e5a6dfd9ce5c7d271cc1fa077065a832c604 -
Trigger Event:
push
-
Statement type: