SyftEvent

A distributed event-driven RPC framework for SyftBox that enables file-based communication, request handling, and real-time file system monitoring across datasites.

Features

  • 🔄 Event-Driven Architecture: React to file system changes in real-time
  • 🌐 Distributed RPC: File-based communication between datasites
  • 📁 File System Monitoring: Watch for changes across multiple directories with glob patterns
  • 🔒 Secure Communication: Built-in permission management for datasite access
  • ⚡ Async Support: Handle both synchronous and asynchronous request handlers
  • 📊 Schema Generation: Automatic API schema generation and publishing
  • 🔌 Router Support: Organize endpoints with modular routers
  • 🧹 Automatic Cleanup: Periodic cleanup of old request/response files with configurable retention
  • 📂 Organized File Structure: User-specific directory organization for better request management

Installation

pip install syft-event

Quick Start

Basic RPC Server

from syft_event import SyftEvents

# Create a SyftEvents instance
box = SyftEvents("my_app")

# Define an RPC endpoint
@box.on_request("/hello")
def hello_handler(name: str) -> str:
    return f"Hello, {name}!"

# Define another endpoint
@box.on_request("/calculate")
def calculate_handler(a: int, b: int, operation: str = "add") -> int:
    if operation == "add":
        return a + b
    elif operation == "multiply":
        return a * b
    else:
        raise ValueError("Unsupported operation")

# Start the server
box.run_forever()

Note: RPC endpoints automatically monitor both FileCreatedEvent and FileMovedEvent for request files. This is because request files can arrive via two different mechanisms: files delivered through websockets are initially stored as temporary files and then renamed to the target request file (triggering a move event), while files downloaded directly from the blob store are created directly (triggering a create event).
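The two arrival paths described in the note can be simulated with plain file operations (the filenames below are hypothetical; a file watcher such as watchdog would report the rename as a move event and the direct write as a create event):

```python
from pathlib import Path
import tempfile

rpc = Path(tempfile.mkdtemp())

# 1) Websocket delivery: the payload lands in a temporary file first and is
#    then renamed into place -- a watcher sees this as a "moved" event.
tmp = rpc / "incoming.tmp"
tmp.write_bytes(b'{"args": {"name": "alice"}}')
tmp.rename(rpc / "abc123.request")

# 2) Blob-store download: the request file is written in place directly --
#    a watcher sees this as a "created" event.
(rpc / "def456.request").write_bytes(b'{"args": {"name": "bob"}}')

print(sorted(p.name for p in rpc.glob("*.request")))
```

Either way, the same `.request` file ends up on disk, which is why the endpoint must listen for both event types.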

File System Monitoring

from syft_event import SyftEvents
from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileMovedEvent

box = SyftEvents("file_monitor")

# Watch for JSON files in your datasite (responds to create, modify, and move events by default)
@box.watch("{datasite}/**/*.json")
def on_json_change(event):
    if hasattr(event, 'dest_path') and event.dest_path:
        print(f"JSON file moved: {event.src_path} -> {event.dest_path}")
    else:
        print(f"JSON file changed: {event.src_path}")

# Watch for specific file patterns with custom event filtering
@box.watch(["**/*.txt", "**/*.md"], event_filter=[FileCreatedEvent])
def on_text_files_created(event):
    print(f"Text file created: {event.src_path}")

# Watch for file moves specifically
@box.watch("**/*.log", event_filter=[FileMovedEvent])
def on_log_files_moved(event):
    print(f"Log file moved: {event.src_path} -> {event.dest_path}")

box.run_forever()

Using Routers

from syft_event import SyftEvents, EventRouter

# Create a router for user-related endpoints
user_router = EventRouter()

@user_router.on_request("/profile")
def get_profile(user_id: str):
    return {"user_id": user_id, "name": "John Doe"}

@user_router.on_request("/settings")
def get_settings(user_id: str):
    return {"theme": "dark", "notifications": True}

# Main application
box = SyftEvents("user_service")

# Include the router with a prefix
box.include_router(user_router, prefix="/api/v1/users")

box.run_forever()

Async Request Handlers

import asyncio
from syft_event import SyftEvents

box = SyftEvents("async_app")

@box.on_request("/async-task")
async def async_handler(task_id: str) -> dict:
    # Simulate async work
    await asyncio.sleep(1)
    return {"task_id": task_id, "status": "completed"}

box.run_forever()

Automatic Cleanup Configuration

SyftEvent now includes automatic cleanup of old request and response files to prevent disk space issues:

from syft_event import SyftEvents

# Create with custom cleanup settings
box = SyftEvents(
    "my_app",
    cleanup_expiry="7d",    # Keep files for 7 days (default: 30d)
    cleanup_interval="1h"   # Run cleanup every hour (default: 1d)
)

# Check if cleanup is running
if box.is_cleanup_running():
    print("Cleanup service is active")

# Get cleanup statistics
stats = box.get_cleanup_stats()
print(f"Deleted {stats.requests_deleted} requests and {stats.responses_deleted} responses")

Standalone Cleanup Utility

You can also run the cleanup utility independently:

from syft_event.cleanup import PeriodicCleanup

# Create a standalone cleanup instance
cleanup = PeriodicCleanup(
    app_name="my_app",
    cleanup_interval="1d",      # How often to run cleanup
    cleanup_expiry="30d",       # How long to keep files
    on_cleanup_complete=lambda stats: print(f"Cleaned up {stats.requests_deleted} files")
)

# Start cleanup in background
cleanup.start()

# Or run cleanup immediately
stats = cleanup.cleanup_now()
print(f"Immediate cleanup: {stats.requests_deleted} files deleted")

# Stop cleanup
cleanup.stop()

API Reference

SyftEvents

The main class for creating event-driven applications.

Constructor

SyftEvents(
    app_name: str, 
    publish_schema: bool = True, 
    client: Optional[Client] = None,
    cleanup_expiry: str = "30d",
    cleanup_interval: str = "1d"
)
  • app_name: Name of your application
  • publish_schema: Whether to automatically generate and publish API schemas
  • client: Optional SyftBox client instance
  • cleanup_expiry: How long to keep request/response files (e.g., "30d", "7d", "2h")
  • cleanup_interval: How often to run cleanup (e.g., "1d", "1h", "30m")
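The `publish_schema` flag controls automatic schema generation from your handlers. As an illustration of how a schema entry can be derived from a handler's type hints (this is a sketch using the standard `inspect` module, not the actual `rpc.schema.json` format syft-event emits):

```python
import inspect

def endpoint_schema(func):
    """Derive a minimal schema entry from a handler's signature (illustrative)."""
    sig = inspect.signature(func)
    params = {
        name: {
            "type": getattr(p.annotation, "__name__", "any"),
            "required": p.default is inspect.Parameter.empty,
        }
        for name, p in sig.parameters.items()
    }
    returns = getattr(sig.return_annotation, "__name__", "any")
    return {"params": params, "returns": returns}

def hello(name: str, times: int = 1) -> str:
    return f"Hello, {name}!" * times

print(endpoint_schema(hello))
```

Parameters with defaults are reported as optional, which is why `param2: int = 10` in the examples above does not have to appear in every request.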

Methods

on_request(endpoint: str)

Decorator to register RPC request handlers.

@box.on_request("/my-endpoint")
def handler(param1: str, param2: int = 10) -> dict:
    return {"result": param1 * param2}

watch(glob_path, event_filter=None)

Decorator to register file system watchers. By default, watches for FileCreatedEvent, FileModifiedEvent, and FileMovedEvent.

@box.watch("**/*.json")
def on_json_change(event):
    if hasattr(event, 'dest_path') and event.dest_path:
        print(f"File moved: {event.src_path} -> {event.dest_path}")
    else:
        print(f"File changed: {event.src_path}")

include_router(router: EventRouter, prefix: str = "")

Include routes from an EventRouter instance.

run_forever()

Start the event loop and run until interrupted.

start() / stop()

Start or stop the service programmatically.

is_cleanup_running()

Check if the automatic cleanup service is currently running.

get_cleanup_stats()

Get statistics about the cleanup operations.

stats = box.get_cleanup_stats()
print(f"Requests deleted: {stats.requests_deleted}")
print(f"Responses deleted: {stats.responses_deleted}")
print(f"Errors: {stats.errors}")
print(f"Last cleanup: {stats.last_cleanup}")

EventRouter

Helper class for organizing related endpoints.

from syft_event import EventRouter

router = EventRouter()

@router.on_request("/endpoint")
def handler():
    return "response"

PeriodicCleanup

Utility class for managing automatic cleanup of old request and response files.

Constructor

PeriodicCleanup(
    app_name: str,
    cleanup_interval: str = "1d",
    cleanup_expiry: str = "30d",
    client: Optional[Client] = None,
    on_cleanup_complete: Optional[Callable[[CleanupStats], None]] = None
)

Methods

start() / stop()

Start or stop the periodic cleanup service.

cleanup_now()

Perform cleanup immediately without waiting for the next interval.
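Conceptually, a cleanup pass walks the rpc directory and deletes request/response files older than the expiry. The sketch below (hypothetical function name, mtime-based ageing) illustrates the retention idea; it is not syft-event's actual implementation:

```python
import time
from pathlib import Path
from typing import Optional

def cleanup_pass(rpc_dir: Path, expiry_seconds: int, now: Optional[float] = None) -> int:
    """Delete *.request / *.response files older than expiry_seconds.

    Illustrative sketch of a retention pass, not syft-event's internal code.
    """
    now = time.time() if now is None else now
    deleted = 0
    for pattern in ("**/*.request", "**/*.response"):
        for f in rpc_dir.glob(pattern):
            # Age is judged by the file's modification time.
            if now - f.stat().st_mtime > expiry_seconds:
                f.unlink()
                deleted += 1
    return deleted
```

With `cleanup_expiry="30d"` this would delete any request/response file whose mtime is more than 30 days in the past.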

get_stats()

Get current cleanup statistics.

is_running()

Check if the cleanup service is currently running.

File Structure

When you create a SyftEvents app, it sets up the following directory structure:

~/SyftBox/datasites/{your-email}/app_data/{app_name}/
├── rpc/
│   ├── syft.pub.yaml          # Permission configuration
│   ├── rpc.schema.json        # Generated API schema
│   └── {endpoint}/            # Endpoint directories
│       ├── .syftkeep         # Directory marker
│       └── {sender-email}/    # User-specific subdirectories
│           ├── *.request     # Incoming requests from this user
│           └── *.response    # Generated responses for this user
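
The layout above can be composed with `pathlib` (the helper names are hypothetical, and this assumes the endpoint's leading slash is dropped for its directory name):

```python
from pathlib import Path

def rpc_dir(datasites_root: str, owner_email: str, app_name: str) -> Path:
    # Mirrors the documented layout: {root}/{owner}/app_data/{app}/rpc
    return Path(datasites_root) / owner_email / "app_data" / app_name / "rpc"

def request_dir(datasites_root: str, owner_email: str, app_name: str,
                endpoint: str, sender_email: str) -> Path:
    # Per-endpoint, per-sender subdirectory holding that user's .request files
    return rpc_dir(datasites_root, owner_email, app_name) / endpoint / sender_email

p = request_dir("/tmp/SyftBox/datasites", "alice@example.com",
                "my_app", "hello", "bob@example.com")
print(p.as_posix())
```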

Directory Organization

  • User-Specific Structure: Requests are organized by sender email address, giving each sender an isolated subdirectory
  • Legacy Support: The system automatically migrates old request files to the new structure
  • Automatic Cleanup: Old request/response files are automatically cleaned up based on configurable retention policies

Configuration

Permissions

SyftEvent automatically creates a syft.pub.yaml file with appropriate permissions:

rules:
- pattern: rpc.schema.json
  access:
    read:
    - '*'
- pattern: '**/{{.UserEmail}}/*.request'
  access:
    read:
    - 'USER'
    write: 
    - 'USER'
- pattern: '**/{{.UserEmail}}/*.response'
  access:
    read: 
    - 'USER'
    write: 
    - 'USER'
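
The `{{.UserEmail}}` placeholder is resolved per requesting user, so each sender can only read and write files under their own subdirectory. A rough illustration of how such a rule resolves and matches (using `fnmatch` as a stand-in; SyftBox's actual permission engine differs):

```python
from fnmatch import fnmatch

def resolve_pattern(pattern: str, user_email: str) -> str:
    # Substitute the per-user placeholder (illustrative; the real
    # template resolution is part of SyftBox, not this sketch).
    return pattern.replace("{{.UserEmail}}", user_email)

rule = "**/{{.UserEmail}}/*.request"
resolved = resolve_pattern(rule, "bob@example.com")
print(resolved)  # **/bob@example.com/*.request

# bob can see his own request files, but not eve's
print(fnmatch("hello/bob@example.com/abc.request", resolved))
print(fnmatch("hello/eve@example.com/abc.request", resolved))
```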

Time Interval Format

The cleanup utility supports human-readable time intervals:

  • Single units: "1d", "2h", "30m", "45s"
  • Combined units: "1d2h30m", "2h15m30s", "1d12h30m45s"
  • Case insensitive: "1D" is equivalent to "1d"

Examples:

  • "1d" = 1 day (86400 seconds)
  • "2h" = 2 hours (7200 seconds)
  • "30m" = 30 minutes (1800 seconds)
  • "1d2h30m" = 1 day, 2 hours, 30 minutes (95400 seconds)

Advanced Usage

Custom Response Objects

from syft_event import SyftEvents, Response
from syft_rpc.protocol import SyftStatus

box = SyftEvents("advanced_app")

@box.on_request("/custom-response")
def custom_handler() -> Response:
    return Response(
        body={"message": "Custom response"},
        status_code=SyftStatus.SYFT_201_CREATED,
        headers={"X-Custom-Header": "value"}
    )

State Management

box = SyftEvents("stateful_app")

# Access shared state
box.state["counter"] = 0

@box.on_request("/increment")
def increment():
    box.state["counter"] += 1
    return {"counter": box.state["counter"]}

Requirements

  • Python 3.9+
  • syft-rpc >= 0.2.4
  • pathspec >= 0.12.1
  • pydantic >= 2.10.4
  • watchdog >= 6.0.0
  • loguru >= 0.7.3

Changelog

Version 0.2.7+

  • 🧹 Automatic Cleanup: Added periodic cleanup utility for old request/response files
  • 📂 User-Specific Organization: Requests are now organized by sender email address
  • 🔄 Legacy Migration: Automatic migration of old request files to new structure
  • ⚙️ Configurable Retention: Customizable cleanup intervals and file retention periods
  • 📊 Cleanup Statistics: Track cleanup operations with detailed statistics
