Skip to content

Commit ddb9b59

Browse files
authored
Merge 79a09f7 into cd0aab0
2 parents cd0aab0 + 79a09f7 commit ddb9b59

File tree

5 files changed

+17
-8
lines changed

5 files changed

+17
-8
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2834,7 +2834,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28342834
<< ", execution id " << context->CurrentExecutionId
28352835
<< ", checkpoint id " << checkpointId << ", generation " << Generation
28362836
<< ", state load mode " << FederatedQuery::StateLoadMode_Name(stateLoadMode)
2837-
<< ", streaming disposition " << streamingDisposition.ShortDebugString());
2837+
<< ", streaming disposition " << streamingDisposition.ShortDebugString()
2838+
<< ", has QueryPhysicalGraph " << (Request.QueryPhysicalGraph != nullptr)
2839+
<< ", enabled watermarks " << Request.QueryPhysicalGraph->GetPreparedQuery().GetPhysicalQuery().GetEnableWatermarks());
28382840
}
28392841

28402842
private:

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,7 @@ void TKqpPlanner::PrepareCheckpoints() {
639639
}
640640

641641
const auto enableCheckpoints = static_cast<bool>(CheckpointCoordinatorId);
642+
LOG_D("BuildCheckpointingAndWatermarksMode, enableCheckpoints: " << enableCheckpoints << ", EnableWatermarks: " << EnableWatermarks);
642643
TasksGraph.BuildCheckpointingAndWatermarksMode(enableCheckpoints, EnableWatermarks);
643644

644645
if (!enableCheckpoints) {

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1555,9 +1555,11 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
15551555
auto sink = outputInfo.Buffer;
15561556

15571557
NKikimr::NMiniKQL::TUnboxedValueBatch dataBatch(sink->GetOutputType());
1558+
NDqProto::TWatermark watermark;
15581559
NDqProto::TCheckpoint checkpoint;
15591560

15601561
const ui64 dataSize = !outputInfo.Finished ? sink->Pop(dataBatch, bytes) : 0;
1562+
std::ignore = sink->Pop(watermark);
15611563
const bool hasCheckpoint = sink->Pop(checkpoint);
15621564
if (!dataSize && !hasCheckpoint) {
15631565
if (!sink->IsFinished()) {

ydb/library/yql/dq/runtime/dq_input_producer.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamVal
7676

7777
if (Batch.empty()) {
7878
// pass watermark and wait for drain only if watermarks enabled
79-
if (WatermarksEnabled() && TrySendWatermark()) {
79+
if (TrySendWatermark()) {
8080
return NUdf::EFetchStatus::Yield;
8181
}
8282

@@ -94,7 +94,8 @@ class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamVal
9494
}
9595

9696
// pass watermark and wait for drain only if watermarks enabled and batch is still empty
97-
if (Batch.empty() && WatermarksEnabled() && TrySendWatermark()) {
97+
if (Batch.empty()) {
98+
std::ignore = TrySendWatermark();
9899
return NUdf::EFetchStatus::Yield;
99100
}
100101
}
@@ -122,7 +123,7 @@ class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamVal
122123

123124
if (Batch.empty()) {
124125
// pass watermark and wait for drain only if watermarks enabled
125-
if (WatermarksEnabled() && TrySendWatermark()) {
126+
if (TrySendWatermark()) {
126127
return NUdf::EFetchStatus::Yield;
127128
}
128129

@@ -140,7 +141,8 @@ class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamVal
140141
}
141142

142143
// pass watermark and wait for drain only if watermarks enabled and batch is still empty
143-
if (Batch.empty() && WatermarksEnabled() && TrySendWatermark()) {
144+
if (Batch.empty()) {
145+
std::ignore = TrySendWatermark();
144146
return NUdf::EFetchStatus::Yield;
145147
}
146148
}
@@ -200,8 +202,10 @@ class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamVal
200202
}
201203

202204
[[nodiscard]] bool TrySendWatermark() {
203-
Y_DEBUG_ABORT_UNLESS(WatermarksEnabled());
204-
if (!Watermark || !NotifyWatermarkTracker(InputKey, *Watermark)) {
205+
if (!WatermarksEnabled()) {
206+
return false;
207+
}
208+
if (!Watermark || !NotifyWatermarkTracker(InputKey, *Watermark) || !WatermarksTracker->HasPendingWatermark()) {
205209
return false;
206210
}
207211
Y_DEBUG_ABORT_UNLESS(WatermarksTracker->HasPendingWatermark());

yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ class TMultiHoppingCoreWrapper: public TStatefulSourceComputationNode<TMultiHopp
408408
auto curHopIndex = keyState.HopIndex;
409409
auto curHopIndexModBuckets = curHopIndex % bucketsForKey.size();
410410

411-
if (curHopIndex > closeBeforeIndex) {
411+
if (curHopIndex >= closeBeforeIndex) {
412412
return keyState.NextHopIndex <= keyState.HopIndex && keyState.FutureEvents.empty();
413413
}
414414

0 commit comments

Comments
 (0)