diff --git a/.gitignore b/.gitignore index 350b1ecd..65942020 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .cache/ +.mypy_cache/ .tox/ junit/ __pycache__ diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 7095e64b..0129bfa3 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -53,7 +53,7 @@ To run the tests:: $ # Unit tests: $ tox $ # Integration tests: - $ tox -c tox-integration.ini -- --can-in-interface CAN1 --can-out-interface CAN2 --lin-in-interface LIN1 --lin-out-interface CAN2 + $ tox -c tox-integration.ini -- --can-in-interface CAN1 --can-out-interface CAN2 --lin-in-interface LIN1 --lin-out-interface LIN2 $ # Integration tests (no LIN board): $ tox -c tox-integration.ini -- --can-in-interface CAN1 --can-out-interface CAN2 diff --git a/docs/examples.rst b/docs/examples.rst index 1db3145b..368c082a 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -14,3 +14,4 @@ Examples examples/conversion examples/can_lin_diff examples/programmatic_databases + examples/database_creation diff --git a/docs/examples/database_creation.rst b/docs/examples/database_creation.rst new file mode 100644 index 00000000..c7f6cd18 --- /dev/null +++ b/docs/examples/database_creation.rst @@ -0,0 +1,18 @@ +Database Creation Example +========================= + +This example programmatically modifies the in-memory database to +contain a cluster, a frame, and two signals. The database is then +used in a :any:`nixnet.session.SignalOutSinglePointSession` and +:any:`nixnet.session.SignalInSinglePointSession` to write and then +read a pair of signals. + +CAN Database Creation +--------------------- + +.. literalinclude:: ../../nixnet_examples/can_dynamic_database_creation.py + +LIN Database Creation +--------------------- + +.. literalinclude:: ../../nixnet_examples/lin_dynamic_database_creation.py diff --git a/nixnet/_cprops.py b/nixnet/_cprops.py index 483c8b2e..f102931c 100644 --- a/nixnet/_cprops.py +++ b/nixnet/_cprops.py @@ -391,7 +391,7 @@ def set_database_u8(ref, prop_id, value): def get_database_u8_array(ref, prop_id): # type: (int, int) -> typing.Iterable[int] - value_size = _funcs.nx_get_property_size(ref, prop_id) + value_size = _funcs.nxdb_get_property_size(ref, prop_id) elements = value_size // _ctypedefs.u8.BYTES ref_ctypes = _ctypedefs.nxDatabaseRef_t(ref) @@ -458,7 +458,7 @@ def set_database_u32(ref, prop_id, value): def get_database_u32_array(ref, prop_id): # type: (int, int) -> typing.Iterable[int] - value_size = _funcs.nx_get_property_size(ref, prop_id) + value_size = _funcs.nxdb_get_property_size(ref, prop_id) elements = value_size // _ctypedefs.u32.BYTES ref_ctypes = _ctypedefs.nxDatabaseRef_t(ref) @@ -556,7 +556,7 @@ def set_database_f64(ref, prop_id, value): def get_database_string(ref, prop_id): # type: (int, int) -> typing.Text - value_size = _funcs.nx_get_property_size(ref, prop_id) + value_size = _funcs.nxdb_get_property_size(ref, prop_id) ref_ctypes = _ctypedefs.nxDatabaseRef_t(ref) prop_id_ctypes = _ctypedefs.u32(prop_id) @@ -621,14 +621,14 @@ def set_database_ref(ref, prop_id, value): def get_database_ref_array_len(ref, prop_id): # type: (int, int) -> int - value_size = _funcs.nx_get_property_size(ref, prop_id) + value_size = _funcs.nxdb_get_property_size(ref, prop_id) elements = value_size // _ctypedefs.nxDatabaseRef_t.BYTES return elements def get_database_ref_array(ref, prop_id): # type: (int, int) -> typing.Iterable[int] - value_size = _funcs.nx_get_property_size(ref, prop_id) + value_size = _funcs.nxdb_get_property_size(ref, prop_id) elements = value_size // _ctypedefs.nxDatabaseRef_t.BYTES ref_ctypes = _ctypedefs.nxDatabaseRef_t(ref) diff --git a/nixnet/_enums.py b/nixnet/_enums.py index 79c3f6a7..050f6568 100644 --- a/nixnet/_enums.py +++ b/nixnet/_enums.py @@ -1539,6 +1539,14 @@ class LinLastErr(enum.Enum): CRC = _cconsts.NX_LIN_LAST_ERR_CODE_CRC +class LinProtocolVer(enum.Enum): + VER_1_2 = _cconsts.NX_LIN_PROTOCOL_VER_1_2 + VER_1_3 = _cconsts.NX_LIN_PROTOCOL_VER_1_3 + VER_2_0 = _cconsts.NX_LIN_PROTOCOL_VER_2_0 + VER_2_1 = _cconsts.NX_LIN_PROTOCOL_VER_2_1 + VER_2_2 = _cconsts.NX_LIN_PROTOCOL_VER_2_2 + + class Condition(enum.Enum): TRANSMIT_COMPLETE = _cconsts.NX_CONDITION_TRANSMIT_COMPLETE INTF_COMMUNICATING = _cconsts.NX_CONDITION_INTF_COMMUNICATING diff --git a/nixnet/db/_ecu.py b/nixnet/db/_ecu.py index 645551c6..c6551f3c 100644 --- a/nixnet/db/_ecu.py +++ b/nixnet/db/_ecu.py @@ -3,6 +3,7 @@ from __future__ import print_function from nixnet import _props +from nixnet import constants class Ecu(object): @@ -111,11 +112,13 @@ def lin_master(self, value): @property def lin_protocol_ver(self): - return _props.get_ecu_lin_protocol_ver(self._handle) + # type: () -> constants.LinProtocolVer + return constants.LinProtocolVer(_props.get_ecu_lin_protocol_ver(self._handle)) @lin_protocol_ver.setter def lin_protocol_ver(self, value): - _props.set_ecu_lin_protocol_ver(self._handle, value) + # type: (constants.LinProtocolVer) -> None + _props.set_ecu_lin_protocol_ver(self._handle, value.value) @property def lin_initial_nad(self): diff --git a/nixnet/db/_linsched_entry.py b/nixnet/db/_linsched_entry.py index 936351f4..820c48ab 100644 --- a/nixnet/db/_linsched_entry.py +++ b/nixnet/db/_linsched_entry.py @@ -2,8 +2,11 @@ from __future__ import division from __future__ import print_function +import typing # NOQA: F401 + from nixnet import _props from nixnet import constants +from nixnet.db import _frame class LinSchedEntry(object): @@ -56,11 +59,15 @@ def event_id(self, value): @property def frames(self): - return _props.get_lin_sched_entry_frames(self._handle) + # type: () -> typing.Iterable[_frame.Frame] + for ref in _props.get_lin_sched_entry_frames(self._handle): + yield _frame.Frame(ref) @frames.setter def frames(self, value): - _props.set_lin_sched_entry_frames(self._handle, value) + # type: (typing.Iterable[_frame.Frame]) -> None + frame_handles = [frame._handle for frame in value] + _props.set_lin_sched_entry_frames(self._handle, frame_handles) @property def name(self): diff --git a/nixnet/db/database.py b/nixnet/db/database.py index e8fcfae2..c17175f1 100644 --- a/nixnet/db/database.py +++ b/nixnet/db/database.py @@ -75,7 +75,7 @@ def name(self): return _props.get_database_name(self._handle) @property - def cluster(self): + def clusters(self): return self._clusters @property diff --git a/nixnet_examples/can_dynamic_database_creation.py b/nixnet_examples/can_dynamic_database_creation.py new file mode 100644 index 00000000..05d3d57e --- /dev/null +++ b/nixnet_examples/can_dynamic_database_creation.py @@ -0,0 +1,87 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from random import randint +import six +import time + +import nixnet +from nixnet import constants +from nixnet import db + + +def main(): + database_name = ':memory:' + cluster_name = 'CAN_Cluster' + frame_name = 'CAN_Event_Frame' + signal_1_name = 'CAN_Event_Signal_1' + signal_2_name = 'CAN_Event_Signal_2' + signal_list = [signal_1_name, signal_2_name] + output_interface = 'CAN1' + input_interface = 'CAN2' + + # Open the default in-memory database. + with db.Database(database_name) as database: + + # Add a CAN cluster, a frame, and two signals to the database. + cluster = database.clusters.add(cluster_name) + cluster.protocol = constants.Protocol.CAN + cluster.baud_rate = 125000 + frame = cluster.frames.add(frame_name) + frame.id = 1 + frame.payload_len = 2 + signal_1 = frame.mux_static_signals.add(signal_1_name) + signal_1.byte_ordr = constants.SigByteOrdr.BIG_ENDIAN + signal_1.data_type = constants.SigDataType.UNSIGNED + signal_1.start_bit = 0 + signal_1.num_bits = 8 + signal_2 = frame.mux_static_signals.add(signal_2_name) + signal_2.byte_ordr = constants.SigByteOrdr.BIG_ENDIAN + signal_2.data_type = constants.SigDataType.UNSIGNED + signal_2.start_bit = 8 + signal_2.num_bits = 8 + + # Use the database we just created, write and then read a pair of signals. + with nixnet.SignalOutSinglePointSession( + output_interface, + database_name, + cluster_name, + signal_list) as output_session: + with nixnet.SignalInSinglePointSession( + input_interface, + database_name, + cluster_name, + signal_list) as input_session: + terminated_cable = six.moves.input('Are you using a terminated cable (Y or N)? ') + if terminated_cable.lower() == "y": + input_session.intf.can_term = constants.CanTerm.ON + output_session.intf.can_term = constants.CanTerm.OFF + elif terminated_cable.lower() == "n": + input_session.intf.can_term = constants.CanTerm.ON + output_session.intf.can_term = constants.CanTerm.ON + else: + print("Unrecognised input ({}), assuming 'n'".format(terminated_cable)) + input_session.intf.can_term = constants.CanTerm.ON + output_session.intf.can_term = constants.CanTerm.ON + + # Start the input session manually to make sure that the first + # signal values sent before the initial read will be received. + input_session.start() + + # Generate a pair of random values and send out the signals. + output_values = [randint(0, 255), randint(0, 255)] + output_session.signals.write(output_values) + print('Sent signal values: {}'.format(output_values)) + + # Wait 1 s and then read the received values. + # They should be the same as the ones sent. + time.sleep(1) + + input_signals = input_session.signals.read() + input_values = [int(value) for timestamp, value in input_signals] + print('Received signal values: {}'.format(input_values)) + + +if __name__ == '__main__': + main() diff --git a/nixnet_examples/lin_dynamic_database_creation.py b/nixnet_examples/lin_dynamic_database_creation.py new file mode 100644 index 00000000..b3002bea --- /dev/null +++ b/nixnet_examples/lin_dynamic_database_creation.py @@ -0,0 +1,110 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from random import randint +import six +import time + +import nixnet +from nixnet import constants +from nixnet import db + + +def main(): + database_name = ':memory:' + cluster_name = 'LIN_Cluster' + ecu_1_name = 'LIN_ECU_1' + ecu_2_name = 'LIN_ECU_2' + schedule_name = 'LIN_Schedule_1' + schedule_entry_name = 'LIN_Schedule_Entry' + frame_name = 'LIN_Frame' + signal_1_name = 'LIN_Signal_1' + signal_2_name = 'LIN_Signal_2' + signal_list = [signal_1_name, signal_2_name] + output_interface = 'LIN1' + input_interface = 'LIN2' + + # Open the default in-memory database. + with db.Database(database_name) as database: + + # Add a LIN cluster, a frame, and two signals to the database. + cluster = database.clusters.add(cluster_name) + cluster.protocol = constants.Protocol.LIN + cluster.baud_rate = 19200 + frame = cluster.frames.add(frame_name) + frame.id = 1 + frame.payload_len = 2 + signal_1 = frame.mux_static_signals.add(signal_1_name) + signal_1.byte_ordr = constants.SigByteOrdr.BIG_ENDIAN + signal_1.data_type = constants.SigDataType.UNSIGNED + signal_1.start_bit = 0 + signal_1.num_bits = 8 + signal_2 = frame.mux_static_signals.add(signal_2_name) + signal_2.byte_ordr = constants.SigByteOrdr.BIG_ENDIAN + signal_2.data_type = constants.SigDataType.UNSIGNED + signal_2.start_bit = 8 + signal_2.num_bits = 8 + + # Add a LIN ECU and LIN Schedule to the cluster. + ecu_1 = cluster.ecus.add(ecu_1_name) + ecu_1.lin_protocol_ver = constants.LinProtocolVer.VER_2_2 + ecu_1.lin_master = True + ecu_2 = cluster.ecus.add(ecu_2_name) + ecu_2.lin_protocol_ver = constants.LinProtocolVer.VER_2_2 + ecu_2.lin_master = False + cluster.lin_tick = 0.01 + schedule = cluster.lin_schedules.add(schedule_name) + schedule.priority = 0 + schedule.run_mode = constants.LinSchedRunMode.CONTINUOUS + schedule_entry = schedule.entries.add(schedule_entry_name) + schedule_entry.delay = 1000.0 + schedule_entry.type = constants.LinSchedEntryType.UNCONDITIONAL + schedule_entry.frames = [frame] + + # Use the database we just created, write and then read a pair of signals. + with nixnet.SignalOutSinglePointSession( + output_interface, + database_name, + cluster_name, + signal_list) as output_session: + with nixnet.SignalInSinglePointSession( + input_interface, + database_name, + cluster_name, + signal_list) as input_session: + terminated_cable = six.moves.input('Are you using a terminated cable (Y or N)? ') + if terminated_cable.lower() == "y": + input_session.intf.lin_term = constants.LinTerm.ON + output_session.intf.lin_term = constants.LinTerm.OFF + elif terminated_cable.lower() == "n": + input_session.intf.lin_term = constants.LinTerm.ON + output_session.intf.lin_term = constants.LinTerm.ON + else: + print("Unrecognised input ({}), assuming 'n'".format(terminated_cable)) + input_session.intf.lin_term = constants.LinTerm.ON + output_session.intf.lin_term = constants.LinTerm.ON + + # Start the input session manually to make sure that the first + # signal values sent before the initial read will be received. + input_session.start() + + # Set the schedule. This will also automatically enable master mode. + output_session.change_lin_schedule(0) + + # Generate a pair of random values and send out the signals. + output_values = [randint(0, 255), randint(0, 255)] + output_session.signals.write(output_values) + print('Sent signal values: {}'.format(output_values)) + + # Wait 1 s and then read the received values. + # They should be the same as the ones sent. + time.sleep(1) + + input_signals = input_session.signals.read() + input_values = [int(value) for timestamp, value in input_signals] + print('Received signal values: {}'.format(input_values)) + + +if __name__ == '__main__': + main() diff --git a/nixnet_examples/lin_frame_stream_io.py b/nixnet_examples/lin_frame_stream_io.py index 08d9e423..cd296316 100644 --- a/nixnet_examples/lin_frame_stream_io.py +++ b/nixnet_examples/lin_frame_stream_io.py @@ -36,8 +36,7 @@ def main(): # frame value sent before the initial read will be received. input_session.start() - # Set the schedule. This will also automically enable - # master mode + # Set the schedule. This will also automatically enable master mode. output_session.change_lin_schedule(0) user_value = six.moves.input('Enter payload [int, int]: ') diff --git a/tests/test_examples.py b/tests/test_examples.py index 5a36ffd3..266e096b 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -10,10 +10,12 @@ from nixnet import _cfuncs from nixnet import _ctypedefs +from nixnet_examples import can_dynamic_database_creation from nixnet_examples import can_frame_queued_io from nixnet_examples import can_frame_stream_io from nixnet_examples import can_signal_conversion from nixnet_examples import can_signal_single_point_io +from nixnet_examples import lin_dynamic_database_creation from nixnet_examples import lin_frame_stream_io from nixnet_examples import programmatic_database_usage @@ -33,9 +35,13 @@ MockXnetLibrary.nx_stop.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_clear.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_system_open.return_value = _ctypedefs.u32(0) +MockXnetLibrary.nx_system_close.return_value = _ctypedefs.u32(0) MockXnetLibrary.nxdb_add_alias64.return_value = _ctypedefs.u32(0) MockXnetLibrary.nxdb_remove_alias.return_value = _ctypedefs.u32(0) -MockXnetLibrary.nx_system_close.return_value = _ctypedefs.u32(0) +MockXnetLibrary.nxdb_open_database.return_value = _ctypedefs.u32(0) +MockXnetLibrary.nxdb_close_database.return_value = _ctypedefs.u32(0) +MockXnetLibrary.nxdb_create_object.return_value = _ctypedefs.u32(0) +MockXnetLibrary.nxdb_set_property.return_value = _ctypedefs.u32(0) def six_input(queue): @@ -50,6 +56,18 @@ def _six_input(prompt=""): return _six_input +@pytest.mark.parametrize("input_values", [ + ['y'], + ['n'], + ['invalid'], +]) +@mock.patch('nixnet._cfuncs.lib', MockXnetLibrary) +@mock.patch('time.sleep', lambda time: None) +def test_can_dynamic_database_creation(input_values): + with mock.patch('six.moves.input', six_input(input_values)): + can_dynamic_database_creation.main() + + @pytest.mark.parametrize("input_values", [ ['y', '1, 2, 3', 'q'], ['n', '1, 2, 3', 'q'], @@ -94,6 +112,18 @@ def test_can_signal_single_point_empty_session(input_values): can_signal_single_point_io.main() +@pytest.mark.parametrize("input_values", [ + ['y'], + ['n'], + ['invalid'], +]) +@mock.patch('nixnet._cfuncs.lib', MockXnetLibrary) +@mock.patch('time.sleep', lambda time: None) +def test_lin_dynamic_database_creation(input_values): + with mock.patch('six.moves.input', six_input(input_values)): + lin_dynamic_database_creation.main() + + @pytest.mark.parametrize("input_values", [ ['1, 2'], ['1'],