Cogniflow pipeline manager daemon and orchestration runtime
Project description
cf_pipeline_manager
cf_pipeline_manager runs the local Cogniflow pipeline manager daemon.
It is the only supported project entrypoint for starting pipeline execution.
It owns:
- daemon lifecycle
- polling queued runs and pending events
- launching
cf_pipeline_v2 - exposing manager commands over CLI
It does not replace cf_semantics_runtime. Runtime state, snapshots, queue
state, and event persistence live in cf_pipeline_manager.
Runtime Files
The manager writes local daemon state under workspace/pipeline_manager:
daemon.jsondaemon.lockdaemon.logruns/<run_id>/pipeline.engine.jsonruns/<run_id>/step_<index>.engine.jsonruns/<run_id>/engine.command.jsonruns/<run_id>/engine.stdout.logruns/<run_id>/engine.stderr.log
The manager uses ~/.cogniflow/workspace/pipeline_manager for its local daemon state.
CLI Entry Points
You can use either surface:
- standalone:
python -m cf_pipeline_manager ... - unified CLI:
cf pipeline manager ...
The command set is the same for both.
Command Overview
status
ensure-daemon
stop-daemon
activate
submit-run
snapshot
summaries
stop-run
emit-event
All commands return JSON.
Typical Flow
1. Start the daemon
python -m cf_pipeline_manager ensure-daemon
Or:
cf pipeline manager ensure-daemon
This starts the daemon if it is not already running and writes
workspace/pipeline_manager/daemon.json.
2. Check status
python -m cf_pipeline_manager status
Example output shape:
{
"running": true,
"workspace_dir": "C:\\repo\\workspace\\pipeline_manager",
"active_workers": [],
"worker_errors": {}
}
3. Activate a pipeline for manager scheduling
python -m cf_pipeline_manager activate opcua_fifo_avg_to_duckdb_parquet_triggered
You can also set runtime state explicitly:
python -m cf_pipeline_manager activate opcua_fifo_avg_to_duckdb_parquet_triggered --desired-state enabled --concurrency-limit 1 --priority 0
4. Submit a manual run
python -m cf_pipeline_manager submit-run opcua_fifo_avg_to_duckdb_parquet_triggered --source manual --request-id req-001
With JSON overrides:
python -m cf_pipeline_manager submit-run opcua_fifo_avg_to_duckdb_parquet_triggered `
--runner-overrides-json '{\"engine_iterations\":1}' `
--input-overrides-json '[{\"node_id\":\"pipe:opcua_fifo_avg_to_duckdb_parquet_triggered/n1\",\"parameters\":{\"endpoint\":\"opc.tcp://127.0.0.1:4841/VirtualPhServer\"}}]' `
--metadata-json '{\"origin\":\"cli\"}'
5. Inspect summaries or one pipeline snapshot
python -m cf_pipeline_manager summaries
python -m cf_pipeline_manager snapshot opcua_fifo_avg_to_duckdb_parquet_triggered
To inspect a specific run:
python -m cf_pipeline_manager snapshot opcua_fifo_avg_to_duckdb_parquet_triggered --selected-run-id <run_id>
6. Emit an event instead of a manual run
python -m cf_pipeline_manager emit-event opcua_fifo_avg_to_duckdb_parquet_triggered manual --source external --source-event-id evt-001 --dedupe-key evt-001 --payload-json '{\"kind\":\"manual\"}'
The manager will persist the event through cf_ontology and then try to drain
it into the run queue.
7. Stop a queued run
python -m cf_pipeline_manager stop-run opcua_fifo_avg_to_duckdb_parquet_triggered <run_id>
Stopping a queued run returns a stopped result. Stopping a running run returns a structured unsupported response instead of throwing.
8. Stop the daemon
python -m cf_pipeline_manager stop-daemon
Using The Unified CLI
If cf_core_cli is installed, the manager is also available here:
cf pipeline manager --help
cf pipeline manager ensure-daemon
cf pipeline manager activate opcua_fifo_avg_to_duckdb_parquet_triggered
cf pipeline manager submit-run opcua_fifo_avg_to_duckdb_parquet_triggered --source manual
cf pipeline manager summaries
Example Pipeline Assets
The canonical example pipeline used for manager testing is:
sandcastle/cf_pipeline/cf_pipeline_engine/examples/opcua_fifo_avg_to_duckdb_parquet_triggered.nq
The mirrored setup demo is:
sandcastle/cf_setup/src/cf_setup/demo/opcua_fifo_avg_to_duckdb_parquet_triggered.nq
The setup demo intentionally differs from the engine example in a few runtime values such as OPC UA endpoint, window size, and engine duration.
Notes
- The manager launches
cf_pipeline_v2directly. It does not shell out through thecf_pipeline_engineCLI wrapper. - The daemon binds to loopback HTTP only.
- Provider runtime path discovery still follows the existing
workspace/runtime/provider_registry.v1.jsonconvention.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file cf_pipeline_manager-0.1.0.tar.gz.
File metadata
- Download URL: cf_pipeline_manager-0.1.0.tar.gz
- Upload date:
- Size: 46.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.2.0 CPython/3.14.4
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ad4bb05b8afa88fa63d24eeb6aba5e3c9e109fd7965de050a86e898ca7076571
|
|
| MD5 |
733ef1c7d8d4d8a8e960d4b5bfe7d952
|
|
| BLAKE2b-256 |
7822023bc38e1b3d63aeb9f556d39a3a2477ed46ca8a701b93ba8144d2746d59
|
File details
Details for the file cf_pipeline_manager-0.1.0-py3-none-any.whl.
File metadata
- Download URL: cf_pipeline_manager-0.1.0-py3-none-any.whl
- Upload date:
- Size: 41.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.2.0 CPython/3.14.4
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e45106269103b44761ff21a3867e183bedaf11afdba03d254ffa2cd00d7544ba
|
|
| MD5 |
276bde15cd9c6b2fa712a549e36c6412
|
|
| BLAKE2b-256 |
1719c87a61ef727913f9516e42ad66799d4365da1c8ba520df2d5a11076b777a
|