Apache Airflow provider for Cian.ru Builder API — collect calls and chats statistics

airflow-provider-cian




Airflow provider for Cian.ru Builder API — collect calls and chats statistics.

Installation

pip install airflow-provider-cian

Requirements: Python 3.10+, Apache Airflow 2.9.1–2.x.

Connection Setup

Create an HTTP connection in Airflow (Admin → Connections):

| Field | Value |
|---|---|
| Connection Id | cian_default (or any name) |
| Connection Type | HTTP |
| Host | https://public-api.cian.ru |
| Password | Bearer token from your Cian Builder cabinet |

The provider reads conn.host as the base URL and conn.password as the Bearer token.
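To make the mapping from connection fields to request settings concrete, here is a minimal sketch. It does not reproduce the provider's hook: ConnStub and build_request_settings are hypothetical names, and the standard Authorization: Bearer header format is an assumption about how the token is sent.

```python
from dataclasses import dataclass


@dataclass
class ConnStub:
    """Hypothetical stand-in for an Airflow Connection (only the fields used here)."""
    host: str
    password: str


def build_request_settings(conn: ConnStub) -> tuple[str, dict[str, str]]:
    """Return (base_url, headers) derived from the connection fields."""
    # Strip a trailing slash so joining "/path" later does not produce "//".
    base_url = conn.host.rstrip("/")
    # Assumption: the token is sent as a standard Bearer Authorization header.
    headers = {"Authorization": f"Bearer {conn.password}"}
    return base_url, headers


conn = ConnStub(host="https://public-api.cian.ru/", password="example-token")
base_url, headers = build_request_settings(conn)
print(base_url)
print(headers["Authorization"])
```

Keeping the token in the Password field means Airflow masks it in the UI and logs like any other connection secret.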

Operator Parameters

CianBuilderReportsOperator:

| Parameter | Type | Default | Description |
|---|---|---|---|
| cian_conn_id | str | cian_default | Airflow connection ID |
| date | str | required | Collection date, YYYY-MM-DD. Supports {{ ds }} template |
| base_dir | str | /tmp/cian | Base directory for output files |
| output_format | str | json | json (JSONL) or csv |

The operator returns the output file path via return_value XCom.

Output file path: {base_dir}/{safe_run_id}/{date}.{ext}
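The path template can be illustrated with a short sketch. How the provider derives safe_run_id is not stated here, so make_safe_run_id below is a hypothetical rule (replace every character outside letters, digits, dot, underscore, and hyphen with an underscore). The file extension is passed in explicitly because the extension used for each output format is also an assumption.

```python
import re
from pathlib import PurePosixPath


def make_safe_run_id(run_id: str) -> str:
    """Hypothetical sanitizer: keep [A-Za-z0-9._-], replace everything else with '_'."""
    return re.sub(r"[^A-Za-z0-9._-]", "_", run_id)


def build_output_path(base_dir: str, run_id: str, date: str, ext: str) -> str:
    """Fill in the template {base_dir}/{safe_run_id}/{date}.{ext}."""
    return str(PurePosixPath(base_dir) / make_safe_run_id(run_id) / f"{date}.{ext}")


path = build_output_path(
    base_dir="/tmp/cian",
    run_id="manual__2024-05-01T00:00:00+00:00",
    date="2024-05-01",
    ext="json",  # assumption: extension chosen to match output_format
)
print(path)
```

Because the run ID is part of the path, files from different DAG runs for the same date do not overwrite each other.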

Output Schema (18 fields)

id, newbuilding_id, newbuilding_name, date, datetime, action_type, searcher_phone, searcher_ct_phone, builder_user_ct_phone, builder_user_phone, builder_sip_uri, call_duration, tariff_price, auction_bet, cashback_spent, billing_price, has_claim, is_targeted

  • date: date in Moscow time (YYYY-MM-DD), useful for partitioning
  • datetime: original API datetime with explicit Moscow offset (YYYY-MM-DDTHH:MM:SS+03:00)
  • is_targeted: computed as billing_price > 0
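The three derived fields above could be computed roughly as follows. This is a sketch under stated assumptions, not the provider's code: derive_fields is a hypothetical helper, the API timestamp is assumed to be an ISO 8601 string that carries a UTC offset, and Moscow time is modelled as a fixed UTC+03:00 offset.

```python
from datetime import datetime, timedelta, timezone

# Assumption: Moscow time as a fixed UTC+03:00 offset (no daylight saving).
MOSCOW = timezone(timedelta(hours=3))


def derive_fields(api_datetime: str, billing_price: float) -> dict:
    """Compute date, datetime, and is_targeted for one record."""
    # Assumption: the input string includes an explicit UTC offset.
    moment = datetime.fromisoformat(api_datetime).astimezone(MOSCOW)
    return {
        "date": moment.date().isoformat(),                  # YYYY-MM-DD in Moscow time
        "datetime": moment.isoformat(timespec="seconds"),   # ends with +03:00
        "is_targeted": billing_price > 0,
    }


row = derive_fields("2024-05-01T22:30:00+00:00", billing_price=150.0)
print(row)
```

Note that a call late in the evening UTC lands on the next calendar day in Moscow time, which is why date is derived after the timezone conversion rather than sliced from the raw string.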

Example DAG

import os
from datetime import date, timedelta

from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow_provider_cian.operators.builder_reports import CianBuilderReportsOperator


@dag(schedule=None, catchup=False, max_active_tasks=3)
def cian_reports():
    @task
    def get_dates():
        # The last 7 days, ending yesterday, as YYYY-MM-DD strings
        yesterday = date.today() - timedelta(days=1)
        return [(yesterday - timedelta(days=i)).isoformat() for i in range(7)]

    dates = get_dates()

    # One mapped task instance per date; each returns its output file path via XCom
    collect = CianBuilderReportsOperator.partial(
        task_id="collect",
        cian_conn_id="cian_default",
        base_dir="/tmp/cian",
        output_format="json",
    ).expand(date=dates)

    # Add upload here, e.g. LocalFilesystemToS3Operator.partial(...).expand(filename=collect)

    def cleanup(ti, **ctx):
        # Delete the local files produced by all mapped "collect" task instances
        for path in (ti.xcom_pull(task_ids="collect") or []):
            if path and os.path.exists(path):
                os.remove(path)

    # trigger_rule="all_done": clean up even if some collect instances failed
    collect >> PythonOperator(task_id="cleanup", python_callable=cleanup, trigger_rule="all_done")


cian_reports()

Rate Limiting

The API allows at most 10 requests per second per token (that is, per Cian account). The hook sleeps for 100 ms before each request, and max_active_tasks=3 at the DAG level provides an additional safety margin.
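The sleep-before-request approach can be sketched in a few lines. Throttle is a hypothetical class for illustration, not the hook's implementation; the sleep function is injectable so the behaviour can be checked without waiting.

```python
import time


class Throttle:
    """Hypothetical helper: pause for a fixed delay before every call."""

    def __init__(self, delay_seconds: float = 0.1, sleep=time.sleep):
        self.delay_seconds = delay_seconds
        self._sleep = sleep

    def call(self, func, *args, **kwargs):
        # Sleeping first caps a single caller at 1 / delay_seconds calls per second.
        self._sleep(self.delay_seconds)
        return func(*args, **kwargs)


throttle = Throttle(delay_seconds=0.1)
results = [throttle.call(lambda n=n: n * 2) for n in range(3)]
print(results)
```

A fixed delay bounds only one task's request rate; concurrent task instances each sleep independently, which is where the DAG-level concurrency cap comes in.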

If multiple clients share the same IP and you still get 429 errors, create an Airflow Pool:

airflow pools set cian_api 5 "Cian API rate limit pool"

Then pass pool="cian_api" to CianBuilderReportsOperator.partial(...).

Retry Behaviour

On HTTP 429 or 5xx responses, the hook retries with exponential backoff (1 s, 2 s, 4 s; 3 attempts total) and then raises AirflowException.
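One possible reading of this retry policy is sketched below: a delay follows each failed attempt, doubling every time, and an error is raised once the attempts are used up. The names request_with_backoff and RetriesExhausted are hypothetical (the latter stands in for AirflowException so the sketch runs without Airflow), and send is assumed to return a (status, body) pair.

```python
import time


class RetriesExhausted(RuntimeError):
    """Stand-in for AirflowException in this sketch."""


def is_retryable(status: int) -> bool:
    """Retry on rate limiting (429) and on server errors (5xx)."""
    return status == 429 or 500 <= status <= 599


def request_with_backoff(send, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call send() until it returns a non-retryable status or attempts run out."""
    last_status = None
    for attempt in range(max_attempts):
        status, body = send()
        if not is_retryable(status):
            return status, body
        last_status = status
        sleep(base_delay * 2 ** attempt)  # 1 s, 2 s, 4 s with the defaults
    raise RetriesExhausted(f"gave up after {max_attempts} attempts (last status {last_status})")


# Simulated responses: two retryable failures, then success. Delays are recorded, not slept.
delays = []
responses = iter([(429, ""), (503, ""), (200, "ok")])
result = request_with_backoff(lambda: next(responses), sleep=delays.append)
print(result, delays)
```

Client errors other than 429 (for example 401 or 404) are returned immediately in this sketch, since retrying them would not change the outcome.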

License

MIT
