Cogniflow pipeline manager daemon and orchestration runtime

cf_pipeline_manager

cf_pipeline_manager runs the local Cogniflow pipeline manager daemon.

It is the only supported project entrypoint for starting pipeline execution.

It owns:

  • daemon lifecycle
  • polling queued runs and pending events
  • launching cf_pipeline_v2
  • exposing manager commands over CLI

It does not replace cf_semantics_runtime. Runtime state, snapshots, queue state, and event persistence still live outside cf_pipeline_manager; the manager itself only keeps the local daemon files listed under Runtime Files.

Runtime Files

The manager writes local daemon state under workspace/pipeline_manager:

  • daemon.json
  • daemon.lock
  • daemon.log
  • runs/<run_id>/pipeline.engine.json
  • runs/<run_id>/step_<index>.engine.json
  • runs/<run_id>/engine.command.json
  • runs/<run_id>/engine.stdout.log
  • runs/<run_id>/engine.stderr.log

The manager uses the repository-local workspace/pipeline_manager directory for its local daemon state.
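
As a rough illustration of the layout above, the per-run file paths can be derived from a workspace root and a run ID. This is only a sketch: run_files and its parameters are hypothetical names, not part of cf_pipeline_manager, and it assumes that <index> is rendered as a plain integer starting at 0.

```python
from pathlib import Path


def run_files(workspace: Path, run_id: str, step_count: int = 0) -> dict[str, Path]:
    """Return the expected per-run file paths (hypothetical helper)."""
    run_dir = workspace / "pipeline_manager" / "runs" / run_id
    files = {
        "pipeline": run_dir / "pipeline.engine.json",
        "command": run_dir / "engine.command.json",
        "stdout": run_dir / "engine.stdout.log",
        "stderr": run_dir / "engine.stderr.log",
    }
    # Assumption: step files are numbered 0..step_count-1 as plain integers.
    for index in range(step_count):
        files[f"step_{index}"] = run_dir / f"step_{index}.engine.json"
    return files
```

For example, run_files(Path("workspace"), "<run_id>")["stderr"] points at the engine's stderr log for that run, which is usually the first file to read when a run fails.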

CLI Entry Points

You can use either surface:

  • standalone: python -m cf_pipeline_manager ...
  • unified CLI: cf pipeline manager ...

The command set is the same for both.
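
Because both surfaces accept the same commands, a script can switch between them by changing only the argv prefix. The sketch below shows one way to do that; manager_argv is a hypothetical helper, and using sys.executable in place of a bare python is an assumption made so the module runs under the current interpreter.

```python
import sys


def manager_argv(command: str, *args: str, unified: bool = False) -> list[str]:
    """Build an argv list for either CLI surface (hypothetical helper)."""
    if unified:
        # Unified CLI: cf pipeline manager ...
        prefix = ["cf", "pipeline", "manager"]
    else:
        # Standalone: python -m cf_pipeline_manager ...
        prefix = [sys.executable, "-m", "cf_pipeline_manager"]
    return [*prefix, command, *args]
```

Passing the result to subprocess.run as a list avoids shell quoting entirely.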

Command Overview

status
ensure-daemon
stop-daemon
activate
submit-run
snapshot
summaries
stop-run
emit-event

All commands return JSON.
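
Since every command returns JSON, a caller can wrap invocation and decoding in one place. The following is a minimal sketch that assumes the JSON document is written to standard output; run_json is a hypothetical name, and the error handling is illustrative rather than a description of the manager's behavior.

```python
import json
import subprocess
from typing import Any


def run_json(argv: list[str]) -> Any:
    """Run a command and decode its standard output as JSON."""
    # Assumption: the command prints exactly one JSON document to stdout.
    completed = subprocess.run(argv, capture_output=True, text=True, check=False)
    try:
        return json.loads(completed.stdout)
    except json.JSONDecodeError as exc:
        raise RuntimeError(
            f"expected JSON from {argv!r} (exit code {completed.returncode}): "
            f"{completed.stderr.strip()}"
        ) from exc
```

A call such as run_json(["cf", "pipeline", "manager", "status"]) would then yield the decoded status payload, provided the unified CLI is installed.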

Typical Flow

1. Start the daemon

python -m cf_pipeline_manager ensure-daemon

Or:

cf pipeline manager ensure-daemon

This starts the daemon if it is not already running and writes workspace/pipeline_manager/daemon.json.

2. Check status

python -m cf_pipeline_manager status

Example output shape:

{
  "running": true,
  "workspace_dir": "C:\\repo\\workspace\\pipeline_manager",
  "active_workers": [],
  "worker_errors": {}
}
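
A script that gates on the daemon can check this payload before submitting work. The sketch below treats "running with no worker errors" as healthy; that definition and the name daemon_is_healthy are assumptions for illustration, and only the keys shown in the example output are used.

```python
def daemon_is_healthy(status: dict) -> bool:
    """Return True when the daemon reports running and lists no worker errors."""
    # A missing "running" key counts as not running; a missing
    # "worker_errors" key counts as no errors.
    return bool(status.get("running")) and not status.get("worker_errors")


example_status = {
    "running": True,
    "workspace_dir": "C:\\repo\\workspace\\pipeline_manager",
    "active_workers": [],
    "worker_errors": {},
}
```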

3. Activate a pipeline for manager scheduling

python -m cf_pipeline_manager activate opcua_fifo_avg_to_duckdb_parquet_triggered

You can also set runtime state explicitly:

python -m cf_pipeline_manager activate opcua_fifo_avg_to_duckdb_parquet_triggered --desired-state enabled --concurrency-limit 1 --priority 0

4. Submit a manual run

python -m cf_pipeline_manager submit-run opcua_fifo_avg_to_duckdb_parquet_triggered --source manual --request-id req-001

With JSON overrides:

python -m cf_pipeline_manager submit-run opcua_fifo_avg_to_duckdb_parquet_triggered `
  --runner-overrides-json '{\"engine_iterations\":1}' `
  --input-overrides-json '[{\"node_id\":\"pipe:opcua_fifo_avg_to_duckdb_parquet_triggered/n1\",\"parameters\":{\"endpoint\":\"opc.tcp://127.0.0.1:4841/VirtualPhServer\"}}]' `
  --metadata-json '{\"origin\":\"cli\"}'
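
The escaped quotes in the command above are a shell quoting concern. When the command is launched from Python, the same JSON arguments can be produced with json.dumps and passed as list items, which sidesteps escaping. This is a sketch: submit_run_args is a hypothetical helper, and only the flags shown above are used.

```python
import json
from typing import Any, Optional


def submit_run_args(
    pipeline_id: str,
    runner_overrides: Optional[dict[str, Any]] = None,
    input_overrides: Optional[list[dict[str, Any]]] = None,
    metadata: Optional[dict[str, Any]] = None,
) -> list[str]:
    """Build the submit-run arguments, serializing overrides as JSON strings."""
    args = ["submit-run", pipeline_id]
    if runner_overrides is not None:
        args += ["--runner-overrides-json", json.dumps(runner_overrides)]
    if input_overrides is not None:
        args += ["--input-overrides-json", json.dumps(input_overrides)]
    if metadata is not None:
        args += ["--metadata-json", json.dumps(metadata)]
    return args
```

The returned list is meant to be appended to the standalone or unified CLI prefix and handed to subprocess.run without shell=True.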

5. Inspect summaries or one pipeline snapshot

python -m cf_pipeline_manager summaries
python -m cf_pipeline_manager snapshot opcua_fifo_avg_to_duckdb_parquet_triggered

To inspect a specific run:

python -m cf_pipeline_manager snapshot opcua_fifo_avg_to_duckdb_parquet_triggered --selected-run-id <run_id>

6. Emit an event instead of a manual run

python -m cf_pipeline_manager emit-event opcua_fifo_avg_to_duckdb_parquet_triggered manual --source external --source-event-id evt-001 --dedupe-key evt-001 --payload-json '{\"kind\":\"manual\"}'

The manager will persist the event through cf_ontology and then try to drain it into the run queue.

7. Stop a queued run

python -m cf_pipeline_manager stop-run opcua_fifo_avg_to_duckdb_parquet_triggered <run_id>

Stopping a queued run returns a stopped result. Stopping a running run returns a structured unsupported response instead of throwing.

8. Stop the daemon

python -m cf_pipeline_manager stop-daemon

Using The Unified CLI

If cf_core_cli is installed, the manager is also available here:

cf pipeline manager --help
cf pipeline manager ensure-daemon
cf pipeline manager activate opcua_fifo_avg_to_duckdb_parquet_triggered
cf pipeline manager submit-run opcua_fifo_avg_to_duckdb_parquet_triggered --source manual
cf pipeline manager summaries

Example Pipeline Assets

The canonical example pipeline used for manager testing is:

  • sandcastle/cf_pipeline/cf_pipeline_engine/examples/opcua_fifo_avg_to_duckdb_parquet_triggered.nq

The mirrored setup demo is:

  • sandcastle/cf_setup/src/cf_setup/demo/opcua_fifo_avg_to_duckdb_parquet_triggered.nq

The setup demo intentionally differs from the engine example in a few runtime values such as OPC UA endpoint, window size, and engine duration.

Notes

  • The manager launches cf_pipeline_v2 directly. It does not shell out through the cf_pipeline_engine CLI wrapper.
  • The daemon binds to loopback HTTP only.
  • Provider runtime path discovery still follows the existing workspace/runtime/provider_registry.v1.json convention.
