General-purpose async Redis Streams consumer group library with DLQ, crash recovery, and monitoring
redis-stream-queue
Async Python library for Redis Streams consumer groups with built-in crash recovery, DLQ, and monitoring.
Features
- Producer: push messages from any number of pods — XADD is atomic, no coordination needed
- Consumer: callback-based read → ACK loop; partial ACK supported
- Crash recovery: XAUTOCLAIM cursor loop sweeps full PEL per iteration; NOGROUP auto-recovery if stream deleted externally
- Dead-letter queue: decode errors and poison pills (exceeding max_deliveries) routed to DLQ handler
- Consumer metrics: consumer.metrics() returns tps_in / tps_out / tps_total (60s sliding window), avg TPS, read/acked/DLQ/error counters
- Producer metrics: producer.metrics() returns push TPS, total pushed, avg TPS, uptime
- Process-wide aggregation: StreamConsumer.all_metrics() / StreamProducer.all_metrics() collect from all live instances in this process via weakref registry; zero Redis overhead
- Stream monitoring: lag, PEL size, per-consumer idle time, health checks via ConsumerGroup
- Pluggable serializers: JSON (default), msgpack, pickle, or bring your own
- Redis Cluster: from_cluster() and from_url() factory methods
- Multi-pod safe: unique worker names auto-generated per pod ({group}_{hostname}_{rand4})
Requirements
- Python ≥ 3.11
- Redis ≥ 6.2 (XAUTOCLAIM support)
Installation
pip install redis-stream-queue
With msgpack support:
pip install redis-stream-queue[msgpack]
Quick Start
1. Start Redis
docker run -p 6379:6379 redis
2. Producer
import asyncio
from redis_stream_queue import StreamClient, StreamProducer

async def main():
    client = StreamClient(host="localhost")
    producer = StreamProducer(client=client, stream="orders", group="order_workers")
    await producer.ensure_group()  # idempotent; safe to call on every startup
    msg_id = await producer.push({"order_id": 1})
    print(f"pushed: {msg_id}")
    await client.close()

asyncio.run(main())
3. Consumer
import asyncio
from redis_stream_queue import StreamClient, StreamConsumer, ConsumerConfig

async def handle(messages):
    for msg in messages:
        print(f"processing: {msg.data}")
    return [m.id for m in messages]  # return IDs to ACK; omit to leave in PEL

async def on_dlq(msg, reason):
    print(f"DLQ [{reason}]: {msg.data}")

async def main():
    client = StreamClient(host="localhost")
    config = ConsumerConfig(
        group="order_workers",
        dlq_stream="orders_dlq",
        batch_size=100,
        block_ms=5_000,
        max_deliveries=3,
    )
    consumer = StreamConsumer(
        client=client,
        stream="orders",
        config=config,
        handler=handle,
        dlq_handler=on_dlq,
    )
    await consumer.run()  # infinite loop; Ctrl-C / CancelledError to stop

asyncio.run(main())
Handler contract: return a list of IDs to ACK. Return [] to ACK nothing (messages stay in PEL for retry). Never return None: that triggers a warning and no ACK.
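The contract above also allows partial ACKs. The following is a minimal sketch of a handler that ACKs only the messages it processed cleanly. FakeMessage is a hypothetical stand-in for the library's message object (only the id and data attributes shown in the Quick Start are assumed), and the "missing order_id" rule is invented for illustration.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in exposing only .id and .data."""
    id: str
    data: dict


async def handle(messages):
    """Return only the IDs that processed cleanly; the rest stay in the PEL."""
    acked = []
    for msg in messages:
        try:
            if "order_id" not in msg.data:
                raise ValueError("missing order_id")
            acked.append(msg.id)
        except ValueError:
            # Not ACKed: the message stays pending and is retried later.
            continue
    return acked  # always a list, never None


batch = [FakeMessage("1-0", {"order_id": 1}), FakeMessage("2-0", {})]
acked_ids = asyncio.run(handle(batch))
print(acked_ids)  # ['1-0']
```

Returning the empty list when every message fails keeps the whole batch pending without triggering the None warning.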
4. Consumer Throughput Metrics
consumer.metrics() is non-blocking and makes no Redis calls — safe to poll from any monitoring loop or health endpoint.
import asyncio
from redis_stream_queue import StreamClient, StreamConsumer, ConsumerConfig

async def monitor(consumer):
    while True:
        m = consumer.metrics()
        print(
            f"in={m.tps_in:.1f} msg/s out={m.tps_out:.1f} msg/s total={m.tps_total:.1f} msg/s "
            f"avg={m.avg_tps:.1f} msg/s "
            f"read={m.total_read} acked={m.total_acked} "
            f"dlq={m.total_dlq} errors={m.total_errors} "
            f"uptime={m.uptime_secs:.0f}s"
        )
        await asyncio.sleep(5)

async def main():
    client = StreamClient(host="localhost")
    config = ConsumerConfig(group="order_workers")
    # handle is the handler coroutine from the Consumer example in step 3
    consumer = StreamConsumer(client=client, stream="orders", config=config, handler=handle)
    await asyncio.gather(consumer.run(), monitor(consumer))

asyncio.run(main())
| Field | Type | Description |
|---|---|---|
| tps_in | float | Reads/sec (XREADGROUP + XAUTOCLAIM), sliding 60s window |
| tps_out | float | Acked/sec, sliding 60s window |
| tps_total | float | tps_in + tps_out |
| avg_tps | float | total_acked / uptime_secs since first message |
| total_read | int | Messages pulled from stream (new + reclaimed via XAUTOCLAIM) |
| total_acked | int | Successfully processed and ACKed by handler |
| total_dlq | int | Routed to DLQ (decode_error + max_deliveries combined) |
| total_errors | int | Handler exceptions; message stays in PEL for retry |
| uptime_secs | float | Seconds since first message was processed |
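To make the "sliding 60s window" fields concrete, here is a minimal sketch of a sliding-window rate tracker. It is not the library's internal tracker: the class name SlidingTps, the injectable clock, and the choice to divide by the full window length (even during warm-up) are all assumptions made for illustration.

```python
import time
from collections import deque


class SlidingTps:
    """Hypothetical tracker: events per second over the last window_secs."""

    def __init__(self, window_secs=60.0, clock=time.monotonic):
        self._window = window_secs
        self._clock = clock            # injectable so the demo can fake time
        self._events = deque()         # (timestamp, count) pairs, oldest first
        self._in_window = 0            # sum of counts still inside the window

    def record(self, n):
        self._events.append((self._clock(), n))
        self._in_window += n

    def rate(self):
        cutoff = self._clock() - self._window
        # Drop batches that have aged out of the window.
        while self._events and self._events[0][0] <= cutoff:
            _, n = self._events.popleft()
            self._in_window -= n
        return self._in_window / self._window


now = [0.0]
tps = SlidingTps(window_secs=60.0, clock=lambda: now[0])
tps.record(120)        # one batch of 120 messages at t=0
now[0] = 30.0
rate_mid = tps.rate()  # 120 / 60 = 2.0
now[0] = 61.0
rate_late = tps.rate() # batch has aged out, so 0.0
```

Because record() and rate() only touch in-memory state, polling such a tracker makes no Redis calls, which matches the non-blocking behaviour described for consumer.metrics().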
5. Producer Throughput Metrics
m = producer.metrics()
print(f"push={m.tps:.1f} msg/s avg={m.avg_tps:.1f} msg/s total={m.total_pushed} uptime={m.uptime_secs:.0f}s")
| Field | Type | Description |
|---|---|---|
| total_pushed | int | Messages pushed since instance creation |
| tps | float | Pushed/sec, sliding 60s window |
| avg_tps | float | total_pushed / uptime_secs since first push |
| uptime_secs | float | Seconds since first push |
6. Process-Wide Metrics (Multiple Instances)
Each StreamConsumer and StreamProducer auto-registers in a process-level weakref registry on creation. Dead instances are evicted automatically by GC — no manual cleanup needed.
# Multiple consumers in same asyncio event loop
c1 = StreamConsumer(client=client, stream="orders", config=cfg_a, handler=handle_a)
c2 = StreamConsumer(client=client, stream="payments", config=cfg_b, handler=handle_b)
c3 = StreamConsumer(client=client, stream="events", config=cfg_c, handler=handle_c)
await asyncio.gather(c1.run(), c2.run(), c3.run())

# From a monitoring task running concurrently:
for m in StreamConsumer.all_metrics():
    print(f"in={m.tps_in:.1f} out={m.tps_out:.1f} acked={m.total_acked}")

# Multiple producers
p1 = StreamProducer(client=client, stream="orders")
p2 = StreamProducer(client=client, stream="payments")
for m in StreamProducer.all_metrics():
    print(f"tps={m.tps:.1f} pushed={m.total_pushed}")
Multi-pod note: all_metrics() is in-process only. It sees instances in this pod, not other pods. For cross-pod aggregation, expose metrics via a health endpoint (FastAPI, Django, plain HTTP) and scrape with Prometheus or a similar tool. Each pod reports its own slice; your scraper aggregates across pods. This adds no extra Redis IOPS.
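The process-level weakref registry described in this section can be illustrated with a small standalone sketch. The Worker class and its dict-shaped metrics are hypothetical; the point is only the pattern of registering instances in a weakref.WeakSet so that garbage-collected instances disappear from all_metrics() without manual cleanup.

```python
import gc
import weakref


class Worker:
    """Hypothetical class showing a process-wide weakref registry."""

    _instances = weakref.WeakSet()   # holds no strong references

    def __init__(self, name):
        self.name = name
        self.total_acked = 0
        Worker._instances.add(self)  # auto-register on creation

    def metrics(self):
        return {"name": self.name, "total_acked": self.total_acked}

    @classmethod
    def all_metrics(cls):
        # Snapshot the set first so GC during iteration cannot disturb it.
        return [w.metrics() for w in list(cls._instances)]


a = Worker("a")
b = Worker("b")
a.total_acked = 5
live_before = len(Worker.all_metrics())  # both instances are alive

del b            # drop the last strong reference
gc.collect()     # make collection explicit for the demo
live_after = len(Worker.all_metrics())   # only "a" remains
```

A registry like this is purely in-memory, which is why aggregation across pods still needs an external scraper.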
7. Stream / Group Monitoring
import asyncio
from redis_stream_queue import StreamClient, ConsumerGroup

async def main():
    client = StreamClient(host="localhost")
    cg = ConsumerGroup(client, stream="orders", group="order_workers")

    # Stream-level stats (requires Redis calls)
    stats = await cg.stats(dlq_stream="orders_dlq")
    print(f"length={stats.stream_length} lag={stats.lag} pel={stats.group_pel_size}")
    for c in stats.consumers:
        print(f" consumer={c.name} pending={c.pending} idle={c.idle_ms}ms")

    # Health check
    health = await cg.health_check(max_lag=1_000, max_idle_ms=60_000)
    print(f"healthy={health['healthy']} issues={health['issues']}")

    # Inspect stuck messages
    pending = await cg.pending_details(count=50)
    for entry in pending:
        print(f" {entry.id} consumer={entry.consumer} deliveries={entry.delivery_count}")

asyncio.run(main())
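As a rough illustration of what a threshold-based health check does with these numbers, the sketch below evaluates lag and per-consumer idle time against max_lag and max_idle_ms. The function evaluate_health and its exact rules are assumptions; the library's health_check() may apply different or additional checks. Only the result shape (a dict with "healthy" and "issues") is taken from the example above.

```python
def evaluate_health(lag, consumer_idle_ms, max_lag, max_idle_ms):
    """Hypothetical rules: flag excessive lag and consumers idle for too long."""
    issues = []
    if lag > max_lag:
        issues.append(f"lag {lag} exceeds {max_lag}")
    for name, idle in consumer_idle_ms.items():
        if idle > max_idle_ms:
            issues.append(f"consumer {name} idle {idle}ms exceeds {max_idle_ms}ms")
    return {"healthy": not issues, "issues": issues}


result = evaluate_health(
    lag=1_500,
    consumer_idle_ms={"w1": 500, "w2": 90_000},
    max_lag=1_000,
    max_idle_ms=60_000,
)
print(result["healthy"], len(result["issues"]))  # False 2
```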
Configuration Reference
StreamClient
| Param | Default | Description |
|---|---|---|
| host | "localhost" | Redis host |
| port | 6379 | Redis port |
| db | 0 | Redis DB index |
| username | None | AUTH username |
| password | None | AUTH password |
| prefix | "" | Key prefix prepended to all stream names ({prefix}_{stream}) |
| max_connections | 1000 | Connection pool size |
| pool_timeout | 5 | Seconds to wait for a free connection |
| ssl | False | Enable TLS |
Cluster / URL variants:
# Redis Cluster
client = StreamClient.from_cluster(
    startup_nodes=[{"host": "node1", "port": 6379}],
    password="secret",
)
# URL — single node
client = StreamClient.from_url("redis://localhost:6379/0")
client = StreamClient.from_url("rediss://user:pass@host:6380/0") # TLS
# URL — cluster
client = StreamClient.from_url("redis+cluster://node1:6379")
client = StreamClient.from_url("rediss+cluster://node1:6380") # TLS cluster
Cluster key rule: stream and DLQ stream must share a hash tag to land on the same slot:
stream="{orders}_main", dlq_stream="{orders}_dlq"
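The hash-tag rule can be checked offline. The sketch below follows the Redis Cluster key-slot algorithm (CRC16/XMODEM modulo 16384, hashing only the text inside the first non-empty {...} if present). The helper name hash_slot is ours, not part of this library; binascii.crc_hqx with an initial value of 0 is used as the CRC16 implementation.

```python
import binascii


def hash_slot(key: str) -> int:
    """Compute the Redis Cluster slot for a key, honouring hash tags."""
    raw = key.encode()
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        if end > start + 1:          # tag must exist and be non-empty
            raw = raw[start + 1:end]
    return binascii.crc_hqx(raw, 0) % 16384


main_slot = hash_slot("{orders}_main")
dlq_slot = hash_slot("{orders}_dlq")
print(main_slot == dlq_slot)  # True: both hash only the tag "orders"
```

Without the braces, the two key names are hashed in full and are not guaranteed to land on the same slot.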
ConsumerConfig
| Param | Default | Description |
|---|---|---|
| group | required | Consumer group name |
| worker_name | auto | Unique consumer name per pod. Auto = {group}_{hostname}_{rand4} (group part truncated if needed to preserve hostname+suffix) |
| dlq_stream | None | Stream name to route poison pills and decode errors to |
| dlq_group | None | Consumer group for the DLQ stream |
| batch_size | 100 | Max messages per XREADGROUP / XAUTOCLAIM call |
| block_ms | 5000 | XREADGROUP block timeout (ms); 0 = non-blocking |
| min_idle_claim_ms | 10000 | XAUTOCLAIM idle threshold (ms). Set to at least 2× max handler latency |
| max_deliveries | 3 | Delivery count before message is routed to DLQ |
| max_stream_size | 100000 | Approximate XADD MAXLEN trim |
| max_claim_passes | None | Max XAUTOCLAIM cursor iterations per run_once(). None = sweep full PEL. Set to 1 to restore single-pass behavior |
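The automatic worker_name format can be sketched as follows. This is an illustration of the {group}_{hostname}_{rand4} pattern and the "truncate the group part" rule from the table, not the library's code: the function name make_worker_name and the max_len limit of 64 are assumptions.

```python
import random
import socket


def make_worker_name(group, hostname=None, max_len=64, rng=random):
    """Build {group}_{hostname}_{rand4}, truncating only the group part."""
    hostname = hostname or socket.gethostname()
    suffix = f"{rng.randrange(10_000):04d}"   # four digits, zero-padded
    tail = f"_{hostname}_{suffix}"
    room = max(max_len - len(tail), 0)        # space left for the group part
    return f"{group[:room]}{tail}"


name = make_worker_name("order_workers", hostname="pod-a")
short = make_worker_name("g" * 100, hostname="pod-a", max_len=20)
print(name)   # e.g. order_workers_pod-a_3821 (suffix is random)
print(short)  # group truncated to 9 characters, hostname and suffix intact
```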
Serializers
from redis_stream_queue import JsonSerializer, MsgpackSerializer, PickleSerializer
# Serializer must match on both producer and consumer
producer = StreamProducer(..., serializer=MsgpackSerializer())
consumer = StreamConsumer(..., serializer=MsgpackSerializer())
| Serializer | Extra | Notes |
|---|---|---|
| JsonSerializer | none | Default; human-readable, broadest compat |
| MsgpackSerializer | [msgpack] | Smaller wire size, faster encode/decode |
| PickleSerializer | none | Any Python type; requires same Python version on both ends; do not use with untrusted data |
Decode failures → dlq_handler(msg, "decode_error") + immediate XACK (no retry — corrupt data will always fail).
Custom serializer — implement the Serializer protocol:
import cbor2  # third-party: pip install cbor2
from redis_stream_queue import Serializer

class CborSerializer:
    def encode(self, data: dict) -> bytes:
        return cbor2.dumps(data)

    def decode(self, raw: bytes) -> dict:
        return cbor2.loads(raw)
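For a variant that needs no third-party package, the same two-method shape can be built from the standard library. The sketch below is a hypothetical zlib-compressed JSON serializer; it assumes, as in the CBOR example, that any object with matching encode and decode methods satisfies the protocol.

```python
import json
import zlib


class CompressedJsonSerializer:
    """Hypothetical serializer: compact JSON, then zlib-compressed."""

    def encode(self, data: dict) -> bytes:
        text = json.dumps(data, separators=(",", ":"))
        return zlib.compress(text.encode("utf-8"))

    def decode(self, raw: bytes) -> dict:
        # Raises zlib.error or ValueError on corrupt input, which a consumer
        # would presumably treat as a decode failure.
        return json.loads(zlib.decompress(raw).decode("utf-8"))


ser = CompressedJsonSerializer()
payload = {"order_id": 1, "items": ["a", "b"]}
roundtrip = ser.decode(ser.encode(payload))
print(roundtrip == payload)  # True
```

As with the built-in serializers, producer and consumer must use the same class, otherwise every message fails to decode.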
Multi-Pod Deployment
Producers
No coordination needed. XADD is atomic; Redis assigns unique IDs ({timestamp}-{seq}). All pods write to the same stream key safely.
Consumers
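Each pod gets a unique worker_name. Redis distributes messages across consumers in the same group, so each new message is delivered to exactly one pod. Delivery is at-least-once overall: a message reclaimed after a crash is handed to another pod and may be processed again, so handlers should be idempotent.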
Pod A: worker_name = "order_workers_pod-a_3821" ─┐
Pod B: worker_name = "order_workers_pod-b_9174" ─┼─ group "order_workers"
Pod C: worker_name = "order_workers_pod-c_0042" ─┘
Crash recovery: if a pod crashes, its unacknowledged PEL messages go idle. Other pods reclaim them via XAUTOCLAIM after min_idle_claim_ms. Set this to at least 2× your maximum handler latency.
NOGROUP recovery: if the stream or group is deleted externally (e.g. FLUSHALL, XGROUP DESTROY), the consumer detects the NOGROUP error, clears its entry from the group registry, and re-creates the group on the next iteration — no manual restart required.
Recommended settings for multi-pod:
| Setting | Value | Reason |
|---|---|---|
| worker_name | auto (default) | Unique per pod |
| min_idle_claim_ms | 2× max handler latency | Avoid premature cross-pod reclaim |
| block_ms | 5000 | Balanced latency vs idle CPU |
| max_deliveries | 3–5 | Accounts for transient failures across restarts |
Consumer Loop Internals
Each run_once() call executes four steps:
1. ensure() — XGROUP CREATE mkstream; no-op if group known in class-level registry.
Registry entry removed on NOGROUP — re-creation runs on next iteration.
2. XREADGROUP ">" — fetch new undelivered messages
├─ decode error → dlq_handler(msg, "decode_error") + XACK (no retry)
├─ handler(msgs) → XACK returned IDs; total_acked += n; tps_out tracker updated
├─ tps_in updated with len(raw_messages)
├─ handler → None → warning logged; no XACK (treat as explicit "ACK nothing")
└─ unacked IDs stay in PEL for XAUTOCLAIM recovery
3. XAUTOCLAIM cursor loop — reclaims msgs idle > min_idle_claim_ms
├─ follows cursor until Redis returns "0-0" (full PEL swept) or stall detected
├─ max_claim_passes caps iterations if set; None = unlimited (default)
├─ tps_in updated with len(claimed) per batch
└─ reclaimed msgs → same handler → XACK; tps_out updated
4. XPENDING sweep — find entries with delivery_count >= max_deliveries
├─ no dlq_handler → warning logged with IDs; still ACKed (message cleared)
├─ msg missing from stream (XDEL'd) → warning logged; still ACKed
└─ poison pills → dlq_handler(msg, "max_deliveries") + batched XACK
run() wraps run_once() in an infinite loop:
├─ CancelledError → re-raised immediately (clean shutdown)
├─ NOGROUP error → registry entry cleared; sleep 1s; re-enter loop
└─ any other error → logged; sleep 1s; re-enter loop
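The error handling of the run() wrapper described above can be sketched in isolation. This is a simplified model, not the library's implementation: run_forever, the reset_group callback, and the injectable sleep are hypothetical names, and the NOGROUP check mirrors the string test shown in the NOGROUP sequence diagram.

```python
import asyncio
import logging

log = logging.getLogger("consumer_sketch")


async def run_forever(run_once, reset_group, sleep=asyncio.sleep, retry_delay=1.0):
    """Loop over run_once(); recover from NOGROUP and other errors."""
    while True:
        try:
            await run_once()
        except asyncio.CancelledError:
            raise                       # clean shutdown: never swallow cancellation
        except Exception as exc:
            if "NOGROUP" in str(exc):
                reset_group()           # forget the group so it is re-created
            else:
                log.exception("run_once failed; retrying")
            await sleep(retry_delay)


async def demo():
    calls = {"run": 0, "reset": 0}

    async def run_once():
        calls["run"] += 1
        if calls["run"] == 1:
            raise RuntimeError("NOGROUP No such consumer group")
        if calls["run"] == 3:
            raise asyncio.CancelledError  # simulate shutdown on the third pass

    def reset_group():
        calls["reset"] += 1

    async def no_sleep(_delay):
        return None                     # skip the real 1s delay in the demo

    try:
        await run_forever(run_once, reset_group, sleep=no_sleep)
    except asyncio.CancelledError:
        pass
    return calls


result = asyncio.run(demo())
print(result)  # {'run': 3, 'reset': 1}
```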
Metrics updated per iteration:
- total_read incremented on every XREADGROUP / XAUTOCLAIM batch
- tps_in sliding tracker records every read batch (new + reclaimed)
- total_acked incremented after each successful handler → XACK
- tps_out sliding tracker records every ack batch
- total_dlq incremented for decode errors + poison pill ACKs
- total_errors incremented for handler exceptions
Sequence Diagrams
Render with PlantUML, VS Code PlantUML extension, or IntelliJ PlantUML plugin.
1. Producer: Push Message
@startuml
title Producer — Push Message
participant "App" as App
participant "StreamProducer" as P
participant "StreamClient" as C
database "Redis Stream" as R
App -> P : push(data: dict)
P -> P : serializer.encode(data) → bytes
P -> C : push(stream, encoded_bytes, max_len)
C -> R : XADD {stream} MAXLEN ~ {max_len}\n * data {bytes}
R --> C : message_id (e.g. "1700000000-0")
C --> P : message_id
P -> P : total_pushed += 1\ntps_tracker.record(1)
P --> App : message_id
@enduml
2. Consumer: Normal Message Processing
@startuml
title Consumer — Normal Message Processing (run_once step 2)
participant "StreamConsumer\nrun_once()" as Consumer
participant "StreamClient" as C
participant "handler()" as H
database "Redis PEL" as PEL
Consumer -> C : read(stream, group, worker_name,\n count=batch_size, block_ms)
C -> PEL : XREADGROUP GROUP {group} {worker}\n COUNT {n} BLOCK {ms} STREAMS {stream} >
PEL --> C : [(id1, fields), (id2, fields), ...]
note right of PEL : Messages enter PEL\n(pending until ACKed)
C --> Consumer : [StreamMessage, ...]
Consumer -> Consumer : tps_in.record(n)\ntotal_read += n
Consumer -> Consumer : serializer.decode(raw) per message
Consumer -> H : handler(decoded_messages)
H --> Consumer : [acked_ids]
Consumer -> C : ack(stream, group, *acked_ids)
C -> PEL : XACK {stream} {group} {id1} {id2} ...
note right of PEL : Messages removed from PEL
Consumer -> Consumer : total_acked += len(acked_ids)\ntps_out.record(n)
note over Consumer : Unacked IDs stay in PEL\nfor crash recovery
@enduml
3. Crash Recovery: XAUTOCLAIM Cursor Loop
@startuml
title Crash Recovery — XAUTOCLAIM Cursor Loop (run_once step 3)
participant "Pod A" as A
participant "Pod B\nrun_once()" as B
database "Redis PEL" as PEL
== Pod A: reads, starts processing, then crashes ==
A -> PEL : XREADGROUP → msg_001 assigned to Pod A
note over A : Pod A crashes.\nmsg_001 never ACKed.\nSits in PEL, idle growing.
== Pod B: autoclaim cursor loop sweeps full PEL ==
loop cursor != "0-0" and no stall
B -> PEL : XAUTOCLAIM group={group} consumer={B}\n min_idle_time={ms} count={n} start_id={cursor}
PEL --> B : (next_cursor, [reclaimed_msgs])
B -> B : tps_in.record(n)\n_process_batch(reclaimed_msgs)
B -> B : cursor = next_cursor
end
note right of PEL : Loop exits when cursor="0-0"\n(full PEL swept) or stall detected.\nmax_claim_passes caps iterations if set.
B -> PEL : XACK {stream} {group} msg_001
note right of PEL : msg_001 cleared from PEL
@enduml
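The cursor loop in the diagram above can be modelled without Redis. In the sketch below, autoclaim is any coroutine that takes a cursor and returns (next_cursor, claimed_messages), mirroring the shape of an XAUTOCLAIM reply. The function name sweep_pel and the stall rule (stop when the cursor does not advance) are assumptions; the source only says that a stall is detected.

```python
import asyncio


async def sweep_pel(autoclaim, process_batch, max_claim_passes=None):
    """Follow the XAUTOCLAIM cursor until the PEL is swept, stalled, or capped."""
    cursor = "0-0"
    passes = 0
    while max_claim_passes is None or passes < max_claim_passes:
        next_cursor, claimed = await autoclaim(cursor)
        passes += 1
        if claimed:
            await process_batch(claimed)
        if next_cursor == "0-0":
            break                      # full PEL swept
        if next_cursor == cursor:
            break                      # assumed stall rule: cursor did not advance
        cursor = next_cursor
    return passes


# Fake two-page PEL: the first call returns cursor "5-0", the second returns "0-0".
pages = {"0-0": ("5-0", ["1-0", "2-0"]), "5-0": ("0-0", ["5-0"])}
processed = []


async def fake_autoclaim(cursor):
    return pages[cursor]


async def fake_process(batch):
    processed.extend(batch)


passes = asyncio.run(sweep_pel(fake_autoclaim, fake_process))
print(passes, processed)  # 2 ['1-0', '2-0', '5-0']
```

Passing max_claim_passes=1 makes the loop stop after the first page, which corresponds to the single-pass behaviour mentioned in the configuration reference.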
4. Decode Error → Immediate DLQ
@startuml
title Decode Error — Immediate DLQ (run_once step 2)
participant "_process_batch()" as Batch
participant "RetryHandler" as Retry
participant "dlq_handler()" as DLQ
database "Redis PEL" as PEL
Batch -> Batch : serializer.decode(raw) — raises Exception
note right of Batch : Corrupt or wrong format.\nCannot retry — will always fail.
Batch -> Retry : send_to_dlq(msg, "decode_error")
Retry -> DLQ : dlq_handler(msg, "decode_error")
DLQ --> Retry : (done; dlq handler errors swallowed)
Batch -> PEL : XACK bad_msg_id
Batch -> Batch : total_dlq += 1
note right of PEL : Removed immediately.\nWill NOT re-enter consumer\nloop for more retries.
@enduml
5. Poison Pill → DLQ After Max Deliveries
@startuml
title Poison Pill — DLQ After max_deliveries (run_once step 4)
participant "RetryHandler\nhandle_poison_pills()" as Retry
participant "dlq_handler()" as DLQ
database "Redis PEL" as PEL
database "Redis Stream" as Stream
note over PEL : msg_999 redelivered\ndelivery_count >= max_deliveries
Retry -> PEL : XPENDING_RANGE {stream} {group}\n - + count={batch_size}
PEL --> Retry : [PendingEntry(id=msg_999, delivery_count=5)]
Retry -> Retry : filter: delivery_count >= max_deliveries
alt message exists in stream
Retry -> Stream : XRANGE {stream} msg_999 msg_999 COUNT 1
Stream --> Retry : raw bytes
Retry -> Retry : try_decode(raw)
Retry -> DLQ : dlq_handler(decoded_msg, "max_deliveries")
else message deleted from stream (XDEL)
Retry -> Retry : log warning "missing from stream"\n(no DLQ call — nothing to forward)
end
Retry -> PEL : XACK {stream} {group} *all_poison_ids (batched)
note right of PEL : Poison pills cleared.\nNever retried again.
@enduml
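The decision logic in this diagram reduces to a filter plus a lookup. The sketch below plans a poison-pill sweep from in-memory data: entries at or above max_deliveries are selected, those still present in the stream go to the DLQ handler, those missing are only logged, and all selected IDs are ACKed. FakePendingEntry and plan_poison_sweep are hypothetical names; only the id, consumer, and delivery_count fields are taken from the monitoring example.

```python
from dataclasses import dataclass


@dataclass
class FakePendingEntry:
    """Hypothetical stand-in for a pending-entry record."""
    id: str
    consumer: str
    delivery_count: int


def plan_poison_sweep(pending, stream_contents, max_deliveries):
    """Return (ids_to_ack, dlq_payloads, missing_ids) for one sweep."""
    poison_ids = [e.id for e in pending if e.delivery_count >= max_deliveries]
    # Messages still in the stream are forwarded to the DLQ handler.
    to_dlq = [(i, stream_contents[i]) for i in poison_ids if i in stream_contents]
    # Messages deleted from the stream have nothing to forward.
    missing = [i for i in poison_ids if i not in stream_contents]
    return poison_ids, to_dlq, missing


pending = [
    FakePendingEntry("1-0", "w1", 1),
    FakePendingEntry("2-0", "w1", 3),
    FakePendingEntry("3-0", "w2", 5),
]
stream_contents = {"1-0": b"a", "2-0": b"b"}   # "3-0" was deleted from the stream
ack_ids, to_dlq, missing = plan_poison_sweep(pending, stream_contents, max_deliveries=3)
print(ack_ids, to_dlq, missing)  # ['2-0', '3-0'] [('2-0', b'b')] ['3-0']
```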
6. NOGROUP Auto-Recovery
@startuml
title NOGROUP Auto-Recovery
participant "run() loop" as Loop
participant "ConsumerGroup" as CG
database "Redis" as R
participant "run_once()" as Once
Loop -> Once : await run_once()
Once -> CG : ensure() [key in _known → skipped]
Once -> R : XREADGROUP ...
R --> Once : ResponseError: NOGROUP ...
Once --> Loop : raises Exception("NOGROUP ...")
Loop -> Loop : "NOGROUP" in str(e) → True
Loop -> CG : reset() [removes key from _known]
Loop -> Loop : sleep(1)
Loop -> Once : await run_once() [next iteration]
Once -> CG : ensure() [key not in _known → runs]
CG -> R : XGROUP CREATE {stream} {group} MKSTREAM
R --> CG : OK
CG -> CG : _known.add(key)
Once -> R : XREADGROUP ... [normal processing resumes]
@enduml
7. Full run_once() Lifecycle with Metrics
@startuml
title Full run_once() Lifecycle
participant "run_once()" as Loop
participant "ConsumerGroup\nensure()" as CG
participant "StreamClient" as C
participant "RetryHandler" as Retry
database "Redis" as R
Loop -> CG : ensure(dlq_stream, dlq_group)
note right of CG : No-op if key in _known.\nRuns XGROUP CREATE on first call\nor after NOGROUP reset.
group Step 2: New messages
Loop -> C : read(stream, group, worker, batch_size, block_ms)
C -> R : XREADGROUP GROUP {group} {worker} COUNT {n} BLOCK {ms} STREAMS {stream} >
R --> Loop : [StreamMessage, ...]
Loop -> Loop : total_read += n\ntps_in.record(n)\n_process_batch(messages)
note right of Loop : ACKed IDs → total_acked, tps_out.record(n)
end
group Step 3: Orphan recovery (cursor loop)
loop cursor != "0-0"
Loop -> C : autoclaim(stream, group, worker, min_idle_ms, batch_size, cursor)
C -> R : XAUTOCLAIM min_idle={ms} count={n} start_id={cursor}
R --> Loop : (next_cursor, [reclaimed StreamMessage])
Loop -> Loop : total_read += n\ntps_in.record(n)\n_process_batch(claimed)
Loop -> Loop : cursor = next_cursor
end
end
group Step 4: Poison-pill sweep
Loop -> Retry : handle_poison_pills() → int
Retry -> R : XPENDING_RANGE → filter delivery_count >= max_deliveries
Retry -> R : XRANGE per poison ID (fetch raw bytes)
Retry -> Retry : dlq_handler(msg, "max_deliveries") per pill
Retry -> R : XACK {stream} {group} *poison_ids (batched)
Retry --> Loop : count of pills processed
Loop -> Loop : total_dlq += count
end
@enduml
Running the Example
# terminal 1
docker run -p 6379:6379 redis
# terminal 2
python examples/basic_worker.py
Development
Setup
git clone <repo>
cd redis-stream-queue
python3 -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install -e ".[dev]" # editable install + test deps
Running Tests
Tests use fakeredis — no real Redis needed.
# all tests
pytest tests/
# single file
pytest tests/test_consumer.py
# single test with verbose output
pytest tests/test_consumer.py::test_poison_pill_goes_to_dlq -v -s
# with coverage
pip install pytest-cov
pytest tests/ --cov=src/redis_stream_queue --cov-report=term-missing
Project Structure
src/
└── redis_stream_queue/
├── __init__.py # public exports
├── client.py # StreamClient — connection pool, all X* commands
├── producer.py # StreamProducer — push + ProducerMetrics + all_metrics()
├── consumer.py # StreamConsumer — main loop + all_metrics()
├── group.py # ConsumerConfig + ConsumerGroup (class-level registry, stats, health)
├── message.py # StreamMessage, PendingEntry, StreamStats, ConsumerInfo,
│ # ConsumerMetrics, ProducerMetrics, _TpsTracker
├── retry.py # RetryHandler — poison-pill detection + DLQ routing
├── serializers.py # Json / Msgpack / Pickle
└── exceptions.py
tests/
conftest.py # shared fixtures: fake_redis, make_client, registry resets
test_consumer.py # consumer loop, metrics, all_metrics, NOGROUP recovery
test_group.py # stats, health check, pending details
test_producer.py # push, ensure_group, metrics, all_metrics
examples/
basic_worker.py
Known Limitations
fetch_by_ids N+1: the poison-pill fetch does one XRANGE call per ID. Fine for typical max_deliveries counts (< 10); would benefit from a pipeline for very large poison batches.
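One possible shape for the pipelined variant is sketched below. It is not part of the library: fetch_by_ids_pipelined is a hypothetical helper that assumes a redis-py asyncio client for a single node, where pipeline(transaction=False) queues the XRANGE calls and execute() sends them in one round trip. The FakeRedis and FakePipeline classes exist only so the example runs without a server.

```python
import asyncio


async def fetch_by_ids_pipelined(redis_client, stream, ids):
    """Fetch several stream entries by ID in a single round trip."""
    pipe = redis_client.pipeline(transaction=False)
    for msg_id in ids:
        pipe.xrange(stream, min=msg_id, max=msg_id, count=1)  # queued, not sent yet
    replies = await pipe.execute()
    found = {}
    for msg_id, reply in zip(ids, replies):
        if reply:                       # empty list means the entry was deleted
            _, fields = reply[0]
            found[msg_id] = fields
    return found


class FakePipeline:
    """Test double mimicking the small part of the pipeline API used above."""

    def __init__(self, data):
        self._data, self._queued = data, []

    def xrange(self, name, min="-", max="+", count=None):
        self._queued.append(min)
        return self

    async def execute(self):
        return [[(i, self._data[i])] if i in self._data else [] for i in self._queued]


class FakeRedis:
    def __init__(self, data):
        self._data = data

    def pipeline(self, transaction=True):
        return FakePipeline(self._data)


data = {"1-0": {"data": b"a"}, "2-0": {"data": b"b"}}
found = asyncio.run(fetch_by_ids_pipelined(FakeRedis(data), "orders", ["1-0", "9-9", "2-0"]))
print(sorted(found))  # ['1-0', '2-0']
```

Pipelines behave differently in Redis Cluster mode, so treat this as a single-node illustration only.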