Apache Airflow provider for RemSvc remote execution via gRPC
Project description
airflow-provider-remsvc
Apache Airflow provider for RemSvc — a gRPC-based remote command execution service.
Requirements
- Apache Airflow >= 3.1.0
- Python >= 3.10
grpcio >= 1.67.0protobuf >= 4.0.0
Installation
# From the prv/ directory — stubs are generated automatically at install time
pip install .
The package is self-contained. pip install runs hatch_build.py which invokes
grpc_tools.protoc in an isolated build environment to generate the remsvc_proto
stubs and bundle them into the wheel. No manual proto generation step is needed.
Airflow Connection Setup
Create a connection of type remsvc in the Airflow UI or via environment variable:
# Insecure, no authentication (development only)
export AIRFLOW_CONN_REMSVC_DEFAULT='remsvc://remotehost:50051?extra={}'
# Insecure channel + bearer token
export AIRFLOW_CONN_REMSVC_DEFAULT='remsvc://remotehost:50051?extra={"bearer_token":"secret-prod-token"}'
# TLS with system trust store + bearer token (recommended for production)
export AIRFLOW_CONN_REMSVC_DEFAULT='remsvc://remotehost:50051?extra={"use_ssl":true,"bearer_token":"secret-prod-token"}'
# TLS with custom CA certificate + bearer token
export AIRFLOW_CONN_REMSVC_DEFAULT='remsvc://remotehost:50051?extra={"use_ssl":true,"ca_cert_path":"/etc/ssl/remsvc-ca.pem","bearer_token":"secret-prod-token"}'
Connection extra fields:
| Field | Type | Description |
|---|---|---|
use_ssl |
bool | Enable TLS (default: false) |
ca_cert_path |
str | Path to CA certificate PEM file; uses system trust store if omitted |
bearer_token |
str | Bearer token sent as Authorization: Bearer <token> on every gRPC call. Must match an identity entry in the server's [auth] config section. Omit if the server has no [auth] section configured. |
Authentication overview
The server enforces per-call bearer-token authentication when its INI config
contains an [auth] section. Each token is tied to a named identity:
# server.ini
[auth]
airflow-prod = secret-prod-token
airflow-staging = secret-staging-token
dev-semih = secret-dev-token
The token is read from the Airflow connection's bearer_token extra field and
injected automatically as Authorization: Bearer <token> metadata on every
gRPC call. Service handlers never see the raw token — it is consumed by the
server-side auth processor before the RPC runs. The verified identity
(airflow-prod, etc.) is logged on the server for each authenticated call.
If the server's [auth] section is absent or empty, all callers are permitted
and bearer_token in the connection extra is silently ignored.
Security note: bearer tokens provide caller identity but not confidentiality. Always pair with TLS (
use_ssl: true) in production so tokens are not transmitted in plaintext.
Usage
Single command
from remsvc_provider.operators.remsvc import RemSvcOperator
run = RemSvcOperator(
task_id="run_remote",
grpc_conn_id="remsvc_default",
commands=[{"cmd": "echo hello", "cmdtyp": 0}],
dag=dag,
)
Multiple commands (streamed concurrently)
run = RemSvcOperator(
task_id="run_remote",
grpc_conn_id="remsvc_default",
commands=[
{"cmd": "echo {{ ds }}", "cmdtyp": 0},
{"cmd": "hostname", "cmdtyp": 0},
{"cmd": "whoami", "cmdtyp": 0},
],
stream_timeout=120.0,
dag=dag,
)
All commands are sent over a single RemCmdStrm bidirectional stream. The server
processes commands sequentially (one child process at a time) and echoes back the
tid in each response. The trigger correlates responses to commands by tid
(1-based index in the commands list) and collects all results before resuming
the worker. The operator fails the task if any command returns a non-zero exit
code or if any response is missing.
Command execution model
Each command the server receives is run in a dedicated OS child process
(cmd.exe /C on Windows or /bin/sh -c on Linux for cmdtyp=0;
powershell.exe on Windows or pwsh on Linux for cmdtyp=1).
Every command gets its own address space, PID, and stdio pipes; when it exits
the process is reaped and all resources are released.
The server processes messages from a single stream sequentially — one child
process at a time. The Airflow trigger sends all commands and reads all
responses concurrently (asyncio.gather), so from the operator's perspective
commands appear to run in parallel, but on the server they are dispatched one
after another in arrival order. If two Airflow tasks target the same server
concurrently their messages interleave at the gRPC level, but each command
still runs in its own isolated child process — they share no process state.
Operator parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
commands |
list[dict] |
required | List of command dicts (see table below). Jinja-templated. |
grpc_conn_id |
str | "remsvc_default" |
Airflow connection ID of type remsvc |
stream_timeout |
float | 3600.0 |
Maximum seconds for the entire stream, enforced at both the Python asyncio level and as the gRPC deadline. If exceeded the task is cancelled (not retried). |
metadata |
list[tuple[str,str]] |
[] |
Additional gRPC call metadata. Connection-level authorization takes precedence — duplicate keys from this list are stripped. Jinja-templated. |
result_handler |
Callable |
built-in | Applied to the raw {tid: result_dict} mapping before XCom push. Defaults to a list sorted ascending by tid. Supply a custom callable for filtering or reshaping results. |
Command dict fields
| Field | Type | Required | Description |
|---|---|---|---|
cmd |
str | Yes | Command string to execute. Must be non-empty and non-whitespace — the operator raises AirflowException at validation time if any command is blank. |
cmdtyp |
int | No | 0 = native shell (default): cmd.exe /C on Windows, /bin/sh -c on Linux. 1 = PowerShell: powershell.exe on Windows, pwsh on Linux. |
cmdusr |
str | No | OS user to run the command as (Linux only; silently ignored on Windows). |
src |
str | No | Source identifier included in server-side log entries for this command. |
XCom result
execute_complete returns a dict pushed to XCom under return_value:
{
"state": "SUCCESS",
"results": [
{"tid": 1, "rc": 0, "out": "2024-01-01\n", "err": "", "hsh": "...", "cmd": "echo {{ ds }}"},
{"tid": 2, "rc": 0, "out": "myhost\n", "err": "", "hsh": "...", "cmd": "hostname"},
],
"error_msg": None,
}
Results are sorted by tid (i.e. original command order).
Development
cd prv/
# Install in editable mode — stubs are generated automatically by the build hook
pip install -e ".[dev]"
# Run all tests (path is configured in pyproject.toml)
pytest
# Run a specific test file
pytest ../tst/unit/prv/test_remsvc_async.py -v
pytest ../tst/unit/prv/test_remsvc_hook.py -v
Tests can also be run from the repository root without cd:
PYTHONPATH=prv pytest tst/unit/prv -v
Regenerating stubs manually — only needed if
src/proto/RemSvc.protochanges and you are working in an editable install without rebuilding:./regen_proto.sh
Building a wheel
The wheel is self-contained — gRPC stubs are compiled from src/proto/RemSvc.proto
at build time by hatch_build.py and bundled into the wheel. No manual proto
generation step is required on the target machine.
cd prv/
# Install the build frontend (one-time)
pip install build
# Build the wheel
python -m build --wheel
The wheel is written to prv/dist/airflow_provider_remsvc-1.0.0-py3-none-any.whl.
Why wheel only? The sdist (
.tar.gz) is not suitable for deployment becausehatch_build.pyresolves the.protofile relative to the repository root, which is not present inside a standalone sdist tarball. Always use--wheel.
Deploying to a bare-metal / VM Airflow instance
1. Copy the wheel to the Airflow server
scp prv/dist/airflow_provider_remsvc-1.0.0-py3-none-any.whl user@airflow-host:/tmp/
2. Install into the Airflow virtualenv
ssh user@airflow-host
source /path/to/airflow-venv/bin/activate
# First install
pip install /tmp/airflow_provider_remsvc-1.0.0-py3-none-any.whl
# Reinstall (same version number, updated content)
pip install --force-reinstall --no-deps /tmp/airflow_provider_remsvc-1.0.0-py3-none-any.whl
Use --no-deps when force-reinstalling to avoid pip upgrading or downgrading
grpcio and protobuf alongside the provider.
3. Verify the provider is detected
airflow providers list | grep remsvc
Expected output:
airflow-provider-remsvc | Remote command execution for Apache Airflow via RemSvc gRPC | 1.0.0
4. Restart Airflow components
sudo systemctl restart airflow-scheduler
sudo systemctl restart airflow-triggerer # required — RemSvcTrigger runs here
sudo systemctl restart airflow-api-server # picks up new connection type in the UI (Airflow 3.x)
The triggerer must be running. The deferrable operator offloads all gRPC communication to the triggerer process; if it is stopped, deferred tasks will hang indefinitely waiting to be resumed.
5. Create the Airflow connection
In the Airflow UI go to Admin → Connections and create a new connection:
| Field | Value |
|---|---|
| Connection Id | remsvc_default (or any name passed to grpc_conn_id) |
| Connection Type | RemSvc gRPC |
| Host | hostname or IP of the RemSvc server |
| Port | 50051 (or the port configured on the server) |
| Extra | {"use_ssl": true, "bearer_token": "secret-token"} |
Alternatively, set the connection via environment variable before starting Airflow:
export AIRFLOW_CONN_REMSVC_DEFAULT='remsvc://remotehost:50051?extra={"use_ssl":true,"bearer_token":"secret-token"}'
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file airflow_provider_remsvc-1.0.0-py3-none-any.whl.
File metadata
- Download URL: airflow_provider_remsvc-1.0.0-py3-none-any.whl
- Upload date:
- Size: 22.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a6bca3113908cf596a0b412728f0397aedf163415916ceb4b721dbf6cc90bdb4
|
|
| MD5 |
ea1f3e0aee989284474b2adf1faf2786
|
|
| BLAKE2b-256 |
0058ddb188015695424ea53ffce78daef9e5b124c3d49cb6eeb1d9e851d46114
|