Skip to main content

Async CustomerIO Client - a Python client to interact with CustomerIO in an async fashion.

Project description

async-customerio logo

async-customerio

A lightweight asynchronous Python client to interact with Customer.io

PyPI downloads PyPI version License Python versions CI Coverage

  • Free software: MIT license
  • Requires: Python 3.10+

Features

  • Fully asynchronous — built on httpx with HTTP/2 support
  • Async context manager support for clean resource management
  • Compatible interface with the official customerio Python client
  • Track API v1 & v2 (persons, objects, relationships, batch operations)
  • App API support (customers, segments, send messages)
  • Pluggable retry strategy via the RetryStrategy protocol
  • Webhook signature verification

Table of Contents

Installation

pip install async-customerio

Getting started

import asyncio

from async_customerio import AsyncCustomerIO, Regions


async def main():
    async with AsyncCustomerIO(site_id="YOUR_SITE_ID", api_key="YOUR_API_KEY", region=Regions.US) as cio:
        await cio.identify(
            id=5,
            email="customer@example.com",
            first_name="John",
            last_name="Doe",
            subscription_plan="premium",
        )
        await cio.track(
            customer_id=5, name="product.purchased", product_sku="XYZ-12345", price=23.45
        )


if __name__ == "__main__":
    asyncio.run(main())

Configuration

Region

Create an instance of the client with your Customer.io credentials.

from async_customerio import AsyncCustomerIO, Regions


cio = AsyncCustomerIO(site_id, api_key, region=Regions.US)

region is optional and takes one of two values — Regions.US or Regions.EU. If you do not specify your region, we assume that your account is based in the US (Regions.US). If your account is based in the EU and you do not provide the correct region (Regions.EU), we'll route requests to our EU data centers accordingly, however, this may cause data to be logged in the US.

Custom User-Agent

By default every request is sent with the User-Agent header set to async-customerio/<version>. You can override it via the user_agent parameter:

cio = AsyncCustomerIO(site_id, api_key, user_agent="my-app/1.0")

The same parameter is available on AsyncAPIClient.

Track API v1

async with AsyncCustomerIO(site_id="site", api_key="key", region=Regions.US) as cio:
    # Identify a customer
    await cio.identify(id=5, email="customer@example.com", first_name="John")

    # Track an event
    await cio.track(customer_id=5, name="product.purchased", product_sku="XYZ-12345", price=23.45)

Track API v2

The v2 Track API is accessed via the .v2 property on the AsyncCustomerIO instance. It provides typed convenience methods for all person and object operations, sharing the same connection and credentials as the v1 client.

Person operations

async with AsyncCustomerIO(site_id="site", api_key="key", region=Regions.US) as cio:
    # Identify (create or update) a person
    await cio.v2.identify_person(identifiers={"id": 123}, name="Jane", plan="premium")

    # Track an event
    await cio.v2.track_person_event(identifiers={"id": 123}, name="purchase", amount=49.99)

    # Page view / screen view (mobile)
    await cio.v2.person_pageview(identifiers={"id": 123}, name="/pricing")
    await cio.v2.person_screen(identifiers={"id": 123}, name="home_screen")

    # Device management
    await cio.v2.add_person_device(identifiers={"id": 123}, device_id="tok_abc", platform="ios")
    await cio.v2.delete_person_device(identifiers={"id": 123}, device_id="tok_abc")

    # Suppress / unsuppress
    await cio.v2.suppress_person(identifiers={"id": 123})
    await cio.v2.unsuppress_person(identifiers={"id": 123})

    # Merge two person profiles (secondary is deleted)
    await cio.v2.merge_persons(primary={"id": 123}, secondary={"email": "old@example.com"})

    # Delete a person
    await cio.v2.delete_person(identifiers={"id": 123})

Object operations

async with AsyncCustomerIO(site_id="site", api_key="key") as cio:
    # Identify (create or update) an object
    await cio.v2.identify_object(
        identifiers={"object_type_id": "1", "object_id": "acme"},
        name="Acme Corp",
        industry="Software",
    )

    # Track an event on an object
    await cio.v2.track_object_event(
        identifiers={"object_type_id": "1", "object_id": "acme"},
        name="plan_changed",
    )

    # Delete an object
    await cio.v2.delete_object(identifiers={"object_type_id": "1", "object_id": "acme"})

Relationships

async with AsyncCustomerIO(site_id="site", api_key="key") as cio:
    # Relate a person to an object
    await cio.v2.add_person_relationships(
        identifiers={"id": 123},
        relationships=[{"identifiers": {"object_type_id": "1", "object_id": "acme"}}],
    )

    # Relate an object to people
    await cio.v2.add_object_relationships(
        identifiers={"object_type_id": "1", "object_id": "acme"},
        relationships=[{"identifiers": {"id": 123}}, {"identifiers": {"id": 456}}],
    )

    # Remove relationships
    await cio.v2.delete_person_relationships(
        identifiers={"id": 123},
        relationships=[{"identifiers": {"object_type_id": "1", "object_id": "acme"}}],
    )

Batch operations

from async_customerio.track.v2 import Actions

async with AsyncCustomerIO(site_id="site", api_key="key") as cio:
    batch = [
        {
            "type": "person",
            "action": Actions.identify.value,
            "identifiers": {"id": 123},
            "attributes": {"name": "Jane"},
        },
        {
            "type": "object",
            "action": Actions.identify.value,
            "identifiers": {"object_type_id": "1", "object_id": "acme"},
            "attributes": {"name": "Acme Corp"},
        },
    ]
    await cio.v2.send_batch(batch)

Notes

  • All v2 methods validate required parameters and raise AsyncCustomerIOError for missing identifiers, names, etc.
  • The API enforces size limits: each item <= 32 KB, whole batch < 500 KB.
  • send_batch returns the parsed JSON response body. On 200 the batch fully succeeded; on 207 the response contains per-item "errors" for partial failures. HTTP 400+ raises AsyncCustomerIOError.
  • The legacy cio.send_entity() and cio.send_batch() methods still work for backwards compatibility but delegate to the .v2 class internally.

App API

The AsyncAPIClient provides access to the Customer.io App API.

Send messages

import asyncio

from async_customerio import AsyncAPIClient, SendInboxMessageRequest, Regions


async def main():
    api = AsyncAPIClient(key="your-app-api-key", region=Regions.US)
    request = SendInboxMessageRequest(
        transactional_message_id="3",
        identifiers={"id": "user_123"},
        message_data={"name": "Jane", "order_id": "1234"},
    )
    response = await api.send_inbox_message(request)
    print(response)


if __name__ == "__main__":
    asyncio.run(main())

Customers

Customer endpoints are accessed via the .customers namespace:

async with AsyncAPIClient(key="your-app-api-key", region=Regions.US) as client:
    # Look up customers by email
    result = await client.customers.get_by_email("test@example.com")

    # Search with filters and pagination
    result = await client.customers.search(
        filter={"and": [{"segment": {"id": 1}}]},
        limit=10,
    )

    # Get customers with attributes by IDs
    result = await client.customers.get_by_ids([1, 2, 3])

    # Look up a single customer's attributes, segments, messages, etc.
    attrs = await client.customers.get_attributes(42)
    segments = await client.customers.get_segments(42)
    messages = await client.customers.get_messages(42, start_ts=1700000000, limit=5)
    activities = await client.customers.get_activities(42, type="event")
    relationships = await client.customers.get_relationships(42)
    prefs = await client.customers.get_subscription_preferences(42)

    # Use id_type to reference by email or cio_id instead of id
    attrs = await client.customers.get_attributes("test@example.com", id_type="email")

Segments

async with AsyncAPIClient(key="your-app-api-key") as client:
    # List all segments
    result = await client.segments.list()

    # Create a manual segment
    result = await client.segments.create("VIP Users", description="High-value customers")

    # Get a segment, its customer count, and dependencies
    segment = await client.segments.get(segment_id=1)
    count = await client.segments.get_customer_count(segment_id=1)
    deps = await client.segments.get_used_by(segment_id=1)

    # List customers in a segment with pagination
    members = await client.segments.get_membership(segment_id=1, limit=50)

    # Delete a segment
    await client.segments.delete(segment_id=1)

Custom Retry Strategy

By default the library does not retry failed requests — it raises AsyncCustomerIORetryableError for transient failures (transport errors, 429, 502-504) so you can handle retries however you like.

If you want automatic retries, pass a retry_strategy that implements the RetryStrategy protocol. Any object with an async execute(func, *args, **kwargs) method will work.

Using Tenacity

import asyncio

from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_attempt, wait_exponential

from async_customerio import AsyncCustomerIO, AsyncCustomerIORetryableError, Regions


class TenacityRetryStrategy:
    """Retry strategy backed by tenacity."""

    def __init__(self, **kwargs):
        self._kwargs = kwargs

    async def execute(self, func, *args, **kwargs):
        async for attempt in AsyncRetrying(**self._kwargs):
            with attempt:
                return await func(*args, **kwargs)


async def main():
    retry = TenacityRetryStrategy(
        retry=retry_if_exception_type(AsyncCustomerIORetryableError),
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=1, min=1, max=30),
    )
    async with AsyncCustomerIO(
        site_id="YOUR_SITE_ID",
        api_key="YOUR_API_KEY",
        region=Regions.US,
        retry_strategy=retry,
    ) as cio:
        await cio.identify(id=1, name="Jane")


if __name__ == "__main__":
    asyncio.run(main())

The same retry_strategy parameter is available on AsyncAPIClient.

Webhook Signature Verification

Securely verify that incoming webhooks originate from Customer.io (docs).

from async_customerio import validate_signature


webhook_signing_key = "755781b5e03a973f3405a85474d5a032a60fd56fabaad66039b12eadd83955fa"
x_cio_timestamp = 1692633432   # header value
x_cio_signature = "d7c655389bb364d3e8bdbb6ec18a7f1b6cf91f39bba647554ada78aa61de37b9"  # header value
body = b'{"key": "value"}'

if validate_signature(
    signing_key=webhook_signing_key,
    timestamp=x_cio_timestamp,
    request_body=body,
    signature=x_cio_signature,
):
    print("Request is sent from CustomerIO")
else:
    print("Malicious request received")

API Coverage

Track API — 14 of 18 endpoints implemented
Category Endpoints Status
Track Customers 8 Implemented
Track Events 4 Implemented
Track v2 2 Implemented
Track Segments 2 Not yet
Region 1 Not yet
Forms 1 Not yet
App API — 21 of 131 endpoints implemented
Category Endpoints Status
Customers 9 Implemented
Segments 7 Implemented
Send Messages 5 Implemented
Transactional 9 Not yet
Campaigns 13 Not yet
Broadcasts 15 Not yet
Newsletters 16 Not yet
Messages 3 Not yet
Objects 4 Not yet
Activities 1 Not yet
Collections 7 Not yet
Exports 5 Not yet
Imports 2 Not yet
Snippets 4 Not yet
Design Studio 20 Not yet
Assets 10 Not yet
Sender Identities 3 Not yet
Reporting Webhooks 5 Not yet
ESP Suppression 4 Not yet
Subscription Center 1 Not yet
Data Index 2 Not yet
Workspaces 1 Not yet
Info 1 Not yet

License

async-customerio is offered under the MIT license.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

async_customerio-2.12.1.tar.gz (27.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

async_customerio-2.12.1-py3-none-any.whl (30.1 kB view details)

Uploaded Python 3

File details

Details for the file async_customerio-2.12.1.tar.gz.

File metadata

  • Download URL: async_customerio-2.12.1.tar.gz
  • Upload date:
  • Size: 27.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.12.3 Linux/6.17.0-1008-azure

File hashes

Hashes for async_customerio-2.12.1.tar.gz
Algorithm Hash digest
SHA256 d03e5bc750e2a0ef6aba1b4fc308e856af5840656d6072f7e2dde3dee5f9354e
MD5 7db3c6ba85a7f9ac6baa7364c417353e
BLAKE2b-256 3512f76bb4eaa66f4a366843c2c53c40ac7aebd78c5119db4fc1380fd8236930

See more details on using hashes here.

File details

Details for the file async_customerio-2.12.1-py3-none-any.whl.

File metadata

  • Download URL: async_customerio-2.12.1-py3-none-any.whl
  • Upload date:
  • Size: 30.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.12.3 Linux/6.17.0-1008-azure

File hashes

Hashes for async_customerio-2.12.1-py3-none-any.whl
Algorithm Hash digest
SHA256 3534feef78b15cbb6168d8bc5add9e6b0e340015b6dcce82bd8440198dbe1e4a
MD5 73d1101b6bd7acc78d5288009882679a
BLAKE2b-256 93dee2a4f5e5dc52abbab4dc3cbfa3f67dc538e96e2ca9f7e6559a7ac5e2f2dc

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page