diff --git a/cpp/src/arrow/compute/api_vector.cc b/cpp/src/arrow/compute/api_vector.cc index 9f3b3fa71b3..d4c4a915999 100644 --- a/cpp/src/arrow/compute/api_vector.cc +++ b/cpp/src/arrow/compute/api_vector.cc @@ -233,6 +233,20 @@ Result> Take(const Array& values, const Array& indices, return out.make_array(); } +// ---------------------------------------------------------------------- +// DropNull functions + +Result DropNull(const Datum& values, ExecContext* ctx) { + // Invoke metafunction which deals with Datum kinds other than just Array, + // ChunkedArray. + return CallFunction("drop_null", {values}, ctx); +} + +Result> DropNull(const Array& values, ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(Datum out, DropNull(Datum(values), ctx)); + return out.make_array(); +} + // ---------------------------------------------------------------------- // Deprecated functions diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index 2d9522b0732..5dc68fc5c83 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -216,6 +216,24 @@ Result> Take(const Array& values, const Array& indices, const TakeOptions& options = TakeOptions::Defaults(), ExecContext* ctx = NULLPTR); +/// \brief Drop nulls from the input values +/// +/// The output array will be of the same type as the input values +/// array, with elements taken from the values array without nulls. +/// +/// For example given values = ["a", "b", "c", null, "e", "f"], +/// the output will be = ["a", "b", "c", "e", "f"] +/// +/// \param[in] values datum from which to drop nulls +/// \param[in] ctx the function execution context, optional +/// \return the resulting datum +ARROW_EXPORT +Result DropNull(const Datum& values, ExecContext* ctx = NULLPTR); + +/// \brief DropNull with Array input and output +ARROW_EXPORT +Result> DropNull(const Array& values, ExecContext* ctx = NULLPTR); + /// \brief Returns indices that partition an array around n-th /// sorted element. 
/// diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc index 3ff503a3ee1..69e6324cde5 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection.cc @@ -38,6 +38,7 @@ #include "arrow/util/bit_block_counter.h" #include "arrow/util/bit_run_reader.h" #include "arrow/util/bit_util.h" +#include "arrow/util/bitmap.h" #include "arrow/util/bitmap_ops.h" #include "arrow/util/bitmap_reader.h" #include "arrow/util/int_util.h" @@ -2146,6 +2147,170 @@ class TakeMetaFunction : public MetaFunction { } }; +// ---------------------------------------------------------------------- +// DropNull Implementation + +Result> GetDropNullFilter(const Array& values, + MemoryPool* memory_pool) { + auto bitmap_buffer = values.null_bitmap(); + std::shared_ptr out_array = std::make_shared( + values.length(), bitmap_buffer, nullptr, 0, values.offset()); + return out_array; +} + +Result> CreateEmptyArray(std::shared_ptr type, + MemoryPool* memory_pool) { + std::unique_ptr builder; + RETURN_NOT_OK(MakeBuilder(memory_pool, type, &builder)); + RETURN_NOT_OK(builder->Resize(0)); + return builder->Finish(); +} + +Result> CreateEmptyChunkedArray( + std::shared_ptr type, MemoryPool* memory_pool) { + std::vector> new_chunks(1); // Hard-coded 1 for now + ARROW_ASSIGN_OR_RAISE(new_chunks[0], CreateEmptyArray(type, memory_pool)); + return std::make_shared(std::move(new_chunks)); +} + +Result DropNullArray(const std::shared_ptr& values, ExecContext* ctx) { + if (values->null_count() == 0) { + return values; + } + if (values->null_count() == values->length()) { + return CreateEmptyArray(values->type(), ctx->memory_pool()); + } + if (values->type()->id() == Type::type::NA) { + return std::make_shared(0); + } + ARROW_ASSIGN_OR_RAISE(auto drop_null_filter, + GetDropNullFilter(*values, ctx->memory_pool())); + return Filter(values, drop_null_filter, FilterOptions::Defaults(), ctx); +} + +Result 
DropNullChunkedArray(const std::shared_ptr& values, + ExecContext* ctx) { + if (values->null_count() == 0) { + return values; + } + if (values->null_count() == values->length()) { + return CreateEmptyChunkedArray(values->type(), ctx->memory_pool()); + } + std::vector> new_chunks; + for (const auto& chunk : values->chunks()) { + ARROW_ASSIGN_OR_RAISE(auto new_chunk, DropNullArray(chunk, ctx)); + if (new_chunk.length() > 0) { + new_chunks.push_back(new_chunk.make_array()); + } + } + return std::make_shared(std::move(new_chunks)); +} + +Result DropNullRecordBatch(const std::shared_ptr& batch, + ExecContext* ctx) { + // Compute an upper bound of the final null count + int64_t null_count = 0; + for (const auto& column : batch->columns()) { + null_count += column->null_count(); + } + if (null_count == 0) { + return batch; + } + ARROW_ASSIGN_OR_RAISE(auto dst, + AllocateEmptyBitmap(batch->num_rows(), ctx->memory_pool())); + BitUtil::SetBitsTo(dst->mutable_data(), 0, batch->num_rows(), true); + for (const auto& column : batch->columns()) { + if (column->type()->id() == Type::type::NA) { + BitUtil::SetBitsTo(dst->mutable_data(), 0, batch->num_rows(), false); + break; + } + if (column->null_bitmap_data()) { + ::arrow::internal::BitmapAnd(column->null_bitmap_data(), column->offset(), + dst->data(), 0, column->length(), 0, + dst->mutable_data()); + } + } + auto drop_null_filter = std::make_shared(batch->num_rows(), dst); + if (drop_null_filter->true_count() == 0) { + // Shortcut: construct empty result + ArrayVector empty_batch(batch->num_columns()); + for (int i = 0; i < batch->num_columns(); i++) { + ARROW_ASSIGN_OR_RAISE( + empty_batch[i], CreateEmptyArray(batch->column(i)->type(), ctx->memory_pool())); + } + return RecordBatch::Make(batch->schema(), 0, std::move(empty_batch)); + } + return Filter(Datum(batch), Datum(drop_null_filter), FilterOptions::Defaults(), ctx); +} + +Result DropNullTable(const std::shared_ptr& table, ExecContext* ctx) { + if (table->num_rows() == 0) 
{ + return table; + } + // Compute an upper bound of the final null count + int64_t null_count = 0; + for (const auto& col : table->columns()) { + for (const auto& column_chunk : col->chunks()) { + null_count += column_chunk->null_count(); + } + } + if (null_count == 0) { + return table; + } + + arrow::RecordBatchVector filtered_batches; + TableBatchReader batch_iter(*table); + while (true) { + ARROW_ASSIGN_OR_RAISE(auto batch, batch_iter.Next()); + if (batch == nullptr) { + break; + } + ARROW_ASSIGN_OR_RAISE(auto filtered_datum, DropNullRecordBatch(batch, ctx)) + if (filtered_datum.length() > 0) { + filtered_batches.push_back(filtered_datum.record_batch()); + } + } + return Table::FromRecordBatches(table->schema(), filtered_batches); +} + +const FunctionDoc drop_null_doc( + "Drop nulls from the input", + ("The output is populated with values from the input (Array, ChunkedArray,\n" + "RecordBatch, or Table) without the null values.\n" + "For the RecordBatch and Table cases, `drop_null` drops the full row if\n" + "there is any null."), + {"input"}); + +class DropNullMetaFunction : public MetaFunction { + public: + DropNullMetaFunction() : MetaFunction("drop_null", Arity::Unary(), &drop_null_doc) {} + + Result ExecuteImpl(const std::vector& args, + const FunctionOptions* options, + ExecContext* ctx) const override { + switch (args[0].kind()) { + case Datum::ARRAY: { + return DropNullArray(args[0].make_array(), ctx); + } break; + case Datum::CHUNKED_ARRAY: { + return DropNullChunkedArray(args[0].chunked_array(), ctx); + } break; + case Datum::RECORD_BATCH: { + return DropNullRecordBatch(args[0].record_batch(), ctx); + } break; + case Datum::TABLE: { + return DropNullTable(args[0].table(), ctx); + } break; + default: + break; + } + return Status::NotImplemented( + "Unsupported types for drop_null operation: " + "values=", + args[0].ToString()); + } +}; + // ---------------------------------------------------------------------- template @@ -2261,6 +2426,9 @@ void 
RegisterVectorSelection(FunctionRegistry* registry) { take_kernel_descrs, &kDefaultTakeOptions, registry); DCHECK_OK(registry->AddFunction(std::make_shared())); + + // DropNull kernel + DCHECK_OK(registry->AddFunction(std::make_shared())); } } // namespace internal diff --git a/cpp/src/arrow/compute/kernels/vector_selection_test.cc b/cpp/src/arrow/compute/kernels/vector_selection_test.cc index 1ba89e1a4cf..ec023aee7b1 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_test.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_test.cc @@ -23,6 +23,7 @@ #include #include +#include "arrow/array/concatenate.h" #include "arrow/chunked_array.h" #include "arrow/compute/api.h" #include "arrow/compute/kernels/test_util.h" @@ -1734,5 +1735,533 @@ TEST(TestTake, RandomFixedSizeBinary) { TakeRandomTest::Test(fixed_size_binary(16)); } +// ---------------------------------------------------------------------- +// DropNull tests + +void AssertDropNullArrays(const std::shared_ptr& values, + const std::shared_ptr& expected) { + ASSERT_OK_AND_ASSIGN(std::shared_ptr actual, DropNull(*values)); + ValidateOutput(actual); + AssertArraysEqual(*expected, *actual, /*verbose=*/true); +} + +Status DropNullJSON(const std::shared_ptr& type, const std::string& values, + std::shared_ptr* out) { + return DropNull(*ArrayFromJSON(type, values)).Value(out); +} + +void CheckDropNull(const std::shared_ptr& type, const std::string& values, + const std::string& expected) { + std::shared_ptr actual; + + ASSERT_OK(DropNullJSON(type, values, &actual)); + ValidateOutput(actual); + AssertArraysEqual(*ArrayFromJSON(type, expected), *actual, /*verbose=*/true); +} + +struct TestDropNullKernel : public ::testing::Test { + void TestNoValidityBitmapButUnknownNullCount(const std::shared_ptr& values) { + ASSERT_EQ(values->null_count(), 0); + auto expected = (*DropNull(values)).make_array(); + + auto new_values = MakeArray(values->data()->Copy()); + new_values->data()->buffers[0].reset(); + 
new_values->data()->null_count = kUnknownNullCount; + auto result = (*DropNull(new_values)).make_array(); + AssertArraysEqual(*expected, *result); + } + + void TestNoValidityBitmapButUnknownNullCount(const std::shared_ptr& type, + const std::string& values) { + TestNoValidityBitmapButUnknownNullCount(ArrayFromJSON(type, values)); + } +}; + +TEST_F(TestDropNullKernel, DropNull) { + CheckDropNull(null(), "[null, null, null]", "[]"); + CheckDropNull(null(), "[null]", "[]"); +} + +TEST_F(TestDropNullKernel, DropNullBoolean) { + CheckDropNull(boolean(), "[true, false, true]", "[true, false, true]"); + CheckDropNull(boolean(), "[null, false, true]", "[false, true]"); + CheckDropNull(boolean(), "[]", "[]"); + CheckDropNull(boolean(), "[null, null]", "[]"); + + TestNoValidityBitmapButUnknownNullCount(boolean(), "[true, false, true]"); +} + +template +struct TestDropNullKernelTyped : public TestDropNullKernel { + TestDropNullKernelTyped() : rng_(seed_) {} + + std::shared_ptr Offsets(int32_t length, int32_t slice_count) { + return checked_pointer_cast(rng_.Offsets(slice_count, 0, length)); + } + + // Slice `array` into multiple chunks along `offsets` + ArrayVector Slices(const std::shared_ptr& array, + const std::shared_ptr& offsets) { + ArrayVector slices(offsets->length() - 1); + for (int64_t i = 0; i != static_cast(slices.size()); ++i) { + slices[i] = + array->Slice(offsets->Value(i), offsets->Value(i + 1) - offsets->Value(i)); + } + return slices; + } + + random::SeedType seed_ = 0xdeadbeef; + random::RandomArrayGenerator rng_; +}; + +template +class TestDropNullKernelWithNumeric : public TestDropNullKernelTyped { + protected: + void AssertDropNull(const std::string& values, const std::string& expected) { + CheckDropNull(type_singleton(), values, expected); + } + + std::shared_ptr type_singleton() { + return TypeTraits::type_singleton(); + } +}; + +TYPED_TEST_SUITE(TestDropNullKernelWithNumeric, NumericArrowTypes); +TYPED_TEST(TestDropNullKernelWithNumeric, 
DropNullNumeric) { + this->AssertDropNull("[7, 8, 9]", "[7, 8, 9]"); + this->AssertDropNull("[null, 8, 9]", "[8, 9]"); + this->AssertDropNull("[null, null, null]", "[]"); +} + +template +class TestDropNullKernelWithString : public TestDropNullKernelTyped { + public: + std::shared_ptr value_type() { + return TypeTraits::type_singleton(); + } + + void AssertDropNull(const std::string& values, const std::string& expected) { + CheckDropNull(value_type(), values, expected); + } + + void AssertDropNullDictionary(const std::string& dictionary_values, + const std::string& dictionary_indices, + const std::string& expected_indices) { + auto dict = ArrayFromJSON(value_type(), dictionary_values); + auto type = dictionary(int8(), value_type()); + ASSERT_OK_AND_ASSIGN(auto values, + DictionaryArray::FromArrays( + type, ArrayFromJSON(int8(), dictionary_indices), dict)); + ASSERT_OK_AND_ASSIGN( + auto expected, + DictionaryArray::FromArrays(type, ArrayFromJSON(int8(), expected_indices), dict)); + AssertDropNullArrays(values, expected); + } +}; + +TYPED_TEST_SUITE(TestDropNullKernelWithString, BinaryArrowTypes); + +TYPED_TEST(TestDropNullKernelWithString, DropNullString) { + this->AssertDropNull(R"(["a", "b", "c"])", R"(["a", "b", "c"])"); + this->AssertDropNull(R"([null, "b", "c"])", "[\"b\", \"c\"]"); + this->AssertDropNull(R"(["a", "b", null])", R"(["a", "b"])"); + + this->TestNoValidityBitmapButUnknownNullCount(this->value_type(), R"(["a", "b", "c"])"); +} + +TYPED_TEST(TestDropNullKernelWithString, DropNullDictionary) { + auto dict = R"(["a", "b", "c", "d", "e"])"; + this->AssertDropNullDictionary(dict, "[3, 4, 2]", "[3, 4, 2]"); + this->AssertDropNullDictionary(dict, "[null, 4, 2]", "[4, 2]"); +} + +class TestDropNullKernelFSB : public TestDropNullKernelTyped { + public: + std::shared_ptr value_type() { return fixed_size_binary(3); } + + void AssertDropNull(const std::string& values, const std::string& expected) { + CheckDropNull(value_type(), values, expected); + } +}; + 
+TEST_F(TestDropNullKernelFSB, DropNullFixedSizeBinary) { + this->AssertDropNull(R"(["aaa", "bbb", "ccc"])", R"(["aaa", "bbb", "ccc"])"); + this->AssertDropNull(R"([null, "bbb", "ccc"])", "[\"bbb\", \"ccc\"]"); + + this->TestNoValidityBitmapButUnknownNullCount(this->value_type(), + R"(["aaa", "bbb", "ccc"])"); +} + +class TestDropNullKernelWithList : public TestDropNullKernelTyped {}; + +TEST_F(TestDropNullKernelWithList, DropNullListInt32) { + std::string list_json = "[[], [1,2], null, [3]]"; + CheckDropNull(list(int32()), list_json, "[[], [1,2], [3]]"); + this->TestNoValidityBitmapButUnknownNullCount(list(int32()), "[[], [1,2], [3]]"); +} + +TEST_F(TestDropNullKernelWithList, DropNullListListInt32) { + std::string list_json = R"([ + [], + [[1], [2, null, 2], []], + null, + [[3, null], null] + ])"; + auto type = list(list(int32())); + CheckDropNull(type, list_json, R"([ + [], + [[1], [2, null, 2], []], + [[3, null], null] + ])"); + + this->TestNoValidityBitmapButUnknownNullCount(type, + "[[[1], [2, null, 2], []], [[3, null]]]"); +} + +class TestDropNullKernelWithLargeList : public TestDropNullKernelTyped {}; + +TEST_F(TestDropNullKernelWithLargeList, DropNullLargeListInt32) { + std::string list_json = "[[], [1,2], null, [3]]"; + CheckDropNull(large_list(int32()), list_json, "[[], [1,2], [3]]"); + + /* Cover large_list (not fixed_size_list) when the validity bitmap is absent. */ + this->TestNoValidityBitmapButUnknownNullCount(large_list(int32()), "[[], [1,2], [3]]"); +} + +class TestDropNullKernelWithFixedSizeList + : public TestDropNullKernelTyped {}; + +TEST_F(TestDropNullKernelWithFixedSizeList, DropNullFixedSizeListInt32) { + std::string list_json = "[null, [1, null, 3], [4, 5, 6], [7, 8, null]]"; + CheckDropNull(fixed_size_list(int32(), 3), list_json, + "[[1, null, 3], [4, 5, 6], [7, 8, null]]"); + + this->TestNoValidityBitmapButUnknownNullCount( + fixed_size_list(int32(), 3), "[[1, null, 3], [4, 5, 6], [7, 8, null]]"); +} + +class TestDropNullKernelWithMap : public TestDropNullKernelTyped {}; + 
+TEST_F(TestDropNullKernelWithMap, DropNullMapStringToInt32) { + std::string map_json = R"([ + [["joe", 0], ["mark", null]], + null, + [["cap", 8]], + [] + ])"; + std::string expected_json = R"([ + [["joe", 0], ["mark", null]], + [["cap", 8]], + [] + ])"; + CheckDropNull(map(utf8(), int32()), map_json, expected_json); +} +/* Struct: a null struct slot is dropped; valid structs are kept in order. */ +class TestDropNullKernelWithStruct : public TestDropNullKernelTyped {}; + +TEST_F(TestDropNullKernelWithStruct, DropNullStruct) { + auto struct_type = struct_({field("a", int32()), field("b", utf8())}); + auto struct_json = R"([ + null, + {"a": 1, "b": ""}, + {"a": 2, "b": "hello"}, + {"a": 4, "b": "eh"} + ])"; + auto expected_struct_json = R"([ + {"a": 1, "b": ""}, + {"a": 2, "b": "hello"}, + {"a": 4, "b": "eh"} + ])"; + CheckDropNull(struct_type, struct_json, expected_struct_json); + this->TestNoValidityBitmapButUnknownNullCount(struct_type, expected_struct_json); +} +/* Dense union: the expected output equals the input, so no slot is removed. */ +class TestDropNullKernelWithUnion : public TestDropNullKernelTyped {}; + +TEST_F(TestDropNullKernelWithUnion, DropNullUnion) { + auto union_type = dense_union({field("a", int32()), field("b", utf8())}, {2, 5}); + auto union_json = R"([ + [2, null], + [2, 222], + [5, "hello"], + [5, "eh"], + [2, null], + [2, 111], + [5, null] + ])"; + CheckDropNull(union_type, union_json, union_json); +} +/* Fixture: runs DropNull on a RecordBatch built from JSON and compares batches. */ +class TestDropNullKernelWithRecordBatch : public TestDropNullKernelTyped { + public: + void AssertDropNull(const std::shared_ptr& schm, const std::string& batch_json, + const std::string& expected_batch) { + std::shared_ptr actual; + + ASSERT_OK(this->DoDropNull(schm, batch_json, &actual)); + ValidateOutput(actual); + ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(schm, expected_batch), *actual); + } + /* Builds the batch from JSON, applies DropNull and stores the result in *out. */ + Status DoDropNull(const std::shared_ptr& schm, const std::string& batch_json, + std::shared_ptr* out) { + auto batch = RecordBatchFromJSON(schm, batch_json); + ARROW_ASSIGN_OR_RAISE(Datum out_datum, DropNull(batch)); + *out = out_datum.record_batch(); + return Status::OK(); + } +}; + 
+TEST_F(TestDropNullKernelWithRecordBatch, DropNullRecordBatch) { + std::vector> fields = {field("a", int32()), field("b", utf8())}; + auto schm = schema(fields); + + auto batch_json = R"([ + {"a": null, "b": "yo"}, + {"a": 1, "b": ""}, + {"a": 2, "b": "hello"}, + {"a": 4, "b": "eh"} + ])"; + this->AssertDropNull(schm, batch_json, R"([ + {"a": 1, "b": ""}, + {"a": 2, "b": "hello"}, + {"a": 4, "b": "eh"} + ])"); + + batch_json = R"([ + {"a": null, "b": "yo"}, + {"a": 1, "b": null}, + {"a": null, "b": "hello"}, + {"a": 4, "b": null} + ])"; + this->AssertDropNull(schm, batch_json, R"([])"); + this->AssertDropNull(schm, R"([])", R"([])"); +} + +class TestDropNullKernelWithChunkedArray : public TestDropNullKernelTyped { + public: + TestDropNullKernelWithChunkedArray() + : sizes_({0, 1, 2, 4, 16, 31, 1234}), + null_probabilities_({0.0, 0.1, 0.5, 0.9, 1.0}) {} + + void AssertDropNull(const std::shared_ptr& type, + const std::vector& values, + const std::vector& expected) { + std::shared_ptr actual; + ASSERT_OK(this->DoDropNull(type, values, &actual)); + ValidateOutput(actual); + + AssertChunkedEqual(*ChunkedArrayFromJSON(type, expected), *actual); + } + + Status DoDropNull(const std::shared_ptr& type, + const std::vector& values, + std::shared_ptr* out) { + ARROW_ASSIGN_OR_RAISE(Datum out_datum, DropNull(ChunkedArrayFromJSON(type, values))); + *out = out_datum.chunked_array(); + return Status::OK(); + } + + template + void CheckDropNullWithSlices(ArrayFactory&& factory) { + for (auto size : this->sizes_) { + for (auto null_probability : this->null_probabilities_) { + std::shared_ptr concatenated_array; + std::shared_ptr chunked_array; + factory(size, null_probability, &chunked_array, &concatenated_array); + + ASSERT_OK_AND_ASSIGN(auto out_datum, DropNull(chunked_array)); + auto actual_chunked_array = out_datum.chunked_array(); + ASSERT_OK_AND_ASSIGN(auto actual, Concatenate(actual_chunked_array->chunks())); + + ASSERT_OK_AND_ASSIGN(out_datum, 
DropNull(*concatenated_array)); + auto expected = out_datum.make_array(); + + AssertArraysEqual(*expected, *actual); + } + } + } + + std::vector sizes_; + std::vector null_probabilities_; +}; + +TEST_F(TestDropNullKernelWithChunkedArray, DropNullChunkedArray) { + this->AssertDropNull(int8(), {"[]"}, {"[]"}); + this->AssertDropNull(int8(), {"[null]", "[8, null]"}, {"[8]"}); + + this->AssertDropNull(int8(), {"[null]", "[null, null]"}, {"[]"}); + this->AssertDropNull(int8(), {"[7]", "[8, 9]"}, {"[7]", "[8, 9]"}); + this->AssertDropNull(int8(), {"[]", "[]"}, {"[]", "[]"}); +} + +TEST_F(TestDropNullKernelWithChunkedArray, DropNullChunkedArrayWithSlices) { + // With Null Arrays + this->CheckDropNullWithSlices([this](int32_t size, double null_probability, + std::shared_ptr* out_chunked_array, + std::shared_ptr* out_concatenated_array) { + auto array = std::make_shared(size); + auto offsets = this->Offsets(size, 3); + auto slices = this->Slices(array, offsets); + *out_chunked_array = std::make_shared(std::move(slices)); + + ASSERT_OK_AND_ASSIGN(*out_concatenated_array, + Concatenate((*out_chunked_array)->chunks())); + }); + // Without Null Arrays + this->CheckDropNullWithSlices([this](int32_t size, double null_probability, + std::shared_ptr* out_chunked_array, + std::shared_ptr* out_concatenated_array) { + auto array = this->rng_.ArrayOf(int16(), size, null_probability); + auto offsets = this->Offsets(size, 3); + auto slices = this->Slices(array, offsets); + *out_chunked_array = std::make_shared(std::move(slices)); + + ASSERT_OK_AND_ASSIGN(*out_concatenated_array, + Concatenate((*out_chunked_array)->chunks())); + }); +} + +class TestDropNullKernelWithTable : public TestDropNullKernelTyped
{ + public: + TestDropNullKernelWithTable() + : sizes_({0, 1, 4, 31, 1234}), null_probabilities_({0.0, 0.1, 0.5, 0.9, 1.0}) {} + + void AssertDropNull(const std::shared_ptr& schm, + const std::vector& table_json, + const std::vector& expected_table) { + std::shared_ptr
actual; + ASSERT_OK(this->DoDropNull(schm, table_json, &actual)); + ValidateOutput(actual); + ASSERT_TABLES_EQUAL(*TableFromJSON(schm, expected_table), *actual); + } + + Status DoDropNull(const std::shared_ptr& schm, + const std::vector& values, std::shared_ptr
* out) { + ARROW_ASSIGN_OR_RAISE(Datum out_datum, DropNull(TableFromJSON(schm, values))); + *out = out_datum.table(); + return Status::OK(); + } + + template + void CheckDropNullWithSlices(ArrayFactory&& factory) { + for (auto size : this->sizes_) { + for (auto null_probability : this->null_probabilities_) { + std::shared_ptr
table_w_slices; + std::shared_ptr
table_wo_slices; + + factory(size, null_probability, &table_w_slices, &table_wo_slices); + + ASSERT_OK_AND_ASSIGN(auto out_datum, DropNull(table_w_slices)); + ValidateOutput(out_datum); + auto actual = out_datum.table(); + + ASSERT_OK_AND_ASSIGN(out_datum, DropNull(table_wo_slices)); + ValidateOutput(out_datum); + auto expected = out_datum.table(); + if (actual->num_rows() > 0) { + ASSERT_TRUE(actual->num_rows() == expected->num_rows()); + for (int index = 0; index < actual->num_columns(); index++) { + ASSERT_OK_AND_ASSIGN(auto actual_col, + Concatenate(actual->column(index)->chunks())); + ASSERT_OK_AND_ASSIGN(auto expected_col, + Concatenate(expected->column(index)->chunks())); + AssertArraysEqual(*actual_col, *expected_col); + } + } + } + } + } + + std::vector sizes_; + std::vector null_probabilities_; +}; + +TEST_F(TestDropNullKernelWithTable, DropNullTable) { + std::vector> fields = {field("a", int32()), field("b", utf8())}; + auto schm = schema(fields); + + { + std::vector table_json = {R"([ + {"a": null, "b": "yo"}, + {"a": 1, "b": ""} + ])", + R"([ + {"a": 2, "b": "hello"}, + {"a": 4, "b": "eh"} + ])"}; + std::vector expected_table_json = {R"([ + {"a": 1, "b": ""} + ])", + R"([ + {"a": 2, "b": "hello"}, + {"a": 4, "b": "eh"} + ])"}; + this->AssertDropNull(schm, table_json, expected_table_json); + } + { + std::vector table_json = {R"([ + {"a": null, "b": "yo"}, + {"a": 1, "b": null} + ])", + R"([ + {"a": 2, "b": null}, + {"a": null, "b": "eh"} + ])"}; + std::shared_ptr
actual; + ASSERT_OK(this->DoDropNull(schm, table_json, &actual)); + AssertSchemaEqual(schm, actual->schema()); + ASSERT_EQ(actual->num_rows(), 0); + } +} + +TEST_F(TestDropNullKernelWithTable, DropNullTableWithSlices) { + // With Null Arrays + this->CheckDropNullWithSlices([this](int32_t size, double null_probability, + std::shared_ptr
* out_table_w_slices, + std::shared_ptr
* out_table_wo_slices) { + FieldVector fields = {field("a", int32()), field("b", utf8())}; + auto schm = schema(fields); + ASSERT_OK_AND_ASSIGN(auto col_a, MakeArrayOfNull(int32(), size)); + ASSERT_OK_AND_ASSIGN(auto col_b, MakeArrayOfNull(utf8(), size)); + + // Compute random chunkings of columns `a` and `b` + auto slices_a = this->Slices(col_a, this->Offsets(size, 3)); + auto slices_b = this->Slices(col_b, this->Offsets(size, 3)); + + ChunkedArrayVector table_content_w_slices{ + std::make_shared(std::move(slices_a)), + std::make_shared(std::move(slices_b))}; + *out_table_w_slices = Table::Make(schm, std::move(table_content_w_slices), size); + + ChunkedArrayVector table_content_wo_slices{std::make_shared(col_a), + std::make_shared(col_b)}; + *out_table_wo_slices = Table::Make(schm, std::move(table_content_wo_slices), size); + }); + + // Without Null Arrays + this->CheckDropNullWithSlices([this](int32_t size, double null_probability, + std::shared_ptr
* out_table_w_slices, + std::shared_ptr
* out_table_wo_slices) { + FieldVector fields = {field("a", int32()), field("b", utf8())}; + auto schm = schema(fields); + auto col_a = this->rng_.ArrayOf(int32(), size, null_probability); + auto col_b = this->rng_.ArrayOf(utf8(), size, null_probability); + + // Compute random chunkings of columns `a` and `b` + auto slices_a = this->Slices(col_a, this->Offsets(size, 3)); + auto slices_b = this->Slices(col_b, this->Offsets(size, 3)); + + ChunkedArrayVector table_content_w_slices{ + std::make_shared(std::move(slices_a)), + std::make_shared(std::move(slices_b))}; + *out_table_w_slices = Table::Make(schm, std::move(table_content_w_slices), size); + + ChunkedArrayVector table_content_wo_slices{std::make_shared(col_a), + std::make_shared(col_b)}; + *out_table_wo_slices = Table::Make(schm, std::move(table_content_wo_slices), size); + }); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/datum.cc b/cpp/src/arrow/datum.cc index d3ff6aba0af..397e91de5b9 100644 --- a/cpp/src/arrow/datum.cc +++ b/cpp/src/arrow/datum.cc @@ -112,14 +112,20 @@ const std::shared_ptr& Datum::schema() const { } int64_t Datum::length() const { - if (this->kind() == Datum::ARRAY) { - return util::get>(this->value)->length; - } else if (this->kind() == Datum::CHUNKED_ARRAY) { - return util::get>(this->value)->length(); - } else if (this->kind() == Datum::SCALAR) { - return 1; + switch (this->kind()) { + case Datum::ARRAY: + return util::get>(this->value)->length; + case Datum::CHUNKED_ARRAY: + return util::get>(this->value)->length(); + case Datum::RECORD_BATCH: + return util::get>(this->value)->num_rows(); + case Datum::TABLE: + return util::get>(this->value)->num_rows(); + case Datum::SCALAR: + return 1; + default: + return kUnknownLength; } - return kUnknownLength; } int64_t Datum::null_count() const { diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index d429b6cfecd..638d47e2990 100644 --- a/docs/source/cpp/compute.rst +++ 
b/docs/source/cpp/compute.rst @@ -1164,23 +1164,30 @@ Associative transforms Selections ~~~~~~~~~~ -These functions select a subset of the first input defined by the second input. +These functions select and return a subset of their input. +---------------+--------+--------------+--------------+--------------+-------------------------+-----------+ | Function name | Arity | Input type 1 | Input type 2 | Output type | Options class | Notes | +===============+========+==============+==============+==============+=========================+===========+ -| filter | Binary | Any | Boolean | Input type 1 | :struct:`FilterOptions` | \(1) \(2) | +| drop_null | Unary | Any | - | Input type 1 | | \(1) \(2) | +---------------+--------+--------------+--------------+--------------+-------------------------+-----------+ -| take | Binary | Any | Integer | Input type 1 | :struct:`TakeOptions` | \(1) \(3) | +| filter | Binary | Any | Boolean | Input type 1 | :struct:`FilterOptions` | \(1) \(3) | +---------------+--------+--------------+--------------+--------------+-------------------------+-----------+ +| take | Binary | Any | Integer | Input type 1 | :struct:`TakeOptions` | \(1) \(4) | ++---------------+--------+--------------+--------------+--------------+-------------------------+-----------+ + +* \(1) Sparse unions are unsupported. -* \(1) Unions are unsupported. +* \(2) Each element in the input is appended to the output iff it is non-null. + If the input is a record batch or table, any null value in a column drops + the entire row. -* \(2) Each element in input 1 is appended to the output iff the corresponding - element in input 2 is true. +* \(3) Each element in input 1 (the values) is appended to the output iff + the corresponding element in input 2 (the filter) is true. How + nulls in the filter are handled can be configured using FilterOptions. -* \(3) For each element *i* in input 2, the *i*'th element in input 1 is - appended to the output. 
+* \(4) For each element *i* in input 2 (the indices), the *i*'th element + in input 1 (the values) is appended to the output. Sorts and partitions ~~~~~~~~~~~~~~~~~~~~ diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 62523696c8b..455ec3bac06 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1117,6 +1117,12 @@ cdef class Array(_PandasConvertible): """ return _pc().take(self, indices) + def drop_null(self): + """ + Remove missing values from an array. + """ + return _pc().drop_null(self) + def filter(self, Array mask, null_selection_behavior='drop'): """ Select values from an array. See pyarrow.compute.filter for full usage. diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index d92bdb2efa3..8c1355d2016 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -397,6 +397,13 @@ cdef class ChunkedArray(_PandasConvertible): """ return _pc().take(self, indices) + def drop_null(self): + """ + Remove missing values from a chunked array. + See pyarrow.compute.drop_null for full description. + """ + return _pc().drop_null(self) + def unify_dictionaries(self, MemoryPool memory_pool=None): """ Unify dictionaries across all chunks. @@ -951,11 +958,18 @@ cdef class RecordBatch(_PandasConvertible): def take(self, object indices): """ - Select records from an RecordBatch. See pyarrow.compute.take for full + Select records from a RecordBatch. See pyarrow.compute.take for full usage. """ return _pc().take(self, indices) + def drop_null(self): + """ + Remove missing values from a RecordBatch. + See pyarrow.compute.drop_null for full usage. + """ + return _pc().drop_null(self) + def to_pydict(self): """ Convert the RecordBatch to a dict or OrderedDict. @@ -1318,11 +1332,18 @@ cdef class Table(_PandasConvertible): def take(self, object indices): """ - Select records from an Table. See :func:`pyarrow.compute.take` for full + Select records from a Table. See :func:`pyarrow.compute.take` for full usage. 
""" return _pc().take(self, indices) + def drop_null(self): + """ + Remove missing values from a Table. + See :func:`pyarrow.compute.drop_null` for full usage. + """ + return _pc().drop_null(self) + def select(self, object columns): """ Select columns of the Table. diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index 60a2f60f942..099687967ac 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -974,6 +974,82 @@ def test_take_null_type(): assert len(table.take(indices).column(0)) == 4 +@pytest.mark.parametrize(('ty', 'values'), all_array_types) +def test_drop_null(ty, values): + arr = pa.array(values, type=ty) + result = arr.drop_null() + result.validate(full=True) + indices = [i for i in range(len(arr)) if arr[i].is_valid] + expected = arr.take(pa.array(indices)) + assert result.equals(expected) + + +def test_drop_null_chunked_array(): + arr = pa.chunked_array([["a", None], ["c", "d", None], [None], []]) + expected_drop = pa.chunked_array([["a"], ["c", "d"], [], []]) + + result = arr.drop_null() + assert result.equals(expected_drop) + + +def test_drop_null_record_batch(): + batch = pa.record_batch( + [pa.array(["a", None, "c", "d", None])], names=["a'"]) + result = batch.drop_null() + expected = pa.record_batch([pa.array(["a", "c", "d"])], names=["a'"]) + assert result.equals(expected) + + batch = pa.record_batch( + [pa.array(["a", None, "c", "d", None]), + pa.array([None, None, "c", None, "e"])], names=["a'", "b'"]) + + result = batch.drop_null() + expected = pa.record_batch( + [pa.array(["c"]), pa.array(["c"])], names=["a'", "b'"]) + assert result.equals(expected) + + +def test_drop_null_table(): + table = pa.table([pa.array(["a", None, "c", "d", None])], names=["a"]) + expected = pa.table([pa.array(["a", "c", "d"])], names=["a"]) + result = table.drop_null() + assert result.equals(expected) + + table = pa.table([pa.chunked_array([["a", None], ["c", "d", None]]), + 
pa.chunked_array([["a", None], [None, "d", None]]), + pa.chunked_array([["a"], ["b"], [None], ["d", None]])], + names=["a", "b", "c"]) + expected = pa.table([pa.array(["a", "d"]), + pa.array(["a", "d"]), + pa.array(["a", "d"])], + names=["a", "b", "c"]) + result = table.drop_null() + assert result.equals(expected) + + table = pa.table([pa.chunked_array([["a", "b"], ["c", "d", "e"]]), + pa.chunked_array([["A"], ["B"], [None], ["D", None]]), + pa.chunked_array([["a`", None], ["c`", "d`", None]])], + names=["a", "b", "c"]) + expected = pa.table([pa.array(["a", "d"]), + pa.array(["A", "D"]), + pa.array(["a`", "d`"])], + names=["a", "b", "c"]) + result = table.drop_null() + assert result.equals(expected) + + +def test_drop_null_null_type(): + arr = pa.array([None] * 10) + chunked_arr = pa.chunked_array([[None] * 5] * 2) + batch = pa.record_batch([arr], names=['a']) + table = pa.table({'a': arr}) + + assert len(arr.drop_null()) == 0 + assert len(chunked_arr.drop_null()) == 0 + assert len(batch.drop_null().column(0)) == 0 + assert len(table.drop_null().column(0)) == 0 + + @pytest.mark.parametrize(('ty', 'values'), all_array_types) def test_filter(ty, values): arr = pa.array(values, type=ty)