Base package to build indexing scripts for DataLinks
DataLinks Python SDK
The DataLinks Python SDK simplifies data ingestion, normalization, linking, and querying with DataLinks. It wraps the DataLinks API in a Pythonic interface for managing data workflows, including entity resolution and flexible chains of inference and validation steps, and it is configured through environment variables or a .env file.
Get started by installing the SDK and viewing the Quick Start guide.
Features
- Ingestion API: Easily ingest data into namespaces with built-in batching and retry mechanisms.
- Multipart Upload: Reliable large file ingestion via chunked S3 uploads with automatic abort on failure.
- Ingestion Tracking: Poll for async ingestion completion and progress after multipart uploads.
- Inference Workflow Management: Define custom chains of inference and validation steps.
- Entity Resolution: Match entities using configurable exact or geo-based matching methods.
- Namespace Management: Create and manage namespaces with privacy options.
- Data Querying: Query data with options to include/exclude metadata.
- Custom Loaders: Load custom data formats like JSON into defined workflows.
- CLI Tool: Standardized command-line interface for managing ingestion pipelines quickly.
Installation
To install the SDK, use pip or uv:
```bash
pip install datalinks
# or
uv add datalinks
```
To install the package in editable mode together with the development dependencies (pytest, tox, and twine):
- Clone the repository from your version-control system.
- Create a virtual environment with your tool of choice.
- Run one of the following:
```bash
pip install -e ".[dev]"
# or
uv pip install -e ".[dev]"
```
Quick Start
1. Install
Install the DataLinks SDK if you haven't already.
2. Configure
Ensure the required environment variables are set:
| Variable | Required | Description |
|---|---|---|
| DL_HOST | Yes | Base URL of the DataLinks API (e.g. https://api.datalinks.com) |
| DL_API_KEY | Yes | Your DataLinks API key (JWT token) |
| DL_NAMESPACE | Yes | Target namespace for ingest and query operations |
| OBJECT_NAME | No | Object name used to scope ingest/query URLs; defaults to empty |
If you use the Ingest Proxy (IngestProxyAPI) for auto-modelled pipelines, set these instead:
| Variable | Required | Description |
|---|---|---|
| DL_INGEST_PROXY | Yes | Base URL of the ingest proxy (e.g. https://your-proxy.vercel.app) |
| DL_API_KEY | Yes | Your DataLinks API key (shared with the main API) |
| DL_USERNAME | Yes | Your DataLinks username |
| DL_NAMESPACE | Yes | Target namespace |
Alternatively, you can define these variables in a .env file in the root of your project.
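To catch configuration mistakes before calling DLConfig.from_env(), you can check which required variables are missing. The helper below is a hypothetical sketch, not part of the SDK: the variable lists are copied from the two tables above, and the "api" and "ingest_proxy" labels are names chosen only for this example. It inspects os.environ by default, so values kept solely in a .env file show up as missing unless something has already loaded them into the process environment.

```python
import os
from typing import Mapping, Optional

# Hypothetical helper, not part of the DataLinks SDK.
# Variables marked "Required: Yes" in the configuration tables, per client type.
REQUIRED_VARS = {
    "api": ("DL_HOST", "DL_API_KEY", "DL_NAMESPACE"),
    "ingest_proxy": ("DL_INGEST_PROXY", "DL_API_KEY", "DL_USERNAME", "DL_NAMESPACE"),
}


def missing_env_vars(client: str = "api", env: Optional[Mapping[str, str]] = None) -> list[str]:
    """Return the required variables for `client` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    # An empty string counts as missing, since it cannot be a valid host or key.
    return [name for name in REQUIRED_VARS[client] if not env.get(name)]
```

For example, missing_env_vars("ingest_proxy") returns an empty list once all four proxy variables are exported.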
3. Ingest and query
```python
from datalinks.api import DataLinksAPI, DLConfig

# Initialize configuration from environment variables (or a .env file)
config = DLConfig.from_env()

# Instantiate the API client
client = DataLinksAPI(config=config)

# Sample rows
rows = [
    {"brand": "Gillette", "category": "Razor", "product": "Heated Razor"},
    {"brand": "Oral-B", "category": "Electric Toothbrush", "product": "iO Series 10"},
]

# Ingest pre-structured rows directly (no pipeline required)
result = client.ingest(data=rows)

# Query the data without metadata
data = client.query_data(include_metadata=False)
print(data)
```
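The quick start passes rows straight to client.ingest. Before ingesting, it can help to confirm that every row is a dict with the same columns. check_rows is a hypothetical pre-flight helper, not part of the SDK, and whether DataLinks itself requires uniform columns is not stated here; treat this as a stricter local convention.

```python
from typing import Any


# Hypothetical helper, not part of the DataLinks SDK.
def check_rows(rows: list[dict[str, Any]]) -> set[str]:
    """Return the shared column set, or raise if rows are empty or not uniform dicts."""
    if not rows:
        raise ValueError("no rows to ingest")
    columns = None
    for i, row in enumerate(rows):
        if not isinstance(row, dict):
            raise TypeError(f"row {i} is {type(row).__name__}, expected dict")
        keys = set(row)
        if columns is None:
            columns = keys  # the first row defines the expected columns
        elif keys != columns:
            raise ValueError(f"row {i} has columns {sorted(keys)}, expected {sorted(columns)}")
    return columns
```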
CLI Usage
The SDK also provides a built-in CLI that can be extended:
```text
datalinks-client [-h] --verbose <input-folder>
```
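If you want a similar entry point for your own scripts, a standalone argparse sketch that mirrors the documented usage line might look like this. build_parser and list_input_files are hypothetical helpers, not the SDK's CLI internals; the *.json filter is an assumption, and here --verbose is treated as an optional flag.

```python
import argparse
from pathlib import Path


# Hypothetical stand-in that mirrors the documented usage line; it is not
# the SDK's own datalinks-client implementation.
def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="datalinks-client")
    parser.add_argument("--verbose", action="store_true", help="enable verbose logging")
    parser.add_argument("input_folder", type=Path, help="folder containing the files to ingest")
    return parser


def list_input_files(folder: Path, pattern: str = "*.json") -> list[Path]:
    """Return the files in `folder` matching `pattern`, sorted for a stable order."""
    return sorted(p for p in folder.glob(pattern) if p.is_file())
```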
Components
1. DLConfig
DLConfig reads configuration values (e.g., the API key) from environment variables or a .env file, so the same code can run unchanged across deployment environments.
2. DataLinksAPI
DataLinksAPI handles interactions with the API. You can:
- Ingest data directly or via multipart upload for large files.
- Track and wait for async ingestion completion.
- Query or retrieve data with complex parameters.
- Manage namespaces.
3. Inference Workflow
Use a chain of inference and validation steps defined through classes like ProcessUnstructured, Normalize, and Validate to automate data preparation workflows.
```python
from datalinks.pipeline import Pipeline, ProcessUnstructured, Normalize, NormalizeModes, Validate, ValidateModes

# Define an inference pipeline: extract a table, map it to target columns, then validate fields
inference_steps = Pipeline(
    ProcessUnstructured(derive_from="source_field", helper_prompt="This extracts tables."),
    Normalize(
        target_cols={"email": "the email address", "phone": "the phone number"},
        mode=NormalizeModes.ALL_IN_ONE,
    ),
    Validate(mode=ValidateModes.FIELDS, columns=["email", "phone"]),
)
```
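Each step consumes the previous step's output, so order matters. The toy model below illustrates that chaining with plain functions; normalize and validate are hypothetical local stand-ins for the SDK's Normalize and Validate steps, which use an LLM rather than fixed rules.

```python
from functools import reduce
from typing import Callable

Table = list[dict[str, str]]
Step = Callable[[Table], Table]


def run_steps(table: Table, *steps: Step) -> Table:
    """Apply each step to the output of the previous one, left to right."""
    return reduce(lambda acc, step: step(acc), steps, table)


def normalize(table: Table) -> Table:
    # Toy stand-in for Normalize: rename the "email_address" column to "email".
    return [{("email" if k == "email_address" else k): v for k, v in row.items()} for row in table]


def validate(table: Table) -> Table:
    # Toy stand-in for Validate: keep only rows whose email contains "@".
    return [row for row in table if "@" in row.get("email", "")]
```

Swapping the order (validate before normalize) would drop every row here, because the "email" column does not exist yet.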
4. Entity Resolution
Supports multiple resolution strategies, configurable via MatchTypeConfig:
```python
from datalinks.links import MatchTypeConfig, ExactMatch

entity_resolution = MatchTypeConfig(
    # parameters are optional
    exact_match=ExactMatch(minVariation=0.2, minDistinct=0.3)
)
```
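Conceptually, exact matching links records whose chosen fields are identical. The sketch below groups records that way in plain Python so you can preview which rows would collapse together. group_exact_matches is a hypothetical helper, the key fields are your choice, and it does not model minVariation or minDistinct, whose semantics are not described here.

```python
from typing import Any, Iterable


# Hypothetical helper, not part of the DataLinks SDK.
def group_exact_matches(records: Iterable[dict[str, Any]], key_fields: tuple[str, ...]) -> list[list[dict[str, Any]]]:
    """Group records whose values for `key_fields` are exactly equal (values must be hashable)."""
    groups: dict[tuple, list[dict[str, Any]]] = {}
    for record in records:
        key = tuple(record.get(field) for field in key_fields)
        groups.setdefault(key, []).append(record)
    # dicts preserve insertion order, so groups appear in first-seen order
    return list(groups.values())
```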
5. Loaders
Abstract base loaders (e.g., JSONLoader) allow seamless data ingestion from custom file formats like .json.
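The SDK's loader base-class interface is not documented here, so the following is a hypothetical sketch of the pattern only: an abstract RowLoader with a single load method, a loader for the {"rows": [...]} JSON layout the examples read, and a CSV loader built on the standard csv module. The resulting list of rows has the same shape that the direct-ingestion example passes to ingest(data=...).

```python
import csv
import json
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any


class RowLoader(ABC):
    """Hypothetical loader interface (not the SDK's): turn one file into row dicts."""

    @abstractmethod
    def load(self, path: Path) -> list[dict[str, Any]]:
        ...


class JsonRowsLoader(RowLoader):
    # Reads files shaped like {"rows": [...]}.
    def load(self, path: Path) -> list[dict[str, Any]]:
        with open(path, encoding="utf-8") as f:
            return json.load(f)["rows"]


class CsvLoader(RowLoader):
    # Each CSV record becomes a dict keyed by the header row; values stay strings.
    def load(self, path: Path) -> list[dict[str, Any]]:
        with open(path, newline="", encoding="utf-8") as f:
            return list(csv.DictReader(f))
```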
6. Parametrize LLMs
You can choose the model and provider used by each inference step (e.g., ProcessUnstructured, Normalize, Validate).
```python
from datalinks.pipeline import Pipeline, ProcessUnstructured

steps = Pipeline(
    ProcessUnstructured(
        derive_from="text",
        helper_prompt="If you find a numeric field use only the value and omit the rest.",
        model="gpt-4.1-nano-2025-04-14",
        provider="openai"
    )
)
```
Examples
The following examples demonstrate end-to-end usage of the DataLinks Python SDK across a range of common scenarios. Each example is a standalone script that runs once your environment variables are configured and the data file it reads (for example, data/pgproducts.json) is in place.
Direct Ingestion
Demonstrates the simplest possible ingestion flow: load pre-structured rows from a JSON file and push them to DataLinks without any inference pipeline or entity resolution. Use this pattern when your data is already in a clean, tabular format and requires no AI-assisted transformation.
Components covered: DLConfig, DataLinksAPI, create_space, ingest, query_data
"""
Direct ingestion example using data/pgproducts.json.
Ingests pre-structured rows directly without any inference pipeline or
entity resolution — the data is stored as-is.
"""
import json
import logging
from pprint import pformat
import datalinks
from datalinks.api import DLConfig
def main():
logging.basicConfig(level=logging.INFO)
dl_config = DLConfig.from_env()
dl_config.namespace = "pg"
dl_config.objectname = "products_direct"
dlapi = datalinks.api.DataLinksAPI(dl_config)
dlapi.create_space(is_private=True)
jsonfile = "data/pgproducts.json"
logging.info(f"Loading {jsonfile}")
with open(jsonfile) as f:
rows = json.load(f)["rows"]
logging.info(f"Ingesting {len(rows)} rows")
result = dlapi.ingest(data=rows)
logging.info(
f"Ingestion result: {len(result.successful)} succeeded, "
f"{len(result.failed)} failed"
)
data = dlapi.query_data()
logging.info(f"Ingested data:\n{pformat(data)}")
if __name__ == "__main__":
main()
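The script expects data/pgproducts.json to contain a top-level "rows" list. The real file's contents are not shown here, so if you just want to try the flow, a hypothetical helper like the one below writes a placeholder file containing the two sample rows from the Quick Start. It overwrites any existing file at that path.

```python
import json
from pathlib import Path


# Hypothetical helper: writes placeholder data, not the real pgproducts.json.
def write_sample_rows(path: str = "data/pgproducts.json") -> Path:
    """Write a small {"rows": [...]} file in the shape the example reads."""
    rows = [
        {"brand": "Gillette", "category": "Razor", "product": "Heated Razor"},
        {"brand": "Oral-B", "category": "Electric Toothbrush", "product": "iO Series 10"},
    ]
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)  # create data/ if needed
    target.write_text(json.dumps({"rows": rows}, indent=2), encoding="utf-8")
    return target
```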
JSON Ingestion with Pipeline and Entity Resolution
Demonstrates how to ingest structured JSON data using a ProcessStructured pipeline step and ExactMatch entity resolution. The pipeline instructs DataLinks to derive tabular rows from the JSON "rows" key, and entity resolution deduplicates records by exact field matching.
Components covered: DLConfig, DataLinksAPI, Pipeline, ProcessStructured, MatchTypeConfig, ExactMatch, ingest, query_data
```python
import json
import logging
from pprint import pformat

import datalinks
from datalinks.api import DLConfig
from datalinks.links import MatchTypeConfig, ExactMatch
from datalinks.pipeline import Pipeline, ProcessStructured


def main():
    logging.basicConfig(level=logging.INFO)

    dl_config = DLConfig.from_env()
    # Namespace and object name are set in code because they differ per example
    dl_config.namespace = "pg"
    dl_config.objectname = "products"
    # OR configure everything explicitly:
    # dl_config = DLConfig(
    #     host="http://localhost:9001",
    #     apikey="",  # your DataLinks API key
    #     index="tests",
    #     namespace="pg",
    #     objectname="products"
    # )

    dlapi = datalinks.api.DataLinksAPI(dl_config)
    dlapi.create_space(is_private=True)  # default

    jsonfile = "data/pgproducts.json"
    logging.info(f"Loading json data in {jsonfile}")
    with open(jsonfile) as f:
        data = json.load(f)

    steps = Pipeline(
        ProcessStructured(derive_from="rows")  # data is already tabular
    )
    entity_resolution = MatchTypeConfig(ExactMatch())

    result = dlapi.ingest(
        data=[data],  # supports multiple files
        inference_steps=steps,
        entity_resolution=entity_resolution,
        batch_size=0,  # default (no file batching)
    )
    logging.info(
        f"Ingestion result:"
        f"\nSuccessfully ingested {len(result.successful)} dataset(s)."
        f"\nFailed {len(result.failed)} dataset(s)."
    )

    data = dlapi.query_data(
        model="gpt-4.1-nano-2025-04-14",
        provider="openai"
    )
    logging.info(f"Ingested data:\n{pformat(data)}")


if __name__ == '__main__':
    main()
```
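The example passes batch_size=0, commented as the default with no file batching. Assuming a positive batch_size splits the list passed as data into groups of that many datasets (the SDK's actual batching logic is not documented here), the semantics can be sketched as follows; make_batches is a hypothetical helper.

```python
from typing import Sequence, TypeVar

T = TypeVar("T")


# Hypothetical helper illustrating an assumed reading of batch_size.
def make_batches(items: Sequence[T], batch_size: int = 0) -> list[list[T]]:
    """Split items into batches; batch_size <= 0 means a single batch with everything."""
    if batch_size <= 0:
        return [list(items)] if items else []
    return [list(items[i:i + batch_size]) for i in range(0, len(items), batch_size)]
```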
Tabular Inference from Unstructured Text
Demonstrates a full AI-powered inference pipeline that transforms raw unstructured text into a structured table. The three-step pipeline uses ProcessUnstructured to extract an initial table from free-form text, Normalize to map columns to a target schema, and Validate to verify row integrity — all powered by an LLM.
Components covered: DLConfig, DataLinksAPI, Pipeline, ProcessUnstructured, Normalize, NormalizeModes, Validate, ValidateModes, MatchTypeConfig, ExactMatch, ingest, query_data
```python
import logging
from pprint import pformat

import datalinks
from datalinks.api import DLConfig
from datalinks.links import MatchTypeConfig, ExactMatch
from datalinks.pipeline import Pipeline, ProcessUnstructured, Normalize, NormalizeModes, Validate, ValidateModes


def main():
    logging.basicConfig(level=logging.DEBUG)

    dl_config = DLConfig.from_env()
    # Namespace and object name are set in code because they differ per example
    dl_config.namespace = "cinema"
    dl_config.objectname = "awards"
    # OR configure everything explicitly:
    # dl_config = DLConfig(
    #     host="http://localhost:9001",
    #     apikey="",  # your DataLinks API key
    #     index="tests",
    #     namespace="cinema",
    #     objectname="awards"
    # )

    dlapi = datalinks.api.DataLinksAPI(dl_config)
    dlapi.create_space(is_private=True)  # default

    textfile = "data/movies.txt"
    logging.info(f"Loading text in {textfile}")
    with open(textfile) as f:
        data = {"text": f.read()}

    steps = Pipeline(
        # Infer a table from the unstructured text
        ProcessUnstructured(
            derive_from="text",
            helper_prompt="If you find a numeric field use only the value and omit the rest.",
            model="gpt-4.1-mini-2025-04-14",
            provider="openai"
        ),
        # Map the inferred columns onto the target schema
        Normalize(
            target_cols={
                "Name": "the actor/actress name",
                "Titles": "the list of notable films the actor appeared in",
                "Oscars": "the number of Oscars won"
            },
            mode=NormalizeModes.ALL_IN_ONE,
            model="gpt-4.1-mini-2025-04-14",
            provider="openai"
        ),
        # Verify the integrity of each row
        Validate(
            mode=ValidateModes.ROWS,
            columns=["Name", "Titles", "Oscars"],
            model="gpt-4.1-mini-2025-04-14",
            provider="openai"
        )
    )
    entity_resolution = MatchTypeConfig(ExactMatch())

    result = dlapi.ingest(
        data=[data],  # supports multiple files
        inference_steps=steps,
        entity_resolution=entity_resolution,
        max_attempts=1,
        batch_size=0,  # default (no file batching)
    )
    logging.info(
        f"Ingestion result:"
        f"\nSuccessfully ingested {len(result.successful)} dataset(s)."
        f"\nFailed {len(result.failed)} dataset(s)."
    )

    data = dlapi.query_data(
        model="gpt-4.1-mini-2025-04-14",
        provider="openai",
        include_metadata=True
    )
    logging.info(f"Ingested data:\n{pformat(data)}")


if __name__ == '__main__':
    main()
```
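This example sets max_attempts=1, so each ingestion is attempted only once. How the SDK spaces its own retries is not documented here; if you want retries around your own calls, a generic helper with exponential backoff might look like this. with_retries is hypothetical and catches every exception, which you may want to narrow to transient errors.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


# Hypothetical helper, not part of the DataLinks SDK.
def with_retries(call: Callable[[], T], max_attempts: int = 3, base_delay: float = 1.0,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Run `call`, retrying on any exception with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Wait base_delay, 2 * base_delay, 4 * base_delay, ... between attempts.
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

For instance, with_retries(lambda: dlapi.query_data(), max_attempts=3) would re-run a failing query up to three times, waiting 1 s and then 2 s between attempts.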
Multipart Upload
Demonstrates how to upload large files to DataLinks using the multipart upload API. The three-phase flow — prepare, upload, finish — streams the file in chunks directly to presigned S3 URLs, avoiding memory constraints for large datasets. If any part fails, the upload session is aborted to free server-side resources.
Components covered: DLConfig, DataLinksAPI, prepare_multipart_upload, finish_multipart_upload, wait_for_ingestion, abort_multipart_upload
"""
Multipart upload example using data/pgproducts_mp.json (~10 MB, 2 parts).
Multipart upload is the recommended approach for large files. The flow is:
1. Prepare — DataLinks allocates an upload session and returns presigned S3 URLs
and the server-side partSize to use when splitting the file.
2. Upload — Each file chunk (sized to partSize) is PUT directly to its presigned
URL; S3 returns an ETag per part.
3. Finish — DataLinks assembles the parts and triggers ingestion.
If anything goes wrong during upload, abort is called to clean up the partial upload.
"""
import logging
import os
import requests
import datalinks
from datalinks.api import DLConfig
def upload_multipart(filepath: str):
dl_config = DLConfig.from_env()
dl_config.namespace = "pg_multipart"
dl_config.objectname = "products"
dlapi = datalinks.api.DataLinksAPI(dl_config)
dlapi.create_space(is_private=True)
filename = os.path.basename(filepath)
size = os.path.getsize(filepath)
logging.info(f"Preparing multipart upload for '{filename}' ({size:,} bytes)")
prepare = dlapi.prepare_multipart_upload(filename, size)
upload_id = prepare["uploadId"]
key = prepare["key"]
part_size = prepare["partSize"]
presigned_urls = [entry["url"] for entry in prepare["presignedUrls"]]
logging.info(
f"Upload session ready: {len(presigned_urls)} part(s) of {part_size:,} bytes, "
f"uploadId={upload_id}"
)
parts = []
part_num = 1
try:
with open(filepath, "rb") as f:
for part_num, url in enumerate(presigned_urls, start=1):
chunk = f.read(part_size)
if not chunk:
break
logging.info(
f"Uploading part {part_num}/{len(presigned_urls)} ({len(chunk):,} bytes)"
)
response = requests.put(url, data=chunk)
response.raise_for_status()
etag = response.headers["ETag"].strip('"')
parts.append({"partNumber": part_num, "etag": etag})
logging.info(f"Part {part_num} uploaded, ETag={etag}")
except Exception as e:
logging.error(f"Upload failed on part {part_num}: {e} — aborting")
dlapi.abort_multipart_upload(upload_id, key)
raise
logging.info("All parts uploaded, finishing ingestion")
result = dlapi.finish_multipart_upload(upload_id, key, parts, name=filename)
ingestion_id = result["id"]
logging.info(f"Ingestion queued: id={ingestion_id}")
final = dlapi.wait_for_ingestion(ingestion_id)
logging.info(
f"Ingestion finished: status={final.get('status')!r}, "
f"rows={final.get('processedRows')}, message={final.get('statusMessage')!r}"
)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
upload_multipart("data/pgproducts_mp.json")
Interactive Assistant
Demonstrates how to build an interactive command-line assistant that answers natural language questions about your data using the ask streaming API. The example handles each AskEvent type — plan, step, answer, and error — and renders responses using the rich library for a polished terminal experience.
Components covered: DLConfig, DataLinksAPI, AskEvent, ask (streaming)
```python
# NOTE:
# This example creates an interactive CLI to ask natural language questions about your data.
import datalinks
from datalinks.api import DLConfig, AskEvent
from rich.console import Console
from rich.markdown import Markdown
from rich.text import Text

console = Console()


def handle_event(event: AskEvent) -> None:
    if event.type == "plan":
        steps = event.data.get("steps", [])
        console.print()
        for step in steps:
            console.print(f" {step}", style="dim")
    elif event.type == "step":
        idx = event.data.get("index", 0)
        reasoning = event.data.get("reasoning", "")
        query = event.data.get("query", "")
        data = event.data.get("data", [])
        console.print(f" [{idx + 1}] {reasoning}", style="dim")
        if query:
            console.print(f" query: {query}", style="dim italic")
        if data:
            console.print(f" → {len(data)} record(s) retrieved", style="dim")
    elif event.type == "answer":
        response = event.data.get("response", "")
        console.print()
        console.print(Markdown(response))
        console.print()
    elif event.type == "error":
        message = event.data.get("message", "Unknown error")
        console.print(f"\nError: {message}\n", style="bold red")


def ask_loop(dlapi: datalinks.api.DataLinksAPI, namespace: str) -> None:
    console.print(Text.assemble(("DataLinks Assistant", "bold cyan"), " ", (f"({namespace})", "dim")))
    console.print("Type your question and press Enter. Type 'exit' to quit.\n", style="dim")
    while True:
        try:
            question = console.input("[bold yellow]You:[/bold yellow] ").strip()
        except (EOFError, KeyboardInterrupt):
            console.print()
            break
        if not question:
            continue
        if question.lower() == "exit":
            break
        console.print("[bold cyan]DataLinks:[/bold cyan]", end=" ")
        for event in dlapi.ask(question):
            handle_event(event)


def main() -> None:
    dl_config = DLConfig.from_env()
    if dl_config.namespace == "namespace-notset":
        try:
            dl_config.namespace = console.input("Namespace: ").strip()
        except (EOFError, KeyboardInterrupt):
            console.print()
            return
    dl_config.objectname = ""
    dlapi = datalinks.api.DataLinksAPI(dl_config)
    ask_loop(dlapi, dl_config.namespace)


if __name__ == "__main__":
    main()
```
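For scripts or tests that need the answer as a string rather than rendered output, the same event stream can be consumed without rich. collect_answer is a hypothetical helper that relies only on the type and data attributes used in handle_event; it keeps the last "answer" event and raises on an "error" event.

```python
from typing import Any, Iterable


# Hypothetical helper, not part of the DataLinks SDK.
def collect_answer(events: Iterable[Any]) -> str:
    """Consume an ask-style event stream and return the final answer text."""
    answer = ""
    for event in events:
        if event.type == "answer":
            answer = event.data.get("response", "")
        elif event.type == "error":
            raise RuntimeError(event.data.get("message", "Unknown error"))
        # "plan" and "step" events are progress updates and are ignored here
    return answer
```

Usage would then be a single call such as print(collect_answer(dlapi.ask("How many products are there?"))).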
GitHub Tickets Ingestion with Auto-Modelled Ontology (Vercel)
Demonstrates how to use the IngestProxyAPI to ingest raw GitHub tickets and pull requests and let DataLinks automatically model an ontology from them. A natural-language prompt describes the desired schema — tables for tickets, contributors, triage insights, and applied technologies — and the pipeline surfaces that structure without any manual mapping. Events from the streaming pipeline run are logged as they arrive. The client reconnects automatically if the stream drops, resuming from the last received event.
Components covered: IngestProxyConfig, IngestProxyAPI, run_pipeline
Required env vars: DL_INGEST_PROXY, DL_API_KEY, DL_USERNAME, DL_NAMESPACE
```python
import json
import logging

from datalinks.api import IngestProxyAPI, IngestProxyConfig

USER_PROMPT = """
## What we are doing
We are building an assistant to help with the coordination tasks of a development team. It is effectively a combination of multiple agents that collaborate on a shared memory pool to perform their tasks. These agents include:
- An agent that helps assign tickets and pull requests to the correct developer for implementation or investigation
- An agent that helps estimate the effort of specific tasks
- An agent that does duplicate detection
- An agent that helps generate prompts for Claude Code to implement whenever the ticket is estimated to be of acceptable complexity and size
- A workflow that ingests new tickets and updates existing tickets

## What we are working with
We're feeding all the data to the DataLinks automodeler (note: that's you, and thank you for the help!), and we expect an ontology to surface automatically. We're submitting raw GitHub tickets and pull requests.

## What we want
We want to design and populate an ontology for agents to write to and read from. It should have at least the following tables:
1. **Tickets**: Core ticket or PR identity (ticket_id, title, body, labels, state, lock status, reporter, ticket_url).
2. **Contributors**: People/orgs involved (reporter, assignees, closers). Track roles, contributor IDs, and usernames.
3. **Triage**: Triage insights, such as inferred priority (Low/Med/High), effort size (Low/Med/High), technology stack, risk level, labels, and any other useful signals.
4. **Applied Technologies**: The dependencies and technologies a specific ticket involves. A single ticket may involve multiple technologies or frameworks, and we want each of them represented here.
5. **Additional tables** as needed; we expect the ontology to evolve as new data comes in.

Rules:
- Always store identifiers as strings, never as numbers.
- Always include the available timestamps.
"""


def main():
    logging.basicConfig(level=logging.INFO)

    config = IngestProxyConfig.from_env()
    config.namespace = "vercel"
    proxy = IngestProxyAPI(config)

    with open("data/vercel100.json") as f:
        data = json.load(f)

    logging.info(f"Starting pipeline run for {len(data)} records")
    # Events stream in as the run progresses; the client reconnects and resumes if the stream drops
    with proxy.run_pipeline(
        data=data,
        user_prompt=USER_PROMPT,
    ) as run:
        logging.info(f"Run ID: {run.run_id}")
        for event in run:
            logging.info(f"Event: {event}")

    logging.info("Pipeline run complete")


if __name__ == "__main__":
    main()
```
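The example only logs events. Their structure is not documented here, so to keep a record of a run for later inspection you could write each event to a JSON Lines file, falling back to str() for objects that json cannot encode. write_events_jsonl is a hypothetical helper, not part of the SDK.

```python
import json
from typing import Any, Iterable, TextIO


# Hypothetical helper, not part of the DataLinks SDK.
def write_events_jsonl(events: Iterable[Any], out: TextIO) -> int:
    """Write each event as one JSON line and return how many were written."""
    count = 0
    for event in events:
        # default=str turns non-JSON objects into their string representation
        out.write(json.dumps(event, default=str) + "\n")
        count += 1
    return count
```

Inside the with block, the logging loop could be replaced by opening a file such as run_events.jsonl for writing and calling write_events_jsonl(run, out).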
Run Unit Tests
Run tests to verify your implementation:
```bash
tox
```
License
DataLinks Python SDK is licensed under the MIT License. See the LICENSE file for more details.
Support
For questions or support, please contact us.
File details
Details for the file datalinks-1.2.8-py3-none-any.whl.
File metadata
- Download URL: datalinks-1.2.8-py3-none-any.whl
- Upload date:
- Size: 30.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | fe7580a5dd419fe266c771e95ca2d0837c44e90e9456d44ef959e01bb1e7d520 |
| MD5 | 38525e9da7cc109d91c982e5c8f16bfd |
| BLAKE2b-256 | 10b9423c870e5c16e8c1c3cc98ee9be71bdbb9ce7e744572d8cbd8c28433cab6 |