airflow-provider-cian


An Apache Airflow provider for the Cian.ru Builder API that collects call and chat statistics.

Installation

pip install airflow-provider-cian

Requirements: Python 3.10+, Apache Airflow 2.9.1–2.x.

Connection Setup

Create an HTTP connection in Airflow (Admin → Connections):

| Field | Value |
| --- | --- |
| Connection Id | cian_default (or any name) |
| Connection Type | HTTP |
| Host | https://public-api.cian.ru |
| Password | Bearer token from your Cian Builder cabinet |

The provider reads conn.host as the base URL and conn.password as the Bearer token.
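
To illustrate how the two connection fields are used, here is a minimal sketch in plain Python. It is not the provider's code: build_request is a hypothetical helper, "/v1/example" is a placeholder path rather than a documented Cian endpoint, and the "Authorization: Bearer <token>" header is an assumption based on the standard Bearer scheme.

```python
from urllib.parse import urljoin


def build_request(host: str, password: str, path: str) -> tuple[str, dict[str, str]]:
    """Return (url, headers) for one API call.

    host and password stand for conn.host and conn.password.
    The Bearer header format is an assumption about what the hook sends.
    """
    base = host.rstrip("/") + "/"
    url = urljoin(base, path.lstrip("/"))
    headers = {"Authorization": f"Bearer {password}"}
    return url, headers


# "/v1/example" is a placeholder path, not a documented Cian endpoint.
url, headers = build_request("https://public-api.cian.ru", "my-token", "/v1/example")
```

Storing the token in the Password field keeps it masked in the Airflow UI and out of DAG code.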

Operator Parameters

CianBuilderReportsOperator:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| cian_conn_id | str | cian_default | Airflow connection ID |
| date | str | required | Collection date, YYYY-MM-DD. Supports the {{ ds }} template |
| base_dir | str | /tmp/cian | Base directory for output files |
| output_format | str | json | json (JSONL) or csv |
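
The documented value shapes can be checked before a run. The sketch below is a hypothetical pre-flight helper (check_params is not part of the provider) that only encodes what the table states: date is YYYY-MM-DD and output_format is json or csv. Whether the operator itself validates these values is not stated here.

```python
import re
from datetime import date

ALLOWED_FORMATS = {"json", "csv"}


def check_params(date_str: str, output_format: str = "json") -> None:
    """Raise ValueError if the values do not match the documented shapes."""
    if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", date_str):
        raise ValueError(f"date must be YYYY-MM-DD, got {date_str!r}")
    date.fromisoformat(date_str)  # rejects impossible dates such as 2024-02-30
    if output_format not in ALLOWED_FORMATS:
        raise ValueError(f"output_format must be json or csv, got {output_format!r}")


check_params("2024-05-01", "csv")  # passes silently
```

An unrendered template string such as "{{ ds }}" fails the first check, which can help catch a date passed outside a templated field.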

The operator returns the output file path via return_value XCom.

Output file path: {base_dir}/{safe_run_id}/{date}.{ext}
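
A rough sketch of how such a path could be assembled follows. Two points are assumptions, not documented behaviour: the rule that turns run_id into safe_run_id (here, any character outside letters, digits, dot, dash and underscore becomes "_"), and the extension being the output_format value itself.

```python
import re
from pathlib import PurePosixPath


def output_path(base_dir: str, run_id: str, date: str, output_format: str) -> str:
    # Hypothetical sanitisation: keep letters, digits, dot, dash and underscore.
    safe_run_id = re.sub(r"[^A-Za-z0-9._-]", "_", run_id)
    # Assumption: the extension is the output_format value itself.
    ext = output_format
    return str(PurePosixPath(base_dir) / safe_run_id / f"{date}.{ext}")


path = output_path("/tmp/cian", "manual__2024-05-02T10:00:00+00:00", "2024-05-01", "json")
```

Because the run ID is part of the path, two DAG runs that collect the same date write to different directories.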

Output Schema (18 fields)

id, newbuilding_id, newbuilding_name, date, datetime, action_type, searcher_phone, searcher_ct_phone, builder_user_ct_phone, builder_user_phone, builder_sip_uri, call_duration, tariff_price, auction_bet, cashback_spent, billing_price, has_claim, is_targeted

  • date: collection date (YYYY-MM-DD); always equals the operator's date parameter, so it is safe for BigQuery date partitioning
  • datetime: original API datetime with an explicit Moscow offset (YYYY-MM-DDTHH:MM:SS+03:00)
  • is_targeted: computed field, true when billing_price > 0
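
The three derived fields can be sketched as follows. This is an illustration only: derive_fields is a hypothetical helper, and it assumes the API returns a naive ISO timestamp that is already in Moscow time.

```python
from datetime import datetime, timedelta, timezone

MSK = timezone(timedelta(hours=3))  # Moscow time, fixed UTC+03:00


def derive_fields(collection_date: str, api_datetime: str, billing_price: float) -> dict:
    # Assumption: the API returns a naive ISO timestamp in Moscow time.
    stamped = datetime.fromisoformat(api_datetime).replace(tzinfo=MSK)
    return {
        "date": collection_date,           # always the operator's date parameter
        "datetime": stamped.isoformat(),   # YYYY-MM-DDTHH:MM:SS+03:00
        "is_targeted": billing_price > 0,  # computed flag
    }


row = derive_fields("2024-05-01", "2024-05-01T14:30:00", 150.0)
```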

Example DAG

import os
from datetime import date, timedelta

from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow_provider_cian.operators.builder_reports import CianBuilderReportsOperator


@dag(schedule=None, catchup=False, max_active_tasks=3)
def cian_reports():
    @task
    def get_dates():
        # The 7 days ending yesterday, as YYYY-MM-DD strings
        yesterday = date.today() - timedelta(days=1)
        return [(yesterday - timedelta(days=i)).isoformat() for i in range(7)]

    dates = get_dates()

    # One mapped task instance per date
    collect = CianBuilderReportsOperator.partial(
        task_id="collect",
        cian_conn_id="cian_default",
        base_dir="/tmp/cian",
        output_format="json",
    ).expand(date=dates)

    # Add upload here, e.g. LocalFilesystemToS3Operator.partial(...).expand(filename=collect)

    def cleanup(ti, **ctx):
        # Delete every output file reported by the mapped "collect" tasks
        for path in (ti.xcom_pull(task_ids="collect") or []):
            if path and os.path.exists(path):
                os.remove(path)

    # trigger_rule="all_done": clean up even if some collect tasks failed
    collect >> PythonOperator(task_id="cleanup", python_callable=cleanup, trigger_rule="all_done")


cian_reports()

Rate Limiting

The API allows at most 10 requests per second per token (that is, per Cian account). The hook sleeps for 100 ms before each request, and max_active_tasks=3 at the DAG level provides an additional safety margin.
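
The fixed pre-request delay can be sketched in a few lines. FixedDelay is a hypothetical class written for illustration, not the hook's implementation; the sleep function is injectable so the behaviour can be checked without actually waiting.

```python
import time
from typing import Callable


class FixedDelay:
    """Sleep a fixed interval before every call (100 ms by default)."""

    def __init__(self, delay: float = 0.1, sleep: Callable[[float], None] = time.sleep):
        self.delay = delay
        self._sleep = sleep  # injectable so the delay can be tested without waiting

    def call(self, func, *args, **kwargs):
        self._sleep(self.delay)
        return func(*args, **kwargs)


throttle = FixedDelay()
result = throttle.call(lambda: "ok")  # waits about 0.1 s, then runs the callable
```

A single sequential caller throttled this way cannot exceed 10 calls per second; parallel task instances each apply their own delay.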

If multiple clients share the same IP and you still get 429 errors, create an Airflow Pool:

airflow pools set cian_api 5 "Cian API rate limit pool"

Then pass pool="cian_api" to CianBuilderReportsOperator.partial(...).

Error Handling

CianNotFoundError (a subclass of AirflowException) is raised when the API returns a "not found" response for a resource. get_newbuilding_name catches it internally and returns "Неизвестно" ("Unknown"), so DAG authors don't need to handle it for that method. For custom hook usage, import it from the hooks module:

from airflow_provider_cian.hooks import CianNotFoundError
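
A catch-and-fall-back pattern for custom calls might look like the sketch below. To keep it runnable without Airflow, it defines a local stand-in for CianNotFoundError; in a real DAG the class would come from the import above. name_or_fallback and fake_fetch are hypothetical names used only for this example.

```python
class CianNotFoundError(Exception):
    """Stand-in for airflow_provider_cian.hooks.CianNotFoundError."""


def name_or_fallback(fetch_name, newbuilding_id: int, fallback: str = "Неизвестно") -> str:
    # fetch_name is a hypothetical callable standing in for a custom hook call.
    try:
        return fetch_name(newbuilding_id)
    except CianNotFoundError:
        return fallback


def fake_fetch(newbuilding_id: int) -> str:
    # Simulated lookup: only ID 1 exists.
    if newbuilding_id != 1:
        raise CianNotFoundError(f"newbuilding {newbuilding_id} not found")
    return "Example Residence"


found = name_or_fallback(fake_fetch, 1)
missing = name_or_fallback(fake_fetch, 2)
```

Catching only CianNotFoundError lets other failures (for example exhausted retries) still fail the task.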

Retry Behaviour

On HTTP 429 or 5xx responses, requests are retried with exponential backoff (1 s, 2 s, 4 s; 3 attempts in total); after that an AirflowException is raised.
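
The schedule above can be sketched as a small loop. This is one possible reading, not the provider's code: the text does not say whether the 4 s delay precedes a further attempt, so this sketch simply sleeps after every retryable failure. send is a hypothetical callable returning an HTTP status code, and RuntimeError stands in for AirflowException so the example runs without Airflow.

```python
import time
from typing import Callable


def is_retryable(status: int) -> bool:
    return status == 429 or 500 <= status <= 599


def request_with_backoff(send: Callable[[], int], delays=(1, 2, 4), sleep=time.sleep) -> int:
    """Call send() once per delay; sleep after each retryable failure."""
    for delay in delays:
        status = send()
        if not is_retryable(status):
            return status
        sleep(delay)
    # The provider raises AirflowException here; RuntimeError is a stand-in.
    raise RuntimeError(f"gave up after {len(delays)} attempts")
```

Non-retryable statuses (for example 404) are returned immediately, so only 429 and 5xx responses consume the retry budget.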

License

MIT
