Python library for the DocuTray API
Project description
DocuTray Python Library
The official Python library for the DocuTray API, providing access to document processing capabilities including OCR, document identification, data extraction, and knowledge bases.
Documentation
Full API documentation is available at docs.docutray.com.
Installation
pip install docutray
For API reference generation tools (maintainers only):
pip install docutray[docs]
Requirements
- Python 3.10+
Quick Start
Synchronous Usage
from pathlib import Path
from docutray import Client
client = Client(api_key="your-api-key")
# Convert a document
result = client.convert.run(
file=Path("invoice.pdf"),
document_type_code="invoice"
)
print(result.data)
client.close()
Asynchronous Usage
import asyncio
from pathlib import Path
from docutray import AsyncClient
async def main():
async with AsyncClient(api_key="your-api-key") as client:
result = await client.convert.run(
file=Path("invoice.pdf"),
document_type_code="invoice"
)
print(result.data)
asyncio.run(main())
Configuration
API Key
Set your API key via constructor argument or environment variable:
# Via constructor
client = Client(api_key="your-api-key")
# Via environment variable
# export DOCUTRAY_API_KEY="your-api-key"
client = Client() # Reads from DOCUTRAY_API_KEY
Base URL
Override the default API endpoint:
client = Client(
api_key="your-api-key",
base_url="https://custom-api.example.com"
)
Timeout
Configure request timeouts (in seconds):
import httpx
# Simple timeout (applies to all operations)
client = Client(api_key="your-api-key", timeout=30.0)
# Granular timeout control
client = Client(
api_key="your-api-key",
timeout=httpx.Timeout(
connect=5.0,
read=60.0,
write=60.0,
pool=10.0
)
)
Retries
Configure automatic retry behavior:
# Default: 2 retries with exponential backoff
client = Client(api_key="your-api-key")
# Custom retry count
client = Client(api_key="your-api-key", max_retries=5)
# Disable retries
client = Client(api_key="your-api-key", max_retries=0)
Error Handling
The SDK provides a comprehensive exception hierarchy:
DocuTrayError (base)
├── APIConnectionError (network errors)
│ └── APITimeoutError (request timeout)
└── APIError (HTTP errors)
├── BadRequestError (400)
├── AuthenticationError (401)
├── PermissionDeniedError (403)
├── NotFoundError (404)
├── ConflictError (409)
├── UnprocessableEntityError (422)
├── RateLimitError (429)
└── InternalServerError (5xx)
Catching Errors
from pathlib import Path
from docutray import Client
from docutray import (
DocuTrayError,
APIConnectionError,
APIError,
AuthenticationError,
RateLimitError,
NotFoundError,
)
client = Client(api_key="your-api-key")
try:
result = client.convert.run(
file=Path("document.pdf"),
document_type_code="invoice"
)
except AuthenticationError as e:
print(f"Invalid API key: {e.message}")
except RateLimitError as e:
print(f"Rate limited. Retry after {e.retry_after} seconds")
except NotFoundError as e:
print(f"Resource not found: {e.message}")
except APIError as e:
print(f"API error {e.status_code}: {e.message}")
print(f"Request ID: {e.request_id}")
except APIConnectionError as e:
print(f"Connection failed: {e.message}")
except DocuTrayError as e:
print(f"SDK error: {e.message}")
Rate Limit Handling
import time
from pathlib import Path
from docutray import Client, RateLimitError
client = Client(api_key="your-api-key")
try:
result = client.convert.run(file=Path("document.pdf"), document_type_code="invoice")
except RateLimitError as e:
if e.retry_after:
print(f"Rate limited. Waiting {e.retry_after} seconds...")
time.sleep(e.retry_after)
# Retry the request
Resources
Convert
Convert documents to structured data using OCR and AI extraction.
from pathlib import Path
# Synchronous conversion (waits for result)
result = client.convert.run(
file=Path("invoice.pdf"),
document_type_code="invoice"
)
print(result.data)
# From bytes
with open("invoice.pdf", "rb") as f:
result = client.convert.run(
file=f.read(),
document_type_code="invoice"
)
# From URL
result = client.convert.run(
url="https://example.com/invoice.pdf",
document_type_code="invoice"
)
# Asynchronous conversion (returns immediately)
status = client.convert.run_async(
file=Path("large_document.pdf"),
document_type_code="invoice"
)
print(f"Conversion ID: {status.conversion_id}")
# Poll for completion
final_status = status.wait()
if final_status.is_success():
print(final_status.data)
Identify
Automatically identify document types.
result = client.identify.run(file=Path("unknown_document.pdf"))
print(f"Identified as: {result.document_type.name}")
print(f"Confidence: {result.document_type.confidence:.2%}")
# View alternatives
for alt in result.alternatives:
print(f" Alternative: {alt.name} ({alt.confidence:.2%})")
Document Types
List, create, update, and manage document type definitions.
# List all document types
page = client.document_types.list()
for doc_type in page.data:
print(f"{doc_type.code}: {doc_type.name}")
# Search document types
page = client.document_types.list(search="invoice")
# Get a specific document type
doc_type = client.document_types.get("dt_invoice")
print(f"Schema: {doc_type.jsonSchema}")
# Create a custom document type
doc_type = client.document_types.create(
name="Invoice",
code_type="invoice",
description="Invoice documents with line items",
json_schema={
"type": "object",
"properties": {
"invoice_number": {"type": "string"},
"total": {"type": "number"},
},
},
conversion_mode="json",
)
print(f"Created: {doc_type.id}")
# Update a document type
doc_type = client.document_types.update(
"dt_invoice",
name="Updated Invoice",
is_draft=False,
)
# Validate data against a document type schema
validation = client.document_types.validate(
"dt_invoice",
{"invoice_number": "INV-001", "total": 100.00}
)
if validation.is_valid():
print("Data is valid!")
else:
for error in validation.errors.messages:
print(f"Validation error: {error}")
Steps
Execute predefined document processing workflows.
# Start async step execution
status = client.steps.run_async(
step_id="step_invoice_extraction",
file=Path("invoice.pdf")
)
# Wait for completion with progress callback
def on_progress(s):
print(f"Status: {s.status}")
final = status.wait(on_status=on_progress)
print(final.data)
Knowledge Bases
Manage document collections with semantic search capabilities.
# List knowledge bases
for kb in client.knowledge_bases.list().auto_paging_iter():
print(f"{kb.name}: {kb.document_count} documents")
# Create a knowledge base
kb = client.knowledge_bases.create(
name="Product Documentation",
description="Technical documentation for products",
schema={
"type": "object",
"properties": {
"title": {"type": "string"},
"content": {"type": "string"},
"category": {"type": "string"}
}
}
)
# Add documents
doc = client.knowledge_bases.documents(kb.id).create(
content={
"title": "Getting Started",
"content": "Welcome to our product...",
"category": "guides"
},
metadata={"source": "manual"}
)
# Semantic search
results = client.knowledge_bases.search(
kb.id,
query="how to configure authentication",
limit=5
)
for item in results.data:
print(f"{item.similarity:.2%}: {item.document.content['title']}")
# Update a document
client.knowledge_bases.documents(kb.id).update(
doc.id,
content={"title": "Updated Title", "content": "..."}
)
# Delete a document
client.knowledge_bases.documents(kb.id).delete(doc.id)
# Delete knowledge base
client.knowledge_bases.delete(kb.id)
Pagination
Resources that return lists support pagination:
# Get the first page
page = client.document_types.list(limit=10)
print(f"Page {page.page} of {page.total_pages}")
# Iterate through all pages manually
for page in client.document_types.list().iter_pages():
for doc_type in page.data:
print(doc_type.name)
# Auto-iterate through all items
for doc_type in client.document_types.list().auto_paging_iter():
print(doc_type.name)
# Async pagination (inside an async function)
# async for doc_type in (await client.document_types.list()).auto_paging_iter_async():
# print(doc_type.name)
Raw Response Access
Access raw HTTP response data for debugging:
from pathlib import Path
from docutray import Client
client = Client(api_key="your-api-key")
response = client.convert.with_raw_response.run(
file=Path("invoice.pdf"),
document_type_code="invoice"
)
print(f"Status: {response.status_code}")
print(f"Headers: {response.headers}")
print(f"Request ID: {response.headers.get('x-request-id')}")
# Parse the response body
result = response.parse()
print(result.data)
Async Operations
For long-running operations, use async methods with polling:
from pathlib import Path
from docutray import Client
client = Client(api_key="your-api-key")
# Start async conversion
status = client.convert.run_async(
file=Path("large_document.pdf"),
document_type_code="invoice"
)
# Poll with progress callback
def on_status(s):
print(f"Status: {s.status}, Progress: {s.progress or 'N/A'}")
final = status.wait(
on_status=on_status,
poll_interval=2.0, # seconds between polls
timeout=300.0 # maximum wait time
)
if final.is_success():
print("Conversion complete!")
print(final.data)
elif final.is_failed():
print(f"Conversion failed: {final.error}")
Type Safety
The SDK uses Pydantic models for all responses, providing full type safety:
from pathlib import Path
from docutray import Client
from docutray.types import ConversionResult, DocumentType
client = Client(api_key="your-api-key")
# Type hints work with your IDE
result: ConversionResult = client.convert.run(
file=Path("invoice.pdf"),
document_type_code="invoice"
)
# Access typed attributes
print(result.conversion_id) # str
print(result.data) # dict[str, Any]
print(result.status) # str
Contributing
We welcome contributions! Here's how to get started:
Development Setup
# Clone the repository
git clone https://github.com/docutray/docutray-python.git
cd docutray-python
# Install dependencies with uv
uv sync
# Run tests
uv run pytest
# Run tests with coverage
uv run pytest --cov=src/docutray
# Type checking
uv run mypy src
# Linting
uv run ruff check src
# Format code
uv run ruff format src
Running Tests
# All tests
uv run pytest
# Specific test file
uv run pytest tests/test_client.py
# Specific test
uv run pytest tests/test_client.py::test_client_initialization
Support
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file docutray-0.2.1.tar.gz.
File metadata
- Download URL: docutray-0.2.1.tar.gz
- Upload date:
- Size: 36.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
98b72d08a9edadd4f62b21a4a26a997e6fbd40bdab1feda8686b1bce08cbdb39
|
|
| MD5 |
edb5e45e12292779ba5c7adf2b1f7f65
|
|
| BLAKE2b-256 |
a849150905eb777e9d11c91326c52ed530b0fdba5c144a2c489bec5ab897b777
|
Provenance
The following attestation bundles were made for docutray-0.2.1.tar.gz:
Publisher:
publish.yml on docutray/docutray-python
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
docutray-0.2.1.tar.gz -
Subject digest:
98b72d08a9edadd4f62b21a4a26a997e6fbd40bdab1feda8686b1bce08cbdb39 - Sigstore transparency entry: 1462883211
- Sigstore integration time:
-
Permalink:
docutray/docutray-python@80bc83441cc130b47eb6b3021259d7bf83b1e16a -
Branch / Tag:
refs/tags/v0.2.1 - Owner: https://github.com/docutray
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@80bc83441cc130b47eb6b3021259d7bf83b1e16a -
Trigger Event:
push
-
Statement type:
File details
Details for the file docutray-0.2.1-py3-none-any.whl.
File metadata
- Download URL: docutray-0.2.1-py3-none-any.whl
- Upload date:
- Size: 50.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ae5938cf5eb3a73ac166d99ee3055759bef60ae638bf1c913a523d582fe01f97
|
|
| MD5 |
59837d535ab31df032ddeb31467f22f0
|
|
| BLAKE2b-256 |
9b2ae0d13bc421b3c333027785ae28672c7a751afa54879da1e0206ee5087b66
|
Provenance
The following attestation bundles were made for docutray-0.2.1-py3-none-any.whl:
Publisher:
publish.yml on docutray/docutray-python
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
docutray-0.2.1-py3-none-any.whl -
Subject digest:
ae5938cf5eb3a73ac166d99ee3055759bef60ae638bf1c913a523d582fe01f97 - Sigstore transparency entry: 1462883242
- Sigstore integration time:
-
Permalink:
docutray/docutray-python@80bc83441cc130b47eb6b3021259d7bf83b1e16a -
Branch / Tag:
refs/tags/v0.2.1 - Owner: https://github.com/docutray
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@80bc83441cc130b47eb6b3021259d7bf83b1e16a -
Trigger Event:
push
-
Statement type: