
Smart Batching Search

A high-performance semantic search system that reduces API queries by 67-99% (varies by topic specificity) through intelligent company grouping and parallel execution.

This module provides a two-step system for efficient semantic search:

  1. Planning: Organize the search using smart batching and return a chunk upper-bound estimate
  2. Execution: Perform search with proportional sampling to preserve distribution

Key Benefits

  • 67-99% Query Reduction: Search 4,732 companies with only 17-3,699 queries (varies by topic)
  • Parallel Execution: Rate-limited concurrent requests with semaphore control
  • Proportional Sampling: Retrieve percentage of results while preserving distribution
  • Production Ready: Comprehensive error handling, retries, and logging
  • Scalable: Efficiently handles universes with 10,000+ companies

Installation

Install the package from PyPI (Python 3.11+):

pip install bigdata-smart-batching

With uv:

uv add bigdata-smart-batching

Development

To work on this repository locally, from the project root:

uv sync

Environment Setup

Set up environment variables:

export BIGDATA_API_KEY="your_api_key_here"
export BIGDATA_API_BASE_URL="https://api.bigdata.com"  # Optional, defaults to this

Or create a .env file:

BIGDATA_API_KEY=your_api_key_here
BIGDATA_API_BASE_URL=https://api.bigdata.com

Universe (CSV path or entity list)

plan_search() takes a universe argument: either a path to a UTF-8 CSV file, or a list[str] of entity IDs. IDs must match the identifiers used by the Bigdata API for your dataset.

When using a CSV, two layouts are supported:

1. Header row with an id column (optional extra columns such as name are ignored):

id,name
B8EF97,Example Corp A
BB07E4,Example Corp B
3461CF,Example Corp C
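Reading the header-row layout above is straightforward with the standard csv module. This is an illustrative sketch of the described behavior (extra columns ignored, IDs taken from the id column), not the package's actual loader:

```python
import csv
import io

def load_ids(csv_text: str) -> list[str]:
    """Read entity IDs from the 'id' column; extra columns such as 'name' are ignored."""
    reader = csv.DictReader(io.StringIO(csv_text))
    if reader.fieldnames is None or "id" not in reader.fieldnames:
        raise ValueError("CSV must contain an 'id' column")
    return [row["id"].strip() for row in reader if row["id"].strip()]
```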

Quick Start

from bigdata_smart_batching import (
    plan_search,
    execute_search,
    deduplicate_documents,
    convert_to_dataframe,
)

# Step 1: Plan the search
plan = plan_search(
    text="earnings revenue profit",
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    api_key="your_api_key",  # or set BIGDATA_API_KEY env var
)

print(f"Chunk upper bound estimate: {plan['chunk_upper_bound_estimate']:,}")

# Step 2: Execute search with 10% of total chunks (preserves distribution)
results_raw = execute_search(
    search_plan=plan,
    chunk_percentage=0.1,
    requests_per_minute=100,
)

# Step 3: Deduplicate and convert to DataFrame
results = deduplicate_documents(results_raw)
print(f"Retrieved {len(results)} documents (deduplicated)")

df = convert_to_dataframe(results)  # one row per chunk

Save and Load Plans

from bigdata_smart_batching import plan_search, execute_search, save_plan, load_plan

# Create and save a plan
plan = plan_search(
    text="merger acquisition",
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
)
save_plan(plan, "my_search_plan.json")

# Later: reload and run with different sampling
plan = load_plan("my_search_plan.json")
raw_10 = execute_search(plan, chunk_percentage=0.1)
raw_50 = execute_search(plan, chunk_percentage=0.5)

How It Works

Architecture Overview

Step 1: PLANNING
  Universe CSV  -->  Co-mention API Query  -->  Basket Creation  -->  Search Plan

Step 2: EXECUTION
  Proportional Sampling  -->  Parallel Search (Rate Limited)  -->  Collect & Aggregate

Planning (plan_search())

  1. Loads the universe of companies from a CSV path or inline list
  2. Queries the co-mention endpoint to get chunk volumes per company
  3. Splits date ranges by volume when a company exceeds the chunk limit
  4. Creates optimized baskets grouped by volume
  5. Returns a plan with a chunk upper-bound estimate and basket configurations
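As an illustration of step 4, a greedy first-fit packer keeps each basket's expected volume under a cap. The 1,000-chunk cap mirrors the documented max_chunks_per_basket default; `make_baskets` is a sketch, and the library's actual grouping strategy may differ:

```python
def make_baskets(volumes: dict[str, int], max_chunks_per_basket: int = 1000) -> list[list[str]]:
    """Greedy first-fit: pack companies (largest volume first) into baskets so
    each basket's total expected chunk volume stays under the cap."""
    baskets: list[tuple[int, list[str]]] = []  # (current volume, company ids)
    for company, vol in sorted(volumes.items(), key=lambda kv: kv[1], reverse=True):
        for i, (total, ids) in enumerate(baskets):
            if total + vol <= max_chunks_per_basket:
                baskets[i] = (total + vol, ids + [company])
                break
        else:
            # No existing basket has room: start a new one.
            baskets.append((vol, [company]))
    return [ids for _, ids in baskets]
```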

Execution (execute_search())

  1. Calculates proportional chunks per basket
  2. Ensures a minimum of 1 chunk per basket (when the expected count is > 0)
  3. Executes searches in parallel with rate limiting and semaphore
  4. Collects and returns document results
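Steps 1-2 amount to a simple allocation rule, sketched here with a hypothetical allocate_chunks helper (the package's internal logic may differ in rounding details):

```python
def allocate_chunks(expected: list[int], chunk_percentage: float) -> list[int]:
    """Proportional sampling with a floor of 1 chunk per non-empty basket."""
    return [max(1, round(e * chunk_percentage)) if e > 0 else 0 for e in expected]
```

For example, at 10% sampling a basket expecting only 3 chunks still receives 1 chunk, preserving its presence in the distribution.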

API Reference

plan_search()

  • text (str, required) -- Search query text
  • universe (str | list[str], required) -- CSV path or list of entity IDs
  • start_date (str, required) -- Start date (YYYY-MM-DD)
  • end_date (str, required) -- End date (YYYY-MM-DD)
  • api_key (str, default: env var) -- API key
  • api_base_url (str, default: env var) -- API base URL
  • volume_query_mode (str, default: "three_pass") -- "three_pass" or "iterative"
  • apply_volume_splits (bool, default: True) -- Use volume time series for period splitting
  • min_period_days (int, default: 30) -- Minimum days per sub-period

execute_search()

  • search_plan (Dict, required) -- Plan from plan_search()
  • chunk_percentage (float, required) -- Sampling ratio from 0.0 to 1.0
  • requests_per_minute (int, default: 100) -- Rate limit
  • api_key (str, default: env var) -- API key
  • max_workers (int, default: 40) -- Parallel workers
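The requests_per_minute and max_workers parameters can be combined in a small limiter. This is an illustrative sketch of the approach described (a semaphore bounds concurrency, a minimum interval spaces request starts), not the package's actual implementation:

```python
import threading
import time

class RateLimiter:
    """Cap concurrency with a semaphore and enforce a minimum interval
    between request starts derived from requests_per_minute."""
    def __init__(self, requests_per_minute: int = 100, max_workers: int = 40):
        self._semaphore = threading.Semaphore(max_workers)
        self._interval = 60.0 / requests_per_minute
        self._lock = threading.Lock()
        self._next_start = 0.0

    def __enter__(self):
        self._semaphore.acquire()
        with self._lock:
            # Reserve the next start slot, then sleep outside the lock.
            wait = self._next_start - time.monotonic()
            self._next_start = max(time.monotonic(), self._next_start) + self._interval
        if wait > 0:
            time.sleep(wait)
        return self

    def __exit__(self, *exc):
        self._semaphore.release()
        return False
```

A worker would wrap each API call in `with limiter:` so that both limits apply regardless of how many threads are active.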

Helper Functions

  • deduplicate_documents(documents) -- Merges duplicate documents by id
  • resolve_universe(universe) -- Loads entity IDs from a CSV path or validates a list[str]
  • load_universe_from_csv(csv_path) -- Loads entity IDs from CSV only
  • convert_to_dataframe(raw_results) -- Converts documents to DataFrame (one row per chunk)
  • save_plan(plan, path) / load_plan(path) -- Persist plans as JSON
  • portfolio_backtesting_pipeline(...) -- Long-short portfolio backtesting
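Deduplication can be pictured as merging records that share an id. This sketch assumes a documents-with-chunks shape; the field names and merge rule are illustrative assumptions, not the exact behavior of deduplicate_documents:

```python
def deduplicate_by_id(documents: list[dict]) -> list[dict]:
    """Merge documents sharing an 'id', concatenating their 'chunks' lists.
    Field names ('id', 'chunks') are assumed for illustration."""
    merged: dict[str, dict] = {}
    for doc in documents:
        if doc["id"] in merged:
            merged[doc["id"]]["chunks"].extend(doc["chunks"])
        else:
            # Copy so the input documents are left unmodified.
            merged[doc["id"]] = {**doc, "chunks": list(doc["chunks"])}
    return list(merged.values())
```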

Testing

# Run all tests
uv run pytest

# With coverage
uv run pytest --cov=bigdata_smart_batching --cov-report=term-missing

# Specific test file
uv run pytest tests/test_validation.py -v

Project Structure

bigdata-smart-batching/
├── pyproject.toml
├── README.md
├── .python-version
├── src/
│   └── bigdata_smart_batching/
│       ├── __init__.py
│       ├── smart_batching.py
│       ├── smart_batching_config.py
│       ├── search_function.py
│       ├── output_converter.py
│       └── portfolio_backtesting.py
└── tests/
    ├── __init__.py
    ├── test_config.py
    ├── test_output_converter.py
    ├── test_validation.py
    └── test_rate_limiter.py

Configuration

Environment Variables

  • BIGDATA_API_KEY: Required -- Your Bigdata API key
  • BIGDATA_API_BASE_URL: Optional -- API base URL (default: https://api.bigdata.com)

Default Settings

  • requests_per_minute: 100
  • max_workers: 40
  • max_chunks_per_basket: 1000
  • volume_query_mode: "three_pass"

License

This project is part of Bigdata.com.

Disclaimer: This software is provided "as is" without warranty of any kind, express or implied. The authors and contributors assume no responsibility for the accuracy, completeness, or usefulness of any information, results, or processes provided. This software is for educational and research purposes only and is not intended to be used as financial advice.
