General-purpose async Redis Streams consumer group library with DLQ, crash recovery, and monitoring

redis-stream-queue

Async Python library for Redis Streams consumer groups with built-in crash recovery, DLQ, and monitoring.

Features

  • Producer: push messages from any number of pods — XADD is atomic, no coordination needed
  • Consumer: callback-based read → ACK loop; partial ACK supported
  • Crash recovery: XAUTOCLAIM cursor loop sweeps full PEL per iteration; NOGROUP auto-recovery if stream deleted externally
  • Dead-letter queue: decode errors and poison pills (exceeding max_deliveries) routed to DLQ handler
  • Consumer metrics: consumer.metrics() reports tps_in / tps_out / tps_total (60s sliding window), avg TPS, read/acked/DLQ/error counters
  • Producer metrics: producer.metrics() — push TPS, total pushed, avg TPS, uptime
  • Process-wide aggregation: StreamConsumer.all_metrics() / StreamProducer.all_metrics() — collect from all live instances in this process via weakref registry; zero Redis overhead
  • Stream monitoring: lag, PEL size, per-consumer idle time, health checks via ConsumerGroup
  • Pluggable serializers: JSON (default), msgpack, pickle — or bring your own
  • Redis Cluster: from_cluster() and from_url() factory methods
  • Multi-pod safe: unique worker names auto-generated per pod ({group}_{hostname}_{rand4})

Requirements

  • Python ≥ 3.11
  • Redis ≥ 6.2 (XAUTOCLAIM support)

Installation

pip install redis-stream-queue

With msgpack support:

pip install redis-stream-queue[msgpack]

Quick Start

1. Start Redis

docker run -p 6379:6379 redis

2. Producer

import asyncio
from redis_stream_queue import StreamClient, StreamProducer

async def main():
    client = StreamClient(host="localhost")
    producer = StreamProducer(client=client, stream="orders", group="order_workers")

    await producer.ensure_group()           # idempotent — safe to call on every startup
    msg_id = await producer.push({"order_id": 1})
    print(f"pushed: {msg_id}")
    await client.close()

asyncio.run(main())

3. Consumer

import asyncio
from redis_stream_queue import StreamClient, StreamConsumer, ConsumerConfig

async def handle(messages):
    for msg in messages:
        print(f"processing: {msg.data}")
    return [m.id for m in messages]     # IDs to ACK; any ID left out of the list stays in the PEL

async def on_dlq(msg, reason):
    print(f"DLQ [{reason}]: {msg.data}")

async def main():
    client = StreamClient(host="localhost")
    config = ConsumerConfig(
        group="order_workers",
        dlq_stream="orders_dlq",
        batch_size=100,
        block_ms=5_000,
        max_deliveries=3,
    )
    consumer = StreamConsumer(
        client=client,
        stream="orders",
        config=config,
        handler=handle,
        dlq_handler=on_dlq,
    )
    await consumer.run()    # infinite loop; Ctrl-C / CancelledError to stop

asyncio.run(main())

Handler contract: return a list of IDs to ACK. Return [] to ACK nothing (messages stay in PEL for retry). Never return None — that triggers a warning and no ACK.
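The contract above allows a handler to ACK only part of a batch. The following is a minimal sketch of that pattern, not the library's own code: `Msg` is a stand-in for the library's message object (only `.id` and `.data` are assumed, as in the Quick Start), and `process_order` is a hypothetical piece of business logic.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Msg:
    """Stand-in for the library's message object (only .id and .data are used)."""
    id: str
    data: dict


async def process_order(data: dict) -> None:
    """Hypothetical business logic; fails when the payload has no order_id."""
    if "order_id" not in data:
        raise ValueError("missing order_id")


async def handle(messages):
    acked = []
    for msg in messages:
        try:
            await process_order(msg.data)
        except ValueError:
            # Leave this ID out: the message stays in the PEL for retry.
            continue
        acked.append(msg.id)
    return acked  # always a list, never None


batch = [Msg("1-0", {"order_id": 1}), Msg("2-0", {}), Msg("3-0", {"order_id": 3})]
result = asyncio.run(handle(batch))
print(result)  # ['1-0', '3-0']
```

Message "2-0" is not ACKed, so it would be retried and eventually routed to the DLQ once it exceeds max_deliveries.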

4. Consumer Throughput Metrics

consumer.metrics() is non-blocking and makes no Redis calls — safe to poll from any monitoring loop or health endpoint.

import asyncio
from redis_stream_queue import StreamClient, StreamConsumer, ConsumerConfig

async def monitor(consumer):
    while True:
        m = consumer.metrics()
        print(
            f"in={m.tps_in:.1f} msg/s  out={m.tps_out:.1f} msg/s  total={m.tps_total:.1f} msg/s  "
            f"avg={m.avg_tps:.1f} msg/s  "
            f"read={m.total_read}  acked={m.total_acked}  "
            f"dlq={m.total_dlq}  errors={m.total_errors}  "
            f"uptime={m.uptime_secs:.0f}s"
        )
        await asyncio.sleep(5)

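async def main():
    client = StreamClient(host="localhost")
    config = ConsumerConfig(group="order_workers")
    # `handle` is the batch handler defined in the Consumer example above
    consumer = StreamConsumer(client=client, stream="orders", config=config, handler=handle)

    # run the consumer loop and the metrics monitor concurrently
    await asyncio.gather(consumer.run(), monitor(consumer))

asyncio.run(main())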

| Field | Type | Description |
|---|---|---|
| tps_in | float | Reads/sec (XREADGROUP + XAUTOCLAIM), sliding 60s window |
| tps_out | float | Acked/sec, sliding 60s window |
| tps_total | float | tps_in + tps_out |
| avg_tps | float | total_acked / uptime_secs since first message |
| total_read | int | Messages pulled from stream (new + reclaimed via XAUTOCLAIM) |
| total_acked | int | Successfully processed and ACKed by handler |
| total_dlq | int | Routed to DLQ (decode_error + max_deliveries combined) |
| total_errors | int | Handler exceptions; message stays in PEL for retry |
| uptime_secs | float | Seconds since first message was processed |
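To make the "sliding 60s window" rates concrete, here is a minimal sketch of one way such a tracker could work. It is an illustration only, not the library's internal tracker: the class name `SlidingTps`, the injectable `clock`, and the choice to divide by the full window length are all assumptions.

```python
import time
from collections import deque


class SlidingTps:
    """Counts events over the last `window` seconds (illustrative only)."""

    def __init__(self, window: float = 60.0, clock=time.monotonic):
        self.window = window
        self.clock = clock
        self.events = deque()  # (timestamp, count) pairs, oldest first

    def record(self, n: int) -> None:
        self.events.append((self.clock(), n))

    def rate(self) -> float:
        cutoff = self.clock() - self.window
        # Drop batches that have slid out of the window.
        while self.events and self.events[0][0] <= cutoff:
            self.events.popleft()
        return sum(n for _, n in self.events) / self.window


now = [0.0]                       # fake clock so the example is deterministic
tps = SlidingTps(window=60.0, clock=lambda: now[0])
tps.record(120)                   # batch at t=0s
now[0] = 30.0
tps.record(60)                    # batch at t=30s
print(tps.rate())                 # 3.0  (180 events / 60 s)
now[0] = 61.0
print(tps.rate())                 # 1.0  (first batch expired)
```

Because a tracker like this only touches in-process state, reading it needs no Redis call, which matches the non-blocking behavior described for consumer.metrics().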

5. Producer Throughput Metrics

m = producer.metrics()
print(f"push={m.tps:.1f} msg/s  avg={m.avg_tps:.1f} msg/s  total={m.total_pushed}  uptime={m.uptime_secs:.0f}s")

| Field | Type | Description |
|---|---|---|
| total_pushed | int | Messages pushed since instance creation |
| tps | float | Pushed/sec, sliding 60s window |
| avg_tps | float | total_pushed / uptime_secs since first push |
| uptime_secs | float | Seconds since first push |

6. Process-Wide Metrics (Multiple Instances)

Each StreamConsumer and StreamProducer auto-registers in a process-level weakref registry on creation. Dead instances are evicted automatically by GC — no manual cleanup needed.

# Multiple consumers in same asyncio event loop
c1 = StreamConsumer(client=client, stream="orders", config=cfg_a, handler=handle_a)
c2 = StreamConsumer(client=client, stream="payments", config=cfg_b, handler=handle_b)
c3 = StreamConsumer(client=client, stream="events", config=cfg_c, handler=handle_c)

await asyncio.gather(c1.run(), c2.run(), c3.run())

# From a monitoring task running concurrently:
for m in StreamConsumer.all_metrics():
    print(f"in={m.tps_in:.1f}  out={m.tps_out:.1f}  acked={m.total_acked}")

# Multiple producers
p1 = StreamProducer(client=client, stream="orders")
p2 = StreamProducer(client=client, stream="payments")

for m in StreamProducer.all_metrics():
    print(f"tps={m.tps:.1f}  pushed={m.total_pushed}")
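The weakref registry idea can be sketched with the standard library alone. This is a simplified illustration under assumptions, not the library's implementation: the `Worker` class and its fields are hypothetical stand-ins for a consumer or producer.

```python
import gc
import weakref


class Worker:
    """Illustrative stand-in for a consumer or producer instance."""

    _instances = weakref.WeakSet()  # holds no strong references

    def __init__(self, name: str):
        self.name = name
        self.total = 0
        Worker._instances.add(self)  # auto-register on creation

    def metrics(self) -> dict:
        return {"name": self.name, "total": self.total}

    @classmethod
    def all_metrics(cls) -> list:
        # Snapshot the live instances, then read each one's metrics.
        return [w.metrics() for w in list(cls._instances)]


a = Worker("orders")
b = Worker("payments")
print(len(Worker.all_metrics()))  # 2

del b
gc.collect()  # the dead instance drops out of the WeakSet by itself
print([m["name"] for m in Worker.all_metrics()])  # ['orders']
```

Since the registry never keeps an instance alive, there is nothing to unregister when a consumer or producer goes out of scope.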

Multi-pod note: all_metrics() is in-process only — it sees instances in this pod, not other pods. For cross-pod aggregation, expose metrics via a health endpoint (FastAPI, Django, plain HTTP) and scrape with Prometheus or a similar tool. Each pod reports its own slice; your scraper aggregates across pods. Zero extra Redis IOPS.

7. Stream / Group Monitoring

from redis_stream_queue import StreamClient, ConsumerGroup

async def main():
    client = StreamClient(host="localhost")
    cg = ConsumerGroup(client, stream="orders", group="order_workers")

    # Stream-level stats (requires Redis calls)
    stats = await cg.stats(dlq_stream="orders_dlq")
    print(f"length={stats.stream_length}  lag={stats.lag}  pel={stats.group_pel_size}")
    for c in stats.consumers:
        print(f"  consumer={c.name}  pending={c.pending}  idle={c.idle_ms}ms")

    # Health check
    health = await cg.health_check(max_lag=1_000, max_idle_ms=60_000)
    print(f"healthy={health['healthy']}  issues={health['issues']}")

    # Inspect stuck messages
    pending = await cg.pending_details(count=50)
    for entry in pending:
        print(f"  {entry.id}  consumer={entry.consumer}  deliveries={entry.delivery_count}")
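The README does not spell out how health_check() evaluates its thresholds, so the following is only a guess at the kind of logic involved, written as a pure function over already-fetched stats. `ConsumerStat`, `evaluate_health`, and the rule that an idle consumer matters only while it holds pending messages are all assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class ConsumerStat:
    """Hypothetical per-consumer snapshot (mirrors name / pending / idle_ms above)."""
    name: str
    pending: int
    idle_ms: int


def evaluate_health(lag, consumers, max_lag=1_000, max_idle_ms=60_000):
    """Return a dict shaped like the health result printed above."""
    issues = []
    if lag > max_lag:
        issues.append(f"lag {lag} exceeds {max_lag}")
    for c in consumers:
        # Assumption: idleness is a problem only if messages are still pending.
        if c.pending > 0 and c.idle_ms > max_idle_ms:
            issues.append(f"consumer {c.name} idle {c.idle_ms}ms with {c.pending} pending")
    return {"healthy": not issues, "issues": issues}


report = evaluate_health(
    lag=5_000,
    consumers=[ConsumerStat("pod-a", 2, 90_000), ConsumerStat("pod-b", 0, 90_000)],
)
print(report["healthy"], len(report["issues"]))  # False 2
```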

Configuration Reference

StreamClient

| Param | Default | Description |
|---|---|---|
| host | "localhost" | Redis host |
| port | 6379 | Redis port |
| db | 0 | Redis DB index |
| username | None | AUTH username |
| password | None | AUTH password |
| prefix | "" | Key prefix prepended to all stream names ({prefix}_{stream}) |
| max_connections | 1000 | Connection pool size |
| pool_timeout | 5 | Seconds to wait for a free connection |
| ssl | False | Enable TLS |

Cluster / URL variants:

# Redis Cluster
client = StreamClient.from_cluster(
    startup_nodes=[{"host": "node1", "port": 6379}],
    password="secret",
)

# URL — single node
client = StreamClient.from_url("redis://localhost:6379/0")
client = StreamClient.from_url("rediss://user:pass@host:6380/0")   # TLS

# URL — cluster
client = StreamClient.from_url("redis+cluster://node1:6379")
client = StreamClient.from_url("rediss+cluster://node1:6380")      # TLS cluster

Cluster key rule: stream and DLQ stream must share a hash tag to land on the same slot:

stream="{orders}_main", dlq_stream="{orders}_dlq"
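The reason a shared hash tag works: Redis Cluster maps a key to one of 16384 slots by taking CRC16 of the key, and when the key contains a non-empty {...} section only that section is hashed. A small standard-library sketch of the rule (the helper name `key_slot` is made up for this example):

```python
import binascii


def key_slot(key: str) -> int:
    """Redis Cluster slot: CRC16 (XMODEM) of the key, or of its hash tag, mod 16384."""
    raw = key.encode()
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        # Only a non-empty {...} section counts as a hash tag.
        if end > start + 1:
            raw = raw[start + 1:end]
    return binascii.crc_hqx(raw, 0) % 16384


main_slot = key_slot("{orders}_main")
dlq_slot = key_slot("{orders}_dlq")
print(main_slot == dlq_slot)  # True: both hash only the tag "orders"
```

Without the braces, the two names are hashed in full and will usually land on different slots.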

ConsumerConfig

| Param | Default | Description |
|---|---|---|
| group | required | Consumer group name |
| worker_name | auto | Unique consumer name per pod. Auto = {group}_{hostname}_{rand4} (group part truncated if needed to preserve hostname+suffix) |
| dlq_stream | None | Stream name to route poison pills and decode errors to |
| dlq_group | None | Consumer group for the DLQ stream |
| batch_size | 100 | Max messages per XREADGROUP / XAUTOCLAIM call |
| block_ms | 5000 | XREADGROUP block timeout (ms); 0 = non-blocking |
| min_idle_claim_ms | 10000 | XAUTOCLAIM idle threshold (ms). Set to at least 2× max handler latency |
| max_deliveries | 3 | Delivery count before message is routed to DLQ |
| max_stream_size | 100000 | Approximate XADD MAXLEN trim |
| max_claim_passes | None | Max XAUTOCLAIM cursor iterations per run_once(). None = sweep full PEL. Set to 1 to restore single-pass behavior |

Serializers

from redis_stream_queue import JsonSerializer, MsgpackSerializer, PickleSerializer

# Serializer must match on both producer and consumer
producer = StreamProducer(..., serializer=MsgpackSerializer())
consumer = StreamConsumer(..., serializer=MsgpackSerializer())

| Serializer | Extra | Notes |
|---|---|---|
| JsonSerializer | none | Default; human-readable, broadest compat |
| MsgpackSerializer | [msgpack] | Smaller wire size, faster encode/decode |
| PickleSerializer | none | Any Python type; requires same Python version on both ends; do not use with untrusted data |

Decode failures → dlq_handler(msg, "decode_error") + immediate XACK (no retry — corrupt data will always fail).

Custom serializer — implement the Serializer protocol:

from redis_stream_queue import Serializer

class CborSerializer:
    def encode(self, data: dict) -> bytes:
        import cbor2
        return cbor2.dumps(data)

    def decode(self, raw: bytes) -> dict:
        import cbor2
        return cbor2.loads(raw)
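For a dependency-free variant of the same shape, here is a sketch of a serializer that compresses JSON with zlib. It assumes, as the example above suggests, that a serializer only needs `encode(dict) -> bytes` and `decode(bytes) -> dict`; the class name is made up.

```python
import json
import zlib


class CompressedJsonSerializer:
    """JSON compressed with zlib; same encode/decode shape as the example above."""

    def encode(self, data: dict) -> bytes:
        text = json.dumps(data, separators=(",", ":"))
        return zlib.compress(text.encode("utf-8"))

    def decode(self, raw: bytes) -> dict:
        return json.loads(zlib.decompress(raw).decode("utf-8"))


s = CompressedJsonSerializer()
payload = {"order_id": 1, "items": ["a", "b"]}
print(s.decode(s.encode(payload)) == payload)  # True: round trip

try:
    s.decode(b"not zlib data")
except zlib.error:
    # A corrupt payload raises; per the note above, decode failures
    # go to the DLQ handler rather than being retried.
    print("decode failed")
```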

Multi-Pod Deployment

Producers

No coordination needed. XADD is atomic; Redis assigns unique IDs ({timestamp}-{seq}). All pods write to the same stream key safely.
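Auto-generated stream IDs have the form `<millisecondsTime>-<sequenceNumber>`, so the write time can be recovered from the ID itself. A small sketch (the helper name `parse_stream_id` is made up for this example):

```python
from datetime import datetime, timezone


def parse_stream_id(msg_id: str):
    """Split an auto-generated ID into (UTC datetime, sequence number)."""
    ms, seq = msg_id.split("-")
    return datetime.fromtimestamp(int(ms) / 1000, tz=timezone.utc), int(seq)


ts, seq = parse_stream_id("1700000000000-0")
print(ts.isoformat(), seq)  # 2023-11-14T22:13:20+00:00 0
```

The sequence number distinguishes entries written within the same millisecond, which is why concurrent producers never collide.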

Consumers

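Each pod gets a unique worker_name. Redis distributes messages across consumers in the same group: each new message is delivered to exactly one pod, and is delivered again only if it stays unacknowledged and is reclaimed (at-least-once delivery, so handlers should tolerate duplicates).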

Pod A: worker_name = "order_workers_pod-a_3821" ─┐
Pod B: worker_name = "order_workers_pod-b_9174" ─┼─ group "order_workers"
Pod C: worker_name = "order_workers_pod-c_0042" ─┘
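A sketch of how names in the {group}_{hostname}_{rand4} pattern could be generated, including truncating only the group part. This is not the library's code: the function name and the `max_len` limit of 64 are hypothetical, chosen only to show the truncation rule.

```python
import random
import socket
from typing import Optional


def make_worker_name(group: str, hostname: Optional[str] = None, max_len: int = 64) -> str:
    """Build {group}_{hostname}_{rand4}; max_len is a hypothetical limit."""
    host = hostname or socket.gethostname()
    suffix = f"{random.randint(0, 9999):04d}"
    tail = f"_{host}_{suffix}"
    # Truncate only the group part so hostname and suffix always survive.
    room = max(max_len - len(tail), 0)
    return f"{group[:room]}{tail}"


print(make_worker_name("order_workers", hostname="pod-a"))
print(make_worker_name("g" * 100, hostname="pod-a", max_len=20))
```

Keeping the hostname and random suffix intact is what preserves uniqueness when several pods share a long group name.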

Crash recovery: if a pod crashes, its unacknowledged PEL messages go idle. Other pods reclaim them via XAUTOCLAIM after min_idle_claim_ms. Set this to at least 2× your maximum handler latency.

NOGROUP recovery: if the stream or group is deleted externally (e.g. FLUSHALL, XGROUP DESTROY), the consumer detects the NOGROUP error, clears its entry from the group registry, and re-creates the group on the next iteration — no manual restart required.

Recommended settings for multi-pod:

| Setting | Value | Reason |
|---|---|---|
| worker_name | auto (default) | Unique per pod |
| min_idle_claim_ms | 2× max handler latency | Avoid premature cross-pod reclaim |
| block_ms | 5000 | Balanced latency vs idle CPU |
| max_deliveries | 3–5 | Accounts for transient failures across restarts |

Consumer Loop Internals

Each run_once() call executes four steps:

1. ensure()           — XGROUP CREATE mkstream; no-op if group known in class-level registry.
                        Registry entry removed on NOGROUP — re-creation runs on next iteration.

2. XREADGROUP ">"     — fetch new undelivered messages
   ├─ decode error    → dlq_handler(msg, "decode_error") + XACK (no retry)
   ├─ handler(msgs)   → XACK returned IDs; total_acked += n; tps_out tracker updated
   ├─ tps_in          updated with len(raw_messages)
   ├─ handler → None  → warning logged; no XACK (treat as explicit "ACK nothing")
   └─ unacked IDs     stay in PEL for XAUTOCLAIM recovery

3. XAUTOCLAIM cursor loop — reclaims msgs idle > min_idle_claim_ms
   ├─ follows cursor until Redis returns "0-0" (full PEL swept) or stall detected
   ├─ max_claim_passes caps iterations if set; None = unlimited (default)
   ├─ tps_in          updated with len(claimed) per batch
   └─ reclaimed msgs  → same handler → XACK; tps_out updated

4. XPENDING sweep     — find entries with delivery_count >= max_deliveries
   ├─ no dlq_handler  → warning logged with IDs; still ACKed (message cleared)
   ├─ msg missing from stream (XDEL'd) → warning logged; still ACKed
   └─ poison pills    → dlq_handler(msg, "max_deliveries") + batched XACK
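The cursor loop in step 3 can be illustrated without Redis. The sketch below drives a loop against an in-memory stand-in for XAUTOCLAIM; the names `sweep_pel` and `fake_autoclaim` are made up, and treating "stall" as "the cursor did not advance" is an assumption, since the README does not define it.

```python
def sweep_pel(autoclaim, max_claim_passes=None):
    """Follow the XAUTOCLAIM cursor until "0-0", a stall, or the pass cap."""
    cursor, passes, claimed = "0-0", 0, []
    while max_claim_passes is None or passes < max_claim_passes:
        next_cursor, batch = autoclaim(cursor)
        passes += 1
        claimed.extend(batch)
        if next_cursor == "0-0":   # Redis signals the whole PEL was scanned
            break
        if next_cursor == cursor:  # assumed stall rule: cursor did not advance
            break
        cursor = next_cursor
    return claimed, passes


def fake_autoclaim(pending, count):
    """In-memory stand-in for XAUTOCLAIM over a sorted list of pending IDs."""
    def call(cursor):
        start = 0 if cursor == "0-0" else pending.index(cursor)
        page = pending[start:start + count]
        rest = pending[start + count:]
        return (rest[0] if rest else "0-0"), page
    return call


pel = ["1-0", "2-0", "3-0", "4-0", "5-0"]
print(sweep_pel(fake_autoclaim(pel, count=2)))
# (['1-0', '2-0', '3-0', '4-0', '5-0'], 3)
print(sweep_pel(fake_autoclaim(pel, count=2), max_claim_passes=1))
# (['1-0', '2-0'], 1)
```

With max_claim_passes=1 the remaining entries are simply picked up by a later run_once() call.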

run() wraps run_once() in an infinite loop:
   ├─ CancelledError  → re-raised immediately (clean shutdown)
   ├─ NOGROUP error   → registry entry cleared; sleep 1s; re-enter loop
   └─ any other error → logged; sleep 1s; re-enter loop
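The error handling of run() described above can be sketched as a small supervisor loop. This is an illustration under assumptions rather than the library's code: `run_forever`, `reset_group`, and the injectable `sleep` are hypothetical names, and the demo raises the errors by hand.

```python
import asyncio
import logging

log = logging.getLogger("consumer-sketch")


async def run_forever(run_once, reset_group, sleep=asyncio.sleep):
    """Supervise run_once(): re-raise cancellation, recover from everything else."""
    while True:
        try:
            await run_once()
        except asyncio.CancelledError:
            raise  # clean shutdown: never swallow cancellation
        except Exception as exc:
            if "NOGROUP" in str(exc):
                reset_group()  # forget the group so the next pass re-creates it
            else:
                log.exception("run_once failed")
            await sleep(1)


async def demo():
    events = []
    outcomes = iter([RuntimeError("NOGROUP No such key"),
                     ValueError("boom"),
                     asyncio.CancelledError()])

    async def run_once():
        raise next(outcomes)

    async def no_sleep(_seconds):
        events.append("sleep")  # record instead of actually waiting

    try:
        await run_forever(run_once, lambda: events.append("reset"), sleep=no_sleep)
    except asyncio.CancelledError:
        events.append("cancelled")
    return events


print(asyncio.run(demo()))  # ['reset', 'sleep', 'sleep', 'cancelled']
```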

Metrics updated per iteration:

  • total_read incremented on every XREADGROUP / XAUTOCLAIM batch
  • tps_in sliding tracker records every read batch (new + reclaimed)
  • total_acked incremented after each successful handler → XACK
  • tps_out sliding tracker records every ack batch
  • total_dlq incremented for decode errors + poison pill ACKs
  • total_errors incremented for handler exceptions

Sequence Diagrams

Render with PlantUML, VS Code PlantUML extension, or IntelliJ PlantUML plugin.

1. Producer: Push Message

@startuml
title Producer — Push Message

participant "App" as App
participant "StreamProducer" as P
participant "StreamClient" as C
database "Redis Stream" as R

App -> P : push(data: dict)
P -> P : serializer.encode(data) → bytes
P -> C : push(stream, encoded_bytes, max_len)
C -> R : XADD {stream} MAXLEN ~ {max_len}\n  * data {bytes}
R --> C : message_id (e.g. "1700000000000-0")
C --> P : message_id
P -> P : total_pushed += 1\ntps_tracker.record(1)
P --> App : message_id
@enduml

2. Consumer: Normal Message Processing

@startuml
title Consumer — Normal Message Processing (run_once step 2)

participant "StreamConsumer\nrun_once()" as Consumer
participant "StreamClient" as C
participant "handler()" as H
database "Redis PEL" as PEL

Consumer -> C : read(stream, group, worker_name,\n      count=batch_size, block_ms)
C -> PEL : XREADGROUP GROUP {group} {worker}\n  COUNT {n} BLOCK {ms} STREAMS {stream} >
PEL --> C : [(id1, fields), (id2, fields), ...]
note right of PEL : Messages enter PEL\n(pending until ACKed)
C --> Consumer : [StreamMessage, ...]

Consumer -> Consumer : tps_in.record(n)\ntotal_read += n
Consumer -> Consumer : serializer.decode(raw) per message
Consumer -> H : handler(decoded_messages)
H --> Consumer : [acked_ids]

Consumer -> C : ack(stream, group, *acked_ids)
C -> PEL : XACK {stream} {group} {id1} {id2} ...
note right of PEL : Messages removed from PEL
Consumer -> Consumer : total_acked += len(acked_ids)\ntps_out.record(n)

note over Consumer : Unacked IDs stay in PEL\nfor crash recovery
@enduml

3. Crash Recovery: XAUTOCLAIM Cursor Loop

@startuml
title Crash Recovery — XAUTOCLAIM Cursor Loop (run_once step 3)

participant "Pod A" as A
participant "Pod B\nrun_once()" as B
database "Redis PEL" as PEL

== Pod A: reads, starts processing, then crashes ==
A -> PEL : XREADGROUP → msg_001 assigned to Pod A
note over A : Pod A crashes.\nmsg_001 never ACKed.\nSits in PEL, idle growing.

== Pod B: autoclaim cursor loop sweeps full PEL ==
loop cursor != "0-0" and no stall
    B -> PEL : XAUTOCLAIM group={group} consumer={B}\n  min_idle_time={ms} count={n} start_id={cursor}
    PEL --> B : (next_cursor, [reclaimed_msgs])
    B -> B : tps_in.record(n)\n_process_batch(reclaimed_msgs)
    B -> B : cursor = next_cursor
end
note right of PEL : Loop exits when cursor="0-0"\n(full PEL swept) or stall detected.\nmax_claim_passes caps iterations if set.

B -> PEL : XACK {stream} {group} msg_001
note right of PEL : msg_001 cleared from PEL
@enduml

4. Decode Error → Immediate DLQ

@startuml
title Decode Error — Immediate DLQ (run_once step 2)

participant "_process_batch()" as Batch
participant "RetryHandler" as Retry
participant "dlq_handler()" as DLQ
database "Redis PEL" as PEL

Batch -> Batch : serializer.decode(raw) — raises Exception
note right of Batch : Corrupt or wrong format.\nCannot retry — will always fail.

Batch -> Retry : send_to_dlq(msg, "decode_error")
Retry -> DLQ : dlq_handler(msg, "decode_error")
DLQ --> Retry : (done; dlq handler errors swallowed)

Batch -> PEL : XACK bad_msg_id
Batch -> Batch : total_dlq += 1
note right of PEL : Removed immediately.\nWill NOT re-enter consumer\nloop for more retries.
@enduml

5. Poison Pill → DLQ After Max Deliveries

@startuml
title Poison Pill — DLQ After max_deliveries (run_once step 4)

participant "RetryHandler\nhandle_poison_pills()" as Retry
participant "dlq_handler()" as DLQ
database "Redis PEL" as PEL
database "Redis Stream" as Stream

note over PEL : msg_999 redelivered\ndelivery_count >= max_deliveries

Retry -> PEL : XPENDING_RANGE {stream} {group}\n  - + count={batch_size}
PEL --> Retry : [PendingEntry(id=msg_999, delivery_count=5)]

Retry -> Retry : filter: delivery_count >= max_deliveries

alt message exists in stream
    Retry -> Stream : XRANGE {stream} msg_999 msg_999 COUNT 1
    Stream --> Retry : raw bytes
    Retry -> Retry : try_decode(raw)
    Retry -> DLQ : dlq_handler(decoded_msg, "max_deliveries")
else message deleted from stream (XDEL)
    Retry -> Retry : log warning "missing from stream"\n(no DLQ call — nothing to forward)
end

Retry -> PEL : XACK {stream} {group} *all_poison_ids  (batched)
note right of PEL : Poison pills cleared.\nNever retried again.
@enduml

6. NOGROUP Auto-Recovery

@startuml
title NOGROUP Auto-Recovery

participant "run() loop" as Loop
participant "ConsumerGroup" as CG
database "Redis" as R
participant "run_once()" as Once

Loop -> Once : await run_once()
Once -> CG : ensure()  [key in _known → skipped]
Once -> R : XREADGROUP ...
R --> Once : ResponseError: NOGROUP ...
Once --> Loop : raises Exception("NOGROUP ...")

Loop -> Loop : "NOGROUP" in str(e) → True
Loop -> CG : reset()  [removes key from _known]
Loop -> Loop : sleep(1)

Loop -> Once : await run_once()  [next iteration]
Once -> CG : ensure()  [key not in _known → runs]
CG -> R : XGROUP CREATE {stream} {group} MKSTREAM
R --> CG : OK
CG -> CG : _known.add(key)
Once -> R : XREADGROUP ...  [normal processing resumes]
@enduml

7. Full run_once() Lifecycle with Metrics

@startuml
title Full run_once() Lifecycle

participant "run_once()" as Loop
participant "ConsumerGroup\nensure()" as CG
participant "StreamClient" as C
participant "RetryHandler" as Retry
database "Redis" as R

Loop -> CG : ensure(dlq_stream, dlq_group)
note right of CG : No-op if key in _known.\nRuns XGROUP CREATE on first call\nor after NOGROUP reset.

group Step 2: New messages
    Loop -> C : read(stream, group, worker, batch_size, block_ms)
    C -> R : XREADGROUP GROUP {group} {worker} COUNT {n} BLOCK {ms} STREAMS {stream} >
    R --> Loop : [StreamMessage, ...]
    Loop -> Loop : total_read += n\ntps_in.record(n)\n_process_batch(messages)
    note right of Loop : ACKed IDs → total_acked, tps_out.record(n)
end

group Step 3: Orphan recovery (cursor loop)
    loop cursor != "0-0"
        Loop -> C : autoclaim(stream, group, worker, min_idle_ms, batch_size, cursor)
        C -> R : XAUTOCLAIM min_idle={ms} count={n} start_id={cursor}
        R --> Loop : (next_cursor, [reclaimed StreamMessage])
        Loop -> Loop : total_read += n\ntps_in.record(n)\n_process_batch(claimed)
        Loop -> Loop : cursor = next_cursor
    end
end

group Step 4: Poison-pill sweep
    Loop -> Retry : handle_poison_pills() → int
    Retry -> R : XPENDING_RANGE → filter delivery_count >= max_deliveries
    Retry -> R : XRANGE per poison ID (fetch raw bytes)
    Retry -> Retry : dlq_handler(msg, "max_deliveries") per pill
    Retry -> R : XACK {stream} {group} *poison_ids  (batched)
    Retry --> Loop : count of pills processed
    Loop -> Loop : total_dlq += count
end
@enduml

Running the Example

# terminal 1
docker run -p 6379:6379 redis

# terminal 2
python examples/basic_worker.py

Development

Setup

git clone <repo>
cd redis-stream-queue

python3 -m venv .venv
source .venv/bin/activate        # Windows: .venv\Scripts\activate

pip install -e ".[dev]"          # editable install + test deps

Running Tests

Tests use fakeredis — no real Redis needed.

# all tests
pytest tests/

# single file
pytest tests/test_consumer.py

# single test with verbose output
pytest tests/test_consumer.py::test_poison_pill_goes_to_dlq -v -s

# with coverage
pip install pytest-cov
pytest tests/ --cov=src/redis_stream_queue --cov-report=term-missing

Project Structure

src/
└── redis_stream_queue/
    ├── __init__.py     # public exports
    ├── client.py       # StreamClient — connection pool, all X* commands
    ├── producer.py     # StreamProducer — push + ProducerMetrics + all_metrics()
    ├── consumer.py     # StreamConsumer — main loop + all_metrics()
    ├── group.py        # ConsumerConfig + ConsumerGroup (class-level registry, stats, health)
    ├── message.py      # StreamMessage, PendingEntry, StreamStats, ConsumerInfo,
    │                   # ConsumerMetrics, ProducerMetrics, _TpsTracker
    ├── retry.py        # RetryHandler — poison-pill detection + DLQ routing
    ├── serializers.py  # Json / Msgpack / Pickle
    └── exceptions.py
tests/
    conftest.py         # shared fixtures: fake_redis, make_client, registry resets
    test_consumer.py    # consumer loop, metrics, all_metrics, NOGROUP recovery
    test_group.py       # stats, health check, pending details
    test_producer.py    # push, ensure_group, metrics, all_metrics
examples/
    basic_worker.py

Known Limitations

  • fetch_by_ids N+1: the poison-pill fetch issues one XRANGE call per ID. This is fine while a sweep finds only a few poison messages (< 10); very large poison batches would benefit from pipelining.
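One possible shape for the pipelined fetch is sketched below. It assumes direct access to a raw redis-py asyncio client (`redis.asyncio.Redis`), whose `pipeline()`, `xrange()`, and `execute()` calls are used here; whether StreamClient exposes such a client is not stated in this README, and the function is a sketch, not a patch.

```python
import asyncio


async def fetch_by_ids(redis_client, stream: str, ids: list) -> dict:
    """Fetch many entries in one round trip by queueing one XRANGE per ID."""
    if not ids:
        return {}
    pipe = redis_client.pipeline(transaction=False)
    for msg_id in ids:
        # Queued locally; nothing is sent until execute().
        pipe.xrange(stream, min=msg_id, max=msg_id, count=1)
    results = await pipe.execute()  # single network round trip
    # An empty result means the entry was XDEL'd or trimmed from the stream.
    return {
        msg_id: (rows[0][1] if rows else None)
        for msg_id, rows in zip(ids, results)
    }
```

A caller would treat a `None` value the same way the sweep treats a message missing from the stream: log a warning and still ACK the ID.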
