Skip to main content

A Python wrapper for the Permutive API.

Project description

PermutiveAPI

PyPI version Python versions License: MIT

PermutiveAPI is a Python module to interact with the Permutive API. It provides a set of classes and methods to manage users, imports, cohorts, and workspaces within the Permutive ecosystem.

Table of Contents

Installation

You can install the PermutiveAPI module using pip:

pip install PermutiveAPI --upgrade

Note PermutiveAPI depends on pandas for its DataFrame export helpers. The dependency is installed automatically with the package, but make sure your runtime environment includes it before using the to_pd_dataframe utilities described below.

Configuration

Before using the library, you need to configure your credentials.

  1. Copy the environment file:
    cp _env .env
    
  2. Set your credentials path: Edit the .env file and set the PERMUTIVE_APPLICATION_CREDENTIALS environment variable to the absolute path of your workspace JSON file.
    PERMUTIVE_APPLICATION_CREDENTIALS="/absolute/path/to/your/workspace.json"
    

The workspace credentials JSON can be downloaded from the Permutive dashboard under Settings → API keys. Save the file somewhere secure. The apiKey inside this JSON is used to authenticate API calls.

Usage

Importing the Module

To use the PermutiveAPI module, import the necessary classes. The main classes are exposed at the top level of the PermutiveAPI package:

from PermutiveAPI import (
    Alias,
    Cohort,
    Identity,
    Import,
    Segment,
    Source,
    Workspace,
    ContextSegment,
)

Managing Workspaces

The Workspace class is the main entry point for interacting with your Permutive workspace.

# Create a workspace instance
workspace = Workspace(
    name="Main",
    organisation_id="your-org-id",
    workspace_id="your-workspace-id",
    api_key="your-api-key",
)

# List cohorts in the workspace (including child workspaces)
all_cohorts = workspace.cohorts()
print(f"Found {len(all_cohorts)} cohorts.")

# List imports in the workspace
all_imports = workspace.imports()
print(f"Found {len(all_imports)} imports.")

# List segments for a specific import
segments_in_import = workspace.segments(import_id="your-import-id")
print(f"Found {len(segments_in_import)} segments.")

Managing Cohorts

You can create, retrieve, and list cohorts using the Cohort class.

# List all cohorts
all_cohorts = Cohort.list(api_key="your_api_key")
print(f"Found {len(all_cohorts)} cohorts.")

# Get a specific cohort by ID
cohort_id = "your-cohort-id"
cohort = Cohort.get_by_id(id=cohort_id, api_key="your_api_key")
print(f"Retrieved cohort: {cohort.name}")

# Create a new cohort
new_cohort = Cohort(
    name="High-Value Customers",
    query={"type": "segment", "id": "segment-id-for-high-value-customers"}
)
new_cohort.create(api_key="your_api_key")
print(f"Created cohort with ID: {new_cohort.id}")

Managing Segments

The Segment class allows you to interact with audience segments.

# List all segments for a given import
import_id = "your-import-id"
segments = Segment.list(api_key="your_api_key", import_id=import_id)
print(f"Found {len(segments)} segments in import {import_id}.")

# Get a specific segment by ID
segment_id = "your-segment-id"
segment = Segment.get_by_id(import_id=import_id, segment_id=segment_id, api_key="your_api_key")
print(f"Retrieved segment: {segment.name}")

Managing Imports

You can list and retrieve imports using the Import class.

# List all imports
all_imports = Import.list(api_key="your_api_key")
for imp in all_imports:
    print(f"Import ID: {imp.id}, Code: {imp.code}, Source Type: {imp.source.type}")

# Get a specific import by ID
import_id = "your-import-id"
import_instance = Import.get_by_id(id=import_id, api_key="your_api_key")
print(f"Retrieved import: {import_instance.id}, Source Type: {import_instance.source.type}")

Managing Users

The Identity and Alias classes are used to manage user profiles.

# Create an alias for a user
alias = Alias(id="user@example.com", tag="email", priority=1)

# Create an identity for the user
identity = Identity(user_id="internal-user-id-123", aliases=[alias])

# Send the identity information to Permutive
try:
    identity.identify(api_key="your-api-key")
    print("Successfully identified user.")
except Exception as e:
    print(f"Error identifying user: {e}")

Evaluating Segmentation

The segmentation helpers expose the low-level CCS segmentation endpoint so you can evaluate arbitrary event streams against your configured audiences. Start by describing each event with the Event dataclass and then submit the request with the Segmentation helper.

from PermutiveAPI import Event, Segmentation


event = Event(
    name="SlotViewable",
    time="2025-07-01T15:39:11.594Z",
    properties={"campaign_id": "3747123491"},
)

request = Segmentation(user_id="user-123", events=[event])

# Submit the request to retrieve segment membership
response = request.send(api_key="your-api-key")
print(response["segments"])  # [{"id": "segment-id", "name": "Segment Name"}, ...]

The segmentation endpoint accepts two optional query parameters that you can control directly from the helper:

Parameter Default What it does
activations False Include any activated cohorts in the response payload.
synchronous-validation False Validate events against their schemas before segmentation, which is useful for debugging but adds latency.

Set them when constructing the request or override them per call:

# Opt in for activations and synchronous validation on every request
request = Segmentation(
    user_id="user-123",
    events=[event],
    activations=True,
    synchronous_validation=True,
)

# Or override when sending if you only need them occasionally
response = request.send(
    api_key="your-api-key",
    activations=True,
    synchronous_validation=True,
)

Event.session_id and Event.view_id are optional—include them only when you need to tie events together across sessions or page views. When present, they are forwarded as part of the event payload.

For high-volume workloads, use Segmentation.batch_send to process multiple requests concurrently. The helper integrates with the shared batch runner described in the next section so you can surface throughput metrics via progress_callback while respecting rate limits.

Evaluating Context Segmentation

Use the ContextSegment helper to call the Context API endpoint (https://api.permutive.com/ctx/v1/segment) with a page URL and page properties payload.

from PermutiveAPI import ContextSegment

request = ContextSegment(
    url="https://example.com/article/sports-news",
    page_properties={
        "client": {
            "url": "https://example.com/article/sports-news",
            "domain": "example.com",
            "referrer": "https://example.com",
            "type": "web",
            "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "title": "Latest Sports News",
        },
        "category": "sports",
        "tags": ["football", "premier-league"],
    },
)

response = request.send(api_key="your-api-key")
print(response["segments"])

Working with pandas DataFrames

The list models expose helpers for quick DataFrame exports when you need to analyze your data using pandas. Each list class provides a to_pd_dataframe method that returns a pandas.DataFrame populated with the model attributes:

from PermutiveAPI import Cohort, CohortList

cohorts = CohortList(
    [
        Cohort(name="C1", id="1", code="c1", tags=["t1"]),
        Cohort(name="C2", id="2", description="second cohort"),
    ]
)

df = cohorts.to_pd_dataframe()
print(df[["id", "name"]])

The same helper is available on SegmentList and ImportList for consistency across the API.

Batch Helpers and Progress Callbacks

High-volume workflows often rely on the batch_* helpers to run requests concurrently. Every helper accepts an optional progress_callback that is invoked after each request completes with a :class:~PermutiveAPI._Utils.http.Progress snapshot describing aggregate throughput. The dataclass includes counters for completed requests, failure totals, elapsed time, and the estimated seconds required to process 1,000 requests, making it straightforward to surface both reliability and latency trends in dashboards or logs. Most workloads achieve a good balance between throughput and API friendliness with max_workers=4. Increase the pool size gradually (for example to 6 or 8 workers) only after observing stable latency and error rates because the Permutive API enforces rate limits.

from PermutiveAPI import Cohort
from PermutiveAPI._Utils.http import Progress


def on_progress(progress: Progress) -> None:
    """Render a concise progress snapshot."""
    avg = progress.average_per_thousand_seconds
    avg_display = f"{avg:.2f}s" if avg is not None else "n/a"
    print(
        f"{progress.completed}/{progress.total} completed; "
        f"errors: {progress.errors}, avg/1k: {avg_display}"
    )


cohorts = [
    Cohort(name="VIP Customers", query={"type": "users"}),
    Cohort(name="Returning Visitors", query={"type": "visitors"}),
]

responses, failures = Cohort.batch_create(
    cohorts,
    api_key="your-api-key",
    max_workers=4,  # recommended starting point for concurrent writes
    progress_callback=on_progress,
)

if failures:
    print(f"Encountered {len(failures)} failures.")

The same callback shape is shared across helpers such as Identity.batch_identify and Segment.batch_create, enabling reuse of progress reporting utilities that surface throughput, error counts, and latency projections. The helpers delegate to :func:PermutiveAPI._Utils.http.process_batch, so they automatically benefit from the shared retry/backoff configuration used by the underlying request helpers. When the API responds with HTTP 429 (rate limiting), the helper retries using the exponential backoff already built into the package before surfacing the error in the failures list. The segmentation helper, Segmentation.batch_send, also consumes the same callback so you can track progress consistently across segmentation workloads.

Configuring batch defaults

Two environment variables allow you to tune the default behaviour without touching application code:

  • PERMUTIVE_BATCH_MAX_WORKERS controls the worker pool size used by the shared batch runner when max_workers is omitted. Provide a positive integer to cap concurrency or leave it unset to use Python's default heuristic.
  • PERMUTIVE_BATCH_TIMEOUT_SECONDS controls the default timeout applied to each PermutiveAPI._Utils.http.BatchRequest. Set it to a positive float (in seconds) to align the HTTP timeout with your infrastructure's expectations.

Invalid values raise ValueError during initialisation to surface mistakes early in the development cycle.

Configuring retry defaults

Transient failure handling can also be adjusted through environment variables. When unset, the package uses the standard RetryConfig defaults.

  • PERMUTIVE_RETRY_MAX_RETRIES sets the number of attempts performed by the HTTP helpers before surfacing an error. Provide a positive integer.
  • PERMUTIVE_RETRY_BACKOFF_FACTOR controls the exponential multiplier applied after each failed attempt. Provide a positive number (floats are accepted).
  • PERMUTIVE_RETRY_INITIAL_DELAY_SECONDS specifies the starting delay in seconds before retrying. Provide a positive number.

Supplying invalid values for any of these variables raises ValueError when the retry configuration is evaluated, helping catch misconfiguration early.

Segmentation workflows follow the same pattern. For example, you can create multiple segments for a given import in one request batch while reporting progress back to an observability system:

from PermutiveAPI import Segment


segments = [
    Segment(
        import_id="import-123",
        name="Frequent Flyers",
        query={"type": "users", "filter": {"country": "US"}},
    ),
    Segment(
        import_id="import-123",
        name="Dormant Subscribers",
        query={"type": "users", "filter": {"status": "inactive"}},
    ),
]

segment_responses, segment_failures = Segment.batch_create(
    segments,
    api_key="your-api-key",
    max_workers=4,
    progress_callback=on_progress,
)

if segment_failures:
    print(f"Encountered {len(segment_failures)} failures during segment creation.")

You can also evaluate multiple users in parallel while reporting progress back to an observability system:

from PermutiveAPI import Event, Segmentation


events = [
    Event(
        name="SlotViewable",
        time="2025-07-01T15:39:11.594Z",
        properties={"campaign_id": "3747123491"},
        session_id="f19199e4-1654-4869-b740-703fd5bafb6f",
        view_id="d30ccfc5-c621-4ac4-a282-9a30ac864c8a",
    )
]

requests = [
    Segmentation(user_id="user-1", events=events),
    Segmentation(user_id="user-2", events=events),
]

segmentation_responses, segmentation_failures = Segmentation.batch_send(
    requests,
    api_key="your-api-key",
    max_workers=4,
    progress_callback=on_progress,
)

if segmentation_failures:
    print(f"Encountered {len(segmentation_failures)} failures during segmentation.")

Error Handling

The package raises purpose-specific exceptions that are also available at the top level of the package for convenience:

from PermutiveAPI import (
    PermutiveAPIError,
    PermutiveAuthenticationError,
    PermutiveBadRequestError,
    PermutiveRateLimitError,
    PermutiveResourceNotFoundError,
    PermutiveServerError,
)

try:
    # make an API call via the high-level classes
    Cohort.list(api_key="your_api_key")
except PermutiveBadRequestError as e:
    # e.status, e.url, and e.response are available for debugging
    print(e.status, e.url, e)
except PermutiveAPIError as e:
    print("Unhandled API error:", e)

Development

To set up a development environment, install the development dependencies:

pip install ".[dev]"

Running Tests

Before committing any changes, please run the following checks to ensure code quality and correctness.

Style Checks:

pydocstyle PermutiveAPI
black --check .

Static Type Analysis:

pyright PermutiveAPI

Unit Tests and Coverage:

pytest -q --cov=PermutiveAPI --cov-report=term-missing --cov-fail-under=70

All checks must pass before a pull request can be merged.

Contributing

Contributions are welcome! Please see CONTRIBUTING.md for development setup and pull request guidelines.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

permutiveapi-5.4.5.tar.gz (53.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

permutiveapi-5.4.5-py3-none-any.whl (46.8 kB view details)

Uploaded Python 3

File details

Details for the file permutiveapi-5.4.5.tar.gz.

File metadata

  • Download URL: permutiveapi-5.4.5.tar.gz
  • Upload date:
  • Size: 53.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.25

File hashes

Hashes for permutiveapi-5.4.5.tar.gz
Algorithm Hash digest
SHA256 c7db72cdf247cfb4ad19978c3346f6843ddbc09f0f98a9c7bd74d53e36a702a5
MD5 b7f06071b1f285c8c7468f18564cf273
BLAKE2b-256 bebfb77748cbf0c6712f9669e522fde9f39d377c1b38d71e13a4818db3ec894d

See more details on using hashes here.

File details

Details for the file permutiveapi-5.4.5-py3-none-any.whl.

File metadata

  • Download URL: permutiveapi-5.4.5-py3-none-any.whl
  • Upload date:
  • Size: 46.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.25

File hashes

Hashes for permutiveapi-5.4.5-py3-none-any.whl
Algorithm Hash digest
SHA256 192ff8f254d027c5ae8630a97060d41a36f57da06dde641f6e197304aacbee42
MD5 eede6262178e789194d26ce16f77b797
BLAKE2b-256 c74cc1fed80d0f51290142d9b47c979aa07bb0ee22f368ea63c6fde6de4cbf89

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page