concurrent.futures extension with a ScheduledThreadPoolExecutor to handle delayed and periodic futures with rescheduling
Project description
Scheduled Futures
concurrent.futures extension with a ScheduledThreadPoolExecutor to handle delayed and periodic futures with rescheduling
Preamble
concurrent.futures is pretty awesome, but it does not handle periodic work which comes up a fair amount when creating applications.
apscheduler is great, but trying to wait for jobs to complete involves a racy mess of callbacks.
asyncio is also good and can simply handle periodic work with a asyncio.sleep
call, but this is more trouble than it is worth when most of your codebase uses system calls / dlls / synchronous libraries you dont want to rewrite or coat with asyncio.to_thread
calls.
This package was created to solve this problem. Please see Features below.
Documentation is just what there is in this README and the code itself.
Table of Contents
Inspiration
This package was inspired by this feature request and some of the code there.
Features
- Schedule futures at a fixed delay using a thread pool executor
- Rescheduling of
ScheduledFuture
instances after they have started ScheduledThreadPoolExecutor
is a subclass ofThreadPoolExecutor
, so it can be used anywhereThreadPoolExecutor
can, such as for the asyncio default executor- Logged warnings when a
ScheduledFuture's
runtime exceeded its period, indicating it may be scheduled to run too quickly - Logged warnings when a
ScheduledFuture
is run later scheduled, indicating you may need more workers to keep up - Statistics on the number of executions / exceptions / total runtime / average runtime for each
ScheduledFuture
Technologies
- Python >= 3.5
Install
pip install scheduled_futures
Development
Pull requests and new feature requests are welcome.
Example
Code
import time
from concurrent.futures import wait, CancelledError
import logging
from scheduled_futures import ScheduledThreadPoolExecutor
logging.basicConfig() # to see warning messages from lib
# which are pumped into a null handler by default
def work():
print(time.time(), 'work!')
def slow_work():
time.sleep(0.25)
print(time.time(), 'slow work...huuh')
with ScheduledThreadPoolExecutor() as stpool:
stpool.submit(work) # same interface as regular ThreadPoolExecutor still works
# simple demo of scheduling a callable a little later
print('\nDelayed future example')
print(time.time(), 'work scheduled')
future = stpool.schedule(work, start=time.time() + 0.5)
time.sleep(1)
# simple demo of scheduling a callable periodically
print('\nPeriodic future example')
future = stpool.schedule(work, period=0.25)
time.sleep(1)
future.cancel()
wait([future])
# show log warning for each execution because the future
# ran for longer than the period between runs
print('\nPeriod too short for long-running future example')
future = stpool.schedule(slow_work, period=0.20)
time.sleep(1)
future.cancel()
wait([future])
with ScheduledThreadPoolExecutor() as stpool:
print('\nRescheduling work example')
future = stpool.schedule(work, period=0.25)
time.sleep(0.55)
print('Rescheduling')
stpool.reschedule(future, period=1)
time.sleep(2)
def slow_work2():
time.sleep(0.15)
print(time.time(), 'slow work2...huuuuuuh')
with ScheduledThreadPoolExecutor(late_run_limit=0.1, max_workers=1) as stpool:
# show log warning before some executions because there are not enough workers
# to keep up with the execution schedule
print('\nNot enough workers example')
futures = []
for _ in range(3):
futures.append(stpool.schedule(slow_work2, period=0.20))
time.sleep(1)
list(map(lambda f: f.cancel(), futures))
wait(futures)
i = 0
def cancelled_work():
global i
i += 1
if i >= 3:
raise CancelledError('Stop working, now!')
print(time.time(), 'ran without cancellation')
with ScheduledThreadPoolExecutor() as stpool:
# cancel a periodic from inside a periodic
print('\nCancel from inside callable example')
future = stpool.schedule(cancelled_work, period=0.25)
try:
future.result()
except CancelledError:
print('work cancelled!')
Output
1654566752.6947718 work!
Delayed future example
1654566752.6957695 work scheduled
1654566753.2106197 work!
Periodic future example
1654566753.700693 work!
1654566753.9519553 work!
1654566754.2048206 work!
1654566754.4566417 work!
1654566754.7094283 work!
Period too short for long-running future example
1654566754.963385 slow work...huuh
WARNING:scheduled_futures:Periodic scheduled future runtime exceeded period.
1654566755.4197686 slow work...huuh
WARNING:scheduled_futures:Periodic scheduled future runtime exceeded period.
1654566755.8865035 slow work...huuh
WARNING:scheduled_futures:Periodic scheduled future runtime exceeded period.
Rescheduling work example
1654566755.8884244 work!
1654566756.1499033 work!
1654566756.4140623 work!
Rescheduling
1654566756.4449573 work!
1654566757.454106 work!
1654566758.462207 work!
Not enough workers example
1654566758.6189985 slow work2...huuuuuuh
WARNING:scheduled_futures:Late to run scheduled future.
1654566758.7743778 slow work2...huuuuuuh
WARNING:scheduled_futures:Late to run scheduled future.
1654566758.932357 slow work2...huuuuuuh
WARNING:scheduled_futures:Late to run scheduled future.
1654566759.0854318 slow work2...huuuuuuh
WARNING:scheduled_futures:Late to run scheduled future.
1654566759.2414527 slow work2...huuuuuuh
WARNING:scheduled_futures:Late to run scheduled future.
1654566759.3976445 slow work2...huuuuuuh
WARNING:scheduled_futures:Late to run scheduled future.
1654566759.5528805 slow work2...huuuuuuh
Cancel from inside callable example
1654566759.554882 ran without cancellation
1654566759.8188088 ran without cancellation
work cancelled!
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Hashes for scheduled_futures-1.0.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0eedddeb0926754e12448ad0b32da16de8d1ec1cb927c4d9a2e69cdfc6824ba4 |
|
MD5 | 0fa0cabe6518a1e5ce019126e0cb7bec |
|
BLAKE2b-256 | c422e43a3df26257169b4e89500e3c4d88f91a1fd4d5f43d88e12a38d88f58e4 |