aiofastforward
Fast-forward time in asyncio Python by patching loop.call_later, loop.call_at, loop.time, and asyncio.sleep. This allows you to test asynchronous code synchronously.
Inspired by AngularJS $timeout.$flush.
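To illustrate the idea behind fast-forwarding, here is a minimal sketch of a fake clock that stores scheduled callbacks and runs them only when time is moved forward explicitly. This is not aiofastforward's implementation: the FakeClock class and its forward method are hypothetical names used for illustration, and the sketch leaves out the event loop and asyncio.sleep entirely.

```python
import heapq
import itertools


class FakeClock:
    """Hypothetical stand-in for a patched loop clock (illustration only)."""

    def __init__(self):
        self._now = 0.0
        self._queue = []                    # heap of (when, sequence, callback, args)
        self._counter = itertools.count()   # tie-breaker: keeps insertion order

    def time(self):
        # Virtual time: only changes when forward() is called.
        return self._now

    def call_at(self, when, callback, *args):
        heapq.heappush(self._queue, (when, next(self._counter), callback, args))

    def call_later(self, delay, callback, *args):
        # A relative delay is an absolute deadline measured from the virtual "now".
        self.call_at(self._now + delay, callback, *args)

    def forward(self, seconds):
        # Run every callback that is due within the window, in deadline order.
        target = self._now + seconds
        while self._queue and self._queue[0][0] <= target:
            when, _, callback, args = heapq.heappop(self._queue)
            self._now = when
            callback(*args)
        self._now = target


calls = []
clock = FakeClock()
clock.call_later(1, calls.append, 0)
clock.call_later(2, calls.append, 1)

clock.forward(1)
after_first = list(calls)    # only the 1-second callback has run

clock.forward(1)
after_second = list(calls)   # both callbacks have run
```

No real time passes in this sketch, which is why a test built on the same idea can assert on timer behaviour immediately.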
Installation
pip install aiofastforward
Usage
Patching is done through a context manager, similar to unittest.mock.patch.
import asyncio
import aiofastforward
# The code below uses await, so it must run inside a coroutine
loop = asyncio.get_event_loop()
with aiofastforward.FastForward(loop) as forward:
    # Call production function(s)
    # ...
    # Fast-forward time 1 second.
    await forward(1)
    # More production functions or assertions
    # ...
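Because the snippet above uses await, it has to run inside a coroutine. One way to do that from unittest is sketched below. The async_test decorator is a hypothetical helper, not part of aiofastforward, and the test body only checks that a loop is running; in a real test the with aiofastforward.FastForward(loop) block would go there.

```python
import asyncio
import unittest
from functools import wraps


def async_test(coroutine_function):
    """Hypothetical helper: run an `async def` test method to completion."""
    @wraps(coroutine_function)
    def wrapper(*args, **kwargs):
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(coroutine_function(*args, **kwargs))
        finally:
            loop.close()
    return wrapper


class ExampleTest(unittest.TestCase):

    @async_test
    async def test_runs_inside_a_loop(self):
        # Inside a coroutine, get_event_loop() returns the running loop;
        # this is the loop that would be passed to FastForward.
        loop = asyncio.get_event_loop()
        await asyncio.sleep(0)
        self.assertTrue(loop.is_running())
```

On Python 3.8 and later, unittest.IsolatedAsyncioTestCase is a standard-library alternative to a hand-written decorator like this one.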
Examples
asyncio.sleep
# Production code
async def sleeper(callback):
    await asyncio.sleep(2)
    callback(0)
# Test code (inside an async test method of a unittest.TestCase)
from unittest.mock import Mock, call
loop = asyncio.get_event_loop()
callback = Mock()
with aiofastforward.FastForward(loop) as forward:
    asyncio.ensure_future(sleeper(callback))
    await forward(1)
    self.assertEqual(callback.mock_calls, [])
    await forward(1)
    self.assertEqual(callback.mock_calls, [call(0)])
loop.call_later
# Production code
async def schedule_callback(loop, callback):
    loop.call_later(1, callback, 0)
    loop.call_later(2, callback, 1)
# Test code (inside an async test method of a unittest.TestCase)
from unittest.mock import Mock, call
loop = asyncio.get_event_loop()
with aiofastforward.FastForward(loop) as forward:
    callback = Mock()
    await schedule_callback(loop, callback)
    await forward(1)
    self.assertEqual(callback.mock_calls, [call(0)])
    await forward(1)
    self.assertEqual(callback.mock_calls, [call(0), call(1)])
loop.call_at
# Production code
async def schedule_callback(loop, callback):
    now = loop.time()
    loop.call_at(now + 1, callback, 0)
    loop.call_at(now + 2, callback, 1)
# Test code (inside an async test method of a unittest.TestCase)
from unittest.mock import Mock, call
loop = asyncio.get_event_loop()
with aiofastforward.FastForward(loop) as forward:
    callback = Mock()
    await schedule_callback(loop, callback)
    await forward(1)
    self.assertEqual(callback.mock_calls, [call(0)])
    await forward(1)
    self.assertEqual(callback.mock_calls, [call(0), call(1)])