A Python SDK for nFlow client pipelines.
Project description
nFlow Client SDK
The nFlow Client SDK is a Python package designed for creating, managing, and executing media processing pipelines. It provides an easy-to-use interface for defining pipeline workflows, managing resources, and scheduling jobs.
Features
- Define pipelines with modular operators.
- Manage input and output resources.
- Schedule pipeline jobs with cron-like triggers.
- Track progress for long-running tasks (e.g., uploads, downloads, execution).
- Designed for cloud-native environments.
Installation
To install the nflow-client-sdk, use pip:
pip install nflow-client-sdk
Quick Start
Here’s an example of using the SDK to define and execute a pipeline:
import asyncio
from nflow import Resource, Operator, Pipeline, Job, Trigger
# Progress callback for resources
def show_progress(progress):
print(f"Progress: {progress:.2f}%")
async def main():
# Step 1: Create resources
input_resource = Resource("input_video", "collection_name", "file", "mp4")
output_resource = Resource("output_stream", "collection_name", "live", "rtsp://localhost:8554/test")
# Step 2: Upload input resource
await input_resource.upload_async("/path/to/input", progress_callback=show_progress)
# Step 3: Create operators
loader = Operator("MP4FileLoaderOp", params={"resource_id": input_resource.id})
brighten_op1 = Operator("BrightenConvOp", params={"brightness": 1.8})
brighten_op2 = Operator("BrightenConvOp", params={"brightness": 1.8})
sender = Operator("RTSPStreamSenderOp", params={"resource_id": output_resource.id})
# Step 4: Create and link pipeline
pipeline = Pipeline("my_pipeline")
pipeline.link(loader, "video-out", brighten_op1, "video-in")
pipeline.link(brighten_op1, "video-out", brighten_op2, "video-in")
pipeline.link(brighten_op2, "video-out", sender, "video-in")
pipeline.link(loader, "audio-out", sender, "audio-in")
pipeline_id = pipeline.register()
# Step 5: Schedule a job
trigger = Trigger(cron="*/5 * * * *")
job = Job(pipeline_id, trigger)
job_id = job.start()
print(f"Job '{job_id}' has been scheduled with trigger: {trigger.cron}")
# Step 6: Run the job and wait for completion
await job.run()
print("Job completed!")
# Step 7: Download the output resource
await output_resource.download_async("/path/to/output", progress_callback=show_progress)
print("Output resource downloaded!")
# Run the workflow
asyncio.run(main())
Operators
Operators are modular processing units that perform specific tasks in a pipeline. They encapsulate their behavior and parameters for easy integration into the pipeline workflow.
Available Operators:
-
MP4FileLoaderOp:- Purpose: Loads an MP4 file as input to the pipeline.
- Parameters:
resource_id(str): The ID of the resource to load.
-
BrightenConvOp:- Purpose: Adjusts the brightness of video frames.
- Parameters:
brightness(float): Brightness adjustment factor (e.g.,1.8).
-
RTSPStreamSenderOp:- Purpose: Sends the output of the pipeline as an RTSP stream.
- Parameters:
resource_id(str): The ID of the output resource.
Pipelines
Pipelines define the logical flow of operations by linking operators. A pipeline manages how data flows from one operator to the next.
Creating a Pipeline
- Instantiate a
Pipelineobject:pipeline = Pipeline("my_pipeline")
- Add operators to the pipeline using the link() method:
pipeline.link(source_operator, "output_pad_name", target_operator, "input_pad_name")
- Register the pipeline to prepare it for execution:
pipeline_id = pipeline.register()
Jobs and Triggers
Jobs are responsible for executing pipelines, and triggers define when or how jobs are executed.
Scheduling a Job
- Define a trigger with a cron-like schedule:
trigger = Trigger(cron="*/5 * * * *") # Every 5 minutes
- Create a job and associate it with a pipeline:
job = Job(pipeline_id, trigger)
- Start the job:
job_id = job.start()
- Optionally, wait for the job to execute:
await job.run()
License
This project is licensed under the MIT License. See the LICENSE file for details.
Contributing
Contributions are welcome! Please open an issue or submit a pull request.
Contact
For questions or support, please contact us at support@example.com.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file nflow-0.1.0.tar.gz.
File metadata
- Download URL: nflow-0.1.0.tar.gz
- Upload date:
- Size: 5.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
19720d5d2b687f0c5511112db7894a3259d66dd71017ac9a1a0f3023685705ee
|
|
| MD5 |
c78f34ce2c20658d9aa334e0664990de
|
|
| BLAKE2b-256 |
c347678002ef19bf181735baf94faff386edba8dcc7e7b5182551a93848febe7
|
File details
Details for the file nflow-0.1.0-py3-none-any.whl.
File metadata
- Download URL: nflow-0.1.0-py3-none-any.whl
- Upload date:
- Size: 6.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c959676d0f9d3f83bc3ef6c9d90da6d00e438eba73d98914935df771482f2fbd
|
|
| MD5 |
23712cad0aa9cb146c6fc93982701570
|
|
| BLAKE2b-256 |
db501cf0038a23aea16659e9246ec9ca2d8e1f84d99f5214019354cc3f02735c
|