Fast-forward time in asyncio Python by patching loop.time, loop.call_later, loop.call_at, and asyncio.sleep
aiofastforward
Fast-forward time in asyncio Python by patching loop.call_later, loop.call_at, loop.time, and asyncio.sleep. This allows you to test asynchronous code synchronously.
Inspired by AngularJS $timeout.$flush.
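To illustrate the idea of fast-forwarding, here is a minimal toy sketch of a virtual clock with `time`, `call_at`, and `call_later` methods. It is not aiofastforward's implementation and does not touch a real event loop; the `ToyClock` class and its `forward` method are hypothetical names used only for this illustration.

```python
import heapq
import itertools


class ToyClock:
    """Toy virtual clock (hypothetical; NOT aiofastforward's implementation)."""

    def __init__(self):
        self._now = 0.0
        self._queue = []  # heap of (when, sequence, callback, args)
        self._counter = itertools.count()  # tie-breaker keeps insertion order

    def time(self):
        # Virtual time only changes when forward() is called.
        return self._now

    def call_at(self, when, callback, *args):
        heapq.heappush(self._queue, (when, next(self._counter), callback, args))

    def call_later(self, delay, callback, *args):
        self.call_at(self._now + delay, callback, *args)

    def forward(self, seconds):
        # Run every callback that falls due within the next `seconds`,
        # in order of scheduled time, then settle on the target time.
        target = self._now + seconds
        while self._queue and self._queue[0][0] <= target:
            when, _, callback, args = heapq.heappop(self._queue)
            self._now = max(self._now, when)
            callback(*args)
        self._now = target


calls = []
clock = ToyClock()
clock.call_later(1, calls.append, 0)
clock.call_later(2, calls.append, 1)

clock.forward(1)
after_first = list(calls)   # only the 1-second callback has run

clock.forward(1)
after_second = list(calls)  # both callbacks have run
```

No real time passes in this sketch: callbacks run only when the virtual clock is advanced, which is what lets a test step through delays deterministically.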
Installation
pip install aiofastforward
Usage
Patching is done through a context manager, similar to unittest.mock.patch.
import asyncio
import aiofastforward

# Run this inside a coroutine (for example, an async test method),
# because forward must be awaited.
loop = asyncio.get_event_loop()
with aiofastforward.FastForward(loop) as forward:
    # Call production function(s)
    # ...

    # Fast-forward time 1 second.
    await forward(1)

    # More production functions or assertions
    # ...
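For readers unfamiliar with the context-manager style of patching, the standard-library analogue looks like the sketch below. It uses `unittest.mock.patch.object` to replace `loop.time` on a fresh event loop for the duration of a `with` block. This only illustrates the patch-and-restore pattern; it is not a description of how aiofastforward patches the loop internally, and the fixed value `100.0` is an arbitrary choice for the example.

```python
import asyncio
from unittest.mock import patch

loop = asyncio.new_event_loop()
try:
    # Inside the block, loop.time is replaced by a mock returning a fixed value.
    with patch.object(loop, "time", return_value=100.0):
        patched_time = loop.time()

    # On exit, the original loop.time is restored automatically.
    restored_time = loop.time()
finally:
    loop.close()
```

As with `FastForward`, the patch is only active inside the `with` block, so code outside the block sees the loop's normal behavior.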
Examples
asyncio.sleep
# Production code
async def sleeper(callback):
    await asyncio.sleep(2)
    callback(0)

# Test code
from unittest.mock import Mock, call

loop = asyncio.get_event_loop()
callback = Mock()
with aiofastforward.FastForward(loop) as forward:
    # Pass the mock so the production code can report back to the test
    asyncio.ensure_future(sleeper(callback))

    await forward(1)
    self.assertEqual(callback.mock_calls, [])
    await forward(1)
    self.assertEqual(callback.mock_calls, [call(0)])
loop.call_later
# Production code
async def schedule_callback(loop, callback):
    loop.call_later(1, callback, 0)
    loop.call_later(2, callback, 1)

# Test code
from unittest.mock import Mock, call

loop = asyncio.get_event_loop()
with aiofastforward.FastForward(loop) as forward:
    callback = Mock()
    await schedule_callback(loop, callback)

    await forward(1)
    self.assertEqual(callback.mock_calls, [call(0)])
    await forward(1)
    self.assertEqual(callback.mock_calls, [call(0), call(1)])
loop.call_at
# Production code
async def schedule_callback(loop, callback):
    now = loop.time()
    loop.call_at(now + 1, callback, 0)
    loop.call_at(now + 2, callback, 1)

# Test code
from unittest.mock import Mock, call

loop = asyncio.get_event_loop()
with aiofastforward.FastForward(loop) as forward:
    callback = Mock()
    await schedule_callback(loop, callback)

    await forward(1)
    self.assertEqual(callback.mock_calls, [call(0)])
    await forward(1)
    self.assertEqual(callback.mock_calls, [call(0), call(1)])