Skip to main content

Python implementation of the C++ CANaerospace Speciication ( by www.stockflightsystems.com) and the library by mjs513 https://github.com/mjs513/CANaerospace

Project description

CANAerospace for Python

A Python implementation of the CANAS protocol, designed for hardware-agnostic CAN communication, testing, and simulation. This entire work is a port of the fantastic C++ library created by MJS513.

Features

  • mjs513/CANaerospace Compatible API
  • Hardware-agnostic CAN (Standard and FD)
  • Built-in support for redundant CAN bus
  • CANAS 1.7 DataTypes, packing, and unpacking
  • Parameter subscription and advertisement
  • Service handling with a centralized NodeServiceHandler
  • Modular Identifier Directory support
  • Support for virtual can (python-can), socket-can (python-can), and other python-can backends
  • Transmission scheduling for time-triggered messages
  • Unit system using pint for dimensional analysis

Roadmap

This project is actively under development. Here is a roadmap of the key features and improvements that are planned:

Core Logic

  • Implement the main update method: The core logic for processing incoming frames and dispatching them to handlers is now implemented.
  • Implement Repeated Message Filtering: Logic to filter out repeated messages based on timestamps and message codes is now implemented for both parameters and services.
  • Implement Timeout Handling: Logic to handle timeouts for services and other time-sensitive operations is now implemented.

Advanced Service Handling

  • Stateful, Multi-Frame Services: Refactor the NodeServiceHandler or create new classes to manage stateful, multi-frame services like Data Download (DDS) and Data Upload (DUS). This will involve handling session management, data chunking, and checksum verification.
  • Complete Service Implementation: Implement the full logic for all services defined in the ServiceCode enum, including error handling and all specified message codes.

Comprehensive Testing

  • Expand Test Coverage for Core Logic: The core logic is now well-tested.
  • Full Service Lifecycle Tests: Create tests that cover the complete lifecycle of each service, including multi-frame transfers, error conditions, and timeouts.
  • Integration Tests: Add integration tests that simulate a complete CAN bus with multiple nodes to verify the interaction between different components of the library.

Documentation

  • API Reference: Generate a complete API reference using a tool like Sphinx.
  • Usage Examples: Provide more detailed examples of how to use the library for common tasks, such as creating an ECU, sending and receiving messages, and interacting with services.

Extending the Transport Layer

The library is designed to be hardware-agnostic, and it is easy to add support for new transport layers. Here is an example of how to create a custom transport layer that reads CAN messages from a Redis server.

First, you will need a class that implements the BusAdapter protocol defined in src/canaerospace/driver.py. This class will need to implement the send and set_filters methods.

# src/canaerospace/transport/redis.py
import redis
import json
from src.canaerospace.driver import BusAdapter, CANFrame, CANFilterConfig

class RedisBus(BusAdapter):
    def __init__(self, channel: str):
        self.redis = redis.Redis()
        self.pubsub = self.redis.pubsub()
        self.channel = channel
        self.pubsub.subscribe(self.channel)

    def send(self, iface: int, frame: CANFrame) -> int:
        message = {
            'iface': iface,
            'id': frame.id,
            'data': frame.data.hex(),
            'is_extended_id': frame.is_extended_id
        }
        return self.redis.publish(self.channel, json.dumps(message))

    def set_filters(self, iface: int, filters: list[CANFilterConfig]):
        # Redis pub/sub does not support filtering, so we do nothing.
        pass

    def receive(self, timeout: float | None = None) -> CANFrame | None:
        message = self.pubsub.get_message(timeout=timeout)
        if message and message['type'] == 'message':
            data = json.loads(message['data'])
            return CANFrame(
                id=data['id'],
                data=bytes.fromhex(data['data']),
                is_extended_id=data['is_extended_id']
            )
        return None

Now, you can use this RedisBus with the CANASInstance:

# main.py
from src.canaerospace import CANASInstance, CANAeroConfig
from src.canaerospace.transport.redis import RedisBus
from src.canaerospace.frame import decode
import time

def main():
    bus = RedisBus(channel='can-bus')
    config = CANAeroConfig(bus=bus, node_id=1)
    instance = CANASInstance(config)

    while True:
        frame = bus.receive(timeout=1.0)
        if frame:
            now = int(time.time() * 1e6)
            instance.update_timestamp(0, frame, now)

            # Decode and print the message
            try:
                msg = decode(frame.id, frame.data)
                print(f"Received Message: {msg}")
                print(f"  Message ID: {msg.can_id}")
                print(f"  Data Type: {msg.data_type.name}")
                print(f"  Decoded Data: {msg.values()}")
            except Exception as e:
                print(f"Error decoding frame: {e}")

if __name__ == "__main__":
    main()

Testing

This project uses pytest for unit testing. To run the full test suite, simply run the following command from the root of the project:

pytest

This will automatically discover and run all tests in the tests/ directory.

Contributing

Contributions are welcome! Please see the CONTRIBUTING.md file for detailed guidelines on how to contribute to the project.

Development

Linting

Pre-commit hooks for linting are enabled. More info at https://pre-commit.com/

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

python_canaerospace-0.2.0.tar.gz (40.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

python_canaerospace-0.2.0-py3-none-any.whl (36.3 kB view details)

Uploaded Python 3

File details

Details for the file python_canaerospace-0.2.0.tar.gz.

File metadata

  • Download URL: python_canaerospace-0.2.0.tar.gz
  • Upload date:
  • Size: 40.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for python_canaerospace-0.2.0.tar.gz
Algorithm Hash digest
SHA256 0ecd67e3261589c101111eabe8331cdc67687cc6b71e3924931fb976e2b954d0
MD5 88b9ad6d6bb9f34c579c7175ef8d2bd3
BLAKE2b-256 526536ed20a09186595e02a1285654512fb2b3f973d264a2f2445004d6ca7ad8

See more details on using hashes here.

File details

Details for the file python_canaerospace-0.2.0-py3-none-any.whl.

File metadata

File hashes

Hashes for python_canaerospace-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 253601d7163f3efd6fa0b2b14e991bd93e4d1b99b3373b9c06b4b5d793ebba2d
MD5 a4558f4b0f6ca70efabf6bb805bf6acb
BLAKE2b-256 0cd4c0514a212021d51be8052ca0287c591937ca6ad495f476f8a426af00d1dd

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page