Apache Airflow provider for Cian.ru Builder API — collect calls and chats statistics

airflow-provider-cian

Installation

pip install airflow-provider-cian

Requirements: Python 3.10+, Apache Airflow 2.9.1–2.x.

Connection Setup

Create an HTTP connection in Airflow (Admin → Connections):

Field            Value
Connection Id    cian_default (or any name)
Connection Type  HTTP
Host             https://public-api.cian.ru
Password         Bearer token from your Cian Builder cabinet

The provider reads conn.host as the base URL and conn.password as the Bearer token.
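The same connection can also be supplied through an environment variable instead of the UI. The sketch below assumes Airflow's JSON connection format (available since Airflow 2.3) and the AIRFLOW_CONN_{CONN_ID} naming convention. The helper name connection_env_var is hypothetical and the token is a placeholder.

```python
import json


def connection_env_var(conn_id: str, host: str, token: str) -> tuple[str, str]:
    """Return the (name, value) pair for an Airflow connection env variable."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    # Only the fields the provider is documented to read: host and password.
    value = json.dumps({"conn_type": "http", "host": host, "password": token})
    return name, value


name, value = connection_env_var(
    "cian_default", "https://public-api.cian.ru", "YOUR_TOKEN"  # placeholder token
)
print(f"export {name}='{value}'")
```

The JSON form is used here rather than the URI form because the Host field already contains a scheme (https://).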

Operator Parameters

CianBuilderReportsOperator:

Parameter      Type  Default       Description
cian_conn_id   str   cian_default  Airflow connection ID
date           str   required      Collection date, YYYY-MM-DD. Supports {{ ds }} template
base_dir       str   /tmp/cian     Base directory for output files
output_format  str   json          json (JSONL) or csv

The operator returns the output file path via return_value XCom.

Output file path: {base_dir}/{safe_run_id}/{date}.{ext}
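A minimal sketch of how a path following this template could be composed. How the provider derives safe_run_id is not documented here, so the sanitising rule below (replace anything outside letters, digits, "_", "." and "-" with "_") is an assumption, as is the function name build_output_path. The file extension for each output_format is not stated either, so it is passed in as a parameter.

```python
import posixpath
import re


def build_output_path(base_dir: str, run_id: str, date: str, ext: str) -> str:
    """Compose {base_dir}/{safe_run_id}/{date}.{ext}."""
    # Assumption: "safe" means characters outside [A-Za-z0-9_.-] become "_".
    safe_run_id = re.sub(r"[^A-Za-z0-9_.-]", "_", run_id)
    return posixpath.join(base_dir, safe_run_id, f"{date}.{ext}")


path = build_output_path(
    "/tmp/cian", "manual__2024-01-01T00:00:00+00:00", "2024-01-01", "json"
)
print(path)  # /tmp/cian/manual__2024-01-01T00_00_00_00_00/2024-01-01.json
```

Keeping the run ID in the path means two runs for the same date write to different directories.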

Output Schema (16 fields)

id, newbuilding_id, newbuilding_name, date, action_type, searcher_phone, builder_user_ct_phone, builder_user_phone, builder_sip_uri, call_duration, tariff_price, auction_bet, cashback_spent, billing_price, has_claim, is_targeted

is_targeted is computed: billing_price > 0.
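The rule can be illustrated with a small helper. This is a sketch, not the provider's code: the name with_is_targeted and the sample rows are made up, and treating a missing or null billing_price as 0 is an assumption.

```python
def with_is_targeted(record: dict) -> dict:
    """Return a copy of the record with is_targeted = billing_price > 0."""
    # Assumption: a missing or null billing_price counts as 0.
    price = record.get("billing_price") or 0
    return {**record, "is_targeted": price > 0}


# Illustrative rows only; real records carry all 16 fields.
rows = [
    {"id": 1, "billing_price": 350.0},
    {"id": 2, "billing_price": 0},
    {"id": 3, "billing_price": None},
]
flags = [with_is_targeted(r)["is_targeted"] for r in rows]
print(flags)  # [True, False, False]
```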

Example DAG

import os
from datetime import date, timedelta

from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow_provider_cian.operators.builder_reports import CianBuilderReportsOperator


@dag(schedule=None, catchup=False, max_active_tasks=3)
def cian_reports():
    @task
    def get_dates():
        # The 7 days ending yesterday, as YYYY-MM-DD strings.
        yesterday = date.today() - timedelta(days=1)
        return [(yesterday - timedelta(days=i)).isoformat() for i in range(7)]

    dates = get_dates()

    # One mapped task instance per date; each returns its output file path.
    collect = CianBuilderReportsOperator.partial(
        task_id="collect",
        cian_conn_id="cian_default",
        base_dir="/tmp/cian",
        output_format="json",
    ).expand(date=dates)

    # Add upload here, e.g. LocalFilesystemToS3Operator.partial(...).expand(filename=collect)

    def cleanup(ti, **ctx):
        # Delete every file reported by the mapped "collect" tasks.
        for path in (ti.xcom_pull(task_ids="collect") or []):
            if path and os.path.exists(path):
                os.remove(path)

    # all_done: run cleanup even if some collect tasks failed.
    collect >> PythonOperator(task_id="cleanup", python_callable=cleanup, trigger_rule="all_done")


cian_reports()

Rate Limiting

The API limit is ≤10 req/s per token (per Cian account). The hook adds a 100 ms sleep before each request. Setting max_active_tasks=3 at the DAG level provides an additional safety margin.
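The sleep-before-request idea can be sketched as follows. This is an illustration under stated assumptions, not the hook's actual code: the class name ThrottledCaller and the injectable sleep argument are hypothetical, and the latter exists only so the demo runs instantly.

```python
import time
from typing import Callable


class ThrottledCaller:
    """Sleep a fixed interval before every call (default 100 ms)."""

    def __init__(self, delay: float = 0.1, sleep: Callable[[float], None] = time.sleep):
        self.delay = delay
        self._sleep = sleep

    def call(self, func: Callable, *args, **kwargs):
        self._sleep(self.delay)  # pause first, then issue the request
        return func(*args, **kwargs)


# Demo with a recording "sleep" so nothing actually waits.
slept: list[float] = []
caller = ThrottledCaller(sleep=slept.append)
results = [caller.call(lambda n: n * 2, i) for i in range(3)]
print(results, slept)  # [0, 2, 4] [0.1, 0.1, 0.1]
```

A fixed 100 ms pause caps one sequential caller at 10 requests per second. It does not coordinate callers running in parallel, which is where the task limit and the pool described in this section come in.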

If multiple clients share the same IP and you still get 429 errors, create an Airflow Pool:

airflow pools set cian_api 5 "Cian API rate limit pool"

Then pass pool="cian_api" to CianBuilderReportsOperator.partial(...).

Retry Behaviour

On HTTP 429 or 5xx, the hook retries with exponential backoff: 1s, 2s, 4s (3 attempts total), then raises AirflowException.
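A minimal sketch of this retry policy, assuming the delay is slept after each failed attempt (the text does not say exactly where the waits fall). The names request_with_retry and is_retryable are hypothetical, and RuntimeError stands in for AirflowException so the example has no Airflow dependency.

```python
import time
from typing import Callable

BACKOFF_DELAYS = (1, 2, 4)  # seconds, one entry per attempt


def is_retryable(status: int) -> bool:
    """HTTP 429 and any 5xx status trigger a retry."""
    return status == 429 or 500 <= status <= 599


def request_with_retry(
    send: Callable[[], int], sleep: Callable[[float], None] = time.sleep
) -> int:
    """Call send() until it returns a non-retryable status or attempts run out."""
    for delay in BACKOFF_DELAYS:
        status = send()
        if not is_retryable(status):
            return status
        sleep(delay)  # assumption: the delay follows each failed attempt
    # Stand-in for AirflowException.
    raise RuntimeError(f"giving up after {len(BACKOFF_DELAYS)} attempts")


# Demo: two retryable responses, then success. Sleeps are recorded, not waited.
responses = iter([429, 503, 200])
waits: list[float] = []
final_status = request_with_retry(lambda: next(responses), sleep=waits.append)
print(final_status, waits)  # 200 [1, 2]
```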

License

MIT
