Swiss AI Hub Pipeline SDK: Dagster-based document ingestion, parsing, embedding, and vector storage for RAG.

swiss-ai-hub-pipeline

The data-ingestion SDK for Swiss AI Hub — turn documents into RAG-ready vectors with Dagster.

What is Swiss AI Hub?

Swiss AI Hub is an open-source, self-hosted AI platform for enterprises. One docker compose up starts ~30 integrated containers — LLM gateway (LiteLLM), vector search (Milvus), document parsing (MinerU), S3 storage (SeaweedFS), SSO (Keycloak), observability (Langfuse), a chat UI (Open-WebUI), and more. Agents answer questions over your organization's knowledge; this package is how that knowledge gets in.

What is this package?

swiss-ai-hub-pipeline is a Dagster-based SDK that ingests documents and produces the vectors RAG agents search. It implements a two-stage, asset-based pipeline:

  1. Source → data lake — monitor a source (SharePoint, OneDrive, Google Drive, S3, local/network shares — anything rclone supports) and sync changed files into the platform's S3 (SeaweedFS).
  2. Data lake → vector store — parse each file (MinerU OCR + structure), chunk it, embed it via the LLM gateway, and upsert the vectors into Milvus, with full lineage from every embedding back to its source document.
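The second stage can be pictured with a small standard-library sketch. This is not the SDK's implementation: the Node class, the fixed-size chunker, the hash-based toy_embed, and the in-memory store are all hypothetical stand-ins for MinerU parsing, real chunking, the LLM gateway, and Milvus. It only illustrates how every stored vector can carry the key of the document it came from.

```python
import hashlib
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    """One chunk plus the lineage needed to trace it back to its source file."""
    source_key: str              # object key of the document in the data lake
    chunk_index: int             # position of the chunk within the document
    text: str
    vector: tuple[float, ...]


def chunk(text: str, size: int = 40) -> list[str]:
    """Split text into fixed-size character windows (stand-in for real chunking)."""
    return [text[i:i + size] for i in range(0, len(text), size)]


def toy_embed(text: str, dims: int = 4) -> tuple[float, ...]:
    """Deterministic placeholder vector derived from a hash, NOT a real embedding."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return tuple(b / 255 for b in digest[:dims])


def ingest(source_key: str, text: str, store: dict[tuple[str, int], Node]) -> int:
    """Chunk, embed, and upsert; the (source_key, chunk_index) key makes re-runs overwrite."""
    nodes = [Node(source_key, i, c, toy_embed(c)) for i, c in enumerate(chunk(text))]
    for node in nodes:
        store[(node.source_key, node.chunk_index)] = node
    return len(nodes)


# In-memory stand-in for the vector store.
store: dict[tuple[str, int], Node] = {}
count = ingest("my_docs/handbook.txt", "x" * 100, store)
```

Because each entry is keyed by source document and chunk position, ingesting the same file again replaces its vectors instead of duplicating them, and any search hit can be traced back to the file it came from.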

You compose a pipeline from one function, default_definitions(), which wires together all the assets, resources, IO managers, sensors, jobs, and schedules. It builds on swiss-ai-hub-core (installed automatically); RAG agents from swiss-ai-hub-agent query its output.

Should you use this package?

Probably not directly — most deployments use the pre-built pipeline images (default_rag_pipeline, shared_rag_pipeline), which ingest the platform's default buckets out of the box.

Use this PyPI package when you want a custom pipeline — connect a new data source, ingest into a different bucket, or tune parsing/chunking/embedding for your documents. It's an SDK for building your own ingestion as a Dagster code location.

Installation

pip install swiss-ai-hub-pipeline
# or
uv add swiss-ai-hub-pipeline

Requires Python 3.13.


Quick start

A pipeline is a Dagster code location — a module that exposes a Definitions object. default_definitions() builds a complete one:

# my_pipeline/__init__.py
from swiss_ai_hub.pipeline import default_definitions

defs = default_definitions(
    datalake_container_name="my_docs",                  # S3 bucket (Dagster name: letters, digits, underscores)
    embedding_model_name="embedding/bge-m3",            # any embedding model on the LiteLLM gateway
    llm_model_name="text-generation/gemma-4-31B-it",    # for summaries / table & figure refinement
    with_summary_nodes=True,                            # hierarchical RAG summaries
)
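
The comment on datalake_container_name says the value must be a Dagster name: letters, digits, and underscores. A minimal sketch of checking a candidate name before passing it in follows. The helper names are hypothetical and not part of the SDK, and the exact rule the SDK enforces is assumed from that comment.

```python
import re

# Assumed rule, taken from the quick-start comment: letters, digits, underscores.
_NAME_RE = re.compile(r"[A-Za-z0-9_]+")


def is_valid_container_name(name: str) -> bool:
    """Return True if the whole name consists only of allowed characters."""
    return _NAME_RE.fullmatch(name) is not None


def to_container_name(raw: str) -> str:
    """Replace each run of disallowed characters with a single underscore."""
    cleaned = re.sub(r"[^A-Za-z0-9_]+", "_", raw).strip("_")
    if not cleaned:
        raise ValueError(f"cannot derive a container name from {raw!r}")
    return cleaned
```

For example, to_container_name("my-docs 2024") yields my_docs_2024, which is_valid_container_name accepts, whereas my-docs is rejected because of the hyphen.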

Run it with the Dagster UI and materialize the assets:

dagster dev -m my_pipeline      # opens http://localhost:3000

Drop a document into the my_docs bucket, click Materialize on the asset graph, and watch it flow: observe → documents (parse) → nodes (chunk + embed) → Milvus. A RAG agent pointed at that bucket can now answer questions over it.

To also pull from an external source, combine default_definitions() with a Stage-1 builder — e.g. default_rclone_to_datalake_definitions(...) for OneDrive/Google Drive/Dropbox, or default_sharepoint_to_datalake_definitions(...). The source templates (SharePoint, OneDrive, S3, Azure Blob, Google Drive, SFTP, local FS) are copy-paste starting points.


How it works

default_definitions() assembles a graph of Dagster assets connected by IO managers to the platform's stores:

Stage | Assets | Backed by
Source → data lake | observable_*, data_lake_file, removed_data_lake_files | SeaweedFS (S3)
Data lake → vector store | documents (parse), nodes (chunk + embed), summary_nodes, removed_documents | MinerU, LiteLLM, MongoDB, Milvus
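
The first stage has to know which files changed and which disappeared between two looks at a source. One way to think about that is a diff of two snapshots. This is a conceptual sketch, not the SDK's code: diff_snapshots and the path-to-fingerprint mapping (an ETag or checksum, for instance) are assumptions made for illustration.

```python
def diff_snapshots(
    previous: dict[str, str], current: dict[str, str]
) -> tuple[list[str], list[str]]:
    """Compare two path -> fingerprint maps.

    Returns (changed_or_new, removed): paths whose fingerprint differs or that
    are new in `current`, and paths that existed before but are gone now.
    """
    changed = sorted(k for k, v in current.items() if previous.get(k) != v)
    removed = sorted(k for k in previous if k not in current)
    return changed, removed


before = {"a.pdf": "v1", "b.pdf": "v1", "c.pdf": "v1"}
after = {"a.pdf": "v1", "b.pdf": "v2", "d.pdf": "v1"}
changed, removed = diff_snapshots(before, after)
```

In this example b.pdf (modified) and d.pdf (new) would need to be synced and re-parsed, while c.pdf would be cleaned up, which is roughly the role the removed_* assets play in the table above.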

Materialization is driven by eager automation, daily schedules, and a NATS sensor that fires when documents are uploaded through the API — so ingestion keeps up with changes without manual runs. Key default_definitions() knobs: with_summary_nodes, with_table_refinement, with_figure_descriptions, document_parser_loader_type (MinerU or Document Intelligence), and max_partitions.


Development

The dev stack runs the infrastructure a pipeline needs — SeaweedFS (S3), MongoDB, Milvus, MinerU, and the LiteLLM gateway — and exposes it on localhost:

# 1. Start the platform infrastructure (from a Swiss AI Hub checkout)
docker compose --env-file .env -f infra/docker-compose.dev.yml up -d

# 2. Load the dev connection settings into your shell
set -a && source .env && set +a

# 3. Run your pipeline's Dagster UI against the stack
dagster dev -m my_pipeline       # http://localhost:3000

Materialize assets from the UI to parse, embed, and store real documents. To check a code location without running it, use dagster definitions validate -m my_pipeline: it loads every asset, resource, and IO manager, which makes it a fast sanity check locally and in CI.

Settings are not auto-loaded from a .env file. The SDK reads connection settings from the process environment when its resources are constructed, so make sure the variables above are exported in the process that runs Dagster (set -a && source .env && set +a).
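
A small preflight check can catch a shell where the variables were never exported. The sketch below uses the variable names from the production compose example later in this README. It is an assumption that the dev .env uses the same names and that every one of them is required, and missing_settings is a hypothetical helper, not part of the SDK.

```python
import os
from collections.abc import Mapping

# Names taken from the production compose example; assumed to apply to dev too.
REQUIRED_SETTINGS = (
    "MONGO_CONNECTION_STRING",
    "MILVUS_URL",
    "S3_STORAGE_ENDPOINT",
    "S3_STORAGE_ACCESS_KEY",
    "S3_STORAGE_SECRET_KEY",
    "LITE_LLM_PROXY_BASE_URL",
    "LITE_LLM_PROXY_API_KEY",
    "MINERU_API_BASE_URL",
    "NATS_ENDPOINT",
    "NATS_TOKEN",
)


def missing_settings(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required settings that are unset or blank in `env`."""
    return [name for name in REQUIRED_SETTINGS if not env.get(name, "").strip()]
```

Calling missing_settings() in the shell that will run dagster dev returns an empty list when everything is exported, and otherwise names the variables to fix.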

Production

In production a pipeline runs as a Dagster code location: a gRPC server in a container that the platform's Dagster webserver and daemon connect to.

1. Containerize it as a gRPC code-location server:

FROM python:3.13-slim
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

WORKDIR /app
# Your project, which depends on swiss-ai-hub-pipeline
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev
COPY . .

ENV PATH="/app/.venv/bin:$PATH" PYTHONUNBUFFERED=1
EXPOSE 4000
ENTRYPOINT ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-m", "my_pipeline"]

2. Run it alongside the platform on the right networks — a pipeline reaches MinerU + LiteLLM (backend), MongoDB + Milvus + NATS (data), and SeaweedFS/S3 (storage):

# docker-compose.my-pipeline.yml — deployed alongside the platform
services:
  my-pipeline:
    image: registry.example.com/my-pipeline:1.0.0
    restart: always
    environment:
      MONGO_CONNECTION_STRING: mongodb://${MONGO_USERNAME}:${MONGO_PASSWORD}@ferretdb:27017/
      MILVUS_URL: http://milvus-standalone:19530
      S3_STORAGE_ENDPOINT: http://seaweedfs-s3:9000
      S3_STORAGE_ACCESS_KEY: ${S3_STORAGE_ACCESS_KEY}
      S3_STORAGE_SECRET_KEY: ${S3_STORAGE_SECRET_KEY}
      LITE_LLM_PROXY_BASE_URL: http://litellm:4000
      LITE_LLM_PROXY_API_KEY: ${LITELLM_MASTER_KEY}
      MINERU_API_BASE_URL: http://mineru-api:8000
      NATS_ENDPOINT: nats://nats:4222
      NATS_TOKEN: ${NATS_TOKEN}
    networks: [backend, data, storage]

networks:
  backend: { external: true }
  data: { external: true }
  storage: { external: true }

3. Register it in the platform's Dagster workspace so the webserver/daemon load it:

# workspace.yaml
load_from:
  - grpc_server:
      host: my-pipeline      # the service name above
      port: 4000
      location_name: my-pipeline

# Start the pipeline service
docker compose -f docker-compose.my-pipeline.yml up -d

Reuse the platform's secrets (from its .env) for the ${…} values, and match the actual network names of your deployment. Your pipeline then shows up as a code location in the platform's Dagster UI, with its schedules and sensors running under the shared daemon.
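
If the code location does not appear, a quick first check is whether the gRPC port is reachable from a container on the same network. The following is a minimal sketch using only the standard library. is_reachable is a hypothetical helper, and a successful TCP connection only shows that something is listening on the port, not that the Dagster server behind it is healthy.

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False
```

With the compose file above, is_reachable("my-pipeline", 4000) run from a container attached to one of the shared networks should return True once the service is up.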

Network reference. backend = LiteLLM, MinerU, OTEL. data = NATS, FerretDB, Milvus. storage = SeaweedFS/S3.
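
The network reference can also be kept as data, which helps when deciding which networks a custom pipeline needs to join. This is an illustrative sketch: the mapping copies the reference line above, while networks_for and the service labels are hypothetical names, and the network names in your deployment may differ.

```python
# Copied from the network reference above; adjust to your deployment.
NETWORK_SERVICES: dict[str, set[str]] = {
    "backend": {"LiteLLM", "MinerU", "OTEL"},
    "data": {"NATS", "FerretDB", "Milvus"},
    "storage": {"SeaweedFS/S3"},
}


def networks_for(services: list[str]) -> list[str]:
    """Return the sorted networks needed to reach all of the given services."""
    wanted = set(services)
    known = set().union(*NETWORK_SERVICES.values())
    unknown = wanted - known
    if unknown:
        raise ValueError(f"unknown services: {sorted(unknown)}")
    return sorted(net for net, members in NETWORK_SERVICES.items() if members & wanted)
```

A pipeline that only embeds and stores vectors, for example networks_for(["LiteLLM", "Milvus"]), needs backend and data. The default pipeline touches services on all three networks.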


License

Apache-2.0 — see packages/pipeline/LICENSE. For the full per-package license matrix, see LICENSES.md.


Part of Swiss AI Hub. Built in Switzerland by bbv Software Services.
