-
Notifications
You must be signed in to change notification settings - Fork 24
Add CAN and LIN database creation examples #219
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
87436c7
f617740
e43b498
b5c5f9a
57fb0cf
855b783
02dac70
024bacc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| .cache/ | ||
| .mypy_cache/ | ||
| .tox/ | ||
| junit/ | ||
| __pycache__ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,3 +14,4 @@ Examples | |
| examples/conversion | ||
| examples/can_lin_diff | ||
| examples/programmatic_databases | ||
| examples/database_creation | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| Database Creation Example | ||
| ========================= | ||
|
|
||
| This example programmatically modifies the in-memory database to | ||
| contain a cluster, a frame, and two signals. The database is then | ||
| used in a :any:`nixnet.session.SignalOutSinglePointSession` and | ||
| :any:`nixnet.session.SignalInSinglePointSession` to write and then | ||
| read a pair of signals. | ||
|
|
||
| CAN Database Creation | ||
| --------------------- | ||
|
|
||
| .. literalinclude:: ../../nixnet_examples/can_dynamic_database_creation.py | ||
|
|
||
| LIN Database Creation | ||
| --------------------- | ||
|
|
||
| .. literalinclude:: ../../nixnet_examples/lin_dynamic_database_creation.py |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,8 +2,11 @@ | |
| from __future__ import division | ||
| from __future__ import print_function | ||
|
|
||
| import typing # NOQA: F401 | ||
|
|
||
| from nixnet import _props | ||
| from nixnet import constants | ||
| from nixnet.db import _frame | ||
|
|
||
|
|
||
| class LinSchedEntry(object): | ||
|
|
@@ -56,11 +59,15 @@ def event_id(self, value): | |
|
|
||
| @property | ||
| def frames(self): | ||
| return _props.get_lin_sched_entry_frames(self._handle) | ||
| # type: () -> typing.Iterable[_frame.Frame] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From travis: You need to import typing. See some of the other files for how to do it without causing flake8 problems |
||
| for ref in _props.get_lin_sched_entry_frames(self._handle): | ||
| yield _frame.Frame(ref) | ||
|
|
||
| @frames.setter | ||
| def frames(self, value): | ||
| _props.set_lin_sched_entry_frames(self._handle, value) | ||
| # type: (typing.Iterable[_frame.Frame]) -> None | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto about mypy error from travis |
||
| frame_handles = [frame._handle for frame in value] | ||
| _props.set_lin_sched_entry_frames(self._handle, frame_handles) | ||
|
|
||
| @property | ||
| def name(self): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| from __future__ import absolute_import | ||
| from __future__ import division | ||
| from __future__ import print_function | ||
|
|
||
| from random import randint | ||
| import six | ||
| import time | ||
|
|
||
| import nixnet | ||
| from nixnet import constants | ||
| from nixnet import db | ||
|
|
||
|
|
||
| def main(): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Each example
|
||
| database_name = ':memory:' | ||
| cluster_name = 'CAN_Cluster' | ||
| frame_name = 'CAN_Event_Frame' | ||
| signal_1_name = 'CAN_Event_Signal_1' | ||
| signal_2_name = 'CAN_Event_Signal_2' | ||
| signal_list = [signal_1_name, signal_2_name] | ||
| output_interface = 'CAN1' | ||
| input_interface = 'CAN2' | ||
|
|
||
| # Open the default in-memory database. | ||
| with db.Database(database_name) as database: | ||
|
|
||
| # Add a CAN cluster, a frame, and two signals to the database. | ||
| cluster = database.clusters.add(cluster_name) | ||
| cluster.protocol = constants.Protocol.CAN | ||
| cluster.baud_rate = 125000 | ||
| frame = cluster.frames.add(frame_name) | ||
| frame.id = 1 | ||
| frame.payload_len = 2 | ||
| signal_1 = frame.mux_static_signals.add(signal_1_name) | ||
| signal_1.byte_ordr = constants.SigByteOrdr.BIG_ENDIAN | ||
| signal_1.data_type = constants.SigDataType.UNSIGNED | ||
| signal_1.start_bit = 0 | ||
| signal_1.num_bits = 8 | ||
| signal_2 = frame.mux_static_signals.add(signal_2_name) | ||
| signal_2.byte_ordr = constants.SigByteOrdr.BIG_ENDIAN | ||
| signal_2.data_type = constants.SigDataType.UNSIGNED | ||
| signal_2.start_bit = 8 | ||
| signal_2.num_bits = 8 | ||
|
|
||
| # Use the database we just created, write and then read a pair of signals. | ||
| with nixnet.SignalOutSinglePointSession( | ||
| output_interface, | ||
| database_name, | ||
| cluster_name, | ||
| signal_list) as output_session: | ||
| with nixnet.SignalInSinglePointSession( | ||
| input_interface, | ||
| database_name, | ||
| cluster_name, | ||
| signal_list) as input_session: | ||
| terminated_cable = six.moves.input('Are you using a terminated cable (Y or N)? ') | ||
| if terminated_cable.lower() == "y": | ||
| input_session.intf.can_term = constants.CanTerm.ON | ||
| output_session.intf.can_term = constants.CanTerm.OFF | ||
| elif terminated_cable.lower() == "n": | ||
| input_session.intf.can_term = constants.CanTerm.ON | ||
| output_session.intf.can_term = constants.CanTerm.ON | ||
| else: | ||
| print("Unrecognised input ({}), assuming 'n'".format(terminated_cable)) | ||
| input_session.intf.can_term = constants.CanTerm.ON | ||
| output_session.intf.can_term = constants.CanTerm.ON | ||
|
|
||
| # Start the input session manually to make sure that the first | ||
| # signal values sent before the initial read will be received. | ||
| input_session.start() | ||
|
|
||
| # Generate a pair of random values and send out the signals. | ||
| output_values = [randint(0, 255), randint(0, 255)] | ||
| output_session.signals.write(output_values) | ||
| print('Sent signal values: {}'.format(output_values)) | ||
|
|
||
| # Wait 1 s and then read the received values. | ||
| # They should be the same as the ones sent. | ||
| time.sleep(1) | ||
|
|
||
| input_signals = input_session.signals.read() | ||
| input_values = [int(value) for timestamp, value in input_signals] | ||
| print('Received signal values: {}'.format(input_values)) | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| main() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,110 @@ | ||
| from __future__ import absolute_import | ||
| from __future__ import division | ||
| from __future__ import print_function | ||
|
|
||
| from random import randint | ||
| import six | ||
| import time | ||
|
|
||
| import nixnet | ||
| from nixnet import constants | ||
| from nixnet import db | ||
|
|
||
|
|
||
| def main(): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Each example
|
||
| database_name = ':memory:' | ||
| cluster_name = 'LIN_Cluster' | ||
| ecu_1_name = 'LIN_ECU_1' | ||
| ecu_2_name = 'LIN_ECU_2' | ||
| schedule_name = 'LIN_Schedule_1' | ||
| schedule_entry_name = 'LIN_Schedule_Entry' | ||
| frame_name = 'LIN_Frame' | ||
| signal_1_name = 'LIN_Signal_1' | ||
| signal_2_name = 'LIN_Signal_2' | ||
| signal_list = [signal_1_name, signal_2_name] | ||
| output_interface = 'LIN1' | ||
| input_interface = 'LIN2' | ||
|
|
||
| # Open the default in-memory database. | ||
| with db.Database(database_name) as database: | ||
|
|
||
| # Add a LIN cluster, a frame, and two signals to the database. | ||
| cluster = database.clusters.add(cluster_name) | ||
| cluster.protocol = constants.Protocol.LIN | ||
| cluster.baud_rate = 19200 | ||
| frame = cluster.frames.add(frame_name) | ||
| frame.id = 1 | ||
| frame.payload_len = 2 | ||
| signal_1 = frame.mux_static_signals.add(signal_1_name) | ||
| signal_1.byte_ordr = constants.SigByteOrdr.BIG_ENDIAN | ||
| signal_1.data_type = constants.SigDataType.UNSIGNED | ||
| signal_1.start_bit = 0 | ||
| signal_1.num_bits = 8 | ||
| signal_2 = frame.mux_static_signals.add(signal_2_name) | ||
| signal_2.byte_ordr = constants.SigByteOrdr.BIG_ENDIAN | ||
| signal_2.data_type = constants.SigDataType.UNSIGNED | ||
| signal_2.start_bit = 8 | ||
| signal_2.num_bits = 8 | ||
|
|
||
| # Add a LIN ECU and LIN Schedule to the cluster. | ||
| ecu_1 = cluster.ecus.add(ecu_1_name) | ||
| ecu_1.lin_protocol_ver = constants.LinProtocolVer.VER_2_2 | ||
| ecu_1.lin_master = True | ||
| ecu_2 = cluster.ecus.add(ecu_2_name) | ||
| ecu_2.lin_protocol_ver = constants.LinProtocolVer.VER_2_2 | ||
| ecu_2.lin_master = False | ||
| cluster.lin_tick = 0.01 | ||
| schedule = cluster.lin_schedules.add(schedule_name) | ||
| schedule.priority = 0 | ||
| schedule.run_mode = constants.LinSchedRunMode.CONTINUOUS | ||
| schedule_entry = schedule.entries.add(schedule_entry_name) | ||
| schedule_entry.delay = 1000.0 | ||
| schedule_entry.type = constants.LinSchedEntryType.UNCONDITIONAL | ||
| schedule_entry.frames = [frame] | ||
|
|
||
| # Use the database we just created, write and then read a pair of signals. | ||
| with nixnet.SignalOutSinglePointSession( | ||
| output_interface, | ||
| database_name, | ||
| cluster_name, | ||
| signal_list) as output_session: | ||
| with nixnet.SignalInSinglePointSession( | ||
| input_interface, | ||
| database_name, | ||
| cluster_name, | ||
| signal_list) as input_session: | ||
| terminated_cable = six.moves.input('Are you using a terminated cable (Y or N)? ') | ||
| if terminated_cable.lower() == "y": | ||
| input_session.intf.lin_term = constants.LinTerm.ON | ||
| output_session.intf.lin_term = constants.LinTerm.OFF | ||
| elif terminated_cable.lower() == "n": | ||
| input_session.intf.lin_term = constants.LinTerm.ON | ||
| output_session.intf.lin_term = constants.LinTerm.ON | ||
| else: | ||
| print("Unrecognised input ({}), assuming 'n'".format(terminated_cable)) | ||
| input_session.intf.lin_term = constants.LinTerm.ON | ||
| output_session.intf.lin_term = constants.LinTerm.ON | ||
|
|
||
| # Start the input session manually to make sure that the first | ||
| # signal values sent before the initial read will be received. | ||
| input_session.start() | ||
|
|
||
| # Set the schedule. This will also automatically enable master mode. | ||
| output_session.change_lin_schedule(0) | ||
|
|
||
| # Generate a pair of random values and send out the signals. | ||
| output_values = [randint(0, 255), randint(0, 255)] | ||
| output_session.signals.write(output_values) | ||
| print('Sent signal values: {}'.format(output_values)) | ||
|
|
||
| # Wait 1 s and then read the received values. | ||
| # They should be the same as the ones sent. | ||
| time.sleep(1) | ||
|
|
||
| input_signals = input_session.signals.read() | ||
| input_values = [int(value) for timestamp, value in input_signals] | ||
| print('Received signal values: {}'.format(input_values)) | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| main() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, copy/paste bug on my part. Sorry you had to deal with it