Swiss AI Hub Pipeline SDK: Dagster-based document ingestion, parsing, embedding, and vector storage for RAG.

swiss-ai-hub-pipeline

The data-ingestion SDK for Swiss AI Hub — turn documents into RAG-ready vectors with Dagster.


What is Swiss AI Hub?

Swiss AI Hub is an open-source, self-hosted AI platform for enterprises. One docker compose up starts ~30 integrated containers — LLM gateway (LiteLLM), vector search (Milvus), document parsing (MinerU), S3 storage (SeaweedFS), SSO (Keycloak), observability (Langfuse), a chat UI (Open-WebUI), and more. Agents answer questions over your organization's knowledge; this package is how that knowledge gets in.

What is this package?

swiss-ai-hub-pipeline is a Dagster-based SDK that ingests documents and produces the vectors RAG agents search. It implements a two-stage, asset-based pipeline:

  1. Source → data lake — monitor a source (SharePoint, OneDrive, Google Drive, S3, local/network shares — anything rclone supports) and sync changed files into the platform's S3 (SeaweedFS).
  2. Data lake → vector store — parse each file (MinerU OCR + structure), chunk it, embed it via the LLM gateway, and upsert the vectors into Milvus, with full lineage from every embedding back to its source document.
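To make the lineage idea in stage 2 concrete, here is a minimal, stdlib-only sketch of chunking a parsed document so that every chunk keeps a pointer back to its source file. The Node class, the chunk_document function, and the fixed-size window with overlap are illustrative assumptions, not the SDK's actual data model or chunking strategy; parsing and embedding are left out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    """One chunk of a document plus the lineage to trace it back (hypothetical shape)."""
    source_uri: str   # e.g. the S3 key of the file in the data lake
    chunk_index: int  # position of the chunk within the document
    text: str


def chunk_document(source_uri: str, text: str, size: int = 200, overlap: int = 50) -> list[Node]:
    """Split text into fixed-size character windows that overlap by `overlap` characters."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("need size > 0 and 0 <= overlap < size")
    step = size - overlap
    nodes = []
    for index, start in enumerate(range(0, max(len(text), 1), step)):
        piece = text[start:start + size]
        if piece:
            nodes.append(Node(source_uri, index, piece))
        if start + size >= len(text):
            break  # the current window already reached the end of the text
    return nodes


nodes = chunk_document("s3://my_docs/handbook.pdf", "x" * 450)
print([(n.chunk_index, len(n.text)) for n in nodes])
# [(0, 200), (1, 200), (2, 150)]
```

Because each Node carries source_uri and chunk_index, a vector stored for that chunk can always be traced back to the file it came from, which is the property the pipeline's lineage provides.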

You compose a pipeline from one function, default_definitions(), which wires together all the assets, resources, IO managers, sensors, jobs, and schedules. It builds on swiss-ai-hub-core (installed automatically); RAG agents from swiss-ai-hub-agent query its output.

Should you use this package?

Probably not directly — most deployments use the pre-built pipeline images (default_rag_pipeline, shared_rag_pipeline), which ingest the platform's default buckets out of the box.

Use this PyPI package when you want a custom pipeline — connect a new data source, ingest into a different bucket, or tune parsing/chunking/embedding for your documents. It's an SDK for building your own ingestion as a Dagster code location.

Installation

pip install swiss-ai-hub-pipeline
# or
uv add swiss-ai-hub-pipeline

Requires Python 3.13.


Quick start

A pipeline is a Dagster code location — a module that exposes a Definitions object. default_definitions() builds a complete one:

# my_pipeline/__init__.py
from swiss_ai_hub.pipeline import default_definitions

defs = default_definitions(
    datalake_container_name="my_docs",                  # S3 bucket (Dagster name: letters, digits, underscores)
    embedding_model_name="embedding/bge-m3",            # any embedding model on the LiteLLM gateway
    llm_model_name="text-generation/gemma-4-31B-it",    # for summaries / table & figure refinement
    with_summary_nodes=True,                            # hierarchical RAG summaries
)

Run it with the Dagster UI and materialize the assets:

dagster dev -m my_pipeline      # serves the UI at http://localhost:3000

Drop a document into the my_docs bucket, click Materialize on the asset graph, and watch it flow: observe → documents (parse) → nodes (chunk + embed) → Milvus. A RAG agent pointed at that bucket can now answer questions over it.
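The Quick start comment notes that datalake_container_name must be a Dagster name made of letters, digits, and underscores. A small check like the following can catch a hyphenated bucket name before Dagster does. This is a sketch under that stated rule only; require_container_name is a hypothetical helper, not part of the SDK.

```python
import re

# Rule from the Quick start comment: letters, digits, and underscores only.
_NAME_PATTERN = re.compile(r"[A-Za-z0-9_]+")


def is_valid_container_name(name: str) -> bool:
    """Return True if `name` consists solely of letters, digits, and underscores."""
    return _NAME_PATTERN.fullmatch(name) is not None


def require_container_name(name: str) -> str:
    """Return `name` unchanged, or raise ValueError with a readable message."""
    if not is_valid_container_name(name):
        raise ValueError(
            f"{name!r} is not a valid container name: use letters, digits, and underscores"
        )
    return name


print(is_valid_container_name("my_docs"))  # True
print(is_valid_container_name("my-docs"))  # False
```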

To also pull from an external source, combine default_definitions() with a Stage-1 builder — e.g. default_rclone_to_datalake_definitions(...) for OneDrive/Google Drive/Dropbox, or default_sharepoint_to_datalake_definitions(...). The source templates (SharePoint, OneDrive, S3, Azure Blob, Google Drive, SFTP, local FS) are copy-paste starting points.


How it works

default_definitions() assembles a graph of Dagster assets connected by IO managers to the platform's stores:

Stage | Assets | Backed by
Source → data lake | observable_*, data_lake_file, removed_data_lake_files | SeaweedFS (S3)
Data lake → vector store | documents (parse), nodes (chunk + embed), summary_nodes, removed_documents | MinerU, LiteLLM, MongoDB, Milvus

Materialization is driven by eager automation, daily schedules, and a NATS sensor that fires when documents are uploaded through the API — so ingestion keeps up with changes without manual runs. Key default_definitions() knobs: with_summary_nodes, with_table_refinement, with_figure_descriptions, document_parser_loader_type (MinerU or Document Intelligence), and max_partitions.


Development

The dev stack runs the infrastructure a pipeline needs — SeaweedFS (S3), MongoDB, Milvus, MinerU, and the LiteLLM gateway — and exposes it on localhost:

# 1. Start the platform infrastructure (from a Swiss AI Hub checkout)
docker compose --env-file .env -f infra/docker-compose.dev.yml up -d

# 2. Load the dev connection settings into your shell
set -a && source .env && set +a

# 3. Run your pipeline's Dagster UI against the stack
dagster dev -m my_pipeline       # http://localhost:3000

Materialize assets from the UI to parse, embed, and store real documents. dagster definitions validate -m my_pipeline loads the whole code location (every asset, resource, and IO manager) without running it — handy as a fast sanity check and in CI.

Settings are not auto-loaded from the .env file. The SDK reads connection settings from the process environment, and only at construction time, so make sure the variables above are exported in the process that runs Dagster (set -a && source .env && set +a).
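A quick preflight check can confirm that the variables really are exported before Dagster starts. The sketch below is an assumption-laden example: the variable names are taken from the compose example in the Production section of this README, the NATS variables are left out because the dev stack described here does not list NATS, and missing_settings is a hypothetical helper rather than an SDK function.

```python
import os
from collections.abc import Mapping

# Names taken from the Production compose example in this README;
# adjust the tuple to whatever your .env actually defines.
REQUIRED_SETTINGS = (
    "MONGO_CONNECTION_STRING",
    "MILVUS_URL",
    "S3_STORAGE_ENDPOINT",
    "S3_STORAGE_ACCESS_KEY",
    "S3_STORAGE_SECRET_KEY",
    "LITE_LLM_PROXY_BASE_URL",
    "LITE_LLM_PROXY_API_KEY",
    "MINERU_API_BASE_URL",
)


def missing_settings(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required variables that are unset or empty in `env`."""
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]


# Demonstration with an explicit mapping instead of the real environment.
example_env = {"MILVUS_URL": "http://localhost:19530"}
print(missing_settings(example_env))
```

Calling missing_settings() with no argument inspects the current process environment, so it can run at the top of a code-location module or as a CI step before dagster definitions validate.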

Production

In production a pipeline runs as a Dagster code location: a gRPC server in a container that the platform's Dagster webserver and daemon connect to.

1. Containerize it as a gRPC code-location server:

FROM python:3.13-slim
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

WORKDIR /app
# Your project's manifest and lockfile (the project depends on swiss-ai-hub-pipeline)
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev
COPY . .

ENV PATH="/app/.venv/bin:$PATH" PYTHONUNBUFFERED=1
EXPOSE 4000
ENTRYPOINT ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-m", "my_pipeline"]

2. Run it alongside the platform on the right networks — a pipeline reaches MinerU + LiteLLM (backend), MongoDB + Milvus + NATS (data), and SeaweedFS/S3 (storage):

# docker-compose.my-pipeline.yml — deployed alongside the platform
services:
  my-pipeline:
    image: registry.example.com/my-pipeline:1.0.0
    restart: always
    environment:
      MONGO_CONNECTION_STRING: mongodb://${MONGO_USERNAME}:${MONGO_PASSWORD}@ferretdb:27017/
      MILVUS_URL: http://milvus-standalone:19530
      S3_STORAGE_ENDPOINT: http://seaweedfs-s3:9000
      S3_STORAGE_ACCESS_KEY: ${S3_STORAGE_ACCESS_KEY}
      S3_STORAGE_SECRET_KEY: ${S3_STORAGE_SECRET_KEY}
      LITE_LLM_PROXY_BASE_URL: http://litellm:4000
      LITE_LLM_PROXY_API_KEY: ${LITELLM_MASTER_KEY}
      MINERU_API_BASE_URL: http://mineru-api:8000
      NATS_ENDPOINT: nats://nats:4222
      NATS_TOKEN: ${NATS_TOKEN}
    networks: [backend, data, storage]

networks:
  backend: { external: true }
  data: { external: true }
  storage: { external: true }

3. Register it in the platform's Dagster workspace so the webserver/daemon load it:

# workspace.yaml
load_from:
  - grpc_server:
      host: my-pipeline      # the service name above
      port: 4000
      location_name: my-pipeline

# Start the pipeline container
docker compose -f docker-compose.my-pipeline.yml up -d

Reuse the platform's secrets (from its .env) for the ${…} values, and match the actual network names of your deployment. Your pipeline then shows up as a code location in the platform's Dagster UI, with its schedules and sensors running under the shared daemon.
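One detail to watch when reusing secrets in MONGO_CONNECTION_STRING: MongoDB connection URIs expect the username and password to be percent-encoded when they contain characters such as @, :, / or %. A minimal sketch using only the standard library, assuming the ferretdb:27017 host from the compose example; mongo_connection_string is a hypothetical helper, not part of the SDK:

```python
from urllib.parse import quote_plus


def mongo_connection_string(username: str, password: str, host: str = "ferretdb:27017") -> str:
    """Build a mongodb:// URI with percent-encoded credentials."""
    return f"mongodb://{quote_plus(username)}:{quote_plus(password)}@{host}/"


print(mongo_connection_string("hub", "p@ss:w/rd"))
# mongodb://hub:p%40ss%3Aw%2Frd@ferretdb:27017/
```

If the platform's .env already stores an encoded password, pass the connection string through unchanged rather than encoding it a second time.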

Network reference. backend = LiteLLM, MinerU, OTEL. data = NATS, FerretDB, Milvus. storage = SeaweedFS/S3.
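The network reference can be turned into a small lookup that tells you which networks a service needs to join. This is only a sketch: the mapping is copied from the line above, actual network names may differ in your deployment, and networks_for is a hypothetical helper.

```python
# Mapping copied from the network reference; network names may differ per deployment.
SERVICE_NETWORK = {
    "LiteLLM": "backend",
    "MinerU": "backend",
    "OTEL": "backend",
    "NATS": "data",
    "FerretDB": "data",
    "Milvus": "data",
    "SeaweedFS": "storage",
}


def networks_for(services: list[str]) -> list[str]:
    """Return the sorted, de-duplicated networks needed to reach `services`."""
    unknown = [s for s in services if s not in SERVICE_NETWORK]
    if unknown:
        raise KeyError(f"no network known for: {', '.join(unknown)}")
    return sorted({SERVICE_NETWORK[s] for s in services})


print(networks_for(["MinerU", "LiteLLM", "Milvus", "SeaweedFS"]))
# ['backend', 'data', 'storage']
```

A pipeline that talks to all of the listed services therefore needs all three networks, which matches the networks: [backend, data, storage] line in the compose example.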


License

Apache-2.0 — see packages/pipeline/LICENSE. For the full per-package license matrix, see LICENSES.md.


Part of Swiss AI Hub. Built in Switzerland by bbv Software Services.
