# Newscatcher CatchAll Python Library
The Newscatcher CatchAll Python library provides access to the CatchAll API, which transforms natural language queries into structured data extracted from web sources.
## Installation

```sh
pip install newscatcher-catchall-sdk
```
## Reference
A full reference for this library is available here.
## Usage
### Jobs
Submit a query and retrieve structured results:
```python
from newscatcher_catchall import CatchAllApi
import time

client = CatchAllApi(api_key="YOUR_API_KEY")

# Create a job with an optional limit for testing
job = client.jobs.create_job(
    query="Tech company earnings this quarter",
    context="Focus on revenue and profit margins",
    limit=10,  # Start with 10 records for quick testing
)
print(f"Job created: {job.job_id}")

# Poll for completion with progress updates
while True:
    status = client.jobs.get_job_status(job.job_id)

    # Check if completed or enriching (early access)
    current_status = status.status
    if current_status in ["completed", "enriching"]:
        print(f"Job {current_status}!")
        break

    # Show the current processing step
    current_step = next((s for s in status.steps if not s.completed), None)
    if current_step:
        print(f"Processing: {current_step.status} (step {current_step.order}/7)")
    time.sleep(60)

# Retrieve initial results (available during the enriching stage)
results = client.jobs.get_job_results(job.job_id)
print(f"Found {results.valid_records} valid records")
print(f"Progress: {results.progress_validated}/{results.candidate_records} validated")

# Continue the job to process more records
if results.valid_records >= 10:
    continued = client.jobs.continue_job(
        job_id=job.job_id,
        new_limit=50,  # Increase to 50 records
    )
    print(f"Job continued: {continued.job_id}")

    # Wait for completion
    while True:
        status = client.jobs.get_job_status(job.job_id)
        if status.status == "completed":
            break
        time.sleep(60)

    # Get final results
    results = client.jobs.get_job_results(job.job_id)
    print(f"Final: {results.valid_records} records")
```
Jobs process asynchronously and typically complete in 10-15 minutes. To learn more, see the Quickstart.
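Because jobs run for several minutes, an overall timeout on the polling loop keeps scripts from hanging on a stuck job. The helper below is a sketch, not part of the SDK: `get_status` stands in for a call such as `lambda: client.jobs.get_job_status(job.job_id)`, so the logic can be tested in isolation.

```python
import time


def wait_for_job(get_status, terminal=("completed", "enriching"),
                 poll_seconds=60, timeout_seconds=1800, sleep=time.sleep):
    """Poll get_status() until its .status reaches a terminal state.

    Raises TimeoutError if the job has not finished within timeout_seconds.
    """
    waited = 0
    while True:
        status = get_status()
        if status.status in terminal:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"job still {status.status!r} after {waited}s")
        sleep(poll_seconds)
        waited += poll_seconds
```

The injectable `sleep` parameter exists only so the loop can be exercised quickly in tests; production code can leave it at the default.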
### Monitors
Automate recurring queries with scheduled execution:
```python
from newscatcher_catchall import CatchAllApi

client = CatchAllApi(api_key="YOUR_API_KEY")

# Create a monitor from a completed job
monitor = client.monitors.create_monitor(
    reference_job_id=job.job_id,
    schedule="every day at 12 PM UTC",
    webhook={
        "url": "https://your-endpoint.com/webhook",
        "method": "POST",
        "headers": {"Authorization": "Bearer YOUR_TOKEN"},
    },
)
print(f"Monitor created: {monitor.monitor_id}")

# Update the webhook configuration without recreating the monitor
updated = client.monitors.update_monitor(
    monitor_id=monitor.monitor_id,
    webhook={
        "url": "https://new-endpoint.com/webhook",
        "method": "POST",
        "headers": {"Authorization": "Bearer NEW_TOKEN"},
    },
)

# Pause monitor execution
client.monitors.disable_monitor(monitor.monitor_id)
print("Monitor paused")

# Resume monitor execution
client.monitors.enable_monitor(monitor.monitor_id)
print("Monitor resumed")

# List the monitor's execution history
jobs = client.monitors.list_monitor_jobs(
    monitor_id=monitor.monitor_id,
    sort="desc",  # Most recent first
)
print(f"Monitor has executed {jobs.total_jobs} jobs")
for job in jobs.jobs:
    print(f"  Job {job.job_id}: {job.start_date} to {job.end_date}")

# Get aggregated results
results = client.monitors.pull_monitor_results(monitor.monitor_id)
print(f"Collected {results.records} records across all executions")
```
Monitors run jobs on your schedule and send webhook notifications when complete. See the Monitors documentation for setup and configuration.
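On the receiving side, a minimal webhook endpoint can be sketched with only the standard library. The notification payload's exact shape is defined in the Monitors documentation, not here, so this handler simply accepts the POST, records the JSON body, and returns 200; the port and route are arbitrary placeholders.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []  # notification payloads collected by the handler


class MonitorWebhook(BaseHTTPRequestHandler):
    """Accept monitor notifications POSTed to the configured webhook URL."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        received.append(json.loads(self.rfile.read(length) or b"{}"))
        self.send_response(200)
        self.end_headers()

    def log_message(self, *args):
        pass  # keep default request logging quiet

# To listen on port 8000:
#   HTTPServer(("0.0.0.0", 8000), MonitorWebhook).serve_forever()
```

In production you would also verify the `Authorization` header you configured on the monitor before trusting the payload.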
## Async client
Use the async client for non-blocking API calls:
```python
import asyncio

from newscatcher_catchall import AsyncCatchAllApi

client = AsyncCatchAllApi(api_key="YOUR_API_KEY")


async def main() -> None:
    job = await client.jobs.create_job(
        query="Tech company earnings this quarter",
        context="Focus on revenue and profit margins",
    )
    print(f"Job created: {job.job_id}")

    # Wait for completion
    while True:
        status = await client.jobs.get_job_status(job.job_id)
        completed = any(s.status == "completed" and s.completed for s in status.steps)
        if completed:
            print("Job completed!")
            break
        current_step = next((s for s in status.steps if not s.completed), None)
        if current_step:
            print(f"Processing: {current_step.status} (step {current_step.order}/7)")
        await asyncio.sleep(60)


asyncio.run(main())
```
## Exception handling
Handle API errors with the ApiError exception:
```python
from newscatcher_catchall.core.api_error import ApiError

try:
    client.jobs.create_job(query="...")
except ApiError as e:
    print(f"Status: {e.status_code}")
    print(f"Error: {e.body}")
```
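Transient failures such as rate limits (429) or server errors (5xx) are often worth retrying. Below is a generic sketch of that pattern; the SDK wiring in the trailing comment is hypothetical, and the helper itself takes the call and the retry predicate as plain callables so it has no dependencies.

```python
import time


def with_retry(call, is_transient, attempts=3, base_delay=1.0):
    """Invoke call(), retrying transient errors with exponential backoff.

    is_transient(exc) decides whether an exception is worth retrying;
    the last attempt re-raises instead of sleeping.
    """
    for attempt in range(attempts):
        try:
            return call()
        except Exception as exc:
            if not is_transient(exc) or attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

# Hypothetical wiring with the SDK:
# from newscatcher_catchall.core.api_error import ApiError
# job = with_retry(
#     lambda: client.jobs.create_job(query="..."),
#     lambda e: isinstance(e, ApiError)
#     and (e.status_code == 429 or (e.status_code or 0) >= 500),
# )
```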
## Advanced

### Pagination
Retrieve large result sets with pagination:
```python
page = 1
while True:
    results = client.jobs.get_job_results(
        job_id="...",
        page=page,
        page_size=100,
    )
    print(f"Page {results.page}/{results.total_pages}: {len(results.all_records)} records")

    for record in results.all_records:
        # Process each record
        print(f"  - {record.record_title}")

    if results.page >= results.total_pages:
        break
    page += 1

print(f"Processed {results.valid_records} total records")
```
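The loop above can be wrapped in a small generator so callers iterate over records without tracking pages themselves. This is a sketch, not an SDK feature: `fetch_page` stands in for a call like `lambda page, size: client.jobs.get_job_results(job_id="...", page=page, page_size=size)`.

```python
def iter_records(fetch_page, page_size=100):
    """Yield every record across all result pages.

    fetch_page(page, page_size) must return an object with
    .all_records, .page, and .total_pages attributes.
    """
    page = 1
    while True:
        results = fetch_page(page, page_size)
        yield from results.all_records
        if results.page >= results.total_pages:
            break
        page += 1
```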
### Access raw response data
Access response headers and raw data:
```python
response = client.jobs.with_raw_response.create_job(query="...")
print(response.headers)
print(response.data)
```
### Retries
The SDK retries failed requests automatically with exponential backoff. Configure retry behavior:
```python
client.jobs.create_job(
    query="...",
    request_options={"max_retries": 3},
)
```
### Timeouts
Set custom timeouts at the client or request level:
```python
# Client-level timeout
client = CatchAllApi(api_key="YOUR_API_KEY", timeout=30.0)

# Request-level timeout
client.jobs.create_job(
    query="...",
    request_options={"timeout_in_seconds": 10},
)
```
### Custom HTTP client
Customize the underlying HTTP client for proxies or custom transports:
```python
import httpx
from newscatcher_catchall import CatchAllApi

client = CatchAllApi(
    api_key="YOUR_API_KEY",
    httpx_client=httpx.Client(
        proxy="http://my.proxy.example.com",
        transport=httpx.HTTPTransport(local_address="0.0.0.0"),
    ),
)
```
## Beta status
CatchAll API is in beta. Breaking changes may occur in minor version updates. See the Changelog for updates.
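Because minor releases may break, pinning an exact version in your requirements file and upgrading deliberately is a sensible precaution; 0.4.0 is shown here only as an example of a pinned version.

```
newscatcher-catchall-sdk==0.4.0
```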
## Contributing
This library is generated programmatically from our API specification. Direct contributions to the generated code cannot be merged, but README improvements are welcome. To suggest SDK changes, please open an issue.
## Support
- Documentation: https://www.newscatcherapi.com/docs/v3/catch-all
- Support: support@newscatcherapi.com