
Apache Airflow provider for Avito CPA — collect call statistics


airflow-provider-avito

Apache Airflow provider for Avito CPA — collect call statistics from the Avito advertising platform.


Powered by Claude Code


Installation

pip install airflow-provider-avito

Requires Python 3.10+ and apache-airflow>=2.9.1.

Connection

Create an Airflow connection of type HTTP with conn_id avito_default, or any other ID that you then pass to the operator as avito_conn_id.

Only the Extra field is read; the Login and Password fields are ignored.

Single account

{
  "client_id": "your_client_id",
  "client_secret": "your_client_secret"
}
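If you prefer not to create the connection in the UI, one option is Airflow's JSON connection format in an AIRFLOW_CONN_<CONN_ID> environment variable. The following is a minimal sketch under that assumption; the helper name build_conn_env is hypothetical and not part of this provider.

```python
import json


def build_conn_env(conn_id: str, client_id: str, client_secret: str) -> tuple:
    """Return (env var name, JSON value) for an HTTP connection.

    Hypothetical helper: the credentials go into "extra", the only
    field this provider is documented to read.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(
        {
            "conn_type": "http",
            "extra": {"client_id": client_id, "client_secret": client_secret},
        }
    )
    return name, value


name, value = build_conn_env("avito_default", "your_client_id", "your_client_secret")
# Print a shell line you could paste into your environment setup.
print(f"export {name}='{value}'")
```

In a real deployment the secret would normally come from a secrets backend rather than a literal string.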

Multiple accounts

{
  "accounts": [
    {"id": "main",    "client_id": "id1", "client_secret": "secret1"},
    {"id": "agency",  "client_id": "id2", "client_secret": "secret2"}
  ]
}

Use the account_id parameter on the operator to select which account to use.
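Before saving the connection, it can help to check that the Extra JSON actually contains the account you intend to select. The sketch below handles both Extra shapes shown above. The function name select_account and its error handling are assumptions made for illustration; they do not describe how the operator resolves account_id internally.

```python
import json
from typing import Optional


def select_account(extra_json: str, account_id: Optional[str] = None) -> dict:
    """Return the credentials dict for one account from an Extra document.

    Hypothetical sanity-check helper, not the provider's own logic.
    """
    extra = json.loads(extra_json)
    accounts = extra.get("accounts")
    if accounts is None:
        # Single-account shape: credentials sit at the top level.
        return {
            "client_id": extra["client_id"],
            "client_secret": extra["client_secret"],
        }
    # Multi-account shape: look the account up by its "id".
    for account in accounts:
        if account["id"] == account_id:
            return {
                "client_id": account["client_id"],
                "client_secret": account["client_secret"],
            }
    raise KeyError(f"no account with id {account_id!r} in Extra")


extra = '{"accounts": [{"id": "main", "client_id": "id1", "client_secret": "secret1"}]}'
print(select_account(extra, "main")["client_id"])
```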

Quick start

from airflow.decorators import dag
from airflow.models.param import Param
from airflow_provider_avito.operators.calls import AvitoCallsOperator

@dag(schedule=None, params={"date_from": Param("2026-06-01"), "date_to": Param("2026-06-07")})
def avito_calls_example():
    AvitoCallsOperator(
        task_id="collect_calls",
        avito_conn_id="avito_default",
        date_from="{{ params.date_from }}",
        date_to="{{ params.date_to }}",
        base_dir="/tmp/avito",
        output_format="json",   # or "csv"
        add_snapshot_ts=True,   # optional, see "Snapshot versioning" below
    )

avito_calls_example()

The operator writes one file per date, in JSON Lines format for output_format="json" or CSV for output_format="csv". For JSON output the path is {base_dir}/{safe_run_id}/{date}.json. The operator returns a list[dict] with {"date": ..., "path": ..., "snapshot_ts": ...} entries (snapshot_ts is None unless add_snapshot_ts=True).
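A downstream task can consume the returned list directly. The following is a minimal sketch, assuming output_format="json" and that each listed path is readable from the worker running the downstream task; load_calls is a hypothetical helper name, not part of the provider.

```python
import json
from pathlib import Path


def load_calls(entries: list) -> list:
    """Read every JSON Lines file listed in the operator's return value."""
    records = []
    for entry in entries:
        with Path(entry["path"]).open(encoding="utf-8") as fh:
            for line in fh:
                # Skip blank lines; every other line is one JSON record.
                if line.strip():
                    records.append(json.loads(line))
    return records
```

With the TaskFlow API, the operator's output (its XCom return value) could be passed to a @task function that calls this helper.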

Snapshot versioning (add_snapshot_ts)

By default, each DAG run writes to the same per-date path, so re-running the DAG overwrites previous output and any history of call-status changes is lost.

Set add_snapshot_ts=True to inject snapshot_ts, the DAG run's logical_date formatted as YYYY-MM-DDTHH:MM:SS, into every JSON record and into the snapshot_ts key of the operator's return value. A downstream task can then build a unique, non-overwriting path per run (for example, an S3 key suffixed with the snapshot timestamp), and ClickHouse or Spark queries can select the latest snapshot or trace status history over time:

-- ClickHouse: latest snapshot only
SELECT * FROM s3('s3://bucket/prefix/**/*.json', 'JSONEachRow')
WHERE toDateTime(snapshot_ts) = (
    SELECT MAX(toDateTime(snapshot_ts)) FROM s3('s3://bucket/prefix/**/*.json', 'JSONEachRow')
)

add_snapshot_ts only applies to output_format="json"; it is ignored when output_format="csv" (the CSV column schema is fixed).
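The non-overwriting path mentioned above can be built from a single entry of the operator's return value. This is a sketch only: the key layout ({prefix}/{date}/{timestamp}.json) and the function name snapshot_key are assumptions chosen for illustration, not a layout the provider prescribes.

```python
def snapshot_key(prefix: str, entry: dict) -> str:
    """Build an object key that is unique per (date, snapshot) pair."""
    date = entry["date"]
    ts = entry.get("snapshot_ts")
    if ts is None:
        # No snapshot timestamp: fall back to one key per date,
        # which a later run would overwrite.
        return f"{prefix}/{date}.json"
    # Drop colons so the timestamp is safe in paths and URLs.
    safe_ts = ts.replace(":", "")
    return f"{prefix}/{date}/{safe_ts}.json"


entry = {
    "date": "2026-06-01",
    "path": "/tmp/avito/run/2026-06-01.json",
    "snapshot_ts": "2026-06-08T03:00:00",
}
print(snapshot_key("calls", entry))  # calls/2026-06-01/2026-06-08T030000.json
```

Because the snapshot timestamp is also stored inside each record, queries do not need to parse it back out of the key.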

Output record schema

Each record contains 17 fields:

| Field | Type | Description |
| --- | --- | --- |
| id | int | Call ID |
| buyer_phone | str | Buyer phone |
| seller_phone | str | Seller phone |
| virtual_phone | str | Virtual (masked) phone |
| create_time | str | Creation time (RFC3339) |
| start_time | str | Call start time (RFC3339) |
| date | str | Date (YYYY-MM-DD) derived from start_time |
| duration | int | Call duration, seconds |
| waiting_duration | float | Wait time before answer, seconds |
| price | int | Price in kopecks |
| price_rub | float | Price in rubles (price / 100) |
| status_id | int | Status code |
| status | str | Status label (e.g. "Целевой") |
| item_id | int | Ad ID |
| group_title | str | Campaign name |
| is_arbitrage_available | bool | Whether arbitrage is available |
| record_url | str | Call recording URL |

When add_snapshot_ts=True and output_format="json", an 18th field is added to every record:

| Field | Type | Description |
| --- | --- | --- |
| snapshot_ts | str | DAG run's logical_date, ISO 8601 (YYYY-MM-DDTHH:MM:SS). Only present when add_snapshot_ts=True and output_format="json". |
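Two of the fields in the schema are derived from others: price_rub from price, and date from start_time. The sketch below illustrates those relationships. It assumes that date is simply the calendar-date portion of the RFC3339 string as written, with no timezone conversion; the provider may derive it differently, and derive_fields is a hypothetical name.

```python
def derive_fields(price: int, start_time: str) -> dict:
    """Compute the derived schema fields from their source fields."""
    return {
        # Kopecks to rubles, as described in the schema (price / 100).
        "price_rub": price / 100,
        # Assumption: take the YYYY-MM-DD prefix of the RFC3339 timestamp.
        "date": start_time[:10],
    }


print(derive_fields(12345, "2026-06-01T10:15:30+03:00"))
```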

Examples

Full production examples with BigQuery and S3 upload are in the examples/ directory.

License

MIT
