Skip to main content

Pure python RTMP server

Project description

pyrtmp: Pure Python RTMP server

  • Pure python
  • AsyncIO with uvloop support
  • Easy to customize
  • Support RTMP(s)
  • Support RTMPT(s)

Quickstart

You have to create your own RTMP controller to decide what to do when a user connects or a stream is received. The example below shows how to receive a stream over RTMP and write it to an FLV file.

If you are looking for RTMPT, please look inside pyrtmp/misc/rtmpt.py. P.S. Pull requests are welcome!

Simple RTMP controller

import asyncio  
import logging  
import os  
import tempfile  
  
from pyrtmp import StreamClosedException, RTMPProtocol  
from pyrtmp.messages import SessionManager  
from pyrtmp.messages.audio import AudioMessage  
from pyrtmp.messages.command import NCConnect, NCCreateStream, NSPublish, NSCloseStream, NSDeleteStream  
from pyrtmp.messages.data import MetaDataMessage  
from pyrtmp.messages.protocolcontrol import WindowAcknowledgementSize, SetChunkSize, SetPeerBandwidth  
from pyrtmp.messages.usercontrol import StreamBegin  
from pyrtmp.messages.video import VideoMessage  
from pyrtmp.misc.flvdump import FLVStream, FLVMediaType  
  
# Configure the root logger so DEBUG-level records are emitted.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger used by the controller and server below; also set to DEBUG.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
  
  
async def simple_controller(reader, writer):
    """Handle one RTMP client connection and dump its published stream to an FLV file.

    Performs the handshake, then loops reading chunks from the session and
    reacting to each decoded message: it answers the connect / createStream /
    publish commands, tracks chunk-size changes, and writes metadata, video
    and audio payloads to an FLV file created in the system temp directory
    when the client starts publishing.

    Args:
        reader: Stream reader for the client connection, passed to SessionManager.
        writer: Stream writer for the client connection, passed to SessionManager.

    The loop ends when the session raises StreamClosedException; any open FLV
    file is closed on the way out, whatever the reason for leaving.
    """
    session = SessionManager(reader=reader, writer=writer)
    flv = None
    try:
        logger.debug(f'Client connected {session.peername}')

        # do handshake
        await session.handshake()

        # read chunks
        while True:
            chunk = await session.read_chunk_from_stream()
            message = chunk.as_message()
            logger.debug(f"Receiving {str(message)} {message.chunk_id}")
            if isinstance(message, NCConnect):
                session.write_chunk_to_stream(WindowAcknowledgementSize(ack_window_size=5000000))
                session.write_chunk_to_stream(SetPeerBandwidth(ack_window_size=5000000, limit_type=2))
                session.write_chunk_to_stream(StreamBegin(stream_id=0))
                session.write_chunk_to_stream(SetChunkSize(chunk_size=8192))
                # Keep the session's outgoing chunk size in sync with the
                # SetChunkSize message just queued for the client.
                session.writer_chunk_size = 8192
                session.write_chunk_to_stream(message.create_response())
                await session.drain()
                logger.debug("Response to NCConnect")
            elif isinstance(message, WindowAcknowledgementSize):
                pass
            elif isinstance(message, NCCreateStream):
                session.write_chunk_to_stream(message.create_response())
                await session.drain()
                logger.debug("Response to NCCreateStream")
            elif isinstance(message, NSPublish):
                # A client may publish more than once on the same connection;
                # close the previous file so its handle is not leaked.
                if flv is not None:
                    flv.close()
                    flv = None
                # create flv file at temp
                # The publishing name comes from the remote client, so keep only
                # its final path component to stop "../" or absolute paths from
                # escaping the temp directory.
                file_name = os.path.basename(message.publishing_name)
                flv = FLVStream(os.path.join(tempfile.gettempdir(), file_name))
                session.write_chunk_to_stream(StreamBegin(stream_id=1))
                session.write_chunk_to_stream(message.create_response())
                await session.drain()
                logger.debug("Response to NSPublish")
            elif isinstance(message, MetaDataMessage):
                # Write meta data to file (skipped if nothing is being published yet)
                if flv is not None:
                    flv.write(0, message.to_raw_meta(), FLVMediaType.OBJECT)
                else:
                    logger.debug("Ignoring metadata received before NSPublish")
            elif isinstance(message, SetChunkSize):
                # The client announced a new chunk size for the data it sends.
                session.reader_chunk_size = message.chunk_size
            elif isinstance(message, VideoMessage):
                # Write video data to file (skipped if nothing is being published yet)
                if flv is not None:
                    flv.write(message.timestamp, message.payload, FLVMediaType.VIDEO)
                else:
                    logger.debug("Ignoring video received before NSPublish")
            elif isinstance(message, AudioMessage):
                # Write audio data to file (skipped if nothing is being published yet)
                if flv is not None:
                    flv.write(message.timestamp, message.payload, FLVMediaType.AUDIO)
                else:
                    logger.debug("Ignoring audio received before NSPublish")
            elif isinstance(message, NSCloseStream):
                pass
            elif isinstance(message, NSDeleteStream):
                pass
            else:
                logger.debug(f"Unknown message {str(message)}")

    except StreamClosedException:
        logger.debug(f"Client {session.peername} disconnected!")
    finally:
        # Always release the output file, including on unexpected errors.
        if flv is not None:
            flv.close()
  
  
async def serve_rtmp(use_protocol=True, host='0.0.0.0', port=1935):
    """Start the RTMP server and serve until cancelled.

    Args:
        use_protocol: When exactly ``True``, the server is created with
            ``loop.create_server`` and an ``RTMPProtocol`` wrapping
            ``simple_controller``; for any other value,
            ``asyncio.start_server`` is used with ``simple_controller`` as
            the client-connected callback.
        host: Interface to bind. Defaults to ``'0.0.0.0'``.
        port: TCP port to listen on. Defaults to ``1935``.
    """
    loop = asyncio.get_running_loop()
    # The identity check is kept on purpose: only the literal True selects the
    # protocol-based server, as callers of the original signature expect.
    if use_protocol is True:
        server = await loop.create_server(
            lambda: RTMPProtocol(controller=simple_controller, loop=loop), host, port)
    else:
        server = await asyncio.start_server(simple_controller, host, port)
    addr = server.sockets[0].getsockname()
    logger.info(f'Serving on {addr}')
    # The context manager closes the server when serve_forever() is cancelled.
    async with server:
        await server.serve_forever()
  
  
# Start the RTMP server only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(serve_rtmp())

Deployment

Roadmap

  • Support HTTP/2 and HTTP/3
  • RTMP over websocket
  • Support AMF3
  • ReStream
  • HLS Playback
  • Documentation

Changelog

  • 0.1.3 Fix ReadMe
  • 0.1.2 Fix ReadMe
  • 0.1.1 Fix ReadMe
  • 0.1.0 Initial version

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyrtmp-0.1.3.tar.gz (15.2 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page