From f29cc14819506fd3a13aa3992964f523ef3275ff Mon Sep 17 00:00:00 2001 From: Pavel Kirienko Date: Fri, 18 Aug 2023 21:49:36 +0300 Subject: [PATCH 1/4] Add 2e2 tests and restructure the CMake project --- libudpard/.clang-tidy => .clang-tidy | 0 .github/workflows/main.yml | 4 +- {tests/.idea => .idea}/dictionaries/pavel.xml | 0 CMakeLists.txt | 29 + libudpard/.clang-format | 96 --- libudpard/udpard.h | 6 + tests/.clang-tidy | 1 + tests/CMakeLists.txt | 19 +- tests/src/test_e2e.cpp | 596 +++++++++++++++++- 9 files changed, 636 insertions(+), 115 deletions(-) rename libudpard/.clang-tidy => .clang-tidy (100%) rename {tests/.idea => .idea}/dictionaries/pavel.xml (100%) create mode 100644 CMakeLists.txt delete mode 100644 libudpard/.clang-format diff --git a/libudpard/.clang-tidy b/.clang-tidy similarity index 100% rename from libudpard/.clang-tidy rename to .clang-tidy diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 2c7b1b6..3dd5149 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -28,7 +28,7 @@ jobs: -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_COMPILER=${{ matrix.c-compiler }} -DCMAKE_CXX_COMPILER=${{ matrix.cxx-compiler }} - tests + . # language=bash - run: | cd ${{ github.workspace }}/build @@ -72,7 +72,7 @@ jobs: -DCMAKE_CXX_COMPILER=${{ matrix.cxx-compiler }} -DCMAKE_CXX_FLAGS="${{ matrix.cxx-flags }}" -DNO_STATIC_ANALYSIS=1 - tests + . # language=bash - run: | cd ${{ github.workspace }}/build diff --git a/tests/.idea/dictionaries/pavel.xml b/.idea/dictionaries/pavel.xml similarity index 100% rename from tests/.idea/dictionaries/pavel.xml rename to .idea/dictionaries/pavel.xml diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..68d7b54 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,29 @@ +# This software is distributed under the terms of the MIT License. +# Copyright (C) OpenCyphal Development Team +# Copyright Amazon.com Inc. or its affiliates. +# SPDX-License-Identifier: MIT +# Author: Pavel Kirienko +# +# This file is only needed for library development and testing. It is not needed to use the library in your project; +# instead, users should integrate the library by copying its source files. + +cmake_minimum_required(VERSION 3.20) + +project(udpard) +enable_testing() + +# Shared Clang-Format target for all subprojects. +find_program(clang_format NAMES clang-format) +if (NOT clang_format) + message(STATUS "Could not locate clang-format") +else () + file(GLOB_RECURSE format_files + ${CMAKE_CURRENT_SOURCE_DIR}/demo/*.[ch] + ${CMAKE_CURRENT_SOURCE_DIR}/libudpard/*.[ch] + ${CMAKE_CURRENT_SOURCE_DIR}/tests/*.[ch] + ${CMAKE_CURRENT_SOURCE_DIR}/tests/*.[ch]pp) + message(STATUS "Using clang-format: ${clang_format}; files: ${format_files}") + add_custom_target(format COMMAND ${clang_format} -i -fallback-style=none -style=file --verbose ${format_files}) +endif () + +add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/tests) diff --git a/libudpard/.clang-format b/libudpard/.clang-format deleted file mode 100644 index d6b6f78..0000000 --- a/libudpard/.clang-format +++ /dev/null @@ -1,96 +0,0 @@ ---- -Language: Cpp -# BasedOnStyle: LLVM -AccessModifierOffset: -4 -AlignAfterOpenBracket: Align -AlignConsecutiveAssignments: true -AlignConsecutiveDeclarations: true -AlignEscapedNewlines: Left -AlignOperands: true -AlignTrailingComments: true -AllowAllParametersOfDeclarationOnNextLine: false -AllowShortBlocksOnASingleLine: false -AllowShortCaseLabelsOnASingleLine: false -AllowShortFunctionsOnASingleLine: Inline -AllowShortIfStatementsOnASingleLine: Never -AllowShortLoopsOnASingleLine: false -AlwaysBreakAfterDefinitionReturnType: None -AlwaysBreakAfterReturnType: None -AlwaysBreakBeforeMultilineStrings: false -AlwaysBreakTemplateDeclarations: Yes -BinPackArguments: false -BinPackParameters: false -BraceWrapping: - AfterCaseLabel: true - AfterClass: true - AfterControlStatement: true - AfterEnum: true - AfterFunction: true - AfterNamespace: true - AfterStruct: true - AfterUnion: true - BeforeCatch: true - BeforeElse: true - IndentBraces: false - SplitEmptyFunction: false - SplitEmptyRecord: false - SplitEmptyNamespace: false - AfterExternBlock: false # Keeps the contents un-indented. -BreakBeforeBinaryOperators: None -BreakBeforeBraces: Custom -BreakBeforeTernaryOperators: true -BreakConstructorInitializers: AfterColon -# BreakInheritanceList: AfterColon -BreakStringLiterals: true -ColumnLimit: 120 -CommentPragmas: '^ (coverity|pragma:)' -CompactNamespaces: false -ConstructorInitializerAllOnOneLineOrOnePerLine: true -ConstructorInitializerIndentWidth: 4 -ContinuationIndentWidth: 4 -Cpp11BracedListStyle: true -DerivePointerAlignment: false -DisableFormat: false -ExperimentalAutoDetectBinPacking: false -FixNamespaceComments: true -ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ] -IncludeBlocks: Preserve -IndentCaseLabels: false -IndentPPDirectives: AfterHash -IndentWidth: 4 -IndentWrappedFunctionNames: false -KeepEmptyLinesAtTheStartOfBlocks: false -MacroBlockBegin: '' -MacroBlockEnd: '' -MaxEmptyLinesToKeep: 1 -NamespaceIndentation: None -PenaltyBreakAssignment: 2 -PenaltyBreakBeforeFirstCallParameter: 10000 # Raised intentionally; prefer breaking all -PenaltyBreakComment: 300 -PenaltyBreakFirstLessLess: 120 -PenaltyBreakString: 1000 -PenaltyExcessCharacter: 1000000 -PenaltyReturnTypeOnItsOwnLine: 10000 # Raised intentionally because it hurts readability -PointerAlignment: Left -ReflowComments: true -SortIncludes: false -SortUsingDeclarations: false -SpaceAfterCStyleCast: true -SpaceAfterTemplateKeyword: true -SpaceBeforeAssignmentOperators: true -SpaceBeforeCpp11BracedList: false -SpaceBeforeInheritanceColon: true -SpaceBeforeParens: ControlStatements -SpaceBeforeCtorInitializerColon: true -SpaceBeforeRangeBasedForLoopColon: true -SpaceInEmptyParentheses: false -SpacesBeforeTrailingComments: 2 -SpacesInAngles: false -SpacesInCStyleCastParentheses: false -SpacesInContainerLiterals: false -SpacesInParentheses: false -SpacesInSquareBrackets: false -Standard: Cpp11 -TabWidth: 8 -UseTab: Never -... diff --git a/libudpard/udpard.h b/libudpard/udpard.h index 1b22a10..3943c35 100644 --- a/libudpard/udpard.h +++ b/libudpard/udpard.h @@ -523,6 +523,12 @@ int_fast8_t udpardTxInit(struct UdpardTx* const self, /// The library itself, however, does not use or check this value in any way, so it can be zero if not needed /// (this is not recommended for real-time systems). /// +/// Note that due to the priority ordering, transient transfer loss may occur if the user increases the priority +/// level on a given port. This is because the frames of the new transfer will be enqueued before the frames of +/// the previous transfer, so the frames of the previous transfer will be transmitted only after the frames of +/// the new transfer are transmitted, causing the receiver to discard them as duplicates due to their lower transfer-ID. +/// It is therefore not recommended to change the priority level dynamically. +/// /// The function returns the number of UDP datagrams enqueued, which is always a positive number, in case of success. /// In case of failure, the function returns a negated error code. /// diff --git a/tests/.clang-tidy b/tests/.clang-tidy index 7438077..e17f51c 100644 --- a/tests/.clang-tidy +++ b/tests/.clang-tidy @@ -1,6 +1,7 @@ --- # The tests are held to somewhat lower quality standards than production code. # The tests are also written in somewhat non-idiomatic C++ because they are tightly related to the C codebase. +InheritParentConfig: true Checks: >- boost-*, bugprone-*, diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 51583ba..8f90e22 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,5 +1,7 @@ # This software is distributed under the terms of the MIT License. -# Copyright (c) OpenCyphal. +# Copyright (C) OpenCyphal Development Team +# Copyright Amazon.com Inc. or its affiliates. +# SPDX-License-Identifier: MIT # Author: Pavel Kirienko cmake_minimum_required(VERSION 3.12) @@ -14,7 +16,7 @@ enable_testing() set(CTEST_OUTPUT_ON_FAILURE ON) set(NO_STATIC_ANALYSIS OFF CACHE BOOL "disable udpard static analysis") -set(library_dir "${CMAKE_SOURCE_DIR}/../libudpard") +set(library_dir "${CMAKE_CURRENT_SOURCE_DIR}/../libudpard") set(unity_root "${CMAKE_CURRENT_SOURCE_DIR}/../submodules/unity") # Use -DNO_STATIC_ANALYSIS=1 to suppress static analysis. @@ -28,19 +30,6 @@ if (NOT NO_STATIC_ANALYSIS) message(STATUS "Using clang-tidy: ${clang_tidy}") endif () -# clang-format -find_program(clang_format NAMES clang-format) -if (NOT clang_format) - message(STATUS "Could not locate clang-format") -else () - file(GLOB_RECURSE format_files - ${library_dir}/*.[ch] - ${CMAKE_SOURCE_DIR}/src/*.[ch] - ${CMAKE_SOURCE_DIR}/src/*.[ch]pp) - message(STATUS "Using clang-format: ${clang_format}; files: ${format_files}") - add_custom_target(format COMMAND ${clang_format} -i -fallback-style=none -style=file --verbose ${format_files}) -endif () - function(gen_test name files compile_definitions compile_flags link_flags c_standard) # Unity add_library("${name}_unity" STATIC "${unity_root}/src/unity.c") diff --git a/tests/src/test_e2e.cpp b/tests/src/test_e2e.cpp index d919958..8b38dab 100644 --- a/tests/src/test_e2e.cpp +++ b/tests/src/test_e2e.cpp @@ -3,12 +3,603 @@ /// Copyright Amazon.com Inc. or its affiliates. /// SPDX-License-Identifier: MIT +#include +#include "helpers.h" #include +#include +#include +#include +#include namespace { -// Here be dragons. +UdpardPayload makePayload(const std::string_view& payload) +{ + return {.size = payload.size(), .data = payload.data()}; +} + +/// A wrapper over udpardRxSubscriptionReceive() that copies the datagram payload into a newly allocated buffer. +[[nodiscard]] int_fast8_t rxSubscriptionReceive(UdpardRxSubscription* const self, + InstrumentedAllocator& payload_memory, + const UdpardMicrosecond timestamp_usec, + const UdpardMutablePayload datagram_payload, + const uint_fast8_t redundant_iface_index, + UdpardRxTransfer* const out_transfer) +{ + return udpardRxSubscriptionReceive(self, + timestamp_usec, + { + .size = datagram_payload.size, + .data = std::memmove(instrumentedAllocatorAllocate(&payload_memory, + datagram_payload.size), + datagram_payload.data, + datagram_payload.size), + }, + redundant_iface_index, + out_transfer); +} + +/// A wrapper over udpardRxRPCDispatcherReceive() that copies the datagram payload into a newly allocated buffer. +[[nodiscard]] int_fast8_t rxRPCDispatcherReceive(UdpardRxRPCDispatcher* const self, + InstrumentedAllocator& payload_memory, + const UdpardMicrosecond timestamp_usec, + const UdpardMutablePayload datagram_payload, + const uint_fast8_t redundant_iface_index, + UdpardRxRPCPort** const out_port, + UdpardRxRPCTransfer* const out_transfer) +{ + return udpardRxRPCDispatcherReceive(self, + timestamp_usec, + { + .size = datagram_payload.size, + .data = std::memmove(instrumentedAllocatorAllocate(&payload_memory, + datagram_payload.size), + datagram_payload.data, + datagram_payload.size), + }, + redundant_iface_index, + out_port, + out_transfer); +} + +void testPubSub() +{ + InstrumentedAllocator alloc_tx; + InstrumentedAllocator alloc_rx_session; + InstrumentedAllocator alloc_rx_fragment; + InstrumentedAllocator alloc_rx_payload; + instrumentedAllocatorNew(&alloc_tx); + instrumentedAllocatorNew(&alloc_rx_session); + instrumentedAllocatorNew(&alloc_rx_fragment); + instrumentedAllocatorNew(&alloc_rx_payload); + const auto mem_tx = instrumentedAllocatorMakeMemoryResource(&alloc_tx); + const UdpardRxMemoryResources mem_rx{ + .session = instrumentedAllocatorMakeMemoryResource(&alloc_rx_session), + .fragment = instrumentedAllocatorMakeMemoryResource(&alloc_rx_fragment), + .payload = instrumentedAllocatorMakeMemoryDeleter(&alloc_rx_payload), + }; + // Initialize the TX pipeline. Set the MTU to a low value to ensure that we test multi-frame transfers. + UdpardTx tx{}; + UdpardNodeID node_id = UDPARD_NODE_ID_UNSET; + TEST_ASSERT_EQUAL(0, udpardTxInit(&tx, &node_id, 7, mem_tx)); + tx.mtu = 100; + for (auto i = 0U; i <= UDPARD_PRIORITY_MAX; i++) + { + tx.dscp_value_per_priority[i] = static_cast(0xA0U + i); + } + // Initialize the subscriptions. + std::array sub{}; + TEST_ASSERT_EQUAL(0, udpardRxSubscriptionInit(&sub.at(0), 5000, 300, mem_rx)); + TEST_ASSERT_EQUAL(0, udpardRxSubscriptionInit(&sub.at(1), 5001, 200, mem_rx)); + TEST_ASSERT_EQUAL(0, udpardRxSubscriptionInit(&sub.at(2), 5002, 100, mem_rx)); + + // Publish something on subject 5000. + std::array transfer_id{}; + TEST_ASSERT_EQUAL(1, // Single-frame anonymous = success. + udpardTxPublish(&tx, + 10'000'000, + UdpardPrioritySlow, + 5000, + &transfer_id.at(0), + makePayload("Last night, I had a dream."), + nullptr)); + const std::string_view Eden = + "After speaking with Scott, Lan Xi halted his busy work amid chaotic feelings, and stopped to think, as the " + "colonel had advised. Faster than he had imagined, Eden's cold, slippery vipers crawled into his " + "consciousness. He found the fruit of knowledge and ate it, and the last rays of sunshine in his soul " + "disappeared forever as everything plunged into darkness."; + TEST_ASSERT_EQUAL(-UDPARD_ERROR_ANONYMOUS, + udpardTxPublish(&tx, + 10'001'000, + UdpardPriorityNominal, + 5000, + &transfer_id.at(0), + makePayload(Eden), + nullptr)); + node_id = 42; // Change the node-ID to allow multi-frame transfers, then try again. + TEST_ASSERT_EQUAL(4, + udpardTxPublish(&tx, + 10'002'000, + UdpardPriorityOptional, + 5000, + &transfer_id.at(0), + makePayload(Eden), + nullptr)); + TEST_ASSERT_EQUAL(5, tx.queue_size); + + // Publish something on subject 5001. The priority here is higher so it should be delivered earlier. + node_id = 43; // Change the node-ID. + const std::string_view Later = "Two days later, the captain of Ultimate Law committed suicide."; + TEST_ASSERT_EQUAL(1, + udpardTxPublish(&tx, + 10'003'000, + UdpardPriorityNominal, + 5001, + &transfer_id.at(1), + makePayload(Later), + nullptr)); + TEST_ASSERT_EQUAL(6, tx.queue_size); + + // Publish something on subject 5002. The priority here is the same. + const std::string_view Dark = "'Dark. It's so fucking dark,' the captain murmured, and then shot himself."; + TEST_ASSERT_EQUAL(1, + udpardTxPublish(&tx, + 10'004'000, + UdpardPriorityNominal, + 5002, + &transfer_id.at(2), + makePayload(Dark), + nullptr)); + TEST_ASSERT_EQUAL(7, tx.queue_size); + TEST_ASSERT_EQUAL(7, alloc_tx.allocated_fragments); + + // Transmit the enqueued frames by pushing them into the subscribers. + // Here we pop the frames one by one ensuring that they come out in the correct order. + UdpardRxTransfer transfer{}; + // First transfer. + TEST_ASSERT_EQUAL(0, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_payload.allocated_fragments); + const UdpardTxItem* tx_item = udpardTxPeek(&tx); + TEST_ASSERT_NOT_NULL(tx_item); + TEST_ASSERT_EQUAL(sub.at(1).udp_ip_endpoint.ip_address, tx_item->destination.ip_address); + TEST_ASSERT_NULL(tx_item->next_in_transfer); + TEST_ASSERT_EQUAL(10'003'000, tx_item->deadline_usec); + TEST_ASSERT_EQUAL(0xA4, tx_item->dscp); + TEST_ASSERT_EQUAL(1, + rxSubscriptionReceive(&sub.at(1), + alloc_rx_payload, + 10'005'000, + tx_item->datagram_payload, + 0, + &transfer)); + TEST_ASSERT_EQUAL(1, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(1, alloc_rx_payload.allocated_fragments); + // Check the received transfer. + TEST_ASSERT_EQUAL(10'005'000, transfer.timestamp_usec); + TEST_ASSERT_EQUAL(UdpardPriorityNominal, transfer.priority); + TEST_ASSERT_EQUAL(43, transfer.source_node_id); + TEST_ASSERT_EQUAL(0, transfer.transfer_id); + TEST_ASSERT_EQUAL(Later.size(), transfer.payload_size); + TEST_ASSERT_EQUAL(Later.size(), transfer.payload.view.size); + TEST_ASSERT_EQUAL_MEMORY(Later.data(), transfer.payload.view.data, transfer.payload.view.size); + TEST_ASSERT_NULL(transfer.payload.next); + // Free the transfer payload. + udpardRxFragmentFree(transfer.payload, mem_rx.fragment, mem_rx.payload); + TEST_ASSERT_EQUAL(1, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_payload.allocated_fragments); + // Send duplicates. + TEST_ASSERT_EQUAL(0, // Duplicate on same iface. + rxSubscriptionReceive(&sub.at(1), + alloc_rx_payload, + 10'005'100, + tx_item->datagram_payload, + 0, + &transfer)); + TEST_ASSERT_EQUAL(0, // Duplicate on another iface. + rxSubscriptionReceive(&sub.at(1), + alloc_rx_payload, + 10'005'200, + tx_item->datagram_payload, + 1, + &transfer)); + // Ensure the duplicates do no alter memory usage. + TEST_ASSERT_EQUAL(1, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_payload.allocated_fragments); + // Free the TX item. + udpardTxFree(mem_tx, udpardTxPop(&tx, tx_item)); + TEST_ASSERT_EQUAL(6, alloc_tx.allocated_fragments); + + // Second transfer. + tx_item = udpardTxPeek(&tx); + TEST_ASSERT_NOT_NULL(tx_item); + TEST_ASSERT_EQUAL(sub.at(2).udp_ip_endpoint.ip_address, tx_item->destination.ip_address); + TEST_ASSERT_NULL(tx_item->next_in_transfer); + TEST_ASSERT_EQUAL(10'004'000, tx_item->deadline_usec); + TEST_ASSERT_EQUAL(0xA4, tx_item->dscp); + TEST_ASSERT_EQUAL(1, + rxSubscriptionReceive(&sub.at(2), + alloc_rx_payload, + 10'006'000, + tx_item->datagram_payload, + 1, + &transfer)); + TEST_ASSERT_EQUAL(2, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(1, alloc_rx_payload.allocated_fragments); + // Check the received transfer. + TEST_ASSERT_EQUAL(10'006'000, transfer.timestamp_usec); + TEST_ASSERT_EQUAL(UdpardPriorityNominal, transfer.priority); + TEST_ASSERT_EQUAL(43, transfer.source_node_id); + TEST_ASSERT_EQUAL(0, transfer.transfer_id); + TEST_ASSERT_EQUAL(Dark.size(), transfer.payload_size); + TEST_ASSERT_EQUAL(Dark.size(), transfer.payload.view.size); + TEST_ASSERT_EQUAL_MEMORY(Dark.data(), transfer.payload.view.data, transfer.payload.view.size); + TEST_ASSERT_NULL(transfer.payload.next); + // Free the transfer payload. + udpardRxFragmentFree(transfer.payload, mem_rx.fragment, mem_rx.payload); + TEST_ASSERT_EQUAL(2, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_payload.allocated_fragments); + // Free the TX item. + udpardTxFree(mem_tx, udpardTxPop(&tx, tx_item)); + TEST_ASSERT_EQUAL(5, alloc_tx.allocated_fragments); + + // Third transfer. This one is anonymous. + tx_item = udpardTxPeek(&tx); + TEST_ASSERT_NOT_NULL(tx_item); + TEST_ASSERT_EQUAL(sub.at(0).udp_ip_endpoint.ip_address, tx_item->destination.ip_address); + TEST_ASSERT_NULL(tx_item->next_in_transfer); + TEST_ASSERT_EQUAL(10'000'000, tx_item->deadline_usec); + TEST_ASSERT_EQUAL(0xA6, tx_item->dscp); + TEST_ASSERT_EQUAL(1, + rxSubscriptionReceive(&sub.at(0), + alloc_rx_payload, + 10'007'000, + tx_item->datagram_payload, + 2, + &transfer)); + TEST_ASSERT_EQUAL(2, alloc_rx_session.allocated_fragments); // No increment, anonymous transfers are stateless. + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(1, alloc_rx_payload.allocated_fragments); + // Check the received transfer. + TEST_ASSERT_EQUAL(10'007'000, transfer.timestamp_usec); + TEST_ASSERT_EQUAL(UdpardPrioritySlow, transfer.priority); + TEST_ASSERT_EQUAL(UDPARD_NODE_ID_UNSET, transfer.source_node_id); + TEST_ASSERT_EQUAL(0, transfer.transfer_id); + TEST_ASSERT_EQUAL(26, transfer.payload_size); + TEST_ASSERT_EQUAL(26, transfer.payload.view.size); + TEST_ASSERT_EQUAL_MEMORY("Last night, I had a dream.", transfer.payload.view.data, transfer.payload.view.size); + TEST_ASSERT_NULL(transfer.payload.next); + // Free the transfer payload. + udpardRxFragmentFree(transfer.payload, mem_rx.fragment, mem_rx.payload); + TEST_ASSERT_EQUAL(2, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_payload.allocated_fragments); + // Free the TX item. + udpardTxFree(mem_tx, udpardTxPop(&tx, tx_item)); + TEST_ASSERT_EQUAL(4, alloc_tx.allocated_fragments); + + // Fourth transfer. This one contains multiple frames. We process them one-by-one. + // Frame #0. + tx_item = udpardTxPeek(&tx); + TEST_ASSERT_NOT_NULL(tx_item); + const UdpardTxItem* prev_next = tx_item->next_in_transfer; + TEST_ASSERT_NOT_NULL(prev_next); + TEST_ASSERT_EQUAL(sub.at(0).udp_ip_endpoint.ip_address, tx_item->destination.ip_address); + TEST_ASSERT_EQUAL(10'002'000, tx_item->deadline_usec); + TEST_ASSERT_EQUAL(0xA7, tx_item->dscp); + TEST_ASSERT_EQUAL(0, + rxSubscriptionReceive(&sub.at(0), + alloc_rx_payload, + 10'008'000, + tx_item->datagram_payload, + 0, + &transfer)); + TEST_ASSERT_EQUAL(3, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(1, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(1, alloc_rx_payload.allocated_fragments); + // Free the TX item. + udpardTxFree(mem_tx, udpardTxPop(&tx, tx_item)); + TEST_ASSERT_EQUAL(3, alloc_tx.allocated_fragments); + // Frame #1. + tx_item = udpardTxPeek(&tx); + TEST_ASSERT_NOT_NULL(tx_item); + TEST_ASSERT_EQUAL_PTR(prev_next, tx_item); + prev_next = tx_item->next_in_transfer; + TEST_ASSERT_NOT_NULL(prev_next); + TEST_ASSERT_EQUAL(sub.at(0).udp_ip_endpoint.ip_address, tx_item->destination.ip_address); + TEST_ASSERT_EQUAL(10'002'000, tx_item->deadline_usec); + TEST_ASSERT_EQUAL(0xA7, tx_item->dscp); + TEST_ASSERT_EQUAL(0, + rxSubscriptionReceive(&sub.at(0), + alloc_rx_payload, + 10'008'001, + tx_item->datagram_payload, + 0, + &transfer)); + TEST_ASSERT_EQUAL(3, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(2, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(2, alloc_rx_payload.allocated_fragments); + // Free the TX item. + udpardTxFree(mem_tx, udpardTxPop(&tx, tx_item)); + TEST_ASSERT_EQUAL(2, alloc_tx.allocated_fragments); + // Frame #2. + tx_item = udpardTxPeek(&tx); + TEST_ASSERT_NOT_NULL(tx_item); + TEST_ASSERT_EQUAL_PTR(prev_next, tx_item); + prev_next = tx_item->next_in_transfer; + TEST_ASSERT_NOT_NULL(prev_next); + TEST_ASSERT_EQUAL(sub.at(0).udp_ip_endpoint.ip_address, tx_item->destination.ip_address); + TEST_ASSERT_EQUAL(10'002'000, tx_item->deadline_usec); + TEST_ASSERT_EQUAL(0xA7, tx_item->dscp); + TEST_ASSERT_EQUAL(0, + rxSubscriptionReceive(&sub.at(0), + alloc_rx_payload, + 10'008'002, + tx_item->datagram_payload, + 0, + &transfer)); + TEST_ASSERT_EQUAL(3, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(3, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(3, alloc_rx_payload.allocated_fragments); + // Free the TX item. + udpardTxFree(mem_tx, udpardTxPop(&tx, tx_item)); + TEST_ASSERT_EQUAL(1, alloc_tx.allocated_fragments); + // Frame #3. This is the last frame of the transfer. The payload is truncated, see the extent. + tx_item = udpardTxPeek(&tx); + TEST_ASSERT_NOT_NULL(tx_item); + TEST_ASSERT_EQUAL_PTR(prev_next, tx_item); + prev_next = tx_item->next_in_transfer; + TEST_ASSERT_NULL(prev_next); + TEST_ASSERT_EQUAL(sub.at(0).udp_ip_endpoint.ip_address, tx_item->destination.ip_address); + TEST_ASSERT_EQUAL(10'002'000, tx_item->deadline_usec); + TEST_ASSERT_EQUAL(0xA7, tx_item->dscp); + TEST_ASSERT_EQUAL(1, + rxSubscriptionReceive(&sub.at(0), + alloc_rx_payload, + 10'008'003, + tx_item->datagram_payload, + 0, + &transfer)); + TEST_ASSERT_EQUAL(3, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(2, alloc_rx_fragment.allocated_fragments); // Extent truncation + head optimization. + TEST_ASSERT_EQUAL(3, alloc_rx_payload.allocated_fragments); // Extent truncation. + // Check the received transfer. + TEST_ASSERT_EQUAL(10'008'000, transfer.timestamp_usec); + TEST_ASSERT_EQUAL(UdpardPriorityOptional, transfer.priority); + TEST_ASSERT_EQUAL(42, transfer.source_node_id); + TEST_ASSERT_EQUAL(1, transfer.transfer_id); + TEST_ASSERT_EQUAL(300, transfer.payload_size); // Defined by the configured extent setting for this sub. + TEST_ASSERT_EQUAL(100, transfer.payload.view.size); // Defined by the MTU setting. + std::array rx_eden{}; + TEST_ASSERT_EQUAL(300, udpardGather(transfer.payload, rx_eden.size(), rx_eden.data())); + TEST_ASSERT_EQUAL_MEMORY(Eden.data(), rx_eden.data(), 300); + // Free the transfer payload. + udpardRxFragmentFree(transfer.payload, mem_rx.fragment, mem_rx.payload); + TEST_ASSERT_EQUAL(3, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_payload.allocated_fragments); + // Free the TX item. + udpardTxFree(mem_tx, udpardTxPop(&tx, tx_item)); + TEST_ASSERT_EQUAL(0, alloc_tx.allocated_fragments); + + // Close the subscriptions and ensure the memory is freed. + udpardRxSubscriptionFree(&sub.at(0)); + udpardRxSubscriptionFree(&sub.at(1)); + udpardRxSubscriptionFree(&sub.at(2)); + + // Final memory check. + TEST_ASSERT_EQUAL(0, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_payload.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_tx.allocated_fragments); +} + +void testRPC() +{ + InstrumentedAllocator alloc_tx; + InstrumentedAllocator alloc_rx_session; + InstrumentedAllocator alloc_rx_fragment; + InstrumentedAllocator alloc_rx_payload; + instrumentedAllocatorNew(&alloc_tx); + instrumentedAllocatorNew(&alloc_rx_session); + instrumentedAllocatorNew(&alloc_rx_fragment); + instrumentedAllocatorNew(&alloc_rx_payload); + const auto mem_tx = instrumentedAllocatorMakeMemoryResource(&alloc_tx); + const UdpardRxMemoryResources mem_rx{ + .session = instrumentedAllocatorMakeMemoryResource(&alloc_rx_session), + .fragment = instrumentedAllocatorMakeMemoryResource(&alloc_rx_fragment), + .payload = instrumentedAllocatorMakeMemoryDeleter(&alloc_rx_payload), + }; + // Initialize the TX pipeline. + UdpardTx tx{}; + const UdpardNodeID tx_node_id = 1234; + TEST_ASSERT_EQUAL(0, udpardTxInit(&tx, &tx_node_id, 2, mem_tx)); + tx.mtu = 500; + for (auto i = 0U; i <= UDPARD_PRIORITY_MAX; i++) + { + tx.dscp_value_per_priority[i] = static_cast(0xA0U + i); + } + // Initialize the RPC dispatcher and the RPC services. + UdpardRxRPCDispatcher dispatcher{}; + TEST_ASSERT_EQUAL(0, udpardRxRPCDispatcherInit(&dispatcher, 4321, mem_rx)); + UdpardRxRPCPort port_foo_a{}; + UdpardRxRPCPort port_foo_q{}; + TEST_ASSERT_EQUAL(1, udpardRxRPCDispatcherListen(&dispatcher, &port_foo_a, 200, false, 500)); + TEST_ASSERT_EQUAL(1, udpardRxRPCDispatcherListen(&dispatcher, &port_foo_q, 200, true, 500)); + + // Send a request. + UdpardTransferID transfer_id_shared = 0; + const std::string_view Entry = "But this simple world held a perplexing riddle: The entire galaxy was a vast " + "empty desert, but a highly intelligent civilization had appeared on the star " + "nearest to us. In this mystery, his thoughts found an entry point."; + TEST_ASSERT_EQUAL_INT32(1, + udpardTxRequest(&tx, + 10'000'000, + UdpardPriorityFast, + 200, + 4321, + &transfer_id_shared, + makePayload(Entry), + nullptr)); + TEST_ASSERT_EQUAL(1, tx.queue_size); + TEST_ASSERT_EQUAL(1, transfer_id_shared); + + // Send a response. + const std::string_view Forest = "In the dead, lonely, cold blackness, he saw the truth of the universe."; + TEST_ASSERT_EQUAL_INT32(1, + udpardTxRespond(&tx, + 10'001'000, + UdpardPriorityImmediate, + 200, + 4321, + transfer_id_shared, + makePayload(Forest), + nullptr)); + TEST_ASSERT_EQUAL(2, tx.queue_size); + TEST_ASSERT_EQUAL(1, transfer_id_shared); // Not incremented. + + // Transmit the enqueued frames by pushing them into the RPC dispatcher. + UdpardRxRPCTransfer transfer{}; + UdpardRxRPCPort* active_port = nullptr; + // First transfer. + TEST_ASSERT_EQUAL(0, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_payload.allocated_fragments); + const UdpardTxItem* tx_item = udpardTxPeek(&tx); + TEST_ASSERT_NOT_NULL(tx_item); + TEST_ASSERT_EQUAL(dispatcher.udp_ip_endpoint.ip_address, tx_item->destination.ip_address); + TEST_ASSERT_NULL(tx_item->next_in_transfer); + TEST_ASSERT_EQUAL(10'001'000, tx_item->deadline_usec); + TEST_ASSERT_EQUAL(0xA1, tx_item->dscp); + TEST_ASSERT_EQUAL(1, + rxRPCDispatcherReceive(&dispatcher, + alloc_rx_payload, + 10'000'000, + tx_item->datagram_payload, + 0, + &active_port, + &transfer)); + TEST_ASSERT_EQUAL(1, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(1, alloc_rx_payload.allocated_fragments); + // Check the received transfer. + TEST_ASSERT_EQUAL(&port_foo_a, active_port); + TEST_ASSERT_EQUAL(200, transfer.service_id); + TEST_ASSERT_EQUAL(false, transfer.is_request); + TEST_ASSERT_EQUAL(10'000'000, transfer.base.timestamp_usec); + TEST_ASSERT_EQUAL(UdpardPriorityImmediate, transfer.base.priority); + TEST_ASSERT_EQUAL(1234, transfer.base.source_node_id); + TEST_ASSERT_EQUAL(1, transfer.base.transfer_id); + TEST_ASSERT_EQUAL(Forest.size(), transfer.base.payload_size); + TEST_ASSERT_EQUAL(Forest.size(), transfer.base.payload.view.size); + TEST_ASSERT_EQUAL_MEMORY(Forest.data(), transfer.base.payload.view.data, transfer.base.payload.view.size); + TEST_ASSERT_NULL(transfer.base.payload.next); + // Free the transfer payload. + udpardRxFragmentFree(transfer.base.payload, mem_rx.fragment, mem_rx.payload); + TEST_ASSERT_EQUAL(1, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_payload.allocated_fragments); + // Send duplicates. + TEST_ASSERT_EQUAL(0, // Duplicate on the same iface. + rxRPCDispatcherReceive(&dispatcher, + alloc_rx_payload, + 10'000'100, + tx_item->datagram_payload, + 0, + &active_port, + &transfer)); + TEST_ASSERT_EQUAL(0, // Duplicate on another iface. + rxRPCDispatcherReceive(&dispatcher, + alloc_rx_payload, + 10'000'200, + tx_item->datagram_payload, + 2, + &active_port, + &transfer)); + // Ensure the duplicates do no alter memory usage. + TEST_ASSERT_EQUAL(1, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_payload.allocated_fragments); + // Free the TX item. + udpardTxFree(mem_tx, udpardTxPop(&tx, tx_item)); + TEST_ASSERT_EQUAL(1, alloc_tx.allocated_fragments); + + // Second transfer. + tx_item = udpardTxPeek(&tx); + TEST_ASSERT_NOT_NULL(tx_item); + TEST_ASSERT_EQUAL(dispatcher.udp_ip_endpoint.ip_address, tx_item->destination.ip_address); + TEST_ASSERT_NULL(tx_item->next_in_transfer); + TEST_ASSERT_EQUAL(10'000'000, tx_item->deadline_usec); + TEST_ASSERT_EQUAL(0xA2, tx_item->dscp); + TEST_ASSERT_EQUAL(1, + rxRPCDispatcherReceive(&dispatcher, + alloc_rx_payload, + 10'001'000, + tx_item->datagram_payload, + 1, + &active_port, + &transfer)); + TEST_ASSERT_EQUAL(2, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(1, alloc_rx_payload.allocated_fragments); + // Check the received transfer. + TEST_ASSERT_EQUAL(&port_foo_q, active_port); + TEST_ASSERT_EQUAL(200, transfer.service_id); + TEST_ASSERT_EQUAL(true, transfer.is_request); + TEST_ASSERT_EQUAL(10'001'000, transfer.base.timestamp_usec); + TEST_ASSERT_EQUAL(UdpardPriorityFast, transfer.base.priority); + TEST_ASSERT_EQUAL(1234, transfer.base.source_node_id); + TEST_ASSERT_EQUAL(0, transfer.base.transfer_id); + TEST_ASSERT_EQUAL(Entry.size(), transfer.base.payload_size); + TEST_ASSERT_EQUAL(Entry.size(), transfer.base.payload.view.size); + TEST_ASSERT_EQUAL_MEMORY(Entry.data(), transfer.base.payload.view.data, transfer.base.payload.view.size); + TEST_ASSERT_NULL(transfer.base.payload.next); + // Free the transfer payload. + udpardRxFragmentFree(transfer.base.payload, mem_rx.fragment, mem_rx.payload); + TEST_ASSERT_EQUAL(2, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_payload.allocated_fragments); + // Send duplicates. + TEST_ASSERT_EQUAL(0, // Duplicate on the same iface. + rxRPCDispatcherReceive(&dispatcher, + alloc_rx_payload, + 10'001'100, + tx_item->datagram_payload, + 0, + &active_port, + &transfer)); + TEST_ASSERT_EQUAL(0, // Duplicate on another iface. + rxRPCDispatcherReceive(&dispatcher, + alloc_rx_payload, + 10'001'200, + tx_item->datagram_payload, + 2, + &active_port, + &transfer)); + // Ensure the duplicates do no alter memory usage. + TEST_ASSERT_EQUAL(2, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_payload.allocated_fragments); + // Free the TX item. + udpardTxFree(mem_tx, udpardTxPop(&tx, tx_item)); + TEST_ASSERT_EQUAL(0, alloc_tx.allocated_fragments); + + // Destroy the ports. + udpardRxRPCDispatcherCancel(&dispatcher, 200, false); + udpardRxRPCDispatcherCancel(&dispatcher, 200, true); + + // Final memory check. + TEST_ASSERT_EQUAL(0, alloc_rx_session.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_fragment.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_rx_payload.allocated_fragments); + TEST_ASSERT_EQUAL(0, alloc_tx.allocated_fragments); +} } // namespace @@ -19,6 +610,7 @@ void tearDown() {} int main() { UNITY_BEGIN(); - // TODO + RUN_TEST(testPubSub); + RUN_TEST(testRPC); return UNITY_END(); } From 85e1b89d1375fd4e3c2306389cd405218d9195ed Mon Sep 17 00:00:00 2001 From: Pavel Kirienko Date: Tue, 22 Aug 2023 15:45:52 +0300 Subject: [PATCH 2/4] Expand a bit on the available iov options --- libudpard/udpard.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/libudpard/udpard.h b/libudpard/udpard.h index 3943c35..f4f389e 100644 --- a/libudpard/udpard.h +++ b/libudpard/udpard.h @@ -390,11 +390,11 @@ struct UdpardMemoryResource /// datagrams ready for transmission are not enqueued into the local prioritized queue but instead are sent directly /// to the network interface driver using a dedicated callback. The callback would accept not just a single /// chunk of data but a list of three chunks to avoid copying the source transfer payload: the datagram header, -/// the payload, and (only for the last frame) the CRC. The driver would then use some form of vectorized IO to -/// transmit the data; the advantage of this approach is that up to two data copy operations are eliminated from the -/// stack and the memory allocator is not used at all. The disadvantage is that if the driver callback is blocking, -/// the application thread will be blocked as well; plus the driver will be responsible for the correct prioritization -/// of the outgoing datagrams according to the DSCP value. +/// the payload, and (only for the last frame) the CRC. The driver would then use some form of vectorized IO or +/// MSG_MORE/UDP_CORK to transmit the data; the advantage of this approach is that up to two data copy operations are +/// eliminated from the stack and the memory allocator is not used at all. The disadvantage is that if the driver +/// callback is blocking, the application thread will be blocked as well; plus the driver will be responsible +/// for the correct prioritization of the outgoing datagrams according to the DSCP value. struct UdpardTx { /// Pointer to the node-ID of the local node, which is used to populate the source node-ID field of outgoing From 8789c3a493663b761ca47b169ab5c981e7d427c3 Mon Sep 17 00:00:00 2001 From: Pavel Kirienko Date: Tue, 22 Aug 2023 21:21:28 +0300 Subject: [PATCH 3/4] Review --- libudpard/udpard.h | 4 +++- tests/CMakeLists.txt | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/libudpard/udpard.h b/libudpard/udpard.h index f4f389e..b845f1e 100644 --- a/libudpard/udpard.h +++ b/libudpard/udpard.h @@ -527,7 +527,9 @@ int_fast8_t udpardTxInit(struct UdpardTx* const self, /// level on a given port. This is because the frames of the new transfer will be enqueued before the frames of /// the previous transfer, so the frames of the previous transfer will be transmitted only after the frames of /// the new transfer are transmitted, causing the receiver to discard them as duplicates due to their lower transfer-ID. -/// It is therefore not recommended to change the priority level dynamically. +/// To avoid this, it is necessary to wait for all frames originating from the port to be delivered before increasing +/// the priority level on the port. The "user_transfer_reference" may help here as it allows the user to establish +/// traceability from enqueued transfer frames (datagrams) back to the port they originate from. /// /// The function returns the number of UDP datagrams enqueued, which is always a positive number, in case of success. /// In case of failure, the function returns a negated error code. diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 8f90e22..bc0f063 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -16,8 +16,8 @@ enable_testing() set(CTEST_OUTPUT_ON_FAILURE ON) set(NO_STATIC_ANALYSIS OFF CACHE BOOL "disable udpard static analysis") -set(library_dir "${CMAKE_CURRENT_SOURCE_DIR}/../libudpard") -set(unity_root "${CMAKE_CURRENT_SOURCE_DIR}/../submodules/unity") +set(library_dir "${CMAKE_SOURCE_DIR}/libudpard") +set(unity_root "${CMAKE_SOURCE_DIR}/submodules/unity") # Use -DNO_STATIC_ANALYSIS=1 to suppress static analysis. # If not suppressed, the tools used here shall be available, otherwise the build will fail. From 2bee1b55687b37ed7245c97ba7c80eff5956ea1d Mon Sep 17 00:00:00 2001 From: Pavel Kirienko Date: Fri, 25 Aug 2023 20:45:16 +0300 Subject: [PATCH 4/4] Fix Sonar runner --- tools/run_sonar.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/run_sonar.sh b/tools/run_sonar.sh index 3645a63..c0440d1 100755 --- a/tools/run_sonar.sh +++ b/tools/run_sonar.sh @@ -37,7 +37,7 @@ mkdir $BUILD_DIR && cd $BUILD_DIR || die # RTFM: https://clang.llvm.org/docs/UsersManual.html#profiling-with-instrumentation # https://clang.llvm.org/docs/SourceBasedCodeCoverage.html profile_flags="-fprofile-instr-generate='%p.profraw' -fcoverage-mapping" -cmake ../tests \ +cmake .. \ -DNO_STATIC_ANALYSIS=1 \ -DCMAKE_BUILD_TYPE=Debug \ -DCMAKE_C_COMPILER=clang \ @@ -47,11 +47,11 @@ cmake ../tests \ build-wrapper-linux-x86-64 --out-dir . make VERBOSE=1 -j"$(nproc)" || die "Build wrapper failed" make test ARGS="--verbose" || die "Test execution failed" # These tools shall be of the same version as LLVM/Clang. -llvm-profdata merge -sparse *.profraw -o profdata || die +llvm-profdata merge -sparse tests/*.profraw -o profdata || die # Generate coverage reports both for the SonarCloud scanner and for us humans. llvm_cov_objects="" -for file in test_*_*; do llvm_cov_objects="$llvm_cov_objects -object $file"; done +for file in tests/test_*_*; do llvm_cov_objects="$llvm_cov_objects -object $file"; done echo "llvm-cov objects: $llvm_cov_objects" llvm-cov report $llvm_cov_objects -instr-profile=profdata || die llvm-cov show $llvm_cov_objects -instr-profile=profdata -format=text > "coverage.txt" || die