Skip to main content

pythonic Deepstream.

Project description

pythia

Pythonic Deepstream.


PyPI - Version PyPI - Wheels Docs


Conventional Commits Code style: black security: bandit


NVidia Deepstream is an excellent gstreamer framework which allows to build ai-powered, performant applications running on nvidia hardware. Its python API and bindings, however, have a bunch of painpoints which we've here collected and addressed with pythia:

  • Metadata Extraction: Deepstream metadata extraction requires using buffer probes: pythia provides an easy to use interface which splits metadata extraction and processing.
  • Metadata Iteration: pyds api iterators are not pythonic: pythia provides intuitive deepstream metadata iterators to use, like for frame in frames_per_batch(buffer) wrapping pybind c++ casting and iteration.
  • Python boilerplate: Python gstreamer apps get very large very fast. Pythia abstracts away common stuff and lets you focus on your application.
  • Quick prototyping: Sometimes you just want to check the performance of a new model (eg after exporting from Nvidia TAO), or verify the environment. Pythia comes with ready-to-run demo applications, and a gst-launch-like cli.

pythia offers:

  • Common metadata extraction and parsing utilities.
  • Workers and queues management in the background, to offload processing outside of the buffer probe.
  • Ready to use Docker images for both aarch64 (jetson) and x86_64 (nvidia gpu).

Examples

Running pythia from the cli

gst-pylaunch

You can run familiar pipelines and attach buffer probes from simple python modules.

Create Files

  • Create a file probe.py with:
from pythia import objects_per_batch

def gen_detections(batch_meta):
    for frame, detection in objects_per_batch(batch_meta):
        box = detection.rect_params
        yield {
            "frame_num": frame.frame_num,
            "label": detection.obj_label,
            "left": box.left,
            "top": box.top,
            "width": box.width,
            "height": box.height,
            "confidence": detection.confidence,
        }
  • Create a file pipeline.txt with:
uridecodebin
  uri=file://{input}
! identity
  eos-after=30
! nvvideoconvert
! muxer.sink_0
nvstreammux
  name=muxer width=1280 height=720 batch-size=1
! nvinfer
  name=pgie
  config-file-path={pgie-conf}
! nvvideoconvert
! nvdsosd
! nvvideoconvert
! queue
! x264enc
! mp4mux
! filesink location={output}

Running pythia

  • run the application with:
$ gst-pylaunch \
  -p ./pipeline.txt \
  --pgie-conf=/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.txt \
  --input=/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4 \
  --output=/tmp/overlayed.mp4 \
  --probe=probe.py:gen_detections@pgie.src

Note the --pgie-conf, --input, and --output cli args were dynamically parsed and added from the pipeline file.

This command instructed pythia to do the following:

  1. Load a pipeline from a file located at ./pipeline.txt, which contains gst-launch-like syntax with some parameters to be inserted (input, pgie-conf, output).
  2. Format the pipelie with input, pgie-conf and output from received parameters. (For a more complex syntax, you can install pythia[jinja] to use jinja as a template backend. See the documentation for more details.)
  3. Setup a buffer probe which internally calls the gen_detections method defined in the probe.py file.
  4. Attach said buffer probe in the source pad of the pgie-named element of the pipeline.
  5. Send incoming metadata to a logger which prints jsonified metadata to console.

Check your output

  • Check your console to see the incoming detections.
  • Want to do something else with the detections? You can choose between several backends: logging (stdout , stderr, file available), in-memory (deque), kafka, redis, or implement your own streaming connector with the PYTHIA_STREAM_URI env var. Check the documentation for more details.

Develop applications based on pythia

python API

If you want more granular control over the behavior of the application, its signals, events, and messages, you can instead program an aplication using pythia's API.

Create Files

Continuing with the same pipeline as in the previous example,

  • Create a file myscript.py with:
import json
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from pythia import Application, Gst, objects_per_batch

class App(Application):

    def __init__(self, *a, **kw):
        super().__init__(*a, **kw)
        self.manual_kafka = KafkaProducer(
            bootstrap_servers="kafka:9092"
        )

    def on_message_error(self, *a, **kw):
        err, debug = super().on_message_error(*a, **kw)
        self.manual_kafka.send(
            "app_events",
            json.dumps({ "CONDITION":"ERROR", "ERR": err, "DEBUG": debug}).encode()
        )
        raise RuntimeError("Unhandled pipeline error")

    def on_message_eos(self, bus, message):
        self.manual_kafka.send(
            "app_events",
            json.dumps({ "CONDITION":"EOS", "SENT_BY": str(message.src)}).encode()
        )
        super().on_message_eos(bus, message)

app = App.from_pipeline_file(
    "pipeline.txt",
    params={
      "pgie-conf": "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.txt",
      "input": "/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4",
      "output": "/tmp/overlayed.mp4",
    }
)

@app.probe(
    "pgie",
    pad_direction="src",
    backend_uri="kafka://kafka:9092?stream=raw_detections"
)
def pgie_srcprobe(batch_meta):
    for frame, detection in objects_per_batch(batch_meta):
        frame_num = frame.frame_num
        box = detection.rect_params
        yield {
            "frame_num": frame_num,
            "label": detection.obj_label,
            "left": box.left,
            "top": box.top,
            "width": box.width,
            "height": box.height,
            "confidence": detection.confidence,
        }

@app.probe(
    "muxer",
    pad_direction="src",
)
def source_probe(pad, info):
    app.manual_kafka.send(
        "app_events",
        json.dumps({
            "CONDITION":"STARTED",
            "PAD_CAPS": pad.props.caps.to_string(),
            "PAD_DIRECTION": pad.props.direction,
            "PAD_OFFSET": pad.props.offset,
        }).encode()
    )
    return Gst.PadProbeReturn.REMOVE

if __name__ == "__main__":
    admin = KafkaAdminClient(bootstrap_servers="kafka:9092")
    if "app_events" not in admin.list_topics():
        admin.create_topics(
            new_topics=[
                NewTopic(name="app_events", num_partitions=1,replication_factor=1)
            ],
            validate_only=False,
        )
    app()

Running pythia

  • run the application with:
$ python myscript.py

In this mode, you have more control over the application behavior:

  1. Subclass application
  2. instantiate a custom message handler (a kafka producer in this example)
  3. forward error and EOS messages to a custom kafka topic
  4. interpolate the pipeline template file with python variables to construct the app
  5. use the @app.probe decorator as a generator, letting pythia handle the messages internally
  6. use the @app.probe decorator as a probe, handling manually the buffer flow and messaging.

Want to do something else while the application is running? you can run the application with app(background=True) instead. See the documentation for details and more examples.

Check your output

  • Check the kafka topics to see the incoming detections.

Setup

Prerequisites

  • nvidia hardware (either jetson or gpu)
  • One of
    • recent docker (with support --gpus=all)
    • nvidia-docker installed,
    • environment with deepstream 6.1 and these bindings

Install

non-docker

  • pip install pythiags

docker

  • docker pull ghcr.io/rmclabs-io/pythia or ghcr.io/rmclabs-io/pythia-l4t
  • Build your image using FROM ghcr.io/rmclabs-io/pythia or FROM ghcr.io/rmclabs-io/pythia-l4t

Alternatively, you could use ghcr.io/rmclabs-io/pythia-dev or FROM ghcr.io/rmclabs-io/pythia-l4t-dev.

Usage

Note: If running from docker, make sure you've properly configured the container and its environment, see the reference upstream

For more examples and tutorials, visit the examples section of the documentation.

FAQ

Check out ongoing and future development here

FAQ / Common Issues

  • Q: Package installation fails:

    • A1: upgrade your pip: pip install --upgrade pip (Required "pip>=10").
    • A2: Make sure you've installed the build prerequisites, as listed in reqs/apt.build.list.
  • Q: My application is running slow on Jetson

    • A: Ensure to enable jetson-clocks and maxn (See reference) :

      sudo nvpmodel -m 0
      sudo jetson_clocks
      
  • Q: Program exits with error Unable to get a Window, abort.

    • A: Make sure x11 is properly configured. This is common when running through ssh sessions. In most of the cases, this just means you need to have the DISPLAY environment variable correctly set. To list available displays, run the w command:

      $ w
      09:53:38 up 2 days, 17:26,  1 user,  load average: 0,36, 0,33, 0,23
      USER     TTY      FROM             LOGIN@   IDLE   JCPU   PCPU WHAT
      rmclabs  :0       :0               lun16   ?xdm?   3:06m  0.02s /usr/lib/gdm3/gdm-x-session --run-script /usr/lib/gnome-session/run-systemd-session unity-session.     target
      rmclabs  pts/11   10.100.10.79     09:57    1.00s  0.10s  0.00s w
      

      From here, choose a display corresponding to a local connection (:0 in this case, including the colon). Then, export the environment variable and run again your program:

      export DISPLAY=:0
      # run your program here
      
  • Q: Program exits with error (from docker):

    X Error of failed request:  BadShmSeg (invalidshared segment parameter)
      Major opcode of failed request:  150 (XVideo)
      Minor opcode of failed request:  19 ()
      Segment id in failed request:  0x121
      Serial number of failed request:  57
      Current serial number in output stream:  58
    python: ../../src/hb-object-private.hh:154: Type* hb_object_reference(Type*) [with Type = hb_unicode_funcs_t]: Assertion `hb_object_is_valid (obj)' failed.
    
    • A: Add --ipc=host flag to docker run.
  • Q: Python segfaults when several applications are run subsequently:

    • A: It seems to be a race condition produced by the uridecodebin element using nvjpegenc (maybe others?). You can either add a timeout between runs (1 sec seems to do it), or change nvjpegenc - see pythia.utils.gst:demote_plugin.
  • Q: Python segfaults when several applications are run subsequently:

    • A: It seems to be a race condition produced by the uridecodebin element using nvjpegenc (maybe others?). You can either add a timeout between runs (1 sec seems to do it), or change nvjpegenc - see pythia.utils.gst:demote_plugin.
  • Q: I am unable to build the devcontainer:

    • A: Make sure to update the devcontainer.json with a proper BASE_IMAGE and DOCKER_GROUP_ID.

Contribute

  1. fork
  2. clone
  3. Pull Request from new branch
  4. [Optional, recommended]: use provided devcontainer.
  5. [Optional, recommended]: run pre-commit install to validate commits.
  6. add tests, docs, code, scripts, etc
  7. [Optional] check code manually, with ./scripts/format, ./scripts/lint, ./scripts/docs, ./scripts/test, etc.
  8. Commit using Conventional commits.
  9. push, wait for ci and/or maintainers feedback
  10. repeat 6-8 until success!

For more instructions, visit the Developers section of the documentation.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

pythiags-1.2.0-py3-none-any.whl (48.6 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page