Apache Airflow provider for Cian.ru Builder API — collect calls and chats statistics

airflow-provider-cian

Airflow provider for Cian.ru Builder API — collect calls and chats statistics.

Installation

pip install airflow-provider-cian

Requirements: Python 3.10+, Apache Airflow 2.9.1–2.x.

Connection Setup

Create an HTTP connection in Airflow (Admin → Connections):

Field            Value
Connection Id    cian_default (or any name)
Connection Type  HTTP
Host             https://public-api.cian.ru
Password         Bearer token from your Cian Builder cabinet

The provider reads conn.host as the base URL and conn.password as the Bearer token.
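To illustrate how those two connection fields might turn into an authenticated request, here is a minimal sketch. It is not the provider's actual code: the function name build_request_parts, the placeholder token, and the endpoint path are assumptions made up for this example.

```python
from urllib.parse import urljoin


def build_request_parts(host: str, password: str, endpoint: str) -> tuple[str, dict[str, str]]:
    """Combine connection fields into a URL and auth headers (illustrative only)."""
    # Normalise the base URL so that joining never drops a path segment.
    base_url = host.rstrip("/") + "/"
    url = urljoin(base_url, endpoint.lstrip("/"))
    # conn.password is sent as a standard Bearer token.
    headers = {"Authorization": f"Bearer {password}"}
    return url, headers


url, headers = build_request_parts(
    host="https://public-api.cian.ru",
    password="example-token",       # placeholder, not a real token
    endpoint="/hypothetical/path",  # placeholder, not a documented Cian endpoint
)
print(url)
print(headers)
```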

Operator Parameters

CianBuilderReportsOperator:

Parameter      Type  Default       Description
cian_conn_id   str   cian_default  Airflow connection ID
date           str   required      Collection date, YYYY-MM-DD. Supports {{ ds }} template
base_dir       str   /tmp/cian     Base directory for output files
output_format  str   json          json (JSONL) or csv

The operator returns the output file path via the return_value XCom, so downstream tasks can pull it.

Output file path: {base_dir}/{safe_run_id}/{date}.{ext}
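The path template can be sketched as follows. How the provider derives safe_run_id is not documented here, so the sanitising rule below (replace every character outside letters, digits, dot, underscore, and hyphen with an underscore) is an assumption, as are the helper names and the choice of "json" as the file extension for JSONL output.

```python
import re
from pathlib import PurePosixPath


def make_safe_run_id(run_id: str) -> str:
    # Assumption: characters unsafe for file names are replaced with "_".
    return re.sub(r"[^A-Za-z0-9._-]", "_", run_id)


def output_path(base_dir: str, run_id: str, date: str, output_format: str) -> str:
    # Assumption: the extension mirrors the output_format value.
    ext = "csv" if output_format == "csv" else "json"
    return str(PurePosixPath(base_dir) / make_safe_run_id(run_id) / f"{date}.{ext}")


path = output_path(
    base_dir="/tmp/cian",
    run_id="manual__2024-01-02T03:04:05+00:00",  # sample Airflow-style run_id
    date="2024-01-01",
    output_format="json",
)
print(path)
```

Putting the run ID in the path keeps files from different DAG runs for the same date apart.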

Output Schema (17 fields)

id, newbuilding_id, newbuilding_name, date, action_type, searcher_phone, searcher_ct_phone, builder_user_ct_phone, builder_user_phone, builder_sip_uri, call_duration, tariff_price, auction_bet, cashback_spent, billing_price, has_claim, is_targeted

is_targeted is a computed field: it is true when billing_price > 0.
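A small sketch of the derived field and the two output formats, under stated assumptions: the sample records are invented and carry only a few of the 17 fields, a missing billing_price is treated as 0, and add_is_targeted is a hypothetical helper rather than part of the provider.

```python
import csv
import io
import json


def add_is_targeted(record: dict) -> dict:
    # Derived flag: true when billing_price is positive.
    # Assumption: a missing or null billing_price counts as 0.
    price = record.get("billing_price") or 0
    return {**record, "is_targeted": price > 0}


# Invented sample data, not real API output.
rows = [
    add_is_targeted({"id": 1, "action_type": "call", "billing_price": 150.0}),
    add_is_targeted({"id": 2, "action_type": "chat", "billing_price": 0}),
]

# "json" output: one JSON object per line (JSONL).
jsonl = "\n".join(json.dumps(r, ensure_ascii=False) for r in rows)

# "csv" output: a header row followed by one line per record.
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=list(rows[0].keys()))
writer.writeheader()
writer.writerows(rows)
csv_text = buf.getvalue()

print(jsonl)
print(csv_text)
```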

Example DAG

from datetime import date, timedelta
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow_provider_cian.operators.builder_reports import CianBuilderReportsOperator
import os

@dag(schedule=None, catchup=False, max_active_tasks=3)
def cian_reports():
    @task
    def get_dates():
        yesterday = date.today() - timedelta(days=1)
        return [(yesterday - timedelta(days=i)).isoformat() for i in range(7)]

    dates = get_dates()

    collect = CianBuilderReportsOperator.partial(
        task_id="collect",
        cian_conn_id="cian_default",
        base_dir="/tmp/cian",
        output_format="json",
    ).expand(date=dates)

    # Add upload here, e.g. LocalFilesystemToS3Operator.partial(...).expand(filename=collect.output)

    def cleanup(ti, **ctx):
        for path in (ti.xcom_pull(task_ids="collect") or []):
            if path and os.path.exists(path):
                os.remove(path)

    collect >> PythonOperator(task_id="cleanup", python_callable=cleanup, trigger_rule="all_done")

cian_reports()

Rate Limiting

The API limit is ≤10 req/s per token (per Cian account). The hook sleeps for 100 ms before each request, and max_active_tasks=3 at the DAG level provides an additional safety margin.
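The fixed pre-request sleep can be pictured with the sketch below. It is an illustration of the idea, not the hook's implementation; the class name FixedDelayThrottle and the injectable sleep argument are assumptions added so the example can run without real waiting.

```python
import time


class FixedDelayThrottle:
    """Sleep a fixed interval before every call (illustrative only)."""

    def __init__(self, delay_seconds: float = 0.1, sleep=time.sleep):
        self.delay_seconds = delay_seconds
        self._sleep = sleep

    def call(self, func, *args, **kwargs):
        # Waiting before each call caps a single caller at 1 / delay calls per second.
        self._sleep(self.delay_seconds)
        return func(*args, **kwargs)


# Record the sleeps instead of actually waiting, to keep the demo instant.
slept = []
throttle = FixedDelayThrottle(delay_seconds=0.1, sleep=slept.append)
results = [throttle.call(lambda n=n: n * 2) for n in range(3)]
print(results, slept)
```

With a 0.1 s delay, one sequential caller cannot exceed 10 calls per second, which matches the stated per-token limit.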

If multiple clients share the same IP and you still get 429 errors, create an Airflow Pool:

airflow pools set cian_api 5 "Cian API rate limit pool"

Then pass pool="cian_api" to CianBuilderReportsOperator.partial(...).

Retry Behaviour

On HTTP 429 or 5xx responses, the hook retries with exponential backoff (1s, 2s, 4s; 3 attempts total) and then raises AirflowException.
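One possible reading of that schedule, as a sketch rather than the hook's actual code: try up to three times, wait 1s, 2s, then 4s after each failure, and raise once the attempts are used up. RetryableError stands in for a 429 or 5xx response and RuntimeError stands in for AirflowException so the example runs without Airflow installed; both substitutions and the function name are assumptions.

```python
import time


class RetryableError(Exception):
    """Hypothetical stand-in for an HTTP 429 or 5xx response."""


def call_with_backoff(func, delays=(1, 2, 4), sleep=time.sleep):
    """Call func once per delay, doubling the wait after each failure."""
    last_error = None
    for delay in delays:
        try:
            return func()
        except RetryableError as exc:
            last_error = exc
            sleep(delay)
    # The real hook is described as raising AirflowException at this point.
    raise RuntimeError(f"giving up after {len(delays)} attempts") from last_error


waits = []
attempts = {"count": 0}


def flaky():
    # Fails twice, then succeeds on the third attempt.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RetryableError("HTTP 429")
    return "ok"


result = call_with_backoff(flaky, sleep=waits.append)
print(result, waits)
```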

License

MIT
