Skip to main content

Timed loops made simple

Project description

Timelooper

Build Status

I found myself re-implementing the same pattern over and over when it comes to repeating some task until some condition is met OR the time is up, so here it is abstracted and generalized into a neat package.

Yep, that's 25 lines of code + tests.

Here's a demo use case:

from timelooper import Looped, loop_timed
from datetime import timedelta

# Suppose we are listening to some queue
#   and want to batch the incoming messages.
#   However, we only want to wait for
#   some limited time for a batch 
#   to be formed.


class CollectableBatch(Looped):
    def __init__(self, queue, maxsize):
        self.batch = []
        self._queue = queue
        self._maxsize = maxsize
    
    async def do(self) -> None:
        self.batch.append(await self._queue.get())

    def should_stop(self) -> bool:
        return len(self.batch) == self._maxsize

    
collected = CollectableBatch(queue, maxsize=10)
await loop_timed(collected, timedelta(seconds=30))  

print(collected.batch)  # or whatever

Installation

pip install timelooper

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

timelooper-0.1.2.tar.gz (3.0 kB view details)

Uploaded Source

Built Distribution

timelooper-0.1.2-py3-none-any.whl (3.1 kB view details)

Uploaded Python 3

File details

Details for the file timelooper-0.1.2.tar.gz.

File metadata

  • Download URL: timelooper-0.1.2.tar.gz
  • Upload date:
  • Size: 3.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.12 CPython/3.8.10 Linux/5.13.0-27-generic

File hashes

Hashes for timelooper-0.1.2.tar.gz
Algorithm Hash digest
SHA256 8fa07d41e31577072634759c6be1c897c41ceb5eeb4aa7ffd27f73a2af09690e
MD5 1d46dcf784b7a5300f7e01a68d0f88be
BLAKE2b-256 f29c70edcbaa776e10cc769f4810e23b6d280ce1980350ef4eb1cfa965b57144

See more details on using hashes here.

File details

Details for the file timelooper-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: timelooper-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 3.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.12 CPython/3.8.10 Linux/5.13.0-27-generic

File hashes

Hashes for timelooper-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 a315d05ab33b9e07d7364ae7be87dbd372be7609b256a969da08a7081e343d45
MD5 a507116aeba2139025509b3317b8eb64
BLAKE2b-256 175470fa3e53d12f04c98de9eeac1dab17a84818f8efe17f79b164b46185dbe0

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page