SSE plugin for Starlette

Server Sent Events for Starlette and FastAPI

Implements the Server-Sent Events specification.

Background: https://sysid.github.io/server-sent-events/
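
For orientation, the wire format defined by the specification is line-based: each event is a series of "field: value" lines terminated by a blank line. The sketch below is a hand-written illustration of that framing, not sse-starlette's own encoder; encode_event is a hypothetical helper name.

```python
from typing import Optional


def encode_event(
    data: str,
    event: Optional[str] = None,
    id: Optional[str] = None,
    retry: Optional[int] = None,
) -> str:
    """Frame one event as text/event-stream lines (illustrative only)."""
    lines = []
    if id is not None:
        lines.append(f"id: {id}")
    if event is not None:
        lines.append(f"event: {event}")
    if retry is not None:
        lines.append(f"retry: {retry}")
    # Multi-line payloads become one "data:" line per line of text.
    for chunk in data.splitlines() or [""]:
        lines.append(f"data: {chunk}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"
```

In the usage example that follows, each yielded dict(data=i) would reach the client as one such framed event.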

Installation:

pip install sse-starlette

Usage:

import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse

async def numbers(minimum, maximum):
    for i in range(minimum, maximum + 1):
        await asyncio.sleep(0.9)
        yield dict(data=i)

async def sse(request):
    generator = numbers(1, 5)
    return EventSourceResponse(generator)

routes = [
    Route("/", endpoint=sse)
]

app = Starlette(debug=True, routes=routes)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level='info')

Caveat: SSE streaming does not work in combination with GZipMiddleware.

Be aware that for proper server shutdown your application must stop all running tasks (generators). Otherwise you might experience the following warnings at shutdown: Waiting for background tasks to complete. (CTRL+C to force quit).

Client disconnects need to be handled in your Request handler (see example.py):

async def endless(req: Request):
    async def event_publisher():
        i = 0
        try:
            while True:
                i += 1
                yield dict(data=i)
                await asyncio.sleep(0.2)
        except asyncio.CancelledError:
            _log.info(f"Disconnected from client (via refresh/close) {req.client}")
            # Do any other cleanup, if any
            raise  # re-raise so the cancellation propagates
    return EventSourceResponse(event_publisher())
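
The cleanup path above depends on the generator being cancelled while it is suspended at an await. The following standalone sketch uses only asyncio (no Starlette or sse-starlette involved) to show that mechanism; ticker, consume, and the log list are hypothetical names chosen for illustration.

```python
import asyncio


async def ticker(log: list):
    """Endless generator that records cleanup when it is cancelled."""
    i = 0
    try:
        while True:
            i += 1
            yield i
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append("cleanup")  # release resources here
        raise  # always re-raise so cancellation propagates


async def consume(log: list) -> list:
    """Read from the generator in a task, then cancel it as a disconnect would."""
    received = []

    async def reader():
        async for value in ticker(log):
            received.append(value)

    task = asyncio.create_task(reader())
    await asyncio.sleep(0.05)
    task.cancel()  # stands in for the client going away
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received
```

Running asyncio.run(consume(log)) with an empty list should leave a single "cleanup" entry in log, because the CancelledError is raised inside the generator at its pending sleep.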

Thread Safety with SQLAlchemy Sessions and Similar Objects

The streaming portion of this package is accomplished via anyio TaskGroups. Care needs to be taken to avoid passing objects that are not thread-safe to generators you use to yield streaming data.

For example, if you are using SQLAlchemy, you should not use/pass an AsyncSession object to your generator:

# ❌ This can result in "The garbage collector is trying to clean up non-checked-in connection..." errors
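async def bad_route():
    async with AsyncSession() as session:
        async def generator():
            # AsyncSession.stream() must be awaited; it returns an async-iterable result
            async for row in await session.stream(select(User)):
                yield dict(data=row)

    # The session is already closed when the response starts iterating the generator
    return EventSourceResponse(generator())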

Instead, ensure that you create sessions within the generator itself:

# ✅ This is safe
async def good_route():
    async def generator():
        # The session is opened and closed inside the generator, so it lives as long as the stream
        async with AsyncSession() as session:
            async for row in await session.stream(select(User)):
                yield dict(data=row)

    return EventSourceResponse(generator())
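
The difference between the two routes is one of lifetime: the response body is iterated only after the route function has returned, so a context manager opened in the route has already exited by then. A minimal sketch with a stand-in resource follows. The FakeSession class is hypothetical and merely mimics an async context manager; it is not SQLAlchemy, and drain plays the role of the response.

```python
import asyncio


class FakeSession:
    """Stand-in for a session: usable only inside its `async with` block."""

    def __init__(self):
        self.open = False

    async def __aenter__(self):
        self.open = True
        return self

    async def __aexit__(self, *exc):
        self.open = False

    async def rows(self):
        if not self.open:
            raise RuntimeError("session already closed")
        for row in ("a", "b"):
            yield row


async def bad_route():
    async with FakeSession() as session:
        async def generator():
            async for row in session.rows():
                yield row
    # The session is closed here, before anything iterates the generator.
    return generator()


async def good_route():
    async def generator():
        # The session lives exactly as long as the stream.
        async with FakeSession() as session:
            async for row in session.rows():
                yield row
    return generator()


async def drain(route):
    """Iterate the generator after the route has returned, as a response would."""
    return [row async for row in await route()]
```

Draining good_route yields both rows, while draining bad_route raises the RuntimeError from the stand-in session on the first iteration.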

Special use cases

Customize Ping

By default, the server sends a ping every 15 seconds. You can customize this in two ways:

  1. Set the ping parameter to change the interval (in seconds).
  2. Set ping_message_factory to change the ping event into a comment event, so that it is not visible to the client.

@router.get("")
async def handle():
    generator = numbers(1, 100)
    return EventSourceResponse(
        generator,
        headers={"Server": "nini"},
        ping=5,
        ping_message_factory=lambda: ServerSentEvent(**{"comment": "You can't see\r\nthis ping"}),
    )
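
Per the Server-Sent Events specification, a line beginning with a colon is a comment and is ignored by the client, which is why a comment ping keeps the connection alive without surfacing as an event. A simplified sketch of that client-side rule is shown here; the parse_events helper is hypothetical and handles only data and comment lines.

```python
def parse_events(stream: str) -> list:
    """Return the data payload of each event in a text/event-stream body."""
    events = []
    data_lines = []
    for line in stream.splitlines():
        if line == "":
            # Blank line: dispatch the event if any data was collected.
            if data_lines:
                events.append("\n".join(data_lines))
                data_lines = []
        elif line.startswith(":"):
            # Comment line (e.g. a ping): ignored.
            continue
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is stripped.
            data_lines.append(value[1:] if value.startswith(" ") else value)
    return events
```

A body such as "data: 1\n\n: ping\n\ndata: 2\n\n" therefore produces two events, and the comment between them is dropped.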

SSE Send Timeout

To avoid 'hanging' connections, where a client keeps the HTTP connection open but stops reading from it, you can specify a send timeout (see #89).

EventSourceResponse(..., send_timeout=5)  # terminate hanging send call after 5s
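
The idea can be illustrated with plain asyncio: wrap each send in a timeout so that a peer that stops reading cannot block the sender forever. This is only a sketch of the concept, not how sse-starlette implements send_timeout; stalled_send and send_with_timeout are hypothetical names.

```python
import asyncio


async def stalled_send(message: str) -> None:
    """Simulates a send to a client that has stopped reading."""
    await asyncio.sleep(3600)


async def send_with_timeout(send, message: str, timeout: float) -> bool:
    """Return True if the send finished in time, False if it was abandoned."""
    try:
        await asyncio.wait_for(send(message), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False
```

When the timeout expires, asyncio.wait_for cancels the pending send, so the caller can close the connection instead of waiting indefinitely.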

Fan out Proxies

Fan-out proxies usually rely on the response being cacheable. To support that, you can set the value of the Cache-Control header. For example:

return EventSourceResponse(
    generator(), headers={"Cache-Control": "public, max-age=29"}
)

Error Handling

See example: examples/error_handling.py

Sending Responses without Async Generators

Async generators can exhibit tricky error and cleanup behavior, especially when they are interrupted.

Background: Cleanup in async generators.

The example examples/no_async_generators.py shows an alternative implementation that uses memory channels instead of async generators.
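
The general shape of that approach is a producer task writing into a channel and the response reading from it, so cleanup lives in ordinary task code rather than in a suspended generator. The sketch below substitutes asyncio.Queue for anyio memory channels to stay dependency-free; it is not the code from the example file, and producer, stream, and the DONE sentinel are hypothetical.

```python
import asyncio

DONE = object()  # sentinel marking the end of the stream


async def producer(queue: asyncio.Queue, count: int) -> None:
    """Push events into the queue; cleanup runs in a normal finally block."""
    try:
        for i in range(1, count + 1):
            await queue.put(dict(data=i))
    finally:
        await queue.put(DONE)


async def stream(count: int) -> list:
    """Consume events from the queue until the producer signals completion."""
    queue = asyncio.Queue(maxsize=2)
    task = asyncio.create_task(producer(queue, count))
    received = []
    while True:
        item = await queue.get()
        if item is DONE:
            break
        received.append(item)
    await task
    return received
```

The bounded queue gives backpressure: the producer waits whenever the consumer falls more than two items behind.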

Using pytest to test SSE Endpoints

When testing more than a single SSE endpoint via pytest, one may encounter the following error: RuntimeError: <asyncio.locks.Event object at 0xxxx [unset]> is bound to a different event loop.

A workaround to fix this error is to use the following fixture on all tests that use SSE:

@pytest.fixture
def reset_sse_starlette_appstatus_event():
    """
    Fixture that resets the appstatus event in the sse_starlette app.

    Should be used on any test that uses sse_starlette to stream events.
    """
    # See https://github.com/sysid/sse-starlette/issues/59
    from sse_starlette.sse import AppStatus

    AppStatus.should_exit_event = None

For details, see issue #59.

Development, Contributing

  1. Install pdm: pip install pdm
  2. Install dependencies using pdm: pdm install -d
  3. Run the tests using the Makefile targets described below.

Makefile

  • Make sure your virtualenv is active.
  • Check the Makefile for available commands and development support, e.g. to run the unit tests:

make test
make tox

For integration testing you can use the provided examples in tests and examples.

If you are using Postman, please see: https://github.com/sysid/sse-starlette/issues/47#issuecomment-1445953826
