Python implementation of the CANaerospace specification (by www.stockflightsystems.com), ported from the C++ library by mjs513: https://github.com/mjs513/CANaerospace
CANAerospace for Python
A Python implementation of the CANaerospace (CANAS) protocol, designed for hardware-agnostic CAN communication, testing, and simulation. The entire project is a port of the fantastic C++ library created by mjs513.
Features
- mjs513/CANaerospace Compatible API
- Hardware-agnostic CAN (Standard and FD)
- Built-in support for redundant CAN bus
- CANAS 1.7 DataTypes, packing, and unpacking
- Parameter subscription and advertisement
- Service handling with a centralized NodeServiceHandler
- Modular Identifier Directory support
- Support for virtual CAN (python-can), SocketCAN (python-can), and other python-can backends
- Transmission scheduling for time-triggered messages
- Unit system using pint for dimensional analysis
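To make the packing and unpacking feature concrete, here is a minimal sketch of the CANaerospace payload layout for a single FLOAT value: four header bytes (node ID, data type, service code, message code) followed by four big-endian data bytes. It uses only the standard library and is not the library's own API. The names FloatMessage, pack_float, and unpack_float are hypothetical, and the type code 0x02 for FLOAT is taken from the specification's data type table, so check it against the library's own data type definitions before relying on it.

```python
import struct
from dataclasses import dataclass

# Assumed type code for a 32-bit IEEE 754 float (CANaerospace data type table).
DATA_TYPE_FLOAT = 0x02


@dataclass(frozen=True)
class FloatMessage:
    """Hypothetical container for one FLOAT message (not the library's class)."""
    node_id: int
    service_code: int
    message_code: int
    value: float


def pack_float(msg: FloatMessage) -> bytes:
    # ">BBBBf": four one-byte header fields, then a big-endian 32-bit float.
    return struct.pack(
        ">BBBBf",
        msg.node_id,
        DATA_TYPE_FLOAT,
        msg.service_code,
        msg.message_code,
        msg.value,
    )


def unpack_float(payload: bytes) -> FloatMessage:
    node_id, data_type, service_code, message_code, value = struct.unpack(
        ">BBBBf", payload
    )
    if data_type != DATA_TYPE_FLOAT:
        raise ValueError(f"expected FLOAT, got data type {data_type:#04x}")
    return FloatMessage(node_id, service_code, message_code, value)


payload = pack_float(
    FloatMessage(node_id=1, service_code=0, message_code=7, value=101.5)
)
print(payload.hex())
print(unpack_float(payload))
```

The payload is always eight bytes, which fits a classic CAN frame exactly.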
Roadmap
This project is actively under development. Here is a roadmap of the key features and improvements that are planned:
Core Logic
- Implement the main update method: The core logic for processing incoming frames and dispatching them to handlers is now implemented.
- Implement Repeated Message Filtering: Logic to filter out repeated messages based on timestamps and message codes is now implemented for both parameters and services.
- Implement Timeout Handling: Logic to handle timeouts for services and other time-sensitive operations is now implemented.
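The repeated-message filtering described above could look roughly like the sketch below. It is an illustration of the idea only, not the library's implementation: the class name RepeatFilter, the method is_repeat, and the repeat_timeout_usec parameter are all hypothetical. The assumption is that a frame counts as a repeat when the same sender and identifier deliver the same message code again within a time window, as happens when one message arrives on both interfaces of a redundant bus.

```python
class RepeatFilter:
    """Hypothetical filter: drops frames whose message code was already seen."""

    def __init__(self, repeat_timeout_usec: int = 30_000_000):
        self._timeout = repeat_timeout_usec
        # (can_id, node_id) -> (last message code, timestamp it was first seen)
        self._last_seen = {}

    def is_repeat(self, can_id: int, node_id: int,
                  message_code: int, timestamp_usec: int) -> bool:
        key = (can_id, node_id)
        previous = self._last_seen.get(key)
        if previous is not None:
            last_code, last_time = previous
            within_window = timestamp_usec - last_time < self._timeout
            if last_code == message_code and within_window:
                # Same code inside the window: treat as a duplicate and keep
                # the original timestamp so the window is not extended.
                return True
        self._last_seen[key] = (message_code, timestamp_usec)
        return False


repeat_filter = RepeatFilter(repeat_timeout_usec=1000)
print(repeat_filter.is_repeat(0x12C, 1, 5, 0))    # first arrival
print(repeat_filter.is_repeat(0x12C, 1, 5, 500))  # copy from the second bus
print(repeat_filter.is_repeat(0x12C, 1, 6, 600))  # next message code
```

Keeping the first-seen timestamp on a repeat means a stale entry eventually expires, so a sender that restarts with the same message code is accepted again once the window has passed.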
Advanced Service Handling
- Stateful, Multi-Frame Services: Refactor the NodeServiceHandler or create new classes to manage stateful, multi-frame services like Data Download (DDS) and Data Upload (DUS). This will involve handling session management, data chunking, and checksum verification.
- Complete Service Implementation: Implement the full logic for all services defined in the ServiceCode enum, including error handling and all specified message codes.
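As a rough illustration of the data chunking and checksum verification mentioned for multi-frame services, the sketch below splits a block into four-byte chunks (the data capacity of one CANaerospace message after its four-byte header) and checks the block after reassembly. The function names are hypothetical, and the additive checksum is a placeholder chosen for brevity; it is not claimed to be the checksum that the DDS and DUS services specify.

```python
CHUNK_SIZE = 4  # data bytes carried by one message after the 4-byte header


def split_into_chunks(data: bytes) -> list:
    """Split data into CHUNK_SIZE pieces, zero-padding the last one."""
    chunks = []
    for offset in range(0, len(data), CHUNK_SIZE):
        chunk = data[offset:offset + CHUNK_SIZE]
        chunks.append(chunk.ljust(CHUNK_SIZE, b"\x00"))
    return chunks


def placeholder_checksum(data: bytes) -> int:
    # Placeholder only: byte sum truncated to 32 bits, not the spec's algorithm.
    return sum(data) & 0xFFFFFFFF


def reassemble(chunks: list, length: int, expected_checksum: int) -> bytes:
    """Join chunks, strip padding using the announced length, verify checksum."""
    data = b"".join(chunks)[:length]
    if len(data) != length:
        raise ValueError("received fewer bytes than announced")
    if placeholder_checksum(data) != expected_checksum:
        raise ValueError("checksum mismatch")
    return data


blob = bytes(range(10))
chunks = split_into_chunks(blob)
restored = reassemble(chunks, len(blob), placeholder_checksum(blob))
print(len(chunks), restored == blob)
```

The sender has to announce the original length separately, because the zero padding in the last chunk is otherwise indistinguishable from data.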
Comprehensive Testing
- Expand Test Coverage for Core Logic: The core logic is now well-tested.
- Full Service Lifecycle Tests: Create tests that cover the complete lifecycle of each service, including multi-frame transfers, error conditions, and timeouts.
- Integration Tests: Add integration tests that simulate a complete CAN bus with multiple nodes to verify the interaction between different components of the library.
Documentation
- API Reference: Generate a complete API reference using a tool like Sphinx.
- Usage Examples: Provide more detailed examples of how to use the library for common tasks, such as creating an ECU, sending and receiving messages, and interacting with services.
Extending the Transport Layer
The library is designed to be hardware-agnostic, and it is easy to add support for new transport layers. Here is an example of how to create a custom transport layer that reads CAN messages from a Redis server.
First, you will need a class that implements the BusAdapter protocol defined in src/canaerospace/driver.py. The class must implement the send and set_filters methods; the example below also adds a receive method, which the main loop further down calls directly.

    # src/canaerospace/transport/redis.py
    import json

    import redis

    from src.canaerospace.driver import BusAdapter, CANFrame, CANFilterConfig


    class RedisBus(BusAdapter):
        def __init__(self, channel: str):
            # Connects to a Redis server on the default host and port.
            self.redis = redis.Redis()
            self.pubsub = self.redis.pubsub()
            self.channel = channel
            self.pubsub.subscribe(self.channel)

        def send(self, iface: int, frame: CANFrame) -> int:
            # Serialize the frame as JSON; the payload travels as a hex string.
            message = {
                'iface': iface,
                'id': frame.id,
                'data': frame.data.hex(),
                'is_extended_id': frame.is_extended_id,
            }
            # publish() returns the number of subscribers that received it.
            return self.redis.publish(self.channel, json.dumps(message))

        def set_filters(self, iface: int, filters: list[CANFilterConfig]):
            # Redis pub/sub does not support filtering, so we do nothing.
            pass

        def receive(self, timeout: float | None = None) -> CANFrame | None:
            message = self.pubsub.get_message(timeout=timeout)
            # Skip subscribe confirmations; only 'message' entries carry frames.
            if message and message['type'] == 'message':
                data = json.loads(message['data'])
                return CANFrame(
                    id=data['id'],
                    data=bytes.fromhex(data['data']),
                    is_extended_id=data['is_extended_id'],
                )
            return None

Now, you can use this RedisBus with the CANASInstance:

    # main.py
    import time

    from src.canaerospace import CANASInstance, CANAeroConfig
    from src.canaerospace.transport.redis import RedisBus
    from src.canaerospace.frame import decode


    def main():
        bus = RedisBus(channel='can-bus')
        config = CANAeroConfig(bus=bus, node_id=1)
        instance = CANASInstance(config)
        while True:
            frame = bus.receive(timeout=1.0)
            if frame:
                now = int(time.time() * 1e6)
                instance.update_timestamp(0, frame, now)
                # Decode and print the message
                try:
                    msg = decode(frame.id, frame.data)
                    print(f"Received Message: {msg}")
                    print(f" Message ID: {msg.can_id}")
                    print(f" Data Type: {msg.data_type.name}")
                    print(f" Decoded Data: {msg.values()}")
                except Exception as e:
                    print(f"Error decoding frame: {e}")


    if __name__ == "__main__":
        main()

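For tests that should run without a Redis server, the same three methods can be backed by an in-process queue. The sketch below is an assumption-heavy illustration: LoopbackBus is a hypothetical name, the CANFrame dataclass is a stand-in that only mirrors the three fields used in the Redis example, and the class does not inherit from the library's BusAdapter so that the snippet stays self-contained.

```python
import queue
from dataclasses import dataclass
from typing import Optional


@dataclass
class CANFrame:
    """Stand-in with the fields used above; the real class lives in the library."""
    id: int
    data: bytes
    is_extended_id: bool = False


class LoopbackBus:
    """Hypothetical adapter: every frame sent is queued for receive()."""

    def __init__(self):
        self._frames = queue.Queue()

    def send(self, iface: int, frame: CANFrame) -> int:
        self._frames.put(frame)
        # Mirrors the integer return of RedisBus.send; its meaning is assumed.
        return 1

    def set_filters(self, iface: int, filters: list) -> None:
        # No hardware filters to configure for an in-memory queue.
        pass

    def receive(self, timeout: Optional[float] = None) -> Optional[CANFrame]:
        try:
            # timeout=None blocks until a frame is available.
            return self._frames.get(timeout=timeout)
        except queue.Empty:
            return None


bus = LoopbackBus()
bus.send(0, CANFrame(id=0x12C, data=bytes.fromhex("0102000742cb0000")))
print(bus.receive(timeout=0.1))
print(bus.receive(timeout=0.01))
```

Because the queue is thread-safe, one thread can send while another polls receive, which is usually enough to exercise a receive loop like the one in main.py.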
Testing
This project uses pytest for unit testing. To run the full test suite, simply run the following command from the root of the project:
pytest
This will automatically discover and run all tests in the tests/ directory.
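By default, pytest collects files named test_*.py and runs the functions in them whose names start with test_. A new test might look like the sketch below, which checks the JSON and hex encoding used by the Redis transport example. The file name tests/test_frame_json.py and the helpers encode_frame and decode_frame are hypothetical and are not part of the library.

```python
# tests/test_frame_json.py (hypothetical file name)
import json


def encode_frame(can_id: int, data: bytes, is_extended_id: bool = False) -> str:
    """Serialize a frame the same way the Redis example does."""
    return json.dumps(
        {"id": can_id, "data": data.hex(), "is_extended_id": is_extended_id}
    )


def decode_frame(text: str) -> tuple:
    """Return (can_id, data, is_extended_id) from the JSON text."""
    fields = json.loads(text)
    return fields["id"], bytes.fromhex(fields["data"]), fields["is_extended_id"]


def test_frame_survives_json_round_trip():
    original = (0x12C, b"\x01\x02\x00\x07", False)
    assert decode_frame(encode_frame(*original)) == original


def test_invalid_hex_is_rejected():
    broken = json.dumps({"id": 1, "data": "zz", "is_extended_id": False})
    try:
        decode_frame(broken)
    except ValueError:
        return
    raise AssertionError("bytes.fromhex should reject non-hex input")
```

Running pytest -k round_trip selects only the tests whose names match that expression.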
Contributing
Contributions are welcome! Please see the CONTRIBUTING.md file for detailed guidelines on how to contribute to the project.
Development
Linting
Pre-commit hooks for linting are enabled. More info at https://pre-commit.com/
File details
Details for the file python_canaerospace-0.2.0.tar.gz.
File metadata
- Download URL: python_canaerospace-0.2.0.tar.gz
- Upload date:
- Size: 40.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0ecd67e3261589c101111eabe8331cdc67687cc6b71e3924931fb976e2b954d0 |
| MD5 | 88b9ad6d6bb9f34c579c7175ef8d2bd3 |
| BLAKE2b-256 | 526536ed20a09186595e02a1285654512fb2b3f973d264a2f2445004d6ca7ad8 |
File details
Details for the file python_canaerospace-0.2.0-py3-none-any.whl.
File metadata
- Download URL: python_canaerospace-0.2.0-py3-none-any.whl
- Upload date:
- Size: 36.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 253601d7163f3efd6fa0b2b14e991bd93e4d1b99b3373b9c06b4b5d793ebba2d |
| MD5 | a4558f4b0f6ca70efabf6bb805bf6acb |
| BLAKE2b-256 | 0cd4c0514a212021d51be8052ca0287c591937ca6ad495f476f8a426af00d1dd |