Distributed task coordination using PostgreSQL - ensures singleton execution across multiple workers/pods
Project description
pglease
Distributed task coordination for Python using PostgreSQL.
Ensures singleton execution of tasks across multiple workers, pods, or processes. Simple, reliable, and production-ready.
Why pglease?
- ✅ Simple API - Context managers, decorators, or explicit acquire/release
- ✅ Production-Ready - Thread-safe, tested, with proper error handling
- ✅ Observable - Query who holds what lease and for how long
- ✅ Async Support - Full asyncio integration with
AsyncPGLease - ✅ Fast Failover - Optional hybrid backend for <1s recovery when workers crash
- ✅ No New Infrastructure - Uses your existing PostgreSQL database
The Problem
You have multiple replicas of your application running (Kubernetes pods, containers, or processes), but certain tasks should only run on one worker at a time:
- 🔧 Database migrations
- ⏰ Scheduled jobs (cron-like tasks)
- 📊 Report generation
- 🔄 Data synchronization
- ⚡ Leader election
- 🎯 Any critical section needing cluster-wide mutual exclusion
pglease solves this by turning PostgreSQL into a distributed lock manager.
Quick Start
Installation
pip install pglease
30-Second Example
from pglease import PGLease
# Connect to your PostgreSQL database
pglease = PGLease("postgresql://user:pass@localhost/db")
# Only one worker executes this across your entire cluster
with pglease.acquire("daily-report", ttl=300) as acquired:
if acquired:
generate_daily_report() # This runs on exactly one worker
That's it! All other workers will skip the task while one holds the lease.
Adding to an Existing Project
Step 1: Install pglease
# Add to your requirements.txt
pip install pglease
# Or with poetry
poetry add pglease
# Or with pipenv
pipenv install pglease
Step 2: Set Up the Database Table
pglease creates the lease table automatically on first use, but you can also create it manually:
-- Run this migration on your PostgreSQL database
CREATE TABLE IF NOT EXISTS pglease_leases (
task_name VARCHAR(255) PRIMARY KEY,
owner_id VARCHAR(255) NOT NULL,
acquired_at TIMESTAMPTZ NOT NULL,
expires_at TIMESTAMPTZ NOT NULL,
heartbeat_at TIMESTAMPTZ NOT NULL
);
-- Optional: Add index for faster lease expiry queries
CREATE INDEX IF NOT EXISTS idx_pglease_expires_at ON pglease_leases(expires_at);
Django: Create a migration:
# myapp/migrations/0002_create_pglease_table.py
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('myapp', '0001_initial'),
]
operations = [
migrations.RunSQL(
sql="""
CREATE TABLE IF NOT EXISTS pglease_leases (
task_name VARCHAR(255) PRIMARY KEY,
owner_id VARCHAR(255) NOT NULL,
acquired_at TIMESTAMPTZ NOT NULL,
expires_at TIMESTAMPTZ NOT NULL,
heartbeat_at TIMESTAMPTZ NOT NULL
);
""",
reverse_sql="DROP TABLE IF EXISTS pglease_leases;"
),
]
Alembic (Flask/SQLAlchemy):
# alembic/versions/xxx_add_pglease_table.py
def upgrade():
op.execute("""
CREATE TABLE IF NOT EXISTS pglease_leases (
task_name VARCHAR(255) PRIMARY KEY,
owner_id VARCHAR(255) NOT NULL,
acquired_at TIMESTAMPTZ NOT NULL,
expires_at TIMESTAMPTZ NOT NULL,
heartbeat_at TIMESTAMPTZ NOT NULL
);
""")
def downgrade():
op.execute("DROP TABLE IF EXISTS pglease_leases;")
Step 3: Initialize pglease in Your Application
Django Example:
# myapp/coordination.py
import os
from pglease import PGLease
from django.conf import settings
# Create a singleton instance
_pglease = None
def get_coordinator():
global _pglease
if _pglease is None:
db_url = settings.DATABASES['default']
connection_string = (
f"postgresql://{db_url['USER']}:{db_url['PASSWORD']}"
f"@{db_url['HOST']}:{db_url['PORT']}/{db_url['NAME']}"
)
_pglease = PGLease(
connection_string,
owner_id=os.environ.get('HOSTNAME', 'django-worker'),
)
return _pglease
# Use in your management commands
from django.core.management.base import BaseCommand
class Command(BaseCommand):
def handle(self, *args, **options):
coordinator = get_coordinator()
with coordinator.acquire("daily-report", ttl=300) as acquired:
if acquired:
self.generate_report()
else:
self.stdout.write("Another worker is generating the report")
Flask Example:
# app/__init__.py
from flask import Flask
from pglease import PGLease
import os
app = Flask(__name__)
# Initialize coordinator
pglease = PGLease(
os.environ.get('DATABASE_URL'),
owner_id=os.environ.get('DYNO', 'flask-worker'), # Heroku
)
# Use in scheduled tasks (e.g., with Flask-APScheduler)
@app.route('/tasks/cleanup')
def cleanup_task():
with pglease.acquire("cleanup", ttl=300) as acquired:
if acquired:
# Do cleanup work
return {"status": "completed"}
return {"status": "skipped", "reason": "another worker is running"}
# Cleanup on shutdown
@app.teardown_appcontext
def shutdown_coordinator(exception=None):
pglease.close()
FastAPI Example:
# app/main.py
from fastapi import FastAPI
from pglease import AsyncPGLease # Use async version
import os
app = FastAPI()
pglease = None
@app.on_event("startup")
async def startup():
global pglease
pglease = AsyncPGLease(os.environ['DATABASE_URL'])
@app.on_event("shutdown")
async def shutdown():
await pglease.close()
@app.post("/tasks/sync")
async def sync_task():
async with pglease.acquire("data-sync", ttl=600) as acquired:
if acquired:
await perform_sync()
return {"status": "completed"}
return {"status": "skipped"}
Celery Example:
# tasks.py
from celery import Celery
from pglease import PGLease
import os
app = Celery('tasks')
pglease = PGLease(os.environ['DATABASE_URL'])
@app.task
def process_batch():
"""Ensure only one worker processes batches at a time"""
with pglease.acquire("batch-processing", ttl=1800) as acquired:
if acquired:
# Process the batch
do_batch_work()
else:
# Another worker is processing, safe to skip
pass
Step 4: Configure Environment Variables
# .env file
DATABASE_URL=postgresql://user:pass@localhost:5432/mydb
# Kubernetes - use pod name as owner_id
HOSTNAME=my-app-pod-xyz-123
# Heroku - dyno name
DYNO=web.1
# Docker Compose
HOSTNAME=worker-1
Step 5: Use in Your Application
For Database Migrations:
# Ensure migration runs only once across all pods
with pglease.acquire("db-migration-v2", ttl=600, raise_on_failure=True):
apply_migration()
For Scheduled Tasks:
# Cron job that should run on one worker only
@pglease.singleton_task("hourly-sync", ttl=3600)
def hourly_data_sync():
sync_external_api()
# Call from all workers - only one executes
hourly_data_sync()
For Background Jobs:
# Run at app startup, but only on one instance
if pglease.try_acquire("cache-warmup", ttl=300):
try:
warm_cache()
finally:
pglease.release("cache-warmup")
Step 6: Testing Strategy
Local Development:
# Use a separate database or table for testing
pglease = PGLease(
os.environ.get('TEST_DATABASE_URL', 'postgresql://localhost/test'),
table_name='pglease_test_leases' # Separate table
)
Unit Tests:
import pytest
from pglease import PGLease
@pytest.fixture
def coordinator(postgresql):
"""Provide a test coordinator with clean database"""
pglease = PGLease(f"postgresql://localhost/{postgresql.info.dbname}")
yield pglease
pglease.cleanup_expired() # Clean up
pglease.close()
def test_singleton_task(coordinator):
# First acquisition should succeed
assert coordinator.try_acquire("test-task", ttl=60) is True
# Second acquisition should fail (lease held)
assert coordinator.try_acquire("test-task", ttl=60) is False
# After release, should succeed again
coordinator.release("test-task")
assert coordinator.try_acquire("test-task", ttl=60) is True
Common Integration Patterns
Pattern 1: Singleton Service Initialization
# Run expensive initialization only once
if pglease.try_acquire("ml-model-load", ttl=120):
try:
load_ml_model() # Only one worker loads this
finally:
pglease.release("ml-model-load")
Pattern 2: Leader Election
# One worker becomes the leader
def run_as_leader():
if pglease.try_acquire("leader", ttl=30):
try:
while True:
do_leader_work()
time.sleep(10)
# Heartbeat keeps lease alive
finally:
pglease.release("leader")
Pattern 3: Preventing Duplicate Webhooks
@app.post("/webhook/payment")
def payment_webhook(payload):
webhook_id = payload['id']
# Prevent duplicate processing if webhook is retried
with pglease.acquire(f"webhook-{webhook_id}", ttl=300) as acquired:
if acquired:
process_payment(payload)
return {"status": "processed"}
return {"status": "duplicate", "message": "Already processing"}
Migrating from Other Solutions
From Redis locks:
# Before (with redis-py)
import redis
r = redis.Redis()
with r.lock("my-task", timeout=60):
do_work()
# After (with pglease)
from pglease import PGLease
pglease = PGLease(db_url)
with pglease.acquire("my-task", ttl=60, raise_on_failure=True):
do_work()
From Celery Beat:
# Before: Single Celery Beat instance (SPOF)
# After: All workers can try, one executes
@pglease.singleton_task("daily-task", ttl=3600)
def daily_task():
generate_report()
Usage
from pglease import PGLease
pglease = PGLease("postgresql://user:pass@localhost/db")
# Context manager (recommended)
with pglease.acquire("my-task", ttl=60) as acquired:
if acquired:
run_migration()
# Raise on failure instead of silently skipping
with pglease.acquire("my-task", ttl=60, raise_on_failure=True):
run_migration()
# Decorator
@pglease.singleton_task("cleanup", ttl=300)
def cleanup_old_data():
# Only one worker executes this
pass
# Explicit control
if pglease.try_acquire("sync-job", ttl=120):
try:
sync_data()
finally:
pglease.release("sync-job")
How It Works
Uses PostgreSQL for coordination. Each task gets a row in a lease table with an expiry time. Workers try to acquire the lease atomically using SELECT FOR UPDATE. Background threads send heartbeats to keep leases alive during long-running tasks.
If a worker crashes, its lease expires (TTL) and another worker can take over.
Backends
Standard Backend (PostgresBackend)
Uses a lease table for coordination. Gives you observability and rich metadata.
CREATE TABLE pglease_leases (
task_name VARCHAR(255) PRIMARY KEY,
owner_id VARCHAR(255) NOT NULL,
acquired_at TIMESTAMPTZ NOT NULL,
expires_at TIMESTAMPTZ NOT NULL,
heartbeat_at TIMESTAMPTZ NOT NULL
);
You can query this table to see who's holding what:
SELECT task_name, owner_id, expires_at - NOW() AS remaining
FROM pglease_leases WHERE expires_at > NOW();
Trade-off: If a worker crashes, others wait for TTL to expire (10–30 s typically). This is fine for most use cases like migrations or hourly jobs.
Hybrid Backend (HybridPostgresBackend)
Combines PostgreSQL advisory locks with the lease table. Advisory locks give you instant failover (<1 s) when a worker crashes, while the lease table gives you observability.
from pglease import PGLease, HybridPostgresBackend
backend = HybridPostgresBackend("postgresql://user:pass@localhost/db")
pglease = PGLease(backend)
Use this when you need fast recovery and can't wait for TTL expiry.
Configuration
pglease = PGLease(
"postgresql://user:pass@localhost/db",
owner_id="pod-xyz-123", # Auto-generated if not provided
heartbeat_interval=10, # Heartbeat every 10 seconds
on_lease_lost=handle_lost, # Called when a heartbeat fails (see below)
)
Both PostgresBackend and HybridPostgresBackend accept additional options:
from pglease.backends.postgres import PostgresBackend
backend = PostgresBackend(
"postgresql://user:pass@localhost/db",
connect_timeout=10, # Seconds before a connection attempt times out
pool_size=4, # Max simultaneous DB connections (default: 1)
)
pglease = PGLease(backend)
When pool_size > 1 a psycopg2.pool.ThreadedConnectionPool is used so multiple threads can perform DB operations concurrently instead of serialising through a single connection.
Using an External Connection Pool or Dependency Injection
If your application already manages a database connection pool (Django ORM, SQLAlchemy, psycopg2 ThreadedConnectionPool, or any DI container), you can hand pglease a connection factory instead of a connection string. pglease will borrow a connection for each operation and hand it back immediately afterward — it will never call connection.close() and the pool lifecycle is entirely yours.
A connection factory is a zero-argument callable that returns a context manager yielding a psycopg2-compatible connection. pglease calls commit() on success and rollback() on failure inside the context; your factory's __exit__ is responsible only for returning the connection to the pool.
psycopg2 ThreadedConnectionPool
from contextlib import contextmanager
from psycopg2 import pool
from psycopg2.extras import RealDictCursor
from pglease import PGLease
from pglease.backends.postgres import PostgresBackend
pg_pool = pool.ThreadedConnectionPool(
minconn=2, maxconn=10,
dsn="postgresql://user:pass@host/db",
cursor_factory=RealDictCursor,
)
@contextmanager
def borrow():
conn = pg_pool.getconn()
try:
yield conn
finally:
pg_pool.putconn(conn)
backend = PostgresBackend.from_connection_factory(borrow)
pglease = PGLease(backend)
SQLAlchemy engine
from contextlib import contextmanager
from sqlalchemy import create_engine
from pglease import PGLease
from pglease.backends.postgres import PostgresBackend
engine = create_engine("postgresql+psycopg2://user:pass@host/db")
@contextmanager
def sa_conn():
conn = engine.raw_connection()
try:
yield conn
finally:
conn.close() # returns the connection to SQLAlchemy's pool
backend = PostgresBackend.from_connection_factory(sa_conn)
pglease = PGLease(backend)
Django (reuse the ORM connection)
from contextlib import contextmanager
from django.db import connection as django_conn
from pglease import PGLease
from pglease.backends.postgres import PostgresBackend
@contextmanager
def django_factory():
django_conn.ensure_connection()
yield django_conn.connection
# pglease commits/rolls back; Django manages the connection lifetime.
backend = PostgresBackend.from_connection_factory(django_factory)
pglease = PGLease(backend)
FastAPI / dependency injection
from contextlib import contextmanager
import psycopg2
from pglease import PGLease
from pglease.backends.postgres import PostgresBackend
# Shared connection pool, created once at application startup
_pool = psycopg2.pool.ThreadedConnectionPool(2, 10, dsn=DATABASE_URL)
@contextmanager
def get_db_conn():
conn = _pool.getconn()
try:
yield conn
finally:
_pool.putconn(conn)
# Single PGLease instance reused across all requests
pglease = PGLease(PostgresBackend.from_connection_factory(get_db_conn))
# ─── FastAPI endpoint ─────────────────────────────────────────────────────────
@app.post("/run-job")
async def run_job():
with pglease.acquire("my-job", ttl=60) as acquired:
if acquired:
do_work()
return {"ok": True}
Tip: When using a connection factory,
close()on the backend is a no-op — pglease will not close or invalidate any connection from the pool. You are responsible for callingpg_pool.closeall()(or the equivalent) at application shutdown.
In Kubernetes, use the pod name as owner_id:
import os
pglease = PGLease(
os.environ["DATABASE_URL"],
owner_id=os.environ.get("HOSTNAME"),
)
Waiting for a Lease
wait_for_lease() blocks until the lease becomes available or a timeout fires:
# Wait up to 2 minutes, polling every 5 seconds
pglease.wait_for_lease("nightly-report", ttl=300, timeout=120, poll_interval=5)
generate_report()
# Wait indefinitely
pglease.wait_for_lease("nightly-report", ttl=300, timeout=None)
Raises AcquisitionError on timeout. Passing timeout=0 raises ValueError — use try_acquire() for a single non-blocking attempt instead.
Asyncio Support
AsyncPGLease is a drop-in async wrapper that dispatches all blocking DB calls to a thread-pool executor so they never block the event loop:
import asyncio
from pglease import AsyncPGLease
async def main():
async with AsyncPGLease("postgresql://user:pass@localhost/db") as pglease:
async with pglease.acquire("my-task", ttl=60) as acquired:
if acquired:
await do_async_work()
# Or wait for the lease asynchronously
await pglease.wait_for_lease("my-task", ttl=60, timeout=120)
await do_async_work()
asyncio.run(main())
The underlying heartbeat threads continue to run in the background as normal.
Observability
List all leases
for lease in pglease.list_leases():
remaining = lease.time_remaining()
print(f"{lease.task_name}: {lease.owner_id} — {remaining:.0f}s left")
Detect zombie leases
A zombie lease has not expired yet but its heartbeat has gone silent, indicating a dead worker:
lease = pglease.get_lease("my-task")
if lease and lease.is_zombie(heartbeat_timeout=30):
# Worker is gone — safe to take over
pglease.try_acquire("my-task", ttl=60)
Clean up expired rows
Expired rows accumulate when workers crash without calling release(). Call this periodically to keep the table tidy:
deleted = pglease.cleanup_expired()
print(f"Removed {deleted} stale lease(s)")
React to heartbeat failure
Supply on_lease_lost to be notified (on the heartbeat thread) when a background heartbeat fails:
def handle_lost(task_name: str) -> None:
logging.critical("Lost lease %s — aborting!", task_name)
os.abort()
pglease = PGLease(url, on_lease_lost=handle_lost)
Common Use Cases
| Use Case | Example | TTL Recommendation |
|---|---|---|
| Database Migrations | Apply schema changes once | 600s (10 min) |
| Scheduled Jobs | Hourly data sync, cleanup tasks | 3600s (1 hour) |
| Report Generation | Daily/weekly reports | 1800s (30 min) |
| Leader Election | Master selection in cluster | 30s (fast failover) |
| Batch Processing | Process queues without duplication | 300s (5 min) |
| Cache Warming | Rebuild cache on one instance | 120s (2 min) |
Features
- Multiple APIs: Context managers, decorators, or explicit control
- Automatic Heartbeats: Keep leases alive during long-running tasks
- Graceful Failover: When a worker crashes, lease expires (TTL) and another takes over
- Observability: Query lease table to see who owns what
- Zombie Detection: Identify dead workers still holding leases
- Async Support: Full asyncio integration
- Two Backends: Standard (lease table) or Hybrid (+ advisory locks for <1s failover)
- Thread-Safe: All operations properly synchronized
- Retry Logic: Resilient to transient database errors
Production Considerations
Error Handling
from pglease import PGLease, AcquisitionError
pglease = PGLease(url)
# Option 1: Silent skip if can't acquire
with pglease.acquire("task", ttl=60) as acquired:
if acquired:
do_work()
else:
logger.info("Another worker is handling this")
# Option 2: Raise exception if can't acquire (fail-fast)
try:
with pglease.acquire("critical-task", ttl=60, raise_on_failure=True):
do_critical_work()
except AcquisitionError:
logger.error("Could not acquire lease - another worker has it")
sys.exit(1)
Monitoring
# Detect zombie threads (should be zero in healthy system)
zombies = pglease.heartbeat_manager.get_zombie_threads()
if zombies:
alert(f"Zombie threads detected: {zombies}")
# List active leases
for lease in pglease.list_leases():
print(f"{lease.task_name}: {lease.owner_id} ({lease.time_remaining():.0f}s left)")
# Clean up expired leases periodically
deleted = pglease.cleanup_expired()
logger.info(f"Cleaned up {deleted} expired leases")
Kubernetes Example
import os
from pglease import PGLease
# Use pod name as owner_id for traceability
pglease = PGLease(
os.environ["DATABASE_URL"],
owner_id=os.environ.get("HOSTNAME"), # e.g., "web-pod-xyz-123"
heartbeat_interval=10,
on_lease_lost=lambda task: logger.critical(f"Lost lease: {task}")
)
@pglease.singleton_task("db-migration", ttl=600)
def run_migration():
"""Only runs on one pod even if 10 replicas are deployed"""
apply_schema_changes()
run_migration() # Safe to call from all pods
Development & Testing
# Setup development environment
python -m venv venv
source venv/bin/activate
pip install -e ".[dev]"
# Start test database
docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=test postgres:16
# Run tests
export TEST_POSTGRES_URL="postgresql://postgres:test@localhost/postgres"
pytest # Run all tests
pytest --cov=src # With coverage report
pytest -v # Verbose output
# Code quality
ruff check src/ tests/ # Lint
ruff format src/ tests/ # Format
mypy src/ # Type check
# Build distribution
python -m build
# Install locally for testing
pip install -e .
Running the Demo
# See examples/ directory for complete working demos
cd examples/
python simple_demo.py
python kubernetes_demo.py
Contributing
Contributions welcome! Please:
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Add tests for new functionality
- Ensure all tests pass (
pytest) - Ensure code quality (
ruff check,mypy src/) - Submit a Pull Request
See CONTRIBUTING.md for detailed guidelines.
License
MIT
Comparison to Alternatives
| Feature | pglease | Redis Lock | ZooKeeper | Celery Beat |
|---|---|---|---|---|
| Infrastructure | PostgreSQL only | Requires Redis | Requires ZK cluster | Requires broker + beat |
| Failover Time | TTL-based (10-30s) or <1s (hybrid) | Manual/TTL | Automatic | Single point of failure |
| Observability | SQL queries | Limited | Complex API | Web UI |
| Setup Complexity | Low | Medium | High | High |
| Python Async | ✅ Full support | ✅ | ❌ | ✅ |
| Thread-Safe | ✅ | ✅ | ✅ | ⚠️ Beat only |
| Production Ready | ✅ | ✅ | ✅ | ⚠️ Single beat instance |
When to Use pglease
✅ You already use PostgreSQL
✅ You need simple distributed coordination
✅ You run multiple replicas/pods/workers
✅ You want observability via SQL
✅ You need automatic failover
✅ You don't want to manage additional infrastructure
When NOT to Use pglease
❌ You need sub-100ms lock acquisition
❌ You're coordinating 1000+ workers on a single task
❌ You don't have PostgreSQL
❌ You need distributed transactions (use PostgreSQL 2PC instead)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pglease-0.0.3.tar.gz.
File metadata
- Download URL: pglease-0.0.3.tar.gz
- Upload date:
- Size: 76.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
fa7bd3e2684a6d3f22ac17df44276370f2917ee2edfd1c2f2b38c2cfdf6be33a
|
|
| MD5 |
6f3611a5bdc514b9964f6eb416f32cc5
|
|
| BLAKE2b-256 |
6959e609203f676fcc121923f93de1bf0f3b671fc2140a2ae853f44331539c3e
|
Provenance
The following attestation bundles were made for pglease-0.0.3.tar.gz:
Publisher:
publish-pypi.yml on rguiu/pglease
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pglease-0.0.3.tar.gz -
Subject digest:
fa7bd3e2684a6d3f22ac17df44276370f2917ee2edfd1c2f2b38c2cfdf6be33a - Sigstore transparency entry: 1235549772
- Sigstore integration time:
-
Permalink:
rguiu/pglease@9e7c765503f6d63530a66d455fa918ee587a45a8 -
Branch / Tag:
refs/tags/v0.0.3 - Owner: https://github.com/rguiu
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish-pypi.yml@9e7c765503f6d63530a66d455fa918ee587a45a8 -
Trigger Event:
workflow_dispatch
-
Statement type:
File details
Details for the file pglease-0.0.3-py3-none-any.whl.
File metadata
- Download URL: pglease-0.0.3-py3-none-any.whl
- Upload date:
- Size: 33.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d9459e91b95771cfb63841bc9a30d23bf8fc059afca1ce3b6b42cd76d738ca73
|
|
| MD5 |
7e540fa192207c65967053e60cf3d17f
|
|
| BLAKE2b-256 |
58d16af34149bb0791b208390317cf701dafe5f15dfca7e627fee96ffaf26777
|
Provenance
The following attestation bundles were made for pglease-0.0.3-py3-none-any.whl:
Publisher:
publish-pypi.yml on rguiu/pglease
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pglease-0.0.3-py3-none-any.whl -
Subject digest:
d9459e91b95771cfb63841bc9a30d23bf8fc059afca1ce3b6b42cd76d738ca73 - Sigstore transparency entry: 1235550177
- Sigstore integration time:
-
Permalink:
rguiu/pglease@9e7c765503f6d63530a66d455fa918ee587a45a8 -
Branch / Tag:
refs/tags/v0.0.3 - Owner: https://github.com/rguiu
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish-pypi.yml@9e7c765503f6d63530a66d455fa918ee587a45a8 -
Trigger Event:
workflow_dispatch
-
Statement type: