diff --git a/.github/workflows/build_and_test_cpp.yml b/.github/workflows/build_and_test_cpp.yml new file mode 100644 index 00000000..5cdd14d7 --- /dev/null +++ b/.github/workflows/build_and_test_cpp.yml @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +name: C++ Build and Tests + +on: + push: + branches: + - main + paths-ignore: + - 'website/**' + - '**/*.md' + pull_request: + branches: + - main + paths-ignore: + - 'website/**' + - '**/*.md' + - 'bindings/python/**' + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} + cancel-in-progress: true + +jobs: + build-and-test-cpp: + timeout-minutes: 60 + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install protoc + run: sudo apt-get update && sudo apt-get install -y protobuf-compiler + + - name: Install Apache Arrow C++ + run: | + sudo apt-get install -y -V ca-certificates lsb-release wget + wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + sudo apt-get update + sudo apt-get install -y -V libarrow-dev + + - name: Rust Cache + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: cpp-test-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} + + - name: Build C++ bindings and tests + working-directory: bindings/cpp + run: | + cmake -B build -DFLUSS_ENABLE_TESTING=ON -DCMAKE_BUILD_TYPE=Debug + cmake --build build --parallel + + - name: Run C++ integration tests + working-directory: bindings/cpp + run: cd build && ctest --output-on-failure --timeout 300 + env: + RUST_LOG: DEBUG + RUST_BACKTRACE: full diff --git a/.github/workflows/build_and_test_python.yml b/.github/workflows/build_and_test_python.yml new file mode 100644 index 00000000..efb5caab --- /dev/null +++ b/.github/workflows/build_and_test_python.yml @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Python Build and Tests + +on: + push: + branches: + - main + paths-ignore: + - 'website/**' + - '**/*.md' + pull_request: + branches: + - main + paths-ignore: + - 'website/**' + - '**/*.md' + - 'bindings/cpp/**' + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} + cancel-in-progress: true + +jobs: + build-and-test-python: + timeout-minutes: 60 + runs-on: ubuntu-latest + strategy: + matrix: + python: ["3.9", "3.10", "3.11", "3.12"] + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python }} + + - name: Install uv + uses: astral-sh/setup-uv@v4 + + - name: Install protoc + run: sudo apt-get update && sudo apt-get install -y protobuf-compiler + + - name: Rust Cache + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: python-test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }} + + - name: Build Python bindings + working-directory: bindings/python + run: | + uv sync --extra dev + uv run maturin develop + + - name: Run Python integration tests + working-directory: bindings/python + run: uv run pytest test/ -v + env: + RUST_LOG: DEBUG + RUST_BACKTRACE: full diff --git a/.github/workflows/build_and_test_rust.yml b/.github/workflows/build_and_test_rust.yml new file mode 100644 index 
00000000..c9e05b74 --- /dev/null +++ b/.github/workflows/build_and_test_rust.yml @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Rust Build and Tests + +on: + push: + branches: + - main + paths-ignore: + - 'website/**' + - '**/*.md' + pull_request: + branches: + - main + paths-ignore: + - 'website/**' + - '**/*.md' + - 'bindings/python/**' + - 'bindings/cpp/**' + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} + cancel-in-progress: true + +jobs: + build-and-test-rust: + timeout-minutes: 60 + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: + - ubuntu-latest + - macos-latest + steps: + - uses: actions/checkout@v4 + + - name: Install protoc + run: | + if [ "$RUNNER_OS" = "Linux" ]; then + sudo apt-get update && sudo apt-get install -y protobuf-compiler + elif [ "$RUNNER_OS" = "macOS" ]; then + brew install protobuf + fi + + - name: Rust Cache + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: rust-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} + + - name: Build + run: cargo build --workspace --all-targets --exclude fluss_python --exclude fluss-cpp + + - name: Unit Test + run: cargo test 
--all-targets --workspace --exclude fluss_python --exclude fluss-cpp + env: + RUST_LOG: DEBUG + RUST_BACKTRACE: full + + - name: Integration Test (Linux only) + if: runner.os == 'Linux' + run: | + RUST_TEST_THREADS=1 cargo test --features integration_tests --all-targets --workspace --exclude fluss_python --exclude fluss-cpp -- --nocapture + env: + RUST_LOG: DEBUG + RUST_BACKTRACE: full diff --git a/.github/workflows/docs-check.yml b/.github/workflows/check_documentation.yml similarity index 96% rename from .github/workflows/docs-check.yml rename to .github/workflows/check_documentation.yml index 6408c541..70e6a438 100644 --- a/.github/workflows/docs-check.yml +++ b/.github/workflows/check_documentation.yml @@ -17,7 +17,7 @@ ################################################################################ # This workflow is meant for checking broken links in the documentation. -name: Check Documentation +name: Documentation Check permissions: contents: read on: @@ -31,7 +31,7 @@ on: - 'website/**' jobs: - test-deploy: + check-documentation: runs-on: ubuntu-latest defaults: run: diff --git a/.github/workflows/check_license_and_formatting.yml b/.github/workflows/check_license_and_formatting.yml new file mode 100644 index 00000000..1b83b749 --- /dev/null +++ b/.github/workflows/check_license_and_formatting.yml @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: License and Formatting Check + +on: + push: + branches: + - main + paths-ignore: + - 'website/**' + - '**/*.md' + pull_request: + branches: + - main + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} + cancel-in-progress: true + +jobs: + check-license-and-formatting: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Check License Header + uses: apache/skywalking-eyes/header@v0.6.0 + + - name: Install cargo-deny + uses: taiki-e/install-action@v2 + with: + tool: cargo-deny@0.14.22 + + - name: Check dependency licenses (Apache-compatible) + run: cargo deny check licenses + + - name: Install protoc + run: sudo apt-get update && sudo apt-get install -y protobuf-compiler + + - name: Format + run: cargo fmt --all -- --check + + - name: Clippy + run: cargo clippy --all-targets --workspace -- -D warnings diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml deleted file mode 100644 index d51e3c07..00000000 --- a/.github/workflows/ci.yml +++ /dev/null @@ -1,187 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -name: CI - -on: - push: - branches: - - main - paths-ignore: - - 'website/**' - - '**/*.md' - pull_request: - branches: - - main - paths-ignore: - - 'website/**' - - '**/*.md' - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} - cancel-in-progress: true - -jobs: - check: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - - name: Check License Header - uses: apache/skywalking-eyes/header@v0.6.0 - - - name: Install cargo-deny - uses: taiki-e/install-action@v2 - with: - tool: cargo-deny@0.14.22 - - - name: Check dependency licenses (Apache-compatible) - run: cargo deny check licenses - - - name: Install protoc - run: sudo apt-get update && sudo apt-get install -y protobuf-compiler - - - name: Format - run: cargo fmt --all -- --check - - - name: Clippy - run: cargo clippy --all-targets --workspace -- -D warnings - - build: - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: - - ubuntu-latest - - macos-latest - python: ["3.11", "3.12", "3.13"] - steps: - - uses: actions/checkout@v4 - - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python }} - - - name: Install protoc - run: | - if [ "$RUNNER_OS" = "Linux" ]; then - sudo apt-get update && sudo apt-get install -y protobuf-compiler - elif [ "$RUNNER_OS" = "macOS" ]; then - brew install protobuf - fi - - - name: Rust Cache - uses: actions/cache@v4 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: build-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }} - - - name: Build 
- run: cargo build --workspace --all-targets - - test: - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: - - ubuntu-latest - - macos-latest - python: ["3.11", "3.12", "3.13"] - steps: - - uses: actions/checkout@v4 - - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python }} - - - name: Install protoc - run: | - if [ "$RUNNER_OS" = "Linux" ]; then - sudo apt-get update && sudo apt-get install -y protobuf-compiler - elif [ "$RUNNER_OS" = "macOS" ]; then - brew install protobuf - fi - - - name: Rust Cache - uses: actions/cache@v4 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }} - - - name: Unit Test - run: cargo test --all-targets --workspace - env: - RUST_LOG: DEBUG - RUST_BACKTRACE: full - - - name: Integration Test (Linux only) - if: runner.os == 'Linux' - run: | - RUST_TEST_THREADS=1 cargo test --features integration_tests --all-targets --workspace -- --nocapture - env: - RUST_LOG: DEBUG - RUST_BACKTRACE: full - - python-integration-test: - timeout-minutes: 60 - runs-on: ubuntu-latest - strategy: - matrix: - python: ["3.9", "3.10", "3.11", "3.12"] - steps: - - uses: actions/checkout@v4 - - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python }} - - - name: Install uv - uses: astral-sh/setup-uv@v4 - - - name: Install protoc - run: sudo apt-get update && sudo apt-get install -y protobuf-compiler - - - name: Rust Cache - uses: actions/cache@v4 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: python-test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }} - - - name: Build Python bindings - working-directory: bindings/python - run: | - uv sync --extra dev - uv run maturin develop - - - name: Run Python integration tests - working-directory: bindings/python - run: uv run pytest test/ -v - env: - RUST_LOG: DEBUG - RUST_BACKTRACE: full diff 
--git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt index a8f527ed..6bd9fc79 100644 --- a/bindings/cpp/CMakeLists.txt +++ b/bindings/cpp/CMakeLists.txt @@ -123,3 +123,26 @@ if (FLUSS_ENABLE_ADDRESS_SANITIZER) target_compile_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined -fno-omit-frame-pointer -fno-common -O1) target_link_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined) endif() + +if (FLUSS_ENABLE_TESTING) + FetchContent_Declare( + googletest + URL https://github.com/google/googletest/archive/refs/tags/v${FLUSS_GOOGLETEST_VERSION}.tar.gz + ) + set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) + FetchContent_MakeAvailable(googletest) + + enable_testing() + + file(GLOB TEST_SOURCE_FILES "test/*.cpp") + add_executable(fluss_cpp_test ${TEST_SOURCE_FILES}) + target_link_libraries(fluss_cpp_test PRIVATE fluss_cpp GTest::gtest) + target_link_libraries(fluss_cpp_test PRIVATE Arrow::arrow_shared) + target_compile_definitions(fluss_cpp_test PRIVATE ARROW_FOUND) + target_include_directories(fluss_cpp_test PRIVATE + ${CPP_INCLUDE_DIR} + ${PROJECT_SOURCE_DIR}/test + ) + + add_test(NAME fluss_cpp_integration_tests COMMAND fluss_cpp_test) +endif() diff --git a/bindings/cpp/test/test_admin.cpp b/bindings/cpp/test/test_admin.cpp new file mode 100644 index 00000000..b6bb25b7 --- /dev/null +++ b/bindings/cpp/test/test_admin.cpp @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <gtest/gtest.h> + +#include "test_utils.h" + +class AdminTest : public ::testing::Test { + protected: + fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); } +}; + +TEST_F(AdminTest, CreateDatabase) { + auto& adm = admin(); + + std::string db_name = "test_create_database_cpp"; + + // Database should not exist initially + bool exists = true; + ASSERT_OK(adm.DatabaseExists(db_name, exists)); + ASSERT_FALSE(exists); + + // Create database with descriptor + fluss::DatabaseDescriptor descriptor; + descriptor.comment = "test_db"; + descriptor.properties = {{"k1", "v1"}, {"k2", "v2"}}; + ASSERT_OK(adm.CreateDatabase(db_name, descriptor, false)); + + // Database should exist now + ASSERT_OK(adm.DatabaseExists(db_name, exists)); + ASSERT_TRUE(exists); + + // Get database info + fluss::DatabaseInfo db_info; + ASSERT_OK(adm.GetDatabaseInfo(db_name, db_info)); + EXPECT_EQ(db_info.database_name, db_name); + EXPECT_EQ(db_info.comment, "test_db"); + EXPECT_EQ(db_info.properties.at("k1"), "v1"); + EXPECT_EQ(db_info.properties.at("k2"), "v2"); + + // Drop database + ASSERT_OK(adm.DropDatabase(db_name, false, true)); + + // Database should not exist now + ASSERT_OK(adm.DatabaseExists(db_name, exists)); + ASSERT_FALSE(exists); +} + +TEST_F(AdminTest, CreateTable) { + auto& adm = admin(); + + std::string db_name = "test_create_table_cpp_db"; + fluss::DatabaseDescriptor db_desc; + db_desc.comment = "Database for test_create_table"; + + bool exists = false; + ASSERT_OK(adm.DatabaseExists(db_name, exists)); + ASSERT_FALSE(exists); + 
+ ASSERT_OK(adm.CreateDatabase(db_name, db_desc, false)); + + std::string table_name = "test_user_table"; + fluss::TablePath table_path(db_name, table_name); + + // Build schema + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("age", fluss::DataType::Int(), "User's age (optional)") + .AddColumn("email", fluss::DataType::String()) + .SetPrimaryKeys({"id"}) + .Build(); + + // Build table descriptor + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetComment("Test table for user data (id, name, age, email)") + .SetBucketCount(3) + .SetBucketKeys({"id"}) + .SetProperty("table.replication.factor", "1") + .SetLogFormat("arrow") + .SetKvFormat("indexed") + .Build(); + + // Create table + ASSERT_OK(adm.CreateTable(table_path, table_descriptor, false)); + + // Table should exist + ASSERT_OK(adm.TableExists(table_path, exists)); + ASSERT_TRUE(exists); + + // List tables + std::vector<std::string> tables; + ASSERT_OK(adm.ListTables(db_name, tables)); + ASSERT_EQ(tables.size(), 1u); + EXPECT_TRUE(std::find(tables.begin(), tables.end(), table_name) != tables.end()); + + // Get table info + fluss::TableInfo table_info; + ASSERT_OK(adm.GetTableInfo(table_path, table_info)); + + EXPECT_EQ(table_info.comment, "Test table for user data (id, name, age, email)"); + EXPECT_EQ(table_info.primary_keys, std::vector<std::string>{"id"}); + EXPECT_EQ(table_info.num_buckets, 3); + EXPECT_EQ(table_info.bucket_keys, std::vector<std::string>{"id"}); + + // Drop table + ASSERT_OK(adm.DropTable(table_path, false)); + ASSERT_OK(adm.TableExists(table_path, exists)); + ASSERT_FALSE(exists); + + // Drop database + ASSERT_OK(adm.DropDatabase(db_name, false, true)); + ASSERT_OK(adm.DatabaseExists(db_name, exists)); + ASSERT_FALSE(exists); +} + +TEST_F(AdminTest, PartitionApis) { + auto& adm = admin(); + + std::string db_name = "test_partition_apis_cpp_db"; + fluss::DatabaseDescriptor db_desc; + 
db_desc.comment = "Database for test_partition_apis"; + ASSERT_OK(adm.CreateDatabase(db_name, db_desc, true)); + + fluss::TablePath table_path(db_name, "partitioned_table"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("dt", fluss::DataType::String()) + .AddColumn("region", fluss::DataType::String()) + .SetPrimaryKeys({"id", "dt", "region"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetBucketCount(3) + .SetBucketKeys({"id"}) + .SetPartitionKeys({"dt", "region"}) + .SetProperty("table.replication.factor", "1") + .SetLogFormat("arrow") + .SetKvFormat("compacted") + .Build(); + + ASSERT_OK(adm.CreateTable(table_path, table_descriptor, true)); + + // No partitions initially + std::vector<fluss::PartitionInfo> partitions; + ASSERT_OK(adm.ListPartitionInfos(table_path, partitions)); + ASSERT_TRUE(partitions.empty()); + + // Create a partition + std::unordered_map<std::string, std::string> partition_spec = { + {"dt", "2024-01-15"}, {"region", "EMEA"}}; + ASSERT_OK(adm.CreatePartition(table_path, partition_spec, false)); + + // Should have one partition + ASSERT_OK(adm.ListPartitionInfos(table_path, partitions)); + ASSERT_EQ(partitions.size(), 1u); + EXPECT_EQ(partitions[0].partition_name, "2024-01-15$EMEA"); + + // List with partial spec filter - should find the partition + std::unordered_map<std::string, std::string> partial_spec = {{"dt", "2024-01-15"}}; + std::vector<fluss::PartitionInfo> partitions_with_spec; + ASSERT_OK(adm.ListPartitionInfos(table_path, partial_spec, partitions_with_spec)); + ASSERT_EQ(partitions_with_spec.size(), 1u); + EXPECT_EQ(partitions_with_spec[0].partition_name, "2024-01-15$EMEA"); + + // List with non-matching spec - should find no partitions + std::unordered_map<std::string, std::string> non_matching_spec = {{"dt", "2024-01-16"}}; + std::vector<fluss::PartitionInfo> empty_partitions; + ASSERT_OK(adm.ListPartitionInfos(table_path, non_matching_spec, empty_partitions)); + ASSERT_TRUE(empty_partitions.empty()); + + // Drop 
partition + ASSERT_OK(adm.DropPartition(table_path, partition_spec, false)); + + ASSERT_OK(adm.ListPartitionInfos(table_path, partitions)); + ASSERT_TRUE(partitions.empty()); + + // Cleanup + ASSERT_OK(adm.DropTable(table_path, true)); + ASSERT_OK(adm.DropDatabase(db_name, true, true)); +} + +TEST_F(AdminTest, FlussErrorResponse) { + auto& adm = admin(); + + fluss::TablePath table_path("fluss", "not_exist_cpp"); + + fluss::TableInfo info; + auto result = adm.GetTableInfo(table_path, info); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_NOT_EXIST); +} + +TEST_F(AdminTest, ErrorDatabaseNotExist) { + auto& adm = admin(); + + // get_database_info for non-existent database + fluss::DatabaseInfo info; + auto result = adm.GetDatabaseInfo("no_such_db_cpp", info); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_NOT_EXIST); + + // drop_database without ignore flag + result = adm.DropDatabase("no_such_db_cpp", false, false); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_NOT_EXIST); + + // list_tables for non-existent database + std::vector<std::string> tables; + result = adm.ListTables("no_such_db_cpp", tables); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_NOT_EXIST); +} + +TEST_F(AdminTest, ErrorDatabaseAlreadyExist) { + auto& adm = admin(); + + std::string db_name = "test_error_db_already_exist_cpp"; + fluss::DatabaseDescriptor descriptor; + + ASSERT_OK(adm.CreateDatabase(db_name, descriptor, false)); + + // Create same database again without ignore flag + auto result = adm.CreateDatabase(db_name, descriptor, false); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_ALREADY_EXIST); + + // With ignore flag should succeed + ASSERT_OK(adm.CreateDatabase(db_name, descriptor, true)); + + // Cleanup + ASSERT_OK(adm.DropDatabase(db_name, true, true)); +} + +TEST_F(AdminTest, ErrorTableAlreadyExist) { 
+ auto& adm = admin(); + + std::string db_name = "test_error_tbl_already_exist_cpp_db"; + fluss::DatabaseDescriptor db_desc; + ASSERT_OK(adm.CreateDatabase(db_name, db_desc, true)); + + fluss::TablePath table_path(db_name, "my_table"); + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .Build(); + auto table_desc = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetBucketCount(1) + .SetProperty("table.replication.factor", "1") + .Build(); + + ASSERT_OK(adm.CreateTable(table_path, table_desc, false)); + + // Create same table again without ignore flag + auto result = adm.CreateTable(table_path, table_desc, false); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_ALREADY_EXIST); + + // With ignore flag should succeed + ASSERT_OK(adm.CreateTable(table_path, table_desc, true)); + + // Cleanup + ASSERT_OK(adm.DropTable(table_path, true)); + ASSERT_OK(adm.DropDatabase(db_name, true, true)); +} + +TEST_F(AdminTest, ErrorTableNotExist) { + auto& adm = admin(); + + fluss::TablePath table_path("fluss", "no_such_table_cpp"); + + // Drop without ignore flag + auto result = adm.DropTable(table_path, false); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_NOT_EXIST); + + // Drop with ignore flag should succeed + ASSERT_OK(adm.DropTable(table_path, true)); +} + +TEST_F(AdminTest, ErrorTableNotPartitioned) { + auto& adm = admin(); + + std::string db_name = "test_error_not_partitioned_cpp_db"; + fluss::DatabaseDescriptor db_desc; + ASSERT_OK(adm.CreateDatabase(db_name, db_desc, true)); + + fluss::TablePath table_path(db_name, "non_partitioned_table"); + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .Build(); + auto table_desc = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetBucketCount(1) + 
.SetProperty("table.replication.factor", "1") + .Build(); + + ASSERT_OK(adm.CreateTable(table_path, table_desc, false)); + + // list_partition_infos on non-partitioned table + std::vector<fluss::PartitionInfo> partitions; + auto result = adm.ListPartitionInfos(table_path, partitions); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_NOT_PARTITIONED_EXCEPTION); + + // Cleanup + ASSERT_OK(adm.DropTable(table_path, true)); + ASSERT_OK(adm.DropDatabase(db_name, true, true)); +} diff --git a/bindings/cpp/test/test_kv_table.cpp b/bindings/cpp/test/test_kv_table.cpp new file mode 100644 index 00000000..9c4f7a02 --- /dev/null +++ b/bindings/cpp/test/test_kv_table.cpp @@ -0,0 +1,733 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include <gtest/gtest.h> + +#include "test_utils.h" + +class KvTableTest : public ::testing::Test { + protected: + fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); } + + fluss::Connection& connection() { + return fluss_test::FlussTestEnvironment::Instance()->GetConnection(); + } +}; + +TEST_F(KvTableTest, UpsertDeleteAndLookup) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_upsert_and_lookup_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("age", fluss::DataType::BigInt()) + .SetPrimaryKeys({"id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + // Create upsert writer + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + // Upsert 3 rows (fire-and-forget, then flush) + struct TestData { + int32_t id; + std::string name; + int64_t age; + }; + std::vector<TestData> test_data = {{1, "Verso", 32}, {2, "Noco", 25}, {3, "Esquie", 35}}; + + for (const auto& d : test_data) { + fluss::GenericRow row(3); + row.SetInt32(0, d.id); + row.SetString(1, d.name); + row.SetInt64(2, d.age); + ASSERT_OK(upsert_writer.Upsert(row)); + } + ASSERT_OK(upsert_writer.Flush()); + + // Create lookuper + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + // Verify lookup results + for (const auto& d : test_data) { + fluss::GenericRow key(3); + key.SetInt32(0, d.id); + + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()) << "Row with id=" << d.id << " should exist"; + + EXPECT_EQ(result.GetInt32(0), 
d.id) << "id mismatch"; + EXPECT_EQ(result.GetString(1), d.name) << "name mismatch"; + EXPECT_EQ(result.GetInt64(2), d.age) << "age mismatch"; + } + + // Update record with id=1 (await acknowledgment) + { + fluss::GenericRow updated_row(3); + updated_row.SetInt32(0, 1); + updated_row.SetString(1, "Verso"); + updated_row.SetInt64(2, 33); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(updated_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify the update + { + fluss::GenericRow key(3); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt64(2), 33) << "Age should be updated"; + EXPECT_EQ(result.GetString(1), "Verso") << "Name should remain unchanged"; + } + + // Delete record with id=1 (await acknowledgment) + { + fluss::GenericRow delete_row(3); + delete_row.SetInt32(0, 1); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Delete(delete_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify deletion + { + fluss::GenericRow key(3); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_FALSE(result.Found()) << "Record 1 should not exist after delete"; + } + + // Verify other records still exist + for (int id : {2, 3}) { + fluss::GenericRow key(3); + key.SetInt32(0, id); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()) << "Record " << id + << " should still exist after deleting record 1"; + } + + // Lookup non-existent key + { + fluss::GenericRow key(3); + key.SetInt32(0, 999); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_FALSE(result.Found()) << "Non-existent key should return not found"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(KvTableTest, CompositePrimaryKeys) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_composite_pk_cpp"); + + auto schema 
= fluss::Schema::NewBuilder() + .AddColumn("region", fluss::DataType::String()) + .AddColumn("score", fluss::DataType::BigInt()) + .AddColumn("user_id", fluss::DataType::Int()) + .SetPrimaryKeys({"region", "user_id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + // Insert records with composite keys + struct TestData { + std::string region; + int32_t user_id; + int64_t score; + }; + std::vector<TestData> test_data = { + {"US", 1, 100}, {"US", 2, 200}, {"EU", 1, 150}, {"EU", 2, 250}}; + + for (const auto& d : test_data) { + auto row = table.NewRow(); + row.Set("region", d.region); + row.Set("score", d.score); + row.Set("user_id", d.user_id); + ASSERT_OK(upsert_writer.Upsert(row)); + } + ASSERT_OK(upsert_writer.Flush()); + + // Create lookuper + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + // Lookup (US, 1) - should return score 100 + { + auto key = table.NewRow(); + key.Set("region", "US"); + key.Set("user_id", 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt64("score"), 100) << "Score for (US, 1) should be 100"; + } + + // Lookup (EU, 2) - should return score 250 + { + auto key = table.NewRow(); + key.Set("region", "EU"); + key.Set("user_id", 2); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt64("score"), 250) << "Score for (EU, 2) should be 250"; + } + + // Update (US, 1) score (await acknowledgment) + { + auto update_row = table.NewRow(); + update_row.Set("region", "US"); + 
update_row.Set("user_id", 1); + update_row.Set("score", static_cast(500)); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(update_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify update + { + auto key = table.NewRow(); + key.Set("region", "US"); + key.Set("user_id", 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt64("score"), 500) << "Row score should be updated"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(KvTableTest, PartialUpdate) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_partial_update_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("age", fluss::DataType::BigInt()) + .AddColumn("score", fluss::DataType::BigInt()) + .SetPrimaryKeys({"id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + // Insert initial record with all columns + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + { + fluss::GenericRow row(4); + row.SetInt32(0, 1); + row.SetString(1, "Verso"); + row.SetInt64(2, 32); + row.SetInt64(3, 6942); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify initial record + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + { + fluss::GenericRow key(4); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt32(0), 1); + EXPECT_EQ(result.GetString(1), "Verso"); + 
EXPECT_EQ(result.GetInt64(2), 32); + EXPECT_EQ(result.GetInt64(3), 6942); + } + + // Create partial update writer to update only score column + auto partial_upsert = table.NewUpsert(); + partial_upsert.PartialUpdateByName({"id", "score"}); + fluss::UpsertWriter partial_writer; + ASSERT_OK(partial_upsert.CreateWriter(partial_writer)); + + // Update only the score column (await acknowledgment) + { + fluss::GenericRow partial_row(4); + partial_row.SetInt32(0, 1); + partial_row.SetNull(1); // not in partial update + partial_row.SetNull(2); // not in partial update + partial_row.SetInt64(3, 420); + fluss::WriteResult wr; + ASSERT_OK(partial_writer.Upsert(partial_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify partial update - name and age should remain unchanged + { + fluss::GenericRow key(4); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt32(0), 1) << "id should remain 1"; + EXPECT_EQ(result.GetString(1), "Verso") << "name should remain unchanged"; + EXPECT_EQ(result.GetInt64(2), 32) << "age should remain unchanged"; + EXPECT_EQ(result.GetInt64(3), 420) << "score should be updated to 420"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(KvTableTest, PartialUpdateByIndex) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_partial_update_by_index_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("age", fluss::DataType::BigInt()) + .AddColumn("score", fluss::DataType::BigInt()) + .SetPrimaryKeys({"id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + 
+ // Insert initial record with all columns + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + { + fluss::GenericRow row(4); + row.SetInt32(0, 1); + row.SetString(1, "Verso"); + row.SetInt64(2, 32); + row.SetInt64(3, 6942); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify initial record + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + { + fluss::GenericRow key(4); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt32(0), 1); + EXPECT_EQ(result.GetString(1), "Verso"); + EXPECT_EQ(result.GetInt64(2), 32); + EXPECT_EQ(result.GetInt64(3), 6942); + } + + // Create partial update writer using column indices: 0 (id) and 3 (score) + auto partial_upsert = table.NewUpsert(); + partial_upsert.PartialUpdateByIndex({0, 3}); + fluss::UpsertWriter partial_writer; + ASSERT_OK(partial_upsert.CreateWriter(partial_writer)); + + // Update only the score column (await acknowledgment) + { + fluss::GenericRow partial_row(4); + partial_row.SetInt32(0, 1); + partial_row.SetNull(1); // not in partial update + partial_row.SetNull(2); // not in partial update + partial_row.SetInt64(3, 420); + fluss::WriteResult wr; + ASSERT_OK(partial_writer.Upsert(partial_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify partial update - name and age should remain unchanged + { + fluss::GenericRow key(4); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt32(0), 1) << "id should remain 1"; + EXPECT_EQ(result.GetString(1), "Verso") << "name should remain unchanged"; + EXPECT_EQ(result.GetInt64(2), 32) << "age should remain unchanged"; + EXPECT_EQ(result.GetInt64(3), 420) << "score should be updated to 420"; + } + + 
ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(KvTableTest, PartitionedTableUpsertAndLookup) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_partitioned_kv_table_cpp"); + + // Create a partitioned KV table with region as partition key + auto schema = fluss::Schema::NewBuilder() + .AddColumn("region", fluss::DataType::String()) + .AddColumn("user_id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("score", fluss::DataType::BigInt()) + .SetPrimaryKeys({"region", "user_id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetPartitionKeys({"region"}) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + // Create partitions + fluss_test::CreatePartitions(adm, table_path, "region", {"US", "EU", "APAC"}); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + // Insert records with different partitions + struct TestData { + std::string region; + int32_t user_id; + std::string name; + int64_t score; + }; + std::vector test_data = {{"US", 1, "Gustave", 100}, {"US", 2, "Lune", 200}, + {"EU", 1, "Sciel", 150}, {"EU", 2, "Maelle", 250}, + {"APAC", 1, "Noco", 300}}; + + for (const auto& d : test_data) { + fluss::GenericRow row(4); + row.SetString(0, d.region); + row.SetInt32(1, d.user_id); + row.SetString(2, d.name); + row.SetInt64(3, d.score); + ASSERT_OK(upsert_writer.Upsert(row)); + } + ASSERT_OK(upsert_writer.Flush()); + + // Create lookuper + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + // Lookup records + for (const auto& d : test_data) { + fluss::GenericRow key(4); + key.SetString(0, d.region); + key.SetInt32(1, d.user_id); + + fluss::LookupResult 
result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + + EXPECT_EQ(std::string(result.GetString(0)), d.region) << "region mismatch"; + EXPECT_EQ(result.GetInt32(1), d.user_id) << "user_id mismatch"; + EXPECT_EQ(std::string(result.GetString(2)), d.name) << "name mismatch"; + EXPECT_EQ(result.GetInt64(3), d.score) << "score mismatch"; + } + + // Update within a partition (await acknowledgment) + { + fluss::GenericRow updated_row(4); + updated_row.SetString(0, "US"); + updated_row.SetInt32(1, 1); + updated_row.SetString(2, "Gustave Updated"); + updated_row.SetInt64(3, 999); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(updated_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify the update + { + fluss::GenericRow key(4); + key.SetString(0, "US"); + key.SetInt32(1, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(std::string(result.GetString(2)), "Gustave Updated"); + EXPECT_EQ(result.GetInt64(3), 999); + } + + // Lookup in non-existent partition should return not found + { + fluss::GenericRow key(4); + key.SetString(0, "UNKNOWN_REGION"); + key.SetInt32(1, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_FALSE(result.Found()) << "Lookup in non-existent partition should return not found"; + } + + // Delete a record within a partition (await acknowledgment) + { + fluss::GenericRow delete_key(4); + delete_key.SetString(0, "EU"); + delete_key.SetInt32(1, 1); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Delete(delete_key, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify deletion + { + fluss::GenericRow key(4); + key.SetString(0, "EU"); + key.SetInt32(1, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_FALSE(result.Found()) << "Deleted record should not exist"; + } + + // Verify other records in same partition still exist + { + fluss::GenericRow key(4); + key.SetString(0, "EU"); + 
key.SetInt32(1, 2); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(std::string(result.GetString(2)), "Maelle"); + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(KvTableTest, AllSupportedDatatypes) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_all_datatypes_cpp"); + + // Create a table with all supported datatypes + auto schema = fluss::Schema::NewBuilder() + .AddColumn("pk_int", fluss::DataType::Int()) + .AddColumn("col_boolean", fluss::DataType::Boolean()) + .AddColumn("col_tinyint", fluss::DataType::TinyInt()) + .AddColumn("col_smallint", fluss::DataType::SmallInt()) + .AddColumn("col_int", fluss::DataType::Int()) + .AddColumn("col_bigint", fluss::DataType::BigInt()) + .AddColumn("col_float", fluss::DataType::Float()) + .AddColumn("col_double", fluss::DataType::Double()) + .AddColumn("col_char", fluss::DataType::Char(10)) + .AddColumn("col_string", fluss::DataType::String()) + .AddColumn("col_decimal", fluss::DataType::Decimal(10, 2)) + .AddColumn("col_date", fluss::DataType::Date()) + .AddColumn("col_time", fluss::DataType::Time()) + .AddColumn("col_timestamp", fluss::DataType::Timestamp()) + .AddColumn("col_timestamp_ltz", fluss::DataType::TimestampLtz()) + .AddColumn("col_bytes", fluss::DataType::Bytes()) + .AddColumn("col_binary", fluss::DataType::Binary(20)) + .SetPrimaryKeys({"pk_int"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + // Test data + int32_t pk_int = 1; + bool col_boolean = true; + int32_t col_tinyint = 127; + int32_t 
col_smallint = 32767; + int32_t col_int = 2147483647; + int64_t col_bigint = 9223372036854775807LL; + float col_float = 3.14f; + double col_double = 2.718281828459045; + std::string col_char = "hello"; + std::string col_string = "world of fluss rust client"; + std::string col_decimal = "123.45"; + auto col_date = fluss::Date::FromDays(20476); // 2026-01-23 + auto col_time = fluss::Time::FromMillis(36827000); // 10:13:47 + auto col_timestamp = fluss::Timestamp::FromMillis(1769163227123); // 2026-01-23 10:13:47.123 + auto col_timestamp_ltz = fluss::Timestamp::FromMillis(1769163227123); + std::vector col_bytes = {'b', 'i', 'n', 'a', 'r', 'y', ' ', 'd', 'a', 't', 'a'}; + std::vector col_binary = {'f', 'i', 'x', 'e', 'd', ' ', 'b', 'i', 'n', 'a', + 'r', 'y', ' ', 'd', 'a', 't', 'a', '!', '!', '!'}; + + // Upsert a row with all datatypes + { + fluss::GenericRow row(17); + row.SetInt32(0, pk_int); + row.SetBool(1, col_boolean); + row.SetInt32(2, col_tinyint); + row.SetInt32(3, col_smallint); + row.SetInt32(4, col_int); + row.SetInt64(5, col_bigint); + row.SetFloat32(6, col_float); + row.SetFloat64(7, col_double); + row.SetString(8, col_char); + row.SetString(9, col_string); + row.SetDecimal(10, col_decimal); + row.SetDate(11, col_date); + row.SetTime(12, col_time); + row.SetTimestampNtz(13, col_timestamp); + row.SetTimestampLtz(14, col_timestamp_ltz); + row.SetBytes(15, col_bytes); + row.SetBytes(16, col_binary); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Lookup the record + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + { + fluss::GenericRow key(17); + key.SetInt32(0, pk_int); + + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + + // Verify all datatypes + EXPECT_EQ(result.GetInt32(0), pk_int) << "pk_int mismatch"; + EXPECT_EQ(result.GetBool(1), col_boolean) << "col_boolean mismatch"; + EXPECT_EQ(result.GetInt32(2), 
col_tinyint) << "col_tinyint mismatch"; + EXPECT_EQ(result.GetInt32(3), col_smallint) << "col_smallint mismatch"; + EXPECT_EQ(result.GetInt32(4), col_int) << "col_int mismatch"; + EXPECT_EQ(result.GetInt64(5), col_bigint) << "col_bigint mismatch"; + EXPECT_NEAR(result.GetFloat32(6), col_float, 1e-6f) << "col_float mismatch"; + EXPECT_NEAR(result.GetFloat64(7), col_double, 1e-15) << "col_double mismatch"; + EXPECT_EQ(result.GetString(8), col_char) << "col_char mismatch"; + EXPECT_EQ(result.GetString(9), col_string) << "col_string mismatch"; + EXPECT_EQ(result.GetDecimalString(10), col_decimal) << "col_decimal mismatch"; + EXPECT_EQ(result.GetDate(11).days_since_epoch, col_date.days_since_epoch) << "col_date mismatch"; + EXPECT_EQ(result.GetTime(12).millis_since_midnight, col_time.millis_since_midnight) << "col_time mismatch"; + EXPECT_EQ(result.GetTimestamp(13).epoch_millis, col_timestamp.epoch_millis) + << "col_timestamp mismatch"; + EXPECT_EQ(result.GetTimestamp(14).epoch_millis, col_timestamp_ltz.epoch_millis) + << "col_timestamp_ltz mismatch"; + + auto [bytes_ptr, bytes_len] = result.GetBytes(15); + EXPECT_EQ(bytes_len, col_bytes.size()) << "col_bytes length mismatch"; + EXPECT_TRUE(std::memcmp(bytes_ptr, col_bytes.data(), bytes_len) == 0) + << "col_bytes mismatch"; + + auto [binary_ptr, binary_len] = result.GetBytes(16); + EXPECT_EQ(binary_len, col_binary.size()) << "col_binary length mismatch"; + EXPECT_TRUE(std::memcmp(binary_ptr, col_binary.data(), binary_len) == 0) + << "col_binary mismatch"; + } + + // Test with null values for nullable columns + { + fluss::GenericRow row_with_nulls(17); + row_with_nulls.SetInt32(0, 2); // pk_int = 2 + for (size_t i = 1; i < 17; ++i) { + row_with_nulls.SetNull(i); + } + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(row_with_nulls, wr)); + ASSERT_OK(wr.Wait()); + } + + // Lookup row with nulls + { + fluss::GenericRow key(17); + key.SetInt32(0, 2); + + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, 
result)); + ASSERT_TRUE(result.Found()); + + EXPECT_EQ(result.GetInt32(0), 2) << "pk_int mismatch"; + for (size_t i = 1; i < 17; ++i) { + EXPECT_TRUE(result.IsNull(i)) << "column " << i << " should be null"; + } + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} diff --git a/bindings/cpp/test/test_log_table.cpp b/bindings/cpp/test/test_log_table.cpp new file mode 100644 index 00000000..47ab6f25 --- /dev/null +++ b/bindings/cpp/test/test_log_table.cpp @@ -0,0 +1,831 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include +#include + +#include +#include +#include +#include + +#include "test_utils.h" + +class LogTableTest : public ::testing::Test { + protected: + fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); } + + fluss::Connection& connection() { + return fluss_test::FlussTestEnvironment::Instance()->GetConnection(); + } +}; + +TEST_F(LogTableTest, AppendRecordBatchAndScan) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_append_record_batch_and_scan_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("c1", fluss::DataType::Int()) + .AddColumn("c2", fluss::DataType::String()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetBucketCount(3) + .SetBucketKeys({"c1"}) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + // Create append writer + auto table_append = table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + // Append Arrow record batches + { + auto c1 = arrow::Int32Builder(); + c1.AppendValues({1, 2, 3}).ok(); + auto c2 = arrow::StringBuilder(); + c2.AppendValues({"a1", "a2", "a3"}).ok(); + + auto batch = arrow::RecordBatch::Make( + arrow::schema({arrow::field("c1", arrow::int32()), arrow::field("c2", arrow::utf8())}), + 3, {c1.Finish().ValueOrDie(), c2.Finish().ValueOrDie()}); + + ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + + { + auto c1 = arrow::Int32Builder(); + c1.AppendValues({4, 5, 6}).ok(); + auto c2 = arrow::StringBuilder(); + c2.AppendValues({"a4", "a5", "a6"}).ok(); + + auto batch = arrow::RecordBatch::Make( + arrow::schema({arrow::field("c1", arrow::int32()), arrow::field("c2", arrow::utf8())}), + 3, {c1.Finish().ValueOrDie(), c2.Finish().ValueOrDie()}); + + 
ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + + ASSERT_OK(append_writer.Flush()); + + // Create scanner and subscribe to all 3 buckets + fluss::Table scan_table; + ASSERT_OK(conn.GetTable(table_path, scan_table)); + int32_t num_buckets = scan_table.GetTableInfo().num_buckets; + ASSERT_EQ(num_buckets, 3) << "Table should have 3 buckets"; + + auto table_scan = scan_table.NewScan(); + fluss::LogScanner log_scanner; + ASSERT_OK(table_scan.CreateLogScanner(log_scanner)); + + for (int32_t bucket_id = 0; bucket_id < num_buckets; ++bucket_id) { + ASSERT_OK(log_scanner.Subscribe(bucket_id, fluss::EARLIEST_OFFSET)); + } + + // Poll for records across all buckets + std::vector> records; + fluss_test::PollRecords(log_scanner, 6, [](const fluss::ScanRecord& rec) { + return std::make_pair(rec.row.GetInt32(0), std::string(rec.row.GetString(1))); + }, records); + ASSERT_EQ(records.size(), 6u) << "Expected 6 records"; + std::sort(records.begin(), records.end()); + + std::vector> expected = { + {1, "a1"}, {2, "a2"}, {3, "a3"}, {4, "a4"}, {5, "a5"}, {6, "a6"}}; + EXPECT_EQ(records, expected); + + // Verify per-bucket iteration via BucketView + { + fluss::Table bucket_table; + ASSERT_OK(conn.GetTable(table_path, bucket_table)); + auto bucket_scan = bucket_table.NewScan(); + fluss::LogScanner bucket_scanner; + ASSERT_OK(bucket_scan.CreateLogScanner(bucket_scanner)); + + for (int32_t bid = 0; bid < num_buckets; ++bid) { + ASSERT_OK(bucket_scanner.Subscribe(bid, fluss::EARLIEST_OFFSET)); + } + + std::vector> bucket_records; + auto bucket_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + size_t buckets_with_data = 0; + while (bucket_records.size() < 6 && std::chrono::steady_clock::now() < bucket_deadline) { + fluss::ScanRecords scan_records; + ASSERT_OK(bucket_scanner.Poll(500, scan_records)); + + // Iterate by bucket + for (size_t b = 0; b < scan_records.BucketCount(); ++b) { + auto bucket_view = scan_records.BucketAt(b); + if (!bucket_view.Empty()) { + 
buckets_with_data++; + } + for (auto rec : bucket_view) { + bucket_records.emplace_back(rec.row.GetInt32(0), + std::string(rec.row.GetString(1))); + } + } + } + + ASSERT_EQ(bucket_records.size(), 6u) << "Expected 6 records via per-bucket iteration"; + EXPECT_GT(buckets_with_data, 1u) << "Records should be distributed across multiple buckets"; + + std::sort(bucket_records.begin(), bucket_records.end()); + EXPECT_EQ(bucket_records, expected); + } + + // Test unsubscribe + ASSERT_OK(log_scanner.Unsubscribe(0)); + + // Verify unsubscribe_partition fails on a non-partitioned table + auto unsub_result = log_scanner.UnsubscribePartition(0, 0); + ASSERT_FALSE(unsub_result.Ok()) + << "unsubscribe_partition should fail on a non-partitioned table"; + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, ListOffsets) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_list_offsets_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + // Wait for table initialization + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // Earliest offset should be 0 for empty table + std::unordered_map earliest_offsets; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Earliest(), earliest_offsets)); + EXPECT_EQ(earliest_offsets[0], 0) << "Earliest offset should be 0 for bucket 0"; + + // Latest offset should be 0 for empty table + std::unordered_map latest_offsets; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), latest_offsets)); + EXPECT_EQ(latest_offsets[0], 0) << "Latest offset should be 0 for empty table"; + + auto before_append_ms = + std::chrono::duration_cast( + 
std::chrono::system_clock::now().time_since_epoch()) + .count(); + + // Append records + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + auto table_append = table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + { + auto id_builder = arrow::Int32Builder(); + id_builder.AppendValues({1, 2, 3}).ok(); + auto name_builder = arrow::StringBuilder(); + name_builder.AppendValues({"alice", "bob", "charlie"}).ok(); + + auto batch = arrow::RecordBatch::Make( + arrow::schema( + {arrow::field("id", arrow::int32()), arrow::field("name", arrow::utf8())}), + 3, {id_builder.Finish().ValueOrDie(), name_builder.Finish().ValueOrDie()}); + + ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + ASSERT_OK(append_writer.Flush()); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + auto after_append_ms = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + // Latest offset after appending should be 3 + std::unordered_map latest_after; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), latest_after)); + EXPECT_EQ(latest_after[0], 3) << "Latest offset should be 3 after appending 3 records"; + + // Earliest offset should still be 0 + std::unordered_map earliest_after; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Earliest(), earliest_after)); + EXPECT_EQ(earliest_after[0], 0) << "Earliest offset should still be 0"; + + // Timestamp before append should resolve to offset 0 + std::unordered_map ts_offsets; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Timestamp(before_append_ms), + ts_offsets)); + EXPECT_EQ(ts_offsets[0], 0) + << "Timestamp before append should resolve to offset 0"; + + // Timestamp after append should resolve to offset 3 + std::unordered_map ts_after_offsets; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Timestamp(after_append_ms), + ts_after_offsets)); + 
EXPECT_EQ(ts_after_offsets[0], 3) + << "Timestamp after append should resolve to offset 3"; + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, TestProject) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_project_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("col_a", fluss::DataType::Int()) + .AddColumn("col_b", fluss::DataType::String()) + .AddColumn("col_c", fluss::DataType::Int()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + // Append 3 records + auto table_append = table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + { + auto col_a_builder = arrow::Int32Builder(); + col_a_builder.AppendValues({1, 2, 3}).ok(); + auto col_b_builder = arrow::StringBuilder(); + col_b_builder.AppendValues({"x", "y", "z"}).ok(); + auto col_c_builder = arrow::Int32Builder(); + col_c_builder.AppendValues({10, 20, 30}).ok(); + + auto batch = arrow::RecordBatch::Make( + arrow::schema({arrow::field("col_a", arrow::int32()), + arrow::field("col_b", arrow::utf8()), + arrow::field("col_c", arrow::int32())}), + 3, + {col_a_builder.Finish().ValueOrDie(), col_b_builder.Finish().ValueOrDie(), + col_c_builder.Finish().ValueOrDie()}); + + ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + ASSERT_OK(append_writer.Flush()); + + // Test project_by_name: select col_b and col_c only + { + fluss::Table proj_table; + ASSERT_OK(conn.GetTable(table_path, proj_table)); + auto scan = proj_table.NewScan(); + scan.ProjectByName({"col_b", "col_c"}); + fluss::LogScanner scanner; + ASSERT_OK(scan.CreateLogScanner(scanner)); + + ASSERT_OK(scanner.Subscribe(0, 0)); + + fluss::ScanRecords records; + 
ASSERT_OK(scanner.Poll(10000, records)); + + ASSERT_EQ(records.Count(), 3u) << "Should have 3 records with project_by_name"; + + std::vector expected_col_b = {"x", "y", "z"}; + std::vector expected_col_c = {10, 20, 30}; + + // Collect and sort by col_c to get deterministic order + std::vector> collected; + for (auto rec : records) { + collected.emplace_back(std::string(rec.row.GetString(0)), rec.row.GetInt32(1)); + } + std::sort(collected.begin(), collected.end(), + [](const auto& a, const auto& b) { return a.second < b.second; }); + + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(collected[i].first, expected_col_b[i]) << "col_b mismatch at index " << i; + EXPECT_EQ(collected[i].second, expected_col_c[i]) << "col_c mismatch at index " << i; + } + } + + // Test project by column indices: select col_b (1) and col_a (0) in that order + { + fluss::Table proj_table; + ASSERT_OK(conn.GetTable(table_path, proj_table)); + auto scan = proj_table.NewScan(); + scan.ProjectByIndex({1, 0}); + fluss::LogScanner scanner; + ASSERT_OK(scan.CreateLogScanner(scanner)); + + ASSERT_OK(scanner.Subscribe(0, 0)); + + fluss::ScanRecords records; + ASSERT_OK(scanner.Poll(10000, records)); + + ASSERT_EQ(records.Count(), 3u); + + std::vector expected_col_b = {"x", "y", "z"}; + std::vector expected_col_a = {1, 2, 3}; + + std::vector> collected; + for (auto rec : records) { + collected.emplace_back(std::string(rec.row.GetString(0)), rec.row.GetInt32(1)); + } + std::sort(collected.begin(), collected.end(), + [](const auto& a, const auto& b) { return a.second < b.second; }); + + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(collected[i].first, expected_col_b[i]) << "col_b mismatch at index " << i; + EXPECT_EQ(collected[i].second, expected_col_a[i]) << "col_a mismatch at index " << i; + } + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, TestPollBatches) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", 
"test_poll_batches_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + auto scan = table.NewScan(); + fluss::LogScanner scanner; + ASSERT_OK(scan.CreateRecordBatchLogScanner(scanner)); + ASSERT_OK(scanner.Subscribe(0, 0)); + + // Test 1: Empty table should return empty result + { + fluss::ArrowRecordBatches batches; + ASSERT_OK(scanner.PollRecordBatch(500, batches)); + ASSERT_TRUE(batches.Empty()); + } + + // Append data + auto table_append = table.NewAppend(); + fluss::AppendWriter writer; + ASSERT_OK(table_append.CreateWriter(writer)); + + auto make_batch = [](std::vector ids, std::vector names) { + auto id_builder = arrow::Int32Builder(); + id_builder.AppendValues(ids).ok(); + auto name_builder = arrow::StringBuilder(); + name_builder.AppendValues(names).ok(); + return arrow::RecordBatch::Make( + arrow::schema( + {arrow::field("id", arrow::int32()), arrow::field("name", arrow::utf8())}), + static_cast(ids.size()), + {id_builder.Finish().ValueOrDie(), name_builder.Finish().ValueOrDie()}); + }; + + ASSERT_OK(writer.AppendArrowBatch(make_batch({1, 2}, {"a", "b"}))); + ASSERT_OK(writer.AppendArrowBatch(make_batch({3, 4}, {"c", "d"}))); + ASSERT_OK(writer.AppendArrowBatch(make_batch({5, 6}, {"e", "f"}))); + ASSERT_OK(writer.Flush()); + + // Extract ids from Arrow batches + auto extract_ids = [](const fluss::ArrowRecordBatches& batches) { + std::vector ids; + for (const auto& batch : batches) { + auto arr = + std::static_pointer_cast(batch->GetArrowRecordBatch()->column(0)); + for (int64_t i = 0; i < arr->length(); ++i) { + 
ids.push_back(arr->Value(i)); + } + } + return ids; + }; + + // Test 2: Poll until we get all 6 records + std::vector all_ids; + fluss_test::PollRecordBatches(scanner, 6, extract_ids, all_ids); + ASSERT_EQ(all_ids, (std::vector{1, 2, 3, 4, 5, 6})); + + // Test 3: Append more and verify offset continuation (no duplicates) + ASSERT_OK(writer.AppendArrowBatch(make_batch({7, 8}, {"g", "h"}))); + ASSERT_OK(writer.Flush()); + + std::vector new_ids; + fluss_test::PollRecordBatches(scanner, 2, extract_ids, new_ids); + ASSERT_EQ(new_ids, (std::vector{7, 8})); + + // Test 4: Subscribing from mid-offset should truncate batch + { + fluss::Table trunc_table; + ASSERT_OK(conn.GetTable(table_path, trunc_table)); + auto trunc_scan = trunc_table.NewScan(); + fluss::LogScanner trunc_scanner; + ASSERT_OK(trunc_scan.CreateRecordBatchLogScanner(trunc_scanner)); + ASSERT_OK(trunc_scanner.Subscribe(0, 3)); + + std::vector trunc_ids; + fluss_test::PollRecordBatches(trunc_scanner, 5, extract_ids, trunc_ids); + ASSERT_EQ(trunc_ids, (std::vector{4, 5, 6, 7, 8})); + } + + // Test 5: Projection should only return requested columns + { + fluss::Table proj_table; + ASSERT_OK(conn.GetTable(table_path, proj_table)); + auto proj_scan = proj_table.NewScan(); + proj_scan.ProjectByName({"id"}); + fluss::LogScanner proj_scanner; + ASSERT_OK(proj_scan.CreateRecordBatchLogScanner(proj_scanner)); + ASSERT_OK(proj_scanner.Subscribe(0, 0)); + + fluss::ArrowRecordBatches proj_batches; + ASSERT_OK(proj_scanner.PollRecordBatch(10000, proj_batches)); + + ASSERT_FALSE(proj_batches.Empty()); + EXPECT_EQ(proj_batches[0]->GetArrowRecordBatch()->num_columns(), 1) + << "Projected batch should have 1 column (id), not 2"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, AllSupportedDatatypes) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_log_all_datatypes_cpp"); + + // Create a log table with all supported datatypes + auto schema = + 
fluss::Schema::NewBuilder() + .AddColumn("col_tinyint", fluss::DataType::TinyInt()) + .AddColumn("col_smallint", fluss::DataType::SmallInt()) + .AddColumn("col_int", fluss::DataType::Int()) + .AddColumn("col_bigint", fluss::DataType::BigInt()) + .AddColumn("col_float", fluss::DataType::Float()) + .AddColumn("col_double", fluss::DataType::Double()) + .AddColumn("col_boolean", fluss::DataType::Boolean()) + .AddColumn("col_char", fluss::DataType::Char(10)) + .AddColumn("col_string", fluss::DataType::String()) + .AddColumn("col_decimal", fluss::DataType::Decimal(10, 2)) + .AddColumn("col_date", fluss::DataType::Date()) + .AddColumn("col_time", fluss::DataType::Time()) + .AddColumn("col_timestamp", fluss::DataType::Timestamp()) + .AddColumn("col_timestamp_ltz", fluss::DataType::TimestampLtz()) + .AddColumn("col_bytes", fluss::DataType::Bytes()) + .AddColumn("col_binary", fluss::DataType::Binary(4)) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + size_t field_count = table.GetTableInfo().schema.columns.size(); + + auto table_append = table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + // Test data + int32_t col_tinyint = 127; + int32_t col_smallint = 32767; + int32_t col_int = 2147483647; + int64_t col_bigint = 9223372036854775807LL; + float col_float = 3.14f; + double col_double = 2.718281828459045; + bool col_boolean = true; + std::string col_char = "hello"; + std::string col_string = "world of fluss rust client"; + std::string col_decimal = "123.45"; + auto col_date = fluss::Date::FromDays(20476); // 2026-01-23 + auto col_time = fluss::Time::FromMillis(36827000); // 10:13:47 + auto col_timestamp = fluss::Timestamp::FromMillisNanos(1769163227123, 456000); + auto 
col_timestamp_ltz = fluss::Timestamp::FromMillisNanos(1769163227123, 456000); + std::vector col_bytes = {'b', 'i', 'n', 'a', 'r', 'y', ' ', 'd', 'a', 't', 'a'}; + std::vector col_binary = {0xDE, 0xAD, 0xBE, 0xEF}; + + // Append a row with all datatypes + { + fluss::GenericRow row(field_count); + row.SetInt32(0, col_tinyint); + row.SetInt32(1, col_smallint); + row.SetInt32(2, col_int); + row.SetInt64(3, col_bigint); + row.SetFloat32(4, col_float); + row.SetFloat64(5, col_double); + row.SetBool(6, col_boolean); + row.SetString(7, col_char); + row.SetString(8, col_string); + row.SetDecimal(9, col_decimal); + row.SetDate(10, col_date); + row.SetTime(11, col_time); + row.SetTimestampNtz(12, col_timestamp); + row.SetTimestampLtz(13, col_timestamp_ltz); + row.SetBytes(14, col_bytes); + row.SetBytes(15, col_binary); + ASSERT_OK(append_writer.Append(row)); + } + + // Append a row with null values + { + fluss::GenericRow row_with_nulls(field_count); + for (size_t i = 0; i < field_count; ++i) { + row_with_nulls.SetNull(i); + } + ASSERT_OK(append_writer.Append(row_with_nulls)); + } + + ASSERT_OK(append_writer.Flush()); + + // Scan the records + fluss::Table scan_table; + ASSERT_OK(conn.GetTable(table_path, scan_table)); + auto table_scan = scan_table.NewScan(); + fluss::LogScanner log_scanner; + ASSERT_OK(table_scan.CreateLogScanner(log_scanner)); + ASSERT_OK(log_scanner.Subscribe(0, 0)); + + // Poll until we get 2 records + std::vector all_records; + fluss_test::PollRecords(log_scanner, 2, + [](const fluss::ScanRecord& rec) { return rec; }, all_records); + ASSERT_EQ(all_records.size(), 2u) << "Expected 2 records"; + + // Verify first record (all values) + auto& row = all_records[0].row; + + EXPECT_EQ(row.GetInt32(0), col_tinyint) << "col_tinyint mismatch"; + EXPECT_EQ(row.GetInt32(1), col_smallint) << "col_smallint mismatch"; + EXPECT_EQ(row.GetInt32(2), col_int) << "col_int mismatch"; + EXPECT_EQ(row.GetInt64(3), col_bigint) << "col_bigint mismatch"; + 
EXPECT_NEAR(row.GetFloat32(4), col_float, 1e-6f) << "col_float mismatch"; + EXPECT_NEAR(row.GetFloat64(5), col_double, 1e-15) << "col_double mismatch"; + EXPECT_EQ(row.GetBool(6), col_boolean) << "col_boolean mismatch"; + EXPECT_EQ(row.GetString(7), col_char) << "col_char mismatch"; + EXPECT_EQ(row.GetString(8), col_string) << "col_string mismatch"; + EXPECT_EQ(row.GetDecimalString(9), col_decimal) << "col_decimal mismatch"; + EXPECT_EQ(row.GetDate(10).days_since_epoch, col_date.days_since_epoch) << "col_date mismatch"; + EXPECT_EQ(row.GetTime(11).millis_since_midnight, col_time.millis_since_midnight) + << "col_time mismatch"; + EXPECT_EQ(row.GetTimestamp(12).epoch_millis, col_timestamp.epoch_millis) + << "col_timestamp millis mismatch"; + EXPECT_EQ(row.GetTimestamp(12).nano_of_millisecond, col_timestamp.nano_of_millisecond) + << "col_timestamp nanos mismatch"; + EXPECT_EQ(row.GetTimestamp(13).epoch_millis, col_timestamp_ltz.epoch_millis) + << "col_timestamp_ltz millis mismatch"; + EXPECT_EQ(row.GetTimestamp(13).nano_of_millisecond, col_timestamp_ltz.nano_of_millisecond) + << "col_timestamp_ltz nanos mismatch"; + + auto [bytes_ptr, bytes_len] = row.GetBytes(14); + EXPECT_EQ(bytes_len, col_bytes.size()) << "col_bytes length mismatch"; + EXPECT_TRUE(std::memcmp(bytes_ptr, col_bytes.data(), bytes_len) == 0) + << "col_bytes mismatch"; + + auto [binary_ptr, binary_len] = row.GetBytes(15); + EXPECT_EQ(binary_len, col_binary.size()) << "col_binary length mismatch"; + EXPECT_TRUE(std::memcmp(binary_ptr, col_binary.data(), binary_len) == 0) + << "col_binary mismatch"; + + // Verify second record (all nulls) + auto& null_row = all_records[1].row; + for (size_t i = 0; i < field_count; ++i) { + EXPECT_TRUE(null_row.IsNull(i)) << "column " << i << " should be null"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, PartitionedTableAppendScan) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", 
"test_partitioned_log_append_cpp"); + + // Create a partitioned log table + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("region", fluss::DataType::String()) + .AddColumn("value", fluss::DataType::BigInt()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetPartitionKeys({"region"}) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + // Create partitions + fluss_test::CreatePartitions(adm, table_path, "region", {"US", "EU"}); + + // Wait for partitions + std::this_thread::sleep_for(std::chrono::seconds(2)); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + auto table_append = table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + // Append rows + struct TestData { + int32_t id; + std::string region; + int64_t value; + }; + std::vector test_data = {{1, "US", 100}, {2, "US", 200}, {3, "EU", 300}, {4, "EU", 400}}; + + for (const auto& d : test_data) { + fluss::GenericRow row(3); + row.SetInt32(0, d.id); + row.SetString(1, d.region); + row.SetInt64(2, d.value); + ASSERT_OK(append_writer.Append(row)); + } + ASSERT_OK(append_writer.Flush()); + + // Append arrow batches per partition + { + auto id_builder = arrow::Int32Builder(); + id_builder.AppendValues({5, 6}).ok(); + auto region_builder = arrow::StringBuilder(); + region_builder.AppendValues({"US", "US"}).ok(); + auto value_builder = arrow::Int64Builder(); + value_builder.AppendValues({500, 600}).ok(); + + auto batch = arrow::RecordBatch::Make( + arrow::schema({arrow::field("id", arrow::int32()), + arrow::field("region", arrow::utf8()), + arrow::field("value", arrow::int64())}), + 2, + {id_builder.Finish().ValueOrDie(), region_builder.Finish().ValueOrDie(), + value_builder.Finish().ValueOrDie()}); + + ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + 
+ { + auto id_builder = arrow::Int32Builder(); + id_builder.AppendValues({7, 8}).ok(); + auto region_builder = arrow::StringBuilder(); + region_builder.AppendValues({"EU", "EU"}).ok(); + auto value_builder = arrow::Int64Builder(); + value_builder.AppendValues({700, 800}).ok(); + + auto batch = arrow::RecordBatch::Make( + arrow::schema({arrow::field("id", arrow::int32()), + arrow::field("region", arrow::utf8()), + arrow::field("value", arrow::int64())}), + 2, + {id_builder.Finish().ValueOrDie(), region_builder.Finish().ValueOrDie(), + value_builder.Finish().ValueOrDie()}); + + ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + ASSERT_OK(append_writer.Flush()); + + // Test list partition offsets + std::unordered_map us_offsets; + ASSERT_OK(adm.ListPartitionOffsets(table_path, "US", {0}, fluss::OffsetSpec::Latest(), + us_offsets)); + EXPECT_EQ(us_offsets[0], 4) << "US partition should have 4 records"; + + std::unordered_map eu_offsets; + ASSERT_OK(adm.ListPartitionOffsets(table_path, "EU", {0}, fluss::OffsetSpec::Latest(), + eu_offsets)); + EXPECT_EQ(eu_offsets[0], 4) << "EU partition should have 4 records"; + + // Subscribe to all partitions and scan + fluss::Table scan_table; + ASSERT_OK(conn.GetTable(table_path, scan_table)); + auto table_scan = scan_table.NewScan(); + fluss::LogScanner log_scanner; + ASSERT_OK(table_scan.CreateLogScanner(log_scanner)); + + std::vector partition_infos; + ASSERT_OK(adm.ListPartitionInfos(table_path, partition_infos)); + + for (const auto& pi : partition_infos) { + ASSERT_OK(log_scanner.SubscribePartitionBuckets(pi.partition_id, 0, 0)); + } + + // Collect all records + using Record = std::tuple; + auto extract_record = [](const fluss::ScanRecord& rec) -> Record { + return {rec.row.GetInt32(0), std::string(rec.row.GetString(1)), rec.row.GetInt64(2)}; + }; + std::vector collected; + fluss_test::PollRecords(log_scanner, 8, extract_record, collected); + + ASSERT_EQ(collected.size(), 8u) << "Expected 8 records total"; + 
std::sort(collected.begin(), collected.end()); + + std::vector expected = {{1, "US", 100}, {2, "US", 200}, {3, "EU", 300}, + {4, "EU", 400}, {5, "US", 500}, {6, "US", 600}, + {7, "EU", 700}, {8, "EU", 800}}; + EXPECT_EQ(collected, expected); + + // Test unsubscribe_partition: unsubscribe EU, should only get US data + { + fluss::Table unsub_table; + ASSERT_OK(conn.GetTable(table_path, unsub_table)); + auto unsub_scan = unsub_table.NewScan(); + fluss::LogScanner unsub_scanner; + ASSERT_OK(unsub_scan.CreateLogScanner(unsub_scanner)); + + int64_t eu_partition_id = -1; + for (const auto& pi : partition_infos) { + ASSERT_OK(unsub_scanner.SubscribePartitionBuckets(pi.partition_id, 0, 0)); + if (pi.partition_name == "EU") { + eu_partition_id = pi.partition_id; + } + } + ASSERT_GE(eu_partition_id, 0) << "EU partition should exist"; + + ASSERT_OK(unsub_scanner.UnsubscribePartition(eu_partition_id, 0)); + + std::vector us_only; + fluss_test::PollRecords(unsub_scanner, 4, extract_record, us_only); + + ASSERT_EQ(us_only.size(), 4u) << "Should receive exactly 4 US records"; + for (const auto& [id, region, val] : us_only) { + EXPECT_EQ(region, "US") << "After unsubscribe EU, only US data should be read"; + } + } + + // Test subscribe_partition_buckets (batch subscribe) + { + fluss::Table batch_table; + ASSERT_OK(conn.GetTable(table_path, batch_table)); + auto batch_scan = batch_table.NewScan(); + fluss::LogScanner batch_scanner; + ASSERT_OK(batch_scan.CreateLogScanner(batch_scanner)); + + std::vector subs; + for (const auto& pi : partition_infos) { + subs.push_back({pi.partition_id, 0, 0}); + } + ASSERT_OK(batch_scanner.SubscribePartitionBuckets(subs)); + + std::vector batch_collected; + fluss_test::PollRecords(batch_scanner, 8, extract_record, batch_collected); + ASSERT_EQ(batch_collected.size(), 8u); + std::sort(batch_collected.begin(), batch_collected.end()); + EXPECT_EQ(batch_collected, expected); + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} diff --git 
a/bindings/cpp/test/test_main.cpp b/bindings/cpp/test/test_main.cpp new file mode 100644 index 00000000..8c2e2d96 --- /dev/null +++ b/bindings/cpp/test/test_main.cpp @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "test_utils.h" + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + + // Register the global test environment (manages the Fluss cluster lifecycle). + ::testing::AddGlobalTestEnvironment(fluss_test::FlussTestEnvironment::Instance()); + + return RUN_ALL_TESTS(); +} diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h new file mode 100644 index 00000000..bae52377 --- /dev/null +++ b/bindings/cpp/test/test_utils.h @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef _WIN32 +#include +#include +#pragma comment(lib, "ws2_32.lib") +#else +#include +#include +#include +#include +#endif + +#include "fluss.hpp" + +// Macro to assert Result is OK and print error message on failure +#define ASSERT_OK(result) ASSERT_TRUE((result).Ok()) << (result).error_message +#define EXPECT_OK(result) EXPECT_TRUE((result).Ok()) << (result).error_message + +namespace fluss_test { + +static constexpr const char* kFlussVersion = "0.7.0"; +static constexpr const char* kNetworkName = "fluss-cpp-test-network"; +static constexpr const char* kZookeeperName = "zookeeper-cpp-test"; +static constexpr const char* kCoordinatorName = "coordinator-server-cpp-test"; +static constexpr const char* kTabletServerName = "tablet-server-cpp-test"; +static constexpr int kCoordinatorPort = 9123; +static constexpr int kTabletServerPort = 9124; + +/// Execute a shell command and return its exit code. +inline int RunCommand(const std::string& cmd) { + return system(cmd.c_str()); +} + +/// Wait until a TCP port is accepting connections, or timeout. 
+inline bool WaitForPort(const std::string& host, int port, int timeout_seconds = 60) { + auto deadline = + std::chrono::steady_clock::now() + std::chrono::seconds(timeout_seconds); + + while (std::chrono::steady_clock::now() < deadline) { + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + continue; + } + + struct sockaddr_in addr {}; + addr.sin_family = AF_INET; + addr.sin_port = htons(static_cast(port)); + inet_pton(AF_INET, host.c_str(), &addr.sin_addr); + + int result = connect(sock, reinterpret_cast(&addr), sizeof(addr)); +#ifdef _WIN32 + closesocket(sock); +#else + close(sock); +#endif + if (result == 0) { + return true; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + return false; +} + +/// Manages a Docker-based Fluss cluster for integration testing. +class FlussTestCluster { + public: + FlussTestCluster() = default; + + bool Start() { + const char* env_servers = std::getenv("FLUSS_BOOTSTRAP_SERVERS"); + if (env_servers && std::strlen(env_servers) > 0) { + bootstrap_servers_ = env_servers; + external_cluster_ = true; + std::cout << "Using external cluster: " << bootstrap_servers_ << std::endl; + return true; + } + + std::cout << "Starting Fluss cluster via Docker..." 
<< std::endl; + + // Create network + RunCommand(std::string("docker network create ") + kNetworkName + " 2>/dev/null || true"); + + // Start ZooKeeper + std::string zk_cmd = std::string("docker run -d --rm") + + " --name " + kZookeeperName + + " --network " + kNetworkName + + " zookeeper:3.9.2"; + if (RunCommand(zk_cmd) != 0) { + std::cerr << "Failed to start ZooKeeper" << std::endl; + return false; + } + + // Wait for ZooKeeper to be ready before starting Fluss servers + std::this_thread::sleep_for(std::chrono::seconds(5)); + + // Start Coordinator Server + std::string coord_props = + "zookeeper.address: " + std::string(kZookeeperName) + ":2181\\n" + "bind.listeners: INTERNAL://" + std::string(kCoordinatorName) + ":0, CLIENT://" + + std::string(kCoordinatorName) + ":9123\\n" + "advertised.listeners: CLIENT://localhost:9123\\n" + "internal.listener.name: INTERNAL\\n" + "netty.server.num-network-threads: 1\\n" + "netty.server.num-worker-threads: 3"; + + std::string coord_cmd = std::string("docker run -d --rm") + + " --name " + kCoordinatorName + + " --network " + kNetworkName + + " -p 9123:9123" + + " -e FLUSS_PROPERTIES=\"$(printf '" + coord_props + "')\"" + + " fluss/fluss:" + kFlussVersion + + " coordinatorServer"; + if (RunCommand(coord_cmd) != 0) { + std::cerr << "Failed to start Coordinator Server" << std::endl; + Stop(); + return false; + } + + // Wait for coordinator to be ready + if (!WaitForPort("127.0.0.1", kCoordinatorPort)) { + std::cerr << "Coordinator Server did not become ready" << std::endl; + Stop(); + return false; + } + + // Start Tablet Server + std::string ts_props = + "zookeeper.address: " + std::string(kZookeeperName) + ":2181\\n" + "bind.listeners: INTERNAL://" + std::string(kTabletServerName) + ":0, CLIENT://" + + std::string(kTabletServerName) + ":9123\\n" + "advertised.listeners: CLIENT://localhost:" + std::to_string(kTabletServerPort) + "\\n" + "internal.listener.name: INTERNAL\\n" + "tablet-server.id: 0\\n" + 
"netty.server.num-network-threads: 1\\n" + "netty.server.num-worker-threads: 3"; + + std::string ts_cmd = std::string("docker run -d --rm") + + " --name " + kTabletServerName + + " --network " + kNetworkName + + " -p " + std::to_string(kTabletServerPort) + ":9123" + + " -e FLUSS_PROPERTIES=\"$(printf '" + ts_props + "')\"" + + " fluss/fluss:" + kFlussVersion + + " tabletServer"; + if (RunCommand(ts_cmd) != 0) { + std::cerr << "Failed to start Tablet Server" << std::endl; + Stop(); + return false; + } + + // Wait for tablet server to be ready + if (!WaitForPort("127.0.0.1", kTabletServerPort)) { + std::cerr << "Tablet Server did not become ready" << std::endl; + Stop(); + return false; + } + + bootstrap_servers_ = "127.0.0.1:9123"; + std::cout << "Fluss cluster started successfully." << std::endl; + return true; + } + + void Stop() { + if (external_cluster_) return; + + std::cout << "Stopping Fluss cluster..." << std::endl; + RunCommand(std::string("docker stop ") + kTabletServerName + " 2>/dev/null || true"); + RunCommand(std::string("docker stop ") + kCoordinatorName + " 2>/dev/null || true"); + RunCommand(std::string("docker stop ") + kZookeeperName + " 2>/dev/null || true"); + RunCommand(std::string("docker network rm ") + kNetworkName + " 2>/dev/null || true"); + std::cout << "Fluss cluster stopped." << std::endl; + } + + const std::string& GetBootstrapServers() const { return bootstrap_servers_; } + + private: + std::string bootstrap_servers_; + bool external_cluster_{false}; +}; + +/// GoogleTest Environment that manages the Fluss cluster lifecycle. +class FlussTestEnvironment : public ::testing::Environment { + public: + static FlussTestEnvironment* Instance() { + static FlussTestEnvironment* instance = nullptr; + if (!instance) { + instance = new FlussTestEnvironment(); + } + return instance; + } + + void SetUp() override { + if (!cluster_.Start()) { + GTEST_SKIP() << "Failed to start Fluss cluster. 
Skipping integration tests."; + } + + // Retry connection creation until the coordinator is fully initialized. + fluss::Configuration config; + config.bootstrap_servers = cluster_.GetBootstrapServers(); + + auto deadline = + std::chrono::steady_clock::now() + std::chrono::seconds(60); + while (std::chrono::steady_clock::now() < deadline) { + auto result = fluss::Connection::Create(config, connection_); + if (result.Ok()) { + auto admin_result = connection_.GetAdmin(admin_); + if (admin_result.Ok()) { + std::cout << "Connected to Fluss cluster." << std::endl; + return; + } + } + std::cout << "Waiting for Fluss cluster to be ready..." << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + GTEST_SKIP() << "Fluss cluster did not become ready within timeout."; + } + + void TearDown() override { + cluster_.Stop(); + } + + fluss::Connection& GetConnection() { return connection_; } + fluss::Admin& GetAdmin() { return admin_; } + const std::string& GetBootstrapServers() { return cluster_.GetBootstrapServers(); } + + private: + FlussTestEnvironment() = default; + + FlussTestCluster cluster_; + fluss::Connection connection_; + fluss::Admin admin_; +}; + +/// Helper: create a table (assert success). Drops existing table first if it exists. +inline void CreateTable(fluss::Admin& admin, const fluss::TablePath& path, + const fluss::TableDescriptor& descriptor) { + admin.DropTable(path, true); // ignore if not exists + auto result = admin.CreateTable(path, descriptor, false); + ASSERT_OK(result); +} + +/// Helper: create partitions for a partitioned table. 
+inline void CreatePartitions(fluss::Admin& admin, const fluss::TablePath& path, + const std::string& partition_column, + const std::vector& values) { + for (const auto& value : values) { + std::unordered_map spec; + spec[partition_column] = value; + auto result = admin.CreatePartition(path, spec, true); + ASSERT_OK(result); + } +} + +/// Poll a LogScanner for ScanRecords until `expected_count` items are collected or timeout. +/// `extract_fn` is called for each ScanRecord and should return a value of type T. +template +void PollRecords(fluss::LogScanner& scanner, size_t expected_count, + ExtractFn extract_fn, std::vector& out) { + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (out.size() < expected_count && std::chrono::steady_clock::now() < deadline) { + fluss::ScanRecords records; + ASSERT_OK(scanner.Poll(1000, records)); + for (auto rec : records) { + out.push_back(extract_fn(rec)); + } + } +} + +/// Poll a LogScanner for ArrowRecordBatches until `expected_count` items are collected or timeout. +/// `extract_fn` is called with the full ArrowRecordBatches and should return a std::vector. 
+template +void PollRecordBatches(fluss::LogScanner& scanner, size_t expected_count, + ExtractFn extract_fn, std::vector& out) { + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (out.size() < expected_count && std::chrono::steady_clock::now() < deadline) { + fluss::ArrowRecordBatches batches; + ASSERT_OK(scanner.PollRecordBatch(1000, batches)); + auto items = extract_fn(batches); + out.insert(out.end(), items.begin(), items.end()); + } +} + +} // namespace fluss_test diff --git a/bindings/python/test/test_log_table.py b/bindings/python/test/test_log_table.py index bfa97897..dd1a4d4f 100644 --- a/bindings/python/test/test_log_table.py +++ b/bindings/python/test/test_log_table.py @@ -36,7 +36,9 @@ async def test_append_and_scan(connection, admin): schema = fluss.Schema( pa.schema([pa.field("c1", pa.int32()), pa.field("c2", pa.string())]) ) - table_descriptor = fluss.TableDescriptor(schema) + table_descriptor = fluss.TableDescriptor( + schema, bucket_count=3, bucket_keys=["c1"] + ) await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) table = await connection.get_table(table_path) diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index 76420676..eac72e5c 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -39,7 +39,7 @@ mod table_test { }; use arrow::array::record_batch; use fluss::client::{EARLIEST_OFFSET, FlussTable, TableScan}; - use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath}; + use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; use fluss::record::ScanRecord; use fluss::row::InternalRow; use fluss::rpc::message::OffsetSpec; @@ -79,6 +79,7 @@ mod table_test { .build() .expect("Failed to build schema"), ) + .distributed_by(Some(3), vec!["c1".to_string()]) .build() .expect("Failed to build table"); @@ -127,38 +128,34 @@ mod table_test { 
.expect("Failed to subscribe with EARLIEST_OFFSET"); } - // Poll for records - let scan_records = log_scanner - .poll(tokio::time::Duration::from_secs(10)) - .await - .expect("Failed to poll records"); - - // Verify the scanned records - let table_bucket = TableBucket::new(table.get_table_info().table_id, 0); - let records = scan_records.records(&table_bucket); - - assert_eq!(records.len(), 6, "Expected 6 records"); - - // Verify record contents match what was appended - let expected_c1_values = vec![1, 2, 3, 4, 5, 6]; - let expected_c2_values = vec!["a1", "a2", "a3", "a4", "a5", "a6"]; - - for (i, record) in records.iter().enumerate() { - let row = record.row(); - assert_eq!( - row.get_int(0), - expected_c1_values[i], - "c1 value mismatch at row {}", - i - ); - assert_eq!( - row.get_string(1), - expected_c2_values[i], - "c2 value mismatch at row {}", - i - ); + // Poll for records across all buckets + let mut collected: Vec<(i32, String)> = Vec::new(); + let start_time = std::time::Instant::now(); + while collected.len() < 6 && start_time.elapsed() < Duration::from_secs(10) { + let scan_records = log_scanner + .poll(Duration::from_millis(500)) + .await + .expect("Failed to poll records"); + for rec in scan_records { + let row = rec.row(); + collected.push((row.get_int(0), row.get_string(1).to_string())); + } } + assert_eq!(collected.len(), 6, "Expected 6 records"); + + // Sort and verify record contents + collected.sort(); + let expected: Vec<(i32, String)> = vec![ + (1, "a1".to_string()), + (2, "a2".to_string()), + (3, "a3".to_string()), + (4, "a4".to_string()), + (5, "a5".to_string()), + (6, "a6".to_string()), + ]; + assert_eq!(collected, expected); + // Test unsubscribe: unsubscribe from bucket 0, verify no error log_scanner .unsubscribe(0)