A Python wrapper for the Permutive API.
PermutiveAPI
PermutiveAPI is a Python module to interact with the Permutive API. It provides a set of classes and methods to manage users, imports, cohorts, and workspaces within the Permutive ecosystem.
Table of Contents
- Installation
- Configuration
- Usage
- Managing Imports
- Managing Users
- Evaluating Segmentation
- Working with pandas DataFrames
- Batch Helpers and Progress Callbacks
- Error Handling
- Development
- Contributing
- License
Installation
You can install the PermutiveAPI module using pip:
pip install PermutiveAPI --upgrade
Note: PermutiveAPI depends on pandas for its DataFrame export helpers. The dependency is installed automatically with the package, but make sure your runtime environment includes it before using the to_pd_dataframe utilities described below.
Configuration
Before using the library, you need to configure your credentials.
- Copy the environment file:
cp _env .env
- Set your credentials path:
Edit the .env file and set the PERMUTIVE_APPLICATION_CREDENTIALS environment variable to the absolute path of your workspace JSON file.
PERMUTIVE_APPLICATION_CREDENTIALS="/absolute/path/to/your/workspace.json"
The workspace credentials JSON can be downloaded from the Permutive dashboard under Settings → API keys. Save the file somewhere secure. The apiKey inside this JSON is used to authenticate API calls.
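As a rough illustration of how these pieces fit together, the sketch below reads PERMUTIVE_APPLICATION_CREDENTIALS and pulls the apiKey out of the workspace file. The function name load_api_key is hypothetical and not part of PermutiveAPI, and the sketch assumes apiKey sits at the top level of the JSON document; check your downloaded file if the layout differs.

```python
import json
import os
from pathlib import Path
from typing import Optional


def load_api_key(path: Optional[str] = None) -> str:
    """Return the apiKey from a workspace credentials file (hypothetical helper)."""
    # Fall back to the environment variable when no explicit path is given.
    credentials_path = path or os.environ.get("PERMUTIVE_APPLICATION_CREDENTIALS")
    if not credentials_path:
        raise RuntimeError("PERMUTIVE_APPLICATION_CREDENTIALS is not set")
    data = json.loads(Path(credentials_path).read_text(encoding="utf-8"))
    # Assumption: the key lives at the top level of the JSON document.
    return data["apiKey"]
```

The returned string could then be passed as the api_key argument used throughout the examples in this README.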
Usage
Importing the Module
To use the PermutiveAPI module, import the necessary classes. The main classes are exposed at the top level of the PermutiveAPI package:
from PermutiveAPI import (
    Alias,
    Cohort,
    Identity,
    Import,
    Segment,
    Source,
    Workspace,
)
Managing Workspaces
The Workspace class is the main entry point for interacting with your Permutive workspace.
# Create a workspace instance
workspace = Workspace(
    name="Main",
    organisation_id="your-org-id",
    workspace_id="your-workspace-id",
    api_key="your-api-key",
)
# List cohorts in the workspace (including child workspaces)
all_cohorts = workspace.cohorts()
print(f"Found {len(all_cohorts)} cohorts.")
# List imports in the workspace
all_imports = workspace.imports()
print(f"Found {len(all_imports)} imports.")
# List segments for a specific import
segments_in_import = workspace.segments(import_id="your-import-id")
print(f"Found {len(segments_in_import)} segments.")
Managing Cohorts
You can create, retrieve, and list cohorts using the Cohort class.
# List all cohorts
all_cohorts = Cohort.list(api_key="your_api_key")
print(f"Found {len(all_cohorts)} cohorts.")
# Get a specific cohort by ID
cohort_id = "your-cohort-id"
cohort = Cohort.get_by_id(id=cohort_id, api_key="your_api_key")
print(f"Retrieved cohort: {cohort.name}")
# Create a new cohort
new_cohort = Cohort(
    name="High-Value Customers",
    query={"type": "segment", "id": "segment-id-for-high-value-customers"},
)
new_cohort.create(api_key="your_api_key")
print(f"Created cohort with ID: {new_cohort.id}")
Managing Segments
The Segment class allows you to interact with audience segments.
# List all segments for a given import
import_id = "your-import-id"
segments = Segment.list(api_key="your_api_key", import_id=import_id)
print(f"Found {len(segments)} segments in import {import_id}.")
# Get a specific segment by ID
segment_id = "your-segment-id"
segment = Segment.get_by_id(import_id=import_id, segment_id=segment_id, api_key="your_api_key")
print(f"Retrieved segment: {segment.name}")
Managing Imports
You can list and retrieve imports using the Import class.
# List all imports
all_imports = Import.list(api_key="your_api_key")
for imp in all_imports:
    print(f"Import ID: {imp.id}, Code: {imp.code}, Source Type: {imp.source.type}")
# Get a specific import by ID
import_id = "your-import-id"
import_instance = Import.get_by_id(id=import_id, api_key="your_api_key")
print(f"Retrieved import: {import_instance.id}, Source Type: {import_instance.source.type}")
Managing Users
The Identity and Alias classes are used to manage user profiles.
# Create an alias for a user
alias = Alias(id="user@example.com", tag="email", priority=1)
# Create an identity for the user
identity = Identity(user_id="internal-user-id-123", aliases=[alias])
# Send the identity information to Permutive
try:
    identity.identify(api_key="your-api-key")
    print("Successfully identified user.")
except Exception as e:
    print(f"Error identifying user: {e}")
Evaluating Segmentation
The segmentation helpers expose the low-level CCS segmentation endpoint so you
can evaluate arbitrary event streams against your configured audiences. Start by
describing each event with the Event dataclass and then submit the request with
the Segmentation helper.
from PermutiveAPI import Event, Segmentation
event = Event(
    name="SlotViewable",
    time="2025-07-01T15:39:11.594Z",
    properties={"campaign_id": "3747123491"},
)
request = Segmentation(user_id="user-123", events=[event])
# Submit the request to retrieve segment membership
response = request.send(api_key="your-api-key")
print(response["segments"]) # [{"id": "segment-id", "name": "Segment Name"}, ...]
The segmentation endpoint accepts two optional query parameters that you can control directly from the helper:
| Parameter | Default | What it does |
|---|---|---|
| activations | False | Include any activated cohorts in the response payload. |
| synchronous-validation | False | Validate events against their schemas before segmentation, which is useful for debugging but adds latency. |
Set them when constructing the request or override them per call:
# Opt in for activations and synchronous validation on every request
request = Segmentation(
    user_id="user-123",
    events=[event],
    activations=True,
    synchronous_validation=True,
)
# Or override when sending if you only need them occasionally
response = request.send(
    api_key="your-api-key",
    activations=True,
    synchronous_validation=True,
)
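One way to picture the interaction between constructor defaults and per-call overrides is the sketch below. It is not the package's implementation: build_query is a hypothetical function, the "None means no override" rule is an assumption about the precedence, and so is the choice to send booleans as lowercase strings. Only the two parameter names come from the table above.

```python
from typing import Dict, Optional
from urllib.parse import urlencode


def build_query(
    default_activations: bool = False,
    default_sync_validation: bool = False,
    activations: Optional[bool] = None,
    synchronous_validation: Optional[bool] = None,
) -> str:
    """Resolve per-call overrides against defaults and encode them (sketch)."""
    # None means "no override": fall back to the value set at construction time.
    resolved: Dict[str, bool] = {
        "activations": default_activations if activations is None else activations,
        "synchronous-validation": (
            default_sync_validation
            if synchronous_validation is None
            else synchronous_validation
        ),
    }
    # Assumption: booleans travel as lowercase strings in the query string.
    return urlencode({key: str(value).lower() for key, value in resolved.items()})
```

Under these assumptions, calling build_query(activations=True) flips one flag for a single call while the other keeps its default.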
Event.session_id and Event.view_id are optional—include them only when you
need to tie events together across sessions or page views. When present, they
are forwarded as part of the event payload.
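The idea of forwarding optional identifiers only when they are set can be sketched with a plain dataclass. SketchEvent below is a stand-in written for this illustration, not the PermutiveAPI Event class, and dropping unset fields from the payload (rather than sending null) is an assumption about the wire format.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Optional


@dataclass
class SketchEvent:
    """Stand-in for an event record; not the PermutiveAPI Event class."""

    name: str
    time: str
    properties: Dict[str, Any] = field(default_factory=dict)
    session_id: Optional[str] = None
    view_id: Optional[str] = None

    def to_payload(self) -> Dict[str, Any]:
        # Drop optional identifiers that were never set so the payload stays minimal.
        return {key: value for key, value in asdict(self).items() if value is not None}
```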
For high-volume workloads, use Segmentation.batch_send to process multiple
requests concurrently. The helper integrates with the shared batch runner
described in the next section so you can surface throughput metrics via
progress_callback while respecting rate limits.
Working with pandas DataFrames
The list models expose helpers for quick DataFrame exports when you need to
analyze your data using pandas. Each list class provides a to_pd_dataframe
method that returns a pandas.DataFrame populated with the model attributes:
from PermutiveAPI import Cohort, CohortList
cohorts = CohortList(
    [
        Cohort(name="C1", id="1", code="c1", tags=["t1"]),
        Cohort(name="C2", id="2", description="second cohort"),
    ]
)
df = cohorts.to_pd_dataframe()
print(df[["id", "name"]])
The same helper is available on SegmentList and ImportList for consistency
across the API.
Batch Helpers and Progress Callbacks
High-volume workflows often rely on the batch_* helpers to run requests
concurrently. Every helper accepts an optional progress_callback that is
invoked after each request completes with a
PermutiveAPI._Utils.http.Progress snapshot describing aggregate
throughput. The dataclass includes counters for completed requests, failure
totals, elapsed time, and the estimated seconds required to process 1,000
requests, making it straightforward to surface both reliability and latency
trends in dashboards or logs. Most workloads achieve a good balance between
throughput and API friendliness with max_workers=4. Increase the pool size
gradually (for example to 6 or 8 workers) only after observing stable latency
and error rates because the Permutive API enforces rate limits.
from PermutiveAPI import Cohort
from PermutiveAPI._Utils.http import Progress
def on_progress(progress: Progress) -> None:
    """Render a concise progress snapshot."""
    avg = progress.average_per_thousand_seconds
    avg_display = f"{avg:.2f}s" if avg is not None else "n/a"
    print(
        f"{progress.completed}/{progress.total} completed; "
        f"errors: {progress.errors}, avg/1k: {avg_display}"
    )

cohorts = [
    Cohort(name="VIP Customers", query={"type": "users"}),
    Cohort(name="Returning Visitors", query={"type": "visitors"}),
]
responses, failures = Cohort.batch_create(
    cohorts,
    api_key="your-api-key",
    max_workers=4,  # recommended starting point for concurrent writes
    progress_callback=on_progress,
)
if failures:
    print(f"Encountered {len(failures)} failures.")
The same callback shape is shared across helpers such as
Identity.batch_identify and Segment.batch_create, enabling reuse of
progress reporting utilities that surface throughput, error counts, and
latency projections. The helpers delegate to
PermutiveAPI._Utils.http.process_batch, so they automatically benefit
from the shared retry/backoff configuration used by the underlying request
helpers. When the API responds with HTTP 429 (rate limiting), the helper
retries using the exponential backoff already built into the package before
surfacing the error in the failures list. The segmentation helper,
Segmentation.batch_send, also consumes the same callback so you can track
progress consistently across segmentation workloads.
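To make the batch pattern concrete, here is a minimal standard-library sketch of a runner that executes tasks in a thread pool, collects failures instead of aborting, and reports a snapshot after each completion. It is an illustration only and not the package's process_batch: SketchProgress, run_batch, and the elapsed_seconds field are names invented for this example, and retry handling is left out entirely.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Tuple


@dataclass
class SketchProgress:
    """Stand-in snapshot; not the package's Progress dataclass."""

    completed: int
    total: int
    errors: int
    elapsed_seconds: float

    @property
    def average_per_thousand_seconds(self) -> Optional[float]:
        # Project the observed pace onto 1,000 requests.
        if self.completed == 0:
            return None
        return self.elapsed_seconds / self.completed * 1000


def run_batch(
    tasks: List[Callable[[], Any]],
    max_workers: int = 4,
    progress_callback: Optional[Callable[[SketchProgress], None]] = None,
) -> Tuple[List[Any], List[Tuple[int, Exception]]]:
    """Run tasks concurrently and return (responses, failures)."""
    responses: List[Any] = []
    failures: List[Tuple[int, Exception]] = []
    started = time.monotonic()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(task): index for index, task in enumerate(tasks)}
        # Results arrive in completion order, not submission order.
        for done, future in enumerate(as_completed(futures), start=1):
            try:
                responses.append(future.result())
            except Exception as exc:  # record the failure and keep going
                failures.append((futures[future], exc))
            if progress_callback is not None:
                elapsed = time.monotonic() - started
                progress_callback(
                    SketchProgress(done, len(tasks), len(failures), elapsed)
                )
    return responses, failures
```

In this sketch the callback runs on the calling thread as each future completes, so a simple print or logger call needs no extra locking.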
Configuring batch defaults
Two environment variables allow you to tune the default behaviour without touching application code:
- PERMUTIVE_BATCH_MAX_WORKERS controls the worker pool size used by the shared batch runner when max_workers is omitted. Provide a positive integer to cap concurrency or leave it unset to use Python's default heuristic.
- PERMUTIVE_BATCH_TIMEOUT_SECONDS controls the default timeout applied to each PermutiveAPI._Utils.http.BatchRequest. Set it to a positive float (in seconds) to align the HTTP timeout with your infrastructure's expectations.
Invalid values raise ValueError during initialisation to surface mistakes
early in the development cycle.
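The fail-fast validation described here might look roughly like the sketch below. The helper names read_positive_int and read_positive_float are hypothetical, and treating an empty string as "unset" is an assumption; the package's own parsing may differ in both respects.

```python
import math
import os
from typing import Optional


def read_positive_int(name: str) -> Optional[int]:
    """Return a positive integer from the environment, or None when unset."""
    raw = os.environ.get(name)
    if raw is None or raw.strip() == "":
        return None  # assumption: blank values count as unset
    try:
        value = int(raw)
    except ValueError:
        raise ValueError(f"{name} must be a positive integer, got {raw!r}") from None
    if value <= 0:
        raise ValueError(f"{name} must be a positive integer, got {raw!r}")
    return value


def read_positive_float(name: str) -> Optional[float]:
    """Return a positive, finite float from the environment, or None when unset."""
    raw = os.environ.get(name)
    if raw is None or raw.strip() == "":
        return None
    try:
        value = float(raw)
    except ValueError:
        raise ValueError(f"{name} must be a positive number, got {raw!r}") from None
    # Reject zero, negatives, NaN, and infinity.
    if not math.isfinite(value) or value <= 0:
        raise ValueError(f"{name} must be a positive number, got {raw!r}")
    return value
```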
Configuring retry defaults
Transient failure handling can also be adjusted through environment variables.
When unset, the package uses the standard RetryConfig defaults.
- PERMUTIVE_RETRY_MAX_RETRIES sets the number of attempts performed by the HTTP helpers before surfacing an error. Provide a positive integer.
- PERMUTIVE_RETRY_BACKOFF_FACTOR controls the exponential multiplier applied after each failed attempt. Provide a positive number (floats are accepted).
- PERMUTIVE_RETRY_INITIAL_DELAY_SECONDS specifies the starting delay in seconds before retrying. Provide a positive number.
Supplying invalid values for any of these variables raises ValueError when
the retry configuration is evaluated, helping catch misconfiguration early.
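To show how the three settings could interact, the sketch below retries an operation with exponentially growing delays. It is a generic illustration rather than the package's retry code: call_with_retry is a hypothetical name, the default values are placeholders (the real RetryConfig defaults are not stated here), and the delay formula initial_delay * backoff_factor ** attempt is an assumption. Real code would normally retry only on transient errors such as HTTP 429.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    operation: Callable[[], T],
    max_attempts: int = 3,  # placeholder default, not the package's value
    backoff_factor: float = 2.0,  # placeholder default
    initial_delay: float = 1.0,  # placeholder default, in seconds
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation up to max_attempts times, backing off between failures."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            # Out of attempts: surface the last error to the caller.
            if attempt == max_attempts - 1:
                raise
            # Assumed schedule: the delay grows by backoff_factor each time.
            sleep(initial_delay * backoff_factor**attempt)
    raise ValueError("max_attempts must be a positive integer")
```

Passing a custom sleep function keeps the sketch easy to test without waiting for real delays.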
Segment creation follows the same batch pattern. For example, you can create multiple segments for a given import in one request batch while reporting progress back to an observability system:
from PermutiveAPI import Segment
segments = [
    Segment(
        import_id="import-123",
        name="Frequent Flyers",
        query={"type": "users", "filter": {"country": "US"}},
    ),
    Segment(
        import_id="import-123",
        name="Dormant Subscribers",
        query={"type": "users", "filter": {"status": "inactive"}},
    ),
]
segment_responses, segment_failures = Segment.batch_create(
    segments,
    api_key="your-api-key",
    max_workers=4,
    progress_callback=on_progress,
)
if segment_failures:
    print(f"Encountered {len(segment_failures)} failures during segment creation.")
You can also evaluate multiple users in parallel while reporting progress back to an observability system:
from PermutiveAPI import Event, Segmentation
events = [
    Event(
        name="SlotViewable",
        time="2025-07-01T15:39:11.594Z",
        properties={"campaign_id": "3747123491"},
        session_id="f19199e4-1654-4869-b740-703fd5bafb6f",
        view_id="d30ccfc5-c621-4ac4-a282-9a30ac864c8a",
    )
]
requests = [
    Segmentation(user_id="user-1", events=events),
    Segmentation(user_id="user-2", events=events),
]
segmentation_responses, segmentation_failures = Segmentation.batch_send(
    requests,
    api_key="your-api-key",
    max_workers=4,
    progress_callback=on_progress,
)
if segmentation_failures:
    print(f"Encountered {len(segmentation_failures)} failures during segmentation.")
Error Handling
The package raises purpose-specific exceptions that are also available at the top level of the package for convenience:
from PermutiveAPI import (
    Cohort,
    PermutiveAPIError,
    PermutiveAuthenticationError,
    PermutiveBadRequestError,
    PermutiveRateLimitError,
    PermutiveResourceNotFoundError,
    PermutiveServerError,
)

try:
    # Make an API call via the high-level classes
    Cohort.list(api_key="your_api_key")
except PermutiveBadRequestError as e:
    # e.status, e.url, and e.response are available for debugging
    print(e.status, e.url, e)
except PermutiveAPIError as e:
    print("Unhandled API error:", e)
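The ordering of the except clauses matters: specific errors have to be caught before the general one. The sketch below shows this with local stand-in classes so it runs without the package installed. The Sketch* names and their constructor are invented for this example, and the assumption that the specific errors subclass a common base mirrors, but does not confirm, the real hierarchy.

```python
from typing import Callable, Optional


class SketchAPIError(Exception):
    """Stand-in base error carrying debugging context."""

    def __init__(self, message: str, status: Optional[int] = None, url: Optional[str] = None):
        super().__init__(message)
        self.status = status
        self.url = url


class SketchBadRequestError(SketchAPIError):
    """Stand-in for a 400-style failure."""


class SketchRateLimitError(SketchAPIError):
    """Stand-in for a 429-style failure."""


def describe_failure(call: Callable[[], object]) -> str:
    """Run call and summarise the outcome, most specific handler first."""
    try:
        call()
    except SketchRateLimitError as exc:
        return f"rate limited ({exc.status}): retry later"
    except SketchBadRequestError as exc:
        return f"bad request ({exc.status}) at {exc.url}"
    except SketchAPIError as exc:
        # The base class goes last, otherwise it would shadow the handlers above.
        return f"unhandled API error: {exc}"
    return "ok"
```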
Development
To set up a development environment, install the development dependencies:
pip install ".[dev]"
Running Tests
Before committing any changes, please run the following checks to ensure code quality and correctness.
Style Checks:
pydocstyle PermutiveAPI
black --check .
Static Type Analysis:
pyright PermutiveAPI
Unit Tests and Coverage:
pytest -q --cov=PermutiveAPI --cov-report=term-missing --cov-fail-under=70
All checks must pass before a pull request can be merged.
Contributing
Contributions are welcome! Please see CONTRIBUTING.md for development setup and pull request guidelines.
License
This project is licensed under the MIT License. See the LICENSE file for details.