
Apache Airflow provider for Avito CPA — collect call statistics


airflow-provider-avito

Apache Airflow provider for Avito CPA — collect call statistics from the Avito advertising platform.


Powered by Claude Code


Installation

pip install airflow-provider-avito

Requires Python 3.10+ and apache-airflow>=2.9.1.

Connection

Create an Airflow connection of type HTTP with conn_id avito_default (or any other ID you pass to the operator as avito_conn_id).

Only the Extra field is read; the Login and Password fields are ignored.

Single account

{
  "client_id": "your_client_id",
  "client_secret": "your_client_secret"
}
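If you provision connections through environment variables instead of the UI, Airflow 2.3 and later also accepts a JSON document as the value of an AIRFLOW_CONN_<CONN_ID> variable. The sketch below builds such a value for the single-account Extra; build_conn_env is a hypothetical helper, not part of this provider, and the credentials are placeholders.

```python
import json


def build_conn_env(conn_id: str, extra: dict) -> tuple[str, str]:
    """Return (variable name, value) for an HTTP connection defined via env var.

    Hypothetical helper: Airflow looks up AIRFLOW_CONN_<CONN_ID> (upper-cased)
    and, since 2.3, accepts a JSON-serialized connection as the value.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps({"conn_type": "http", "extra": extra})
    return name, value


name, value = build_conn_env(
    "avito_default",
    {"client_id": "your_client_id", "client_secret": "your_client_secret"},
)
# Print a shell line to add to the scheduler and worker environment.
print(f"export {name}='{value}'")
```

The single-quoted shell form assumes the secrets contain no single quotes; the same helper works for the multi-account Extra below.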

Multiple accounts

{
  "accounts": [
    {"id": "main",    "client_id": "id1", "client_secret": "secret1"},
    {"id": "agency",  "client_id": "id2", "client_secret": "secret2"}
  ]
}

Set the operator's account_id parameter to the id of the account to use (for example, account_id="agency").
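To make the two Extra shapes concrete, here is a hypothetical sketch of how a (client_id, client_secret) pair could be resolved from them. The function name, and the choice to raise when account_id is missing or unknown, are illustrative assumptions; the provider's own lookup may behave differently.

```python
from typing import Optional


def resolve_credentials(extra: dict, account_id: Optional[str] = None) -> tuple[str, str]:
    """Hypothetical: pick (client_id, client_secret) from a connection's Extra."""
    accounts = extra.get("accounts")
    if accounts is None:
        # Single-account shape: credentials live at the top level.
        return extra["client_id"], extra["client_secret"]
    if account_id is None:
        # Assumption: with an "accounts" list, the caller must name one.
        raise ValueError("account_id is required when Extra lists accounts")
    for account in accounts:
        if account["id"] == account_id:
            return account["client_id"], account["client_secret"]
    known = ", ".join(a["id"] for a in accounts)
    raise ValueError(f"unknown account_id {account_id!r}; expected one of: {known}")


multi = {
    "accounts": [
        {"id": "main", "client_id": "id1", "client_secret": "secret1"},
        {"id": "agency", "client_id": "id2", "client_secret": "secret2"},
    ]
}
print(resolve_credentials(multi, "agency"))  # ('id2', 'secret2')
```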

Quick start

import pendulum
from airflow.decorators import dag
from airflow.models.param import Param
from airflow_provider_avito.operators.calls import AvitoCallsOperator


@dag(
    schedule=None,  # runs are created only when triggered manually
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    params={
        # format="date" shows a date picker in the trigger form
        "date_from": Param("2026-06-01", type="string", format="date"),
        "date_to": Param("2026-06-07", type="string", format="date"),
    },
)
def avito_calls_example():
    AvitoCallsOperator(
        task_id="collect_calls",
        avito_conn_id="avito_default",
        date_from="{{ params.date_from }}",
        date_to="{{ params.date_to }}",
        base_dir="/tmp/avito",
        output_format="json",   # or "csv"
        add_snapshot_ts=True,   # optional, see "Snapshot versioning" below
    )


avito_calls_example()

For each date in the range, the operator writes one file (JSON Lines, or CSV when output_format="csv") to {base_dir}/{safe_run_id}/{date}.json. It returns a list[dict] with one {"date": ..., "path": ..., "snapshot_ts": ...} entry per written file; snapshot_ts is None unless add_snapshot_ts=True.
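A downstream task usually receives this list through XCom. The sketch below assumes output_format="json" (one JSON object per line) and streams every record from the listed files; iter_records is a hypothetical helper, not part of the provider.

```python
import json
from pathlib import Path
from typing import Iterator


def iter_records(results: list[dict]) -> Iterator[dict]:
    """Yield call records from every file in the operator's return value.

    Assumes JSON Lines files (output_format="json"): one object per line.
    """
    for entry in results:
        with Path(entry["path"]).open(encoding="utf-8") as fh:
            for line in fh:
                if line.strip():  # skip blank lines, e.g. a trailing newline
                    yield json.loads(line)
```

Streaming keeps memory flat for long date ranges; collect into a list only if the downstream step needs random access.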

Snapshot versioning (add_snapshot_ts)

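By default, nothing in the output identifies the run that produced it. Clearing and re-running a DAG run rewrites the same per-date files, and a downstream upload keyed by date alone (for example, one S3 object per date) overwrites the previous run's output, so any history of call-status changes is lost.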

Set add_snapshot_ts=True to inject snapshot_ts into every JSON record and into the snapshot_ts key of the operator's return value. Its value is the DAG run's start_date (the actual wall-clock start time of the run, in UTC), formatted as YYYY-MM-DDTHH:MM:SS. A downstream task can then build a unique, non-overwriting path per run (e.g. an S3 key suffixed with the snapshot timestamp), and ClickHouse or Spark queries can pick the latest snapshot or trace status history over time:

-- ClickHouse: latest snapshot only
SELECT * FROM s3('s3://bucket/prefix/**/*.json', 'JSONEachRow')
WHERE toDateTime(snapshot_ts) = (
    SELECT MAX(toDateTime(snapshot_ts)) FROM s3('s3://bucket/prefix/**/*.json', 'JSONEachRow')
)
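Because snapshot_ts is fixed-width ISO 8601, plain string comparison orders snapshots chronologically, so the same logic works without ClickHouse. The sketch below assumes records were loaded with snapshot_ts present (add_snapshot_ts=True) and shows three hypothetical helpers: a per-run object key, the latest-snapshot filter the query above performs, and a per-call status history that keeps only changes. The key layout and the removal of colons (which some S3 tools handle awkwardly) are illustrative choices.

```python
from collections import defaultdict


def snapshot_key(prefix: str, date: str, snapshot_ts: str) -> str:
    """Hypothetical object key that is unique per (date, run)."""
    # "2026-06-08T10:15:00" -> "2026-06-08T101500"; colons dropped for portability.
    return f"{prefix}/{date}/{snapshot_ts.replace(':', '')}.json"


def latest_snapshot(records: list[dict]) -> list[dict]:
    """Keep only records from the most recent snapshot, like the query above."""
    if not records:
        return []
    latest = max(r["snapshot_ts"] for r in records)
    return [r for r in records if r["snapshot_ts"] == latest]


def status_history(records: list[dict]) -> dict:
    """Map call id -> [(snapshot_ts, status_id), ...], recording only changes."""
    history = defaultdict(list)
    for rec in sorted(records, key=lambda r: r["snapshot_ts"]):
        changes = history[rec["id"]]
        if not changes or changes[-1][1] != rec["status_id"]:
            changes.append((rec["snapshot_ts"], rec["status_id"]))
    return dict(history)


rows = [
    {"id": 7, "status_id": 1, "snapshot_ts": "2026-06-08T06:00:00"},
    {"id": 7, "status_id": 1, "snapshot_ts": "2026-06-09T06:00:00"},
    {"id": 7, "status_id": 2, "snapshot_ts": "2026-06-10T06:00:00"},
]
print(status_history(rows))
# {7: [('2026-06-08T06:00:00', 1), ('2026-06-10T06:00:00', 2)]}
```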

add_snapshot_ts only applies to output_format="json"; it is ignored when output_format="csv" (the CSV column schema is fixed).

Output record schema

Each record contains 17 fields. The canonical ordered list of field names is also available as CALL_FIELDS (a tuple[str, ...] exported from airflow_provider_avito.hooks.avito) for use in downstream schema definitions or validation.

Field                   Type   Description
id                      int    Call ID
buyer_phone             str    Buyer phone number
seller_phone            str    Seller phone number
virtual_phone           str    Virtual (masked) phone number
create_time             str    Creation time (RFC 3339)
start_time              str    Call start time (RFC 3339)
date                    str    Date (YYYY-MM-DD) derived from start_time
duration                int    Call duration, seconds
waiting_duration        float  Wait time before answer, seconds
price                   int    Price in kopecks
price_rub               float  Price in rubles (price / 100)
status_id               int    Status code (see Call statuses below)
status                  str    Status label, e.g. "Целевой" (target)
item_id                 int    Ad ID
group_title             str    Campaign name
is_arbitrage_available  bool   Whether arbitrage is available
record_url              str    Call recording URL

When add_snapshot_ts=True and output_format="json", an 18th field is added to every record:

Field                   Type   Description
snapshot_ts             str    DAG run's start_date, ISO 8601 (YYYY-MM-DDTHH:MM:SS). Only present when add_snapshot_ts=True and output_format="json".
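For downstream validation, a record can be checked against the documented schema. The sketch below copies the field list from the table above into a local FIELDS tuple so it runs without the provider installed; in a real DAG you would import CALL_FIELDS instead. The check itself, including the price_rub consistency test, is an illustrative assumption rather than something the provider performs.

```python
import math

# Local copy of the documented field order; in a DAG, prefer:
# from airflow_provider_avito.hooks.avito import CALL_FIELDS
FIELDS = (
    "id", "buyer_phone", "seller_phone", "virtual_phone", "create_time",
    "start_time", "date", "duration", "waiting_duration", "price", "price_rub",
    "status_id", "status", "item_id", "group_title", "is_arbitrage_available",
    "record_url",
)


def validate_record(record: dict, with_snapshot: bool = False) -> list[str]:
    """Return a list of problems; an empty list means the record looks valid."""
    expected = set(FIELDS) | ({"snapshot_ts"} if with_snapshot else set())
    present = set(record)
    problems = []
    if expected - present:
        problems.append(f"missing fields: {sorted(expected - present)}")
    if present - expected:
        problems.append(f"unexpected fields: {sorted(present - expected)}")
    if "price" in record and "price_rub" in record:
        # price is in kopecks, price_rub in rubles (price / 100).
        if not math.isclose(record["price_rub"], record["price"] / 100):
            problems.append("price_rub does not equal price / 100")
    return problems
```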

Call statuses

status_id  status                     English
0          Целевой                    Target
1          На модерации               Under moderation
2          Целевой после модерации    Target after moderation
3          Нецелевой после модерации  Non-target after moderation
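A typical downstream aggregation counts calls per status. The sketch mirrors the status table in a STATUS_LABELS dict; treating statuses 0 and 2 as target calls is an assumption read off their labels, and summarize_statuses is a hypothetical helper.

```python
from collections import Counter

# Mirrors the "Call statuses" table.
STATUS_LABELS = {
    0: "Целевой",
    1: "На модерации",
    2: "Целевой после модерации",
    3: "Нецелевой после модерации",
}
# Assumption: statuses whose label marks the call as target ("Целевой").
TARGET_STATUS_IDS = frozenset({0, 2})


def summarize_statuses(records: list[dict]) -> dict:
    """Count calls per status label and how many of them are target calls."""
    counts = Counter(r["status_id"] for r in records)
    return {
        "by_status": {
            STATUS_LABELS.get(sid, f"unknown ({sid})"): n
            for sid, n in sorted(counts.items())
        },
        "target": sum(counts[sid] for sid in TARGET_STATUS_IDS),
        "total": sum(counts.values()),
    }


sample = [{"status_id": 0}, {"status_id": 1}, {"status_id": 2}, {"status_id": 0}]
print(summarize_statuses(sample))
```

Unknown codes are reported rather than dropped, so a status added on the Avito side shows up in the summary instead of silently disappearing.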

Examples

Full production examples with BigQuery and S3 upload are in the examples/ directory of the project repository.

License

MIT



Download files


Source Distribution

airflow_provider_avito-0.2.1.tar.gz (29.6 kB view details)

Uploaded Source

Built Distribution


airflow_provider_avito-0.2.1-py3-none-any.whl (13.6 kB view details)

Uploaded Python 3

File details

Details for the file airflow_provider_avito-0.2.1.tar.gz.

File metadata

  • Download URL: airflow_provider_avito-0.2.1.tar.gz
  • Size: 29.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for airflow_provider_avito-0.2.1.tar.gz
Algorithm Hash digest
SHA256 18fb62845e75d476a6f9c04074646cbed0fd6b8d3c1161e1697773b8c89a39bb
MD5 ae48dbee29f9d7707cc767e8782e1ba8
BLAKE2b-256 bc1ce509901651dc4cda9a658f0498f8199d7f9da1c760d9a3de077461f7f12f


Provenance

The following attestation bundles were made for airflow_provider_avito-0.2.1.tar.gz:

Publisher: publish.yml on mkozhin/airflow-provider-avito

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file airflow_provider_avito-0.2.1-py3-none-any.whl.


File hashes

Hashes for airflow_provider_avito-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 aafdaa6fa1d2460c69f381352760cab9242c09f051e9eabfcfac9fb9ca109852
MD5 a5bfc24a5487b41acdf5728a77a87b4f
BLAKE2b-256 807d6c8ee5dccb227c62d3eb8d807974c0ea38691182dc7c1a69f61d64546654


Provenance

The following attestation bundles were made for airflow_provider_avito-0.2.1-py3-none-any.whl:

Publisher: publish.yml on mkozhin/airflow-provider-avito

