Asyncio CAN client for Waveshare 2-CH-CAN-TO-ETH with auto-reconnect and CLI
caneth
Asyncio CAN client for Waveshare 2-CH-CAN-TO-ETH devices.
It implements the device's 13‑byte transparent CAN frame format (1 flag/DLC byte + 4‑byte CAN ID + 8 data bytes), supports auto‑reconnect, and provides:
- A receive loop with:
  - global observers (on_frame) and
  - precise filters (register_callback(can_id, d0, d1)).
- A one-shot awaitable API (wait_for) to await the next matching frame.
- A CLI with watch, send, wait, and an interactive repl.
Requires Python 3.9+.
Install (local dev)
# inside the repo root (contains pyproject.toml)
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\Activate.ps1
python -m pip install -U pip
pip install -e .
CLI
# Watch frames
caneth --host 192.168.0.7 --port 20001 watch
# Send a frame (standard ID 0x123, payload 01 02 03 04)
caneth --host 192.168.0.7 --port 20001 send --id 0x123 --data "01 02 03 04"
# Wait for a specific frame (ID 0x123, bytes 01 02) with 10s timeout
caneth --host 192.168.0.7 --port 20001 wait --id 0x123 --d0 0x01 --d1 0x02 --wait-timeout 10
# Interactive console (send, on, watch, wait, help, quit)
caneth --host 192.168.0.7 --port 20001 repl
If caneth is not on PATH, you can run the module directly:
python -m caneth.cli --host 192.168.0.7 --port 20001 watch
Python API
Import
from caneth import WaveShareCANClient, CANFrame, parse_hex_bytes
class WaveShareCANClient(host: str, port: int, *, reconnect_initial=0.5, reconnect_max=10.0, reconnect_cap=60.0, name="can1")
An asyncio TCP client that speaks the Waveshare transparent CAN protocol.
- host / port: IP and TCP port of the device's CAN channel (e.g., 20001 for CAN1).
- reconnect_initial: initial reconnect backoff in seconds.
- reconnect_max: maximum backoff in seconds. Set to 0 to reconnect forever.
- reconnect_cap: when reconnect_max=0, the delay is capped to this value (default 60.0s).
- name: name used in logs/task names.
Methods
await start() -> None
Starts the background connection manager and receive loop. Returns immediately; use wait_connected() to wait for the first connection.
await wait_connected(timeout: float | None = None) -> None
Wait until the socket is connected (or raise asyncio.TimeoutError).
await close() -> None
Stops the background tasks and closes the socket.
register_callback(can_id: int, d0: int | None = None, d1: int | None = None, callback: Callable[[CANFrame], Awaitable[None] | None]) -> None
Register a callback for a specific CAN ID and optionally the first one or two data bytes.
- With only can_id, it triggers on any payload for that ID.
- With d0, it triggers when the first byte matches.
- With d0 and d1, it triggers when the first two bytes both match.
- If you pass d1, you must also pass d0.
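The matching rules above can be illustrated with a small standalone predicate. This is only a sketch of the described semantics, not the library's internal code; the name frame_matches and its argument layout are hypothetical.

```python
from typing import Optional, Sequence


def frame_matches(
    frame_id: int,
    frame_data: Sequence[int],
    can_id: int,
    d0: Optional[int] = None,
    d1: Optional[int] = None,
) -> bool:
    """Return True if a received frame satisfies an (ID, d0, d1) filter.

    Hypothetical helper: mirrors the documented rules only.
    """
    # d1 is only meaningful together with d0
    if d1 is not None and d0 is None:
        raise ValueError("d1 requires d0")
    if frame_id != can_id:
        return False
    # A frame that is too short to contain the byte cannot match it
    if d0 is not None and (len(frame_data) < 1 or frame_data[0] != d0):
        return False
    if d1 is not None and (len(frame_data) < 2 or frame_data[1] != d1):
        return False
    return True
```

Under these assumptions, a filter on (0x123, 0x01, 0x02) accepts a frame with payload 01 02 03 04 and rejects one with payload 01 05.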
on_frame(callback: Callable[[CANFrame], Awaitable[None] | None]) -> None
Registers a callback invoked for every received frame (sync or async).
await wait_for(can_id: int, d0: int | None = None, d1: int | None = None, *, timeout: float | None = None, callback: Callable[[CANFrame], Awaitable[None] | None] | None = None) -> CANFrame
Waits for the next frame whose CAN ID matches and whose first one or two data bytes (if provided) match.
- Returns the matching CANFrame.
- Raises asyncio.TimeoutError on timeout.
- If callback is provided, it is invoked once when the match occurs.
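To show the general idea behind a one-shot awaitable, here is a minimal sketch built on asyncio futures. It is not taken from the caneth source: OneShotWaiters, dispatch, and the (can_id, data) tuple used as a frame are all hypothetical stand-ins.

```python
import asyncio
from typing import Callable, List, Optional, Tuple

Frame = Tuple[int, bytes]  # hypothetical stand-in for CANFrame: (can_id, data)


class OneShotWaiters:
    """Keeps pending futures and resolves them when a matching frame arrives."""

    def __init__(self) -> None:
        self._waiters: List[Tuple[Callable[[Frame], bool], asyncio.Future]] = []

    async def wait_for(
        self, predicate: Callable[[Frame], bool], timeout: Optional[float] = None
    ) -> Frame:
        fut = asyncio.get_running_loop().create_future()
        entry = (predicate, fut)
        self._waiters.append(entry)
        try:
            # Raises asyncio.TimeoutError if no frame matches in time
            return await asyncio.wait_for(fut, timeout)
        finally:
            # Always unregister, whether we matched, timed out, or were cancelled
            if entry in self._waiters:
                self._waiters.remove(entry)

    def dispatch(self, frame: Frame) -> None:
        """Called by a receive loop for each incoming frame."""
        for predicate, fut in list(self._waiters):
            if not fut.done() and predicate(frame):
                fut.set_result(frame)
```

A receive loop would call dispatch() for every frame, while callers await wait_for() with a predicate such as lambda f: f[0] == 0x123.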
await send(can_id: int, data: bytes | list[int] | tuple[int, ...] = b"", *, extended: bool | None = None, rtr: bool = False) -> None
Sends one CAN frame:
- extended: when None, it auto-selects extended if can_id > 0x7FF.
- data: up to 8 bytes; you can pass bytes or a list/tuple of ints 0..255.
- rtr: set True for RTR frames.
- The device format is encoded for you: flags/DLC (1) + CAN ID (4, big-endian) + data (8, padded).
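If you want to validate arguments before calling send(), something along these lines may help. It is a sketch of the documented rules (payload of at most 8 bytes, auto-extended above 0x7FF); normalize_send_args is a hypothetical helper, and the 29-bit upper bound check is an assumption based on the CAN ID width rather than on the library's code.

```python
from typing import Optional, Sequence, Tuple, Union

STD_MAX = 0x7FF        # largest 11-bit (standard) CAN ID
EXT_MAX = 0x1FFFFFFF   # largest 29-bit (extended) CAN ID


def normalize_send_args(
    can_id: int,
    data: Union[bytes, Sequence[int]] = b"",
    extended: Optional[bool] = None,
) -> Tuple[bytes, bool]:
    """Return (payload, extended) after basic validation (hypothetical helper)."""
    payload = bytes(data)  # raises ValueError if any int is outside 0..255
    if len(payload) > 8:
        raise ValueError("CAN payload is limited to 8 bytes")
    if not 0 <= can_id <= EXT_MAX:
        raise ValueError("CAN ID does not fit in 29 bits")
    if extended is None:
        # Same auto-selection rule as documented for send()
        extended = can_id > STD_MAX
    return payload, extended
```

The returned pair can then be passed on, for example as send(can_id, payload, extended=extended).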
Usage Examples
1) Observe every frame and send a couple
import asyncio
from caneth import WaveShareCANClient

async def main():
    client = WaveShareCANClient("192.168.0.7", 20001, name="CAN1")
    # Print every frame received
    client.on_frame(lambda f: print("[RX]", f))
    await client.start()
    await client.wait_connected(timeout=10)
    # Send standard
    await client.send(0x123, [0x01, 0x02, 0x03, 0x04])
    # Send extended
    await client.send(0x12345678, b"\xDE\xAD\xBE\xEF", extended=True)
    try:
        while True:
            await asyncio.sleep(3600)
    except KeyboardInterrupt:
        pass
    finally:
        await client.close()

asyncio.run(main())
2) Register a precise filter (ID + first two data bytes)
import asyncio
from caneth import WaveShareCANClient, CANFrame

async def main():
    client = WaveShareCANClient("192.168.0.7", 20001, name="CAN1")

    def on_specific(f: CANFrame) -> None:
        print("[MATCH 0x123/01 02]", f)

    client.register_callback(0x123, 0x01, 0x02, on_specific)
    await client.start()
    await client.wait_connected(timeout=10)
    # keep running
    await asyncio.sleep(3600)

asyncio.run(main())
3) Wait for a frame once (optionally with a one-off callback)
import asyncio
from caneth import WaveShareCANClient

async def main():
    client = WaveShareCANClient("192.168.0.7", 20001, name="CAN1")
    await client.start()
    await client.wait_connected(timeout=10)

    async def when_found(frame):
        print("[ONE-OFF CALLBACK]", frame)

    try:
        frame = await client.wait_for(0x123, d0=0x01, d1=0x02, timeout=5, callback=when_found)
        print("Received:", frame)
    except asyncio.TimeoutError:
        print("Timed out waiting for the frame")
    finally:
        await client.close()

asyncio.run(main())
4) Convert user-friendly hex strings to bytes
from caneth import parse_hex_bytes
print(parse_hex_bytes("12 34 56")) # b'\x12\x34\x56'
print(parse_hex_bytes("0x12,0xFF,0x00")) # b'\x12\xFF\x00'
print(parse_hex_bytes("12-34-56")) # b'\x12\x34\x56'
print(parse_hex_bytes("123456")) # b'\x12\x34\x56'
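For readers curious how such permissive parsing can work, here is one possible approach. It is an illustrative sketch, not the implementation of parse_hex_bytes; the name parse_hex_sketch is hypothetical, and left-padding odd-length tokens with a zero is an assumption.

```python
import re


def parse_hex_sketch(text: str) -> bytes:
    """Parse hex strings separated by spaces, commas, or dashes (hypothetical)."""
    # Split on runs of whitespace, commas, or dashes; drop empty tokens
    tokens = [t for t in re.split(r"[\s,\-]+", text.strip()) if t]
    out = bytearray()
    for tok in tokens:
        if tok.lower().startswith("0x"):
            tok = tok[2:]
        if len(tok) % 2:
            tok = "0" + tok  # assumption: "F" is read as "0F"
        out += bytes.fromhex(tok)  # raises ValueError on non-hex input
    return bytes(out)
```

With this sketch, "12 34 56", "12-34-56", and "123456" all produce the same three bytes.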
Protocol Notes (Waveshare transparent CAN)
- Each CAN frame is encoded as 13 bytes over TCP:
  - flags/DLC (1 byte):
    - bit7 (0x80): 1 = Extended (29-bit), 0 = Standard (11-bit)
    - bit6 (0x40): 1 = RTR, 0 = Data frame
    - bits3..0 (0x0F): DLC (0..8), the number of valid data bytes
  - CAN ID (4 bytes, big-endian)
  - Data (8 bytes, zero-padded; DLC says how many are valid)
- The device may batch multiple frames into one TCP packet; this client reads a stream and slices it into 13-byte chunks.
- The client auto-chooses extended if can_id > 0x7FF unless you force it via extended=True/False in send().
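The framing described in these notes can be sketched as a small decoder plus a stream splitter. This is an independent illustration of the 13-byte layout, not the client's actual code; DecodedFrame, decode_frame, and FrameSplitter are hypothetical names, and clamping an out-of-range DLC to 8 is an assumption.

```python
from typing import List, NamedTuple

FRAME_SIZE = 13  # 1 flags/DLC byte + 4 ID bytes + 8 data bytes


class DecodedFrame(NamedTuple):
    can_id: int
    data: bytes
    extended: bool
    rtr: bool


def decode_frame(chunk: bytes) -> DecodedFrame:
    """Decode exactly one 13-byte frame."""
    if len(chunk) != FRAME_SIZE:
        raise ValueError("expected exactly 13 bytes")
    flags = chunk[0]
    dlc = min(flags & 0x0F, 8)  # assumption: clamp invalid DLC values to 8
    return DecodedFrame(
        can_id=int.from_bytes(chunk[1:5], "big"),
        data=bytes(chunk[5:5 + dlc]),
        extended=bool(flags & 0x80),
        rtr=bool(flags & 0x40),
    )


class FrameSplitter:
    """Buffers a TCP byte stream and returns every complete frame."""

    def __init__(self) -> None:
        self._buf = bytearray()

    def feed(self, data: bytes) -> List[DecodedFrame]:
        self._buf += data
        frames = []
        # A TCP read may contain several frames or only part of one
        while len(self._buf) >= FRAME_SIZE:
            frames.append(decode_frame(bytes(self._buf[:FRAME_SIZE])))
            del self._buf[:FRAME_SIZE]
        return frames
```

Because leftover bytes stay in the buffer, a frame split across two TCP reads is still decoded once its remaining bytes arrive.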
Reconnect Behavior
- The client automatically reconnects on socket errors and EOF (set reconnect_max=0 to retry forever; delay capped by reconnect_cap).
- Backoff grows from reconnect_initial up to reconnect_max.
- wait_connected() is useful after start() to wait for first connect.
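As a rough picture of how the three reconnect parameters interact, the delays could be generated like this. The doubling factor is an assumption (the text only says the backoff grows), and backoff_delays is a hypothetical helper, not part of caneth.

```python
import itertools
from typing import Iterator


def backoff_delays(
    initial: float = 0.5,
    maximum: float = 10.0,
    cap: float = 60.0,
    factor: float = 2.0,  # assumption: exponential growth by doubling
) -> Iterator[float]:
    """Yield successive reconnect delays in seconds."""
    # With maximum == 0 the delay is bounded by cap instead
    ceiling = cap if maximum == 0 else maximum
    delay = initial
    while True:
        yield min(delay, ceiling)
        delay = min(delay * factor, ceiling)


# First seven delays with the default parameters
print(list(itertools.islice(backoff_delays(), 7)))
# -> [0.5, 1.0, 2.0, 4.0, 8.0, 10.0, 10.0]
```

Under the same assumptions, reconnect_max=0 lets the delay keep growing until it reaches reconnect_cap.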
Threading & Concurrency
- Designed for a single asyncio event loop; callbacks may be sync or async.
- register_callback and on_frame callbacks are invoked serially in the receive task; prefer light, non-blocking work (or spawn your own tasks).
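One common way to keep callbacks light is to have them only enqueue frames and let a separate task do the slow work. The sketch below uses plain asyncio and integers as stand-in frames so that it runs without a device; the queue size and the drop-on-full policy are assumptions you may want to change.

```python
import asyncio
from typing import List


async def worker(queue: asyncio.Queue, results: List[int]) -> None:
    """Consumes frames outside the receive task."""
    while True:
        frame = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stands in for slow processing
            results.append(frame)
        finally:
            queue.task_done()


async def demo() -> List[int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    results: List[int] = []
    task = asyncio.create_task(worker(queue, results))

    def on_frame(frame) -> None:
        # Cheap and non-blocking: safe to run inside a receive loop
        try:
            queue.put_nowait(frame)
        except asyncio.QueueFull:
            pass  # assumption: drop frames when the consumer falls behind

    for frame in (1, 2, 3):  # simulated frames
        on_frame(frame)

    await queue.join()  # wait until the worker has handled everything
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return results
```

With a real client, the same on_frame function would be registered via client.on_frame(on_frame).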
Troubleshooting
- No frames? Verify the device mode (TCP server vs client), IP/port, and that the Waveshare channel (e.g., CAN1 → port 20001) is enabled.
- Extended IDs: If you pass an extended ID but force extended=False, the device will still receive the 29-bit ID value; ensure your CAN side expects standard vs extended correctly.
- Permissions / firewalls: Make sure the host firewall allows outbound TCP to the device port.
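Before digging into CAN-level issues, it can help to confirm that the TCP port is reachable at all. The following sketch uses only the standard library; can_connect is a hypothetical helper and the 3-second default timeout is arbitrary.

```python
import asyncio


async def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout
        )
    except (OSError, asyncio.TimeoutError):
        # Refused, unreachable, or no answer within the timeout
        return False
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        pass  # the peer may already have dropped the connection
    return True
```

For example, asyncio.run(can_connect("192.168.0.7", 20001)) returning False points to a network, firewall, or device-mode problem rather than a framing problem.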
Minimal Test Harness (optional)
If you don't have the device handy, you can simulate a server that pushes a single frame in the Waveshare format:
import asyncio
from typing import Optional

def build_frame(can_id: int, data: bytes, extended: Optional[bool] = None, rtr: bool = False) -> bytes:
    # Encode one frame: flags/DLC (1) + CAN ID (4, big-endian) + data (8, zero-padded)
    if len(data) > 8:
        raise ValueError("CAN payload is limited to 8 bytes")
    if extended is None:
        extended = can_id > 0x7FF
    dlc = len(data)
    b0 = (0x80 if extended else 0) | (0x40 if rtr else 0) | (dlc & 0x0F)
    buf = bytearray(13)
    buf[0] = b0
    buf[1:5] = can_id.to_bytes(4, "big")
    buf[5:5 + dlc] = data
    return bytes(buf)

async def fake_server(reader, writer):
    # send ID=0x123 with data 01 02 03 04 after a short delay
    await asyncio.sleep(1)
    writer.write(build_frame(0x123, b"\x01\x02\x03\x04"))
    await writer.drain()
    await asyncio.sleep(1)
    writer.close()

async def main():
    server = await asyncio.start_server(fake_server, "127.0.0.1", 20001)
    print("Fake server on 127.0.0.1:20001 (Ctrl-C to quit)")
    async with server:
        await server.serve_forever()

asyncio.run(main())
Run the server in one terminal, then in another:
caneth --host 127.0.0.1 --port 20001 watch
# or:
caneth --host 127.0.0.1 --port 20001 wait --id 0x123 --d0 0x01 --d1 0x02 --wait-timeout 5
License
MIT (see pyproject.toml classifiers or add a LICENSE file if you plan to distribute).
Testing
This repo ships with a small pytest suite and a fake TCP server that emulates the Waveshare framing.
1) Install dev dependencies
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\Activate.ps1
python -m pip install -U pip
pip install -e . pytest pytest-asyncio
Python 3.9+ is required. Pytest-asyncio is needed because the tests use async fixtures and async test functions.
2) (Recommended) Configure pytest for asyncio
Create a pytest.ini at the repo root (where pyproject.toml lives):
[pytest]
asyncio_mode = auto
This removes strict-mode warnings and ensures async fixtures work without extra markers.
3) Run the test suite
pytest -q
Run a single test file or a single test case:
pytest tests/test_wait_for.py -q
pytest tests/test_wait_for.py::test_wait_for_id_only -q
Stream print() output for debugging:
pytest -s tests/test_client_receive.py::test_on_frame_receives
(Optional) Coverage:
pip install pytest-cov
pytest --cov=caneth --cov-report=term-missing
What the tests do
- Unit helpers: tests/test_utils.py checks parse_hex_bytes.
- Client RX: tests/test_client_receive.py ensures on_frame observers fire.
- Callback registry: tests/test_register_callback.py covers ID-only, ID+d0, and ID+d0+d1 matchers.
- Awaiting frames: tests/test_wait_for.py covers wait_for(...) for ID-only and with first bytes.
- Encoding: tests/test_send_encoding.py validates the 13-byte Waveshare format written by send(...).
- CLI parsers: tests/test_cli_parsers.py validates permissive hex/decimal parsing for IDs/bytes.
A fake in-process TCP server fixture (ws_server) is used to avoid real hardware.
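To give an idea of what such a fake server can look like, here is a minimal standalone sketch written as an async context manager. It is not the repo's ws_server fixture: fake_ws_server, its frames_to_send argument, and the state fields are hypothetical, and it avoids pytest so it can be tried on its own.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace


@asynccontextmanager
async def fake_ws_server(frames_to_send=()):
    """Start a loopback TCP server that sends frames and records what it receives."""
    state = SimpleNamespace(received=bytearray(), port=None)

    async def handle(reader, writer):
        # Push the canned frames to the client first
        for frame in frames_to_send:
            writer.write(frame)
        await writer.drain()
        # Then record everything the client writes until it disconnects
        while True:
            chunk = await reader.read(1024)
            if not chunk:
                break
            state.received += chunk
        writer.close()

    # Port 0 lets the OS pick a free port, which avoids clashes between tests
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    state.port = server.sockets[0].getsockname()[1]
    try:
        yield state
    finally:
        server.close()
        await server.wait_closed()
```

A test would enter the context, connect a client to 127.0.0.1 on state.port, and then assert on state.received.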
Common issues & fixes
- ModuleNotFoundError: No module named 'caneth'
  Make sure you installed the package in editable mode from repo root: pip install -e .
- Async fixture warnings/errors (e.g., “async_generator object”, “strict mode”)
  Ensure pytest-asyncio is installed and tests/conftest.py uses @pytest_asyncio.fixture. Prefer adding pytest.ini with asyncio_mode = auto (see step 2).
- “attempted relative import with no known parent package”
  Make sure you are running pytest from the project root (so it discovers the package) and you have the current tests/ tree. If you created your own helpers, avoid importing from conftest.py; put helpers in tests/helpers.py instead.
- Old conftest.py errors like NameError: send is not defined
  Ensure you’re on the updated tests where the fixture returns a SimpleNamespace for state. Replace your tests/conftest.py with the one in this repo.
CI, Docs & Publishing (GitHub Actions)
This repo includes three workflows under .github/workflows/:
- CI: test matrix on Python 3.9–3.13 with coverage.
  - File: .github/workflows/ci.yml
  - Triggers on pushes and PRs.
  - Produces a coverage.xml artifact per Python version.
  - Optionally uploads coverage to Codecov if CODECOV_TOKEN secret is set.
- Docs: builds API docs with pdoc and publishes to GitHub Pages.
  - File: .github/workflows/docs.yml
  - Triggers on pushes to main/master and manual runs.
  - Output is published to the repository’s GitHub Pages.
  - Enable Pages in Settings → Pages, or just run the workflow; it configures the Pages environment.
- Publish: builds sdist/wheel and publishes to PyPI on version tags.
  - File: .github/workflows/publish.yml
  - Triggers on tags named like v1.2.3.
  - Uses Trusted Publishing if your PyPI project is configured for it (OIDC), otherwise set Repository Secret PYPI_API_TOKEN.
  - You can create an API token here: https://pypi.org/manage/account/token/
Setup steps
- Codecov (optional): add a repository secret CODECOV_TOKEN.
- PyPI (optional): either enable Trusted Publishing on PyPI (recommended) or add PYPI_API_TOKEN as a repository secret.
- GitHub Pages (Docs): after the first successful run of the Docs workflow, a Pages site URL will appear in the workflow summary. You can also configure it manually in Settings → Pages.
Run locally
# Build docs locally
pip install pdoc .
pdoc -o site caneth
python -m http.server -d site 8000 # view at http://localhost:8000