Apache Airflow provider for orchesjob – lightweight idempotent job runner
airflow-providers-orchesjob
⚠️ EXPERIMENTAL
This package is an experimental implementation. Do not use in production. APIs and behaviour may change without notice.
Apache Airflow provider for orchesjob. Starts and monitors orchesjob jobs on a remote host over SSH.
⚠️ Known Limitations
mode="reschedule" is required
You must use OrchesJobSensor with mode="reschedule". The default mode="poke" does not work.
# ❌ BROKEN: poke mode (the default); do not use
OrchesJobSensor(
    task_id="wait",
    job_id="...",
    ssh_conn_id="my_ssh",
    poke_interval=30.0,
)

# ✅ CORRECT: always specify mode="reschedule"
OrchesJobSensor(
    task_id="wait",
    job_id="...",
    ssh_conn_id="my_ssh",
    poke_interval=30.0,
    mode="reschedule",  # required
)
In poke mode, the worker process stays alive (and holds a worker slot) for the entire time the sensor is waiting.
In some environments, such as Amazon MWAA, the Airflow server responds with "Task Instance not found"
for tasks that remain in the running state for a long time, and the worker then forcibly terminates itself.
In reschedule mode, the worker exits normally each time poke() returns False,
and the scheduler re-queues the task after poke_interval seconds, which avoids this problem.
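To make the difference concrete, here is a toy model of reschedule mode in plain Python. It is not Airflow's scheduler code: the simulated clock, the function name simulate_reschedule and the exact timeout check are simplifying assumptions. Each loop iteration stands for one short worker run that calls poke() once and exits; between runs no worker process exists.

```python
from typing import Callable, List, Tuple


def simulate_reschedule(
    poke: Callable[[float], bool],
    poke_interval: float,
    timeout: float,
) -> Tuple[bool, List[float]]:
    """Toy model of reschedule mode (hypothetical, not Airflow's implementation).

    Returns (succeeded, start times of each short-lived worker run).
    """
    clock = 0.0  # simulated seconds since the sensor first started
    worker_runs: List[float] = []
    while True:
        # A worker picks up the task, calls poke() once and then exits.
        worker_runs.append(clock)
        if poke(clock):
            return True, worker_runs
        # Assumed timeout rule: give up once the elapsed time exceeds timeout
        # (Airflow would raise AirflowSensorTimeout at this point).
        if clock > timeout:
            return False, worker_runs
        # No worker is held while waiting; the scheduler re-queues the task
        # after poke_interval seconds.
        clock += poke_interval


# Example: the remote job reports completion 90 seconds after the sensor starts.
done, runs = simulate_reschedule(lambda t: t >= 90, poke_interval=30.0, timeout=3600.0)
print(done, runs)  # True [0.0, 30.0, 60.0, 90.0]
```

Each entry in runs is a separate, short worker process, which is why no single process stays in the running state long enough to be reported as missing.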
Requirements
- Apache Airflow ≥ 2.6
- apache-airflow-providers-ssh ≥ 3.0
- orchesjob installed on the remote host
Installation
pip install airflow-providers-orchesjob
Setup
Register an SSH Connection in Airflow (Admin → Connections):
| Field | Value |
|---|---|
| Conn Id | any name (e.g. my_ssh) |
| Conn Type | SSH |
| Host | remote host address |
| Username | SSH username |
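As an alternative to the UI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> (connection ID upper-cased), and since Airflow 2.3 the value may be JSON. The sketch below only builds that variable's name and value; ssh_connection_env is a hypothetical helper, the host, login and key path are placeholders, and the key_file extra assumes key-based authentication through the SSH provider.

```python
import json
from typing import Optional, Tuple


def ssh_connection_env(
    conn_id: str, host: str, login: str, key_file: Optional[str] = None
) -> Tuple[str, str]:
    """Build the environment variable that defines an Airflow SSH Connection."""
    # Airflow looks up connections in AIRFLOW_CONN_<CONN_ID>, upper-cased.
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    conn: dict = {"conn_type": "ssh", "host": host, "login": login}
    if key_file is not None:
        # Optional private key path, passed through the connection's extra field.
        conn["extra"] = {"key_file": key_file}
    return name, json.dumps(conn)


# Placeholder values; replace them with your remote host and SSH user.
name, value = ssh_connection_env("my_ssh", "jobs.example.com", "airflow")
print(f"export {name}='{value}'")
```

Exporting the printed line in the scheduler's and workers' environment makes my_ssh available to ssh_conn_id without touching the metadata database.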
Usage
Use OrchesJobOperator to start a job and OrchesJobSensor to wait for completion.
from airflow.decorators import dag
from airflow_providers_orchesjob.operators.orchesjob import OrchesJobOperator
from airflow_providers_orchesjob.sensors.orchesjob import OrchesJobSensor

@dag(dag_id="my_dag", ...)
def my_dag():
    # Start the job on the remote host; the job ID is pushed to XCom under key "job_id"
    start = OrchesJobOperator(
        task_id="run_job",
        command=["/jobs/import.sh", "--date", "{{ ds }}"],
        ssh_conn_id="my_ssh",
    )

    # Poll the job until it finishes, releasing the worker between polls
    wait = OrchesJobSensor(
        task_id="wait_job",
        job_id="{{ ti.xcom_pull(task_ids='run_job', key='job_id') }}",
        ssh_conn_id="my_ssh",
        poke_interval=30.0,
        timeout=3600.0,
        mode="reschedule",  # required
    )

    start >> wait

my_dag()  # instantiate the DAG so the scheduler can discover it
Idempotency
run_key defaults to {dag_id}__{task_id}__{run_id}.
Because the key includes run_id, re-running the task within the same DAG run (for example after clearing it) does not start a second job while the first one is still active.
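The default key is plain string concatenation of the three identifiers. The helper below is a hypothetical illustration (default_run_key is not part of the provider's API, and the run_id values are examples); it shows that retries and re-runs inside one DAG run share a key, while a new DAG run gets a fresh one.

```python
def default_run_key(dag_id: str, task_id: str, run_id: str) -> str:
    """Hypothetical helper mirroring the documented default run_key format."""
    return f"{dag_id}__{task_id}__{run_id}"


first = default_run_key("my_dag", "run_job", "scheduled__2024-01-01T00:00:00+00:00")
retry = default_run_key("my_dag", "run_job", "scheduled__2024-01-01T00:00:00+00:00")
next_day = default_run_key("my_dag", "run_job", "scheduled__2024-01-02T00:00:00+00:00")

print(first)  # my_dag__run_job__scheduled__2024-01-01T00:00:00+00:00
print(first == retry, first == next_day)  # True False
```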
# Explicit run_key
OrchesJobOperator(
    task_id="import",
    command=["/jobs/import.sh"],
    ssh_conn_id="my_ssh",
    run_key="daily-import-{{ ds }}",
)
Set strict=True to prevent any re-execution with the same run_key,
even after the previous job has finished.
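Taken together, these rules amount to a small decision table. The sketch below models it in plain Python, assuming the only inputs that matter are whether an earlier job with the same run_key exists and whether it is still active; PreviousJob and should_start_new_job are illustrative names, not part of the provider or of orchesjob, and the sketch does not model what the operator returns when it skips a start.

```python
from enum import Enum


class PreviousJob(Enum):
    """State of an earlier job with the same run_key (illustrative only)."""

    NONE = "none"          # no job has used this run_key yet
    ACTIVE = "active"      # a job with this run_key is still running
    FINISHED = "finished"  # a job with this run_key has ended


def should_start_new_job(previous: PreviousJob, strict: bool) -> bool:
    """Return True if a new job would be started for this run_key."""
    if previous is PreviousJob.NONE:
        return True
    if previous is PreviousJob.ACTIVE:
        # Never start a second copy while the first is still active.
        return False
    # Finished: a re-run is allowed unless strict=True forbids any re-execution.
    return not strict


for prev in PreviousJob:
    print(prev.value, should_start_new_job(prev, strict=False), should_start_new_job(prev, strict=True))
```

strict only changes the FINISHED row; an active job is never duplicated either way.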
Parameters
OrchesJobOperator
| Parameter | Type | Default | Description |
|---|---|---|---|
| command | list[str] | required | Command to run on the remote host |
| ssh_conn_id | str | required | Airflow SSH Connection ID |
| run_key | str \| None | auto (see Idempotency) | orchesjob idempotency key |
| strict | bool | False | Prevent re-execution with the same run_key |
| orchesjob_home | str \| None | None | Override ORCHESJOB_HOME on the remote host |
OrchesJobSensor
| Parameter | Type | Default | Description |
|---|---|---|---|
| job_id | str | required | orchesjob job ID to monitor |
| ssh_conn_id | str | required | Airflow SSH Connection ID |
| orchesjob_home | str \| None | None | Override ORCHESJOB_HOME on the remote host |
| poke_interval | float | 30.0 | Seconds between polls |
| timeout | float | 3600.0 | Sensor timeout in seconds |
| mode | str | "poke" | Must be set to "reschedule" (see Known Limitations) |
Error Handling
| Event | Airflow behaviour |
|---|---|
| Job FAILED or LOST | AirflowException → task retries apply |
| Job CANCELLED | AirflowException |
| SSH connection error | AirflowException → task retries apply |
| Sensor timeout exceeded | AirflowSensorTimeout → task fails immediately; Airflow does not retry sensor timeouts |
License
MIT
Download files
File details
Details for the file airflow_providers_orchesjob-0.1.5.tar.gz.
File metadata
- Download URL: airflow_providers_orchesjob-0.1.5.tar.gz
- Upload date:
- Size: 8.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 24e78a82416f82ae336329d11f8318bca50fbc570fab1581f20dcd4868d67e74 |
| MD5 | a04e1081b8aab213ce7a1b00f69b7893 |
| BLAKE2b-256 | 1821e0c3eb72c3559967d2476eafe4460fddf770707c4a930bb906a780eb5e0f |
Provenance
The following attestation bundles were made for airflow_providers_orchesjob-0.1.5.tar.gz:
- Publisher: publish.yml on rmuraki/airflow-providers-orchesjob
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: airflow_providers_orchesjob-0.1.5.tar.gz
- Subject digest: 24e78a82416f82ae336329d11f8318bca50fbc570fab1581f20dcd4868d67e74
- Sigstore transparency entry: 1437103070
- Sigstore integration time:
- Permalink: rmuraki/airflow-providers-orchesjob@64137d589bf736bd525fe45dadbb7cb3e95ef96b
- Branch / Tag: refs/heads/main
- Owner: https://github.com/rmuraki
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@64137d589bf736bd525fe45dadbb7cb3e95ef96b
- Trigger Event: workflow_dispatch
File details
Details for the file airflow_providers_orchesjob-0.1.5-py3-none-any.whl.
File metadata
- Download URL: airflow_providers_orchesjob-0.1.5-py3-none-any.whl
- Upload date:
- Size: 8.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8ca1699f1d312ccda98e3fc449603f40fda349961e8f28fdfc1f824eba25aa10 |
| MD5 | 020ecd5a5a57f4aa674c3ca9e169ec63 |
| BLAKE2b-256 | 26eacb8e0691bb851e85d7a71a6b93a2ed6bc693f005106cf495e34b0fde0bdd |
Provenance
The following attestation bundles were made for airflow_providers_orchesjob-0.1.5-py3-none-any.whl:
- Publisher: publish.yml on rmuraki/airflow-providers-orchesjob
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: airflow_providers_orchesjob-0.1.5-py3-none-any.whl
- Subject digest: 8ca1699f1d312ccda98e3fc449603f40fda349961e8f28fdfc1f824eba25aa10
- Sigstore transparency entry: 1437103088
- Sigstore integration time:
- Permalink: rmuraki/airflow-providers-orchesjob@64137d589bf736bd525fe45dadbb7cb3e95ef96b
- Branch / Tag: refs/heads/main
- Owner: https://github.com/rmuraki
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@64137d589bf736bd525fe45dadbb7cb3e95ef96b
- Trigger Event: workflow_dispatch