concurrent.futures extension with a ScheduledThreadPoolExecutor to handle delayed and periodic futures with rescheduling
Scheduled Futures
Preamble
concurrent.futures is pretty awesome, but it does not handle periodic work, which comes up a fair amount when creating applications.
apscheduler is great, but trying to wait for jobs to complete involves a racy mess of callbacks.
asyncio is also good and can handle periodic work with a simple asyncio.sleep call, but this is more trouble than it is worth when most of your codebase uses system calls / DLLs / synchronous libraries you don't want to rewrite or coat with asyncio.to_thread calls.
This package was created to solve this problem. Please see Features below.
The only documentation is this README and the code itself.
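For comparison, the asyncio route described above can look like the following minimal sketch. The names blocking_work and run_periodically are made up for illustration, and asyncio.to_thread requires Python 3.9 or later. The point is that every synchronous call has to be wrapped before the event loop can use it, which is the overhead this package tries to avoid.

```python
import asyncio
import time


def blocking_work():
    """Stand-in for a synchronous call (system call, DLL, blocking library)."""
    time.sleep(0.01)
    return time.time()


async def run_periodically(period, runs):
    """Call blocking_work `runs` times, sleeping `period` seconds after each call."""
    results = []
    for _ in range(runs):
        # The blocking call is pushed to a worker thread so that it does not
        # stall the event loop while it runs.
        results.append(await asyncio.to_thread(blocking_work))
        await asyncio.sleep(period)
    return results


if __name__ == "__main__":
    timestamps = asyncio.run(run_periodically(period=0.05, runs=3))
    print(timestamps)
```

In this sketch the sleep starts only after the work finishes, so the gap between starts is the period plus the runtime of the work.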
Inspiration
This package was inspired by this feature request and some of the code there.
Features
- Schedule futures at a fixed delay using a thread pool executor
- Rescheduling of ScheduledFuture instances after they have started
- ScheduledThreadPoolExecutor is a subclass of ThreadPoolExecutor, so it can be used anywhere ThreadPoolExecutor can, such as for the asyncio default executor
- Logged warnings when a ScheduledFuture's runtime exceeded its period, indicating it may be scheduled to run too quickly
- Logged warnings when a ScheduledFuture is run later than scheduled, indicating you may need more workers to keep up
- Statistics on the number of executions / exceptions / total runtime / average runtime for each ScheduledFuture
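Because ScheduledThreadPoolExecutor is a ThreadPoolExecutor subclass, installing it as the asyncio default executor should look roughly like the sketch below. This is an illustration, not tested guidance from the package: blocking_work, main, and run_demo are hypothetical names, and the fallback import exists only so the snippet still runs where scheduled_futures is not installed.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

try:
    from scheduled_futures import ScheduledThreadPoolExecutor as Executor
except ImportError:
    # Fallback for illustration only: the plain executor exposes the same
    # ThreadPoolExecutor interface that asyncio relies on.
    Executor = ThreadPoolExecutor


def blocking_work():
    """Stand-in for a synchronous call."""
    time.sleep(0.01)
    return 'done'


async def main(executor):
    loop = asyncio.get_running_loop()
    # asyncio accepts any ThreadPoolExecutor instance here, subclasses included.
    loop.set_default_executor(executor)
    # Passing None selects the loop's default executor.
    return await loop.run_in_executor(None, blocking_work)


def run_demo():
    # asyncio.run shuts the default executor down when it closes the loop.
    return asyncio.run(main(Executor()))


if __name__ == "__main__":
    print(run_demo())
```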
Technologies
- Python >= 3.5
Install
pip install scheduled_futures
Development
Pull requests and new feature requests are welcome.
Example
Code
import time
from concurrent.futures import wait, CancelledError
import logging

from scheduled_futures import ScheduledThreadPoolExecutor

logging.basicConfig()  # to see warning messages from lib
# which are pumped into a null handler by default


def work():
    print(time.time(), 'work!')


def slow_work():
    time.sleep(0.25)
    print(time.time(), 'slow work...huuh')


with ScheduledThreadPoolExecutor() as stpool:
    stpool.submit(work)  # same interface as regular ThreadPoolExecutor still works

    # simple demo of scheduling a callable a little later
    print('\nDelayed future example')
    print(time.time(), 'work scheduled')
    future = stpool.schedule(work, start=time.time() + 0.5)
    time.sleep(1)

    # simple demo of scheduling a callable periodically
    print('\nPeriodic future example')
    future = stpool.schedule(work, period=0.25)
    time.sleep(1)
    future.cancel()
    wait([future])

    # show log warning for each execution because the future
    # ran for longer than the period between runs
    print('\nPeriod too short for long-running future example')
    future = stpool.schedule(slow_work, period=0.20)
    time.sleep(1)
    future.cancel()
    wait([future])

with ScheduledThreadPoolExecutor() as stpool:
    print('\nRescheduling work example')
    future = stpool.schedule(work, period=0.25)
    time.sleep(0.55)
    print('Rescheduling')
    stpool.reschedule(future, period=1)
    time.sleep(2)


def slow_work2():
    time.sleep(0.15)
    print(time.time(), 'slow work2...huuuuuuh')


with ScheduledThreadPoolExecutor(late_run_limit=0.1, max_workers=1) as stpool:
    # show log warning before some executions because there are not enough workers
    # to keep up with the execution schedule
    print('\nNot enough workers example')
    futures = []
    for _ in range(3):
        futures.append(stpool.schedule(slow_work2, period=0.20))
    time.sleep(1)
    for f in futures:
        f.cancel()
    wait(futures)

i = 0


def cancelled_work():
    global i
    i += 1
    if i >= 3:
        raise CancelledError('Stop working, now!')
    print(time.time(), 'ran without cancellation')


with ScheduledThreadPoolExecutor() as stpool:
    # cancel a periodic from inside a periodic
    print('\nCancel from inside callable example')
    future = stpool.schedule(cancelled_work, period=0.25)
    try:
        future.result()
    except CancelledError:
        print('work cancelled!')
Output
1654566752.6947718 work!
Delayed future example
1654566752.6957695 work scheduled
1654566753.2106197 work!
Periodic future example
1654566753.700693 work!
1654566753.9519553 work!
1654566754.2048206 work!
1654566754.4566417 work!
1654566754.7094283 work!
Period too short for long-running future example
1654566754.963385 slow work...huuh
WARNING:scheduled_futures:Periodic scheduled future runtime exceeded period.
1654566755.4197686 slow work...huuh
WARNING:scheduled_futures:Periodic scheduled future runtime exceeded period.
1654566755.8865035 slow work...huuh
WARNING:scheduled_futures:Periodic scheduled future runtime exceeded period.
Rescheduling work example
1654566755.8884244 work!
1654566756.1499033 work!
1654566756.4140623 work!
Rescheduling
1654566756.4449573 work!
1654566757.454106 work!
1654566758.462207 work!
Not enough workers example
1654566758.6189985 slow work2...huuuuuuh
WARNING:scheduled_futures:Late to run scheduled future.
1654566758.7743778 slow work2...huuuuuuh
WARNING:scheduled_futures:Late to run scheduled future.
1654566758.932357 slow work2...huuuuuuh
WARNING:scheduled_futures:Late to run scheduled future.
1654566759.0854318 slow work2...huuuuuuh
WARNING:scheduled_futures:Late to run scheduled future.
1654566759.2414527 slow work2...huuuuuuh
WARNING:scheduled_futures:Late to run scheduled future.
1654566759.3976445 slow work2...huuuuuuh
WARNING:scheduled_futures:Late to run scheduled future.
1654566759.5528805 slow work2...huuuuuuh
Cancel from inside callable example
1654566759.554882 ran without cancellation
1654566759.8188088 ran without cancellation
work cancelled!
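The two warnings in the output above correspond to two different timing problems. The sketch below shows one way such checks could be expressed. It is not the library's code: started_late and overran_period are hypothetical helpers, and treating late_run_limit as a tolerance in seconds is an assumption based on how the example uses it.

```python
def overran_period(runtime, period):
    """True when a single run took longer than the gap between runs."""
    return runtime > period


def started_late(scheduled_at, started_at, late_run_limit):
    """True when a run began more than late_run_limit seconds after its slot.

    Assumption: late_run_limit is a tolerance in seconds.
    """
    return started_at - scheduled_at > late_run_limit


if __name__ == "__main__":
    # slow_work sleeps 0.25 s but is scheduled every 0.20 s.
    print(overran_period(runtime=0.25, period=0.20))
    # slow_work2 sleeps 0.15 s with a 0.20 s period, so one run fits its period.
    print(overran_period(runtime=0.15, period=0.20))
    # A run that starts 0.3 s after its slot exceeds a 0.1 s tolerance.
    print(started_late(scheduled_at=10.0, started_at=10.3, late_run_limit=0.1))
```

In the "Not enough workers" example each run fits within its own period, but three periodic futures share a single worker, so runs queue up and start late.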