sluice
Code-native pipelines for information. Wire up RSS, LLMs, and Notion in plain TOML. Think of it as n8n in code form.
English · 简体中文 · PyPI · GitHub
slu·ice /sluːs/: a water gate. You decide what flows through.
That's the metaphor. Pick your sources, set when the gate opens, route through processing channels, and land everything in downstream reservoirs. Except the water is information, and every gate, channel, and reservoir is code you can read.

         ┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐
RSS ────▶│  Source  │────▶│  Stages  │────▶│  Render  │────▶│  Sinks   │────▶ Notion
         └──────────┘     └──────────┘     └──────────┘     └──────────┘
                          dedupe / fetch /                  file_md / email
                          summarize / filter /              (pluggable)
                          analyze (LLM)
Table of contents
- Why sluice?
- Quickstart
- Concepts
- Configuration
- Built-in plugins
- Operations & observability
- Docker deployment
- CI/CD
- Roadmap
- Development
- License
What's new in v1.1
After months of running sluice daily on real feeds, here's what got better:
- Swappable output channels: Telegram (MarkdownV2), Feishu (post/text/interactive), Email (fail_fast/best_effort), all with a sink_delivery_log audit trail.
- Attachment mirroring: mirror_attachments downloads images/files to local disk with a file://, https://, or relative URL prefix.
- Enricher protocol + hn_comments: pluggable enrichers that augment items with external data (HN comment threads via the HN Firebase API, with the official HN site as fallback). Run this stage before summarize so the LLM can incorporate community discussion (sketched after this list).
- Sub-daily pipelines: run_key_template with {run_hour}, {run_minute}, {run_iso}, {run_epoch} for cron intervals under 24h.
- Ranking stages: sort orders by numeric/string/datetime fields; limit can sort, group, and cap with sort_by/group_by/per_group_max.
- field_filter ops: lower, strip, and regex_replace in addition to the existing truncate/drop.
- New cleanup + scoring stages: cross_dedupe, html_strip, score_tag, and summarize_score_tag help deduplicate across feeds, normalize RSS fields, and combine scoring/tagging/summarization before downstream filtering.
- Smart fetcher fallback: on_all_failed = "use_raw_summary" gracefully falls back to the RSS summary text when all fetchers fail.
- URL cache size cap: a configurable max_rows with LRU eviction keeps the DB lean.
- GC command: sluice gc reclaims storage from failed_items, url_cache, and attachment_mirror, plus orphan file cleanup.
- Observability: a custom Prometheus collector, sluice stats, sluice metrics-server, and the sluice deliveries audit viewer.
- Lazy registry: plugins register via stubs, so pip install sluice-pipeline (no extras) stays lightweight.
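A sketch of how a few of these fit into a pipeline file. The enrich stage type, the hn_comments enricher, the on_all_failed value, and the {run_iso} placeholder are all from this release; the enricher key name and the exact placement of run_key_template are assumptions for illustration.
# Sketch only -- the `enricher` key name and run_key_template placement are assumptions.
cron = "0 */6 * * *"                  # fire four times a day
run_key_template = "{run_iso}"        # distinct run_key per firing (sub-daily pipelines)

[fetcher]
on_all_failed = "use_raw_summary"     # fall back to the RSS summary when every fetcher fails

[[stages]]
name = "hn_context"
type = "enrich"
enricher = "hn_comments"              # assumed key name; place this before the summarize stage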
Why sluice?
You track a bunch of RSS feeds. You want a daily digest in Notion: each article auto-fetched, LLM-summarized, bundled into a brief, then pushed to a Notion database. You want it cheap, observable, and fully under your control.
Three approaches you'll look at:
| | n8n / Zapier | A 200-line Python script | sluice |
|---|---|---|---|
| Add a new feed | Click 12 buttons | Edit code | Add 3 lines to TOML |
| Swap LLM provider | Hope the integration exists | Hope you wrote it that way | Edit providers.toml, restart |
| Cost cap per run | Hard | Hand-rolled | One-line max_estimated_cost_usd |
| Failure handling | "Retry the whole workflow" | try: ... except: pass | Per-item failed_items lifecycle, dead-letter, --retry |
| Self-hosted observability | n8n web UI (resource-heavy) | print + grep | Rich progress bar, loguru diagnostics, Prefect run history |
| LLM fallback chain | Manual branching | None | 4-tier model chain with weighted routing + key cooldown |
| Idempotency | "Did it already run?" | "Did the script crash midway?" | sink_emissions table, upsert on retry |
sluice is n8n in code: business logic lives in plain Python and TOML. No SaaS lock-in, no GUI walls, no opaque webhooks.
Quickstart
1. Install
# The PyPI package is called sluice-pipeline (sluice was taken).
# Your imports and CLI commands stay "sluice"; no rename needed.
pip install sluice-pipeline
# With push-channel sinks (Telegram / Feishu / Email)
pip install "sluice-pipeline[channels]"
# With Prometheus metrics
pip install "sluice-pipeline[metrics]"
# With HN Comments enricher
pip install "sluice-pipeline[enrich-hn]"
# Everything
pip install "sluice-pipeline[all]"
Heads-up: Python 3.11+. You'll need a Notion integration token and at least one OpenAI-compatible API key (DeepSeek, GLM, OpenAI, OpenRouter; anything that speaks the /v1/chat/completions protocol works). If you're only using the RSS source + file_md sink, no LLM key is needed.
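If you do want the LLM summarize stage from step 3, the smallest possible provider pool is one provider with one key. A minimal sketch in the providers.toml format documented later (the endpoint, key variable, and prices are examples; any OpenAI-compatible service works):
# configs/providers.toml -- minimal sketch (one provider, one key)
[[providers]]
name = "openai"
type = "openai_compatible"

[[providers.base]]
url = "https://api.openai.com/v1"
key = [{ value = "env:OPENAI_API_KEY" }]

[[providers.models]]
model_name = "gpt-4o-mini"
input_price_per_1k = 0.00015          # optional; enables cost estimates and caps
output_price_per_1k = 0.0006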
2. Project layout
my-digest/
├── configs/
│   ├── sluice.toml            # global: state, fetcher chain, runtime
│   ├── providers.toml         # LLM provider pool
│   └── pipelines/
│       └── ai_news.toml       # one pipeline per file
└── prompts/                   # prompts + render templates
    ├── summarize_zh.md        # Jinja2 prompt
    ├── daily_brief_zh.md
    └── daily.md.j2            # render template
3. A minimal pipeline
# configs/pipelines/ai_news.toml
id = "ai_news"
window = "24h"
timezone = "Asia/Shanghai"
[[sources]]
type = "rss"
url = "https://openai.com/blog/rss"
[[stages]]
name = "dedupe"
type = "dedupe"
[[stages]]
name = "summarize"
type = "llm_stage"
mode = "per_item"
input_field = "raw_summary"
output_field = "summary"
prompt_file = "prompts/summarize_zh.md"
model = "openai/gpt-4o-mini"
[[stages]]
name = "render"
type = "render"
template = "prompts/daily.md.j2"
output_field = "context.markdown"
[[sinks]]
id = "notion_main"
type = "notion"
input = "context.markdown"
parent_id = "env:NOTION_DB_AI_NEWS"
parent_type = "database"
token = "env:NOTION_TOKEN"
title_template = "AI Daily · {run_date}"
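The render stage above points at prompts/daily.md.j2. A minimal sketch of that template, assuming only the items and run_date variables described later for the render processor (everything else here is illustrative):
{# prompts/daily.md.j2 -- minimal sketch; items and run_date come from the render context #}
# AI Daily · {{ run_date }}

{% for item in items %}
## [{{ item.title }}]({{ item.url }})
{{ item.summary or item.raw_summary }}
{% endfor %}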
4. Run it
# Dry-run: no DB writes, no sink emits; see what *would* happen
sluice run ai_news --dry-run
# For real
sluice run ai_news
# With detailed diagnostics
sluice run ai_news --verbose --log-file logs/ai_news.jsonl
# Schedule it (registers a Prefect deployment with cron)
sluice deploy
That's it: a complete daily digest pipeline in 30 lines of TOML.
Concepts
Here's the mental model. The water-gate metaphor maps directly into code: five plugin Protocols, and every pipeline is just a composition of them:
| Protocol | What it does | Built-in implementations |
|---|---|---|
| Source | Bring items into the stream | rss |
| Fetcher | Hydrate an article URL → markdown | trafilatura, crawl4ai, firecrawl, jina_reader |
| Processor | Transform the stream | dedupe, cross_dedupe, fetcher_apply, html_strip, filter, field_filter, score_tag, summarize_score_tag, sort, llm_stage, render, limit, enrich, mirror_attachments |
| Sink | Push items downstream | file_md, notion, telegram, feishu, email |
| LLMProvider | Talk to an OpenAI-compatible endpoint | weighted base/key pool with 4-tier fallback chain |
Every plugin registers itself with a single decorator. Adding a new source (e.g. IMAP, Telegram, Reddit) is one Python file plus one line of TOML.
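The registration pattern looks roughly like the self-contained sketch below. It only illustrates the decorator-plus-Protocol idea; the real decorator name, Protocol signature, and Item construction live in the sluice source and differ from these assumed names.
from typing import Iterable, Protocol

SOURCES: dict[str, type] = {}                 # toy registry standing in for sluice's

def register_source(name: str):
    """Illustrative decorator: map a TOML `type` string to a Source class."""
    def wrap(cls: type) -> type:
        SOURCES[name] = cls
        return cls
    return wrap

class Source(Protocol):
    def collect(self) -> Iterable[dict]: ...  # the real Protocol yields Item objects

@register_source("reddit")                    # then `type = "reddit"` in a [[sources]] block would resolve here
class RedditSource:
    def __init__(self, subreddit: str) -> None:
        self.subreddit = subreddit

    def collect(self) -> Iterable[dict]:
        # A real implementation would call the Reddit API and yield Items.
        yield {"title": f"placeholder from r/{self.subreddit}", "url": "https://example.com"}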
The Item model
Items flow through the stages. Each carries provenance, content, and arbitrary extras written by upstream stages:
@dataclass
class Item:
    source_id: str
    pipeline_id: str
    guid: str | None
    url: str                          # canonicalized (utm_*/fbclid/... stripped)
    title: str
    published_at: datetime | None
    raw_summary: str | None
    fulltext: str | None              # populated by fetcher_apply
    attachments: list[Attachment]     # RSS enclosures
    summary: str | None               # populated by the summarize llm_stage
    extras: dict[str, Any]            # anything else stages write
    tags: list[str]

    def get(self, path: str, default=None):
        """Dot path: 'fulltext', 'extras.relevance', 'tags.0'"""
LLM provider pool: the unfair advantage
This is where sluice earns its keep. A 200-line script can call an LLM, sure. But can it juggle a pool of weighted providers, cool down quota-exhausted keys automatically, and walk a 4-tier fallback chain per stage, without you writing a single retry loop?
That's what the provider pool does.
Show full provider config
# configs/providers.toml
[[providers]]
name = "openrouter"
type = "openai_compatible"
[[providers.base]]
url = "https://openrouter.ai/api/v1"
weight = 3
key = [
  { value = "env:OR_KEY_1", weight = 2 },
  { value = "env:OR_KEY_2", weight = 1, quota_duration = 18000, quota_error_tokens = ["exceed", "quota"] },
]
active_windows = ["00:00-08:00"] # only use this base off-peak
active_timezone = "Asia/Shanghai"
[[providers.models]]
model_name = "openai/gpt-4o-mini"
input_price_per_1k = 0.00015
output_price_per_1k = 0.0006
[[providers]]
name = "ollama"
type = "openai_compatible"
[[providers.base]]
url = "http://localhost:11434/v1"
key = [{ value = "local" }]
[[providers.models]]
model_name = "llama3"
# free local โ no price needed
Then in any llm_stage:
model = "openrouter/openai/gpt-4o-mini"
retry_model = "openrouter/openai/gpt-4o-mini" # same tier, retry on transient failures
fallback_model = "openrouter/google/gemini-flash" # cheaper backup
fallback_model_2 = "ollama/llama3" # last-resort local
The chain walks main → retry → fallback → fallback_2. Each tier has independent worker/concurrency caps. Quota-exhausted keys are cooled down automatically. Time-window routing lets you push expensive traffic to off-peak hours.
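Conceptually the walk is just an ordered loop with per-tier error handling; a minimal self-contained sketch (not sluice's actual code, and the caller supplies the call function):
from typing import Callable

def complete_with_fallback(
    prompt: str,
    tiers: list[str],
    call_model: Callable[[str, str], str],
) -> str:
    """Try each model tier in order; return the first successful completion."""
    last_error: Exception | None = None
    for model in tiers:               # e.g. [model, retry_model, fallback_model, fallback_model_2]
        try:
            return call_model(model, prompt)
        except Exception as exc:      # in sluice, quota errors would also put the key on cooldown
            last_error = exc
    raise RuntimeError("all fallback tiers failed") from last_error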
Configuration
sluice has three TOML layers:
- sluice.toml: state DB, runtime, default fetcher chain
- providers.toml: LLM provider pool
- pipelines/<id>.toml: one file per pipeline
Show a full production-grade pipeline TOML
id = "ai_news"
description = "Daily AI/infra news digest"
enabled = true
cron = "0 8 * * *"
timezone = "Asia/Shanghai"
window = "24h"
lookback_overlap = "4h"
# --- Backpressure / cost caps ---------------------------------------
[limits]
max_items_per_run = 50 # tuned for Notion upsert latency
item_overflow_policy = "drop_oldest"
max_llm_calls_per_run = 500
max_estimated_cost_usd = 5.0 # hard fail before blowing budget
# --- Failed-item lifecycle -------------------------------------------
[failures]
retry_failed = true
max_retries = 3 # then dead-letter
retry_backoff = "next_run"
# --- Sources (multiple allowed) --------------------------------------
[[sources]]
type = "rss"
url = "https://openai.com/blog/rss"
tag = "ai"
[[sources]]
type = "rss"
url = "https://www.anthropic.com/news/rss.xml"
tag = "ai"
# --- Stages (executed in order) --------------------------------------
[[stages]]
name = "dedupe"
type = "dedupe"
[[stages]]
name = "fetch_fulltext"
type = "fetcher_apply"
write_field = "fulltext"
skip_if_field_longer_than = 2000 # trust the feed if content is already long
[[stages]]
name = "prefilter"
type = "filter"
mode = "keep_if_all"
rules = [
{ field = "fulltext", op = "min_length", value = 300 },
{ field = "title", op = "not_matches", value = "(?i)sponsored|advertisement" },
]
[[stages]]
name = "summarize"
type = "llm_stage"
mode = "per_item"
input_field = "fulltext"
output_field = "summary"
prompt_file = "prompts/summarize_zh.md"
model = "openrouter/openai/gpt-4o-mini"
fallback_model = "ollama/llama3"
workers = 8
max_input_chars = 20000
truncate_strategy = "head_tail"
[[stages]]
name = "daily_analysis"
type = "llm_stage"
mode = "aggregate"
input_field = "summary"
output_target = "context.daily_brief"
prompt_file = "prompts/daily_brief_zh.md"
model = "openrouter/openai/gpt-4o"
[[stages]]
name = "render"
type = "render"
template = "prompts/daily.md.j2"
output_field = "context.markdown"
# --- Sinks (multiple allowed, ID-keyed for idempotency) ---------------
[[sinks]]
id = "local_archive"
type = "file_md"
input = "context.markdown"
path = "./out/ai_news/{run_date}.md"
[[sinks]]
id = "notion_main"
type = "notion"
input = "context.markdown"
parent_id = "env:NOTION_DB_AI_NEWS"
parent_type = "database"
token = "env:NOTION_TOKEN"
title_template = "AI Daily · {run_date}"
properties = { Tag = "AI", Source = "sluice" }
mode = "upsert" # upsert | create_once | create_new
Built-in plugins
Sources
| type | Description |
|---|---|
| rss | Standard RSS/Atom via feedparser. URL canonicalization (UTM/fbclid/... stripped), enclosures extracted as Item.attachments, future-dated items dropped. |
Coming in v2: IMAP, Telegram, Reddit, custom webhook.
Fetchers (full-text extraction chain)
| type | Use when |
|---|---|
| trafilatura | Pure Python, no extra service. Fast. Default first try. |
| crawl4ai | Self-hosted Crawl4AI. Uses POST /crawl, then polls /task/{task_id} or /jobs/{task_id} when the service returns a task id. |
| firecrawl | Self-hosted Firecrawl for JS-rendered pages. |
| jina_reader | Hosted Jina Reader fallback when self-hosting fails. |
The fetcher chain is configured globally (or per-pipeline). Each request
walks the chain top-down with min_chars validation and an optional disk
cache:
[fetcher]
chain = ["trafilatura", "crawl4ai", "firecrawl", "jina_reader"]
min_chars = 500
on_all_failed = "skip" # or "continue_empty" / "use_raw_summary"
[fetcher.cache]
enabled = true
ttl = "7d"
SSRF guard built in: outbound fetches are blocked from hitting private/loopback IPs to prevent malicious feed entries from probing internal networks.
If you run behind a TUN/fake-IP proxy (for example Clash/mihomo fake-ip mode),
DNS may resolve public domains to 198.18.0.0/15. Keep the default strict
guard for normal environments; opt in only when you know your proxy is doing
this:
SLUICE_SSRF_ALLOW_TUN_FAKE_IP=1 sluice run ai_news
Processors
| type | Purpose |
|---|---|
| dedupe | Drop items already in seen_items for this pipeline. |
| cross_dedupe | Merge duplicates across sources by URL first, then title similarity. Keeps source-priority winners and can merge tags. |
| fetcher_apply | Walk the fetcher chain to populate item.fulltext. |
| html_strip | Strip HTML from top-level fields or extras.<key>, preserving paragraph/header line breaks and dropping script/style/template content. |
| filter | Rule-based keep/drop. 17 operators incl. regex, length, time windows. ReDoS-guarded. |
| field_filter | Mutate fields (truncate, drop, lower, strip, regex_replace), e.g. trim fulltext to 20k chars before an expensive LLM call. |
| score_tag | Per-item LLM scorer that writes extras.<score_field> and appends/replaces tags. Handles JSON fences, numeric strings, truncation, and per-item failures. |
| summarize_score_tag | Per-item LLM stage that writes a summary, extras.<score_field>, and tags in one call. The default summary_field = "summary" writes item.summary; use extras.<key> for extras. |
| sort | Order all items by a numeric, string, or datetime field. sort_type = "auto" keeps score-like strings numeric; use string for lexical title sorting. |
| llm_stage | LLM call, per_item (fan out) or aggregate (single call over all items). JSON parsing, max input chars, head-tail truncation, 4-tier fallback chain, cost preflight. |
| render | Jinja2 template → markdown into context.<key>. Receives a fixed context (items, stats, run_date, ...). |
| limit | Sort and cap output. sort_by, group_by, per_group_max, top_n. |
| enrich | Plug in enrichers that augment items with external data (e.g. HN comments). |
| mirror_attachments | Download item attachments/extras to local disk; rewrite URLs. |
filter operator reference
filter is a cheap, deterministic, no-LLM keep/drop stage. Each rule
is { field, op, value } where field is a dot-path resolved through
Item.get() (so extras.relevance, tags.0, published_at all work).
Combine rules with one of four modes.
Modes:
| mode | Keep this item if... |
|---|---|
| keep_if_all | every rule matches (logical AND) |
| keep_if_any | at least one rule matches (logical OR) |
| drop_if_all | NOT all rules match |
| drop_if_any | NOT any rule matches |
Operators (17 total):
| Category | op | What it checks |
|---|---|---|
| Existence | exists | field is present and not None |
| | not_exists | field is None or missing |
| Numeric | gt / gte | field > value / field >= value |
| | lt / lte | field < value / field <= value |
| | eq | field == value |
| String | matches | regex search (ReDoS-guarded) |
| | not_matches | inverse of matches |
| | contains | substring / element membership in field |
| | not_contains | inverse of contains |
| Membership | in | field value is in the given list |
| | not_in | inverse of in |
| Length | min_length | len(field) >= value |
| | max_length | len(field) <= value |
| Time | newer_than | field (datetime) is newer than now - value (e.g. "24h", "7d") |
| | older_than | field is older than now - value |
Worked examples:
# 1. Pre-LLM cheap prefilter โ keep articles long enough to summarize
# and drop obvious ad/sponsored junk by title.
[[stages]]
name = "prefilter"
type = "filter"
mode = "keep_if_all"
rules = [
{ field = "fulltext", op = "min_length", value = 300 },
{ field = "title", op = "not_matches", value = "(?i)sponsored|advertisement|广告|赞助" },
{ field = "published_at", op = "newer_than", value = "48h" },
]
# 2. LLM-driven filter โ score_tag scores relevance 1-10,
# then `filter` drops anything below 6.
[[stages]]
name = "score_and_tag"
type = "score_tag"
input_field = "fulltext"
prompt_file = "prompts/score_tag.md"
model = "openrouter/openai/gpt-4o-mini"
score_field = "relevance"
tags_merge = "append"
[[stages]]
name = "drop_irrelevant"
type = "filter"
mode = "keep_if_all"
rules = [
{ field = "extras.relevance", op = "gte", value = 6 },
]
[[stages]]
name = "rank_relevant"
type = "sort"
sort_by = "extras.relevance"
sort_type = "number" # auto | number | string | datetime
sort_order = "desc"
sort_missing = "last"
# 3. Tag whitelist + URL blacklist combo
[[stages]]
name = "scope"
type = "filter"
mode = "keep_if_all"
rules = [
{ field = "tags", op = "contains", value = "ai" },
{ field = "url", op = "not_matches", value = "^https?://(twitter|x)\\.com/" },
{ field = "source_id", op = "not_in", value = ["spammy_feed_1", "spammy_feed_2"] },
]
Why "rule-based filter + LLM scorer" beats pure LLM filtering: scoring
costs LLM tokens, so do it once via score_tag, write the score into
extras, and let cheap deterministic rules fan it out. Same prompt money,
unlimited filter combinations downstream.
Sinks
| type | Description |
|---|---|
| file_md | Deterministic local markdown file. Useful as an audit trail. |
| notion | Wraps notionify: markdown → a Notion page in your database. |
| telegram | Push messages to Telegram chats via the Bot API. MarkdownV2 rendering, safe truncation, split-on-too-long. |
| feishu | Push messages to Feishu/Lark. Two auth modes: auth_mode = "webhook" (default) uses a webhook URL + optional HMAC secret; auth_mode = "bot_api" uses app_id + app_secret + receive_id and sends Markdown-converted post messages via the Bot API. Supports post, text, and interactive (Card V2) message types. |
| email | Send HTML emails via SMTP. Auto-detects TLS mode from the port (465 → SSL, 587 → STARTTLS). Per-recipient batching, fail_fast or best_effort delivery. |
Idempotency modes:
| mode | Behavior |
|---|---|
| upsert | Re-running the same run_key updates the existing page (no duplicates). |
| create_once | First run creates; later runs no-op. |
| create_new | Always create a new page (intentionally non-idempotent, for "publish each rerun"). |
For database parents, properties may use friendly TOML values such as
{ Tag = "AI", Source = "sluice" }; sluice reads the database schema and
expands them to the Notion API shape for select, multi_select,
rich_text, url, date, and similar property types. Fully explicit Notion
property dictionaries are passed through unchanged.
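For example (the property names and values here are illustrative; the explicit form uses the standard Notion select payload shape):
# Friendly values -- sluice expands these against the database schema
properties = { Tag = "AI", Source = "sluice" }

# Or a fully explicit Notion payload, passed through unchanged (illustrative)
properties = { Status = { select = { name = "Published" } } }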
Operations & observability
CLI
sluice list # list configured pipelines and their crons
sluice validate # validate all TOML
sluice run <pipeline_id> # run once
sluice run <pipeline_id> --dry-run # no DB writes, no sink emits
sluice run <pipeline_id> --verbose # include Sluice DEBUG logs
sluice run <pipeline_id> --log-file logs/run.jsonl # write DEBUG JSONL diagnostics
sluice deploy # register all enabled pipelines as Prefect deployments
sluice failures <pipeline_id> # list failed_items
sluice failures <pipeline_id> --retry <item_key> # move dead-letter back to failed
sluice gc # reclaim space from failed_items/url_cache/attachment_mirror
sluice gc --dry-run # show what would be deleted without modifying
sluice gc --older-than 90d --pipeline ai_news # target specific age and pipeline
sluice stats # show pipeline run stats (last 7 days)
sluice stats ai_news --since 30d --format json # per-pipeline stats in JSON
sluice metrics-server --host 0.0.0.0 --port 9090 # start Prometheus exposition endpoint
sluice deliveries <pipeline_id> # list sink delivery audit log
sluice deliveries <pipeline_id> --run <run_key>      # filter deliveries by specific run
Progress and logs
sluice run prints a tqdm progress bar while the pipeline is running, then a
Rich Step Summary table with per-source and per-stage counts:
source rss_0 - 2 total=2
stage fetch_fulltext 22 3 fetched=3 failed=19 AllFetchersFailed=19
sink local:file_md 3 emitted
The console log level is INFO by default. Add --verbose to show Sluice DEBUG
events such as individual fetcher attempts, cache hits, too-short pages, and
LLM retryable failures. Third-party internals (aiosqlite, httpx,
httpcore, feedparser, trafilatura, prefect) are kept at WARNING so
verbose mode doesn't turn into transport noise.
Use --log-file or SLUICE_LOG_FILE for full DEBUG JSONL diagnostics:
sluice run ai_news --log-file logs/ai_news.jsonl
SLUICE_LOG_FILE=logs/ai_news.jsonl sluice run ai_news --verbose
State and what's persisted (SQLite)
Eight tables, no ORM, schema migrated via PRAGMA user_version:
| Table | What it tracks |
|---|---|
seen_items |
Dedupe registry per pipeline. Includes summary for future RAG. |
failed_items |
Per-item failures with full payload, status (failed/dead_letter/resolved), attempt count. |
sink_emissions |
Maps (pipeline_id, run_key, sink_id) โ external_id. Powers idempotent retries. |
url_cache |
Article extraction cache, TTL'd. Avoids re-hitting Firecrawl on retries. |
run_log |
Per-run metadata: items in/out, LLM calls, estimated cost, status, error. |
sink_delivery_log |
Per-message push-sink audit trail: ordinal, kind, recipient, external_id, status, error. |
attachment_mirror |
Mirrored attachment metadata: original URL, local path, mime type, size. |
gc_log |
GC run history: timestamp, tables cleaned, rows affected. |
๐ Failure handling lifecycle
RSS item โโโถ stage X fails โโโถ failed_items (status=failed, item_json saved)
โ
next run starts
โ
โโโโโโโโโโโโโโโโโดโโโโโโโโโโโโโโโโโ
โผ โผ
re-queued before processors succeeds โ status=resolved
โ
fails again โโโถ attempts++ โโโถ at max_retries โ dead_letter
โ
sluice failures --retry โ back to failed
Items are reconstructed from item_json so retry doesn't depend on the RSS
feed re-surfacing them.
Scheduling with Prefect: the production upgrade
You can run sluice with raw cron. A one-line crontab entry will do:
0 8 * * * cd /app && sluice run ai_news
That works. But after a few weeks of running real pipelines, you'll start asking questions that cron can't answer: Did this morning's run succeed? Which item failed and why? Can I retry just that one stage without re-running the whole pipeline?
This is where Prefect comes in. sluice deploy registers each enabled pipeline
as a Prefect deployment, a scheduled job with its own cron expression,
run history, and per-task observability baked in. Think of Prefect as the
missing web UI for your pipelines: not a dependency, not a lock-in, just an
optional upgrade when cron isn't enough.
The full setup takes three terminals and about 30 seconds:
# Terminal 1: Start the Prefect server (API + UI)
prefect server start
# Terminal 2: Register all your pipelines as scheduled deployments
sluice deploy
# Terminal 3: Start a worker to pick up scheduled runs
prefect worker start --pool default
That's it. Your pipelines will fire on their cron schedules, and the Prefect UI at http://localhost:4200 gives you per-pipeline run timelines, retry buttons, and per-task logs: basically the operational dashboard you'd build if you had a spare weekend.
A few things worth knowing:
- sluice deploy is idempotent: run it again after editing a pipeline's cron and Prefect will pick up the change.
- The worker pool name (--pool default) is configurable. Create separate pools for GPU-heavy pipelines vs lightweight ones.
- If Prefect is down, sluice still works fine from the CLI. Prefect wraps sluice; sluice doesn't require Prefect.
- Run history and logs live in a local SQLite database alongside your sluice state; no external Postgres is needed for single-machine setups.
Prefect is fully optional. Start with raw cron if you're just kicking the tires,
add sluice deploy when you want visibility, and upgrade to a Prefect server
when you have enough pipelines to justify a dashboard.
Docker deployment
You don't need Docker to run sluice; a Python venv works perfectly. But when you're deploying to a VPS, a home server, or a cron-triggered cloud function, containers make life simpler. Here are the two paths.
Docker files live in scripts/docker/.
The compose files use the published GHCR image by default. Set SLUICE_IMAGE
in scripts/docker/.env to pin a version for production.
Path A: Standalone (run once, exit)
Good for: serverless cron jobs, CI pipelines, or simply "I want to call sluice from a systemd timer without installing Python tooling on the host."
cd scripts/docker
# Copy your configs and .env into place
cp -r ../../configs ./configs
cp .env.example .env
$EDITOR .env
# Run once
docker compose run --rm sluice run ai_news
# Dry-run
docker compose run --rm sluice run ai_news --dry-run
# Validate configs
docker compose run --rm sluice validate
The compose file mounts ./configs (read-only) and ./data (writable SQLite
state). Tweak the cron trigger on your host (systemd timer, Kubernetes CronJob,
AWS EventBridge, or whatever you already use) to invoke docker compose run --rm.
The container starts, runs the pipeline, shuts down. No long-running process,
no port binding, no stateful server.
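For orientation, the standalone service is roughly shaped like this; the shipped file in scripts/docker/ is the source of truth, and the mount paths below mirror the docker run example further down:
# Sketch of the standalone compose shape; see scripts/docker/ for the real file.
services:
  sluice:
    image: ${SLUICE_IMAGE}            # set in scripts/docker/.env; pin a version for production
    env_file: .env
    volumes:
      - ./configs:/app/configs:ro     # pipeline/provider TOML, read-only
      - ./data:/app/data              # SQLite state, caches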
Path B: With Prefect (always-on scheduler)
Good for: multiple pipelines with overlapping schedules, run history you can inspect, a UI for retries and debugging.
cd scripts/docker
cp -r ../../configs ./configs
cp .env.example .env
$EDITOR .env
docker compose -f docker-compose.prefect.yml up
This starts three things:
- Prefect server: API + dashboard at http://localhost:4200
- sluice deploy: auto-registers all cron-enabled pipelines on container start
- Prefect worker: picks up scheduled runs as they arrive
Data (SQLite state, run history, cached articles) is persisted across restarts via Docker volumes. If you stop the container and bring it back up next week, your pipeline state is intact.
Build your own image
If the published image doesn't fit your setup, build from source:
# Build from repo root
docker build -f scripts/docker/Dockerfile -t sluice:local .
# Run
docker run --rm \
-v $(pwd)/configs:/app/configs:ro \
-v $(pwd)/data:/app/data \
sluice:local run ai_news
The image includes all optional dependencies ([all] extras), so Telegram,
Feishu, Email sinks, Prometheus metrics, and the HN enricher are ready to go
out of the box.
Generating an Authorization: Basic header value
Some self-hosted services (crawl4ai, private Firecrawl instances, internal
proxies) use HTTP Basic Authentication. The header value is
Basic <base64(username:password)>. Generate it with any of these:
# Python (no dependencies)
python3 -c "import base64; print('Basic ' + base64.b64encode(b'alice:s3cr3t').decode())"
# OpenSSL
printf 'alice:s3cr3t' | openssl base64 | tr -d '\n' | sed 's/^/Basic /'
# GNU coreutils
printf 'alice:s3cr3t' | base64 | tr -d '\n' | sed 's/^/Basic /'
All three produce the same result: Basic YWxpY2U6czNjcjN0
Store the full string (including Basic ) in your .env, then reference it
from the fetcher config:
# .env
CRAWL4AI_AUTH=Basic YWxpY2U6czNjcjN0
# configs/sluice.toml โ [fetcher.crawl4ai] section
api_headers = { Authorization = "env:CRAWL4AI_AUTH" }
Never put raw credentials in TOML files or commit them to version control. Always use the
env:VAR_NAME indirection shown above.
CI/CD
We ship sluice with the same workflows we use ourselves. Everything lives in
.github/workflows/.
ci.yml: runs on every push and PR
Matrix over Python 3.11 through 3.14:
- Install dependencies (uv sync --all-extras --frozen)
- ruff check . (lint)
- ty check (type check, 0 errors)
- pytest -q
publish.yml: triggered on v*.*.* tags
Push a version tag (git tag v0.2.0 && git push --tags) and the rest is automatic:
- Verifies the tag matches the pyproject.toml version
- uv build → dist/
- Publishes to PyPI via OIDC trusted publishing: no API tokens, no secret rotation
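The shape of such a tag-triggered workflow, sketched (not necessarily the shipped file; the version check shown is just one simple stdlib way to do it):
name: publish
on:
  push:
    tags: ["v*.*.*"]
jobs:
  pypi:
    runs-on: ubuntu-latest
    environment: pypi                 # the GitHub environment created in the setup below
    permissions:
      id-token: write                 # OIDC token for trusted publishing
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v5
      - name: Verify tag matches pyproject.toml version
        run: |
          python3 -c "import tomllib, os, sys; v = tomllib.load(open('pyproject.toml', 'rb'))['project']['version']; sys.exit(0 if os.environ['GITHUB_REF_NAME'] == 'v' + v else 1)"
      - run: uv build
      - uses: pypa/gh-action-pypi-publish@release/v1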
One-time PyPI setup:
- Create a pypi environment in GitHub → Settings → Environments
- On PyPI → sluice-pipeline → Publishing, add a trusted publisher:
  - Owner: nerdneilsfield
  - Repository: sluice-pipeline
  - Workflow: publish.yml
  - Environment: pypi
After that, every git push --tags publishes a release. No manual steps, no expiring secrets.
Roadmap
v1 (now): RSS source, Notion sink, file_md sink, core processors, 4-tier LLM fallback, idempotent retries, dry-run, loguru diagnostics, Prefect scheduling, SSRF guard.
v1.1 (shipped):
- Push-channel sinks: Telegram, Feishu, Email
- Attachment mirroring (mirror_attachments stage)
- Enricher protocol + hn_comments
- Sub-daily pipelines (run_key_template)
- sort and limit stages
- field_filter ops: lower, strip, regex_replace
- cross_dedupe, html_strip, score_tag, and summarize_score_tag stages
- Crawl4AI fetcher support
- Fetcher fallback (on_all_failed)
- URL cache size cap
- GC command + metrics + CLI audit viewer
- Lazy registry
v1.2 (in progress):
- Native Anthropic Messages API
- Per-tier worker counts
- Notion page cover image
- Plugin entry-points (auto-discovery for third-party plugins)
v2 (planned):
- IMAP / email source
- GitHub repo sink (push markdown → trigger build)
- RAG over historical summaries (semantic search across past digests)
Development
git clone https://github.com/nerdneilsfield/sluice-pipeline
cd sluice-pipeline
uv sync --all-extras # or pip install -e '.[dev,all]'
pytest -q # 408 tests
pytest --cov=sluice # 80% coverage
ruff check .
ty check # 0 errors
The architecture and design rationale live in
docs/superpowers/specs/ and the TDD-driven
implementation plan in docs/superpowers/plans/.
License
MIT © nerdneilsfield
"open the gate. let the right water through."