Libraries for testing openmodule services
Project description
Openmodule Test SDK
ZMQ Messages
Our zmq stack requires a broker. For testcases a mixin exists, which starts a simple broker on setUp(...)
and shuts it
down on tearDown(...)
. Furthermore the mixin takes care of the slow joiner issue, by testing connectivity with each
topic it has subscribed to.
Context
The ZMQTextMixin
has some useful functions:
zmq_context()
returns the ZMQ Context of the broker. This context has to be used in order to utilize theinproc://
transport.zmq_config()
returns a config object withBROKER_SUB
andBROKER_PUB
set to the correct binds.
Subscribing
The mixin automatically subscribes to all topics listed in the topics
list, and all rpc response channels for the
rpc's listed in rpc_channels
.
class MyTestCase(ZMQTestMixin, TestCase):
topics = ["health"]
rpc_channels = ["controller"]
For dynamically subscribing you must use the provided subscribe function. It again takes care of the slow joiner issue by sending to the requested topic and waiting for a response.
class MyTestCase(ZMQTestMixin, TestCase):
def test_something(self):
...
self.zmq_client.subscribe("my-topic", "some-more-topics")
...
Sending
In order to send, you have to specify the topic in the topics
list of the testcase. You can then send by passing a
dict to the send function, or by passing keyword arguments.
class MyTestCase(ZMQTestMixin, TestCase):
topics = ["test"]
def test_something(self):
self.zmq_client.send("test", type="test", some="data", even_more="data")
You can also pass a model instance directly and send it:
class MyModel(OpenModuleModel):
data: str
zmq_client.send("test", MyModel(some="data"))
Receiving
Next message on a specific topic:
topic, message = self.zmq_client.wait_for_message_on_topic(b"test")
Next message which matches a filter:
rpc_id = ...
topic, message = self.zmq_client.wait_for_message(
filter=lambda topic, message: message.get("rpc_id") == rpc_id
)
RPC Servers
When testing RPC servers we again have to combat the slow joiner issue. In order to avoid it, a utility function exists
which waits until a passed RPC server has booted up compeltely. The method wait_for_rpc_server(...)
registers a dummy
handler in each of the rpc server's channels, and calls it repeatetly until it receives an answer.
class MyTestCase(RPCServerTestMixin, TestCase):
rpc_channels = ["controller"]
def test_my_server(self):
my_server = my_code.get_server()
self.wait_for_rpc_server(my_server)
Channels and dynamic registering of handlers
All RPC channels have to be listed in the ZMQTestMixin.rpc_channels
of the testcase.
When registering RPC channels on the go, the function needs to be called again, to ensure that the rpc server is properly listening on the new channels.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for openmodule_test-0.1.6-py2.py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 4b0b48854db2186ea8dfbfe9fab8c079c790c58f6f4d2e23451b7e47d83fa3b6 |
|
MD5 | 6d5f4a65a8365cf970a853991c2e08e0 |
|
BLAKE2b-256 | 04328cfa6300473fc30e0355e5ecf8307dcba29240cd0066679aeb69d97d265d |