Glean Indexing SDK
A Python SDK for building custom Glean indexing integrations. This package provides the base classes and utilities to create custom connectors for Glean's indexing APIs.
> [!WARNING]
> This is an experimental repository. APIs, interfaces, and functionality may change significantly without notice.
Installation
```shell
pip install glean-indexing-sdk
```
Architecture Overview
The Glean Indexing SDK follows a simple, predictable pattern for all connector types. Understanding this flow will help you implement any connector quickly:
```mermaid
sequenceDiagram
    participant User
    participant Connector as "Connector<br/>(BaseDatasourceConnector<br/>or BasePeopleConnector)"
    participant DataClient as "DataClient<br/>(BaseConnectorDataClient<br/>or StreamingConnectorDataClient)"
    participant External as "External System<br/>(API/Database)"
    participant Glean as "Glean API"
    User->>+Connector: 1. connector.index_data()<br/>or connector.index_people()
    Connector->>+DataClient: 2. get_source_data()
    DataClient->>+External: 3. Fetch data
    External-->>-DataClient: Raw source data
    DataClient-->>-Connector: Typed source data
    Connector->>Connector: 4. transform() or<br/>transform_people()
    Note over Connector: Transform to<br/>DocumentDefinition or<br/>EmployeeInfoDefinition
    Connector->>+Glean: 5. Batch upload documents<br/>or employee data
    Glean-->>-Connector: Upload response
    Connector-->>-User: Indexing complete
```
Key Components:
- DataClient - Fetches raw data from your external system (API, database, files, etc.)
- Connector - Transforms your data into Glean's format and handles the upload process
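Conceptually, the two components compose as follows. This is a simplified stand-in to illustrate the flow, not the SDK's actual base classes:

```python
from typing import Generic, List, Sequence, TypeVar

T = TypeVar("T")

class SimpleDataClient(Generic[T]):
    """Stand-in for a DataClient: fetches raw records from the external system."""

    def get_source_data(self) -> Sequence[T]:
        raise NotImplementedError

class SimpleConnector(Generic[T]):
    """Stand-in for a Connector: transforms records and uploads them."""

    def __init__(self, data_client: SimpleDataClient[T]):
        self.data_client = data_client

    def transform(self, data: Sequence[T]) -> List[dict]:
        raise NotImplementedError

    def index_data(self) -> List[dict]:
        raw = self.data_client.get_source_data()  # steps 2-3: fetch from the source
        documents = self.transform(raw)           # step 4: map to Glean's format
        # step 5 would batch-upload `documents` to the Glean API
        return documents
```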
Datasource Connectors
Use datasource connectors to index documents, files, and content from external systems into Glean. This is the most common use case.
Datasource Quickstart
Environment Setup
Set up environment variables for Glean API access:

```shell
# Copy the environment template
cp env.template .env

# Set your Glean credentials
export GLEAN_INSTANCE="acme"
export GLEAN_INDEXING_API_TOKEN="your-indexing-api-token"
```
> [!TIP]
> Choose the right connector type:
>
> - BaseDatasourceConnector - For most use cases, where all data fits comfortably in memory and your API can return it efficiently in one call.
> - BaseStreamingDatasourceConnector - For very large datasets, memory-constrained environments, or APIs that require incremental/paginated access.
> - Single Document Indexing - For real-time updates of individual documents.
BaseDatasourceConnector
When to Use This
Perfect for
- Document repositories where all data can fit comfortably in memory
- Wikis, knowledge bases, documentation sites
- File systems with moderate amounts of content
- Systems where you can fetch all data in memory at once
- Documents that cannot be fetched via paginated APIs
Avoid when
- You have very large datasets that cannot fit in memory
- Documents are very large (> 10MB each)
- Memory usage is a concern
Step-by-Step Implementation
Step 1: Define Your Data Type
```python
from typing import List, TypedDict

class WikiPageData(TypedDict):
    """Type definition for your source data structure."""

    id: str
    title: str
    content: str
    author: str
    created_at: str
    updated_at: str
    url: str
    tags: List[str]
```
Step 2: Create Your DataClient
```python
from typing import Sequence

from glean.indexing.connectors.base_data_client import BaseConnectorDataClient

from .wiki_page_data import WikiPageData

class WikiDataClient(BaseConnectorDataClient[WikiPageData]):
    """Fetches data from your external system."""

    def __init__(self, wiki_base_url: str, api_token: str):
        self.wiki_base_url = wiki_base_url
        self.api_token = api_token

    def get_source_data(self, since=None) -> Sequence[WikiPageData]:
        """Fetch all your documents here."""
        # Your implementation here - call APIs, read files, query databases
        pass
```
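As an illustration, a REST-backed implementation of `get_source_data` might fetch pages like the helper below. The `/api/pages` endpoint, `modified_since` parameter, and response shape are all hypothetical; adapt them to your system's API:

```python
from typing import List, Optional

import requests

def fetch_wiki_pages(
    wiki_base_url: str, api_token: str, since: Optional[str] = None
) -> List[dict]:
    """Fetch all pages from a hypothetical /api/pages endpoint."""
    params = {}
    if since:
        # Hypothetical query parameter for incremental fetches
        params["modified_since"] = since
    response = requests.get(
        f"{wiki_base_url}/api/pages",
        headers={"Authorization": f"Bearer {api_token}"},
        params=params,
        timeout=30,
    )
    response.raise_for_status()
    # Assumes the endpoint returns {"pages": [...]}
    return response.json()["pages"]
```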
Step 3: Create Your Connector
```python
from typing import List, Sequence

from glean.indexing.connectors import BaseDatasourceConnector
from glean.indexing.models import (
    ContentDefinition,
    CustomDatasourceConfig,
    DocumentDefinition,
    UserReferenceDefinition,
)

from .wiki_page_data import WikiPageData

class CompanyWikiConnector(BaseDatasourceConnector[WikiPageData]):
    """Transform and upload your data to Glean."""

    configuration: CustomDatasourceConfig = CustomDatasourceConfig(
        name="company_wiki",
        display_name="Company Wiki",
        url_regex=r"https://wiki\.company\.com/.*",
        trust_url_regex_for_view_activity=True,
        is_user_referenced_by_email=True,
    )

    def transform(self, data: Sequence[WikiPageData]) -> List[DocumentDefinition]:
        """Transform your data to Glean's format."""
        documents = []
        for page in data:
            document = DocumentDefinition(
                id=page["id"],
                title=page["title"],
                datasource=self.name,
                view_url=page["url"],
                body=ContentDefinition(mime_type="text/plain", text_content=page["content"]),
                author=UserReferenceDefinition(email=page["author"]),
                created_at=self._parse_timestamp(page["created_at"]),
                updated_at=self._parse_timestamp(page["updated_at"]),
                tags=page["tags"],
            )
            documents.append(document)
        return documents

    def _parse_timestamp(self, timestamp_str: str) -> int:
        """Convert ISO timestamp to Unix epoch seconds."""
        from datetime import datetime

        dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
        return int(dt.timestamp())
```
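The `_parse_timestamp` helper converts ISO-8601 strings (including a trailing `Z` for UTC, which `datetime.fromisoformat` does not accept before Python 3.11) into Unix epoch seconds. A standalone check of the same logic:

```python
from datetime import datetime

def parse_timestamp(timestamp_str: str) -> int:
    """Convert an ISO-8601 timestamp with an optional trailing Z to epoch seconds."""
    # Replace "Z" with an explicit UTC offset so fromisoformat can parse it
    dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
    return int(dt.timestamp())

print(parse_timestamp("2024-01-15T10:00:00Z"))  # 1705312800
```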
Step 4: Run the Connector
```python
from glean.indexing.models import IndexingMode

from .wiki_connector import CompanyWikiConnector
from .wiki_data_client import WikiDataClient

# Initialize
data_client = WikiDataClient(wiki_base_url="https://wiki.company.com", api_token="your-wiki-token")
connector = CompanyWikiConnector(name="company_wiki", data_client=data_client)

# Configure the datasource in Glean
connector.configure_datasource()

# Index all documents
connector.index_data(mode=IndexingMode.FULL)
```
Forced Restarts

When to use forced restarts:

- When you need to abort and restart a failed or interrupted upload
- When you want to ensure a clean upload state by discarding partial uploads
- When recovering from upload errors or inconsistent states

How it works:

- Generates a new `upload_id` to ensure clean separation from previous uploads
- Sets `forceRestartUpload=True` on the first batch only
- Continues with normal batch processing for subsequent batches
This feature is available on all connector types: BaseDatasourceConnector, BaseStreamingDatasourceConnector, and BasePeopleConnector.
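The batching contract described above can be sketched in isolation. This is an illustration of the behaviour (fresh `upload_id`, restart flag on the first batch only), not the SDK's internal implementation:

```python
import uuid
from typing import Iterator, List, Tuple

def plan_batches(
    documents: List[dict], batch_size: int
) -> Iterator[Tuple[dict, List[dict]]]:
    """Yield (upload metadata, batch) pairs for a forced-restart upload."""
    # A fresh id cleanly separates this run from any previous partial upload
    upload_id = str(uuid.uuid4())
    for start in range(0, len(documents), batch_size):
        meta = {
            "uploadId": upload_id,
            # Only the very first batch asks the server to discard prior state
            "forceRestartUpload": start == 0,
        }
        yield meta, documents[start : start + batch_size]
```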
Complete Example
```python
from typing import List, Sequence, TypedDict

from glean.indexing.connectors import BaseConnectorDataClient, BaseDatasourceConnector
from glean.indexing.models import (
    ContentDefinition,
    CustomDatasourceConfig,
    DocumentDefinition,
    IndexingMode,
    UserReferenceDefinition,
)

class WikiPageData(TypedDict):
    id: str
    title: str
    content: str
    author: str
    created_at: str
    updated_at: str
    url: str
    tags: List[str]

class WikiDataClient(BaseConnectorDataClient[WikiPageData]):
    def __init__(self, wiki_base_url: str, api_token: str):
        self.wiki_base_url = wiki_base_url
        self.api_token = api_token

    def get_source_data(self, since=None) -> Sequence[WikiPageData]:
        # Example static data
        return [
            {
                "id": "page_123",
                "title": "Engineering Onboarding Guide",
                "content": "Welcome to the engineering team...",
                "author": "jane.smith@company.com",
                "created_at": "2024-01-15T10:00:00Z",
                "updated_at": "2024-02-01T14:30:00Z",
                "url": f"{self.wiki_base_url}/pages/123",
                "tags": ["onboarding", "engineering"],
            },
            {
                "id": "page_124",
                "title": "API Documentation Standards",
                "content": "Our standards for API documentation...",
                "author": "john.doe@company.com",
                "created_at": "2024-01-20T09:15:00Z",
                "updated_at": "2024-01-25T16:45:00Z",
                "url": f"{self.wiki_base_url}/pages/124",
                "tags": ["api", "documentation", "standards"],
            },
        ]

class CompanyWikiConnector(BaseDatasourceConnector[WikiPageData]):
    configuration: CustomDatasourceConfig = CustomDatasourceConfig(
        name="company_wiki",
        display_name="Company Wiki",
        url_regex=r"https://wiki\.company\.com/.*",
        trust_url_regex_for_view_activity=True,
        is_user_referenced_by_email=True,
    )

    def transform(self, data: Sequence[WikiPageData]) -> List[DocumentDefinition]:
        documents = []
        for page in data:
            documents.append(
                DocumentDefinition(
                    id=page["id"],
                    title=page["title"],
                    datasource=self.name,
                    view_url=page["url"],
                    body=ContentDefinition(mime_type="text/plain", text_content=page["content"]),
                    author=UserReferenceDefinition(email=page["author"]),
                    created_at=self._parse_timestamp(page["created_at"]),
                    updated_at=self._parse_timestamp(page["updated_at"]),
                    tags=page["tags"],
                )
            )
        return documents

    def _parse_timestamp(self, timestamp_str: str) -> int:
        from datetime import datetime

        dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
        return int(dt.timestamp())

data_client = WikiDataClient(wiki_base_url="https://wiki.company.com", api_token="your-wiki-token")
connector = CompanyWikiConnector(name="company_wiki", data_client=data_client)
connector.configure_datasource()
connector.index_data(mode=IndexingMode.FULL)
```
BaseStreamingDatasourceConnector
When to Use This
Perfect for
- Large document repositories that cannot fit in memory
- Memory-constrained environments
- Documents that are fetched via paginated APIs
- Very large individual documents (> 10MB)
- When you want to process data incrementally
Avoid when
- You have a small document set that fits comfortably in memory
- Your API can return all data efficiently in one call
- Memory usage is not a concern
Step-by-Step Implementation
Step 1: Define Your Data Type
```python
from typing import TypedDict

class ArticleData(TypedDict):
    """Type definition for knowledge base article data."""

    id: str
    title: str
    content: str
    author: str
    updated_at: str
    url: str
```
Step 2: Create Your Streaming DataClient
```python
from typing import Generator

import requests

from glean.indexing.connectors.base_streaming_data_client import StreamingConnectorDataClient

from .article_data import ArticleData

class LargeKnowledgeBaseClient(StreamingConnectorDataClient[ArticleData]):
    """Streaming client that yields data incrementally."""

    def __init__(self, kb_api_url: str, api_key: str):
        self.kb_api_url = kb_api_url
        self.api_key = api_key

    def get_source_data(self, since=None) -> Generator[ArticleData, None, None]:
        """Stream documents one page at a time to save memory."""
        page = 1
        page_size = 100
        while True:
            params = {"page": page, "size": page_size}
            if since:
                params["modified_since"] = since
            response = requests.get(
                f"{self.kb_api_url}/articles",
                headers={"Authorization": f"Bearer {self.api_key}"},
                params=params,
            )
            response.raise_for_status()
            data = response.json()
            articles = data.get("articles", [])
            if not articles:
                break
            for article in articles:
                yield ArticleData(**article)
            if len(articles) < page_size:
                break
            page += 1
```
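The connector consumes this generator lazily, so only one batch of articles is materialised in memory at a time. Conceptually, the batching looks like this (a sketch of the pattern, not the SDK's internals):

```python
from itertools import islice
from typing import Generator, Iterator, List, TypeVar

T = TypeVar("T")

def batched(stream: Iterator[T], batch_size: int) -> Generator[List[T], None, None]:
    """Drain an iterator in fixed-size chunks, holding one chunk at a time."""
    while True:
        batch = list(islice(stream, batch_size))
        if not batch:
            return
        yield batch

def fake_stream() -> Generator[int, None, None]:
    # Stand-in for LargeKnowledgeBaseClient.get_source_data()
    yield from range(5)

print(list(batched(fake_stream(), 2)))  # [[0, 1], [2, 3], [4]]
```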
Step 3: Create Your Streaming Connector
```python
from typing import List, Sequence

from glean.indexing.connectors import BaseStreamingDatasourceConnector
from glean.indexing.models import (
    ContentDefinition,
    CustomDatasourceConfig,
    DocumentDefinition,
    UserReferenceDefinition,
)

from .article_data import ArticleData

class KnowledgeBaseConnector(BaseStreamingDatasourceConnector[ArticleData]):
    configuration: CustomDatasourceConfig = CustomDatasourceConfig(
        name="knowledge_base",
        display_name="Knowledge Base",
        url_regex=r"https://kb\.company\.com/.*",
        trust_url_regex_for_view_activity=True,
    )

    def __init__(self, name: str, data_client):
        super().__init__(name, data_client)
        self.batch_size = 50

    def transform(self, data: Sequence[ArticleData]) -> List[DocumentDefinition]:
        documents = []
        for article in data:
            documents.append(
                DocumentDefinition(
                    id=article["id"],
                    title=article["title"],
                    datasource=self.name,
                    view_url=article["url"],
                    body=ContentDefinition(mime_type="text/html", text_content=article["content"]),
                    author=UserReferenceDefinition(email=article["author"]),
                    updated_at=self._parse_timestamp(article["updated_at"]),
                )
            )
        return documents

    def _parse_timestamp(self, timestamp_str: str) -> int:
        from datetime import datetime

        dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
        return int(dt.timestamp())
```
Step 4: Run the Connector
```python
from glean.indexing.models import IndexingMode

from .article_connector import KnowledgeBaseConnector
from .article_data_client import LargeKnowledgeBaseClient

data_client = LargeKnowledgeBaseClient(
    kb_api_url="https://kb-api.company.com", api_key="your-kb-api-key"
)
connector = KnowledgeBaseConnector(name="knowledge_base", data_client=data_client)
connector.configure_datasource()
connector.index_data(mode=IndexingMode.FULL)
```
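The `since` parameter on `get_source_data` is what enables incremental runs: when a watermark is passed, the client above adds it to the request so only recently modified articles are fetched. A self-contained sketch of that query-parameter logic (mirroring the client above, not SDK code):

```python
from typing import Optional

def build_params(page: int, page_size: int, since: Optional[str]) -> dict:
    """Mirror the query-parameter logic used by LargeKnowledgeBaseClient."""
    params = {"page": page, "size": page_size}
    if since:
        # Only present on incremental runs; full runs omit the watermark
        params["modified_since"] = since
    return params

# A full crawl of page 1:
print(build_params(1, 100, None))
# {'page': 1, 'size': 100}

# An incremental crawl since the last successful sync:
print(build_params(1, 100, "2024-02-01T00:00:00Z"))
# {'page': 1, 'size': 100, 'modified_since': '2024-02-01T00:00:00Z'}
```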