Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,34 @@
#include <ydb/library/actors/core/log.h>
#include <yql/essentials/types/binary_json/read.h>
#include <util/generic/ylimits.h>
#include <util/string/cast.h>

#include <limits>
#include <cmath>

namespace NKikimr::NArrow::NAccessor {

namespace {

std::optional<std::string> JsonNumberToString(double val) {
if (std::isnan(val)) {
std::optional<TString> TBinaryJsonValueView::JsonNumberToString(double jsonNumber) {
if (std::isnan(jsonNumber)) {
return std::nullopt;
}

double integerPart;
double fractionPart = std::modf(val, &integerPart);
double fractionPart = std::modf(jsonNumber, &integerPart);
if (!(fractionPart == 0.0)) {
return std::to_string(val);
return ::ToString(jsonNumber);
}

static constexpr double minD = static_cast<double>(std::numeric_limits<i64>::min());
static constexpr double maxD = MaxFloor<i64>();

if (minD <= val && val <= maxD) {
return std::to_string(static_cast<i64>(val));
if (minD <= jsonNumber && jsonNumber <= maxD) {
return ::ToString(static_cast<i64>(jsonNumber));
}

return std::to_string(val);
return ::ToString(jsonNumber);
}

} // namespace

TBinaryJsonValueView::TBinaryJsonValueView(const TStringBuf& rawValue)
: RawValue(rawValue) {
if (!RawValue.empty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class TBinaryJsonValueView {

std::optional<TStringBuf> GetScalarOptional() const;

static std::optional<TString> JsonNumberToString(double jsonNumber);

private:
TStringBuf RawValue;
mutable TString ScalarHolder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ IChunkedArray::TLocalChunkedArrayAddress TDeserializeChunkedArray::DoGetLocalChu
}
if (!!Data) {
auto result = Loader->ApplyConclusion(Data, GetRecordsCount());
Y_ABORT_UNLESS(result, "Incorrect object for result request. internal path id: %s portion id: %" PRIu64 " error: %s ", InternalPathId.data(), PortionId, result.GetErrorMessage().data());
Y_ABORT_UNLESS(result.IsSuccess(), "Incorrect object for result request. internal path id: %s portion id: %" PRIu64 " error: %s ", InternalPathId.data(), PortionId, result.GetErrorMessage().data());
return TLocalChunkedArrayAddress(result.DetachResult(), 0, 0);
} else {
AFL_VERIFY(!!DataBuffer);
auto result = Loader->ApplyConclusion(TString(DataBuffer.data(), DataBuffer.size()), GetRecordsCount());
Y_ABORT_UNLESS(result, "Incorrect object for result request. internal path id: %s portion id: %" PRIu64 " error: %s ", InternalPathId.data(), PortionId, result.GetErrorMessage().data());
Y_ABORT_UNLESS(result.IsSuccess(), "Incorrect object for result request. internal path id: %s portion id: %" PRIu64 " error: %s ", InternalPathId.data(), PortionId, result.GetErrorMessage().data());
return TLocalChunkedArrayAddress(result.DetachResult(), 0, 0);
}
}
Expand Down
137 changes: 40 additions & 97 deletions ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

#include <ydb/core/formats/arrow/accessor/composite_serial/accessor.h>
#include <ydb/core/formats/arrow/accessor/plain/constructor.h>
#include <ydb/core/formats/arrow/accessor/sub_columns/json_value_path.h>
#include <ydb/core/formats/arrow/save_load/loader.h>
#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/formats/arrow/splitter/simple.h>

#include <ydb/library/formats/arrow/protos/accessor.pb.h>
#include <ydb/library/formats/arrow/simple_arrays_cache.h>

#include <yql/essentials/minikql/jsonpath/parser/parser.h>
#include <yql/essentials/types/binary_json/format.h>
#include <yql/essentials/types/binary_json/write.h>

Expand Down Expand Up @@ -113,103 +115,16 @@ TString TSubColumnsArray::SerializeToString(const TChunkConstructionData& extern
return result;
}

class TJsonRestorer {
private:
NJson::TJsonValue Result;

public:
bool IsNull() const {
return !Result.IsDefined();
}

TConclusion<NBinaryJson::TBinaryJson> Finish() {
auto bJson = NBinaryJson::SerializeToBinaryJson(Result.GetStringRobust());
if (const TString* val = std::get_if<TString>(&bJson)) {
return TConclusionStatus::Fail(*val);
} else if (const NBinaryJson::TBinaryJson* val = std::get_if<NBinaryJson::TBinaryJson>(&bJson)) {
return std::move(*val);
} else {
return TConclusionStatus::Fail("undefined case for binary json construction");
}
}

void SetValueByPath(const TString& path, const NJson::TJsonValue& jsonValue) {
ui32 start = 0;
bool enqueue = false;
bool wasEnqueue = false;
NJson::TJsonValue* current = &Result;
for (ui32 i = 0; i < path.size(); ++i) {
if (path[i] == '\\') {
++i;
continue;
}
if (path[i] == '\'' || path[i] == '\"') {
wasEnqueue = true;
enqueue = !enqueue;
continue;
}
if (enqueue) {
continue;
}
if (path[i] == '.') {
if (wasEnqueue) {
AFL_VERIFY(i > start + 2);
TStringBuf key(path.data() + start + 1, (i - 1) - start - 1);
NJson::TJsonValue* currentNext = nullptr;
if (current->GetValuePointer(key, &currentNext)) {
current = currentNext;
} else {
current = &current->InsertValue(key, NJson::JSON_MAP);
}
} else {
AFL_VERIFY(i > start);
TStringBuf key(path.data() + start, i - start);
NJson::TJsonValue* currentNext = nullptr;
if (current->GetValuePointer(key, &currentNext)) {
current = currentNext;
} else {
ui32 keyIndex;
if (key.StartsWith("[") && key.EndsWith("]") && TryFromString<ui32>(key.data() + 1, key.size() - 2, keyIndex)) {
AFL_VERIFY(!current->IsDefined() || current->IsArray() || (current->IsMap() && current->GetMapSafe().empty()));
current->SetType(NJson::JSON_ARRAY);
if (current->GetArraySafe().size() <= keyIndex) {
current->GetArraySafe().resize(keyIndex + 1);
}
current = &current->GetArraySafe()[keyIndex];
} else {
AFL_VERIFY(!current->IsArray())("current_type", current->GetType())("current", current->GetStringRobust());
current = &current->InsertValue(key, NJson::JSON_MAP);
}
}
}
wasEnqueue = false;
start = i + 1;
}
}
if (wasEnqueue) {
AFL_VERIFY(path.size() > start + 2)("path", path)("start", start);
TStringBuf key(path.data() + start + 1, (path.size() - 1) - start - 1);
current->InsertValue(key, jsonValue);
} else {
AFL_VERIFY(path.size() >= start)("path", path)("start", start);
TStringBuf key(path.data() + start, (path.size()) - start);
ui32 keyIndex;
if (key.StartsWith("[") && key.EndsWith("]") && TryFromString<ui32>(key.data() + 1, key.size() - 2, keyIndex)) {
AFL_VERIFY(!current->IsDefined() || current->IsArray() || (current->IsMap() && current->GetMapSafe().empty()));
current->SetType(NJson::JSON_ARRAY);

if (current->GetArraySafe().size() <= keyIndex) {
current->GetArraySafe().resize(keyIndex + 1);
}
current->GetArraySafe()[keyIndex] = jsonValue;
} else {
AFL_VERIFY(!current->IsArray())("key", key)("current", current->GetStringRobust())("full", Result.GetStringRobust())(
"current_type", current->GetType());
current->InsertValue(key, jsonValue);
}
}
TConclusion<NBinaryJson::TBinaryJson> ToBinaryJson(const TJsonRestorer& restorer) {
auto bJson = NBinaryJson::SerializeToBinaryJson(restorer.GetResult().GetStringRobust());
if (const TString* val = std::get_if<TString>(&bJson)) {
return TConclusionStatus::Fail(*val);
} else if (const NBinaryJson::TBinaryJson* val = std::get_if<NBinaryJson::TBinaryJson>(&bJson)) {
return std::move(*val);
} else {
return TConclusionStatus::Fail("undefined case for binary json construction");
}
};
}

std::shared_ptr<arrow::Array> TSubColumnsArray::BuildBJsonArray(const TColumnConstructionContext& context) const {
auto it = BuildUnorderedIterator();
Expand All @@ -233,7 +148,7 @@ std::shared_ptr<arrow::Array> TSubColumnsArray::BuildBJsonArray(const TColumnCon
if (value.IsNull()) {
TStatusValidator::Validate(builder->AppendNull());
} else {
const TConclusion<NBinaryJson::TBinaryJson> bJson = value.Finish();
const TConclusion<NBinaryJson::TBinaryJson> bJson = ToBinaryJson(value);
NArrow::Append<arrow::BinaryType>(*builder, arrow::util::string_view(bJson->data(), bJson->size()));
}
};
Expand Down Expand Up @@ -268,4 +183,32 @@ IChunkedArray::TLocalDataAddress TSubColumnsArray::DoGetLocalData(
return TLocalDataAddress(BuildBJsonArray(TColumnConstructionContext()), 0, 0);
}

bool TJsonRestorer::IsNull() const {
return !Result.IsDefined();
}

const NJson::TJsonValue& TJsonRestorer::GetResult() const {
return Result;
}

void TJsonRestorer::SetValueByPath(const TString& path, const NJson::TJsonValue& jsonValue) {
// Path may be empty (for backward compatibility), so make it $."" in this case
auto splitResult = NSubColumns::SplitJsonPath(NSubColumns::ToJsonPath(path.empty() ? "\"\"" : path), NSubColumns::TJsonPathSplitSettings{.FillTypes = true});
AFL_VERIFY(splitResult.IsSuccess())("error", splitResult.GetErrorMessage())("path", path);
const auto [pathItems, pathTypes, _] = splitResult.DetachResult();
AFL_VERIFY(pathItems.size() > 0);
AFL_VERIFY(pathItems.size() == pathTypes.size());
NJson::TJsonValue* current = &Result;
for (decltype(pathItems)::size_type i = 0; i < pathItems.size() - 1; ++i) {
AFL_VERIFY(pathTypes[i] == NYql::NJsonPath::EJsonPathItemType::MemberAccess);
NJson::TJsonValue* currentNext = nullptr;
if (current->GetValuePointer(pathItems[i], &currentNext)) {
current = currentNext;
} else {
current = &current->InsertValue(pathItems[i], NJson::JSON_MAP);
}
}
current->InsertValue(pathItems[pathItems.size() - 1], jsonValue);
}

} // namespace NKikimr::NArrow::NAccessor
17 changes: 15 additions & 2 deletions ydb/core/formats/arrow/accessor/sub_columns/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <ydb/core/formats/arrow/accessor/abstract/accessor.h>
#include <ydb/core/formats/arrow/accessor/common/chunk_data.h>
#include <ydb/core/formats/arrow/accessor/sub_columns/json_value_path.h>
#include <ydb/core/formats/arrow/arrow_filter.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/common/container.h>
Expand Down Expand Up @@ -111,13 +112,25 @@ class TSubColumnsArray: public IChunkedArray {
return nullptr;
}

std::shared_ptr<IChunkedArray> GetPathAccessor(const std::string_view svPath, const ui32 recordsCount) const {
TConclusion<std::shared_ptr<NSubColumns::TJsonPathAccessor>> GetPathAccessor(const std::string_view svPath, const ui32 recordsCount) const {
auto accResult = ColumnsData.GetPathAccessor(svPath);
if (accResult) {
if (accResult.IsFail() || accResult.GetResult()->IsValid()) {
return accResult;
}
return OthersData.GetPathAccessor(svPath, recordsCount);
}
};

class TJsonRestorer {
private:
NJson::TJsonValue Result;

public:
bool IsNull() const;

const NJson::TJsonValue& GetResult() const;

void SetValueByPath(const TString& path, const NJson::TJsonValue& jsonValue);
};

} // namespace NKikimr::NArrow::NAccessor
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ TColumnsData TColumnsData::Slice(const ui32 offset, const ui32 count) const {
}
records.DeleteFieldsByIndex(indexesToRemove);
return TColumnsData(builder.Finish(), std::make_shared<TGeneralContainer>(std::move(records)));

} else {
return TColumnsData(TDictStats::BuildEmpty(), std::make_shared<TGeneralContainer>(0));
}
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/formats/arrow/accessor/sub_columns/columns_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_binary.h>
#include <ydb/core/formats/arrow/accessor/common/binary_json_value_view.h>
#include <ydb/core/formats/arrow/accessor/sparsed/accessor.h>
#include <ydb/core/formats/arrow/accessor/sub_columns/json_value_path.h>

namespace NKikimr::NArrow::NAccessor::NSubColumns {

Expand All @@ -18,13 +19,13 @@ class TColumnsData {
YDB_READONLY_DEF(std::shared_ptr<TGeneralContainer>, Records);

public:
std::shared_ptr<IChunkedArray> GetPathAccessor(const std::string_view path) const {
auto idx = Stats.GetKeyIndexOptional(path);
if (!idx) {
return nullptr;
} else {
return Records->GetColumnVerified(*idx);
TConclusion<std::shared_ptr<TJsonPathAccessor>> GetPathAccessor(const std::string_view path) const {
auto jsonPathAccessorTrie = std::make_shared<NKikimr::NArrow::NAccessor::NSubColumns::TJsonPathAccessorTrie>();
for (ui32 i = 0; i < Stats.GetColumnsCount(); ++i) {
auto insertResult = jsonPathAccessorTrie->Insert(ToSubcolumnName(Stats.GetColumnName(i)), Records->GetColumnVerified(i));
AFL_VERIFY(insertResult.IsSuccess())("error", insertResult.GetErrorMessage());
}
return jsonPathAccessorTrie->GetAccessor(path);
Comment on lines +22 to +28
Copy link

Copilot AI Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar performance issue: A new TJsonPathAccessorTrie is created and populated on every call to GetPathAccessor. This is inefficient for repeated lookups. Consider caching the trie as a member variable or using a more efficient lookup structure.

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dorooleg как думаешь что тут лучше? Мне кажется тут сокрее будут одноразовые акции по вызову GetPathAccessor, поэтому, чтобы лишний раз не строить на каждое создание объекта это дерево, сделал это здесь.

}

NJson::TJsonValue DebugJson() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ TConclusionStatus TJsonScanExtractor::DoAddDataToBuilders(const std::shared_ptr<

if (cursor.GetType() == NBinaryJson::EContainerType::Object) {
iterators.push_back(std::make_unique<TKVExtractor>(cursor.GetObjectIterator(), TStringBuf(), FirstLevelOnly));
} else if (cursor.GetType() == NBinaryJson::EContainerType::Array) {
iterators.push_back(std::make_unique<TArrayExtractor>(cursor.GetArrayIterator(), TStringBuf(), FirstLevelOnly));
} else {
return TConclusionStatus::Fail("Only top-level objects are supported in JSON subcolumns");
}

while (iterators.size()) {
Expand Down
18 changes: 6 additions & 12 deletions ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "columns_storage.h"
#include "direct_builder.h"

#include <util/string/escape.h>
#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
#include <ydb/core/formats/arrow/accessor/sparsed/accessor.h>

Expand Down Expand Up @@ -121,19 +122,12 @@ TOthersData TDataBuilder::MergeOthers(const std::vector<TColumnElements*>& other
}

std::string BuildString(const TStringBuf currentPrefix, const TStringBuf key) {
if (key.find(".") != std::string::npos) {
if (currentPrefix.size()) {
return Sprintf("%.*s.\"%.*s\"", currentPrefix.size(), currentPrefix.data(), key.size(), key.data());
} else {
return Sprintf("\"%.*s\"", key.size(), key.data());
}
} else {
if (currentPrefix.size()) {
return Sprintf("%.*s.%.*s", currentPrefix.size(), currentPrefix.data(), key.size(), key.data());
} else {
return std::string(key.data(), key.size());
}
const auto escapedKey = QuoteJsonItem(key);
if (currentPrefix.size()) {
return Sprintf("%.*s.%.*s", currentPrefix.size(), currentPrefix.data(), escapedKey.size(), escapedKey.data());
}

return Sprintf("%.*s", escapedKey.size(), escapedKey.data());
}

TStringBuf TDataBuilder::AddKeyOwn(const TStringBuf currentPrefix, std::string&& key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ TConclusionStatus IJsonObjectExtractor::AddDataToBuilder(TDataBuilder& dataBuild
auto container = value.GetContainer();
if (FirstLevelOnly || container.GetType() == NBinaryJson::EContainerType::Array) {
res = NBinaryJson::SerializeToBinaryJson(value);
// TODO: add support for arrays if needed
// } else if (container.GetType() == NBinaryJson::EContainerType::Array) {
// iterators.emplace_back(std::make_unique<TArrayExtractor>(container.GetArrayIterator(), key));
// addRes = false;
} else if (container.GetType() == NBinaryJson::EContainerType::Object) {
iterators.emplace_back(std::make_unique<TKVExtractor>(container.GetObjectIterator(), key));
addRes = false;
auto containerIt = container.GetObjectIterator();
if (!containerIt.HasNext()) {
res = NBinaryJson::SerializeToBinaryJson("{}");
} else {
iterators.emplace_back(std::make_unique<TKVExtractor>(containerIt, key));
addRes = false;
}
} else {
return TConclusionStatus::Fail("unexpected top value scalar in container iterator");
}
Expand Down
Loading
Loading