A Python wrapper for the Permutive API.

PermutiveAPI

PermutiveAPI is a Python module to interact with the Permutive API. It provides a set of classes and methods to manage users, imports, cohorts, and workspaces within the Permutive ecosystem.

Installation

You can install the PermutiveAPI module using pip:

pip install PermutiveAPI --upgrade

Note: PermutiveAPI depends on pandas for its DataFrame export helpers. The dependency is installed automatically with the package, but make sure your runtime environment includes it before using the to_pd_dataframe utilities described below.

Configuration

Before using the library, you need to configure your credentials.

  1. Copy the environment file:
    cp _env .env
    
  2. Set your credentials path: Edit the .env file and set the PERMUTIVE_APPLICATION_CREDENTIALS environment variable to the absolute path of your workspace JSON file.
    PERMUTIVE_APPLICATION_CREDENTIALS="/absolute/path/to/your/workspace.json"
    

The workspace credentials JSON can be downloaded from the Permutive dashboard under Settings → API keys. Save the file somewhere secure. The apiKey inside this JSON is used to authenticate API calls.
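As an illustrative sketch of how you might read that file yourself (only the apiKey field is documented above; the helper name and the throwaway file are assumptions for the demo):

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory


def load_api_key(credentials_path: str) -> str:
    """Return the apiKey stored in a Permutive workspace credentials file."""
    with open(credentials_path, encoding="utf-8") as fh:
        return json.load(fh)["apiKey"]


# Demo with a throwaway file standing in for the downloaded workspace JSON
with TemporaryDirectory() as tmp:
    path = Path(tmp) / "workspace.json"
    path.write_text(json.dumps({"apiKey": "your-api-key"}))
    print(load_api_key(str(path)))  # your-api-key
```

In practice you would pass the path held in PERMUTIVE_APPLICATION_CREDENTIALS rather than a temporary file.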

Usage

Importing the Module

To use the PermutiveAPI module, import the necessary classes. The main classes are exposed at the top level of the PermutiveAPI package:

from PermutiveAPI import (
    Alias,
    Cohort,
    Identity,
    Import,
    Segment,
    Source,
    Workspace,
    ContextSegment,
)

Managing Workspaces

The Workspace class is the main entry point for interacting with your Permutive workspace.

# Create a workspace instance
workspace = Workspace(
    name="Main",
    organisation_id="your-org-id",
    workspace_id="your-workspace-id",
    api_key="your-api-key",
)

# List cohorts in the workspace (including child workspaces)
all_cohorts = workspace.cohorts()
print(f"Found {len(all_cohorts)} cohorts.")

# List imports in the workspace
all_imports = workspace.imports()
print(f"Found {len(all_imports)} imports.")

# List segments for a specific import
segments_in_import = workspace.segments(import_id="your-import-id")
print(f"Found {len(segments_in_import)} segments.")

Managing Cohorts

You can create, retrieve, and list cohorts using the Cohort class.

# List all cohorts
all_cohorts = Cohort.list(api_key="your_api_key")
print(f"Found {len(all_cohorts)} cohorts.")

# Get a specific cohort by ID
cohort_id = "your-cohort-id"
cohort = Cohort.get_by_id(id=cohort_id, api_key="your_api_key")
print(f"Retrieved cohort: {cohort.name}")

# Create a new cohort
new_cohort = Cohort(
    name="High-Value Customers",
    query={"type": "segment", "id": "segment-id-for-high-value-customers"}
)
new_cohort.create(api_key="your_api_key")
print(f"Created cohort with ID: {new_cohort.id}")

Managing Segments

The Segment class allows you to interact with audience segments.

# List all segments for a given import
import_id = "your-import-id"
segments = Segment.list(api_key="your_api_key", import_id=import_id)
print(f"Found {len(segments)} segments in import {import_id}.")

# Get a specific segment by ID
segment_id = "your-segment-id"
segment = Segment.get_by_id(import_id=import_id, segment_id=segment_id, api_key="your_api_key")
print(f"Retrieved segment: {segment.name}")

Managing Imports

You can list and retrieve imports using the Import class.

# List all imports
all_imports = Import.list(api_key="your_api_key")
for imp in all_imports:
    print(f"Import ID: {imp.id}, Code: {imp.code}, Source Type: {imp.source.type}")

# Get a specific import by ID
import_id = "your-import-id"
import_instance = Import.get_by_id(id=import_id, api_key="your_api_key")
print(f"Retrieved import: {import_instance.id}, Source Type: {import_instance.source.type}")

Managing Users

The Identity and Alias classes are used to manage user profiles.

# Create an alias for a user
alias = Alias(id="user@example.com", tag="email", priority=1)

# Create an identity for the user
identity = Identity(user_id="internal-user-id-123", aliases=[alias])

# Send the identity information to Permutive
try:
    identity.identify(api_key="your-api-key")
    print("Successfully identified user.")
except Exception as e:
    print(f"Error identifying user: {e}")

Evaluating Segmentation

The segmentation helpers expose the low-level CCS segmentation endpoint so you can evaluate arbitrary event streams against your configured audiences. Start by describing each event with the Event dataclass and then submit the request with the Segmentation helper.

from PermutiveAPI import Alias, Event, Segmentation


event = Event(
    name="SlotViewable",
    time="2025-07-01T15:39:11.594Z",
    properties={"campaign_id": "3747123491"},
)

request = Segmentation(
    alias=Alias(id="user@example.com", tag="email", priority=1),
    events=[event],
)

# Submit the request to retrieve segment membership
response = request.send(api_key="your-api-key")
print(response["segments"])  # [{"id": "segment-id", "name": "Segment Name"}, ...]

The segmentation endpoint accepts two optional query parameters that you can control directly from the helper:

  • activations (default: False): include any activated cohorts in the response payload.
  • synchronous-validation (default: False): validate events against their schemas before segmentation, which is useful for debugging but adds latency.

Set them when constructing the request or override them per call:

# Opt in for activations and synchronous validation on every request
request = Segmentation(
    user_id="user-123",
    events=[event],
    activations=True,
    synchronous_validation=True,
)

# Or override when sending if you only need them occasionally
response = request.send(
    api_key="your-api-key",
    activations=True,
    synchronous_validation=True,
)

Event.session_id and Event.view_id are optional—include them only when you need to tie events together across sessions or page views. When present, they are forwarded as part of the event payload.

For high-volume workloads, use Segmentation.batch_send to process multiple requests concurrently. The helper integrates with the shared batch runner described in the next section so you can surface throughput metrics via progress_callback while respecting rate limits.

Evaluating Context Segmentation

Use the ContextSegment helper to call the Context API endpoint (https://api.permutive.com/ctx/v1/segment) with a page URL and page properties payload.

from PermutiveAPI import ContextSegment

request = ContextSegment(
    url="https://example.com/article/sports-news",
    page_properties={
        "client": {
            "url": "https://example.com/article/sports-news",
            "domain": "example.com",
            "referrer": "https://example.com",
            "type": "web",
            "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "title": "Latest Sports News",
        },
        "category": "sports",
        "tags": ["football", "premier-league"],
    },
)

response = request.send(api_key="your-api-key")
print(response["segments"])

Working with pandas DataFrames

The list models expose helpers for quick DataFrame exports when you need to analyze your data using pandas. Each list class provides a to_pd_dataframe method that returns a pandas.DataFrame populated with the model attributes:

from PermutiveAPI import Cohort, CohortList

cohorts = CohortList(
    [
        Cohort(name="C1", id="1", code="c1", tags=["t1"]),
        Cohort(name="C2", id="2", description="second cohort"),
    ]
)

df = cohorts.to_pd_dataframe()
print(df[["id", "name"]])

The same helper is available on SegmentList and ImportList for consistency across the API.

Batch Helpers and Progress Callbacks

High-volume workflows often rely on the batch_* helpers to run requests concurrently. Every helper accepts an optional progress_callback that is invoked after each request completes with a PermutiveAPI.utils.http.Progress snapshot describing aggregate throughput. The dataclass includes counters for completed requests, failure totals, elapsed time, and the estimated seconds required to process 1,000 requests, making it straightforward to surface both reliability and latency trends in dashboards or logs. Most workloads achieve a good balance between throughput and API friendliness with max_workers=4. Increase the pool size gradually (for example to 6 or 8 workers) only after observing stable latency and error rates because the Permutive API enforces rate limits.

from PermutiveAPI import Cohort
from PermutiveAPI.utils.http import Progress


def on_progress(progress: Progress) -> None:
    """Render a concise progress snapshot."""
    avg = progress.average_per_thousand_seconds
    avg_display = f"{avg:.2f}s" if avg is not None else "n/a"
    print(
        f"{progress.completed}/{progress.total} completed; "
        f"errors: {progress.errors}, avg/1k: {avg_display}"
    )


cohorts = [
    Cohort(name="VIP Customers", query={"type": "users"}),
    Cohort(name="Returning Visitors", query={"type": "visitors"}),
]

responses, failures = Cohort.batch_create(
    cohorts,
    api_key="your-api-key",
    max_workers=4,  # recommended starting point for concurrent writes
    progress_callback=on_progress,
)

if failures:
    print(f"Encountered {len(failures)} failures.")

The same callback shape is shared across helpers such as Identity.batch_identify and Segment.batch_create, enabling reuse of progress reporting utilities that surface throughput, error counts, and latency projections. The helpers delegate to PermutiveAPI.utils.http.process_batch, so they automatically benefit from the shared retry/backoff configuration used by the underlying request helpers. When the API responds with HTTP 429 (rate limiting), the helper retries using the exponential backoff already built into the package before surfacing the error in the failures list. The segmentation helper, Segmentation.batch_send, also consumes the same callback so you can track progress consistently across segmentation workloads.

Configuring batch defaults

Two environment variables allow you to tune the default behaviour without touching application code:

  • PERMUTIVE_BATCH_MAX_WORKERS controls the worker pool size used by the shared batch runner when max_workers is omitted. Provide a positive integer to cap concurrency or leave it unset to use Python's default heuristic.
  • PERMUTIVE_BATCH_TIMEOUT_SECONDS controls the default timeout applied to each PermutiveAPI.utils.http.BatchRequest. Set it to a positive float (in seconds) to align the HTTP timeout with your infrastructure's expectations.

Invalid values raise ValueError during initialisation to surface mistakes early in the development cycle.
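For example, you could tune both defaults from the shell before launching your application (the specific values shown are illustrative, not recommendations):

```shell
# Cap the shared batch runner at 8 concurrent workers
export PERMUTIVE_BATCH_MAX_WORKERS=8

# Time out each BatchRequest after 30 seconds
export PERMUTIVE_BATCH_TIMEOUT_SECONDS=30.0
```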

Configuring retry defaults

Transient failure handling can also be adjusted through environment variables. When unset, the package uses the standard RetryConfig defaults.

  • PERMUTIVE_RETRY_MAX_RETRIES sets the number of attempts performed by the HTTP helpers before surfacing an error. Provide a positive integer.
  • PERMUTIVE_RETRY_BACKOFF_FACTOR controls the exponential multiplier applied after each failed attempt. Provide a positive number (floats are accepted).
  • PERMUTIVE_RETRY_INITIAL_DELAY_SECONDS specifies the starting delay in seconds before retrying. Provide a positive number.

Supplying invalid values for any of these variables raises ValueError when the retry configuration is evaluated, helping catch misconfiguration early.
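To illustrate how these three values interact, a typical exponential backoff schedule multiplies the initial delay by the backoff factor after each failed attempt. The sketch below assumes that formula; the package's exact schedule may differ:

```python
def backoff_schedule(
    max_retries: int, backoff_factor: float, initial_delay: float
) -> list[float]:
    """Illustrative per-attempt delays under simple exponential backoff."""
    return [initial_delay * backoff_factor**attempt for attempt in range(max_retries)]


# Equivalent of PERMUTIVE_RETRY_MAX_RETRIES=4, PERMUTIVE_RETRY_BACKOFF_FACTOR=2.0,
# PERMUTIVE_RETRY_INITIAL_DELAY_SECONDS=0.5
print(backoff_schedule(4, 2.0, 0.5))  # [0.5, 1.0, 2.0, 4.0]
```

Raising the backoff factor spreads retries further apart, which is usually the gentlest way to cope with sustained HTTP 429 responses.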

Segmentation workflows follow the same pattern. For example, you can create multiple segments for a given import in one request batch while reporting progress back to an observability system:

from PermutiveAPI import Segment


segments = [
    Segment(
        import_id="import-123",
        name="Frequent Flyers",
        query={"type": "users", "filter": {"country": "US"}},
    ),
    Segment(
        import_id="import-123",
        name="Dormant Subscribers",
        query={"type": "users", "filter": {"status": "inactive"}},
    ),
]

segment_responses, segment_failures = Segment.batch_create(
    segments,
    api_key="your-api-key",
    max_workers=4,
    progress_callback=on_progress,
)

if segment_failures:
    print(f"Encountered {len(segment_failures)} failures during segment creation.")

You can also evaluate multiple users in parallel while reporting progress back to an observability system:

from PermutiveAPI import Alias, Event, Segmentation


events = [
    Event(
        name="SlotViewable",
        time="2025-07-01T15:39:11.594Z",
        properties={"campaign_id": "3747123491"},
        session_id="f19199e4-1654-4869-b740-703fd5bafb6f",
        view_id="d30ccfc5-c621-4ac4-a282-9a30ac864c8a",
    )
]

requests = [
    Segmentation(user_id="user-1", events=events),
    Segmentation(user_id="user-2", events=events),
]

segmentation_responses, segmentation_failures = Segmentation.batch_send(
    requests,
    api_key="your-api-key",
    max_workers=4,
    progress_callback=on_progress,
)

if segmentation_failures:
    print(f"Encountered {len(segmentation_failures)} failures during segmentation.")

Error Handling

The package raises purpose-specific exceptions that are also available at the top level of the package for convenience:

from PermutiveAPI import (
    PermutiveAPIError,
    PermutiveAuthenticationError,
    PermutiveBadRequestError,
    PermutiveRateLimitError,
    PermutiveResourceNotFoundError,
    PermutiveServerError,
)

try:
    # make an API call via the high-level classes
    Cohort.list(api_key="your_api_key")
except PermutiveBadRequestError as e:
    # e.status, e.url, and e.response are available for debugging
    print(e.status, e.url, e)
except PermutiveAPIError as e:
    print("Unhandled API error:", e)

Development

To set up a development environment, install the development dependencies:

pip install -e ".[dev]"

Running Tests

Before committing any changes, please run the following checks to ensure code quality and correctness.

Style Checks:

pydocstyle src/PermutiveAPI
black --check .

Static Type Analysis:

pyright src/PermutiveAPI

Unit Tests and Coverage:

pytest -q --cov=src/PermutiveAPI --cov-report=term-missing --cov-fail-under=70

All checks must pass before a pull request can be merged.

Contributing

Contributions are welcome! Please see CONTRIBUTING.md for development setup and pull request guidelines.

License

This project is licensed under the MIT License. See the LICENSE file for details.
