Skip to main content

A Django-based system for managing long-running tasks with Server-Sent Events (SSE) streaming

Project description

Django Streaming Coordinator

A Django-based system for managing long-running tasks with Server-Sent Events (SSE) streaming. Tasks continue running even if clients disconnect, and multiple clients can connect to the same task simultaneously.

Features

  • Persistent Tasks: Tasks continue running in the background even when clients disconnect
  • Multiple Clients: Multiple clients can connect to the same task and receive real-time updates
  • Server-Sent Events (SSE): Uses SSE for efficient real-time streaming
  • Easy Task Creation: Simple API for creating custom streaming tasks via HTTP or programmatically
  • Client Library: Built-in httpx-based client for easy task creation and management
  • Generator Support: Process sync and async generators with automatic event streaming
  • HTTP Integration: Use httpx to fetch data from APIs within tasks
  • Shared Client: Single httpx client per process for efficient connection pooling
  • Unix Socket Support: Can bind to Unix sockets or TCP ports
  • Async/Await: Built on modern Python asyncio for efficient concurrent operations

Installation

# Install dependencies
poetry install

# Run migrations
poetry run python manage.py migrate

Quick Start

1. Create a Custom Task

Subclass StreamTask and implement the async process() method:

from streaming.models import StreamTask
import asyncio

class MyCustomTask(StreamTask):
    # Add your custom fields
    title = models.CharField(max_length=255)

    async def process(self):
        # Send start event
        await self.send_event('start', {
            'message': f'Starting task: {self.title}'
        })

        # Do your work and send events
        for i in range(10):
            await asyncio.sleep(1)
            await self.send_event('progress', {
                'step': i + 1,
                'total': 10,
                'message': f'Processing step {i + 1}'
            })

        # Send completion event
        await self.send_event('complete', {
            'message': 'Task completed successfully'
        })

2. Run the Streaming Server

# Using Unix socket (default)
poetry run python manage.py runserver_stream

# Using TCP port
poetry run python manage.py runserver_stream --host 127.0.0.1 --port 8888

# Custom Unix socket path
poetry run python manage.py runserver_stream --socket /tmp/my-stream.sock

3. Create and Start a Task

Tasks are created through Django ORM and started with the coordinator:

from tests.models import ExampleTask
from streaming.coordinator import coordinator

# Create the task
task = await ExampleTask.objects.acreate(message="Hello, World!")

# Start the task (will run in background)
await coordinator.start_task(task, 'tests', 'ExampleTask')

# Clients can now connect via HTTP SSE
# GET /stream/tests/ExampleTask/{task.pk}

4. Connect from Client

Using JavaScript:

const eventSource = new EventSource('/stream/ExampleTask/1');

eventSource.addEventListener('start', (event) => {
    const data = JSON.parse(event.data);
    console.log('Task started:', data);
});

eventSource.addEventListener('progress', (event) => {
    const data = JSON.parse(event.data);
    console.log('Progress:', data);
});

eventSource.addEventListener('complete', (event) => {
    const data = JSON.parse(event.data);
    console.log('Task completed:', data);
    eventSource.close();
});

eventSource.addEventListener('error', (event) => {
    const data = JSON.parse(event.data);
    console.error('Error:', data);
    eventSource.close();
});

Using Python (httpx + httpx-sse):

import httpx
from httpx_sse import aconnect_sse
import json

async with httpx.AsyncClient() as client:
    async with aconnect_sse(
        client, 'GET', 'http://127.0.0.1:8888/stream/ExampleTask/1'
    ) as event_source:
        async for event in event_source.aiter_sse():
            data = json.loads(event.data)
            print(f"{event.event}: {data}")

            if event.event == 'complete':
                break

API

Client API

The library provides a shared httpx client for efficient HTTP requests.

get_client(base_url="http://127.0.0.1:8888")

Get the shared httpx client instance (singleton) for efficient connection pooling across your entire process.

Example:

from streaming import get_client

client = get_client()

# Use the shared httpx client for any HTTP requests
response = await client.async_client.get("https://api.example.com/data")
data = response.json()

# Or use synchronously
response = client.sync_client.get("https://api.example.com/data")

StreamTask Model

Base abstract model for all streaming tasks.

Fields:

  • created_at: Timestamp when task was created
  • updated_at: Timestamp when task was last updated
  • completed_at: Timestamp when task completed (null if not completed)
  • final_value: JSONField storing the final return value from process()

Methods:

async send_event(event_type: str, data: dict)

Send an event to all connected clients.

Parameters:

  • event_type: Type of event (e.g., 'start', 'progress', 'complete', 'error')
  • data: Dictionary of data to send to clients

Example:

await self.send_event('progress', {
    'step': 5,
    'total': 10,
    'message': 'Halfway there!'
})

async process()

Override this method to implement your task logic. This is where your task's work happens.

async mark_completed(final_value=None)

Mark the task as completed. Called automatically by the coordinator when process() finishes.

async process_generator(generator: AsyncGenerator[dict, None])

Process an async generator, automatically sending each yielded value as a progress event.

Example:

async def process(self):
    async def my_generator():
        for i in range(10):
            await asyncio.sleep(0.1)
            yield {'step': i, 'message': f'Step {i}'}

    await self.send_event('start', {})
    final = await self.process_generator(my_generator())
    await self.send_event('complete', {})
    return final

async process_sync_generator(generator: Generator[dict, None, None])

Process a sync generator, automatically sending each yielded value as a progress event.

Example:

async def process(self):
    def my_generator():
        for i in range(10):
            yield {'step': i, 'message': f'Step {i}'}

    await self.send_event('start', {})
    final = await self.process_sync_generator(my_generator())
    await self.send_event('complete', {})
    return final

TaskCoordinator

Singleton that manages running tasks.

Methods:

async start_task(task_instance: StreamTask, model_name: str)

Start a task in the background.

async get_task_instance(model_name: str, task_id: int) -> StreamTask

Get a running task instance or load from database.

is_task_running(model_name: str, task_id: int) -> bool

Check if a task is currently running.

HTTP Endpoints

GET /stream/{app_name}/{model_name}/{task_id}

Connect to a task's SSE stream.

Response Format (if task is running):

event: start
data: {"message": "...", "_task_id": 1, "_app": "tests", "_model": "ExampleTask", "_timestamp": "..."}

event: progress
data: {"step": 1, "total": 3, "_task_id": 1, "_app": "tests", "_model": "ExampleTask", "_timestamp": "..."}

event: complete
data: {"message": "...", "_task_id": 1, "_app": "tests", "_model": "ExampleTask", "_timestamp": "..."}

Response Format (if task is completed):

{
  "status": "completed",
  "final_value": "...",
  "completed_at": "2025-01-15T12:34:56.789Z"
}

GET /health

Health check endpoint. Returns 200 OK.

Testing

Run tests with Django's test runner:

# Run all tests
poetry run python manage.py test

# Run specific test class
poetry run python manage.py test streaming.tests.StreamingSystemTests

# Run with verbose output
poetry run python manage.py test --verbosity=2

Architecture

  1. StreamTask: Abstract Django model that defines the interface for streaming tasks
  2. TaskCoordinator: Singleton that manages task lifecycle and keeps tasks running
  3. SSE Server: asgineer-based ASGI server that handles HTTP SSE connections
  4. Management Command: runserver_stream to start the server on Unix socket or TCP port

Advanced Examples

Using Async Generators

Process data streams with automatic event emission:

from streaming import StreamTask

class AsyncGeneratorTask(StreamTask):
    count = models.IntegerField(default=5)

    async def process(self):
        async def progress_generator():
            for i in range(self.count):
                await asyncio.sleep(0.1)
                yield {
                    'step': i + 1,
                    'total': self.count,
                    'percentage': ((i + 1) / self.count) * 100
                }

        await self.send_event('start', {'total_steps': self.count})
        final = await self.process_generator(progress_generator())
        await self.send_event('complete', {'message': 'Done'})
        return final

Using Sync Generators

Process data with regular (non-async) generators:

from streaming import StreamTask

class SyncGeneratorTask(StreamTask):
    items = models.JSONField(default=list)

    async def process(self):
        def item_processor():
            for idx, item in enumerate(self.items):
                yield {
                    'index': idx,
                    'item': item,
                    'processed': f'Processed: {item}'
                }

        await self.send_event('start', {'total_items': len(self.items)})
        final = await self.process_sync_generator(item_processor())
        await self.send_event('complete', {})
        return final

Using httpx to Fetch Data

Make HTTP requests within tasks:

import httpx
from streaming import StreamTask

class HttpxFetchTask(StreamTask):
    url = models.URLField()

    async def process(self):
        await self.send_event('start', {'url': self.url})

        async with httpx.AsyncClient(timeout=30.0) as client:
            await self.send_event('progress', {
                'status': 'fetching',
                'message': f'Fetching from {self.url}'
            })

            response = await client.get(self.url)
            response.raise_for_status()
            data = response.json()

            await self.send_event('progress', {
                'status': 'fetched',
                'status_code': response.status_code
            })

            await self.send_event('complete', {
                'message': 'Data fetched successfully'
            })

            return data

Example: ExampleTask

The project includes a simple example task:

class ExampleTask(StreamTask):
    message = models.CharField(max_length=255, default="Hello from ExampleTask")

    async def process(self):
        await self.send_event('start', {
            'message': self.message,
            'total_steps': 3
        })

        for i in range(1, 4):
            await asyncio.sleep(2)
            await self.send_event('progress', {
                'step': i,
                'total_steps': 3,
                'message': f"Step {i} of 3"
            })

        await self.send_event('complete', {
            'message': 'Task completed successfully'
        })

Test it:

# Start server
poetry run python manage.py runserver_stream --port 8888

# In another terminal, create a task
poetry run python manage.py shell
>>> from streaming.models import ExampleTask
>>> from streaming.coordinator import coordinator
>>> import asyncio
>>> task = ExampleTask.objects.create(message="Test")
>>> asyncio.run(coordinator.start_task(task, 'ExampleTask'))

# Connect with curl or browser
curl http://127.0.0.1:8888/stream/ExampleTask/1

Requirements

  • Python 3.11+
  • Django 5.2+
  • asgineer
  • uvicorn
  • httpx (for testing)
  • httpx-sse (for testing)

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django_streaming_coordinator-0.2.0.tar.gz (15.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

django_streaming_coordinator-0.2.0-py3-none-any.whl (15.4 kB view details)

Uploaded Python 3

File details

Details for the file django_streaming_coordinator-0.2.0.tar.gz.

File metadata

File hashes

Hashes for django_streaming_coordinator-0.2.0.tar.gz
Algorithm Hash digest
SHA256 2f480357f87a5117fe6c526293f4acae19d2cf58d4da21f2d0696dbc3b12a8b6
MD5 2852d9fee44449393c2f487f77648bad
BLAKE2b-256 de979284553d8058ae347f7957cf1fccd287e7f93c2f2d26abbb300e482794c1

See more details on using hashes here.

File details

Details for the file django_streaming_coordinator-0.2.0-py3-none-any.whl.

File metadata

File hashes

Hashes for django_streaming_coordinator-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2846b0b6dd4cc7f2a34916bf76c2b40feb4d931f72b1e21e204f9008389188cd
MD5 ff2496a79d510ac2638b0df1779bd531
BLAKE2b-256 8a440de542b9dc116ade23d6200fd960fe4edef7d4b0f801e6cfaa777c4632c2

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page