A private SDK for nFlow client pipelines.

nFlow Client SDK

The nFlow Client SDK is a Python package for creating, managing, and executing media processing pipelines. It provides classes for defining pipeline workflows (Operator, Pipeline), managing input and output resources (Resource), and scheduling jobs (Job, Trigger).


Features

  • Define pipelines with modular operators.
  • Manage input and output resources.
  • Schedule pipeline jobs with cron-like triggers.
  • Track progress for long-running tasks (e.g., uploads, downloads, execution).
  • Designed for cloud-native environments.

Installation

To install the nflow-client-sdk, use pip:

pip install nflow-client-sdk
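
To confirm that pip registered the package, one option is to query the installed distribution metadata with the standard library. This is a minimal sketch: the distribution name comes from the pip command above, and the helper name installed_version is hypothetical, not part of the SDK.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str = "nflow-client-sdk") -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


found = installed_version()
print(found if found else "nflow-client-sdk is not installed")
```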

Quick Start

Here’s an example of using the SDK to define and execute a pipeline:

import asyncio
from nflow import Resource, Operator, Pipeline, Job, Trigger

# Progress callback for resources
def show_progress(progress):
    print(f"Progress: {progress:.2f}%")

async def main():
    # Step 1: Create resources
    input_resource = Resource("input_video", "collection_name", "file", "mp4")
    output_resource = Resource("output_stream", "collection_name", "live", "rtsp://localhost:8554/test")

    # Step 2: Upload input resource
    await input_resource.upload_async("/path/to/input", progress_callback=show_progress)

    # Step 3: Create operators
    loader = Operator("MP4FileLoaderOp", params={"resource_id": input_resource.id})
    brighten_op1 = Operator("BrightenConvOp", params={"brightness": 1.8})
    brighten_op2 = Operator("BrightenConvOp", params={"brightness": 1.8})
    sender = Operator("RTSPStreamSenderOp", params={"resource_id": output_resource.id})

    # Step 4: Create and link pipeline
    pipeline = Pipeline("my_pipeline")
    pipeline.link(loader, "video-out", brighten_op1, "video-in")
    pipeline.link(brighten_op1, "video-out", brighten_op2, "video-in")
    pipeline.link(brighten_op2, "video-out", sender, "video-in")
    pipeline.link(loader, "audio-out", sender, "audio-in")

    pipeline_id = pipeline.register()

    # Step 5: Schedule a job
    trigger = Trigger(cron="*/5 * * * *")
    job = Job(pipeline_id, trigger)
    job_id = job.start()
    print(f"Job '{job_id}' has been scheduled with trigger: {trigger.cron}")

    # Step 6: Run the job and wait for completion
    await job.run()
    print("Job completed!")

    # Step 7: Download the output resource
    await output_resource.download_async("/path/to/output", progress_callback=show_progress)
    print("Output resource downloaded!")

# Run the workflow
asyncio.run(main())
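
For large files, a callback that prints on every update can flood the console. The sketch below wraps any progress callback so that it fires only when progress has advanced by a given step. It assumes the SDK reports progress as a percentage from 0 to 100, which is what the format string in show_progress suggests; the helper name throttled is hypothetical and not part of the SDK.

```python
from typing import Callable, Optional


def throttled(callback: Callable[[float], None],
              step: float = 10.0) -> Callable[[float], None]:
    """Wrap a progress callback so it fires at most once per `step` percent.

    Assumption: progress is a percentage in the range 0 to 100.
    """
    last_reported: Optional[float] = None

    def wrapper(progress: float) -> None:
        nonlocal last_reported
        if last_reported is not None:
            # Ignore repeated or out-of-order updates.
            if progress <= last_reported:
                return
            # Always let the final update through; otherwise wait for `step`.
            reached_end = progress >= 100.0
            if not reached_end and progress - last_reported < step:
                return
        last_reported = progress
        callback(progress)

    return wrapper


# Example: collect the updates that pass the throttle.
seen = []
report = throttled(seen.append, step=25.0)
for value in [0, 5, 10, 25, 30, 49, 50, 99, 100, 100]:
    report(value)
print(seen)
```

If the assumption about percentages holds, the wrapper could be passed in place of the plain callback, for example progress_callback=throttled(show_progress).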

Operators

Operators are modular processing units that perform specific tasks in a pipeline. They encapsulate their behavior and parameters for easy integration into the pipeline workflow.

Available Operators:

  • MP4FileLoaderOp:
    • Purpose: Loads an MP4 file as input to the pipeline.
    • Parameters:
      • resource_id (str): The ID of the resource to load.
  • BrightenConvOp:
    • Purpose: Adjusts the brightness of video frames.
    • Parameters:
      • brightness (float): Brightness adjustment factor (e.g., 1.8).
  • RTSPStreamSenderOp:
    • Purpose: Sends the output of the pipeline as an RTSP stream.
    • Parameters:
      • resource_id (str): The ID of the output resource.
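
The parameter list above can be turned into a small client-side check that catches typos before a pipeline is registered. The sketch below is an illustration only: OPERATOR_PARAMS and check_params are hypothetical names, the table is copied from the list above, and treating every listed parameter as required is an assumption. The SDK may perform its own validation.

```python
# Parameter names and types as listed in this README.
OPERATOR_PARAMS = {
    "MP4FileLoaderOp": {"resource_id": str},
    "BrightenConvOp": {"brightness": float},
    "RTSPStreamSenderOp": {"resource_id": str},
}


def check_params(op_name, params):
    """Return a list of problems found in `params`; an empty list means OK."""
    spec = OPERATOR_PARAMS.get(op_name)
    if spec is None:
        return [f"unknown operator: {op_name}"]

    problems = []
    for name, expected in spec.items():
        if name not in params:
            # Assumption: every listed parameter is required.
            problems.append(f"missing parameter: {name}")
            continue
        value = params[name]
        if expected is float:
            # Accept ints where a float is expected, but never bools.
            ok = isinstance(value, (int, float)) and not isinstance(value, bool)
        else:
            ok = isinstance(value, expected)
        if not ok:
            problems.append(
                f"{name} should be {expected.__name__}, got {type(value).__name__}"
            )

    for name in params:
        if name not in spec:
            problems.append(f"unexpected parameter: {name}")
    return problems


print(check_params("BrightenConvOp", {"brightness": 1.8}))
print(check_params("BrightenConvOp", {"brightnes": 1.8}))
```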

Pipelines

Pipelines define the logical flow of operations by linking operators. A pipeline manages how data flows from one operator to the next.

Creating a Pipeline

  1. Instantiate a Pipeline object:
    pipeline = Pipeline("my_pipeline")
    
  2. Connect operators with the link() method, which joins a named output pad on the source operator to a named input pad on the target operator:
    pipeline.link(source_operator, "output_pad_name", target_operator, "input_pad_name")
    
  3. Register the pipeline to prepare it for execution:
    pipeline_id = pipeline.register()
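
To make the linking model concrete, the sketch below records links in the same four-argument shape as link() and derives an order in which operators could run, rejecting cyclic graphs. It is a stand-in model for illustration: LinkGraph and execution_order are hypothetical names, operators are represented by plain strings, and nothing here reflects how the SDK stores or validates links internally.

```python
from collections import defaultdict, deque


class LinkGraph:
    """Stand-in for a pipeline: stores links between named operators."""

    def __init__(self):
        self.links = []

    def link(self, source, output_pad, target, input_pad):
        self.links.append((source, output_pad, target, input_pad))

    def execution_order(self):
        """Topologically sort operators (Kahn's algorithm)."""
        successors = defaultdict(list)
        indegree = {}
        for source, _, target, _ in self.links:
            successors[source].append(target)
            indegree.setdefault(source, 0)
            indegree[target] = indegree.get(target, 0) + 1

        # Start with operators that have no incoming links.
        ready = deque(node for node, degree in indegree.items() if degree == 0)
        order = []
        while ready:
            node = ready.popleft()
            order.append(node)
            for nxt in successors[node]:
                indegree[nxt] -= 1
                if indegree[nxt] == 0:
                    ready.append(nxt)

        if len(order) != len(indegree):
            raise ValueError("pipeline links contain a cycle")
        return order


# The same topology as the Quick Start example.
graph = LinkGraph()
graph.link("loader", "video-out", "brighten_op1", "video-in")
graph.link("brighten_op1", "video-out", "brighten_op2", "video-in")
graph.link("brighten_op2", "video-out", "sender", "video-in")
graph.link("loader", "audio-out", "sender", "audio-in")
print(graph.execution_order())
```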
    

Jobs and Triggers

Jobs execute registered pipelines, and triggers define when those jobs run.

Scheduling a Job

  1. Define a trigger with a cron-like schedule:
    trigger = Trigger(cron="*/5 * * * *")  # Every 5 minutes
    
  2. Create a job and associate it with a pipeline:
    job = Job(pipeline_id, trigger)
    
  3. Start the job:
    job_id = job.start()
    
  4. Optionally, run the job and wait for it to complete (this must be awaited inside an async function):
    await job.run()
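
To see what a schedule such as "*/5 * * * *" means in practice, the sketch below lists the next few times a step expression in the minute field would fire. It uses only the standard library and handles only the "*/N * * * *" form; next_runs is a hypothetical helper, and how the SDK itself parses or evaluates cron strings is not covered by this README.

```python
from datetime import datetime, timedelta


def next_runs(cron, after, count=3):
    """Return the next `count` fire times strictly after `after`.

    Supports only '*/N * * * *' (a step in the minute field).
    """
    fields = cron.split()
    if (len(fields) != 5 or fields[1:] != ["*"] * 4
            or not fields[0].startswith("*/")):
        raise ValueError("this sketch only supports '*/N * * * *'")
    step = int(fields[0][2:])
    if not 1 <= step <= 59:
        raise ValueError("minute step must be between 1 and 59")

    # Cron has minute resolution: start at the next whole minute.
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    runs = []
    while len(runs) < count:
        # '*/N' matches minutes 0, N, 2N, ... within each hour.
        if candidate.minute % step == 0:
            runs.append(candidate)
        candidate += timedelta(minutes=1)
    return runs


for run in next_runs("*/5 * * * *", datetime(2024, 1, 1, 12, 2, 30)):
    print(run.isoformat())
```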
    

License

This project is licensed under the MIT License. See the LICENSE file for details.


Contributing

Contributions are welcome! Please open an issue or submit a pull request.


Contact

For questions or support, please contact us at support@example.com.
