Skip to main content

High-performance semantic search with intelligent company grouping and parallel execution

Project description

Smart Batching Search

A high-performance semantic search system that reduces API queries by 67-99% (varies by topic specificity) through intelligent company grouping and parallel execution.

This module provides a two-step pipeline for efficient semantic search:

  1. Planning -- organize the search via smart batching and return a chunk upper bound estimate.
  2. Execution -- run the plan with proportional sampling to preserve the result distribution.

Key Benefits

  • 67-99% query reduction -- search 4,732 companies with only 17-3,699 queries (varies by topic).
  • Parallel execution -- rate-limited concurrent requests with semaphore-controlled concurrency.
  • Proportional sampling -- retrieve a percentage of results while preserving the distribution across baskets.
  • Category filtering -- restrict searches to specific document categories (news, transcripts, filings, ...).
  • Source filtering -- include or exclude documents by source identifiers (INCLUDE / EXCLUDE plus a list of source IDs).
  • Sentiment filtering -- restrict searches to one or more inclusive document-level sentiment ranges.
  • Throughput controls -- shared sliding-window rate limiter and concurrency semaphore across all planning HTTP work, with shared exponential backoff on 429 / 403 / timeout retries.
  • Deterministic plans -- every request stays within service limits; plans can be saved and replayed.
  • Scalable -- handles universes with 10,000+ companies efficiently.

Installation

Install the package from PyPI (Python 3.11+):

pip install bigdata-smart-batching

With uv:

uv add bigdata-smart-batching

Development

To work on this repository locally, from the project root:

uv sync

Environment Setup

Set up environment variables:

export BIGDATA_API_KEY="your_api_key_here"
export BIGDATA_API_BASE_URL="https://api.bigdata.com"  # Optional, defaults to this

Or create a .env file:

BIGDATA_API_KEY=your_api_key_here
BIGDATA_API_BASE_URL=https://api.bigdata.com

Universe (CSV path or entity list)

plan_search() takes a universe argument: either a path to a UTF-8 CSV file, or a list[str] of entity IDs. IDs must match the identifiers used by the Bigdata API for your dataset.

When using a CSV, two layouts are supported:

1. Header row with an id column (optional extra columns such as name are ignored):

id,name
B8EF97,Example Corp A
BB07E4,Example Corp B
3461CF,Example Corp C

You can also pass a plain list instead of a CSV path:

plan = plan_search(
    universe=["B8EF97", "BB07E4", "3461CF"],
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="earnings revenue profit",
)

Quick Start

from bigdata_smart_batching import (
    plan_search,
    execute_search,
    deduplicate_documents,
    convert_to_dataframe,
)

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="earnings revenue profit",
    api_key="your_api_key",  # or set BIGDATA_API_KEY env var
)

print(f"Chunk upper bound estimate: {plan['chunk_upper_bound_estimate']:,}")

results_raw = execute_search(
    search_plan=plan,
    chunk_percentage=0.1,
    requests_per_minute=100,
)

results = deduplicate_documents(results_raw)
print(f"Retrieved {len(results)} documents (deduplicated)")

df = convert_to_dataframe(results)  # one row per chunk

Category Filtering

Restrict searches to a specific set of document categories using the category argument. It accepts either a CategoryFilter dataclass or a plain dict.

from bigdata_smart_batching import CategoryFilter, CategoryMode, plan_search

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="merger acquisition",
    category=CategoryFilter(
        mode=CategoryMode.INCLUDE,
        values=("news_premium", "transcripts"),
    ),
)

# Equivalent dict form:
plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="merger acquisition",
    category={"mode": "INCLUDE", "values": ["news_premium", "transcripts"]},
)

Valid category values are exposed via VALID_CATEGORY_VALUES and include: expert_interviews, filings, my_files, news, news_premium, news_public, podcasts, research, research_academic_journals, research_investment_research, transcripts.

Source Filtering

Restrict searches by source (document source IDs) using the source argument. It accepts either a SourceFilter dataclass or a plain dict with mode (INCLUDE or EXCLUDE) and values (a non-empty list of source ID strings). The filter is serialized to query.filters.source on co-mention, volume, and search payloads during planning.

from bigdata_smart_batching import SourceFilter, SourceMode, plan_search

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="merger acquisition",
    source=SourceFilter(
        mode=SourceMode.INCLUDE,
        values=("source-id-1", "source-id-2"),
    ),
)

# Equivalent dict form:
plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="merger acquisition",
    source={"mode": "EXCLUDE", "values": ["source-id-to-drop"]},
)

SourceMode is the same INCLUDE / EXCLUDE enum as CategoryMode. String values are trimmed; empty strings are rejected. Use validate_source_filter(source) to validate and normalize input into the API dict shape.

Sentiment Filtering

Restrict searches to documents whose sentiment falls inside one or more inclusive min / max ranges using the sentiment argument. It accepts a single SentimentRange / dict, or a list of them (multiple disjoint ranges). The filter is emitted as query.filters.sentiment.ranges on co-mention, volume, and search payloads.

from bigdata_smart_batching import SentimentRange, plan_search

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="earnings revenue profit",
    sentiment=SentimentRange(min=0.3, max=1.0),
)

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="earnings revenue profit",
    sentiment=[
        {"min": -1.0, "max": -0.5},
        {"min": 0.5, "max": 1.0},
    ],
)

Each range must satisfy min <= max. Use validate_sentiment_range(sentiment) to validate and normalize input into the list-of-dicts shape the API expects.

Save and Load Plans

from bigdata_smart_batching import plan_search, execute_search, save_plan, load_plan

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="merger acquisition",
)
save_plan(plan, "my_search_plan.json")

plan = load_plan("my_search_plan.json")
raw_10 = execute_search(plan, chunk_percentage=0.1)
raw_50 = execute_search(plan, chunk_percentage=0.5)

How It Works

Architecture Overview

Step 1: PLANNING
  Universe  -->  Co-mention volumes  -->  Entity groups (by volume)  -->  Sub-period split  -->  Search plan

Step 2: EXECUTION
  Proportional sampling  -->  Parallel search (rate limited)  -->  Collect & aggregate

Planning (plan_search())

  1. Loads the universe of companies from a CSV path or inline list.
  2. Queries the co-mention endpoint to get chunk volumes per company (split into sub-ranges of up to 365 days, summed per company).
  3. Splits date ranges by volume when a company exceeds the chunk limit, honoring min_period_days.
  4. Sorts companies by chunk volume and greedy-packs them into entity groups under MAX_CHUNKS_PER_BASKET (1000) and MAX_ENTITIES_IN_ANY_OF (500).
  5. Classifies each group by volume_bucket (high, medium, low, very_low).
  6. Returns a plan with the chunk upper bound estimate and per-basket query configurations.

Execution (execute_search())

  1. Calculates proportional chunks per basket, with a minimum of 1 chunk per basket when expected_chunks > 0.
  2. Executes searches in parallel with sliding-window rate limiting and a concurrency semaphore.
  3. Optionally performs a second pass, removing already-retrieved entities from each basket and re-querying for the remainder.
  4. Enriches each chunk with entity_ids and primary_entity_id, and returns the raw documents.

API Reference

plan_search()

Parameter Type Default Description
universe str | list[str] required CSV path or list of entity IDs
start_date str required Start date (YYYY-MM-DD)
end_date str required End date (YYYY-MM-DD)
text str | None None Search query text; omitted from all payloads when None
api_key str | None env var API key
api_base_url str | None env var API base URL
volume_query_mode str "three_pass" "three_pass" or "iterative"
max_iterations_per_batch int 10 Max iterations per batch in "iterative" mode
apply_volume_splits bool True Use volume time series for period splitting
min_period_days int 30 Minimum days per sub-period
min_entities_per_basket int 1 Lower bound on entities per basket
category CategoryFilter | dict | None None Optional category filter (mode + values)
source SourceFilter | dict | None None Optional source filter by source IDs (mode + values)
sentiment SentimentRange | dict | list | None None Optional sentiment filter: one inclusive {min, max} range or a list of disjoint ranges
content_diversification bool True Enable/disable content diversification in ranking
requests_per_minute int 100 Global rate limit shared across all planning HTTP work (co-mention + volume)
max_workers int 40 Maximum concurrent in-flight planning requests (shared semaphore)

execute_search()

Parameter Type Default Description
search_plan dict required Plan from plan_search()
chunk_percentage float required 0.0 to 1.0 sampling ratio
requests_per_minute int 100 Rate limit
api_key str | None env var API key
api_base_url str | None env var API base URL
max_workers int 40 Parallel workers
overwrite_chunks_per_basket int | None None Override the proportional chunk allocation
second_pass bool False Re-query each basket for entities not yet retrieved
basket_filtered_entities bool False When True, each chunk's entity_ids is filtered to only include IDs present in the basket's query.filters.entity.any_of list; entity IDs are always deduplicated

Helper Functions

  • deduplicate_documents(documents) -- merge duplicate documents by id, combining their chunks.
  • convert_to_dataframe(raw_results) -- convert documents to a DataFrame with one row per chunk.
  • resolve_universe(universe) -- load entity IDs from a CSV path or validate a list[str].
  • load_universe_from_csv(csv_path) -- load entity IDs from a CSV only.
  • save_plan(plan, path) / load_plan(path) -- persist plans as JSON.
  • validate_category_filter(category) -- validate and normalize a category filter to the API dict shape.
  • validate_source_filter(source) -- validate and normalize a source filter to the API dict shape (filters.source).
  • validate_sentiment_range(sentiment) -- validate and normalize sentiment range(s) into the list-of-{min, max} dicts emitted as filters.sentiment.ranges.
  • build_comention_payload(...), build_volume_payload(...), build_search_query(...) -- low-level payload builders used by the planner; each accepts optional category_filter, source_filter, and sentiment_filter alongside entities and dates. Useful if you need to construct custom requests.

Public Types

  • CategoryFilter -- frozen dataclass with mode: CategoryMode and values: tuple[str, ...].
  • CategoryMode -- StrEnum with INCLUDE and EXCLUDE values.
  • CategoryFilterInput -- type alias: CategoryFilter | dict[str, str | list[str]].
  • SourceFilter -- frozen dataclass with mode: SourceMode and values: tuple[str, ...] (source IDs).
  • SourceMode -- type alias of CategoryMode (INCLUDE, EXCLUDE).
  • SourceFilterInput -- type alias: SourceFilter | dict[str, str | list[str]].
  • SentimentRange -- frozen dataclass with inclusive min: float and max: float.
  • SentimentRangeInput -- type alias for one SentimentRange / {min, max} dict, or a non-empty list of either.
  • UniverseInput -- type alias: str | list[str].
  • VALID_CATEGORY_VALUES -- frozenset[str] of allowed category strings.

Testing

# Run all tests
uv run pytest

# With coverage
uv run pytest --cov=bigdata_smart_batching --cov-report=term-missing

# Specific test file
uv run pytest tests/test_validation.py -v

Project Structure

bigdata-smart-batching/
├── pyproject.toml
├── README.md
├── CHANGELOG.md
├── LICENSE
├── Makefile
├── .python-version
├── src/
│   └── bigdata_smart_batching/
│       ├── __init__.py
│       ├── smart_batching.py
│       ├── smart_batching_config.py
│       ├── search_function.py
│       └── output_converter.py
├── examples/
│   └── example.py
└── tests/
    ├── __init__.py
    ├── test_config.py
    ├── test_output_converter.py
    ├── test_rate_limiter.py
    └── test_validation.py

Configuration

Environment Variables

  • BIGDATA_API_KEY -- required; your Bigdata API key.
  • BIGDATA_API_BASE_URL -- optional; API base URL (default: https://api.bigdata.com).

Default Settings

  • requests_per_minute: 100
  • max_workers: 40
  • MAX_CHUNKS_PER_BASKET: 1000
  • MAX_ENTITIES_IN_ANY_OF: 500
  • volume_query_mode: "three_pass"

Volume Buckets

Companies (and basket groups) are classified by total chunk volume. Ranges use [lower, upper) semantics:

Bucket Range
high 500+ chunks
medium 100-499 chunks
low 1-99 chunks
very_low 0 chunks (headlines only)

Ties are resolved using VOLUME_BUCKET_PRIORITY: high > medium > low > very_low.

Changelog

See CHANGELOG.md for release notes. The package is currently at version 1.4.0.

License

This project is part of Bigdata.com.

Disclaimer: This software is provided "as is" without warranty of any kind, express or implied. The authors and contributors assume no responsibility for the accuracy, completeness, or usefulness of any information, results, or processes provided. This software is for educational and research purposes only and is not intended to be used as financial advice.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

bigdata_smart_batching-1.4.1.tar.gz (133.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

bigdata_smart_batching-1.4.1-py3-none-any.whl (40.1 kB view details)

Uploaded Python 3

File details

Details for the file bigdata_smart_batching-1.4.1.tar.gz.

File metadata

File hashes

Hashes for bigdata_smart_batching-1.4.1.tar.gz
Algorithm Hash digest
SHA256 16a3d34d30ca0daf62d4c44c8d4921f2c6d8163cb386f39bf850999dd2c07d4a
MD5 eade8d6c69af6d94090e94577382a29d
BLAKE2b-256 5c581caa6c70fd613a9396a394ba12337ceaf63c15f81f88bf6aad5289bbfd95

See more details on using hashes here.

File details

Details for the file bigdata_smart_batching-1.4.1-py3-none-any.whl.

File metadata

File hashes

Hashes for bigdata_smart_batching-1.4.1-py3-none-any.whl
Algorithm Hash digest
SHA256 1a09dae2c20aea7346fc7ac94f239ce2f4d7f222e9ff70832e056603c6c7d860
MD5 cb8d2652e96b1f34836295ff0c32df12
BLAKE2b-256 b6ed4e7ae1c8ed5c808d319db36298dbd08059663e5f5f5c02abb4288fef91fd

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page