Fork of SAE J1939 stack implementation, originally created by Juergen Heilgemeir

Overview

This is a fork of the https://github.com/juergenH87/python-can-j1939 project. Its main addition is the ability to get the origin of a received message. For more information, see the README file in the original author's project.

Installation

Install can-j1939-ttpsc with pip:

$ pip install can-j1939-ttpsc

or install it from source:

$ git clone https://github.com/ttpscmolinaf/python-can-j1939
$ cd python-can-j1939
$ pip install .

Upgrade

Upgrade an already installed can-j1939-ttpsc package:

$ pip install --upgrade can-j1939-ttpsc
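
To confirm which version is present after an install or upgrade, the distribution metadata can be queried from Python. This is a minimal sketch that assumes Python 3.8 or newer (for importlib.metadata); the helper name installed_version is hypothetical and not part of the package.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


# The distribution name is the one used with pip above.
print(installed_version("can-j1939-ttpsc"))
```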

Quick start

To receive all broadcast (global) messages passing on the bus, subscribe to the ECU object:

import logging
import time
import can
import j1939

logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)

def on_message(priority, pgn, sa, timestamp, data):
    """Receive incoming messages from the bus

    :param int priority:
        Priority of the message
    :param int pgn:
        Parameter Group Number of the message
    :param int sa:
        Source Address of the message
    :param int timestamp:
        Timestamp of the message
    :param bytearray data:
        Data of the PDU
    """
    print("PGN {} length {}".format(pgn, len(data)))

def main():
    print("Initializing")

    # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
    ecu = j1939.ElectronicControlUnit()

    # Connect to the CAN bus
    # Arguments are passed to python-can's can.interface.Bus() constructor
    # (see https://python-can.readthedocs.io/en/stable/bus.html).
    # ecu.connect(bustype='socketcan', channel='can0')
    # ecu.connect(bustype='kvaser', channel=0, bitrate=250000)
    ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
    # ecu.connect(bustype='ixxat', channel=0, bitrate=250000)
    # ecu.connect(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000)
    # ecu.connect(bustype='nican', channel='CAN0', bitrate=250000)

    # subscribe to all (global) messages on the bus
    ecu.subscribe(on_message)

    time.sleep(120)

    print("Deinitializing")
    ecu.disconnect()

if __name__ == '__main__':
    main()
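
The priority, pgn and sa arguments of the callback correspond to fields of the 29-bit extended CAN identifier defined by SAE J1939-21. The sketch below shows that decoding in plain Python, independent of the library. The function name decode_j1939_id is hypothetical and is not part of the j1939 API; it only illustrates the identifier layout.

```python
def decode_j1939_id(can_id):
    """Split a 29-bit J1939 CAN identifier into its fields.

    Returns (priority, pgn, source_address, destination_address).
    """
    priority = (can_id >> 26) & 0x7
    # Two bits: extended data page (bit 25) and data page (bit 24)
    data_page = (can_id >> 24) & 0x3
    pdu_format = (can_id >> 16) & 0xFF
    pdu_specific = (can_id >> 8) & 0xFF
    source_address = can_id & 0xFF

    if pdu_format < 240:
        # PDU1: the PS field is the destination address and is not part of the PGN
        destination = pdu_specific
        pgn = (data_page << 16) | (pdu_format << 8)
    else:
        # PDU2: broadcast; the PS field is a group extension and belongs to the PGN
        destination = 0xFF
        pgn = (data_page << 16) | (pdu_format << 8) | pdu_specific

    return priority, pgn, source_address, destination


print(decode_j1939_id(0x18FEF600))  # (6, 65270, 0, 255)
print(decode_j1939_id(0x18E00480))  # (6, 57344, 128, 4)
```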

A more sophisticated example that creates a ControllerApplication (CA), subscribes a receive callback to it, and sends messages from timer callbacks:

import logging
import time
import can
import j1939

logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)

# compose the name descriptor for the new ca
name = j1939.Name(
    arbitrary_address_capable=0,
    industry_group=j1939.Name.IndustryGroup.Industrial,
    vehicle_system_instance=1,
    vehicle_system=1,
    function=1,
    function_instance=1,
    ecu_instance=1,
    manufacturer_code=666,
    identity_number=1234567
    )

# create the ControllerApplications
ca = j1939.ControllerApplication(name, 128)


def ca_receive(priority, pgn, source, timestamp, data):
    """Feed incoming message to this CA.
    (OVERLOADED function)
    :param int priority:
        Priority of the message
    :param int pgn:
        Parameter Group Number of the message
    :param int source:
        Source Address of the message
    :param int timestamp:
        Timestamp of the message
    :param bytearray data:
        Data of the PDU
    """
    print("PGN {} length {}".format(pgn, len(data)))

def ca_timer_callback1(cookie):
    """Callback for sending messages

    This callback is registered at the ECU timer event mechanism to be
    executed every 500ms.

    :param cookie:
        A cookie registered at 'add_timer'. May be None.
    """
    # wait until we have our device_address
    if ca.state != j1939.ControllerApplication.State.NORMAL:
        # returning true keeps the timer event active
        return True

    # create data with 8 bytes
    data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 8

    # sending normal broadcast message
    ca.send_pgn(0, 0xFD, 0xED, 6, data)

    # sending normal peer-to-peer message, destination address is 0x04
    ca.send_pgn(0, 0xE0, 0x04, 6, data)

    # returning true keeps the timer event active
    return True


def ca_timer_callback2(cookie):
    """Callback for sending messages

    This callback is registered at the ECU timer event mechanism to be
    executed every 5s.

    :param cookie:
        A cookie registered at 'add_timer'. May be None.
    """
    # wait until we have our device_address
    if ca.state != j1939.ControllerApplication.State.NORMAL:
        # returning true keeps the timer event active
        return True

    # create data with 100 bytes
    data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 100

    # sending multipacket message with TP-BAM
    ca.send_pgn(0, 0xFE, 0xF6, 6, data)

    # sending multipacket message with TP-CMDT, destination address is 0x05
    ca.send_pgn(0, 0xD0, 0x05, 6, data)

    # returning true keeps the timer event active
    return True

def main():
    print("Initializing")

    # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
    ecu = j1939.ElectronicControlUnit()

    # Connect to the CAN bus
    # Arguments are passed to python-can's can.interface.Bus() constructor
    # (see https://python-can.readthedocs.io/en/stable/bus.html).
    # ecu.connect(bustype='socketcan', channel='can0')
    # ecu.connect(bustype='kvaser', channel=0, bitrate=250000)
    ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
    # ecu.connect(bustype='ixxat', channel=0, bitrate=250000)
    # ecu.connect(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000)
    # ecu.connect(bustype='nican', channel='CAN0', bitrate=250000)
    # ecu.connect('testchannel_1', bustype='virtual')

    # add CA to the ECU
    ecu.add_ca(controller_application=ca)
    ca.subscribe(ca_receive)
    # callback every 0.5s
    ca.add_timer(0.500, ca_timer_callback1)
    # callback every 5s
    ca.add_timer(5, ca_timer_callback2)
    # by starting the CA it starts the address claiming procedure on the bus
    ca.start()

    time.sleep(120)

    print("Deinitializing")
    ca.stop()
    ecu.disconnect()

if __name__ == '__main__':
    main()
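
The two timer callbacks above send 8-byte and 100-byte payloads to broadcast and peer-to-peer PGNs. Under SAE J1939-21, a payload of up to 8 bytes fits in a single frame, while a longer one uses the transport protocol: BAM for broadcast and CMDT (RTS/CTS) for a specific destination, with 7 data bytes per TP.DT packet. The sketch below illustrates that rule only; describe_transfer is a hypothetical helper, not a function of the library, and it does not claim to mirror the library's internal logic.

```python
import math

GLOBAL_ADDRESS = 0xFF  # J1939 broadcast destination


def describe_transfer(pdu_format, pdu_specific, length):
    """Return (transfer kind, number of data frames) for a payload.

    pdu_format and pdu_specific are the second and third arguments
    passed to send_pgn in the example above.
    """
    if pdu_format < 240:
        # PDU1: the PS field carries the destination address
        destination = pdu_specific
    else:
        # PDU2: always sent to the global address
        destination = GLOBAL_ADDRESS

    if length <= 8:
        return ("single frame", 1)

    # Each TP.DT packet carries a sequence number plus 7 data bytes
    packets = math.ceil(length / 7)
    if destination == GLOBAL_ADDRESS:
        return ("TP-BAM", packets)
    return ("TP-CMDT", packets)


print(describe_transfer(0xFD, 0xED, 8))    # ('single frame', 1)
print(describe_transfer(0xFE, 0xF6, 100))  # ('TP-BAM', 15)
print(describe_transfer(0xD0, 0x05, 100))  # ('TP-CMDT', 15)
```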

Credits

This implementation is based on https://github.com/juergenH87/python-can-j1939. We forked it because we needed information about the bus for each message received by the library.

Thanks for your great work Juergen :)!
