
Moderation and guardrails library for Pruna AI


p-moderation

Async moderation library for Pruna AI with a unified, non-blocking provider-based architecture.

All providers support the same async interface:

  • submit() returns immediately with a job ID (non-blocking)
  • retrieve() fetches the result (when ready)
  • check_status() polls for completion without blocking

Providers:

  • OpenAIProvider: text + image moderation via OpenAI API
  • ReplicateProvider: text + image moderation via Replicate deployments (async, background threading)
  • TransformersImageClassifierProvider: local image-only classification (allowlisted models)
  • TransformersTextClassifierProvider: local text-only classification (allowlisted models, default: ezb/NSFW-Prompt-Detector)

Install

Base package:

uv add p-moderation
# or
pip install p-moderation

With local transformers support:

pip install 'p-moderation[transformers]'

Quick Start

OpenAI

Async moderation with the convenience constructor:

import asyncio
from p_moderation import ModerationClient

async def main() -> None:
    client = ModerationClient.from_openai(api_key="sk-...", fail_open=True)
    result = await client.moderate(text="Hello, this is benign.")
    print(result.action)  # ModerationAction.PASS
    print(result.flagged)  # False
    await client.aclose()

asyncio.run(main())

Or explicit provider injection:

from p_moderation import OpenAIProvider, ModerationClient

provider = OpenAIProvider(api_key="sk-...", model="omni-moderation-latest")
client = ModerationClient(provider=provider)

Replicate

High-throughput moderation with background threading (best for sync callers or fire-and-forget scenarios):

from p_moderation.providers.replicate import get_replicate_provider

provider = get_replicate_provider()

# Run moderation in background thread
q, event = provider.create_and_wait_deployment_thread(
    deployment_name="prunaai/p-moderation",
    input_data={"text": "Text to moderate", "timeout": 0.5},
)

# Do other work...
event.wait(timeout=30)
status, result = q.get_nowait()

if status == "success":
    print(f"Action: {result.get('action')}")

Performance: ~0.5s latency, 33+ req/sec sustained, early detection <0.35s with connection pooling.

Provider Usage

OpenAI

Text and/or image moderation (single request, all input types):

client = ModerationClient.from_openai(api_key="sk-...")

# Text only
result = await client.moderate(text="This is safe text")
print(result.item_results[0].input_type)  # "text"

# Image only
result = await client.moderate(image="https://example.com/image.jpg")
print(result.item_results[0].input_type)  # "image"

# Text + Image combined
result = await client.moderate(text="User prompt", image=image_bytes)
print(result.item_results[0].input_type)  # "combined"

Categories: violence, self-harm, sexual, harassment, hate, and illicit. The response includes category_applied_input_types, which records which input types were evaluated for each category.
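For example, you can walk the per-category decisions together with the input types evaluated for each one (a quick sketch using the categories and category_applied_input_types fields shown in the response examples below):

# Continuing the combined text + image example above
result = await client.moderate(text="User prompt", image=image_bytes)

for category, action in result.categories.items():
    # category_applied_input_types records which input types ("text", "image")
    # were evaluated for this category
    applied = result.category_applied_input_types.get(category, [])
    print(f"{category}: {action} (evaluated on: {applied})")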

Replicate

High-throughput moderation via Replicate deployments with optimized async polling and connection pooling:

from p_moderation.providers.replicate import get_replicate_provider

provider = get_replicate_provider()

# Direct async usage (returns when complete)
result = await provider.create_and_wait(
    "prunaai/p-moderation",
    {"text": "Text to moderate", "timeout": 0.5},
    timeout=30.0,
)
print(result.get("action"))  # "pass" or "block"

For sync callers (background threading):

q, event = provider.create_and_wait_deployment_thread(
    "prunaai/p-moderation",
    {"text": "Text to moderate", "timeout": 0.5},
)
event.wait(timeout=30)
status, result = q.get_nowait()

Performance: ~0.5s latency, 33+ req/sec, early detection <0.35s. Auto-tuned polling: interval = min(0.05, timeout/10), exponential backoff up to 0.5s.
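As a rough sketch of that polling schedule (an illustration of the formula above, not the provider's internal implementation):

def polling_intervals(timeout: float, max_interval: float = 0.5):
    """Yield successive poll delays: start at min(0.05, timeout / 10),
    then back off exponentially, capped at max_interval."""
    interval = min(0.05, timeout / 10)
    elapsed = 0.0
    while elapsed < timeout:
        yield interval
        elapsed += interval
        interval = min(interval * 2, max_interval)

# For timeout=30.0 this yields 0.05, 0.1, 0.2, 0.4, 0.5, 0.5, ...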

Local Image Classification

Fast local NSFW detection (no API calls):

client = ModerationClient.from_image_classifier(
    model_id="Falconsai/nsfw_image_detection_26",
)

result = await client.moderate(image="/path/to/image.jpg")
print(result.action)  # ModerationAction.PASS or ModerationAction.MONITOR

Only allowlisted Hugging Face models are accepted. Categories: nsfw / normal. Image-only (text inputs are rejected).

Local Text Classification

Local NSFW detection for text (default: ezb/NSFW-Prompt-Detector):

client = ModerationClient.from_text_classifier()

result = await client.moderate(text="Have a great day!")
print(result.action)  # ModerationAction.PASS
print(result.categories)  # {'sfw': ModerationAction.PASS, 'nsfw': ModerationAction.PASS}

Only allowlisted Hugging Face models are accepted. Categories: sfw / nsfw. Text-only (image inputs are rejected).

Response Format & Optimization

Configurable Detail Levels

All providers return the same ModerationResult with policy already applied. Serialize to JSON with appropriate detail:

result = await client.moderate(text="Hello, this is safe text")

# Minimal (~50B): action + flagged only
minimal = result.to_dict(detail="minimal")

# Balanced (~200B, recommended): action + categories
balanced = result.to_dict(detail="balanced")

# Full (~1KB): complete response with all metadata
full = result.to_dict(detail="full")
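
The resulting dictionaries are plain JSON-compatible values (as the examples below suggest), so serializing for an HTTP response is a one-liner:

import json

payload = json.dumps(result.to_dict(detail="balanced"))  # e.g. an API response body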

Response Examples

Minimal (~50 bytes, for polling):

{
  "success": true,
  "action": "pass",
  "flagged": false
}

Balanced (~200 bytes, recommended for production):

{
  "success": true,
  "action": "pass",
  "flagged": false,
  "categories": {
    "harassment": "pass",
    "hate": "pass",
    "self-harm": "pass",
    "sexual": "pass",
    "violence": "pass"
  },
  "provider_response_id": "modr-8MZo6m9F5OMVdXcg3SBF6I0z"
}

Full (~1KB, for debugging):

{
  "success": true,
  "provider_name": "openai",
  "model": "omni-moderation-latest",
  "action": "pass",
  "flagged": false,
  "categories": {
    "harassment": "pass",
    "hate": "pass",
    "self-harm": "pass",
    "sexual": "pass",
    "violence": "pass"
  },
  "category_scores": {
    "harassment": 0.000023,
    "hate": 0.000010,
    "self-harm": 0.000005,
    "sexual": 0.000048,
    "violence": 0.000487
  },
  "category_applied_input_types": {
    "harassment": ["text"],
    "sexual": ["text", "image"],
    "violence": ["text", "image"]
  },
  "item_results": [
    {
      "item_index": 0,
      "input_type": "text",
      "action": "pass",
      "flagged": false,
      "categories": {
        "harassment": "pass",
        "sexual": "pass",
        "violence": "pass"
      },
      "category_scores": {
        "harassment": 0.000023,
        "sexual": 0.000048,
        "violence": 0.000487
      },
      "category_applied_input_types": {
        "harassment": ["text"],
        "sexual": ["text", "image"],
        "violence": ["text", "image"]
      }
    }
  ],
  "provider_response_id": "modr-8MZo6m9F5OMVdXcg3SBF6I0z",
  "request_id": null,
  "client_id": null,
  "error": null
}

Error Response (fail-open, action=pass):

{
  "success": false,
  "action": "pass",
  "flagged": false,
  "provider_response_id": null,
  "error": {
    "type": "TimeoutError",
    "message": "OpenAI API request timed out"
  }
}

Balanced is recommended for production (58% smaller than full, retains all decision transparency).

Image Optimization: Pre-Encoded Bytes

If you already have JPEG/PNG bytes within size bounds, use EncodedImage to skip normalization:

from p_moderation import EncodedImage

encoded = EncodedImage(content=image_bytes, mime_type="image/jpeg")
result = await client.moderate(image=encoded)

For most cases, pass bytes, Path, or URL strings directly—the library handles encoding. Use EncodedImage only if profiling shows image preparation as a bottleneck.

Async Jobs: Submit and Retrieve

All providers support the same unified, non-blocking API. Submit a request (returns immediately), do other work, and retrieve results when ready.

How it works:

  • submit() extracts the first text/image from lists and launches a background task
  • Returns immediately with a job_id and status="pending"
  • Background task runs moderation and stores the result
  • retrieve() returns the stored ProviderModerationResponse when ready
  • Jobs are stored per-instance (memory-based for local providers, server-based for Replicate)

Quick Example

Submit a request and retrieve later:

provider = OpenAIProvider(api_key="sk-...")

# Submit request
prediction = await provider.submit(text="Some text to moderate")
job_id = prediction.id

# Do other work...
await run_expensive_computation()

# Retrieve result
result = await provider.retrieve(job_id)
print(result.item_results[0].flagged)  # False

Polling Status

You can check the status of a job without blocking:

prediction = await provider.submit(text="Text to moderate")

# Poll for completion
while True:
    status = await prediction.check_status()
    if status == "completed":
        break
    await asyncio.sleep(1)

result = prediction.output
print(result.item_results[0].input_type)  # "text"

Wait for Completion

Or use the prediction's .wait() method to block until done:

prediction = await provider.submit(text="Text to moderate")

# Wait for result (blocks until completion)
result = await prediction.wait(timeout=30.0)
print(result.provider_response_id)  # "modr-..."

Important: Job Storage and Persistence

  • OpenAI/Transformers: Results stored in memory (per provider instance)

    • Jobs are lost on restart or instance deletion
    • Best for: short-lived requests, single-session processing
  • Replicate: Results persisted on Replicate's servers

    • Jobs survive client disconnect and process restart
    • Best for: long-running jobs, reliable persistence needed

All providers support the same submit() / retrieve() interface, but Replicate's server-side storage means jobs remain accessible indefinitely.
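
For example, because Replicate persists results server-side, a job ID captured in one process can be retrieved later from a fresh provider instance (a sketch using the same submit() / retrieve() interface shown above):

from p_moderation.providers.replicate import get_replicate_provider

provider = get_replicate_provider()
prediction = await provider.submit(text="Text to moderate")
saved_job_id = prediction.id  # persist this wherever you track jobs

# ...later, possibly after a restart or in another worker...
fresh_provider = get_replicate_provider()
result = await fresh_provider.retrieve(saved_job_id)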

Batch Moderation: moderate_many

Process multiple items efficiently using provider-specific optimizations.

Input Format

Each item is a dictionary with optional text and/or image:

items = [
    {"text": "Check this text"},
    {"image": image_bytes},
    {"text": "With image", "image": img_bytes},
]

results = await client.moderate_many(items)

Provider-Specific Strategies

OpenAI/Replicate: Submit-wait parallelization

  • Submits up to max_concurrency items in parallel
  • Respects API rate limits via concurrency tuning
  • Best for: API-based moderation with many items

client = ModerationClient.from_openai(api_key=api_key)

items = [
    {"text": f"Item {i}"} for i in range(100)
]

results = await client.moderate_many(
    items,
    max_concurrency=5,  # Tune based on rate limits
)

Transformers Classifiers: Sequential + batched pipeline

  • Items processed sequentially
  • Internal pipeline handles GPU batching via batch_size
  • Best for: Local inference with many texts/images

client = ModerationClient.from_text_classifier(batch_size=64)

items = [
    {"text": f"Text {i}"} for i in range(1000)
]

results = await client.moderate_many(items)

Response Structure

moderate_many returns a list of ModerationResult objects in input order:

results = await client.moderate_many(items)

for i, result in enumerate(results):
    print(f"Item {i}:")
    print(f"  action: {result.action}")  # ModerationAction enum
    print(f"  flagged: {result.flagged}")  # Boolean
    print(f"  input_type: {result.item_results[0].input_type}")  # "text", "image", "combined"
    print(f"  categories: {result.categories}")  # {category: action}
    print(f"  category_scores: {result.category_scores}")  # {category: float}
    if result.error:
        print(f"  error: {result.error}")

Response Examples

Text-only moderation:

result = await client.moderate(text="Have a great day!")
print(result)
# ModerationResult(
#   provider_name='openai',
#   model='omni-moderation-latest',
#   action=ModerationAction.PASS,
#   flagged=False,
#   categories={
#     'harassment': ModerationAction.PASS,
#     'hate': ModerationAction.PASS,
#     'self-harm': ModerationAction.PASS,
#     'sexual': ModerationAction.PASS,
#     'violence': ModerationAction.PASS,
#     ...
#   },
#   category_scores={
#     'harassment': 1.2e-05,
#     'hate': 9.8e-06,
#     'self-harm': 1.5e-05,
#     'sexual': 4.8e-05,
#     'violence': 4.8e-04,
#     ...
#   },
#   category_applied_input_types={
#     'harassment': ['text'],
#     'sexual': ['text'],
#     'violence': ['text'],
#     ...
#   },
#   item_results=[
#     ProviderItemResult(
#       item_index=0,
#       input_type='text',
#       flagged=False,
#       categories={...}
#     )
#   ]
# )

Combined text + image:

result = await client.moderate(text="User prompt", image=image_bytes)
print(result.item_results[0].input_type)  # "combined"
print(result.category_applied_input_types['violence'])  # ['text', 'image']
# Violence category was evaluated against both text and image

Batch with mixed inputs:

items = [
    {"text": "Text 1"},
    {"image": img1},
    {"text": "Text 2", "image": img2},
]

results = await client.moderate_many(items, max_concurrency=3)

# results[0].item_results[0].input_type → "text"
# results[1].item_results[0].input_type → "image"
# results[2].item_results[0].input_type → "combined"

Error handling:

results = await client.moderate_many(
    items,
    fail_open=True,  # Return PASS on errors
)

for i, result in enumerate(results):
    if result.error:
        print(f"Item {i} failed: {result.error.message}")
        print(f"Error type: {result.error.type}")
    else:
        print(f"Item {i}: {result.action}")

Performance Tips

  • OpenAI: Start with max_concurrency=5, increase gradually while monitoring for rate limits (429 errors)
  • Transformers: Tune batch_size in provider initialization (larger = faster but more VRAM needed)
  • Mixed inputs: Combine texts and images in the same batch safely (each gets appropriate evaluation)

Policies

Policies are provider-specific. The policy's provider_name must match the active provider.

from p_moderation import ModerationAction, ProviderPolicy

policy = ProviderPolicy(
    provider_name="openai",
    category_actions={
        "violence": ModerationAction.BLOCK,
        "violence/graphic": ModerationAction.BLOCK,
    },
    input_type_overrides={
        "image": {"sexual": ModerationAction.BLOCK},
    },
)
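
A minimal sketch of wiring a policy into a client follows; the policy= keyword argument is an assumption made for illustration, so check the ModerationClient constructor in your installed version for the exact parameter name:

from p_moderation import ModerationAction, ModerationClient, OpenAIProvider, ProviderPolicy

policy = ProviderPolicy(
    provider_name="openai",
    category_actions={"violence": ModerationAction.BLOCK},
)

provider = OpenAIProvider(api_key="sk-...", model="omni-moderation-latest")
client = ModerationClient(provider=provider, policy=policy)  # policy= is assumed, not confirmed

result = await client.moderate(text="Some user text")
if result.action is ModerationAction.BLOCK:
    ...  # reject the request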

Failure Behavior

  • fail_open=True: provider/runtime errors return action=pass with result.error
  • fail_open=False: provider/runtime errors raise (see the sketch after this list)
  • provider configuration or capability errors raise regardless of fail_open
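
For example (a minimal sketch; the library's concrete exception types are not listed here, so a broad catch is shown):

# fail_open=False: handle provider failures yourself
client = ModerationClient.from_openai(api_key="sk-...", fail_open=False)
try:
    result = await client.moderate(text="Some user text")
except Exception:  # specific provider error types are not documented above
    ...  # log/alert and decide yourself whether to pass or block

# fail_open=True: no exception is raised; inspect result.error instead
client = ModerationClient.from_openai(api_key="sk-...", fail_open=True)
result = await client.moderate(text="Some user text")
if result.error:
    print(result.error.type, result.error.message)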

Benchmarks

General moderation latency:

uv run python benchmarks/bench_moderation.py --provider openai --iterations 10
uv run python benchmarks/bench_moderation.py --provider image-classifier --iterations 10
uv run --extra transformers python benchmarks/bench_moderation.py --provider text-classifier --iterations 10
uv run python benchmarks/bench_moderation.py --provider openai --iterations 10 --concurrency 5
uv run --extra aiohttp python benchmarks/bench_moderation.py --provider openai --iterations 10 --concurrency 5 --http-backend aiohttp

The moderation benchmark reports per-request latency plus aggregate throughput for the selected concurrency level. For OpenAI, you can also compare the default SDK transport versus aiohttp. See benchmarks/README.md for recorded commands and the latest captured numbers.

Development

Install the git hooks once:

uv run --extra dev pre-commit install

Useful local commands:

make check
make bench-openai BENCH_ARGS='--concurrency 5'
make bench-image-classifier BENCH_ARGS='--concurrency 5'
make bench-text-classifier BENCH_ARGS='--concurrency 5'

Notes

  • Categories are provider-specific by design. OpenAI, image-classifier, and text-classifier labels are not normalized into one shared taxonomy.
  • Migration: old FalconsAITransformersProvider naming was removed; use TransformersImageClassifierProvider or ModerationClient.from_image_classifier(...).

