Swiss AI Hub Pipeline SDK: Dagster-based document ingestion, parsing, embedding, and vector storage for RAG.

swiss-ai-hub-pipeline

The data-ingestion SDK for Swiss AI Hub — turn documents into RAG-ready vectors with Dagster.

What is Swiss AI Hub?

Swiss AI Hub is an open-source, self-hosted AI platform for enterprises. One docker compose up starts ~30 integrated containers — LLM gateway (LiteLLM), vector search (Milvus), document parsing (MinerU), S3 storage (SeaweedFS), SSO (Keycloak), observability (Langfuse), a chat UI (Open-WebUI), and more. Agents answer questions over your organization's knowledge; this package is how that knowledge gets in.

What is this package?

swiss-ai-hub-pipeline is a Dagster-based SDK that ingests documents and produces the vectors RAG agents search. It implements a two-stage, asset-based pipeline:

  1. Source → data lake — monitor a source (SharePoint, OneDrive, Google Drive, S3, local/network shares — anything rclone supports) and sync changed files into the platform's S3 (SeaweedFS).
  2. Data lake → vector store — parse each file (MinerU OCR + structure), chunk it, embed it via the LLM gateway, and upsert the vectors into Milvus, with full lineage from every embedding back to its source document.
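To make the two stages concrete, here is a toy, dependency-free sketch of the same flow. It is not the SDK's implementation: the `Vector` class, the fixed-size `chunk` function, and the `toy_embed` stand-in are all hypothetical names invented for illustration, and a plain dict plays the role of the data lake. What it does show is the lineage idea, where every embedding carries the key of the document it came from.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Vector:
    """One embedded chunk plus the lineage back to its source document."""
    source_key: str               # object key of the source file in the data lake
    chunk_index: int              # position of the chunk within that document
    text: str
    embedding: tuple[float, ...]


def chunk(text: str, size: int = 40) -> list[str]:
    """Split text into fixed-size character windows (a toy chunker)."""
    return [text[i:i + size] for i in range(0, len(text), size)]


def toy_embed(text: str) -> tuple[float, ...]:
    """Stand-in for a real embedding model: (length, vowel count)."""
    vowels = sum(text.lower().count(v) for v in "aeiou")
    return (float(len(text)), float(vowels))


def ingest(data_lake: dict[str, str]) -> list[Vector]:
    """Stage 2 in miniature: chunk each document, embed each chunk, keep lineage."""
    vectors = []
    for key, text in sorted(data_lake.items()):
        for index, piece in enumerate(chunk(text)):
            vectors.append(Vector(key, index, piece, toy_embed(piece)))
    return vectors


# Stage 1 in miniature: the "data lake" is just a dict of object key -> text.
lake = {"handbook.txt": "a" * 100, "faq.txt": "hello world"}
vectors = ingest(lake)
```

In the real pipeline the parse, chunk, and embed steps are Dagster assets backed by MinerU, LiteLLM, and Milvus; the sketch only mirrors their order and the lineage field.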

You compose a pipeline from one function, default_definitions(), which wires together all the assets, resources, IO managers, sensors, jobs, and schedules. It builds on swiss-ai-hub-core (installed automatically); RAG agents from swiss-ai-hub-agent query its output.

Should you use this package?

Probably not directly — most deployments use the pre-built pipeline images (default_rag_pipeline, shared_rag_pipeline), which ingest the platform's default buckets out of the box.

Use this PyPI package when you want a custom pipeline — connect a new data source, ingest into a different bucket, or tune parsing/chunking/embedding for your documents. It's an SDK for building your own ingestion as a Dagster code location.

Installation

pip install swiss-ai-hub-pipeline
# or
uv add swiss-ai-hub-pipeline

Requires Python 3.13.


Quick start

A pipeline is a Dagster code location — a module that exposes a Definitions object. default_definitions() builds a complete one:

# my_pipeline/__init__.py
from swiss_ai_hub.pipeline import default_definitions

defs = default_definitions(
    datalake_container_name="my_docs",                  # S3 bucket (Dagster name: letters, digits, underscores)
    embedding_model_name="embedding/bge-m3",            # any embedding model on the LiteLLM gateway
    llm_model_name="text-generation/gemma-4-31B-it",    # for summaries / table & figure refinement
    with_summary_nodes=True,                            # hierarchical RAG summaries
)
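The comment on `datalake_container_name` says the name may contain only letters, digits, and underscores. A small helper like the following could check or normalize a name before it is passed in. This is a sketch under that stated rule only; `is_valid_container_name` and `to_container_name` are hypothetical helpers, not part of the SDK.

```python
import re

# Assumption: the only rule is "letters, digits, underscores", as stated in the
# quick-start comment. The SDK may enforce additional constraints.
_NAME_RE = re.compile(r"[A-Za-z0-9_]+")


def is_valid_container_name(name: str) -> bool:
    """Return True if the whole name consists of allowed characters."""
    return _NAME_RE.fullmatch(name) is not None


def to_container_name(raw: str) -> str:
    """Replace each run of disallowed characters with a single underscore."""
    cleaned = re.sub(r"[^A-Za-z0-9_]+", "_", raw).strip("_")
    if not cleaned:
        raise ValueError(f"cannot derive a container name from {raw!r}")
    return cleaned
```

For example, `to_container_name("my-docs")` yields `"my_docs"`, the name used in the quick start.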

Run it with the Dagster UI and materialize the assets:

dagster dev -m my_pipeline      # opens http://localhost:3000

Drop a document into the my_docs bucket, click Materialize on the asset graph, and watch it flow: observe → documents (parse) → nodes (chunk + embed) → Milvus. A RAG agent pointed at that bucket can now answer questions over it.

To also pull from an external source, combine default_definitions() with a Stage-1 builder — e.g. default_rclone_to_datalake_definitions(...) for OneDrive/Google Drive/Dropbox, or default_sharepoint_to_datalake_definitions(...). The source templates (SharePoint, OneDrive, S3, Azure Blob, Google Drive, SFTP, local FS) are copy-paste starting points.


How it works

default_definitions() assembles a graph of Dagster assets connected by IO managers to the platform's stores:

| Stage | Assets | Backed by |
| --- | --- | --- |
| Source → data lake | observable_*, data_lake_file, removed_data_lake_files | SeaweedFS (S3) |
| Data lake → vector store | documents (parse), nodes (chunk + embed), summary_nodes, removed_documents | MinerU, LiteLLM, MongoDB, Milvus |

Materialization is driven by eager automation, daily schedules, and a NATS sensor that fires when documents are uploaded through the API — so ingestion keeps up with changes without manual runs. Key default_definitions() knobs: with_summary_nodes, with_table_refinement, with_figure_descriptions, document_parser_loader_type (MinerU or Document Intelligence), and max_partitions.
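The first stage in the table distinguishes files that changed from files that were removed. The source does not describe how the SDK detects either case, so the following is only an illustrative sketch: `diff_listing` is a hypothetical function that compares two listings of object key to fingerprint (an ETag or modification time would both work as the fingerprint).

```python
def diff_listing(
    previous: dict[str, str], current: dict[str, str]
) -> tuple[list[str], list[str]]:
    """Compare two {key: fingerprint} listings.

    Returns (changed_or_new, removed), each sorted by key.
    """
    changed = sorted(k for k, v in current.items() if previous.get(k) != v)
    removed = sorted(k for k in previous if k not in current)
    return changed, removed


# Yesterday's listing versus today's: b.pdf was edited, c.pdf deleted, d.pdf added.
before = {"a.pdf": "v1", "b.pdf": "v1", "c.pdf": "v1"}
after = {"a.pdf": "v1", "b.pdf": "v2", "d.pdf": "v1"}
changed, removed = diff_listing(before, after)
```

Keys in `changed` would be re-parsed and re-embedded, while keys in `removed` would have their vectors deleted, which is roughly the split between the `data_lake_file` and `removed_data_lake_files` assets.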


Development

The dev stack runs the infrastructure a pipeline needs — SeaweedFS (S3), MongoDB, Milvus, MinerU, and the LiteLLM gateway — and exposes it on localhost:

# 1. Start the platform infrastructure (from a Swiss AI Hub checkout)
docker compose --env-file .env -f infra/docker-compose.dev.yml up -d

# 2. Load the dev connection settings into your shell
set -a && source .env && set +a

# 3. Run your pipeline's Dagster UI against the stack
dagster dev -m my_pipeline       # http://localhost:3000

Materialize assets from the UI to parse, embed, and store real documents. Running dagster definitions validate -m my_pipeline loads the whole code location (every asset, resource, and IO manager) without executing it, which makes it a fast sanity check locally and in CI.

The .env file is not loaded automatically. The SDK reads connection settings from the process environment when its settings objects are constructed, so make sure the variables above are exported in the process that runs Dagster (set -a && source .env && set +a).
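Because a forgotten export only surfaces once the pipeline tries to connect, a preflight check can save a debugging round. The sketch below is an assumption-laden example: the variable names are copied from the production compose example in this README, but whether every one of them is required in a given setup is a guess, and `missing_settings` is a hypothetical helper rather than an SDK function.

```python
import os
from collections.abc import Mapping

# Names taken from the production compose example in this README.
# Assumption: all of them must be set; trim the list for your own setup.
REQUIRED_SETTINGS = (
    "MONGO_CONNECTION_STRING",
    "MILVUS_URL",
    "S3_STORAGE_ENDPOINT",
    "S3_STORAGE_ACCESS_KEY",
    "S3_STORAGE_SECRET_KEY",
    "LITE_LLM_PROXY_BASE_URL",
    "LITE_LLM_PROXY_API_KEY",
    "MINERU_API_BASE_URL",
    "NATS_ENDPOINT",
    "NATS_TOKEN",
)


def missing_settings(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required variable names that are unset or empty in env."""
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]
```

Calling `missing_settings()` before `dagster dev` and aborting when the list is non-empty turns a late connection error into an immediate, readable message.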

Production

In production a pipeline runs as a Dagster code location: a gRPC server in a container that the platform's Dagster webserver and daemon connect to.

1. Containerize it as a gRPC code-location server:

FROM python:3.13-slim
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

WORKDIR /app
# Your project, which depends on swiss-ai-hub-pipeline
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev
COPY . .

ENV PATH="/app/.venv/bin:$PATH" PYTHONUNBUFFERED=1
EXPOSE 4000
ENTRYPOINT ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-m", "my_pipeline"]

2. Run it alongside the platform on the right networks — a pipeline reaches MinerU + LiteLLM (backend), MongoDB + Milvus + NATS (data), and SeaweedFS/S3 (storage):

# docker-compose.my-pipeline.yml — deployed alongside the platform
services:
  my-pipeline:
    image: registry.example.com/my-pipeline:1.0.0
    restart: always
    environment:
      MONGO_CONNECTION_STRING: mongodb://${MONGO_USERNAME}:${MONGO_PASSWORD}@ferretdb:27017/
      MILVUS_URL: http://milvus-standalone:19530
      S3_STORAGE_ENDPOINT: http://seaweedfs-s3:9000
      S3_STORAGE_ACCESS_KEY: ${S3_STORAGE_ACCESS_KEY}
      S3_STORAGE_SECRET_KEY: ${S3_STORAGE_SECRET_KEY}
      LITE_LLM_PROXY_BASE_URL: http://litellm:4000
      LITE_LLM_PROXY_API_KEY: ${LITELLM_MASTER_KEY}
      MINERU_API_BASE_URL: http://mineru-api:8000
      NATS_ENDPOINT: nats://nats:4222
      NATS_TOKEN: ${NATS_TOKEN}
    networks: [backend, data, storage]

networks:
  backend: { external: true }
  data: { external: true }
  storage: { external: true }

3. Register it in the platform's Dagster workspace so the webserver/daemon load it:

# workspace.yaml
load_from:
  - grpc_server:
      host: my-pipeline      # the service name above
      port: 4000
      location_name: my-pipeline

# Start the pipeline service
docker compose -f docker-compose.my-pipeline.yml up -d

Reuse the platform's secrets (from its .env) for the ${…} values, and match the actual network names of your deployment. Your pipeline then shows up as a code location in the platform's Dagster UI, with its schedules and sensors running under the shared daemon.

Network reference. backend = LiteLLM, MinerU, OTEL. data = NATS, FerretDB, Milvus. storage = SeaweedFS/S3.
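If the code location does not appear in the Dagster UI, a quick first check is whether the gRPC port is reachable from a container on the same network. The sketch below uses only the standard library; `can_connect` is a hypothetical helper, and it verifies a TCP connection only, not that a healthy Dagster gRPC server is answering.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False
```

With the workspace entry shown in step 3, the call would be `can_connect("my-pipeline", 4000)`. A `False` result usually points to a wrong service name or a missing shared network rather than a problem in the pipeline code.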


License

Apache-2.0 — see packages/pipeline/LICENSE. For the full per-package license matrix, see LICENSES.md.


Part of Swiss AI Hub. Built in Switzerland by bbv Software Services.
