dcp-py

Data Cost Protocol for Python — universal dict → positional array encoder for LLM context compression.

Any structured data flowing into an LLM context window can be DCP-encoded: RAG chunk metadata, SQL query results, log streams, API responses, sensor readings. Schema once, position always, keys never repeated.

Problem

Every time structured data enters an LLM as natural language, you pay for redundant key names that the model doesn't need.

RAG:  "Source: docs/auth.md\nPage: 12\nScore: 0.92\n..."   → keys repeated per chunk
SQL:  {"id":1,"name":"Alice","dept":"Eng","salary":90000}  → keys at every row
Logs: "Error in auth-service at 2024-03-24: timeout"       → parsing requires inference

Solution

Define a schema once. Write data by position. Strip everything the consumer doesn't need.

Schema:  ["$S","rag-chunk-meta:v1",5,"source","page","section","score","chunk_index"]
Data:    ["docs/auth.md","-","-",0.92,3]

Absent fields use "-" (single token, unambiguous). Fields absent across the entire batch are dropped via bitmask cutdown.

Metadata reduction: 40-60%. Total RAG prompt reduction: 10-15% (chunk text is untouched).
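
A rough back-of-the-envelope sketch of where the savings come from, using character counts as a stand-in for tokens (actual token counts depend on the tokenizer; the metadata values here are made up):

import json

# Hypothetical metadata for three retrieved chunks
chunks = [
    {"source": "docs/auth.md",  "page": 12, "section": "JWT Config", "score": 0.92, "chunk_index": 3},
    {"source": "docs/auth.md",  "page": 14, "section": "Refresh",    "score": 0.87, "chunk_index": 7},
    {"source": "docs/setup.md", "page": 2,  "section": "Install",    "score": 0.81, "chunk_index": 0},
]

# Plain JSON: key names repeated on every row
as_json = "\n".join(json.dumps(c, separators=(",", ":")) for c in chunks)

# DCP-style: schema header once, then positional rows
header = '["$S","rag-chunk-meta:v1",5,"source","page","section","score","chunk_index"]'
rows = "\n".join(json.dumps(list(c.values()), separators=(",", ":")) for c in chunks)
as_dcp = header + "\n" + rows

print(len(as_json), len(as_dcp))  # the positional form is noticeably shorter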

Quick Start

RAG / Vector DB

from dcp_py.core.encoder import DcpEncoder

# 1-line preset for supported Vector DBs
encoder = DcpEncoder.from_preset("pinecone")

# Preset + custom field path overrides
encoder = DcpEncoder.from_preset("qdrant", overrides={
    "section": "payload.heading_text",
})

# Full custom mapping — any DB, any metadata structure
encoder = DcpEncoder(schema="rag-chunk-meta:v1", mapping={
    "source": "metadata.file_path",
    "page":   "metadata.page_num",
    "section": "metadata.heading",
    "score":  "score",
    "chunk_index": "metadata.idx",
})

# Encode search results
batch = encoder.encode(search_results, texts=chunk_texts)
print(batch.to_string())
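
The resulting string carries the $S header once, followed by one positional row per result. Illustrative output (values made up):

# ["$S","rag-chunk-meta:v1",5,"source","page","section","score","chunk_index"]
# ["docs/auth.md",12,"JWT Config",0.92,3]
# ["docs/setup.md","-","Install",0.81,0]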

pandas DataFrame

from dcp_py.core.encoder import DcpEncoder

encoder, batch = DcpEncoder.from_dataframe(df, domain="query-result")
print(batch.to_string())
# ["$S","query-result:v1",3,"name","dept","salary"]
# ["Alice","Eng",90000]
# ["Bob","Sales",75000]

# Exclude columns
encoder, batch = DcpEncoder.from_dataframe(
    df, domain="query-result", exclude=["internal_id", "embedding"]
)

Framework Integration

# LlamaIndex — node_postprocessor
from dcp_py.adapters.llamaindex import DcpNodePostprocessor

query_engine = index.as_query_engine(
    node_postprocessors=[DcpNodePostprocessor.from_preset("pinecone")]
)

# LangChain — LCEL pipe
from dcp_py.adapters.langchain import DcpRunnable

chain = retriever | DcpRunnable.from_preset("qdrant") | prompt | llm

# Haystack — pipeline component
from dcp_py.adapters.haystack import DcpComponent

pipeline.add_component("dcp", DcpComponent.from_preset("weaviate"))
pipeline.connect("retriever", "dcp")
pipeline.connect("dcp", "prompt_builder")

Any Domain

The core (DcpSchema, FieldMapping, DcpEncoder) has zero RAG-specific code. Schema + mapping = encoder for anything.

Log streams

from dcp_py.core.schema import DcpSchema
from dcp_py.core.encoder import DcpEncoder

schema = DcpSchema.from_dict({
    "$dcp": "schema",
    "id": "log-entry:v1",
    "fields": ["ts", "level", "service", "msg"],
    "fieldCount": 4,
    "types": {
        "ts":      {"type": "number"},
        "level":   {"type": "string", "enum": ["debug", "info", "warn", "error"]},
        "service": {"type": "string"},
        "msg":     {"type": "string"},
    }
})

encoder = DcpEncoder(schema=schema, mapping={
    "ts":      "timestamp",
    "level":   "level",
    "service": "service_name",
    "msg":     "message",
})

batch = encoder.encode(log_entries)
# ["$S","log-entry:v1",4,"ts","level","service","msg"]
# [1711284600,"error","auth","connection timeout"]

SQL / API results

# DataFrame (schema auto-inferred from columns)
encoder, batch = DcpEncoder.from_dataframe(cursor_df, domain="sales-report")

# Manual schema for fixed API shape
schema = DcpSchema.from_dict({
    "$dcp": "schema",
    "id": "api-response:v1",
    "fields": ["status", "latency_ms", "endpoint", "method"],
    "fieldCount": 4,
    "types": {
        "status":     {"type": "number"},
        "latency_ms": {"type": "number"},
        "endpoint":   {"type": "string"},
        "method":     {"type": "string", "enum": ["GET","POST","PUT","DELETE"]},
    }
})
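
A minimal sketch of putting that schema to work, assuming the API responses are flat dicts whose keys already match the field names (so the mapping is an identity mapping):

encoder = DcpEncoder(schema=schema, mapping={
    "status":     "status",
    "latency_ms": "latency_ms",
    "endpoint":   "endpoint",
    "method":     "method",
})

batch = encoder.encode(responses)   # responses: list of dicts from your HTTP client (hypothetical)
print(batch.to_string())
# ["$S","api-response:v1",4,"status","latency_ms","endpoint","method"]
# [200,41,"/v1/users","GET"]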

Absent Values

Fields with None values appear as "-" in the positional array: a single token, unambiguous, and following log/CSV tradition.

# page is None → encoded as "-"
["docs/auth.md", "-", "JWT Config", 0.92, 3]

Fields absent across the entire batch are dropped via bitmask cutdown — they don't appear in the row at all:

# Only source + score present in all records → 2-field cutdown schema
["$S","rag-chunk-meta:v1#12",2,"source","score"]
["docs/auth.md",0.92]

Shadow Levels

Control how much schema context accompanies the data:

Level  Form                        When
L0     field names only            lightweight models, no $S parsing
L1     $S + schema ID              capable agents after first contact
L2     $S + ID + count + fields    first contact (default)
L3     full schema definition      new consumer, education
L4     natural language fallback   no structured parsing capability

batch = encoder.encode(records, shadow_level=1)  # abbreviated after first contact
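
A sketch of how the level changes what gets emitted (the L2 form is the one used throughout this README; the abbreviated form below is illustrative):

# L2 (default): full header on first contact
batch = encoder.encode(records)                   # shadow_level=2
# ["$S","rag-chunk-meta:v1",5,"source","page","section","score","chunk_index"]

# L1: schema ID only, once the consumer already knows the fields
batch = encoder.encode(records, shadow_level=1)
# e.g. ["$S","rag-chunk-meta:v1"] followed by the same positional rows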

Grouping

$G grouping is not included. If you want to group rows by a shared field (e.g. all chunks from the same source document), do it in your own pipeline before passing to the prompt builder:

from itertools import groupby

# groupby only groups consecutive items, so sort by the grouping key first
records_by_source = groupby(sorted(records, key=lambda r: r["source"]), key=lambda r: r["source"])

prompt = ""
for source, group in records_by_source:
    batch = encoder.encode(list(group), texts=...)
    prompt += f"--- {source} ---\n{batch.to_string()}\n"

Writing Custom Schemas

{
  "$dcp": "schema",
  "id": "your-domain:v1",
  "description": "optional",
  "fields": ["field1", "field2", "field3"],
  "fieldCount": 3,
  "types": {
    "field1": { "type": "string" },
    "field2": { "type": "number", "min": 0, "max": 1 },
    "field3": { "type": ["string", "null"] }
  },
  "origin": {
    "source": "your-api/endpoint",
    "direction": "output"
  }
}

origin is optional metadata about the data stream:

  • source: free-form stream identifier ("tavily/search", "sensor/gyro", "agent/receptor")
  • direction: "input" / "output" / "bidirectional" (default)
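
To use a schema stored as a JSON file like the one above, one option is to load it yourself and hand it to DcpSchema.from_dict (a sketch; the file path and mapping paths are placeholders):

import json

from dcp_py.core.schema import DcpSchema
from dcp_py.core.encoder import DcpEncoder

with open("schemas/your-domain.v1.json") as f:
    schema = DcpSchema.from_dict(json.load(f))

encoder = DcpEncoder(schema=schema, mapping={
    "field1": "field1",
    "field2": "nested.field2",   # dot-notation path into the source record
    "field3": "field3",
})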

Schema Generator

Infer schema automatically from data samples:

from dcp_py.core.generator import SchemaGenerator

gen = SchemaGenerator()
draft = gen.from_samples(
    samples=[row1, row2, row3],
    domain="my-domain",
    exclude=["internal_id", "embedding_vector"],
)

print(draft.report)       # type inference + enum candidates
draft.save("schemas/my-domain.v1.json")
encoder = draft.to_encoder()

Architecture

Layer 0: DcpSchema       ← Schema definition (fields, types, validation)   [universal]
Layer 1: FieldMapping    ← source key → DCP field (dot-notation paths)     [universal]
Layer 2: Preset          ← Per-source defaults (Pinecone, Qdrant, ...)     [domain]
Layer 3: Adapter         ← Per-framework (LlamaIndex, LangChain, ...)      [domain]

dcp_py/
  core/
    schema.py        ← DcpSchema, SchemaRegistry, FieldType
    mapping.py       ← FieldMapping: dot-notation path resolver
    encoder.py       ← DcpEncoder: $S header, cutdown, DataFrame support
    generator.py     ← SchemaGenerator: infer schema from samples
    controller.py    ← OutputController: place LLM key-value output into DCP
    presets/
      rag/           ← Vector DB presets (pinecone, qdrant, weaviate, chroma, milvus)
  adapters/
    llamaindex.py    ← DcpNodePostprocessor
    langchain.py     ← DcpRunnable
    haystack.py      ← DcpComponent
    azure.py         ← Azure AI Search Custom Skill
  schemas/           ← Built-in DCP schema definitions

Vector DB Presets

DB        Response structure                                 Key
Pinecone  { score, metadata: { ... } }                       DcpEncoder.from_preset("pinecone")
Qdrant    { score, payload: { ... } }                        DcpEncoder.from_preset("qdrant")
Weaviate  { _additional: { score }, properties: { ... } }    DcpEncoder.from_preset("weaviate")
Chroma    { distance, metadata: { ... } }                    DcpEncoder.from_preset("chroma")
Milvus    { distance, entity: { ... } }                      DcpEncoder.from_preset("milvus")
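
For example, a Pinecone-style match (score at the top level, everything else under metadata) can be passed straight to the preset encoder. A sketch with made-up metadata keys; if your metadata uses different keys, override the field paths as in the Quick Start:

from dcp_py.core.encoder import DcpEncoder

encoder = DcpEncoder.from_preset("pinecone")

matches = [
    {"score": 0.92, "metadata": {"source": "docs/auth.md", "page": 12}},
]

batch = encoder.encode(matches)
print(batch.to_string())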

Install

pip install dcp-py

# With framework extras
pip install "dcp-py[langchain]"
pip install "dcp-py[llamaindex]"
pip install "dcp-py[haystack]"
pip install "dcp-py[azure]"

License

Apache License 2.0


Designed by Hiatama Workshop
