Unstract API Hub Python Client
A Python client for the Unstract ApiHub service that provides a clean, Pythonic interface for document processing APIs following the extract → status → retrieve pattern.
🚀 Features
- Simple API Interface: Clean, easy-to-use client for Unstract ApiHub services
- File Processing: Support for document processing with file uploads
- Status Monitoring: Track processing status with polling capabilities
- Error Handling: Comprehensive exception handling with meaningful messages
- Flexible Parameters: Support for custom parameters and configurations
- Automatic Polling: Optional wait-for-completion functionality
- Type Safety: Full type hints for better development experience
📦 Installation
pip install apihub-python-client
Or install from source:
git clone https://github.com/Zipstack/apihub-python-client.git
cd apihub-python-client
pip install -e .
🎯 Quick Start
Basic Usage
from apihub_client import ApiHubClient

# Initialize the client
client = ApiHubClient(
    api_key="your-api-key-here",
    base_url="https://api-hub.us-central.unstract.com/api/v1",
)

# Process a document with automatic completion waiting
result = client.extract(
    endpoint="bank_statement",
    vertical="table",
    sub_vertical="bank_statement",
    file_path="statement.pdf",
    wait_for_completion=True,
    polling_interval=3,  # Check status every 3 seconds
)

print("Processing completed!")
print(result)
🛠️ Common Use Cases
Document Splitter API
Split documents into smaller parts using the doc-splitter service:
from apihub_client import DocSplitterClient

# Initialize the doc-splitter client
doc_client = DocSplitterClient(
    api_key="your-api-key-here",
    base_url="http://localhost:8005",
)

# Simple upload and wait for completion
result = doc_client.upload(
    file_path="large_document.pdf",
    wait_for_completion=True,
    polling_interval=5,  # Check status every 5 seconds
)

# Download the split result
output_file = doc_client.download_result(
    job_id=result["job_id"],
    output_path="split_result.zip",
)
print(f"Downloaded result to: {output_file}")
Step-by-Step Doc-Splitter Processing
# Step 1: Upload the document
upload_result = doc_client.upload(file_path="document.pdf")
job_id = upload_result["job_id"]
print(f"Upload completed. Job ID: {job_id}")

# Step 2: Monitor status manually
status = doc_client.get_job_status(job_id)
print(f"Current status: {status['status']}")

# Step 3: Wait for completion (with a custom timeout)
final_result = doc_client.wait_for_completion(
    job_id=job_id,
    timeout=600,         # Wait up to 10 minutes
    polling_interval=3,  # Check every 3 seconds
)

# Step 4: Download the processed result
downloaded_file = doc_client.download_result(
    job_id=job_id,
    output_path="processed_document.zip",
)
print(f"Processing complete! Downloaded: {downloaded_file}")
Batch Processing with Doc-Splitter
from pathlib import Path

def process_documents_batch(file_paths):
    """Process multiple documents with doc-splitter."""
    results = []
    for file_path in file_paths:
        try:
            print(f"Processing {file_path}...")
            # Upload and wait for completion
            result = doc_client.upload(
                file_path=file_path,
                wait_for_completion=True,
                polling_interval=5,
            )
            # Generate the output filename
            input_name = Path(file_path).stem
            output_path = f"{input_name}_split.zip"
            # Download the result
            downloaded_file = doc_client.download_result(
                job_id=result["job_id"],
                output_path=output_path,
            )
            results.append({
                "input": file_path,
                "output": downloaded_file,
                "job_id": result["job_id"],
                "success": True,
            })
        except Exception as e:
            print(f"Failed to process {file_path}: {e}")
            results.append({
                "input": file_path,
                "error": str(e),
                "success": False,
            })
    return results

# Process multiple files
files = ["document1.pdf", "document2.pdf", "document3.pdf"]
results = process_documents_batch(files)

# Summary
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
print(f"Processed: {len(successful)} successful, {len(failed)} failed")
Generic Unstract API
Process documents using dynamic endpoints like invoice, contract, receipt, etc.:
from apihub_client import GenericUnstractClient

# Initialize the generic client
client = GenericUnstractClient(
    api_key="your-api-key-here",
    base_url="http://localhost:8005",
)

# Simple processing with automatic completion waiting
result = client.process(
    endpoint="invoice",
    file_path="invoice.pdf",
    wait_for_completion=True,
    polling_interval=5,  # Check status every 5 seconds
)
print("Invoice processing completed:", result)
Step-by-Step Generic API Processing
# Step 1: Start processing
process_result = client.process(
    endpoint="contract",
    file_path="contract.pdf",
)
execution_id = process_result["execution_id"]
print(f"Processing started. Execution ID: {execution_id}")

# Step 2: Check status manually
status = client.check_status("contract", execution_id)
print(f"Current status: {status}")

# Step 3: Wait for completion (with a custom timeout)
final_result = client.wait_for_completion(
    endpoint="contract",
    execution_id=execution_id,
    timeout=600,         # Wait up to 10 minutes
    polling_interval=3,  # Check every 3 seconds
)

# Step 4: Get the result later (if needed)
result = client.get_result("contract", execution_id)
print("Processing complete:", result)
Batch Processing with Generic APIs
def process_documents_batch(endpoint, file_paths):
    """Process multiple documents with the same endpoint."""
    results = []
    for file_path in file_paths:
        try:
            print(f"Processing {file_path} with {endpoint} endpoint...")
            # Process and wait for completion
            result = client.process(
                endpoint=endpoint,
                file_path=file_path,
                wait_for_completion=True,
                polling_interval=5,
            )
            results.append({
                "input": file_path,
                "execution_id": result["execution_id"],
                "result": result,
                "success": True,
            })
        except Exception as e:
            print(f"Failed to process {file_path}: {e}")
            results.append({
                "input": file_path,
                "error": str(e),
                "success": False,
            })
    return results

# Process multiple invoices
invoice_files = ["invoice1.pdf", "invoice2.pdf", "invoice3.pdf"]
results = process_documents_batch("invoice", invoice_files)

# Process multiple contracts
contract_files = ["contract1.pdf", "contract2.pdf"]
contract_results = process_documents_batch("contract", contract_files)

# Summary for the invoice batch
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
print(f"Processed: {len(successful)} successful, {len(failed)} failed")
Integration: Doc-Splitter + Extraction APIs
Combine doc-splitter with extraction APIs for complete document processing:
from apihub_client import ApiHubClient, DocSplitterClient

# Initialize both clients
api_client = ApiHubClient(
    api_key="your-api-key",
    base_url="https://api-hub.us-central.unstract.com/api/v1",
)
doc_splitter = DocSplitterClient(
    api_key="your-api-key",
    base_url="http://localhost:8005",
)

# Step 1: Split the large document
split_result = doc_splitter.upload(
    file_path="large_contract.pdf",
    wait_for_completion=True,
)

# Step 2: Download the split result
doc_splitter.download_result(
    job_id=split_result["job_id"],
    output_path="split_documents.zip",
)

# Step 3: Process individual documents (example with one document;
# see the zipfile sketch below for pulling individual PDFs out of the zip)
table_result = api_client.extract(
    endpoint="bank_statement",
    vertical="table",
    sub_vertical="bank_statement",
    file_path="individual_page.pdf",
    wait_for_completion=True,
)
print("Extracted data:", table_result)
Complete Workflow: All Three Clients
import json

from apihub_client import ApiHubClient, DocSplitterClient, GenericUnstractClient

# Initialize all clients
api_client = ApiHubClient(
    api_key="your-api-key",
    base_url="https://api-hub.us-central.unstract.com/api/v1",
)
doc_splitter = DocSplitterClient(
    api_key="your-api-key",
    base_url="http://localhost:8005",
)
generic_client = GenericUnstractClient(
    api_key="your-api-key",
    base_url="http://localhost:8005",
)

# Workflow: Split → Extract → Process with Generic API

# Step 1: Split the large document
split_result = doc_splitter.upload(
    file_path="large_document.pdf",
    wait_for_completion=True,
)

# Step 2: Extract tables from the split documents
# (after extracting individual files from the zip)
table_result = api_client.extract(
    endpoint="discover_tables",
    vertical="table",
    sub_vertical="discover_tables",
    file_path="split_page_1.pdf",
    wait_for_completion=True,
)

# Step 3: Process with the generic invoice API
invoice_result = generic_client.process(
    endpoint="invoice",
    file_path="split_page_2.pdf",
    wait_for_completion=True,
)

print("Complete workflow finished!")
print("Tables extracted:", len(json.loads(table_result["data"])))
print("Invoice processed:", invoice_result.get("execution_id"))
All Table Extraction API
import json

# Step 1: Discover tables in the uploaded PDF
initial_result = client.extract(
    endpoint="discover_tables",
    vertical="table",
    sub_vertical="discover_tables",
    ext_cache_result="true",
    ext_cache_text="true",
    file_path="statement.pdf",
)
file_hash = initial_result.get("file_hash")
print("File hash:", file_hash)

discover_tables_result = client.wait_for_complete(
    file_hash=file_hash,
    timeout=600,         # Wait up to 10 minutes
    polling_interval=3,  # Check every 3 seconds
)
tables = json.loads(discover_tables_result["data"])
print(f"Total tables in this document: {len(tables)}")

all_table_results = []

# Step 2: Extract each table by index
for i, table in enumerate(tables):
    table_result = client.extract(
        endpoint="extract_table",
        vertical="table",
        sub_vertical="extract_table",
        file_hash=file_hash,
        ext_table_no=i,  # Extract the i-th table
        wait_for_completion=True,
    )
    print(f"Extracted table: {table['table_name']}")
    all_table_results.append({table["table_name"]: table_result})

print("All table results:")
print(all_table_results)
Bank Statement Extraction API
# Process a bank statement
result = client.extract(
    endpoint="bank_statement",
    vertical="table",
    sub_vertical="bank_statement",
    file_path="bank_statement.pdf",
    wait_for_completion=True,
    polling_interval=3,
)
print("Bank statement processed:", result)
Step-by-Step Processing
# Step 1: Start processing
initial_result = client.extract(
    endpoint="discover_tables",
    vertical="table",
    sub_vertical="discover_tables",
    file_path="document.pdf",
)
file_hash = initial_result["file_hash"]
print(f"Processing started with hash: {file_hash}")

# Step 2: Monitor status
status = client.get_status(file_hash)
print(f"Current status: {status['status']}")

# Step 3: Wait for completion (using the wait_for_complete method)
final_result = client.wait_for_complete(
    file_hash=file_hash,
    timeout=600,         # Wait up to 10 minutes
    polling_interval=3,  # Check every 3 seconds
)
print("Final result:", final_result)
Using Cached Files
Once a file has been processed, you can reuse it by file hash:
# Run a different operation on the same file
table_result = client.extract(
    endpoint="extract_table",
    vertical="table",
    sub_vertical="extract_table",
    file_hash="previously-obtained-hash",
    ext_table_no=1,  # Extract the second table (indexing starts at 0)
    wait_for_completion=True,
)
🔧 Configuration
Environment Variables
Create a .env file:
API_KEY=your_api_key_here
BASE_URL=https://api.example.com
LOG_LEVEL=INFO
Then load in your code:
import logging
import os

from dotenv import load_dotenv
from apihub_client import ApiHubClient

load_dotenv()

# Configure logging from the optional LOG_LEVEL variable
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))

client = ApiHubClient(
    api_key=os.getenv("API_KEY"),
    base_url=os.getenv("BASE_URL"),
)
📚 API Reference
ApiHubClient
The main client class for interacting with the ApiHub service.
client = ApiHubClient(api_key: str, base_url: str)
Parameters:
- api_key (str): Your API key for authentication
- base_url (str): The base URL of the ApiHub service
DocSplitterClient
Client for interacting with doc-splitter APIs for document splitting operations.
doc_client = DocSplitterClient(api_key: str, base_url: str)
Parameters:
- api_key (str): Your API key for authentication
- base_url (str): The base URL of the doc-splitter service
GenericUnstractClient
Client for interacting with generic Unstract APIs using dynamic endpoints.
generic_client = GenericUnstractClient(api_key: str, base_url: str)
Parameters:
- api_key (str): Your API key for authentication
- base_url (str): The base URL of the Unstract service
Methods
extract()
Start a document processing operation.
extract(
    endpoint: str,
    vertical: str,
    sub_vertical: str,
    file_path: str | None = None,
    file_hash: str | None = None,
    wait_for_completion: bool = False,
    polling_interval: int = 5,
    **kwargs,
) -> dict
Parameters:
- endpoint (str): The API endpoint to call (e.g., "discover_tables", "extract_table")
- vertical (str): The processing vertical
- sub_vertical (str): The processing sub-vertical
- file_path (str, optional): Path to a file for upload (for new files)
- file_hash (str, optional): Hash of a previously uploaded file (for cached operations)
- wait_for_completion (bool): If True, polls until completion and returns the final result
- polling_interval (int): Seconds between status checks when waiting (default: 5)
- **kwargs: Additional parameters specific to the endpoint
Returns:
dict: API response containing processing results or file hash for tracking
get_status()
Check the status of a processing job.
get_status(file_hash: str) -> dict
Parameters:
- file_hash (str): The file hash returned from extract()
Returns:
dict: Status information including current processing state
retrieve()
Get the final results of a completed processing job.
retrieve(file_hash: str) -> dict
Parameters:
- file_hash (str): The file hash of the completed job
Returns:
dict: Final processing results
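For example (a minimal sketch; assumes the job identified by the placeholder hash has already completed):
# Fetch the final output of a completed job by its file hash
result = client.retrieve(file_hash="previously-obtained-hash")
print(result)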
wait_for_complete()
Wait for a processing job to complete by polling its status.
wait_for_complete(
    file_hash: str,
    timeout: int = 600,
    polling_interval: int = 3,
) -> dict
Parameters:
- file_hash (str): The file hash of the job to wait for
- timeout (int): Maximum time to wait in seconds (default: 600)
- polling_interval (int): Seconds between status checks (default: 3)
Returns:
dict: Final processing results when completed
Raises:
ApiHubClientException: If processing fails or times out
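A sketch of guarding against failures and timeouts (the 60-second budget is an illustrative placeholder; file_hash comes from a prior extract() call):
from apihub_client import ApiHubClientException

try:
    final_result = client.wait_for_complete(file_hash=file_hash, timeout=60)
except ApiHubClientException as e:
    # Raised on processing failure as well as on timeout
    print(f"Job did not complete: {e.message}")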
DocSplitterClient Methods
upload()
Upload a document for splitting.
upload(
    file_path: str,
    wait_for_completion: bool = False,
    polling_interval: int = 5,
) -> dict
Parameters:
- file_path (str): Path to the file to upload
- wait_for_completion (bool): If True, polls until completion and returns the final result
- polling_interval (int): Seconds between status checks when waiting (default: 5)
Returns:
dict: Response containing job_id and status information
get_job_status()
Check the status of a splitting job.
get_job_status(job_id: str) -> dict
Parameters:
- job_id (str): The job ID to check status for
Returns:
dict: Status information including current processing state
download_result()
Download the result of a completed splitting job.
download_result(
    job_id: str,
    output_path: str | None = None,
) -> str
Parameters:
- job_id (str): The job ID to download results for
- output_path (str, optional): Where to save the downloaded file; if None, uses 'result_{job_id}.zip'
Returns:
str: Path to the downloaded file
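When output_path is omitted, the file is saved under the default name (job_id here is a placeholder from a prior upload):
# Saves to result_<job_id>.zip (a relative path, so it lands in the current working directory)
path = doc_client.download_result(job_id=job_id)
print(f"Saved to: {path}")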
wait_for_completion()
Wait for a splitting job to complete by polling its status.
wait_for_completion(
    job_id: str,
    timeout: int = 600,
    polling_interval: int = 3,
) -> dict
Parameters:
- job_id (str): The job ID to wait for
- timeout (int): Maximum time to wait in seconds (default: 600)
- polling_interval (int): Seconds between status checks (default: 3)
Returns:
dict: Final job status information when completed
Raises:
ApiHubClientException: If processing fails or times out
GenericUnstractClient Methods
process()
Process a document using the specified endpoint.
process(
    endpoint: str,
    file_path: str,
    wait_for_completion: bool = False,
    polling_interval: int = 5,
    timeout: int = 600,
) -> dict
Parameters:
- endpoint (str): The endpoint name (e.g., "invoice", "contract", "receipt")
- file_path (str): Path to the file to upload
- wait_for_completion (bool): If True, polls until completion and returns the final result
- polling_interval (int): Seconds between status checks when waiting (default: 5)
- timeout (int): Maximum time to wait for completion in seconds (default: 600)
Returns:
dict: Response containing execution_id and processing information
get_result()
Get the result of a processing operation.
get_result(endpoint: str, execution_id: str) -> dict
Parameters:
- endpoint (str): The endpoint name used for processing
- execution_id (str): The execution ID to get results for
Returns:
dict: Processing result or status information
wait_for_completion()
Wait for a processing operation to complete by polling its status.
wait_for_completion(
    endpoint: str,
    execution_id: str,
    timeout: int = 600,
    polling_interval: int = 3,
) -> dict
Parameters:
- endpoint (str): The endpoint name used for processing
- execution_id (str): The execution ID to wait for
- timeout (int): Maximum time to wait in seconds (default: 600)
- polling_interval (int): Seconds between status checks (default: 3)
Returns:
dict: Final processing result when completed
check_status()
Check the current status of a processing operation.
check_status(endpoint: str, execution_id: str) -> str | None
Parameters:
- endpoint (str): The endpoint name used for processing
- execution_id (str): The execution ID to check status for
Returns:
str | None: Current status string, or None if not available
Raises:
ApiHubClientException: If processing fails or times out
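A sketch of a manual polling loop built on check_status (the terminal status string "COMPLETED" is an assumption; match it to what your endpoint actually reports):
import time

# Poll manually instead of calling wait_for_completion()
while True:
    status = generic_client.check_status("invoice", execution_id)
    print(f"Current status: {status}")
    if status == "COMPLETED":  # assumed terminal status value
        break
    time.sleep(5)

result = generic_client.get_result("invoice", execution_id)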
Exception Handling
All clients (ApiHubClient, DocSplitterClient, and GenericUnstractClient) use the same exception handling:
from apihub_client import ApiHubClientException, GenericUnstractClient

generic_client = GenericUnstractClient(api_key="key", base_url="http://localhost:8005")

try:
    result = generic_client.process(
        endpoint="invoice",
        file_path="invoice.pdf",
        wait_for_completion=True,
    )
    print("Processing completed:", result["execution_id"])
except ApiHubClientException as e:
    print(f"Error: {e.message}")
    print(f"Status Code: {e.status_code}")
Batch Processing
def process_documents(file_paths, endpoint):
    results = []
    for file_path in file_paths:
        try:
            print(f"Processing {file_path}...")
            # Start processing
            initial_result = client.extract(
                endpoint=endpoint,
                vertical="table",
                sub_vertical=endpoint,
                file_path=file_path,
            )
            # Wait for completion with custom settings
            result = client.wait_for_complete(
                file_hash=initial_result["file_hash"],
                timeout=900,         # 15 minutes for batch processing
                polling_interval=5,  # Less frequent polling for batch jobs
            )
            results.append({"file": file_path, "result": result, "success": True})
        except ApiHubClientException as e:
            print(f"Failed to process {file_path}: {e.message}")
            results.append({"file": file_path, "error": str(e), "success": False})
    return results

# Process multiple files
file_paths = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
results = process_documents(file_paths, "bank_statement")

# Summary
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
print(f"Processed: {len(successful)} successful, {len(failed)} failed")
🧪 Testing
Run the test suite:
# Install development dependencies
pip install -e ".[dev]"
# Run all tests
pytest
# Run tests with coverage
pytest --cov=apihub_client --cov-report=html
# Run specific test files
pytest test/test_client.py -v
pytest test/test_integration.py -v
Integration Testing
For integration tests with a real API:
# Create .env file with real credentials
cp .env.example .env
# Edit .env with your API credentials
# Run integration tests
pytest test/test_integration.py -v
🔍 Logging
Enable debug logging to see detailed request/response information:
import logging
# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
client = ApiHubClient(api_key="your-key", base_url="https://api.example.com")
# Now all API calls will show detailed logs
result = client.extract(...)
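To keep third-party noise down, you can instead raise verbosity for this library's logger only (assuming it logs under its package name, "apihub_client"):
import logging

logging.basicConfig(level=logging.INFO)

# Assumed logger name; adjust if the library logs under a different name
logging.getLogger("apihub_client").setLevel(logging.DEBUG)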
🚀 Releases
This project uses automated releases through GitHub Actions with PyPI Trusted Publishers for secure publishing.
Creating a Release
- Go to GitHub Actions → "Release Tag and Publish Package"
- Click "Run workflow" and configure:
  - Version bump: patch (bug fixes), minor (new features), or major (breaking changes)
  - Pre-release: check for beta/alpha versions
  - Release notes: optional custom notes
- Click "Run workflow" - the automation handles the rest!
The workflow will automatically:
- Update the version in the code
- Create Git tags and GitHub releases
- Run all tests and quality checks
- Publish to PyPI using uv publish with Trusted Publishers
For more details, see the Release Documentation.
🤝 Contributing
We welcome contributions! Please see our Contributing Guide for details.
Development Setup
# Clone the repository
git clone https://github.com/Zipstack/apihub-python-client.git
cd apihub-python-client
# Install dependencies using uv (required - do not use pip)
uv sync
# Install pre-commit hooks
uv run --frozen pre-commit install
# Run tests
uv run --frozen pytest
# Run linting and formatting
uv run --frozen ruff check .
uv run --frozen ruff format .
# Run type checking
uv run --frozen mypy src/
# Run all pre-commit hooks manually
uv run --frozen pre-commit run --all-files
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
🆘 Support
- Issues: GitHub Issues
- Documentation: Check this README and inline code documentation
- Examples: See the examples/ directory for more usage patterns
📈 Version History
v0.1.0
- Initial release
- Basic client functionality with extract, status, and retrieve operations
- File upload support
- Automatic polling with wait_for_completion
- Comprehensive test suite
Made with ❤️ by the Unstract team