# p-moderation

Moderation and guardrails library for Pruna AI: async moderation with a unified, non-blocking, provider-based architecture.

All providers support the same async interface:

- `submit()` returns immediately with a job ID (non-blocking)
- `retrieve()` fetches the result (when ready)
- `check_status()` polls for completion without blocking

Providers:

- `OpenAIProvider`: text + image moderation via the OpenAI API
- `ReplicateProvider`: text + image moderation via Replicate deployments (async, background threading)
- `TransformersImageClassifierProvider`: local image-only classification (allowlisted models)
- `TransformersTextClassifierProvider`: local text-only classification (allowlisted models, default: `ezb/NSFW-Prompt-Detector`)
## Install

Base package:

```bash
uv add p-moderation
# or
pip install p-moderation
```

With local transformers support:

```bash
pip install 'p-moderation[transformers]'
```
## Quick Start

### OpenAI

Async moderation with the convenience constructor:

```python
import asyncio

from p_moderation import ModerationClient


async def main() -> None:
    client = ModerationClient.from_openai(api_key="sk-...", fail_open=True)
    result = await client.moderate(text="Hello, this is benign.")
    print(result.action)   # ModerationAction.PASS
    print(result.flagged)  # False
    await client.aclose()


asyncio.run(main())
```

Or explicit provider injection:

```python
from p_moderation import ModerationClient, OpenAIProvider

provider = OpenAIProvider(api_key="sk-...", model="omni-moderation-latest")
client = ModerationClient(provider=provider)
```
### Replicate

High-throughput moderation with background threading (best for sync callers or fire-and-forget scenarios):

```python
from p_moderation.providers.replicate import get_replicate_provider

provider = get_replicate_provider()

# Run moderation in a background thread
q, event = provider.create_and_wait_deployment_thread(
    deployment_name="prunaai/p-moderation",
    input_data={"text": "Text to moderate", "timeout": 0.5},
)

# Do other work...

event.wait(timeout=30)
status, result = q.get_nowait()
if status == "success":
    print(f"Action: {result.get('action')}")
```

Performance: ~0.5s latency, 33+ req/sec sustained, early detection in under 0.35s with connection pooling.
## Provider Usage

### OpenAI

Text and/or image moderation (single request, all input types):

```python
client = ModerationClient.from_openai(api_key="sk-...")

# Text only
result = await client.moderate(text="This is safe text")
print(result.item_results[0].input_type)  # "text"

# Image only
result = await client.moderate(image="https://example.com/image.jpg")
print(result.item_results[0].input_type)  # "image"

# Text + image combined
result = await client.moderate(text="User prompt", image=image_bytes)
print(result.item_results[0].input_type)  # "combined"
```

Categories: violence, self-harm, sexual, harassment, hate, and illicit. The response includes `category_applied_input_types`, which shows which inputs were evaluated for each category.
### Replicate

High-throughput moderation via Replicate deployments with optimized async polling and connection pooling:

```python
from p_moderation.providers.replicate import get_replicate_provider

provider = get_replicate_provider()

# Direct async usage (returns when complete)
result = await provider.create_and_wait(
    "prunaai/p-moderation",
    {"text": "Text to moderate", "timeout": 0.5},
    timeout=30.0,
)
print(result.get("action"))  # "pass" or "block"
```

For sync callers (background threading):

```python
q, event = provider.create_and_wait_deployment_thread(
    "prunaai/p-moderation",
    {"text": "Text to moderate", "timeout": 0.5},
)
event.wait(timeout=30)
status, result = q.get_nowait()
```

Performance: ~0.5s latency, 33+ req/sec, early detection in under 0.35s. Polling is auto-tuned: the interval starts at `min(0.05, timeout / 10)` with exponential backoff up to 0.5s.
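The auto-tuned polling schedule can be sketched in a few lines. This is an illustration of the stated formula only; the provider's actual internals may differ:

```python
def polling_intervals(timeout: float, max_polls: int = 6) -> list[float]:
    """Return the first few sleep intervals for a given moderation timeout."""
    interval = min(0.05, timeout / 10)
    intervals = []
    for _ in range(max_polls):
        intervals.append(interval)
        interval = min(interval * 2, 0.5)  # exponential backoff, capped at 0.5s
    return intervals

print(polling_intervals(0.5))  # [0.05, 0.1, 0.2, 0.4, 0.5, 0.5]
```

Short timeouts poll aggressively at first, then back off toward the 0.5s ceiling, which matches the sub-0.35s early-detection figure above.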
### Local Image Classification

Fast local NSFW detection (no API calls):

```python
client = ModerationClient.from_image_classifier(
    model_id="Falconsai/nsfw_image_detection_26",
)
result = await client.moderate(image="/path/to/image.jpg")
print(result.action)  # ModerationAction.PASS or ModerationAction.MONITOR
```

Only allowlisted Hugging Face models are accepted. Categories: nsfw / normal. Image-only (text inputs are rejected).
### Local Text Classification

Local NSFW detection for text (default model: `ezb/NSFW-Prompt-Detector`):

```python
client = ModerationClient.from_text_classifier()
result = await client.moderate(text="Have a great day!")
print(result.action)      # ModerationAction.PASS
print(result.categories)  # {'sfw': ModerationAction.PASS, 'nsfw': ModerationAction.PASS}
```

Only allowlisted Hugging Face models are accepted. Categories: sfw / nsfw. Text-only (image inputs are rejected).
## Response Format & Optimization

### Configurable Detail Levels

All providers return the same `ModerationResult` with the policy already applied. Serialize to JSON at the appropriate level of detail:

```python
result = await client.moderate(text="Hello, this is safe text")

# Minimal (~50 B): action + flagged only
minimal = result.to_dict(detail="minimal")

# Balanced (~200 B, recommended): action + categories
balanced = result.to_dict(detail="balanced")

# Full (~1 KB): complete response with all metadata
full = result.to_dict(detail="full")
```
### Response Examples

Minimal (~50 bytes, for polling):

```json
{
  "success": true,
  "action": "pass",
  "flagged": false
}
```
Balanced (~200 bytes, recommended for production):

```json
{
  "success": true,
  "action": "pass",
  "flagged": false,
  "categories": {
    "harassment": "pass",
    "hate": "pass",
    "self-harm": "pass",
    "sexual": "pass",
    "violence": "pass"
  },
  "provider_response_id": "modr-8MZo6m9F5OMVdXcg3SBF6I0z"
}
```
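A caller consuming the balanced payload can gate requests on the parsed dict alone. A minimal sketch, assuming only the fields shown in the balanced example (the helper names here are illustrative, not part of the library):

```python
import json

# Sample balanced payload (same shape as the example above)
response = json.loads('''{
  "success": true,
  "action": "pass",
  "flagged": false,
  "categories": {"harassment": "pass", "hate": "pass", "violence": "pass"}
}''')

def should_block(response: dict) -> bool:
    """Gate on the policy-applied action; anything other than 'pass' blocks."""
    return response.get("action") != "pass"

def flagged_categories(response: dict) -> list[str]:
    """Names of categories whose per-category action is not 'pass'."""
    return [name for name, action in response.get("categories", {}).items()
            if action != "pass"]

print(should_block(response))        # False
print(flagged_categories(response))  # []
```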
Full (~1 KB, for debugging):

```json
{
  "success": true,
  "provider_name": "openai",
  "model": "omni-moderation-latest",
  "action": "pass",
  "flagged": false,
  "categories": {
    "harassment": "pass",
    "hate": "pass",
    "self-harm": "pass",
    "sexual": "pass",
    "violence": "pass"
  },
  "category_scores": {
    "harassment": 0.000023,
    "hate": 0.000010,
    "self-harm": 0.000005,
    "sexual": 0.000048,
    "violence": 0.000487
  },
  "category_applied_input_types": {
    "harassment": ["text"],
    "sexual": ["text", "image"],
    "violence": ["text", "image"]
  },
  "item_results": [
    {
      "item_index": 0,
      "input_type": "text",
      "action": "pass",
      "flagged": false,
      "categories": {
        "harassment": "pass",
        "sexual": "pass",
        "violence": "pass"
      },
      "category_scores": {
        "harassment": 0.000023,
        "sexual": 0.000048,
        "violence": 0.000487
      },
      "category_applied_input_types": {
        "harassment": ["text"],
        "sexual": ["text", "image"],
        "violence": ["text", "image"]
      }
    }
  ],
  "provider_response_id": "modr-8MZo6m9F5OMVdXcg3SBF6I0z",
  "request_id": null,
  "client_id": null,
  "error": null
}
```
Error response (fail-open, `action="pass"`):

```json
{
  "success": false,
  "action": "pass",
  "flagged": false,
  "provider_response_id": null,
  "error": {
    "type": "TimeoutError",
    "message": "OpenAI API request timed out"
  }
}
```

Balanced is recommended for production: it is roughly 58% smaller than full while retaining full decision transparency.
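The size difference is easy to verify locally. A rough sketch comparing compact serializations of the example payloads above (exact byte counts depend on response IDs and category sets, so treat the numbers as approximate):

```python
import json

minimal = {"success": True, "action": "pass", "flagged": False}
balanced = {
    **minimal,
    "categories": {c: "pass" for c in
                   ["harassment", "hate", "self-harm", "sexual", "violence"]},
    "provider_response_id": "modr-8MZo6m9F5OMVdXcg3SBF6I0z",
}

# Compact separators approximate what a wire format would carry
minimal_size = len(json.dumps(minimal, separators=(",", ":")))
balanced_size = len(json.dumps(balanced, separators=(",", ":")))
print(minimal_size, balanced_size)  # minimal stays far smaller than balanced
```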
### Image Optimization: Pre-Encoded Bytes

If you already have bounded JPEG/PNG bytes, use `EncodedImage` to skip normalization:

```python
from p_moderation import EncodedImage

encoded = EncodedImage(content=image_bytes, mime_type="image/jpeg")
result = await client.moderate(image=encoded)
```

For most cases, pass bytes, `Path`, or URL strings directly; the library handles encoding. Use `EncodedImage` only if profiling shows image preparation to be a bottleneck.
## Async Jobs: Submit and Retrieve

All providers support the same unified, non-blocking API. Submit a request (which returns immediately), do other work, and retrieve the result when it is ready.

How it works:

- `submit()` extracts the first text/image from lists and launches a background task
- It returns immediately with a `job_id` and `status="pending"`
- The background task runs moderation and stores the result
- `retrieve()` returns the stored `ProviderModerationResponse` when ready
- Jobs are stored per instance (memory-based for local providers, server-based for Replicate)
### Quick Example

Submit a request and retrieve the result later:

```python
provider = OpenAIProvider(api_key="sk-...")

# Submit request
prediction = await provider.submit(text="Some text to moderate")
job_id = prediction.id

# Do other work...
await run_expensive_computation()

# Retrieve result
result = await provider.retrieve(job_id)
print(result.item_results[0].flagged)  # False
```
### Polling Status

You can check the status of a job without blocking:

```python
prediction = await provider.submit(text="Text to moderate")

# Poll for completion
while True:
    status = await prediction.check_status()
    if status == "completed":
        break
    await asyncio.sleep(1)

result = prediction.output
print(result.item_results[0].input_type)  # "text"
```
### Wait for Completion

Or use the prediction's `.wait()` method to block until done:

```python
prediction = await provider.submit(text="Text to moderate")

# Wait for the result (blocks until completion)
result = await prediction.wait(timeout=30.0)
print(result.provider_response_id)  # "modr-..."
```
### Important: Job Storage and Persistence

- OpenAI/Transformers: results are stored in memory (per provider instance)
  - Jobs are lost on restart or instance deletion
  - Best for: short-lived requests, single-session processing
- Replicate: results are persisted on Replicate's servers
  - Jobs survive client disconnects and process restarts
  - Best for: long-running jobs where reliable persistence is needed

All providers support the same `submit()` / `retrieve()` interface, but Replicate's server-side storage means jobs remain accessible indefinitely.
## Batch Moderation: moderate_many

Process multiple items efficiently using provider-specific optimizations.

### Input Format

Each item is a dictionary with optional `text` and/or `image`:

```python
items = [
    {"text": "Check this text"},
    {"image": image_bytes},
    {"text": "With image", "image": img_bytes},
]
results = await client.moderate_many(items)
```
### Provider-Specific Strategies

OpenAI/Replicate: submit-wait parallelization

- Submits up to `max_concurrency` items in parallel
- Respects API rate limits via concurrency tuning
- Best for: API-based moderation with many items

```python
client = ModerationClient.from_openai(api_key=api_key)
items = [{"text": f"Item {i}"} for i in range(100)]
results = await client.moderate_many(
    items,
    max_concurrency=5,  # Tune based on rate limits
)
```
Transformers classifiers: sequential + batched pipeline

- Items are processed sequentially
- The internal pipeline handles GPU batching via `batch_size`
- Best for: local inference with many texts/images

```python
client = ModerationClient.from_text_classifier(batch_size=64)
items = [{"text": f"Text {i}"} for i in range(1000)]
results = await client.moderate_many(items)
```
### Response Structure

`moderate_many` returns a list of `ModerationResult` objects in input order:

```python
results = await client.moderate_many(items)

for i, result in enumerate(results):
    print(f"Item {i}:")
    print(f"  action: {result.action}")    # ModerationAction enum
    print(f"  flagged: {result.flagged}")  # bool
    print(f"  input_type: {result.item_results[0].input_type}")  # "text", "image", "combined"
    print(f"  categories: {result.categories}")            # {category: action}
    print(f"  category_scores: {result.category_scores}")  # {category: float}
    if result.error:
        print(f"  error: {result.error}")
```
### Response Examples

Text-only moderation:

```python
result = await client.moderate(text="Have a great day!")
print(result)
# ModerationResult(
#     provider_name='openai',
#     model='omni-moderation-latest',
#     action=ModerationAction.PASS,
#     flagged=False,
#     categories={
#         'harassment': ModerationAction.PASS,
#         'hate': ModerationAction.PASS,
#         'self-harm': ModerationAction.PASS,
#         'sexual': ModerationAction.PASS,
#         'violence': ModerationAction.PASS,
#         ...
#     },
#     category_scores={
#         'harassment': 1.2e-05,
#         'hate': 9.8e-06,
#         'self-harm': 1.5e-05,
#         'sexual': 4.8e-05,
#         'violence': 4.8e-04,
#         ...
#     },
#     category_applied_input_types={
#         'harassment': ['text'],
#         'sexual': ['text'],
#         'violence': ['text'],
#         ...
#     },
#     item_results=[
#         ProviderItemResult(
#             item_index=0,
#             input_type='text',
#             flagged=False,
#             categories={...},
#         )
#     ],
# )
```
Combined text + image:

```python
result = await client.moderate(text="User prompt", image=image_bytes)
print(result.item_results[0].input_type)  # "combined"
print(result.category_applied_input_types['violence'])  # ['text', 'image']
# The violence category was evaluated against both text and image
```
Batch with mixed inputs:

```python
items = [
    {"text": "Text 1"},
    {"image": img1},
    {"text": "Text 2", "image": img2},
]
results = await client.moderate_many(items, max_concurrency=3)
# results[0].item_results[0].input_type → "text"
# results[1].item_results[0].input_type → "image"
# results[2].item_results[0].input_type → "combined"
```
Error handling:

```python
results = await client.moderate_many(
    items,
    fail_open=True,  # Return PASS on errors
)
for i, result in enumerate(results):
    if result.error:
        print(f"Item {i} failed: {result.error.message}")
        print(f"Error type: {result.error.type}")
    else:
        print(f"Item {i}: {result.action}")
```
### Performance Tips

- OpenAI: start with `max_concurrency=5` and increase gradually while monitoring for rate limits (429 errors)
- Transformers: tune `batch_size` in provider initialization (larger is faster but needs more VRAM)
- Mixed inputs: texts and images can safely be combined in the same batch (each gets the appropriate evaluation)
## Policies

Policies are provider-specific: the policy's `provider_name` must match the active provider.

```python
from p_moderation import ModerationAction, ProviderPolicy

policy = ProviderPolicy(
    provider_name="openai",
    category_actions={
        "violence": ModerationAction.BLOCK,
        "violence/graphic": ModerationAction.BLOCK,
    },
    input_type_overrides={
        "image": {"sexual": ModerationAction.BLOCK},
    },
)
```
## Failure Behavior

- `fail_open=True`: provider/runtime errors return `action="pass"` with `result.error` set
- `fail_open=False`: provider/runtime errors raise
- Provider configuration or capability errors raise regardless of `fail_open`
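The contract can be illustrated in isolation. This is a hedged sketch, not the library's internals; `ProviderError` and `moderate_with_policy` are made-up names for illustration:

```python
class ProviderError(Exception):
    """Stand-in for a provider/runtime failure (hypothetical name)."""

def moderate_with_policy(run, fail_open: bool) -> dict:
    """Apply the fail-open/fail-closed contract around a moderation callable."""
    try:
        return run()
    except ProviderError as exc:
        if fail_open:
            # fail_open=True: degrade to a passing result that records the error
            return {
                "success": False,
                "action": "pass",
                "flagged": False,
                "error": {"type": type(exc).__name__, "message": str(exc)},
            }
        raise  # fail_open=False: surface the error to the caller

def flaky() -> dict:
    raise ProviderError("OpenAI API request timed out")

result = moderate_with_policy(flaky, fail_open=True)
print(result["action"])         # pass
print(result["error"]["type"])  # ProviderError
```

Note how the fail-open result mirrors the error response shown earlier: `success` is false and the error is recorded, but the action still reads "pass".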
## Benchmarks

General moderation latency:

```bash
uv run python benchmarks/bench_moderation.py --provider openai --iterations 10
uv run python benchmarks/bench_moderation.py --provider image-classifier --iterations 10
uv run --extra transformers python benchmarks/bench_moderation.py --provider text-classifier --iterations 10
uv run python benchmarks/bench_moderation.py --provider openai --iterations 10 --concurrency 5
uv run --extra aiohttp python benchmarks/bench_moderation.py --provider openai --iterations 10 --concurrency 5 --http-backend aiohttp
```

The moderation benchmark reports per-request latency plus aggregate throughput for the selected concurrency level. For OpenAI, you can also compare the default SDK transport against aiohttp.

See `benchmarks/README.md` for recorded commands and the latest captured numbers.
## Development

Install the git hooks once:

```bash
uv run --extra dev pre-commit install
```

Useful local commands:

```bash
make check
make bench-openai BENCH_ARGS='--concurrency 5'
make bench-image-classifier BENCH_ARGS='--concurrency 5'
make bench-text-classifier BENCH_ARGS='--concurrency 5'
```
## Notes

- Categories are provider-specific by design. OpenAI, image-classifier, and text-classifier labels are not normalized into one shared taxonomy.
- Migration: the old `FalconsAITransformersProvider` naming was removed; use `TransformersImageClassifierProvider` or `ModerationClient.from_image_classifier(...)`.