Skip to content

Commit 6b842b6

Browse files
Add C++ per-bucket scan integration test
1 parent 74ff366 commit 6b842b6

1 file changed

Lines changed: 55 additions & 26 deletions

File tree

bindings/cpp/test/test_log_table.cpp

Lines changed: 55 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,7 @@ TEST_F(LogTableTest, AppendRecordBatchAndScan) {
113113
while (records.size() < 6 && std::chrono::steady_clock::now() < deadline) {
114114
fluss::ScanRecords scan_records;
115115
ASSERT_OK(log_scanner.Poll(500, scan_records));
116-
for (size_t i = 0; i < scan_records.Size(); ++i) {
117-
auto rec = scan_records[i];
116+
for (auto rec : scan_records) {
118117
records.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1)));
119118
}
120119
}
@@ -126,6 +125,45 @@ TEST_F(LogTableTest, AppendRecordBatchAndScan) {
126125
{1, "a1"}, {2, "a2"}, {3, "a3"}, {4, "a4"}, {5, "a5"}, {6, "a6"}};
127126
EXPECT_EQ(records, expected);
128127

128+
// Verify per-bucket iteration via BucketView
129+
{
130+
fluss::Table bucket_table;
131+
ASSERT_OK(conn.GetTable(table_path, bucket_table));
132+
auto bucket_scan = bucket_table.NewScan();
133+
fluss::LogScanner bucket_scanner;
134+
ASSERT_OK(bucket_scan.CreateLogScanner(bucket_scanner));
135+
136+
for (int32_t bid = 0; bid < num_buckets; ++bid) {
137+
ASSERT_OK(bucket_scanner.Subscribe(bid, fluss::EARLIEST_OFFSET));
138+
}
139+
140+
std::vector<std::pair<int32_t, std::string>> bucket_records;
141+
auto bucket_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10);
142+
size_t buckets_with_data = 0;
143+
while (bucket_records.size() < 6 && std::chrono::steady_clock::now() < bucket_deadline) {
144+
fluss::ScanRecords scan_records;
145+
ASSERT_OK(bucket_scanner.Poll(500, scan_records));
146+
147+
// Iterate by bucket
148+
for (size_t b = 0; b < scan_records.BucketCount(); ++b) {
149+
auto bucket_view = scan_records.BucketAt(b);
150+
if (!bucket_view.Empty()) {
151+
buckets_with_data++;
152+
}
153+
for (auto rec : bucket_view) {
154+
bucket_records.emplace_back(rec.row.GetInt32(0),
155+
std::string(rec.row.GetString(1)));
156+
}
157+
}
158+
}
159+
160+
ASSERT_EQ(bucket_records.size(), 6u) << "Expected 6 records via per-bucket iteration";
161+
EXPECT_GT(buckets_with_data, 1u) << "Records should be distributed across multiple buckets";
162+
163+
std::sort(bucket_records.begin(), bucket_records.end());
164+
EXPECT_EQ(bucket_records, expected);
165+
}
166+
129167
// Test unsubscribe
130168
ASSERT_OK(log_scanner.Unsubscribe(0));
131169

@@ -290,15 +328,14 @@ TEST_F(LogTableTest, TestProject) {
290328
fluss::ScanRecords records;
291329
ASSERT_OK(scanner.Poll(10000, records));
292330

293-
ASSERT_EQ(records.Size(), 3u) << "Should have 3 records with project_by_name";
331+
ASSERT_EQ(records.Count(), 3u) << "Should have 3 records with project_by_name";
294332

295333
std::vector<std::string> expected_col_b = {"x", "y", "z"};
296334
std::vector<int32_t> expected_col_c = {10, 20, 30};
297335

298336
// Collect and sort by col_c to get deterministic order
299337
std::vector<std::pair<std::string, int32_t>> collected;
300-
for (size_t i = 0; i < records.Size(); ++i) {
301-
auto rec = records[i];
338+
for (auto rec : records) {
302339
collected.emplace_back(std::string(rec.row.GetString(0)), rec.row.GetInt32(1));
303340
}
304341
std::sort(collected.begin(), collected.end(),
@@ -324,14 +361,13 @@ TEST_F(LogTableTest, TestProject) {
324361
fluss::ScanRecords records;
325362
ASSERT_OK(scanner.Poll(10000, records));
326363

327-
ASSERT_EQ(records.Size(), 3u);
364+
ASSERT_EQ(records.Count(), 3u);
328365

329366
std::vector<std::string> expected_col_b = {"x", "y", "z"};
330367
std::vector<int32_t> expected_col_a = {1, 2, 3};
331368

332369
std::vector<std::pair<std::string, int32_t>> collected;
333-
for (size_t i = 0; i < records.Size(); ++i) {
334-
auto rec = records[i];
370+
for (auto rec : records) {
335371
collected.emplace_back(std::string(rec.row.GetString(0)), rec.row.GetInt32(1));
336372
}
337373
std::sort(collected.begin(), collected.end(),
@@ -585,22 +621,19 @@ TEST_F(LogTableTest, AllSupportedDatatypes) {
585621
ASSERT_OK(log_scanner.Subscribe(0, 0));
586622

587623
// Poll until we get 2 records
588-
std::vector<fluss::ScanRecords> all_records;
589-
size_t total_records = 0;
624+
std::vector<fluss::ScanRecord> all_records;
590625
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10);
591-
while (total_records < 2 && std::chrono::steady_clock::now() < deadline) {
626+
while (all_records.size() < 2 && std::chrono::steady_clock::now() < deadline) {
592627
fluss::ScanRecords records;
593628
ASSERT_OK(log_scanner.Poll(5000, records));
594-
total_records += records.Size();
595-
if (records.Size() > 0) {
596-
all_records.push_back(std::move(records));
629+
for (auto rec : records) {
630+
all_records.push_back(rec);
597631
}
598632
}
599-
ASSERT_EQ(total_records, 2u) << "Expected 2 records";
633+
ASSERT_EQ(all_records.size(), 2u) << "Expected 2 records";
600634

601635
// Verify first record (all values)
602-
auto rec = all_records[0][0];
603-
auto& row = rec.row;
636+
auto& row = all_records[0].row;
604637

605638
EXPECT_EQ(row.GetInt32(0), col_tinyint) << "col_tinyint mismatch";
606639
EXPECT_EQ(row.GetInt32(1), col_smallint) << "col_smallint mismatch";
@@ -635,10 +668,9 @@ TEST_F(LogTableTest, AllSupportedDatatypes) {
635668
<< "col_binary mismatch";
636669

637670
// Verify second record (all nulls)
638-
// The second record might be in the same ScanRecords or a different one
639-
fluss::ScanRecord null_rec = (all_records[0].Size() > 1) ? all_records[0][1] : all_records[1][0];
671+
auto& null_row = all_records[1].row;
640672
for (size_t i = 0; i < field_count; ++i) {
641-
EXPECT_TRUE(null_rec.row.IsNull(i)) << "column " << i << " should be null";
673+
EXPECT_TRUE(null_row.IsNull(i)) << "column " << i << " should be null";
642674
}
643675

644676
ASSERT_OK(adm.DropTable(table_path, false));
@@ -767,8 +799,7 @@ TEST_F(LogTableTest, PartitionedTableAppendScan) {
767799
while (collected.size() < 8 && std::chrono::steady_clock::now() < deadline) {
768800
fluss::ScanRecords records;
769801
ASSERT_OK(log_scanner.Poll(500, records));
770-
for (size_t i = 0; i < records.Size(); ++i) {
771-
auto rec = records[i];
802+
for (auto rec : records) {
772803
collected.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1)),
773804
rec.row.GetInt64(2));
774805
}
@@ -806,8 +837,7 @@ TEST_F(LogTableTest, PartitionedTableAppendScan) {
806837
while (us_only.size() < 4 && std::chrono::steady_clock::now() < unsub_deadline) {
807838
fluss::ScanRecords records;
808839
ASSERT_OK(unsub_scanner.Poll(300, records));
809-
for (size_t i = 0; i < records.Size(); ++i) {
810-
auto rec = records[i];
840+
for (auto rec : records) {
811841
us_only.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1)),
812842
rec.row.GetInt64(2));
813843
}
@@ -838,8 +868,7 @@ TEST_F(LogTableTest, PartitionedTableAppendScan) {
838868
while (batch_collected.size() < 8 && std::chrono::steady_clock::now() < batch_deadline) {
839869
fluss::ScanRecords records;
840870
ASSERT_OK(batch_scanner.Poll(500, records));
841-
for (size_t i = 0; i < records.Size(); ++i) {
842-
auto rec = records[i];
871+
for (auto rec : records) {
843872
batch_collected.emplace_back(rec.row.GetInt32(0),
844873
std::string(rec.row.GetString(1)),
845874
rec.row.GetInt64(2));

0 commit comments

Comments
 (0)