
Swiss AI Hub Pipeline SDK: Dagster-based document ingestion, parsing, embedding, and vector storage for RAG.


swiss-ai-hub-pipeline

The data-ingestion SDK for Swiss AI Hub — turn documents into RAG-ready vectors with Dagster.



What is Swiss AI Hub?

Swiss AI Hub is an open-source, self-hosted AI platform for enterprises. One docker compose up starts ~30 integrated containers — LLM gateway (LiteLLM), vector search (Milvus), document parsing (MinerU), S3 storage (SeaweedFS), SSO (Keycloak), observability (Langfuse), a chat UI (Open-WebUI), and more. Agents answer questions over your organization's knowledge; this package is how that knowledge gets in.

What is this package?

swiss-ai-hub-pipeline is a Dagster-based SDK that ingests documents and produces the vectors RAG agents search. It implements a two-stage, asset-based pipeline:

  1. Source → data lake — monitor a source (SharePoint, OneDrive, Google Drive, S3, local/network shares — anything rclone supports) and sync changed files into the platform's S3 (SeaweedFS).
  2. Data lake → vector store — parse each file (MinerU OCR + structure), chunk it, embed it via the LLM gateway, and upsert the vectors into Milvus, with full lineage from every embedding back to its source document.
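
The lineage idea in stage 2 can be illustrated with a minimal, standard-library sketch. It is not the SDK's implementation: the Chunk class, the chunk_document function, and the fixed-size character windows are hypothetical stand-ins chosen for illustration. The point is only that each chunk carries the key of its source document and a deterministic ID, so re-ingesting a document can overwrite the same rows.

```python
from dataclasses import dataclass
import hashlib


@dataclass(frozen=True)
class Chunk:
    """One chunk of text plus the lineage needed to trace it back (hypothetical type)."""

    source_key: str  # object key of the source document in the data lake
    index: int       # position of the chunk within the document
    text: str

    @property
    def chunk_id(self) -> str:
        # Deterministic ID derived from the lineage fields, so the same
        # document and position always map to the same vector-store row.
        digest = hashlib.sha256(f"{self.source_key}:{self.index}".encode())
        return digest.hexdigest()[:16]


def chunk_document(source_key: str, text: str, size: int = 200, overlap: int = 20) -> list[Chunk]:
    """Split text into fixed-size, overlapping character windows."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be in [0, size)")
    step = size - overlap
    chunks: list[Chunk] = []
    start = 0
    while start < len(text):
        chunks.append(Chunk(source_key, len(chunks), text[start:start + size]))
        if start + size >= len(text):
            break  # the current window already reaches the end of the text
        start += step
    return chunks
```

A real pipeline would split on document structure rather than raw characters; the sketch only shows how lineage fields travel with each chunk.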

You compose a pipeline from one function, default_definitions(), which wires together all the assets, resources, IO managers, sensors, jobs, and schedules. It builds on swiss-ai-hub-core (installed automatically); RAG agents from swiss-ai-hub-agent query its output.

Should you use this package?

Probably not directly — most deployments use the pre-built pipeline images (default_rag_pipeline, shared_rag_pipeline), which ingest the platform's default buckets out of the box.

Use this PyPI package when you want a custom pipeline — connect a new data source, ingest into a different bucket, or tune parsing/chunking/embedding for your documents. It's an SDK for building your own ingestion as a Dagster code location.

Installation

pip install swiss-ai-hub-pipeline
# or
uv add swiss-ai-hub-pipeline

Requires Python 3.13.


Quick start

A pipeline is a Dagster code location — a module that exposes a Definitions object. default_definitions() builds a complete one:

# my_pipeline/__init__.py
from swiss_ai_hub.pipeline import default_definitions

defs = default_definitions(
    datalake_container_name="my_docs",                  # S3 bucket (Dagster name: letters, digits, underscores)
    embedding_model_name="embedding/bge-m3",            # any embedding model on the LiteLLM gateway
    llm_model_name="text-generation/gemma-4-31B-it",    # for summaries / table & figure refinement
    with_summary_nodes=True,                            # hierarchical RAG summaries
)

Run it with the Dagster UI and materialize the assets:

dagster dev -m my_pipeline      # serves the UI at http://localhost:3000

Drop a document into the my_docs bucket, click Materialize on the asset graph, and watch it flow: observe → documents (parse) → nodes (chunk + embed) → Milvus. A RAG agent pointed at that bucket can now answer questions over it.
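
The quick-start comment says datalake_container_name must be a Dagster name made of letters, digits, and underscores. A small pre-check like the following can catch a hyphenated bucket name before Dagster loads the code location. This is a sketch under that stated rule only: is_valid_container_name is a hypothetical helper, and the SDK's own validation may be stricter or differ.

```python
import re

# Assumption: mirrors the "letters, digits, underscores" rule from the
# quick-start comment; the SDK's own validation may differ.
_VALID_NAME = re.compile(r"[A-Za-z0-9_]+")


def is_valid_container_name(name: str) -> bool:
    """Return True if the name uses only letters, digits, and underscores."""
    return _VALID_NAME.fullmatch(name) is not None
```

For example, is_valid_container_name("my_docs") passes, while "my-docs" is rejected because of the hyphen.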

To also pull from an external source, combine default_definitions() with a Stage-1 builder — e.g. default_rclone_to_datalake_definitions(...) for OneDrive/Google Drive/Dropbox, or default_sharepoint_to_datalake_definitions(...). The source templates (SharePoint, OneDrive, S3, Azure Blob, Google Drive, SFTP, local FS) are copy-paste starting points.


How it works

default_definitions() assembles a graph of Dagster assets connected by IO managers to the platform's stores:

Stage                     | Assets                                                                     | Backed by
Source → data lake        | observable_*, data_lake_file, removed_data_lake_files                      | SeaweedFS (S3)
Data lake → vector store  | documents (parse), nodes (chunk + embed), summary_nodes, removed_documents | MinerU, LiteLLM, MongoDB, Milvus

Materialization is driven by eager automation, daily schedules, and a NATS sensor that fires when documents are uploaded through the API — so ingestion keeps up with changes without manual runs. Key default_definitions() knobs: with_summary_nodes, with_table_refinement, with_figure_descriptions, document_parser_loader_type (MinerU or Document Intelligence), and max_partitions.
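
The table lists assets for both changed files (data_lake_file) and deleted ones (removed_data_lake_files, removed_documents). How the SDK detects either is not described here; the general idea can be sketched as a comparison of two bucket listings. The diff_listing function and the {object_key: etag} snapshot shape below are assumptions made for illustration.

```python
def diff_listing(
    previous: dict[str, str], current: dict[str, str]
) -> tuple[list[str], list[str]]:
    """Compare two {object_key: etag} snapshots of a bucket.

    Returns (changed, removed): keys that are new or whose etag differs,
    and keys that existed before but are gone now. Both lists are sorted.
    """
    changed = sorted(key for key, etag in current.items() if previous.get(key) != etag)
    removed = sorted(key for key in previous if key not in current)
    return changed, removed
```

Changed keys would be re-parsed and re-embedded, while removed keys would have their vectors deleted so that agents stop retrieving stale content.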


Development

The dev stack runs the infrastructure a pipeline needs — SeaweedFS (S3), MongoDB, Milvus, MinerU, and the LiteLLM gateway — and exposes it on localhost:

# 1. Start the platform infrastructure (from a Swiss AI Hub checkout)
docker compose --env-file .env -f infra/docker-compose.dev.yml up -d

# 2. Load the dev connection settings into your shell
set -a && source .env && set +a

# 3. Run your pipeline's Dagster UI against the stack
dagster dev -m my_pipeline       # http://localhost:3000

Materialize assets from the UI to parse, embed, and store real documents. dagster definitions validate -m my_pipeline loads the whole code location (every asset, resource, and IO manager) without running it — handy as a fast sanity check and in CI.

Settings are not auto-loaded from a .env file. The SDK reads connection settings from the process environment when its settings are constructed, so make sure the variables above are exported in the process that runs Dagster (set -a && source .env && set +a).

Production

In production a pipeline runs as a Dagster code location: a gRPC server in a container that the platform's Dagster webserver and daemon connect to.

1. Containerize it as a gRPC code-location server:

FROM python:3.13-slim
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

WORKDIR /app
# Your project's manifest and lockfile; the project depends on swiss-ai-hub-pipeline
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev
COPY . .

ENV PATH="/app/.venv/bin:$PATH" PYTHONUNBUFFERED=1
EXPOSE 4000
ENTRYPOINT ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-m", "my_pipeline"]

2. Run it alongside the platform on the right networks — a pipeline reaches MinerU + LiteLLM (backend), MongoDB + Milvus + NATS (data), and SeaweedFS/S3 (storage):

# docker-compose.my-pipeline.yml — deployed alongside the platform
services:
  my-pipeline:
    image: registry.example.com/my-pipeline:1.0.0
    restart: always
    environment:
      MONGO_CONNECTION_STRING: mongodb://${MONGO_USERNAME}:${MONGO_PASSWORD}@ferretdb:27017/
      MILVUS_URL: http://milvus-standalone:19530
      S3_STORAGE_ENDPOINT: http://seaweedfs-s3:9000
      S3_STORAGE_ACCESS_KEY: ${S3_STORAGE_ACCESS_KEY}
      S3_STORAGE_SECRET_KEY: ${S3_STORAGE_SECRET_KEY}
      LITE_LLM_PROXY_BASE_URL: http://litellm:4000
      LITE_LLM_PROXY_API_KEY: ${LITELLM_MASTER_KEY}
      MINERU_API_BASE_URL: http://mineru-api:8000
      NATS_ENDPOINT: nats://nats:4222
      NATS_TOKEN: ${NATS_TOKEN}
    networks: [backend, data, storage]

networks:
  backend: { external: true }
  data: { external: true }
  storage: { external: true }

3. Register it in the platform's Dagster workspace so the webserver/daemon load it:

# workspace.yaml
load_from:
  - grpc_server:
      host: my-pipeline      # the service name above
      port: 4000
      location_name: my-pipeline

Then start the service:

docker compose -f docker-compose.my-pipeline.yml up -d

Reuse the platform's secrets (from its .env) for the ${…} values, and match the actual network names of your deployment. Your pipeline then shows up as a code location in the platform's Dagster UI, with its schedules and sensors running under the shared daemon.
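
If the code location does not appear, a first thing to rule out is plain network reachability between the Dagster webserver container and the pipeline container. The following is a minimal TCP probe using only the standard library. It does not speak gRPC and says nothing about whether the code location loaded; is_reachable is a hypothetical helper, and the host and port in the usage note come from the workspace.yaml example above.

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False
```

Run from inside a container on the same networks, is_reachable("my-pipeline", 4000) returning False usually points at a wrong service name or a missing network attachment rather than at the pipeline code.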

Network reference: backend = LiteLLM, MinerU, OTEL; data = NATS, FerretDB (the MongoDB-compatible database behind MONGO_CONNECTION_STRING), Milvus; storage = SeaweedFS/S3.



License

Apache-2.0 — see packages/pipeline/LICENSE. For the full per-package license matrix, see LICENSES.md.


Part of Swiss AI Hub. Built in Switzerland by bbv Software Services.
