Smart Batching Search
A high-performance semantic search system that reduces API queries by 67-99% (varies by topic specificity) through intelligent company grouping and parallel execution.
This module provides a two-step pipeline for efficient semantic search:
- Planning -- organize the search via smart batching and return a chunk upper bound estimate.
- Execution -- run the plan with proportional sampling to preserve the result distribution.
Key Benefits
- 67-99% query reduction -- search 4,732 companies with only 17-3,699 queries (varies by topic).
- Parallel execution -- rate-limited concurrent requests with semaphore-controlled concurrency.
- Proportional sampling -- retrieve a percentage of results while preserving the distribution across baskets.
- Category filtering -- restrict searches to specific document categories (news, transcripts, filings, ...).
- Deterministic plans -- every request stays within service limits; plans can be saved and replayed.
- Scalable -- handles universes with 10,000+ companies efficiently.
Installation
Install the package from PyPI (Python 3.11+):
```shell
pip install bigdata-smart-batching
```
With uv:
```shell
uv add bigdata-smart-batching
```
Development
To work on this repository locally, from the project root:
```shell
uv sync
```
Environment Setup
Set up environment variables:
```shell
export BIGDATA_API_KEY="your_api_key_here"
export BIGDATA_API_BASE_URL="https://api.bigdata.com"  # Optional, defaults to this
```
Or create a .env file:
```
BIGDATA_API_KEY=your_api_key_here
BIGDATA_API_BASE_URL=https://api.bigdata.com
```
Universe (CSV path or entity list)
plan_search() takes a universe argument: either a path to a UTF-8 CSV file, or a list[str] of entity IDs. IDs must match the identifiers used by the Bigdata API for your dataset.
When using a CSV, two layouts are supported:
1. Header row with an id column (optional extra columns such as name are ignored):
```csv
id,name
B8EF97,Example Corp A
BB07E4,Example Corp B
3461CF,Example Corp C
```
You can also pass a plain list instead of a CSV path:
```python
plan = plan_search(
    universe=["B8EF97", "BB07E4", "3461CF"],
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="earnings revenue profit",
)
```
Quick Start
```python
from bigdata_smart_batching import (
    plan_search,
    execute_search,
    deduplicate_documents,
    convert_to_dataframe,
)

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="earnings revenue profit",
    api_key="your_api_key",  # or set BIGDATA_API_KEY env var
)
print(f"Chunk upper bound estimate: {plan['chunk_upper_bound_estimate']:,}")

results_raw = execute_search(
    search_plan=plan,
    chunk_percentage=0.1,
    requests_per_minute=100,
)

results = deduplicate_documents(results_raw)
print(f"Retrieved {len(results)} documents (deduplicated)")

df = convert_to_dataframe(results)  # one row per chunk
```
Category Filtering
Restrict searches to a specific set of document categories using the category
argument. It accepts either a CategoryFilter dataclass or a plain dict.
```python
from bigdata_smart_batching import CategoryFilter, CategoryMode, plan_search

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="merger acquisition",
    category=CategoryFilter(
        mode=CategoryMode.INCLUDE,
        values=("news_premium", "transcripts"),
    ),
)

# Equivalent dict form:
plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="merger acquisition",
    category={"mode": "INCLUDE", "values": ["news_premium", "transcripts"]},
)
```
Valid category values are exposed via VALID_CATEGORY_VALUES and include:
expert_interviews, filings, my_files, news, news_premium,
news_public, podcasts, research, research_academic_journals,
research_investment_research, transcripts.
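As a quick sanity check before building a filter, you can compare your values against that set. This is an illustrative standalone sketch: the frozenset below is copied from the list above, and the helper `check_values` is hypothetical, not part of the package.

```python
# Illustrative only: the valid categories documented above, reproduced as a
# plain frozenset so the check works without importing the package.
VALID_CATEGORY_VALUES = frozenset({
    "expert_interviews", "filings", "my_files", "news", "news_premium",
    "news_public", "podcasts", "research", "research_academic_journals",
    "research_investment_research", "transcripts",
})

def check_values(values: list[str]) -> list[str]:
    """Return the values that are not valid categories (hypothetical helper)."""
    return [v for v in values if v not in VALID_CATEGORY_VALUES]

# "blogs" is not in the documented set, so it is flagged.
unknown = check_values(["news_premium", "transcripts", "blogs"])
```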
Save and Load Plans
```python
from bigdata_smart_batching import plan_search, execute_search, save_plan, load_plan

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="merger acquisition",
)
save_plan(plan, "my_search_plan.json")

plan = load_plan("my_search_plan.json")
raw_10 = execute_search(plan, chunk_percentage=0.1)
raw_50 = execute_search(plan, chunk_percentage=0.5)
```
How It Works
Architecture Overview
```text
Step 1: PLANNING
Universe --> Co-mention volumes --> Entity groups (by volume) --> Sub-period split --> Search plan

Step 2: EXECUTION
Proportional sampling --> Parallel search (rate limited) --> Collect & aggregate
```
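The rate-limited parallel search in Step 2 can be sketched as a semaphore plus a sliding-window limiter. This is a minimal standalone sketch of the general technique, not the package's actual implementation; the class name `SlidingWindowLimiter` and its interface are assumptions.

```python
import threading
import time
from collections import deque

class SlidingWindowLimiter:
    """Illustrative sliding-window rate limiter (not the package's own class).

    acquire() blocks until fewer than `max_calls` calls have started in the
    last `window` seconds, mirroring a requests-per-minute limit.
    """

    def __init__(self, max_calls: int, window: float = 60.0):
        self.max_calls = max_calls
        self.window = window
        self._calls: deque = deque()
        self._lock = threading.Lock()

    def acquire(self) -> None:
        while True:
            with self._lock:
                now = time.monotonic()
                # Drop timestamps that have left the window.
                while self._calls and now - self._calls[0] >= self.window:
                    self._calls.popleft()
                if len(self._calls) < self.max_calls:
                    self._calls.append(now)
                    return
                wait = self.window - (now - self._calls[0])
            time.sleep(wait)

# A semaphore caps in-flight requests on top of the rate limit.
limiter = SlidingWindowLimiter(max_calls=100)  # ~requests_per_minute
concurrency = threading.Semaphore(40)          # ~max_workers

def rate_limited_call(do_request):
    """Run one request under both the concurrency cap and the rate limit."""
    with concurrency:
        limiter.acquire()
        return do_request()
```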
Planning (plan_search())
- Loads the universe of companies from a CSV path or inline list.
- Queries the co-mention endpoint to get chunk volumes per company (split into sub-ranges of up to 365 days, summed per company).
- Splits date ranges by volume when a company exceeds the chunk limit, honoring `min_period_days`.
- Sorts companies by chunk volume and greedy-packs them into entity groups under `MAX_CHUNKS_PER_BASKET` (1000) and `MAX_ENTITIES_IN_ANY_OF` (500).
- Classifies each group by `volume_bucket` (`high`, `medium`, `low`, `very_low`).
- Returns a plan with the chunk upper bound estimate and per-basket query configurations.
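The sort-and-greedy-pack step can be illustrated with a minimal sketch. The function below shows the general technique (first-fit packing under two caps) and is an assumption, not the planner's exact code; the defaults mirror `MAX_CHUNKS_PER_BASKET` and `MAX_ENTITIES_IN_ANY_OF`.

```python
def greedy_pack(volumes: dict,
                max_chunks: int = 1000,
                max_entities: int = 500) -> list:
    """Illustrative first-fit packing of entities into baskets.

    Entities are sorted by chunk volume (descending) and placed into the
    first basket that stays under both caps; a new basket is opened when
    none fits.
    """
    baskets = []
    for entity, vol in sorted(volumes.items(), key=lambda kv: -kv[1]):
        for b in baskets:
            if b["chunks"] + vol <= max_chunks and len(b["ids"]) < max_entities:
                b["ids"].append(entity)
                b["chunks"] += vol
                break
        else:
            baskets.append({"ids": [entity], "chunks": vol})
    return [b["ids"] for b in baskets]

# Three entities under the 1000-chunk cap: the 800- and 150-chunk entities
# share a basket; the 300-chunk entity would overflow it and gets its own.
groups = greedy_pack({"B8EF97": 800, "BB07E4": 300, "3461CF": 150})
```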
Execution (execute_search())
- Calculates proportional chunks per basket, with a minimum of 1 chunk per basket when `expected_chunks > 0`.
- Executes searches in parallel with sliding-window rate limiting and a concurrency semaphore.
- Optionally performs a second pass, removing already-retrieved entities from each basket and re-querying for the remainder.
- Enriches each chunk with `entity_ids` and `primary_entity_id`, and returns the raw documents.
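The proportional allocation can be sketched as follows. The helper `allocate_chunks` is illustrative and uses only the behavior described above; the rounding choice (truncation before applying the floor of 1) is an assumption.

```python
def allocate_chunks(expected_per_basket: list,
                    chunk_percentage: float) -> list:
    """Illustrative proportional allocation (not the package's exact code).

    Each basket gets chunk_percentage of its expected chunks, with a
    minimum of 1 chunk whenever the basket expects any chunks at all.
    """
    allocation = []
    for expected in expected_per_basket:
        if expected <= 0:
            allocation.append(0)
        else:
            allocation.append(max(1, int(expected * chunk_percentage)))
    return allocation

# 10% sampling over three baskets; the 5-chunk basket still gets 1 chunk.
counts = allocate_chunks([900, 250, 5], chunk_percentage=0.1)
```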
API Reference
plan_search()
| Parameter | Type | Default | Description |
|---|---|---|---|
| `universe` | `str \| list[str]` | required | CSV path or list of entity IDs |
| `start_date` | `str` | required | Start date (YYYY-MM-DD) |
| `end_date` | `str` | required | End date (YYYY-MM-DD) |
| `text` | `str \| None` | `None` | Search query text; omitted from all payloads when `None` |
| `api_key` | `str \| None` | env var | API key |
| `api_base_url` | `str \| None` | env var | API base URL |
| `volume_query_mode` | `str` | `"three_pass"` | `"three_pass"` or `"iterative"` |
| `max_iterations_per_batch` | `int` | `10` | Max iterations per batch in `"iterative"` mode |
| `apply_volume_splits` | `bool` | `True` | Use volume time series for period splitting |
| `min_period_days` | `int` | `30` | Minimum days per sub-period |
| `min_entities_per_basket` | `int` | `1` | Lower bound on entities per basket |
| `category` | `CategoryFilter \| dict \| None` | `None` | Optional category filter (mode + values) |
| `content_diversification` | `bool` | `True` | Enable/disable content diversification in ranking |
execute_search()
| Parameter | Type | Default | Description |
|---|---|---|---|
| `search_plan` | `dict` | required | Plan from `plan_search()` |
| `chunk_percentage` | `float` | required | 0.0 to 1.0 sampling ratio |
| `requests_per_minute` | `int` | `100` | Rate limit |
| `api_key` | `str \| None` | env var | API key |
| `api_base_url` | `str \| None` | env var | API base URL |
| `max_workers` | `int` | `40` | Parallel workers |
| `overwrite_chunks_per_basket` | `int \| None` | `None` | Override the proportional chunk allocation |
| `second_pass` | `bool` | `False` | Re-query each basket for entities not yet retrieved |
Helper Functions
- `deduplicate_documents(documents)` -- merge duplicate documents by `id`, combining their chunks.
- `convert_to_dataframe(raw_results)` -- convert documents to a DataFrame with one row per chunk.
- `resolve_universe(universe)` -- load entity IDs from a CSV path or validate a `list[str]`.
- `load_universe_from_csv(csv_path)` -- load entity IDs from a CSV file only.
- `save_plan(plan, path)` / `load_plan(path)` -- persist plans as JSON.
- `validate_category_filter(category)` -- validate and normalize a category filter to the API dict shape.
- `build_comention_payload(...)`, `build_volume_payload(...)`, `build_search_query(...)` -- low-level payload builders used by the planner; useful if you need to construct custom requests.
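As an illustration of the merge-by-id behavior that `deduplicate_documents` is described as performing, here is a minimal standalone sketch. It is an assumption based on the description above, not the package's implementation, and assumes documents are dicts with `"id"` and `"chunks"` keys.

```python
def merge_by_id(documents: list) -> list:
    """Illustrative merge of duplicate documents by "id", combining chunks."""
    merged = {}
    for doc in documents:
        existing = merged.get(doc["id"])
        if existing is None:
            # Copy the chunk list so the input documents are not mutated.
            merged[doc["id"]] = {**doc, "chunks": list(doc.get("chunks", []))}
        else:
            existing["chunks"].extend(doc.get("chunks", []))
    return list(merged.values())

docs = [
    {"id": "d1", "chunks": ["c1"]},
    {"id": "d2", "chunks": ["c2"]},
    {"id": "d1", "chunks": ["c3"]},
]
unique = merge_by_id(docs)  # two documents; "d1" carries chunks c1 and c3
```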
Public Types
- `CategoryFilter` -- frozen dataclass with `mode: CategoryMode` and `values: tuple[str, ...]`.
- `CategoryMode` -- `StrEnum` with `INCLUDE` and `EXCLUDE` values.
- `CategoryFilterInput` -- type alias: `CategoryFilter | dict[str, str | list[str]]`.
- `UniverseInput` -- type alias: `str | list[str]`.
- `VALID_CATEGORY_VALUES` -- `frozenset[str]` of allowed category strings.
Testing
```shell
# Run all tests
uv run pytest

# With coverage
uv run pytest --cov=bigdata_smart_batching --cov-report=term-missing

# Specific test file
uv run pytest tests/test_validation.py -v
```
Project Structure
```text
bigdata-smart-batching/
├── pyproject.toml
├── README.md
├── CHANGELOG.md
├── LICENSE
├── Makefile
├── .python-version
├── src/
│   └── bigdata_smart_batching/
│       ├── __init__.py
│       ├── smart_batching.py
│       ├── smart_batching_config.py
│       ├── search_function.py
│       └── output_converter.py
├── examples/
│   └── example.py
└── tests/
    ├── __init__.py
    ├── test_config.py
    ├── test_output_converter.py
    ├── test_rate_limiter.py
    └── test_validation.py
```
Configuration
Environment Variables
- `BIGDATA_API_KEY` -- required; your Bigdata API key.
- `BIGDATA_API_BASE_URL` -- optional; API base URL (default: `https://api.bigdata.com`).
Default Settings
- `requests_per_minute`: 100
- `max_workers`: 40
- `MAX_CHUNKS_PER_BASKET`: 1000
- `MAX_ENTITIES_IN_ANY_OF`: 500
- `volume_query_mode`: `"three_pass"`
Volume Buckets
Companies (and basket groups) are classified by total chunk volume. Ranges use
[lower, upper) semantics:
| Bucket | Range |
|---|---|
| `high` | 500+ chunks |
| `medium` | 100-499 chunks |
| `low` | 1-99 chunks |
| `very_low` | 0 chunks (headlines only) |
Ties are resolved using VOLUME_BUCKET_PRIORITY: high > medium > low > very_low.
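The bucket boundaries map directly to a classification function. A minimal sketch (the name `classify_volume` is illustrative, not the package's API):

```python
def classify_volume(total_chunks: int) -> str:
    """Classify a total chunk volume into the buckets from the table above.

    Ranges follow [lower, upper) semantics: e.g. medium covers [100, 500).
    """
    if total_chunks >= 500:
        return "high"
    if total_chunks >= 100:
        return "medium"
    if total_chunks >= 1:
        return "low"
    return "very_low"
```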
Changelog
See CHANGELOG.md for release notes. The package is currently at version 1.0.2.
License
This project is part of Bigdata.com.
Disclaimer: This software is provided "as is" without warranty of any kind, express or implied. The authors and contributors assume no responsibility for the accuracy, completeness, or usefulness of any information, results, or processes provided. This software is for educational and research purposes only and is not intended to be used as financial advice.
Download files
Source Distribution
Built Distribution
File details
Details for the file bigdata_smart_batching-1.0.2.tar.gz.
File metadata
- Download URL: bigdata_smart_batching-1.0.2.tar.gz
- Upload date:
- Size: 125.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `facef6145e65130d6aa05031c440e8e50a8d98facbbed02b902a5085d599cc46` |
| MD5 | `f30763bdac6a17b560b686e5d45edb7b` |
| BLAKE2b-256 | `9f66b4d9fbce996a45843c0f6fbe05bbb8b93980981adba00395e3814b9ffff5` |
File details
Details for the file bigdata_smart_batching-1.0.2-py3-none-any.whl.
File metadata
- Download URL: bigdata_smart_batching-1.0.2-py3-none-any.whl
- Upload date:
- Size: 35.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `3a05fdb5dd11a3c21c45ca0f866610966136e6192ee87fa8426f9024eaeee3a7` |
| MD5 | `12f219e46a542e6b40adf3db443f01d1` |
| BLAKE2b-256 | `f8c98ef17d11b926ca044db387cffd0a98d1b0ec4eb3f3f794cf7adfbc268d7c` |