Skip to main content

Run pytest suites as Airflow tasks, with structured results in XCom.

Project description

airflow-pytest-operator

Run a pytest suite as an Airflow task. The operator executes your tests in a child process, parses the JUnit report into a structured result, pushes a summary to XCom, and fails the task when tests fail (configurable).

Works on Airflow 2.x and 3.x — all version-specific imports are isolated in a single compatibility module, so one wheel supports both.

PyPI version Airflow Python versions License: Apache 2.0

Quality & build status

CI codecov Checked with mypy Ruff OpenSSF Scorecard

Why a child process

Tests run via {sys.executable} -m pytest, i.e. in the same virtualenv / interpreter as the Airflow worker (same dependencies), but in a child process. This keeps pytest's global-state mutations (sys.modules, plugins, cwd, logging) out of the long-lived worker while still satisfying "same environment" semantics. A crashing or segfaulting test can't take the worker down, and the child can be killed cleanly on timeout or task termination.

Install

pip install airflow-pytest-operator
# recommended: hardened XML parsing for untrusted reports
pip install "airflow-pytest-operator[secure-xml]"

Airflow itself is not a hard dependency — the package installs into your existing Airflow environment. Pin a compatible Airflow via an extra if you want resolution help: airflow-pytest-operator[airflow2] or [airflow3].

Installing in Docker / constrained environments

In an Airflow Docker image, install the package with Airflow's official constraint file so dependency resolution matches your Airflow version exactly. Make sure the build args are actually set — an empty AIRFLOW_VERSION/PYTHON_VERSION produces an invalid constraint URL and the build fails:

ARG AIRFLOW_VERSION=2.10.3
ARG PYTHON_VERSION=3.12
RUN pip install --no-cache-dir "airflow-pytest-operator" \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

The package itself pins nothing (dependencies = []), so any resolution conflict comes from your wider environment; the constraint file is the standard way to keep it reproducible.

Verifying the release

Each PyPI release is published from GitHub Actions via PyPI's Trusted Publishing and ships with a PEP 740 Sigstore attestation that cryptographically binds every wheel and sdist to a specific commit and workflow in this repository. PyPI verifies the attestation at upload time and shows the source repository in the release's Verified details. You can also verify it yourself before installing, which protects against tampering between PyPI and your machine.

PyPI itself verifies the attestation at upload time and surfaces the link back to this repository in the release's Verified details, so the common case (pip install airflow-pytest-operator) already gives you that assurance through PyPI. To verify a specific artifact yourself before installing, use the pypi-attestations CLI:

pip install pypi-attestations
# Replace the filename with the wheel or sdist you want to verify; the
# `pypi:` prefix tells the tool to fetch the artifact + provenance from PyPI.
pypi-attestations verify pypi \
    --repository https://github.com/IKrysanov/airflow-pytest-operator \
    pypi:airflow_pytest_operator-0.3.0-py3-none-any.whl

A successful exit confirms three things at once: the file came from this GitHub repository, it was produced by release.yml (the only configured Trusted Publisher), and it has not been modified since publication. A failure means do not install — either the file is tampered with, or it was published through a path that bypasses our release workflow.

The pypi-attestations CLI is explicitly an experimentation interface per its own documentation; PyPI considers the upload-time check the primary trust path and expects future tooling (including pip itself) to stabilise verification ergonomics.

Usage

import pendulum
from airflow import DAG
from airflow_pytest_operator import PytestOperator

with DAG(
    dag_id="run_tests",
    start_date=pendulum.datetime(2024, 1, 1),
    schedule=None,
) as dag:
    smoke = PytestOperator(
        task_id="smoke_tests",
        test_path="/opt/airflow/tests",      # next to your dags/ folder
        pytest_args=["-k", "smoke", "-x"],   # any pytest CLI args
        env={"ENV": "staging"},              # extra env for the run
        fail_on_test_failure=True,           # task fails if any test fails
    )

The summary pushed to XCom (standard return_value key) looks like:

{
    "total": 12, "passed": 11, "failed": 1, "skipped": 0, "errors": 0,
    "duration": 3.4, "exit_code": 1, "success": False,
    "failed_node_ids": ["tests/test_api.py::test_timeout"],
}

Constructor options

PytestOperator accepts the parameters below plus every parameter that BaseOperator acceptstask_id, retries, execution_timeout, on_failure_callback, trigger_rule, pool, and so on. Airflow 3 users: BaseOperator moved to airflow.sdk; the canonical reference is the Task SDK API docs.

The parameters specific to PytestOperator are:

Option Default Description
test_path File or directory passed to pytest. Templated.
pytest_args [] Extra pytest CLI args, e.g. ["-k", "smoke", "-x"]. Templated.
env {} Extra environment variables for the run. Templated.
fail_on_test_failure True Fail the task on any test failure/error. If False, the task always succeeds and the outcome is only reflected in XCom.
do_xcom_push True Airflow's standard flag. When on, the summary dict is pushed to XCom under the return_value key. Set False to disable all XCom output. Read it downstream with xcom_pull(task_ids="<task>").
runner SubprocessPytestRunner() Injectable execution strategy (see Extending).
parser JUnitResultParser() Injectable report parser (see Extending).

The default SubprocessPytestRunner additionally accepts python_executable, timeout, report_dir, cwd, grace_period, and cleanup — see below.

pytest config, plugins, and Allure

The operator runs real python -m pytest, so pytest discovers its own configuration (pytest.ini, pyproject.toml, tox.ini, setup.cfg) and rootdir exactly as on the command line. Plugins and their options are picked up from your test folder's config automatically — Allure, pytest-xdist, pytest-cov, markers, addopts, and so on. The operator only adds --junitxml (for its own parsing); everything else is yours.

To make relative paths in addopts (e.g. --alluredir=allure-results) resolve next to your tests rather than the worker's working directory, the runner sets its working directory to the test folder by default: a directory test_path becomes the cwd, a file's parent becomes the cwd. Pass an explicit cwd= to override. The JUnit report path stays absolute, so this never misplaces it.

# pytest.ini next to your tests, with allure-pytest installed on the worker:
#   [pytest]
#   addopts = --alluredir=allure-results
# -> results land in <tests>/allure-results, as expected.

On distributed executors, make sure the plugins you reference (e.g. allure-pytest) are installed in the worker/pod environment, and write Allure output to persistent storage (volume/S3) rather than an ephemeral pod filesystem.

Report cleanup

When report_dir is not given, the runner creates a temporary directory per run for the JUnit report. It is cleaned up according to the cleanup policy on SubprocessPytestRunner:

cleanup Behaviour
"always" (default) Remove the temp dir after every run, including on test failure and on task kill/timeout.
"on_success" Keep the temp dir when the run failed (for post-mortem); remove it on success.
"never" Never remove it (e.g. you upload it as a CI artifact).

A user-supplied report_dir is never removed — it's your data. Cleanup also runs from on_kill, so killed tasks don't leak temp directories.

Cancellation and timeouts

When Airflow kills the task (execution timeout, manual clear/mark-failed, worker shutdown), the operator's on_kill delegates to the runner, which terminates the entire pytest process tree — not just the direct child. This matters because pytest spawns its own children (e.g. xdist workers). Termination is graceful by default: SIGTERM, wait grace_period seconds (default 10), then SIGKILL. Set timeout= on the runner to bound the run itself.

Platform note: process-group termination is fully supported on Linux and macOS. On Windows the package runs and cancels the direct process, but reliable whole-tree termination is not guaranteed; Airflow workers are Linux in virtually all deployments.

Where do the tests live?

The operator runs whatever path exists on the worker at execute time, so it works with any executor (Local, Celery, Kubernetes, custom) — the runner spawns pytest wherever the task already runs. The practical constraint is availability: with LocalExecutor the tests sit next to dags/; with Celery/Kubernetes, make sure the test folder is synced to workers the same way DAGs are (git-sync, baked image, shared volume), or point test_path at wherever they land. If the path is missing, the task fails with a clear TestExecutionError.

Extending it

The operator depends on two narrow abstractions and accepts them via constructor injection — no operator subclassing required. Provide your own to change how tests run or how results are parsed.

Custom runner

from airflow_pytest_operator import PytestOperator, PytestRunner, RunArtifacts

class DockerPytestRunner(PytestRunner):
    def run(self, test_path, *, pytest_args=None, env=None) -> RunArtifacts:
        # run pytest inside a container, write a JUnit xml, then:
        return RunArtifacts(exit_code=..., junit_xml_path="/path/junit.xml")

    # optional: override cancel() / cleanup() if you own resources
    # (the base class provides safe no-op defaults)

PytestOperator(task_id="t", test_path="tests/", runner=DockerPytestRunner())

Custom parser

from airflow_pytest_operator import PytestOperator, ResultParser, TestRunResult

class JSONResultParser(ResultParser):
    def parse(self, report_path, *, exit_code=0) -> TestRunResult:
        ...  # read pytest-json-report output, return a TestRunResult

PytestOperator(task_id="t", test_path="tests/", parser=JSONResultParser())

Architecture

Concern Type Responsibility
PytestOperator operator orchestrate runner→parser, Airflow integration, fail/cleanup policy
PytestRunner / SubprocessPytestRunner runner execute pytest, produce RunArtifacts, own cancel/cleanup
ResultParser / JUnitResultParser parser turn a report file into TestRunResult
compat.airflow shim the only place that imports Airflow
models domain framework-free dataclasses

Development

The library's own tests run without Airflow by stubbing BaseOperator — itself a demonstration of the dependency-inversion design.

pip install -e ".[dev]"
ruff check src tests
mypy
pytest --cov

License

Apache-2.0. See LICENSE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

airflow_pytest_operator-0.3.0.tar.gz (48.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

airflow_pytest_operator-0.3.0-py3-none-any.whl (37.0 kB view details)

Uploaded Python 3

File details

Details for the file airflow_pytest_operator-0.3.0.tar.gz.

File metadata

  • Download URL: airflow_pytest_operator-0.3.0.tar.gz
  • Upload date:
  • Size: 48.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.13

File hashes

Hashes for airflow_pytest_operator-0.3.0.tar.gz
Algorithm Hash digest
SHA256 724b25f5c173030b61140e8592f3f42ffcfd097292007847bb0cbf4e4d1ddf9e
MD5 f800bc604696026fa95099985336ce57
BLAKE2b-256 2329b6f28f769245068ca74bfe6c6a5a90718bf23df1607d70637761c687a0f7

See more details on using hashes here.

Provenance

The following attestation bundles were made for airflow_pytest_operator-0.3.0.tar.gz:

Publisher: release.yml on IKrysanov/airflow-pytest-operator

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file airflow_pytest_operator-0.3.0-py3-none-any.whl.

File metadata

File hashes

Hashes for airflow_pytest_operator-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 26066acead297ea594b9ad0591c785f466bc263ca217f1f586e661fb226f8e37
MD5 5d2fa967fb5d5c76fd0f487bd9f8355e
BLAKE2b-256 26559336a16278dc7b739189edf241b100ab20302dc6d6942d71d971c0c3020a

See more details on using hashes here.

Provenance

The following attestation bundles were made for airflow_pytest_operator-0.3.0-py3-none-any.whl:

Publisher: release.yml on IKrysanov/airflow-pytest-operator

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page