PydanticAI integration for Cloudflare's AI stack — Workers AI, Browser Run, Vectorize, D1, AI Gateway
pydantic-ai-cloudflare
The Python toolkit for AI agents, structured output, RAG, and knowledge graphs — powered by Cloudflare.
pip install pydantic-ai-cloudflare
What you can do with this library
# 1. AI agents with structured output (all 6 Workers AI models)
from pydantic_ai_cloudflare import cf_structured_sync
result = cf_structured_sync("Analyze this company", MySchema, model="@cf/qwen/qwen3-30b-a3b-fp8")
# 2. Browse the web, extract structured data
from pydantic_ai_cloudflare import BrowserRunToolset
tools = BrowserRunToolset()
data = await tools.extract("https://example.com", "Extract pricing plans")
# 3. RAG in 4 lines (managed or DIY)
from pydantic_ai_cloudflare import KnowledgeBase
kb = KnowledgeBase("my-docs")
answer = await kb.ask("How does caching work?")
# 4. Knowledge graphs for ML feature engineering
from pydantic_ai_cloudflare import EntityGraph, profile_data
kg = EntityGraph()
dd = profile_data(records, id_column="account_id") # auto-classifies 25 columns
await kg.build_from_records(records, data_dict=dd) # 2000 rows → 4321 nodes, 52K edges
features = kg.to_feature_dicts() # 29 ML features per entity
recs = kg.recommend("Acct A", ["products"]) # peer-based recommendations
# 5. Zero-config observability (every LLM call logged via AI Gateway)
from pydantic_ai_cloudflare import GatewayObservability
logs = await GatewayObservability().get_logs()
At a glance
| Capability | What it does | Free tier? |
|---|---|---|
| `cf_structured()` | Complex nested Pydantic schemas → validated output on ALL Workers AI models | Yes |
| `BrowserRunToolset` | Browse, scrape, extract, crawl any website (headless Chrome on the edge) | Yes |
| `KnowledgeBase` | Managed RAG with hybrid search + BM25 + reranking | Yes |
| `DIYKnowledgeBase` | Ingest URLs/files/folders → chunk → embed → Vectorize → search + rerank | Yes |
| `EntityGraph` | Peer-adoption features + entity relationships from tabular data | Local |
| `cloudflare_agent()` | One-liner PydanticAI agent wired to Workers AI | Yes |
| `D1MessageHistory` | Conversation persistence across sessions | Yes |
| `GatewayObservability` | Auto-logging, cost tracking, analytics for every LLM call | Yes |
| `list_models()` | Browse 9 Workers AI models, get recommendations by task | — |
What Cloudflare Already Has
Cloudflare provides a complete AI infrastructure stack — all with free tiers:
┌─────────────────────────────────────────────────────────────────────┐
│ CLOUDFLARE AI INFRASTRUCTURE │
├─────────────────┬───────────────────┬───────────────────────────────┤
│ │ │ │
│ ┌───────────┐ │ ┌─────────────┐ │ ┌──────────────────────────┐ │
│ │Workers AI │ │ │ Browser Run │ │ │ AI Gateway │ │
│ │ │ │ │ │ │ │ │ │
│ │ 20+ LLMs │ │ │ Headless │ │ │ Logging · Analytics │ │
│ │ Embedding │ │ │ Chrome on │ │ │ Cost tracking · Cache │ │
│ │ Free tier │ │ │ the edge │ │ │ Rate limiting │ │
│ └───────────┘ │ └─────────────┘ │ └──────────────────────────┘ │
│ │ │ │
│ ┌───────────┐ │ ┌─────────────┐ │ ┌──────────────────────────┐ │
│ │ Vectorize │ │ │ D1 │ │ │ R2 │ │
│ │ │ │ │ │ │ │ │ │
│ │ Vector │ │ │ Serverless │ │ │ Object storage │ │
│ │ database │ │ │ SQLite │ │ │ Zero egress fees │ │
│ │ for RAG │ │ │ 5GB free │ │ │ 10GB free │ │
│ └───────────┘ │ └─────────────┘ │ └──────────────────────────┘ │
│ │ │ │
└─────────────────┴───────────────────┴───────────────────────────────┘
The problem: There's no Python SDK that connects PydanticAI to any of this. Until now.
What This Library Does
┌──────────────────────────────────────────────────────────────────────┐
│ pydantic-ai-cloudflare │
│ │
│ ┌──────────────┐ ┌──────────────────┐ ┌───────────────────────┐ │
│ │ │ │ │ │ │ │
│ │ cloudflare_ │ │ BrowserRun │ │ VectorizeToolset │ │
│ │ agent() │ │ Toolset │ │ │ │
│ │ │ │ │ │ search_knowledge() │ │
│ │ One-liner │ │ browse() │ │ store_knowledge() │ │
│ │ agent │ │ extract() │ │ │ │
│ │ factory │ │ crawl() │ │ Workers AI embeddings │ │
│ │ │ │ scrape() │ │ + Vectorize storage │ │
│ └──────┬───────┘ │ discover_links() │ └───────────┬───────────┘ │
│ │ │ screenshot() │ │ │
│ │ └────────┬─────────┘ │ │
│ │ │ │ │
│ ┌──────┴───────────────────┴────────────────────────┴───────────┐ │
│ │ │ │
│ │ CloudflareProvider ────────→ Workers AI ──→ AI Gateway │ │
│ │ (auto AI Gateway routing, response normalization, │ │
│ │ model profiles for all Workers AI model families) │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────┐ ┌───────────────────┐ ┌─────────────────┐ │
│ │ D1MessageHistory │ │ GatewayObserv. │ │ Schema Utils │ │
│ │ │ │ │ │ │ │
│ │ Conversation │ │ get_logs() │ │ simplify_schema │ │
│ │ persistence │ │ get_analytics() │ │ schema_stats() │ │
│ │ across sessions │ │ add_feedback() │ │ extract_json() │ │
│ └───────────────────┘ └───────────────────┘ └─────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────┘
Components
| Component | What it does | Cloudflare Service |
|---|---|---|
| `cloudflare_agent()` | One-liner agent factory with sensible defaults | All |
| `cloudflare_model()` | LLM inference with auto response normalization | Workers AI |
| `BrowserRunToolset` | 6 web interaction tools for agents | Browser Run |
| `VectorizeToolset` | RAG search + store (DIY) | Vectorize |
| `AISearchToolset` | Managed RAG search + chat | AI Search |
| `CloudflareEmbeddingModel` | Text embeddings | Workers AI |
| `D1MessageHistory` | Conversation persistence | D1 |
| `GatewayObservability` | Logs, cost, analytics, feedback | AI Gateway |
| `list_models()` / `recommend_model()` | Model discovery + recommendations | — |
| `cf_structured()` | Complex structured output that works on ALL models | Workers AI |
| `simplify_schema()` / `schema_stats()` | Schema optimization for reliability | — |
What we handle that's hard
Workers AI has quirks that break naive integrations. This library handles them:
- Dict content responses — Workers AI returns `content` as a parsed dict instead of a JSON string. We normalize it.
- Markdown code fences — Models wrap JSON in `` ```json ... ``` `` fences. We strip them.
- Prose-wrapped JSON — Models add "Here's the JSON:" before the actual JSON. We extract it.
- Model-specific structured output — Each model family needs a different strategy (tool calling vs `json_object` vs guided JSON). Our model profiles handle this automatically.
- Schema simplification — Large schemas (9K+ chars) overwhelm models. `simplify_schema()` strips descriptions and defaults (65% reduction) while keeping the structure valid.
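The fence-stripping and prose-extraction quirks above can be handled with a few lines of stdlib Python. This is an illustrative sketch, not the library's actual parser:

```python
import json
import re

def extract_json_loosely(raw):
    """Pull a JSON object out of a model response that may already be a
    dict, fenced in markdown, or wrapped in prose. Illustrative only."""
    # Case 1: some APIs return content already parsed as a dict
    if isinstance(raw, dict):
        return raw
    text = raw.strip()
    # Case 2: strip markdown code fences like ```json ... ```
    fence = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL)
    if fence:
        text = fence.group(1).strip()
    # Case 3: prose before/after the JSON - take the outermost {...} span
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        text = text[start:end + 1]
    return json.loads(text)

print(extract_json_loosely('Here is the JSON:\n```json\n{"a": 1}\n```'))
# → {'a': 1}
```

The real normalization also has to cope with model-specific response shapes, but the three cases above cover the failure modes listed here.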
Quick Start
1. Set up Cloudflare credentials
# Get your Account ID from https://dash.cloudflare.com (right sidebar)
export CLOUDFLARE_ACCOUNT_ID="your-account-id"
# Create an API token at https://dash.cloudflare.com/profile/api-tokens
# Permissions: Workers AI → Read, Browser Rendering → Edit
export CLOUDFLARE_API_TOKEN="your-api-token"
What each feature needs
| Feature | Token Permission | CF Resource Needed | How to Create |
|---|---|---|---|
| `cloudflare_agent()` | Workers AI Read | None | — |
| `cf_structured()` | Workers AI Read | None | — |
| `BrowserRunToolset` | Browser Rendering Edit | None | — |
| `VectorizeToolset` | Vectorize Edit | A Vectorize index | `npx wrangler vectorize create NAME --dimensions 768 --metric cosine` |
| `AISearchToolset` | AI Search Edit + Run | An AI Search instance | Dashboard → AI → AI Search → Create |
| `CloudflareEmbeddingModel` | Workers AI Read | None | — |
| `D1MessageHistory` | D1 Edit | A D1 database | `npx wrangler d1 create NAME` |
| `GatewayObservability` | AI Gateway Read | None (auto-created) | — |
Start with just Workers AI Read + Browser Rendering Edit. Add more as you need them.
2. Install
pip install pydantic-ai-cloudflare
3. Use
from pydantic_ai_cloudflare import cloudflare_agent
# Plain text
agent = cloudflare_agent()
result = agent.run_sync("What is Cloudflare?")
print(result.output)
# Structured output
from pydantic import BaseModel
class City(BaseModel):
name: str
country: str
population: int
agent = cloudflare_agent(output_type=City)
result = agent.run_sync("Tell me about Tokyo")
print(result.output.name) # "Tokyo"
print(result.output.population) # 13900000
# With web browsing
agent = cloudflare_agent(web=True)
result = agent.run_sync("What's on cloudflare.com/plans?")
# With RAG
agent = cloudflare_agent(web=True, rag="my-knowledge-base")
# Specific model
agent = cloudflare_agent(model="@cf/qwen/qwen3-30b-a3b")
Code Mode with Monty
Monty is PydanticAI's sandboxed Python interpreter. Instead of the LLM making 10 sequential tool calls (10 round-trips), it writes one Python script that calls your tools in parallel. Monty executes it safely in <1μs.
┌──────────────────────────────────────────────────────────────────┐
│ WITHOUT Code Mode │
│ │
│ LLM call 1 → browse(cloudflare.com/plans) → wait for result │
│ LLM call 2 → browse(aws.amazon.com/pricing) → wait for result │
│ LLM call 3 → extract(cloudflare.com/plans) → wait for result │
│ LLM call 4 → extract(aws.amazon.com/pricing) → wait for result │
│ LLM call 5 → compare results → wait for result │
│ LLM call 6 → generate report → final answer │
│ │
│ Total: 6 LLM round-trips, ~30 seconds │
└──────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ WITH Code Mode (Monty) │
│ │
│ LLM call 1 → writes Python: │
│ ┌──────────────────────────────────────────────────┐ │
│ │ cf, aws = await asyncio.gather( │ │
│ │ browse("cloudflare.com/plans"), │ │
│ │ browse("aws.amazon.com/pricing"), │ │
│ │ ) │ │
│ │ cf_data = await extract(cf, "pricing plans") │ │
│ │ aws_data = await extract(aws, "pricing plans") │ │
│ │ return compare(cf_data, aws_data) │ │
│ └──────────────────────────────────────────────────┘ │
│ Monty executes it (<1μs) → tools run in parallel → done │
│ │
│ Total: 1-2 LLM round-trips, ~10 seconds │
└──────────────────────────────────────────────────────────────────┘
pip install 'pydantic-ai-harness[code-mode]'
from pydantic_ai_harness import CodeMode
from pydantic_ai_cloudflare import cloudflare_agent
agent = cloudflare_agent(
web=True,
capabilities=[CodeMode()],
)
result = agent.run_sync(
"Compare pricing on cloudflare.com/plans and aws.amazon.com/lambda/pricing"
)
The LLM writes Python, Monty executes it in a sandbox, your tools (Browser Run, Vectorize, etc.) run on Cloudflare's edge. Best of both worlds.
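The parallel fan-out the generated script relies on is plain `asyncio`. A standalone sketch with a stubbed tool (the stub stands in for a real Browser Run call, which it is not):

```python
import asyncio

async def browse(url: str) -> str:
    # Stub standing in for a real Browser Run tool call
    await asyncio.sleep(0.05)  # simulate network latency
    return f"<markdown of {url}>"

async def sequential(urls):
    # One round-trip per call: latencies add up
    return [await browse(u) for u in urls]

async def parallel(urls):
    # All calls in flight at once: total latency ≈ slowest single call
    return await asyncio.gather(*(browse(u) for u in urls))

urls = ["cloudflare.com/plans", "aws.amazon.com/pricing"]
pages = asyncio.run(parallel(urls))
print(pages[0])  # → <markdown of cloudflare.com/plans>
```

`asyncio.gather` preserves input order, which is why the generated script can unpack `cf, aws = await asyncio.gather(...)` safely.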
Model Discovery
Don't know which Workers AI model to use? Let the library recommend one:
from pydantic_ai_cloudflare import list_models, recommend_model
# Browse the catalog
for m in list_models():
print(f"{m['name']}: {m['context']} context, {m['speed']}")
# Llama 3.3 70B: 128K context, fast
# Qwen 3 30B: 128K context, fast
# Kimi K2.6: 256K context, medium
# ...
# Filter by capability
list_models(capability="reasoning") # → Qwen 3, Kimi, DeepSeek R1, ...
list_models(capability="vision") # → Gemma 4, Llama 3.2 Vision
# Get a recommendation
recommend_model(task="reasoning") # → Qwen 3 30B
recommend_model(task="vision") # → Gemma 4 26B
recommend_model(schema_size="large") # → Kimi K2.6 (256K context)
recommend_model(speed="very_fast") # → Llama 3.1 8B
Web Browsing
from pydantic_ai_cloudflare import cloudflare_agent
agent = cloudflare_agent(web=True)
result = agent.run_sync("Summarize the Cloudflare Workers AI docs page")
The agent has 6 tools:
| Tool | What it does | Use case |
|---|---|---|
| `browse` | Fetch page as markdown | Read any webpage |
| `extract` | AI-powered JSON extraction | Pull structured data from a page |
| `crawl` | Crawl entire sites | Build knowledge bases |
| `scrape` | CSS selector extraction | Grab specific elements |
| `discover_links` | Find all links | Explore a site |
| `screenshot` | Capture PNG | Visual QA |
KnowledgeBase — 4-Line RAG
Path 1: Managed (recommended)
from pydantic_ai_cloudflare import KnowledgeBase
# Uses AI Search: hybrid search (semantic + BM25) + reranking + query rewriting
kb = KnowledgeBase("my-docs") # AI Search instance name
# Search with hybrid retrieval + cross-encoder reranking
results = await kb.search("How does caching work?")
# Or get an AI-generated answer with citations
answer = await kb.ask("How does caching work?")
Prereq: Create an AI Search instance in the dashboard (AI → AI Search). Point it at an R2 bucket or website. Done.
Path 2: DIY (full control)
from pydantic_ai_cloudflare import DIYKnowledgeBase
# You control: chunking, embedding model, reranker, metadata
kb = DIYKnowledgeBase("my-vectors")
# Ingest: fetches URLs via Browser Run, chunks, embeds, stores in Vectorize
await kb.ingest([
"https://docs.example.com/getting-started",
"https://docs.example.com/api-reference",
])
# Search: embed query → Vectorize → rerank with bge-reranker-base
results = await kb.search("How do I authenticate?", rerank=True)
Prereq: npx wrangler vectorize create my-vectors --dimensions 768 --metric cosine
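The chunking step in the DIY ingest pipeline is configurable; a typical fixed-size chunker with overlap looks roughly like this (an illustrative sketch, not the library's internals):

```python
def chunk_text(text: str, size: int = 400, overlap: int = 50) -> list[str]:
    """Split text into overlapping character windows, so a sentence cut
    at one chunk boundary still appears whole in the neighboring chunk."""
    if size <= overlap:
        raise ValueError("size must exceed overlap")
    chunks = []
    step = size - overlap
    for start in range(0, len(text), step):
        chunk = text[start:start + size]
        if chunk:
            chunks.append(chunk)
        if start + size >= len(text):
            break
    return chunks

doc = "word " * 200  # 1000 characters
pieces = chunk_text(doc, size=400, overlap=50)
print(len(pieces))  # → 3
```

Production chunkers usually split on sentence or heading boundaries rather than raw characters, but the overlap idea is the same.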
What the retrieval pipeline does
Path 1 (KnowledgeBase):
Query → Query rewriting (LLM) → Embed → Vector search
↘
BM25 keyword search
↓
RRF Fusion
↓
Cross-encoder reranking (bge-reranker-base)
↓
Relevance boosting (by metadata)
↓
Top K results
Path 2 (DIYKnowledgeBase):
Query → Embed (bge-base-en-v1.5) → Vectorize search (top 20)
↓
Cross-encoder reranking (bge-reranker-base)
↓
Top 5 results
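The RRF fusion step in Path 1 merges the semantic and BM25 rankings. Reciprocal rank fusion is small enough to sketch in full; `k=60` follows the original RRF paper, and the library's constant may differ:

```python
def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Merge several ranked lists: each doc scores sum(1 / (k + rank))
    over the lists it appears in, then docs are sorted by fused score."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

semantic = ["doc_a", "doc_b", "doc_c"]
bm25 = ["doc_c", "doc_a", "doc_d"]
print(rrf_fuse([semantic, bm25]))
# → ['doc_a', 'doc_c', 'doc_b', 'doc_d']
```

Because RRF only uses ranks, not raw scores, it fuses a cosine-similarity list and a BM25 list without any score normalization.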
RAG with Vectorize (low-level)
npx wrangler vectorize create my-docs --dimensions 768 --metric cosine
from pydantic_ai_cloudflare import cloudflare_agent
agent = cloudflare_agent(
web=True,
rag="my-docs",
system_prompt="Browse pages, store findings, answer from knowledge base.",
)
Full pipeline: Browser Run → Workers AI embeddings → Vectorize → Workers AI
AI Search (Managed RAG)
If you don't want to manage embeddings and Vectorize yourself, use AI Search -- Cloudflare's fully-managed RAG. Point it at an R2 bucket or website, and it handles chunking, embedding, indexing, and search.
Create an instance in the dashboard: AI → AI Search → Create
from pydantic_ai_cloudflare import cloudflare_agent, AISearchToolset
agent = cloudflare_agent(
toolsets=[AISearchToolset(instance_name="my-docs")],
)
result = agent.run_sync("What does our documentation say about caching?")
The agent gets two tools: search (returns relevant chunks) and ask (returns an AI-generated answer with citations).
Conversation Persistence
npx wrangler d1 create my-chat-db
from pydantic_ai_cloudflare import cloudflare_agent, D1MessageHistory
agent = cloudflare_agent()
history = D1MessageHistory(database_id="your-d1-uuid")
messages = await history.get_messages("session-123")
result = await agent.run("Follow up question", message_history=messages)
await history.save_messages("session-123", result.all_messages())
Observability
Every LLM call through cloudflare_agent() is logged via AI Gateway automatically. Query programmatically:
from pydantic_ai_cloudflare import GatewayObservability
obs = GatewayObservability()
logs = await obs.get_logs(limit=10)
await obs.add_feedback(logs[0]["id"], score=95, feedback=1)
Or just check dash.cloudflare.com → AI → AI Gateway.
Schema Utilities
For complex Pydantic models, check reliability before running:
from pydantic_ai_cloudflare import schema_stats, simplify_schema
stats = schema_stats(MyComplexModel)
# {'total_chars': 9066, 'simplified_chars': 3200, 'reduction': '65%',
# 'field_count': 26, 'nested_model_count': 9,
# 'recommendation': 'Large -- may need retries...'}
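The ~65% reduction comes mostly from dropping prose fields out of the generated JSON schema. A rough sketch of the idea (illustrative, not the library's exact `simplify_schema()`):

```python
def strip_schema_noise(schema):
    """Recursively drop 'description', 'default', 'title', and 'examples'
    keys from a JSON schema, keeping types, properties, and required lists."""
    noise = {"description", "default", "title", "examples"}
    if isinstance(schema, dict):
        return {k: strip_schema_noise(v) for k, v in schema.items() if k not in noise}
    if isinstance(schema, list):
        return [strip_schema_noise(item) for item in schema]
    return schema

big = {
    "title": "City",
    "type": "object",
    "properties": {
        "name": {"type": "string", "description": "Official city name"},
        "population": {"type": "integer", "default": 0},
    },
    "required": ["name"],
}
print(strip_schema_noise(big))
```

The structure (types, nesting, required fields) survives unchanged, so the model still knows exactly what shape to emit, just in far fewer tokens.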
Complex Structured Output — cf_structured()
PydanticAI's built-in structured output uses tool calling, which breaks on Workers AI for complex schemas (null arguments, malformed retries). cf_structured() bypasses this and calls Workers AI directly with the same approach as langchain-cloudflare:
from pydantic_ai_cloudflare import cf_structured_sync
result = cf_structured_sync(
"Research report on NovaPay, a payment processing startup",
CompanyReport, # 7 nested models, Literal types, lists
model="@cf/qwen/qwen3-30b-a3b-fp8",
)
print(result.company.name) # validated Pydantic object
print(result.next_steps[0]) # NextStep(action=..., priority="HIGH")
How it works:
- Generates + simplifies a JSON schema from your Pydantic model
- Injects the schema into the system prompt with strict formatting instructions
- Sets `response_format: json_object` to force valid JSON
- Parses the response (handles dict content, markdown fences, prose wrapping)
- Validates against the Pydantic model
- On failure: retries with error feedback in the prompt (not via API messages, which Workers AI rejects)
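The parse, validate, and retry-with-feedback loop can be sketched with a stubbed model callable. `call_model` is a hypothetical stand-in here; the real loop lives inside `cf_structured()`:

```python
from pydantic import BaseModel, ValidationError

class City(BaseModel):
    name: str
    population: int

def structured_with_retry(call_model, prompt, schema_cls, max_retries=2):
    """Ask for JSON, validate with Pydantic, and on failure re-prompt with
    the validation error appended as plain text (not as an API message)."""
    for attempt in range(max_retries + 1):
        raw = call_model(prompt)
        try:
            return schema_cls.model_validate_json(raw)
        except ValidationError as exc:
            # Feed the error back inside the next prompt's text
            prompt = f"{prompt}\n\nYour last output was invalid:\n{exc}\nReturn only valid JSON."
    raise RuntimeError("model never produced valid output")

# Stub model: fails once (missing field), then succeeds
answers = iter(['{"name": "Tokyo"}', '{"name": "Tokyo", "population": 13900000}'])
city = structured_with_retry(lambda p: next(answers), "Tell me about Tokyo", City)
print(city.population)  # → 13900000
```

Putting the error text in the prompt rather than in a structured retry message is what sidesteps the API messages Workers AI rejects.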
Tested on all 6 major Workers AI models with a 7-nested-model schema:
| Model | Complex Schema (7 nested) | Time |
|---|---|---|
| Llama 3.3 70B | Pass | 31s |
| Qwen 3 30B | Pass | 17s |
| Kimi K2.6 | Pass | 55s |
| Gemma 4 26B | Pass | 32s |
| GLM 4.7 Flash | Pass | 24s |
| DeepSeek R1 32B | Pass | 30s |
When to use what:
- Simple schemas (3-5 fields): `cloudflare_agent(output_type=MyModel)` works fine
- Complex schemas (4+ nested models, Literal types): use `cf_structured()`
EntityGraph — Peer-Adoption Features from Tabular Data
Build typed entity graphs from tabular datasets. Each row becomes an entity, column values become nodes, edges represent real relationships. Then extract graph-derived features for ML.
Quick start
from pydantic_ai_cloudflare import EntityGraph, profile_data
# 1. Auto-profile (detects column types)
dd = profile_data(records, id_column="account_id")
# 2. Build graph
kg = EntityGraph()
await kg.build_from_records(records, data_dict=dd)
# 10,000 rows → 28K nodes, 153K edges in 0.25s
# 3. Get ML features
features = kg.to_feature_dicts()
# 22+ features per entity: degree, pagerank, community, Node2Vec, ...
Entity-to-entity relationships
Go beyond bipartite feature graphs. Add direct typed relationships between entities:
# From structured columns (automatic)
await kg.build_from_records(records, data_dict=dd,
relationship_columns={
"primary_competitor": "COMPETES_WITH",
"partner": "PARTNERS_WITH",
"referred_by": "REFERRED_BY",
},
)
# Manually
kg.add_relationship("Cisco", "COMPETES_WITH", "Zscaler")
kg.add_relationship("Acme Corp", "DISPLACED", "Palo Alto")
# LLM-extracted from text columns
await kg.build_from_records(records, data_dict=dd,
extract_relationships=True,
)
# Extracts: (Acme, DISPLACED, Zscaler), (Acme, MIGRATED_TO, Zero Trust), ...
Now you can traverse multi-hop paths: Account → COMPETES_WITH → Zscaler → PROVIDES → Zero Trust.
Auto-canonicalization (LLM-powered)
Collapse entity aliases automatically. "Zscaler/ZS/zscaler inc" → one node.
# LLM groups aliases in one call
alias_map = await kg.auto_canonicalize(records, ["tech_stack", "competitors"])
# → {'ZS': 'Zscaler', 'PAN': 'Palo Alto Networks', 'K8s': 'Kubernetes', ...}
# Or provide manually
kg = EntityGraph(canonical_map={"ZS": "Zscaler", "PAN": "Palo Alto"})
Outcome-aware co-occurrence with lift
# Build with outcome tracking
await kg.build_from_records(records, data_dict=dd, outcome_column="deal_stage")
# Co-occurrence now includes lift + outcome rates
co = kg.co_occurrence_features("products_owned", outcome_filter="Won")
# co["casb"]["waf"] = {
#     "p_ba": 0.67,         # P(WAF|CASB)
#     "lift": 2.1,          # lift > 1.5 = real signal
#     "co_count": 45,       # entities with both
#     "co_won": 38,         # of those, how many won
#     "co_rate_won": 0.84,  # P(WAF|CASB, Won) — outcome-conditional
# }
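The lift figures follow the standard definition, lift(A→B) = P(B|A) / P(B). Computing them from raw records takes only a few lines (an illustrative sketch, not the library's `co_occurrence_features()`):

```python
def pair_lift(records: list[dict], col: str, a: str, b: str) -> dict:
    """Compute P(b|a) and lift for two values of a list-valued column."""
    n = len(records)
    has_a = [r for r in records if a in r[col]]
    has_b = [r for r in records if b in r[col]]
    both = [r for r in has_a if b in r[col]]
    p_b = len(has_b) / n
    p_b_given_a = len(both) / len(has_a) if has_a else 0.0
    return {
        "p_ba": p_b_given_a,                          # P(b | a)
        "lift": p_b_given_a / p_b if p_b else 0.0,    # > 1 means real co-adoption
        "co_count": len(both),                        # entities with both
    }

records = [
    {"products": ["casb", "waf"]},
    {"products": ["casb", "waf"]},
    {"products": ["casb"]},
    {"products": ["waf"]},
    {"products": ["cdn"]},
    {"products": ["cdn"]},
]
print(pair_lift(records, "products", "casb", "waf"))
```

Lift above 1 means the pair co-occurs more often than independence would predict, which is why the library flags lift > 1.5 as a real signal rather than base-rate noise.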
Edge confidence tiers
# Structured fields get high confidence, LLM-extracted get lower
await kg.build_from_records(records, data_dict=dd,
confidence_map={
"industry": 1.0, # CRM field — certain
"tech_stack": 0.9, # comma-separated — reliable
"competitors": 0.7, # LLM-extracted — noisy
},
)
Temporal decay
await kg.build_from_records(records, data_dict=dd,
temporal_column="created_date", temporal_decay=0.003,
)
# Recent records: weight ≈ 0.95. Two-year-old: weight ≈ 0.05. 18x ratio.
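A plausible reading of `temporal_decay` is per-day exponential decay of edge weight by record age. This is an assumption on our part; the library's exact curve is not documented here:

```python
import math
from datetime import date

def edge_weight(created: date, as_of: date, decay: float = 0.003) -> float:
    """Downweight an edge by the age of its source record:
    weight = exp(-decay * age_in_days). Illustrative assumption only."""
    age_days = (as_of - created).days
    return math.exp(-decay * max(age_days, 0))

today = date(2024, 6, 1)
print(round(edge_weight(date(2024, 5, 15), today), 2))  # ~2 weeks old → 0.95
print(round(edge_weight(date(2021, 9, 1), today), 2))   # ~1000 days old → 0.05
```

Under this curve a record roughly 1000 days old carries about 1/19 the weight of a two-week-old one, which is the order of the recency ratio quoted above.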
KNN peer adoption (propensity signals)
rates = kg.knn_rate_features(["products_owned"], k=5)
# "4/5 of your graph peers have WAF. You don't." → knn_rate_waf = 0.8
recs = kg.recommend("Account A", ["products_owned"], k=5, min_rate=0.5)
# "Graph found a signal a flat table cannot."
Point-in-time features (no leakage)
build_temporal_dataset() produces feature/label rows where the features at
each snapshot date use ONLY records dated on or before that date. This eliminates
the silent training-time leakage that flat-graph builds suffer from.
from pydantic_ai_cloudflare import build_temporal_dataset
X, y = await build_temporal_dataset(
records,
id_column="account_id",
time_column="event_date",
snapshot_dates=["2023-01-01", "2023-04-01", "2023-07-01"],
label_column="purchased_casb_at",
label_horizon_days=180,
target_columns=["products_owned"],
feature_kwargs={
"categorical_columns": ["industry", "geo"],
"list_columns": {"tech": "USES_TECH", "products": "HAS_PRODUCT"},
"extract_entities": False,
},
)
# X[i] = {"entity": "Acme", "snapshot_date": "2023-01-01", **features}
# y[i] = {"entity": "Acme", "snapshot_date": "2023-01-01", "label": 1}
Or for a single snapshot:
kg = EntityGraph()
await kg.build_from_records(
records,
time_column="event_date",
as_of="2024-01-31", # only records on/before this date
...
)
print(kg._snapshot_date) # 2024-01-31T00:00:00
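Under the hood, the `as_of` guard amounts to filtering records by date before any feature computation. A minimal sketch of the principle (hypothetical helper, not the library's internals):

```python
from datetime import date

def records_as_of(records, time_column, snapshot):
    """Keep only records dated on or before the snapshot, so any features
    built from the subset cannot see the future."""
    return [r for r in records if r[time_column] <= snapshot]

records = [
    {"account_id": "Acme", "event_date": date(2022, 11, 3)},
    {"account_id": "Acme", "event_date": date(2023, 2, 14)},
    {"account_id": "Bolt", "event_date": date(2023, 6, 30)},
]
visible = records_as_of(records, "event_date", date(2023, 1, 1))
print(len(visible))  # → 1 (only the 2022 record exists at this snapshot)
```

The leakage-free property follows directly: a feature computed from `visible` at the 2023-01-01 snapshot cannot encode anything about the later records.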
Freeze for production scoring (no feature drift)
Train a model on a snapshot, freeze the graph, then score new records using the SAME topology — guaranteed no feature drift between training and inference.
# Train: build, freeze, save features for the model
await kg.build_from_records(records, time_column="created", as_of="2024-04-01", ...)
kg.freeze(target_columns=["products_owned"], k=5)
X, y = kg.to_ml_dataset("products_owned", target_columns=["products_owned"], k=5)
# train your model on (X, y) ...
kg.save_features("snapshot_2024_04_01.json", target_columns=["products_owned"], k=5)
# Score: load features, freeze, score new records
features = await kg.score_one(new_record)
# Uses frozen peers, frozen target-value catalog
# {"degree": 5, "knn_rate_waf": 0.8, "knn_avg_distance": 0.31, ...}
# add_records() now raises — must unfreeze() to mutate
await kg.score_batch([rec1, rec2, rec3]) # ← parallel-friendly inference
Sentinel-zero filter & IDF similarity (auto-correctness)
Numeric columns where 0 means "missing" (propensity scores, deal counts, etc.) no longer dominate similarity. Detected automatically at build time and excluded from the graph.
# Auto-detection (default): warns at build time, excludes 0s
await kg.build_from_records(records, numeric_columns=["propensity_score", "arr"])
# WARNING: 'propensity_score': 4/6 (67%) values are 0. Treating 0 as missing.
# Manual control
await kg.build_from_records(
records,
sentinel_zero_columns=["propensity_score"], # explicit
auto_detect_sentinels=False,
)
find_similar() also automatically downweights paths through hub feature nodes
using inverse-frequency (use_idf=True, default). A feature node connected to
170 entities now contributes ~0.19× weight; a rare one connected to 3 entities
contributes ~0.91×. No more manual edge_type_weights for the common case.
# Default — IDF + multi-path scoring + sentinel-aware
similar = await kg.find_similar("AcmeCorp", top_k=5)
for s in similar:
print(s["entity"], s["score"], s["via"])
# AcmeCorp's via list now shows ALL shared features:
# ['HAS_INDUSTRY:SaaS', 'HAS_PRODUCT:CDN', 'HAS_PRODUCT:WAF',
# 'USES_TECH:AWS', 'USES_TECH:K8s']
Build-time profiling & feature_report()
await kg.build_from_records(records, ...)
kg.compute_features()
kg.print_report()
EntityGraph report
Snapshot date: 2024-01-31
Frozen: False
Entities: 300
Total nodes: 1842
Total edges: 5104
Communities: 27
Features/entity: 49
Features generated:
structural ( 9) ['degree', 'unique_neighbors', ...]
community ( 2) ['community_id', 'community_size']
centrality ( 1) ['pagerank']
embedding ( 2) ['n2v_avg_neighbor_dist', 'n2v_norm']
knn_distance ( 3) ['knn_avg_distance', 'knn_min_distance', ...]
knn_rate ( 32) ['knn_rate_cdn', 'knn_rate_waf', ...]
PageRank caveat:
PageRank is computed on the bipartite graph and measures
feature-mediated centrality, not entity importance.
Warnings:
- Numeric column 'propensity_score': 170/300 (57%) values are 0.
Treating 0 as missing (excluded from graph).
- Categorical column 'segment': 60% null/sentinel.
Custom reference-group features
from pydantic_ai_cloudflare import compute_feature
# Shared tech with Zero Trust customers
f = compute_feature(kg, computation="shared_count",
node_type="tech_stack", reference_filter={"products": "zero trust"})
# Graph distance to nearest Enterprise customer
f = compute_feature(kg, computation="distance_to_nearest",
reference_filter={"segment": "Enterprise", "stage": "Customer"})
Export to ML
# Full (X, y) for sklearn/XGBoost
X, y = kg.to_ml_dataset("products_owned", target_columns=["products_owned"], k=5)
# X = {entity: {degree, pagerank, community, knn_rate_waf, knn_rate_casb, ...}}
# y = {entity: {label_cdn: 1, label_waf: 0, label_casb: 0, ...}}
# Persist for reproducible inference
kg.save_features("features.json")
features = EntityGraph.load_features("features.json")
Visualization (zero-dependency)
Render any EntityGraph to interactive HTML, Cytoscape.js JSON, D3 force-graph JSON, Mermaid (for docs), or GraphML (for Gephi/yEd). Color-coded by community or node type; edges colored by relationship type.
# Interactive HTML — open in any browser, draggable + zoomable
kg.render_html("graph.html", color_by="community", max_nodes=300)
# Self-contained — Cytoscape.js loaded from CDN, no Python deps to install.
# Subgraph around one entity (its 2-hop neighborhood)
kg.render_html("acme_neighborhood.html", focus="AcmeCorp", hops=2)
# Cytoscape.js JSON for embedding in your own web UI
spec = kg.to_cytoscape(color_by="community")
# {"nodes": [...], "edges": [...], "metadata": {legend: [...], ...}}
# Mermaid for Markdown / GitLab / Confluence (great for docs)
print(kg.to_mermaid(max_nodes=30))
# GraphML for Gephi or yEd
kg.to_graphml("graph.graphml")
Color modes:
- `color_by="community"` — distinct color per Louvain community (default if computed)
- `color_by="type"` — entity=blue, concept=green, list=orange, range=gray
- `color_by="industry"` (or any column) — color entities by their value of that column
Layout options for HTML: "cose" (force, default), "concentric",
"breadthfirst", "grid", "circle". Switch on the fly via the side panel.
Benchmarks (10,000 accounts)
| Step | Time | Output |
|---|---|---|
| Build graph | 0.25s | 28,705 nodes, 153,642 edges |
| Louvain communities | <1s | 29 communities |
| Node2Vec embeddings | ~75s | 32-dim structural embeddings |
| ML features | 3s | 22+ features/entity |
| Temporal decay | built-in | 18x recency boost |
Graph features for ML
| Feature Type | Features | Use Case |
|---|---|---|
| Structural | degree, clustering_coeff, unique_neighbors | Account complexity |
| Community | community_id, community_size (Louvain) | Market segmentation |
| Centrality | pagerank | Account importance |
| Node2Vec | n2v_norm, n2v_avg_neighbor_dist | Structural position |
| KNN Distance | knn_avg_distance, knn_min_distance | Similarity scoring |
| KNN Rate | knn_rate_{product} per target column | Propensity / upsell |
| Co-occurrence | p_ba, lift, co_won, co_rate_won | Cross-sell with outcome awareness |
| Pairwise | shared_neighbors, jaccard, adamic_adar | Match scoring / dedup |
| Custom | compute_feature() with any reference group | Any ML use case |
| Confidence | edge confidence tiers (structured vs LLM-extracted) | Feature reliability |
| Temporal | edge weight decay by recency | Recency-weighted features |
Notebooks
| Notebook | What you'll learn | Has outputs? |
|---|---|---|
| 01_getting_started | First agent, structured output, model discovery | Yes |
| 02_web_research | Browse, extract, discover links, scrape | Yes |
| 03_rag_pipeline | KnowledgeBase (managed) + DIYKnowledgeBase (chunking, embedding, reranking) | Partial |
| 04_persistent_chat | Multi-session conversations with D1 | Template |
| 05_code_mode_monty | Parallel tool execution with Monty | Walkthrough |
| 06_complex_structured_output | `cf_structured()` across all Workers AI models | Yes |
| 07_knowledge_graph | Build graph from 2000 rows, ML features, KNN rates, recommendations | Yes |
How It Compares
| | pydantic-ai-cloudflare | langchain-cloudflare | Raw API calls |
|---|---|---|---|
| Framework | PydanticAI | LangChain | None |
| Type safety | Full Pydantic models | Loose | Manual |
| Structured output | Automatic (handles Workers AI quirks) | Manual method choice | DIY |
| Response normalization | Built-in (dict, fences, prose) | Built-in | DIY |
| Agent factory | `cloudflare_agent()` one-liner | No | No |
| Model discovery | `list_models()`, `recommend_model()` | No | No |
| Schema optimization | `simplify_schema()`, `schema_stats()` | No | No |
| Web browsing | `BrowserRunToolset` (6 tools) | Loader + Tool | httpx calls |
| RAG | `VectorizeToolset` (2 tools) | CloudflareVectorize | Multiple APIs |
| Persistence | `D1MessageHistory` | D1Saver (checkpoint only) | SQL queries |
| Observability | Auto via AI Gateway | None | Manual logging |
| Code Mode | Works with Monty | No | No |
| Cost | Free tier | Free tier | Free tier |
Roadmap
- v0.1.0 — Provider, Browser Run, Embeddings, Vectorize, D1, Gateway, Model Catalog, Schema Utils
- v0.2.0 — Production correctness for EntityGraph (IDF, sentinel filter, point-in-time, freeze/score, parallel LLM), built-in visualization (HTML, Cytoscape, Mermaid, GraphML), build-time profiling, feature_report()
- v0.3.0 — Upstream CloudflareProvider to `pydantic/pydantic-ai`, VCR cassette integration tests
- v1.0.0 — Stable API, full docs site, PyPI release
Contributing
See CONTRIBUTING.md.
License
MIT