Artifacts to test dependencies with AWS using boto3
Testing artifacts to work with the boto3 library.
The focus is on python 3 and boto3. So far, utils cover working with SQS queues and SNS topics.
LiveTestQueue allows to quicky test code that depends on a SQS queue.
>>> with LiveTestQueue() as queue: >>> queue.send_message(MessageBody='some') >>> msgs = queue.receive_messages() >>> print(msgs.body)
The context manager takes care of creating and finally deleting the queue, as well as ensuring the queue has a unique name (prefixed, to be identified as a “test” queue).
LiveTestTopicQueue allows to test code on topics.
>>> with LiveTestTopicQueue() as topic, queue: >>> topic.publish(Message='some') >>> msgs = queue.receive_messages() >>> print(msgs.body)
The context manager creates (and finally deletes) a pair of objects, one topic and one queue, that work together. Messages published to the topic can be red back on the queue. The topic has the appropriate policy to publish to the queue, and the queue is subscribed to the topic to operate as its endpoint.
LiveTestDynamoDBTable allows to test using live test DynamoDB tables:
>>> key_schema, attribute_definitions, provisioned_throughput = LiveTestDynamoDBTable.create_key_schema( >>> partition_key_name='my_partition_key', sorting_key_name='my_sorting_key', >>> partition_key_type='S', sorting_key_type='N', read_capacity_units=1, write_capacity_units=1) >>> with LiveTestDynamoDBTable(key_schema_definition=key_schema, >>> attribute_definitions=attribute_definitions, >>> provisioned_throughput=provisioned_throughput) as table:
Note the helper function to create the key schemas. Upong exiting the context manager, the test table is deleted.
The package includes a set of integration tests. These test live objects against the AWS backend, so the network must be up and the boto3 must be correctly configured (as described here).
An example test can be found in examples.py.
Test an object that uses a topic to send data:
with LiveTestQueue() as queue: o = ObjectUnderTest(sqs_queue=queue) o.do_something() o.send_results_to_backend() msgs = queue.receive_messages() self.assertEqual(len(msgs), 1) expected = json.dumps(expected_output) self.assertEqual(msgs.body, expected)
Testing an object that publishes to a topic, inspecting the message published:
with LiveTestTopicQueue() as (topic, queue): o = ObjectUnderTest(topic) o.do_something() o.send_results_to_aws() msgs = queue.receive_messages() expected = json.dumps(expected_output) self.assertEqual(msgs.body, expected)
Thank you both!
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
|File Name & Checksum SHA256 Checksum Help||Version||File Type||Upload Date|
|awstestutils-1.0.0-py3-none-any.whl (9.6 kB) Copy SHA256 Checksum SHA256||3.5||Wheel||Sep 4, 2016|
|awstestutils-1.0.0.tar.gz (7.5 kB) Copy SHA256 Checksum SHA256||–||Source||Sep 4, 2016|