Real-time data quality screening API — PASS / WARN / BLOCK in under 10ms

DataScreenIQ Python SDK

Stop bad data before it enters your pipeline.
Real-time schema drift detection and data quality screening that returns PASS / WARN / BLOCK in milliseconds.


The problem

Your pipeline ran successfully last night. The dashboard is broken this morning.

Somewhere between your upstream API and your database, a field went null, a type changed, a schema drifted, or a timestamp went stale — and nothing caught it. Data quality tools are almost always batch-based. They run after the INSERT. By the time Great Expectations or dbt tests flag an issue, bad rows have been in production for hours.

DataScreenIQ moves the check to the ingest boundary — before storage, before transformation, before damage.

Your API → DataScreenIQ → PASS ✓ → Database
                        → WARN ⚠ → Quarantine / flag
                        → BLOCK ✗ → Dead-letter queue

Install

pip install datascreeniq

Optional extras:

pip install datascreeniq[pandas]   # screen DataFrames directly
pip install datascreeniq[excel]    # screen .xlsx files
pip install datascreeniq[all]      # everything

60-second quickstart

import datascreeniq as dsiq

client = dsiq.Client("dsiq_live_...")   # get free key at datascreeniq.com

rows = [
    {"order_id": "ORD-001", "amount": 99.50,    "email": "alice@corp.com"},
    {"order_id": "ORD-002", "amount": "broken", "email": None},           # type mismatch
    {"order_id": "ORD-003", "amount": 75.00,    "email": None},           # null
]

report = client.screen(rows, source="orders")

print(report.status)          # BLOCK
print(report.health_pct)      # 34.0%
print(report.type_mismatches) # ["amount"]
print(report.null_rates)      # {"email": 0.67}
print(report.summary())
# 🚨 BLOCK | Health: 34.0% | Rows: 3 | Type mismatches: amount | Null rate: email=67% | (7ms)

What gets detected

The engine runs a single-pass column analysis on a deterministically-sampled subset of your rows. Every check is computed in-memory — no data is written anywhere.
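
The sampling strategy itself is not documented here. As a rough illustration of what "deterministic" sampling can mean, the sketch below keeps a row when a hash of its position falls under a target ratio, so the same batch always yields the same subset. The function name `deterministic_sample`, the `seed` value, and hashing by row index are all assumptions made for illustration, not the engine's actual method.

```python
import hashlib

def deterministic_sample(rows, ratio, seed="v2"):
    """Keep roughly `ratio` of the rows, choosing the same rows on every run."""
    if not 0.0 < ratio <= 1.0:
        raise ValueError("ratio must be in (0, 1]")
    threshold = ratio * 2**64  # cut-off in the 64-bit hash space
    kept = []
    for index, row in enumerate(rows):
        # Hash the seed and the row position; no randomness is involved.
        digest = hashlib.sha256(f"{seed}:{index}".encode("utf-8")).digest()
        # Interpret the first 8 bytes as an integer in [0, 2**64).
        if int.from_bytes(digest[:8], "big") < threshold:
            kept.append(row)
    return kept
```

Because the decision depends only on the seed and the row position, re-screening the same batch analyses the same rows, which is what makes a sample auditable.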

Column-level checks (per field, per batch)

| Check | What it catches | Default threshold |
|---|---|---|
| Null rate | Fields with too many missing values | WARN ≥ 30%, BLOCK ≥ 70% |
| Type mismatch | Fields where values aren't a consistent type | WARN ≥ 5%, BLOCK ≥ 20% |
| Empty string rate | Fields full of "" instead of null | WARN ≥ 30%, BLOCK ≥ 60% |
| Duplicate rate | Cardinality collapse: rows repeating unexpectedly | WARN > 10% |
| Outliers (IQR) | Numeric values beyond 1.5× interquartile range | Reported |
| Percentiles | p25 / p50 / p75 / p95 for every numeric field | Reported |
| Distinct count | Approximate unique values via HyperLogLog (±2%) | Reported |
| Enum tracking | Low-cardinality string fields tracked for new values | Reported |
| Timestamp detection | ISO 8601 / date fields auto-detected | Reported |
| Timestamp staleness | Most recent timestamp older than expected | WARN ≥ 24h, BLOCK ≥ 72h |
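
To make the first two rows of the table concrete, here is a minimal local sketch of a null-rate check and a type-mismatch check, using the quickstart rows and the default thresholds listed above. The helper names (`null_rate`, `type_name`, `type_mismatch_rate`, `classify_rate`) and the "dominant type" rule are assumptions for illustration; the service's own implementation may differ.

```python
from collections import Counter

def null_rate(rows, field):
    """Fraction of rows where `field` is missing or None."""
    if not rows:
        return 0.0
    return sum(1 for row in rows if row.get(field) is None) / len(rows)

def type_name(value):
    """Coarse JSON-style type label for a Python value."""
    if isinstance(value, bool):          # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, (int, float)):
        return "number"
    if isinstance(value, str):
        return "string"
    return type(value).__name__

def type_mismatch_rate(rows, field):
    """Fraction of non-null values that differ from the field's dominant type."""
    values = [row[field] for row in rows if row.get(field) is not None]
    if not values:
        return 0.0
    counts = Counter(type_name(value) for value in values)
    dominant_count = counts.most_common(1)[0][1]
    return 1 - dominant_count / len(values)

def classify_rate(rate, warn_at, block_at):
    """Map a rate to a verdict using inclusive (>=) thresholds."""
    if rate >= block_at:
        return "BLOCK"
    if rate >= warn_at:
        return "WARN"
    return "PASS"

rows = [
    {"order_id": "ORD-001", "amount": 99.50,    "email": "alice@corp.com"},
    {"order_id": "ORD-002", "amount": "broken", "email": None},
    {"order_id": "ORD-003", "amount": 75.00,    "email": None},
]
amount_rate = type_mismatch_rate(rows, "amount")        # 1 of 3 values is not a number
amount_verdict = classify_rate(amount_rate, 0.05, 0.20) # type-mismatch defaults from the table
email_rate = null_rate(rows, "email")                   # 2 of 3 rows have no email
```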

Drift detection (compared against your baseline)

After the first batch, every subsequent batch is compared against your stored schema and baselines:

| Drift kind | What triggers it | Severity |
|---|---|---|
| field_added | New field not in previous schema | WARN |
| field_removed | Known field missing from this batch | WARN |
| type_changed | Field type changed (e.g. number → string) | BLOCK |
| null_spike | Null rate increased >20% from baseline | WARN / BLOCK |
| empty_string_spike | Empty string rate spiked | WARN / BLOCK |
| new_enum_value | New value appeared in a low-cardinality field | WARN |
| row_count_anomaly | Batch size deviates >3× from historical average | WARN / BLOCK |
| timestamp_stale | Most recent timestamp is unexpectedly old | WARN / BLOCK |
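
The three structural drift kinds (`field_added`, `field_removed`, `type_changed`) amount to a comparison of two field-to-type mappings. The sketch below shows one way to express that comparison locally. The function name `schema_drift` and the lowercase `"block"` severity string are assumptions; the event keys mirror the `drift` entries in the response shown later in this document.

```python
def schema_drift(previous, current):
    """Compare two {field: type} mappings and return a list of drift events."""
    events = []
    # Fields present now but absent from the stored schema.
    for field in sorted(current.keys() - previous.keys()):
        events.append({"field": field, "kind": "field_added", "severity": "warn"})
    # Fields the stored schema knows about that this batch lacks.
    for field in sorted(previous.keys() - current.keys()):
        events.append({"field": field, "kind": "field_removed", "severity": "warn"})
    # Fields present in both whose inferred type differs.
    for field in sorted(previous.keys() & current.keys()):
        if previous[field] != current[field]:
            events.append({"field": field, "kind": "type_changed", "severity": "block"})
    return events

baseline = {"order_id": "string", "amount": "number", "email": "string"}
batch = {"order_id": "string", "amount": "string", "user_age": "number"}
events = schema_drift(baseline, batch)
```

Here `events` contains one event of each kind: `user_age` added, `email` removed, and `amount` changed type.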

Verdict logic

Any BLOCK-severity drift event       → BLOCK
Health score < 0.5                   → BLOCK
Health score < 0.8 or any WARN event → WARN
Everything clean                     → PASS
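
Read top to bottom, the rules above form a simple precedence chain. A minimal sketch, assuming drift events are dicts with a `severity` key and using the documented default health thresholds (the function name `verdict` is hypothetical):

```python
def verdict(health_score, drift_events, health_block=0.5, health_warn=0.8):
    """Apply the verdict rules in order: BLOCK conditions first, then WARN, else PASS."""
    severities = {event["severity"].lower() for event in drift_events}
    if "block" in severities or health_score < health_block:
        return "BLOCK"
    if "warn" in severities or health_score < health_warn:
        return "WARN"
    return "PASS"
```

For example, `verdict(0.34, [])` falls below the 0.5 health threshold and is blocked even with no drift events, while `verdict(0.95, [{"severity": "warn"}])` is downgraded to WARN despite a healthy score.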

Full response structure

{
  "status": "BLOCK",
  "health_score": 0.34,
  "decision": {
    "action": "BLOCK",
    "reason": "Type mismatch in: 'amount'; High null rate in 'email' (67%)"
  },
  "schema": {
    "order_id": { "type": "string",  "confidence": 1.0 },
    "amount":   { "type": "number",  "confidence": 0.67 },
    "email":    { "type": "string",  "confidence": 1.0 }
  },
  "schema_fingerprint": "a3f8c2...",
  "drift": [
    {
      "field": "user_age",
      "kind": "field_added",
      "severity": "warn",
      "detail": "New field \"user_age\" (type: number) not in previous schema"
    }
  ],
  "issues": {
    "type_mismatches": {
      "amount": {
        "expected": "number",
        "found": ["string"],
        "sample_value": "broken",
        "rate": 0.33,
        "severity": "critical"
      }
    },
    "null_rates": {
      "email": { "actual": 0.67, "threshold": 0.3, "severity": "critical" }
    }
  },
  "stats": {
    "rows_received": 3,
    "rows_sampled": 3,
    "sample_ratio": 1.0,
    "sample_version": "v2",
    "source": "orders"
  },
  "latency_ms": 7,
  "timestamp": "2025-06-01T09:14:22.000Z"
}

Response headers also carry key signals for lightweight pipeline integration:

X-DataScreenIQ-Status:   BLOCK
X-DataScreenIQ-Health:   0.34
X-DataScreenIQ-Latency:  7ms
X-RateLimit-Plan:        developer
X-RateLimit-Remaining:   498234
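
If a pipeline step only needs the verdict, it can read these headers and skip parsing the JSON body. The helper below is a small sketch that assumes the header names and value formats shown above (including the `ms` suffix on the latency header); the function name `parse_screen_headers` is hypothetical.

```python
def parse_screen_headers(headers):
    """Pull the verdict signals out of a mapping of response headers."""
    health = headers.get("X-DataScreenIQ-Health")
    latency = headers.get("X-DataScreenIQ-Latency", "")
    return {
        "status": headers.get("X-DataScreenIQ-Status"),
        "health": float(health) if health is not None else None,
        # The latency header carries a unit suffix, e.g. "7ms".
        "latency_ms": int(latency[:-2]) if latency.endswith("ms") else None,
    }

signals = parse_screen_headers({
    "X-DataScreenIQ-Status": "BLOCK",
    "X-DataScreenIQ-Health": "0.34",
    "X-DataScreenIQ-Latency": "7ms",
})
```

With the `requests` library, `response.headers` is a case-insensitive mapping, so it can be passed to this function directly.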

Pipeline integration

Block bad data from reaching your database

from datascreeniq.exceptions import DataQualityError

try:
    client.screen(rows, source="orders").raise_on_block()
    load_to_warehouse(rows)                    # only runs on PASS or WARN

except DataQualityError as e:
    send_to_dead_letter_queue(rows)
    alert_team(f"Pipeline blocked: {e.report.summary()}")

Apache Airflow

from airflow.decorators import task
import datascreeniq as dsiq

@task
def quality_gate(rows: list, source: str) -> dict:
    report = dsiq.Client().screen(rows, source=source)
    if report.is_blocked:
        raise ValueError(f"Data quality gate failed: {report.summary()}")
    return report.to_dict()

Prefect

from prefect import flow, task
import datascreeniq as dsiq

@task
def screen_data(rows, source):
    dsiq.Client().screen(rows, source=source).raise_on_block()

@flow
def etl_pipeline():
    rows = extract_from_source()
    screen_data(rows, source="orders")   # raises DataQualityError if BLOCK
    load_to_warehouse(rows)

pandas DataFrame

import pandas as pd
import datascreeniq as dsiq

df = pd.read_csv("orders.csv")
report = dsiq.Client().screen_dataframe(df, source="orders")
print(report.summary())

dbt post-hook

import pandas as pd
import datascreeniq as dsiq

def screen_dbt_model(model_name: str, conn):
    # model_name is interpolated into the SQL text, so pass only trusted model names
    df = pd.read_sql(f"SELECT * FROM {model_name} LIMIT 10000", conn)
    dsiq.Client().screen_dataframe(df, source=model_name).raise_on_block()

CSV / Excel / JSON / XML files

report = client.screen_file("orders.csv",  source="orders")
report = client.screen_file("orders.xlsx", source="orders", sheet=0)  # requires [excel]
report = client.screen_file("events.json", source="events")
report = client.screen_file("feed.xml",    source="feed")

CSV via raw HTTP (no SDK)

curl -X POST https://api.datascreeniq.com/v1/screen \
  -H "X-API-Key: YOUR_KEY" \
  -H "Content-Type: text/csv" \
  -H "X-Source: orders" \
  --data-binary @orders.csv

Large files — auto chunking

Files over 10,000 rows are automatically split and screened in parallel. Results are merged into a single ScreenReport:

# 1M-row file — runs as parallel batches, one merged result
report = client.screen_file("events.csv", source="events")
print(f"Screened {report.rows_received:,} rows in {report.latency_ms}ms")
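
The SDK handles chunking for you, so the following is only a conceptual sketch of the split-and-merge idea. The 10,000-row chunk size comes from the text above; the merge rule (the most severe status across chunks wins) and the helper names `chunked` and `worst_status` are assumptions, not documented SDK behaviour.

```python
def chunked(rows, size=10_000):
    """Yield consecutive slices of `rows`, each at most `size` long."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]

# Rank statuses so the most severe one can be picked with max().
SEVERITY_ORDER = {"PASS": 0, "WARN": 1, "BLOCK": 2}

def worst_status(statuses):
    """Return the most severe status, or PASS when there are none."""
    return max(statuses, key=SEVERITY_ORDER.__getitem__, default="PASS")

chunk_sizes = [len(chunk) for chunk in chunked(list(range(25_000)))]
```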

Custom thresholds

Override the defaults per request:

report = client.screen(
    rows,
    source="orders",
    options={
        "thresholds": {
            "null_rate_warn":       0.1,   # warn if >10% nulls (default: 0.3)
            "null_rate_block":      0.5,   # block if >50% nulls (default: 0.7)
            "type_mismatch_warn":   0.01,  # warn if >1% type mismatches (default: 0.05)
            "type_mismatch_block":  0.1,   # block if >10% (default: 0.2)
            "health_block":         0.6,   # block if health score < 0.6 (default: 0.5)
            "health_warn":          0.9,   # warn if health score < 0.9 (default: 0.8)
        }
    }
)

The ScreenReport object

# Verdict
report.status           # "PASS" | "WARN" | "BLOCK"
report.is_pass          # bool
report.is_warn          # bool
report.is_blocked       # bool
report.health_score     # float 0.0 – 1.0
report.health_pct       # "94.5%"

# Issues (from actual response fields)
report.issues           # full issues dict
report.type_mismatches  # list of field names with type problems
report.null_rates       # dict of field → null rate (only fields above threshold)
report.outlier_fields   # list of field names with outliers

# Schema drift
report.drift            # list of DriftEvent dicts
report.drift_count      # int
report.has_drift        # bool

# Sampling metadata (auditable)
report.rows_received    # int — total rows in your batch
report.rows_sampled     # int — rows actually analysed
report.sample_ratio     # float — fraction sampled
report.sample_version   # "v2" — sampling strategy version
report.latency_ms       # int
report.batch_id         # str (uuid, same as request_id)
report.timestamp        # ISO 8601 string

# Output
report.summary()        # human-readable one-liner
report.to_dict()        # full API response as dict

Error handling

from datascreeniq.exceptions import (
    AuthenticationError,   # invalid or missing API key
    PlanLimitError,        # monthly row limit exceeded — response includes upgrade_url
    RateLimitError,        # too many concurrent requests
    ValidationError,       # bad payload (missing source, empty rows, >100K rows)
    APIError,              # unexpected server error
    DataQualityError,      # raised by .raise_on_block() — has .report attribute
)

try:
    report = client.screen(rows, source="orders")
except AuthenticationError:
    print("Invalid API key — check DATASCREENIQ_API_KEY")
except PlanLimitError as e:
    print(f"Monthly limit reached — upgrade at {e.upgrade_url}")
except ValidationError as e:
    print(f"Bad payload: {e}")   # e.g. rows > 100,000 limit
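
`RateLimitError` signals too many concurrent requests, which is usually worth retrying after a short pause. The helper below is a generic sketch of exponential backoff, not part of the SDK: `with_retries` and its parameters are hypothetical names, and the retry counts and delays are arbitrary starting points.

```python
import time

def with_retries(call, retry_on, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Run `call()`; on a listed exception, wait and retry with exponential backoff."""
    for attempt in range(attempts):
        try:
            return call()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)  # 0.5s, 1s, 2s, ... by default
```

With the SDK this might be used as `with_retries(lambda: client.screen(rows, source="orders"), retry_on=(RateLimitError,))`. Errors such as `AuthenticationError` or `ValidationError` are left out of `retry_on` on purpose, since retrying them cannot succeed.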

Configuration

# Recommended: set the key as an environment variable (shell)
export DATASCREENIQ_API_KEY="dsiq_live_..."

# Then, in Python
import datascreeniq as dsiq

client = dsiq.Client()                 # reads DATASCREENIQ_API_KEY from env
client = dsiq.Client("dsiq_live_...")  # explicit key
client = dsiq.Client(timeout=10)       # custom timeout in seconds (default: 30)

Privacy

DataScreenIQ runs on Cloudflare Workers — a serverless edge runtime with no filesystem access. Your raw payload is processed entirely in-memory and physically cannot be written to disk at the edge layer.

What we store (permanently): schema fingerprints (SHA-256 hashes), null rates, type distributions, and quality scores — aggregated statistics only. No row-level data, no field values, no PII, ever.
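
To illustrate why a schema fingerprint carries no row data, here is a minimal sketch of hashing only field names and inferred types with SHA-256. The canonical form (sorted name/type pairs serialised as compact JSON) and the function name `schema_fingerprint` are assumptions; the service's actual fingerprint format is not documented here.

```python
import hashlib
import json

def schema_fingerprint(schema):
    """Hash a {field: type} mapping; field values never enter the digest."""
    # Sort the pairs so the fingerprint does not depend on field order.
    canonical = json.dumps(sorted(schema.items()), separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

fp_a = schema_fingerprint({"order_id": "string", "amount": "number"})
fp_b = schema_fingerprint({"amount": "number", "order_id": "string"})
fp_c = schema_fingerprint({"order_id": "string", "amount": "string"})
```

`fp_a` and `fp_b` are identical because only field order differs, while `fp_c` differs because the type of `amount` changed. Comparing two such digests is enough to tell that a schema changed without storing any payload.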

Full privacy architecture


Pricing

| Plan | Price | Rows / month |
|---|---|---|
| Developer | Free | 500K |
| Starter | $19/mo | 5M |
| Growth | $79/mo | 50M |
| Scale | $199/mo | 500M+ |

Get a free API key →


Requirements

  • Python 3.8+
  • requests (auto-installed)
  • pandas — optional, for screen_dataframe()
  • openpyxl — optional, for Excel files

See also

Questions → app@datascreeniq.com or open an issue


License

MIT © DataScreenIQ
