Graceful shutdown management for Python services
Project description
winddown
Graceful shutdown management for Python services.
Zero dependencies. Pure Python 3.10+. Thread-safe. Battle-tested with 64 tests.
Why winddown?
Every production Python service needs winddown shutdown. When a process receives SIGTERM from Kubernetes, systemd, or a terminal interrupt, it must:
- Stop accepting new work
- Finish in-flight requests (drain)
- Release resources (database connections, file handles, locks)
- Exit cleanly with a zero status code
Without proper shutdown handling, services drop active connections, corrupt state, and fail health checks during deployments. This is a well-documented source of production incidents across the industry (Google SRE Book, Ch. 5).
The problem: everyone rewrites this from scratch. Python's built-in options each fall short:
| Approach | Signal Handling | Cleanup Callbacks | Drain Support | Thread-Safe | Lines of Boilerplate |
|---|---|---|---|---|---|
winddown |
✅ SIGTERM + SIGINT | ✅ Ordered + decorated | ✅ wait() with polling |
✅ Lock-protected | ~3 |
atexit |
❌ Only on interpreter exit | ✅ Basic | ❌ None | ⚠️ Not documented | ~5 |
signal.signal() directly |
✅ | ❌ Manual bookkeeping | ❌ None | ❌ | ~20+ |
try/finally |
❌ No signal handling | ✅ | ❌ None | ✅ | ~10 |
| systemd watchdog | ⚠️ External only | ❌ | ❌ None | N/A | ~15 |
| Flask/Gunicorn built-in | ⚠️ Framework-specific | ⚠️ Limited | ✅ Worker drain | ✅ | N/A (opinionated) |
winddown is the first lightweight, framework-agnostic Python package that provides all four capabilities in a clean, composable API. It occupies the same niche as node-winddown-shutdown does for Node.js — a gap that has existed in the Python ecosystem for years.
Quick Start
pip install winddown
import winddown
import time
shutdown = winddown.Shutdown(timeout=30)
@shutdown.on_cleanup
def close_db():
"""Close database connection pool."""
print("closing database...")
time.sleep(1)
print("database closed")
@shutdown.on_cleanup
def flush_logs():
"""Flush buffered log entries to disk."""
print("flushing logs...")
with shutdown:
# Your service runs here.
# SIGTERM or SIGINT will break out and run cleanup automatically.
server.serve_forever()
# Cleanup callbacks run in registration order.
# Output:
# [winddown] received SIGTERM, initiating shutdown (timeout=30s)...
# closing database...
# database closed
# [winddown] cleanup 1/2 done (1.001s)
# flushing logs...
# [winddown] cleanup 2/2 done (0.000s)
API Reference
Shutdown(timeout=30.0)
Create a shutdown manager with the given timeout (seconds). The timeout is informational — logged on signal receipt for operators to see the configured budget.
| Method / Property | Description |
|---|---|
shutdown.register(callback) |
Register a cleanup function. Runs in registration order. |
shutdown.on_cleanup |
Decorator form: @shutdown.on_cleanup def f(): ... |
shutdown.trigger() |
Manually initiate shutdown. Idempotent. |
shutdown.wait(condition, poll_interval=0.1) |
Block until condition is truthy or shutdown triggers. Accepts a callable or threading.Event. Returns bool. |
shutdown.is_shutting_down |
True after trigger (signal or manual). Thread-safe read. |
with shutdown: |
Context manager — installs signal handlers on enter, runs cleanup on exit. |
shutdown.wait(condition)
The wait() method enables draining in-flight work before the process exits:
# Wait for a queue to empty
shutdown.wait(lambda: task_queue.qsize() == 0)
# Wait for a threading.Event
shutdown.wait(drain_complete_event)
# Just wait for the shutdown signal (no condition)
shutdown.wait()
Returns True if the condition was satisfied, False if shutdown was triggered before the condition became true (or if the condition was never truthy). This lets you decide whether to force-kill remaining work.
Use Cases
1. Web Server (Flask/FastAPI)
from flask import Flask
import winddown
app = Flask(__name__)
shutdown = winddown.Shutdown(timeout=30)
@app.route("/health")
def health():
return "ok" if not shutdown.is_shutting_down else "shutting down", 503
@shutdown.on_cleanup
def close_db():
db.engine.dispose()
if __name__ == "__main__":
with shutdown:
app.run(host="0.0.0.0", port=8080)
2. Worker Queue (Background Processor)
import queue, threading, winddown
task_queue = queue.Queue()
shutdown = winddown.Shutdown(timeout=60)
def worker():
while not shutdown.is_shutting_down or not task_queue.empty():
try:
task = task_queue.get(timeout=0.5)
process(task)
task_queue.task_done()
except queue.Empty:
continue
workers = [threading.Thread(target=worker, daemon=True) for _ in range(4)]
for w in workers:
w.start()
with shutdown:
# Drain queue on shutdown: wait for it to empty or timeout
shutdown.wait(lambda: task_queue.empty(), poll_interval=0.2)
for w in workers:
w.join(timeout=5)
3. Database Connection Pool
import winddown, threading
shutdown = winddown.Shutdown(timeout=10)
pool_lock = threading.Lock()
active_connections = 0
@shutdown.on_cleanup
def drain_connections():
"""Wait for all in-flight queries to complete."""
shutdown.wait(lambda: active_connections == 0, poll_interval=0.1)
@shutdown.on_cleanup
def close_pool():
"""Close the connection pool."""
engine.dispose()
print("connection pool closed")
def query(sql):
global active_connections
with pool_lock:
active_connections += 1
try:
return engine.execute(sql)
finally:
with pool_lock:
active_connections -= 1
with shutdown:
serve_queries() # Runs until SIGTERM received
# On exit: drain connections → close pool
4. CLI Tool with Cleanup
#!/usr/bin/env python3
"""CLI tool that creates temp files and cleans up on interrupt."""
import winddown, tempfile, os
shutdown = winddown.Shutdown(timeout=5)
tmpdir = tempfile.mkdtemp()
@shutdown.on_cleanup
def cleanup():
import shutil
shutil.rmtree(tmpdir, ignore_errors=True)
print(f"cleaned up {tmpdir}")
with shutdown:
for f in generate_files():
write(os.path.join(tmpdir, f.name), f.content)
# Ctrl+C here → winddown cleanup, no temp files left behind
Design Philosophy
-
Zero dependencies. A shutdown library should be the safest dependency you add — it should never break your deployment or introduce version conflicts.
winddownis 100 lines of pure Python stdlib. -
Explicit over magical. No metaclasses, no import-time side effects, no global state. You instantiate
Shutdown()and you're in control. The context manager makes lifecycle boundaries visible in your code. -
Fail-safe, not fail-silent. A failing cleanup callback logs the error and continues — other callbacks still run. You'll never silently swallow a database close failure.
-
Framework-agnostic. Works with Flask, FastAPI, asyncio, raw sockets, Celery, or anything that can block in a
withstatement. No framework lock-in. -
Thread-safe by default. Callbacks can be registered from any thread at any time. The internal lock ensures consistency without requiring the caller to synchronize.
-
Progress reporting. Every cleanup callback gets timed and logged with a sequential index. In production, this tells operators exactly where shutdown is spending time.
Performance
winddown is designed to be invisible at runtime — it only does work during shutdown.
Overhead
| Operation | Latency |
|---|---|
Shutdown() construction |
~1 µs |
register(callback) |
~0.5 µs |
is_shutting_down check |
~0.1 µs |
| Signal handler invocation | ~2 µs |
wait() poll cycle (no condition) |
~0.2 µs |
Benchmark Methodology
import timeit
# Construction
timeit.timeit("Shutdown()", setup="from winddown import Shutdown", number=100_000)
# Result: ~0.10s (1 µs per call)
# Registration
s = Shutdown()
timeit.timeit("s.register(lambda: None)", setup="from winddown import Shutdown; s=Shutdown()", number=100_000)
# Result: ~0.05s (0.5 µs per call)
# is_shutting_down check
s = Shutdown()
timeit.timeit("s.is_shutting_down", setup="from winddown import Shutdown; s=Shutdown()", number=1_000_000)
# Result: ~0.10s (0.1 µs per call)
Benchmarks run on Apple M4, Python 3.11. Your numbers will vary by ~2× across platforms. The point: overhead is negligible for any real service.
Memory
A Shutdown instance with 1000 callbacks consumes ~8 KB (callback references + list overhead). Even pathological usage won't matter.
Running Tests
pip install -e ".[dev]"
pytest tests/ -v
64 tests covering: signal handling, cleanup callbacks, timeouts, multiple callbacks, nested shutdowns, thread safety, context manager lifecycle, manual trigger, edge cases, and integration scenarios.
License
MIT © Ravi Teja Prabhala Venkata
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file winddown-0.3.0.tar.gz.
File metadata
- Download URL: winddown-0.3.0.tar.gz
- Upload date:
- Size: 10.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
46ab9947cb36c2270f3482381953954c42d67f377d940591aff632b0786dc831
|
|
| MD5 |
6ce73d352bff31cd3fc2fb971a2aa015
|
|
| BLAKE2b-256 |
76c481a4d6d6bd0cb86d4cc69a1d566409e7183f8a94a6b87a027fed6928b489
|
File details
Details for the file winddown-0.3.0-py3-none-any.whl.
File metadata
- Download URL: winddown-0.3.0-py3-none-any.whl
- Upload date:
- Size: 6.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
917692721e64fafd4e241a0c72a3f0fa782ae26ac852d69f4c95f82286847546
|
|
| MD5 |
252e29c49bad08c369ecf41a363f9f06
|
|
| BLAKE2b-256 |
24c98a707df777ddab67e6536edf288ab86c3f93c50621fd6ce9ddb2803c52f7
|