pylibschc - A python wrapper for libSCHC

This provides a pythonic wrapper for libSCHC.

Installation

You can use pip to install the package from PyPI:

pip install pylibschc

Usage

More documentation can be found here.

Rules

Rules are managed using a pydantic model, i.e., they can be loaded from a correctly typed dictionary (e.g. generated from a JSON or YAML file) using the pylibschc.rules module:

>>> import json
>>> from pylibschc.rules import Config
>>>
>>> with open("tests/artifacts/rules_example.json", encoding="utf-8") as f:
...     rules = Config(**json.load(f))
...     config = rules.deploy()

Do not forget to call the deploy() method again whenever you change any rules, so that the changed rules are re-deployed with libSCHC.

To use the rules with libSCHC on an embedded device, the corresponding C header file can be generated using:

>>> with open("rule_config.h", "w", encoding="utf-8") as f:
...     written = f.write(rules.to_c_header())

An example of such a dictionary is provided as JSON in rules_example.json. The documentation of the concrete pydantic model can be found in its API reference.
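Since the rules are loaded from a plain dictionary, the file format is only a matter of which parser produces that dictionary. The following is a minimal sketch of such a loader. The helper name load_rules_dict is hypothetical and not part of pylibschc, the YAML branch assumes the third-party PyYAML package is installed, and the demonstration file contains a placeholder rather than the real rule schema.

```python
import json
import tempfile
from pathlib import Path


def load_rules_dict(path):
    """Read a rules file into a plain dictionary, choosing the parser by suffix.

    Hypothetical helper for illustration; it is not part of pylibschc.
    """
    path = Path(path)
    text = path.read_text(encoding="utf-8")
    if path.suffix.lower() in (".yaml", ".yml"):
        # Assumption: PyYAML is installed. It is imported lazily so that
        # JSON-only use needs nothing beyond the standard library.
        import yaml

        data = yaml.safe_load(text)
    else:
        data = json.loads(text)
    if not isinstance(data, dict):
        raise TypeError(
            f"{path} must contain a mapping at top level, "
            f"got {type(data).__name__}"
        )
    return data


# Demonstration with a placeholder document (NOT the real rule schema).
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "rules.json"
    sample.write_text(json.dumps({"placeholder": 1}), encoding="utf-8")
    loaded = load_rules_dict(sample)
```

With pylibschc installed, the returned dictionary would be passed on as in the example above, i.e. Config(**load_rules_dict("tests/artifacts/rules_example.json")).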

Compression/Decompression

Both compression and decompression are handled by the pylibschc.compressor.CompressorDecompressor class. Our example uses scapy to construct a valid CoAP over IPv6 packet. To compress it, the output() method is called:

>>> from scapy.all import IPv6, UDP, raw
>>> from scapy.contrib.coap import CoAP
>>> import pylibschc.compressor
>>>
>>> comp_dec = pylibschc.compressor.CompressorDecompressor(device=config.devices[0])
>>> pkt = raw(
...     IPv6(hlim=64, src="2001:db8::1", dst="2001:db8:1::2")
...     / UDP(
...         sport=5683,
...         dport=61618,
...     )
...     / CoAP(
...         ver=1,
...         code="2.05 Content",
...         type="NON",
...         msg_id=0x23B3,
...         token=b"\x32\x3a\xf3\xa3",
...         paymark=b"\xff",
...     )
...     / (
...         b'[{"id":1, "name":"CJ.H.L.(Joe) Lecomte) Heliport","code":"YOY","country":"CA"}]'
...     )
... )
>>> res, bit_array = comp_dec.output(pkt, direction=pylibschc.rules.Direction.UP)
>>> res
<CompressionResult.COMPRESSED: 1>
>>> bit_array.buffer
b'\x01\t3#\xaf:5\xb7\xb2&\x96B#\xa3\x12\xc2\x02&\xe6\x16\xd6R#\xa2$4\xa2\xe4\x82\xe4\xc2\xe2\x84\xa6\xf6R\x92\x04\xc6V6\xf6\xd7FR\x92\x04\x86V\xc6\x97\x06\xf7\'B"\xc2&6\xf6FR#\xa2%\x94\xf5\x92"\xc2&6\xf7V\xe7G\'\x92#\xa2$4\x12\'\xd5\xd0'

For decompression, the input() method is called:

>>> comp_dec.input(bit_array, direction=pylibschc.rules.Direction.UP)
b'`\x00\x00\x00\x00`\x11@ \x01\r\xb8\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01 \x01\r\xb8\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x163\xf0\xb2\x00`r\xf2TE#\xb32:\xf3\xa3\xff[{"id":1, "name":"CJ.H.L.(Joe) Lecomte) Heliport","code":"YOY","country":"CA"}]'
>>> pkt == comp_dec.input(bit_array, direction=pylibschc.rules.Direction.UP)
True

Both input() and output() take either BitArray- or bytes-typed variables as input.
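To see that the decompressed bytes really are the original packet, the header fields can be decoded with the standard library alone. The sketch below does not use pylibschc or scapy. The hex strings were transcribed by hand from the decompressed output shown above (headers only, without the JSON payload), and the field layouts follow the usual IPv6, UDP and CoAP header formats.

```python
import ipaddress
import struct

# Header bytes transcribed by hand from the decompressed packet shown above.
packet = (
    bytes.fromhex("60000000 0060 11 40")  # IPv6: first word, length, next header, hop limit
    + bytes.fromhex("2001 0db8 0000 0000 0000 0000 0000 0001")  # IPv6 source
    + bytes.fromhex("2001 0db8 0001 0000 0000 0000 0000 0002")  # IPv6 destination
    + bytes.fromhex("1633 f0b2 0060 72f2")  # UDP: ports, length, checksum
    + bytes.fromhex("54 45 23b3 323af3a3 ff")  # CoAP: header, token, payload marker
)

# The fixed IPv6 header is 40 bytes long.
first_word, payload_len, next_header, hop_limit, src, dst = struct.unpack(
    "!IHBB16s16s", packet[:40]
)
ipv6 = {
    "version": first_word >> 28,  # top 4 bits of the first 32-bit word
    "payload_len": payload_len,
    "next_header": next_header,  # 17 means UDP
    "hop_limit": hop_limit,
    "src": str(ipaddress.IPv6Address(src)),
    "dst": str(ipaddress.IPv6Address(dst)),
}

# The UDP header is 8 bytes: source port, destination port, length, checksum.
sport, dport, udp_len, udp_checksum = struct.unpack("!HHHH", packet[40:48])

# The CoAP fixed header is 4 bytes, followed by the token.
ver_type_tkl, code, msg_id = struct.unpack("!BBH", packet[48:52])
coap = {
    "ver": ver_type_tkl >> 6,
    "type": (ver_type_tkl >> 4) & 0x3,  # 1 means NON (non-confirmable)
    "token_len": ver_type_tkl & 0xF,
    "code": f"{code >> 5}.{code & 0x1F:02d}",  # class.detail notation
    "msg_id": msg_id,
}
token = packet[52:52 + coap["token_len"]]

print(ipv6, (sport, dport, udp_len), coap, token.hex())
```

The decoded values match the arguments passed to scapy in the compression example: hop limit 64, ports 5683 and 61618, code 2.05, message ID 0x23B3 and the 4-byte token.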

Fragmentation/Reassembly

For fragmentation, call the output() method of a pylibschc.fragmenter.FragmenterReassembler object. To actually send the fragments, a send function needs to be registered for the device of the fragmenter. For reassembly or to handle acknowledgments, call the input() method of a pylibschc.fragmenter.FragmenterReassembler object. Both input() and output() take either BitArray- or bytes-typed variables as input.

>>> import asyncio
>>> import logging
>>> import pylibschc.fragmenter
>>>
>>> fragmenter_queue = None
>>> loop = None
>>> timer_tasks = {}
>>> reassembly_buffer = None
>>> # shorten waiting times for this example
>>> config.devices[0].duty_cycle_ms = 500
>>>
>>> def send(buffer):
...     fragmenter_queue.put_nowait({"cmd": "send", "data": buffer})
...     return len(buffer)
...
>>> def post_timer_task(conn, timer_task, delay_sec, arg):
...     if conn in timer_tasks:
...         remove_timer_entry(conn)
...     timer_tasks[conn] = loop.call_later(delay_sec, timer_task, arg)
...
>>> def remove_timer_entry(conn):
...     if conn in timer_tasks:
...         timer_tasks[conn].cancel()
...         del timer_tasks[conn]
...
>>> def end_rx(conn):
...     reassembly_buffer.set_result(conn.mbuf)
...
>>> def end_tx(conn):
...     fragmenter_queue.put_nowait({"cmd": "end_tx"})
...
>>> async def asyncized_input(reassembler, buffer):
...     return reassembler.input(buffer)
...
>>> async def fragment_and_reassemble():
...     # just making sure these variables are initialized in the same loop
...     global fragmenter_queue
...     global loop
...     global reassembly_buffer
...
...     fragmenter_queue = asyncio.Queue()
...     loop = asyncio.get_running_loop()
...     reassembly_buffer = loop.create_future()
...     fragmenter = pylibschc.fragmenter.FragmenterReassembler(
...         device=config.devices[0],
...         mode=pylibschc.fragmenter.FragmentationMode.NO_ACK,
...         post_timer_task=post_timer_task,
...         end_tx=end_tx,
...         remove_timer_entry=remove_timer_entry,
...     )
...     fragmenter.register_send(send)
...     reassembler = pylibschc.fragmenter.FragmenterReassembler(
...         device=config.devices[0],
...         post_timer_task=post_timer_task,
...         end_rx=end_rx,
...         remove_timer_entry=remove_timer_entry,
...     )
...     print("fragmenter.output ->", fragmenter.output(bit_array))
...     cmd = {}
...     while cmd.get("cmd") != "end_tx":
...         cmd = await asyncio.wait_for(fragmenter_queue.get(), timeout=2)
...         if cmd["cmd"] == "send":
...             print(
...                 "reassembler.input ->",
...                 await asyncized_input(reassembler, cmd["data"])
...             )
...     return await asyncio.wait_for(reassembly_buffer, timeout=5)
...
>>> asyncio.run(fragment_and_reassemble()) == bit_array.buffer
fragmenter.output -> FragmentationResult.SUCCESS
reassembler.input -> ReassemblyStatus.ONGOING
reassembler.input -> ReassemblyStatus.COMPLETED
True

While this example uses asyncio to parallelize timer calls, any method to establish concurrency can be used (see test for a threaded fragmenter/reassembler for an example using the threading module) as long as the access to libSCHC (including calls to timer tasks) is synchronized.
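The synchronization requirement can be sketched with the threading module. The following is a minimal sketch under stated assumptions, not the implementation used in the project's threaded test: TimerRegistry and its lock attribute are hypothetical names, and only the callback signatures post_timer_task(conn, timer_task, delay_sec, arg) and remove_timer_entry(conn) are taken from the asyncio example above. A single re-entrant lock serializes the timer callbacks and the timer bookkeeping.

```python
import threading
import time


class TimerRegistry:
    """Keeps at most one pending timer per connection (hypothetical helper)."""

    def __init__(self):
        # Re-entrant, so a timer callback may itself post or remove timers.
        self.lock = threading.RLock()
        self._timers = {}

    def post_timer_task(self, conn, timer_task, delay_sec, arg):
        with self.lock:
            # Replace any timer that is still pending for this connection.
            self.remove_timer_entry(conn)
            timer = threading.Timer(
                delay_sec, self._run, args=(conn, timer_task, arg)
            )
            timer.daemon = True
            self._timers[conn] = timer
            timer.start()

    def remove_timer_entry(self, conn):
        with self.lock:
            timer = self._timers.pop(conn, None)
            if timer is not None:
                timer.cancel()

    def _run(self, conn, timer_task, arg):
        with self.lock:
            # threading.Timer is a Thread, so inside the callback the current
            # thread is the timer itself. Skip the task if this timer was
            # removed or replaced while it waited for the lock.
            if self._timers.get(conn) is not threading.current_thread():
                return
            del self._timers[conn]
            timer_task(arg)


# Demonstration with plain strings standing in for connection objects.
registry = TimerRegistry()
fired = []
registry.post_timer_task("conn", fired.append, 0.5, "stale")
registry.post_timer_task("conn", fired.append, 0.01, "fresh")  # replaces "stale"
registry.post_timer_task("other", fired.append, 0.01, "cancelled")
registry.remove_timer_entry("other")
time.sleep(0.3)
print(fired)
```

In a real setup, the bound methods registry.post_timer_task and registry.remove_timer_entry would be passed to FragmenterReassembler, and every other call into libSCHC, such as input() and output(), would be wrapped in the same registry.lock.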

License

This code is published under the GNU General Public License Version 3 (GPLv3). Please keep in mind that libSCHC is dual-licensed for non-open-source use. For more, have a look at the license information over at libSCHC.
