Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cpp/src/arrow/compute/api_vector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,20 @@ Result<std::shared_ptr<Array>> Take(const Array& values, const Array& indices,
return out.make_array();
}

// ----------------------------------------------------------------------
// DropNull functions

/// Drop nulls from `values` by dispatching to the registered "drop_null"
/// function.
///
/// The call goes through the function registry rather than a specific kernel
/// so that Datum kinds beyond plain Array / ChunkedArray are handled too.
///
/// \param[in] values the datum to process
/// \param[in] ctx the execution context forwarded to CallFunction
/// \return whatever CallFunction yields for "drop_null"
Result<Datum> DropNull(const Datum& values, ExecContext* ctx) {
  Result<Datum> result = CallFunction("drop_null", {values}, ctx);
  return result;
}

/// Array-in / Array-out convenience overload of DropNull.
///
/// Wraps `values` in a Datum, delegates to the Datum overload and converts a
/// successful result back into an Array.
///
/// \param[in] values the array to process
/// \param[in] ctx the execution context forwarded to the Datum overload
/// \return the resulting array, or the error reported by the Datum overload
Result<std::shared_ptr<Array>> DropNull(const Array& values, ExecContext* ctx) {
  Result<Datum> maybe_out = DropNull(Datum(values), ctx);
  if (!maybe_out.ok()) {
    return maybe_out.status();
  }
  return maybe_out.ValueUnsafe().make_array();
}

// ----------------------------------------------------------------------
// Deprecated functions

Expand Down
18 changes: 18 additions & 0 deletions cpp/src/arrow/compute/api_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,24 @@ Result<std::shared_ptr<Array>> Take(const Array& values, const Array& indices,
const TakeOptions& options = TakeOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Drop nulls from the given values
///
/// For array-like input, the output will be of the same type as the
/// input values, with the elements of the input in their original
/// order but without the nulls.
///
/// For example given values = ["a", "b", "c", null, "e", "f"],
/// the output will be = ["a", "b", "c", "e", "f"]
///
/// For RecordBatch and Table input, a full row is dropped if any of
/// its columns is null in that row.
///
/// \param[in] values datum from which to drop nulls
/// \param[in] ctx the function execution context, optional
/// \return the resulting datum
ARROW_EXPORT
Result<Datum> DropNull(const Datum& values, ExecContext* ctx = NULLPTR);

/// \brief DropNull with Array inputs and output
///
/// Convenience overload of the Datum-based DropNull for callers that
/// hold an Array and want an Array back.
///
/// \param[in] values array from which to drop nulls
/// \param[in] ctx the function execution context, optional
/// \return the resulting array
ARROW_EXPORT
Result<std::shared_ptr<Array>> DropNull(const Array& values, ExecContext* ctx = NULLPTR);

/// \brief Returns indices that partition an array around n-th
/// sorted element.
///
Expand Down
168 changes: 168 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_selection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "arrow/util/bit_block_counter.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/bitmap_reader.h"
#include "arrow/util/int_util.h"
Expand Down Expand Up @@ -2146,6 +2147,170 @@ class TakeMetaFunction : public MetaFunction {
}
};

// ----------------------------------------------------------------------
// DropNull Implementation

/// Build a boolean selection array for `values` out of its null bitmap.
///
/// The null bitmap buffer of `values` is passed as the data buffer of the new
/// BooleanArray (no copy is made here), together with the length and offset of
/// `values`.
///
/// `memory_pool` is currently unused.
Result<std::shared_ptr<arrow::BooleanArray>> GetDropNullFilter(const Array& values,
                                                               MemoryPool* memory_pool) {
  // NOTE(review): the trailing (nullptr, 0) arguments are presumably the
  // filter's own validity buffer and null count -- confirm against the
  // BooleanArray constructor.
  std::shared_ptr<arrow::BooleanArray> selection = std::make_shared<BooleanArray>(
      values.length(), values.null_bitmap(), nullptr, 0, values.offset());
  return selection;
}

/// Produce an array of the given type by finishing a builder that was only
/// resized to zero and never appended to.
///
/// \param[in] type the data type of the array to create
/// \param[in] memory_pool the pool handed to MakeBuilder
/// \return the finished array, or the first error raised by the builder
Result<std::shared_ptr<Array>> CreateEmptyArray(std::shared_ptr<DataType> type,
                                                MemoryPool* memory_pool) {
  std::unique_ptr<ArrayBuilder> empty_builder;
  RETURN_NOT_OK(MakeBuilder(memory_pool, type, &empty_builder));
  RETURN_NOT_OK(empty_builder->Resize(0));
  return empty_builder->Finish();
}

/// Produce a ChunkedArray consisting of exactly one chunk, that chunk being
/// the result of CreateEmptyArray for the given type.
///
/// \param[in] type the data type of the single chunk
/// \param[in] memory_pool the pool forwarded to CreateEmptyArray
/// \return the chunked array, or the error raised while creating the chunk
Result<std::shared_ptr<ChunkedArray>> CreateEmptyChunkedArray(
    std::shared_ptr<DataType> type, MemoryPool* memory_pool) {
  ARROW_ASSIGN_OR_RAISE(auto sole_chunk, CreateEmptyArray(type, memory_pool));
  // A single chunk is used (hard-coded for now).
  std::vector<std::shared_ptr<Array>> chunks;
  chunks.push_back(std::move(sole_chunk));
  return std::make_shared<ChunkedArray>(std::move(chunks));
}

/// Drop the null entries of a single array.
///
/// Shortcuts are taken when there is nothing to drop (the input is returned
/// as-is) and when every entry is null (an empty array of the same type is
/// created). Otherwise the array is filtered with the selection produced by
/// GetDropNullFilter.
///
/// \param[in] values the array to process
/// \param[in] ctx execution context; supplies the memory pool and is passed
///            on to Filter
/// \return the resulting datum
Result<Datum> DropNullArray(const std::shared_ptr<Array>& values, ExecContext* ctx) {
  const int64_t num_nulls = values->null_count();
  if (num_nulls == 0) {
    // Nothing to remove: hand the input back untouched.
    return values;
  }
  if (num_nulls == values->length()) {
    // Everything would be removed.
    return CreateEmptyArray(values->type(), ctx->memory_pool());
  }
  // NOTE(review): a null-typed array presumably always satisfies one of the
  // two checks above, which would make this branch unreachable -- confirm.
  const bool is_null_type = values->type()->id() == Type::type::NA;
  if (is_null_type) {
    return std::make_shared<NullArray>(0);
  }
  ARROW_ASSIGN_OR_RAISE(auto selection, GetDropNullFilter(*values, ctx->memory_pool()));
  return Filter(values, selection, FilterOptions::Defaults(), ctx);
}

/// Drop the null entries of a chunked array, chunk by chunk.
///
/// The input is returned unchanged when it has no nulls, and an empty chunked
/// array of the same type is created when it consists solely of nulls.
/// Otherwise every chunk goes through DropNullArray and only chunks with a
/// positive resulting length are kept.
///
/// \param[in] values the chunked array to process
/// \param[in] ctx execution context forwarded to the per-chunk calls
/// \return the resulting datum
Result<Datum> DropNullChunkedArray(const std::shared_ptr<ChunkedArray>& values,
                                   ExecContext* ctx) {
  const int64_t num_nulls = values->null_count();
  if (num_nulls == 0) {
    return values;
  }
  if (num_nulls == values->length()) {
    return CreateEmptyChunkedArray(values->type(), ctx->memory_pool());
  }

  std::vector<std::shared_ptr<Array>> kept_chunks;
  for (const auto& chunk : values->chunks()) {
    ARROW_ASSIGN_OR_RAISE(auto filtered, DropNullArray(chunk, ctx));
    if (filtered.length() <= 0) {
      // Skip chunks that ended up with nothing in them.
      continue;
    }
    kept_chunks.push_back(filtered.make_array());
  }
  return std::make_shared<ChunkedArray>(std::move(kept_chunks));
}

/// Drop every row of a record batch in which at least one column is null.
///
/// A row selection bitmap is built by starting with all bits set and AND-ing
/// in the null bitmap of each column that has one. A column of the NA type
/// clears the whole selection. The batch is then filtered with that
/// selection; when no row is selected an empty batch with the same schema is
/// assembled directly.
///
/// \param[in] batch the record batch to process
/// \param[in] ctx execution context; supplies the memory pool and is passed
///            on to Filter
/// \return the resulting datum
Result<Datum> DropNullRecordBatch(const std::shared_ptr<RecordBatch>& batch,
                                  ExecContext* ctx) {
  // Summing the per-column null counts gives an upper bound on the number of
  // rows to remove; zero means there is nothing to do.
  int64_t null_upper_bound = 0;
  for (const auto& col : batch->columns()) {
    null_upper_bound += col->null_count();
  }
  if (null_upper_bound == 0) {
    return batch;
  }

  const int64_t num_rows = batch->num_rows();
  ARROW_ASSIGN_OR_RAISE(auto keep_bits,
                        AllocateEmptyBitmap(num_rows, ctx->memory_pool()));
  // Start with every row selected, then knock out rows column by column.
  BitUtil::SetBitsTo(keep_bits->mutable_data(), 0, num_rows, true);
  for (const auto& col : batch->columns()) {
    if (col->type()->id() == Type::type::NA) {
      // No row can be selected once a NA-typed column is present.
      BitUtil::SetBitsTo(keep_bits->mutable_data(), 0, num_rows, false);
      break;
    }
    const uint8_t* col_bitmap = col->null_bitmap_data();
    if (col_bitmap == nullptr) {
      continue;
    }
    ::arrow::internal::BitmapAnd(col_bitmap, col->offset(), keep_bits->data(), 0,
                                 col->length(), 0, keep_bits->mutable_data());
  }

  auto selection = std::make_shared<BooleanArray>(num_rows, keep_bits);
  if (selection->true_count() != 0) {
    return Filter(Datum(batch), Datum(selection), FilterOptions::Defaults(), ctx);
  }

  // Shortcut: no row selected, so build the empty result column by column.
  const int num_columns = batch->num_columns();
  ArrayVector empty_columns(num_columns);
  for (int i = 0; i < num_columns; ++i) {
    ARROW_ASSIGN_OR_RAISE(
        empty_columns[i],
        CreateEmptyArray(batch->column(i)->type(), ctx->memory_pool()));
  }
  return RecordBatch::Make(batch->schema(), 0, std::move(empty_columns));
}

/// Drop every row of a table in which at least one column is null.
///
/// The table is returned unchanged when it has no rows or no nulls. Otherwise
/// it is read batch by batch through a TableBatchReader, each batch goes
/// through DropNullRecordBatch, and the batches with a positive resulting
/// length are reassembled into a table with the original schema.
///
/// \param[in] table the table to process
/// \param[in] ctx execution context forwarded to the per-batch calls
/// \return the resulting datum
Result<Datum> DropNullTable(const std::shared_ptr<Table>& table, ExecContext* ctx) {
  if (table->num_rows() == 0) {
    return table;
  }

  // Summing the per-chunk null counts gives an upper bound on the number of
  // rows to remove; zero means there is nothing to do.
  int64_t null_upper_bound = 0;
  for (const auto& column : table->columns()) {
    for (const auto& chunk : column->chunks()) {
      null_upper_bound += chunk->null_count();
    }
  }
  if (null_upper_bound == 0) {
    return table;
  }

  arrow::RecordBatchVector kept_batches;
  TableBatchReader reader(*table);
  for (;;) {
    ARROW_ASSIGN_OR_RAISE(auto next_batch, reader.Next());
    if (next_batch == nullptr) {
      // The reader is exhausted.
      break;
    }
    ARROW_ASSIGN_OR_RAISE(auto filtered, DropNullRecordBatch(next_batch, ctx));
    if (filtered.length() > 0) {
      kept_batches.push_back(filtered.record_batch());
    }
  }
  return Table::FromRecordBatches(table->schema(), kept_batches);
}

// Documentation attached to the "drop_null" function: a one-line summary, a
// longer description, and the name of its single argument. A pointer to this
// object is handed to the MetaFunction base by DropNullMetaFunction.
const FunctionDoc drop_null_doc(
"Drop nulls from the input",
("The output is populated with values from the input (Array, ChunkedArray,\n"
"RecordBatch, or Table) without the null values.\n"
"For the RecordBatch and Table cases, `drop_null` drops the full row if\n"
"there is any null."),
{"input"});

/// Unary meta function registered under the name "drop_null".
///
/// It inspects the kind of its single argument and forwards to the matching
/// DropNull* helper (Array, ChunkedArray, RecordBatch or Table). Any other
/// kind yields a NotImplemented status.
class DropNullMetaFunction : public MetaFunction {
 public:
  DropNullMetaFunction() : MetaFunction("drop_null", Arity::Unary(), &drop_null_doc) {}

  /// \param[in] args the call arguments; only args[0] is read
  /// \param[in] options not used by this function
  /// \param[in] ctx execution context forwarded to the selected helper
  Result<Datum> ExecuteImpl(const std::vector<Datum>& args,
                            const FunctionOptions* options,
                            ExecContext* ctx) const override {
    const Datum& input = args[0];
    switch (input.kind()) {
      case Datum::ARRAY:
        return DropNullArray(input.make_array(), ctx);
      case Datum::CHUNKED_ARRAY:
        return DropNullChunkedArray(input.chunked_array(), ctx);
      case Datum::RECORD_BATCH:
        return DropNullRecordBatch(input.record_batch(), ctx);
      case Datum::TABLE:
        return DropNullTable(input.table(), ctx);
      default:
        break;
    }
    // Reached only for Datum kinds without a dedicated helper.
    return Status::NotImplemented(
        "Unsupported types for drop_null operation: "
        "values=",
        input.ToString());
  }
};

// ----------------------------------------------------------------------

template <typename Impl>
Expand Down Expand Up @@ -2261,6 +2426,9 @@ void RegisterVectorSelection(FunctionRegistry* registry) {
take_kernel_descrs, &kDefaultTakeOptions, registry);

DCHECK_OK(registry->AddFunction(std::make_shared<TakeMetaFunction>()));

// DropNull kernel
DCHECK_OK(registry->AddFunction(std::make_shared<DropNullMetaFunction>()));
}

} // namespace internal
Expand Down
Loading