
metaspn-store

metaspn-store provides a minimal append-only event store for MetaSPN signals and emissions.

Features

  • Filesystem JSONL backend (partitioned by UTC date)
  • Append-only writes for signals and emissions
  • Idempotent event writes keyed by signal_id / emission_id
  • Batched signal/emission write helpers
  • Replay checkpoint utilities for worker resume
  • Snapshot writes for deterministic state rebuild checkpoints
  • Streaming replay by time window, entity reference, and source

Layout

workspace/
  store/
    signals/
      2026-02-05.jsonl
    emissions/
      2026-02-05.jsonl
    checkpoints/
      ingestion_worker.json
    snapshots/
      system_state__2026-02-05T120000Z.json
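
The date-partitioning scheme above can be sketched in a few lines of stdlib code. `partition_path` is a hypothetical helper for illustration, not part of the package API; it only assumes that events carry a timezone-aware timestamp:

```python
from datetime import datetime, timezone
from pathlib import Path

def partition_path(root: str, stream: str, ts: datetime) -> Path:
    """Map an event timestamp to its UTC-date JSONL partition file."""
    day = ts.astimezone(timezone.utc).strftime("%Y-%m-%d")
    return Path(root) / "store" / stream / f"{day}.jsonl"

# A signal stamped 2026-02-05T12:00Z lands in store/signals/2026-02-05.jsonl
p = partition_path("workspace", "signals",
                   datetime(2026, 2, 5, 12, 0, tzinfo=timezone.utc))
```

Partitioning by UTC date keeps each day's file append-only and makes time-window replay a matter of opening only the partitions that overlap the window.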

Idempotency Contract

  • write_signal(..., on_duplicate="return_existing") and write_emission(..., on_duplicate="return_existing") are idempotent by stable ID.
  • Duplicate policies:
    • return_existing (default): do not append; return the path of the first existing record.
    • ignore: alias for return_existing.
    • raise: raise DuplicateEventError.
  • Duplicate detection uses a lazily hydrated in-memory ID index sourced from existing JSONL partitions.
  • Replay iterators suppress duplicate IDs and yield the first-seen record deterministically.
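
A minimal sketch of these idempotency mechanics using only the stdlib. The names here (`append_idempotent`, the `signal_id` key, the `set`-based index) are illustrative assumptions, not the package's internals:

```python
import json
from pathlib import Path

class DuplicateEventError(Exception):
    """Raised when on_duplicate='raise' and the event ID was seen before."""

def append_idempotent(path: Path, record: dict, index: set,
                      on_duplicate: str = "return_existing") -> bool:
    """Append record keyed by record['signal_id']. Returns True if written,
    False if the ID already existed (return_existing / ignore policies)."""
    event_id = record["signal_id"]
    if event_id in index:
        if on_duplicate == "raise":
            raise DuplicateEventError(event_id)
        return False  # return_existing / ignore: never append twice
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")  # one JSON object per line
    index.add(event_id)
    return True
```

The same pattern explains the replay side: iterators track seen IDs and yield only the first occurrence, so a partition containing accidental duplicates still replays deterministically.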

M0 Ingestion Usage

from datetime import datetime, timezone
from metaspn_store import FileSystemStore

store = FileSystemStore("/workspace")

# Batch ingest parsed envelopes (signal_batch / emission_batch come from your parser)
store.write_signals(signal_batch)
store.write_emissions(emission_batch)

# Resume-friendly replay for workers
checkpoint = store.read_checkpoint("ingestion_worker")
to_process = store.iter_signals_from_checkpoint(
    start=datetime(2026, 2, 1, tzinfo=timezone.utc),
    end=datetime.now(timezone.utc),
    checkpoint=checkpoint,
)

processed = process_batch(to_process)  # process_batch: your worker's handler
next_checkpoint = store.build_signal_checkpoint(processed)
if next_checkpoint is not None:
    store.write_checkpoint("ingestion_worker", next_checkpoint)
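
The checkpoint pattern above reduces to a high-water mark over replayed records. This stdlib sketch assumes a record shape of dicts with `ts` (ISO-8601 UTC string) and `signal_id`; the function names are illustrative, not the package's API:

```python
def iter_from_checkpoint(records, checkpoint):
    """Yield records strictly past the checkpoint's (ts, signal_id)
    high-water mark, suppressing duplicate IDs (first-seen wins)."""
    seen = set()
    mark = (checkpoint["ts"], checkpoint["signal_id"]) if checkpoint else None
    for rec in sorted(records, key=lambda r: (r["ts"], r["signal_id"])):
        if rec["signal_id"] in seen:
            continue  # deterministic: only the first-seen record survives
        seen.add(rec["signal_id"])
        if mark is not None and (rec["ts"], rec["signal_id"]) <= mark:
            continue  # already processed before the last checkpoint
        yield rec

def build_checkpoint(batch):
    """High-water mark of a processed batch, or None if nothing was processed."""
    if not batch:
        return None
    last = max(batch, key=lambda r: (r["ts"], r["signal_id"]))
    return {"ts": last["ts"], "signal_id": last["signal_id"]}
```

Because the mark is derived from the processed batch itself, a worker that crashes before `write_checkpoint` simply reprocesses the tail on restart, which the idempotent writes make safe.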

M1 Routing/Profile/Scoring Usage

from datetime import datetime, timezone
from metaspn_store import FileSystemStore

store = FileSystemStore("/workspace")
window_start = datetime(2026, 2, 1, tzinfo=timezone.utc)
window_end = datetime.now(timezone.utc)

# Recent context for profile/scorer (entity_ref: an entity reference from your resolver)
recent_profile = store.get_recent_signals_by_entity(entity_ref=entity_ref, limit=20)
recent_route_source = store.get_recent_signals_by_source(source="route.worker", limit=50)

# Resolver candidate streams
resolved_candidates = store.iter_entity_candidate_signals(
    start=window_start,
    end=window_end,
    resolved=True,
)
unresolved_candidates = store.iter_entity_candidate_signals(
    start=window_start,
    end=window_end,
    resolved=False,
)

# Stage-window replay with checkpoint resume
checkpoint = store.read_checkpoint("route_worker")
batch = list(
    store.iter_stage_window_signals(
        stage="route",
        start=window_start,
        end=window_end,
        checkpoint=checkpoint,
        payload_types=["RouteInput"],
    )
)
next_checkpoint = store.build_signal_checkpoint(batch)
if next_checkpoint is not None:
    store.write_checkpoint("route_worker", next_checkpoint)

M2 Recommendation/Digest Usage

from datetime import datetime, timezone
from metaspn_store import FileSystemStore

store = FileSystemStore("/workspace")
window_start = datetime(2026, 2, 1, tzinfo=timezone.utc)
window_end = datetime.now(timezone.utc)

# Ranking candidates for recommendations
top_candidates = store.get_top_recommendation_candidates(
    start=window_start,
    end=window_end,
    limit=25,
    sources=["score.worker"],
    payload_types=["RecommendationCandidate"],
    score_field="score",
)

# Daily digest snapshots
store.write_daily_digest_snapshot(
    day=window_end,
    digest={"top_candidates": [item.signal_id for item in top_candidates]},
)
digest = store.read_daily_digest_snapshot(window_end)

# Draft/approval read models
latest_drafts = store.get_latest_draft_signals(
    limit=20,
    start=window_start,
    end=window_end,
    sources=["draft.worker"],
)
latest_approval_outcomes = store.get_latest_approval_outcomes(
    limit=20,
    start=window_start,
    end=window_end,
    emission_types=["DraftApproved", "DraftRejected"],
)

# Replay recommendation events with checkpoint safety
checkpoint = store.read_checkpoint("recommend_worker")
events = list(
    store.iter_recommendation_signals(
        start=window_start,
        end=window_end,
        checkpoint=checkpoint,
        sources=["recommend.worker"],
        payload_types=["RecommendationCandidate"],
    )
)
next_checkpoint = store.build_signal_checkpoint(events)
if next_checkpoint is not None:
    store.write_checkpoint("recommend_worker", next_checkpoint)
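
Ranking top candidates, as `get_top_recommendation_candidates` does over a replay window, amounts to a heap-based top-N over a payload score field. A stdlib sketch, assuming signals are dicts with a `payload` mapping (illustrative shape, not the package's record type):

```python
from heapq import nlargest

def top_candidates(signals, limit, score_field="score"):
    """Return the `limit` highest-scoring signals, best first."""
    return nlargest(limit, signals, key=lambda s: s["payload"][score_field])
```

`heapq.nlargest` keeps memory proportional to `limit` rather than the window size, which matters when a day's partition holds far more candidates than the digest needs.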

M3 Learning/Calibration Usage

from datetime import datetime, timezone
from metaspn_store import FileSystemStore

store = FileSystemStore("/workspace")
window_start = datetime(2026, 2, 1, tzinfo=timezone.utc)
window_end = datetime.now(timezone.utc)

# Learning replay with checkpoint support
checkpoint = store.read_checkpoint("learning_worker")
learning_events = list(
    store.iter_learning_signals(
        start=window_start,
        end=window_end,
        checkpoint=checkpoint,
        sources=["learning.worker"],
        payload_types=["OutcomePending"],
    )
)
next_checkpoint = store.build_signal_checkpoint(learning_events)
if next_checkpoint is not None:
    store.write_checkpoint("learning_worker", next_checkpoint)

# Window buckets for evaluator workers
buckets = store.get_outcome_window_buckets(
    now=window_end,
    start=window_start,
    end=window_end,
    sources=["learning.worker"],
    pending_payload_types=["OutcomePending"],
    success_emission_types=["OutcomeSuccess"],
    failure_emission_types=["OutcomeFailure"],
)

# Calibration snapshots
store.write_calibration_snapshot(day=window_end, report={"bucket_count": len(buckets["pending"])})
calibration_report = store.read_calibration_snapshot(window_end)
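
The window-bucket call above is, at its core, a grouping of events by type set. A stdlib sketch of that grouping, with hypothetical event dicts carrying a `type` field (not the package's record shape):

```python
def bucket_outcomes(events, pending_types, success_types, failure_types):
    """Group events into pending/success/failure buckets by their type."""
    buckets = {"pending": [], "success": [], "failure": []}
    for ev in events:
        t = ev["type"]
        if t in pending_types:
            buckets["pending"].append(ev)
        elif t in success_types:
            buckets["success"].append(ev)
        elif t in failure_types:
            buckets["failure"].append(ev)
    return buckets  # events outside all three type sets are dropped
```

An evaluator worker can then resolve each pending event against the success/failure buckets and feed the ratios into the calibration snapshot.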

Demo Bridge Usage

from datetime import datetime, timezone
from metaspn_store import FileSystemStore

store = FileSystemStore("/workspace")
day_start = datetime(2026, 2, 5, 0, 0, tzinfo=timezone.utc)
day_end = datetime(2026, 2, 5, 23, 59, tzinfo=timezone.utc)

posts = store.get_last_posts_by_entity(entity_ref=entity_ref, limit=10)  # entity_ref: caller-supplied
ready_candidates = store.get_ready_candidates(
    start=day_start,
    end=day_end,
    sources=["score.worker"],
    payload_types=["RecommendationCandidate"],
)
outcomes = store.get_outcomes_for_window(
    start=day_start,
    end=day_end,
    emission_types=["OutcomeSuccess", "OutcomeFailure"],
)

# Date-scoped artifacts remain deterministic on rerun (same file path rewritten)
store.write_daily_digest_snapshot(day=day_start, digest={"ready": [item.signal_id for item in ready_candidates]})
store.write_calibration_snapshot(day=day_start, report={"outcomes": [item.emission_id for item in outcomes]})

Token/Promise Usage

from datetime import datetime, timezone
from metaspn_store import FileSystemStore

store = FileSystemStore("/workspace")
start = datetime(2026, 2, 5, 0, 0, tzinfo=timezone.utc)
end = datetime(2026, 2, 5, 23, 59, tzinfo=timezone.utc)

token_rows = store.get_token_signals(
    start=start,
    end=end,
    token_id="tok-123",
    project_entity_id="proj-456",
)
promise_rows = store.get_promise_signals(
    start=start,
    end=end,
    promise_id="promise-1",
    status="READY",
)
promise_outcomes = store.get_promise_outcomes_for_window(
    start=start,
    end=end,
    promise_id="promise-1",
)

store.write_credibility_snapshot(day=start, report={"health": 0.92})
credibility = store.read_credibility_snapshot(start)

Release

python -m pip install -e ".[dev]"
pytest -q
python -m build
python -m twine check dist/*
python -m twine upload dist/*
