Skip to content

Commit 5889876

Browse files
committed
DRILL-6071: Limit batch size for flatten operator
1 parent a9ea4ec commit 5889876

File tree

12 files changed: +1,095 −157 lines

exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ private ExecConstants() {
7676
public static final String SPILL_FILESYSTEM = "drill.exec.spill.fs";
7777
public static final String SPILL_DIRS = "drill.exec.spill.directories";
7878

79+
public static final String OUTPUT_BATCH_SIZE = "drill.exec.memory.operator.output_batch_size";
80+
public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new RangeLongValidator(OUTPUT_BATCH_SIZE, 1024, 512 * 1024 * 1024);
81+
7982
// External Sort Boot configuration
8083

8184
public static final String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";

exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.drill.common.expression.SchemaPath;
2929
import org.apache.drill.common.logical.data.NamedExpression;
3030
import org.apache.drill.common.types.Types;
31+
import org.apache.drill.exec.ExecConstants;
3132
import org.apache.drill.exec.exception.ClassTransformationException;
3233
import org.apache.drill.exec.exception.OutOfMemoryException;
3334
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -40,6 +41,7 @@
4041
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
4142
import org.apache.drill.exec.ops.FragmentContext;
4243
import org.apache.drill.exec.physical.config.FlattenPOP;
44+
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
4345
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
4446
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
4547
import org.apache.drill.exec.record.MaterializedField;
@@ -68,6 +70,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
6870
private boolean hasRemainder = false;
6971
private int remainderIndex = 0;
7072
private int recordCount;
73+
private long outputBatchSize;
7174

7275
private final Flattener.Monitor monitor = new Flattener.Monitor() {
7376
@Override
@@ -94,8 +97,57 @@ private void clear() {
9497
}
9598
}
9699

100+
private class FlattenMemoryManager {
101+
private final int outputRowCount;
102+
private static final int OFFSET_VECTOR_WIDTH = 4;
103+
private static final int WORST_CASE_FRAGMENTATION_FACTOR = 2;
104+
private static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
105+
private static final int MIN_NUM_ROWS = 1;
106+
107+
private FlattenMemoryManager(RecordBatch incoming, long outputBatchSize, SchemaPath flattenColumn) {
108+
// Get sizing information for the batch.
109+
RecordBatchSizer sizer = new RecordBatchSizer(incoming);
110+
111+
final TypedFieldId typedFieldId = incoming.getValueVectorId(flattenColumn);
112+
final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
113+
114+
// Get column size of flatten column.
115+
RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer.getColumn(incoming.getValueAccessorById(field.getValueClass(),
116+
typedFieldId.getFieldIds()).getValueVector(), field.getName());
117+
118+
// Average rowWidth of flatten column
119+
final int avgRowWidthFlattenColumn = RecordBatchSizer.safeDivide(columnSize.netSize, incoming.getRecordCount());
120+
121+
// Average rowWidth excluding the flatten column.
122+
final int avgRowWidthWithOutFlattenColumn = sizer.netRowWidth() - avgRowWidthFlattenColumn;
123+
124+
// Average rowWidth of single element in the flatten list.
125+
// subtract the offset vector size from column data size.
126+
final int avgRowWidthSingleFlattenEntry =
127+
RecordBatchSizer.safeDivide(columnSize.netSize - (OFFSET_VECTOR_WIDTH * columnSize.valueCount), columnSize.elementCount);
128+
129+
// Average rowWidth of outgoing batch.
130+
final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry;
131+
132+
// Number of rows in outgoing batch
133+
outputRowCount = Math.max(MIN_NUM_ROWS, Math.min(MAX_NUM_ROWS,
134+
RecordBatchSizer.safeDivide((outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR), avgOutgoingRowWidth)));
135+
136+
logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," +
137+
"avgOutgoingRowWidth : {}, outputRowCount : {}", sizer, outputBatchSize, avgOutgoingRowWidth, outputRowCount);
138+
}
139+
140+
public int getOutputRowCount() {
141+
return outputRowCount;
142+
}
143+
}
144+
145+
97146
public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
  super(pop, context, incoming);

  // Read the configured output-batch byte budget; the flatten memory
  // manager uses it to cap the row count of each outgoing batch.
  outputBatchSize = context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
}
100152

101153
@Override
@@ -148,6 +200,9 @@ private void setFlattenVector() {
148200

149201
@Override
150202
protected IterOutcome doWork() {
203+
FlattenMemoryManager flattenMemoryManager = new FlattenMemoryManager(incoming, outputBatchSize, popConfig.getColumn());
204+
flattener.setOutputCount(flattenMemoryManager.getOutputRowCount());
205+
151206
int incomingRecordCount = incoming.getRecordCount();
152207

153208
if (!doAlloc()) {

exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java

Lines changed: 9 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -31,30 +31,28 @@
3131

3232
import com.google.common.collect.ImmutableList;
3333

34+
import org.apache.drill.exec.vector.ValueVector;
3435
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
3738

3839
public abstract class FlattenTemplate implements Flattener {
3940
private static final Logger logger = LoggerFactory.getLogger(FlattenTemplate.class);
4041

41-
private static final int OUTPUT_BATCH_SIZE = 4*1024;
42-
private static final int OUTPUT_MEMORY_LIMIT = 512 * 1024 * 1024;
42+
private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
4343

4444
private ImmutableList<TransferPair> transfers;
4545
private BufferAllocator outputAllocator;
4646
private SelectionVectorMode svMode;
4747
private RepeatedValueVector fieldToFlatten;
4848
private RepeatedValueVector.RepeatedAccessor accessor;
4949
private int valueIndex;
50-
private boolean bigRecords = false;
51-
private int bigRecordsBufferSize;
5250

5351
/**
54-
* The output batch limit starts at OUTPUT_BATCH_SIZE, but may be decreased
52+
* The output batch limit starts at OUTPUT_ROW_COUNT, but may be decreased
5553
* if records are found to be large.
5654
*/
57-
private int outputLimit = OUTPUT_BATCH_SIZE;
55+
private int outputLimit = OUTPUT_ROW_COUNT;
5856

5957
// this allows for groups to be written between batches if we run out of space, for cases where we have finished
6058
// a batch on the boundary it will be set to 0
@@ -72,6 +70,11 @@ public RepeatedValueVector getFlattenField() {
7270
return fieldToFlatten;
7371
}
7472

73+
@Override
74+
public void setOutputCount(int outputCount) {
75+
outputLimit = outputCount;
76+
}
77+
7578
@Override
7679
public final int flattenRecords(final int recordCount, final int firstOutputIndex,
7780
final Flattener.Monitor monitor) {
@@ -101,75 +104,10 @@ public final int flattenRecords(final int recordCount, final int firstOutputInde
101104
for ( ; innerValueIndexLocal < innerValueCount; innerValueIndexLocal++) {
102105
// If we've hit the batch size limit, stop and flush what we've got so far.
103106
if (recordsThisCall == outputLimit) {
104-
if (bigRecords) {
105-
/*
106-
* We got to the limit we used before, but did we go over
107-
* the bigRecordsBufferSize in the second half of the batch? If
108-
* so, we'll need to adjust the batch limits.
109-
*/
110-
adjustBatchLimits(1, monitor, recordsThisCall);
111-
}
112-
113107
// Flush this batch.
114108
break outer;
115109
}
116110

117-
/*
118-
* At the moment, the output record includes the input record, so for very
119-
* large records that we're flattening, we're carrying forward the original
120-
* record as well as the flattened element. We've seen a case where flattening a 4MB
121-
* record with a 20,000 element array causing memory usage to explode. To avoid
122-
* that until we can push down the selected fields to operators like this, we
123-
* also limit the amount of memory in use at one time.
124-
*
125-
* We have to have written at least one record to be able to get a buffer that will
126-
* have a real allocator, so we have to do this lazily. We won't check the limit
127-
* for the first two records, but that keeps this simple.
128-
*/
129-
if (bigRecords) {
130-
/*
131-
* If we're halfway through the outputLimit, check on our memory
132-
* usage so far.
133-
*/
134-
if (recordsThisCall == outputLimit / 2) {
135-
/*
136-
* If we've used more than half the space we've used for big records
137-
* in the past, we've seen even bigger records than before, so stop and
138-
* see if we need to flush here before we go over bigRecordsBufferSize
139-
* memory usage, and reduce the outputLimit further before we continue
140-
* with the next batch.
141-
*/
142-
if (adjustBatchLimits(2, monitor, recordsThisCall)) {
143-
break outer;
144-
}
145-
}
146-
} else {
147-
if (outputAllocator.getAllocatedMemory() > OUTPUT_MEMORY_LIMIT) {
148-
/*
149-
* We're dealing with big records. Reduce the outputLimit to
150-
* the current record count, and take note of how much space the
151-
* vectors report using for that. We'll use those numbers as limits
152-
* going forward in order to avoid allocating more memory.
153-
*/
154-
bigRecords = true;
155-
outputLimit = Math.min(recordsThisCall, outputLimit);
156-
if (outputLimit < 1) {
157-
throw new IllegalStateException("flatten outputLimit (" + outputLimit
158-
+ ") won't make progress");
159-
}
160-
161-
/*
162-
* This will differ from what the allocator reports because of
163-
* overhead. But the allocator check is much cheaper to do, so we
164-
* only compute this at selected times.
165-
*/
166-
bigRecordsBufferSize = monitor.getBufferSizeFor(recordsThisCall);
167-
168-
// Stop and flush.
169-
break outer;
170-
}
171-
}
172-
173111
try {
174112
doEval(valueIndexLocal, outputIndex);
175113
} catch (OversizedAllocationException ex) {
@@ -211,68 +149,6 @@ public final int flattenRecords(final int recordCount, final int firstOutputInde
211149
}
212150
}
213151

214-
/**
215-
* Determine if the current batch record limit needs to be adjusted (when handling
216-
* bigRecord mode). If so, adjust the limit, and return true, otherwise return false.
217-
*
218-
* <p>If the limit is adjusted, it will always be adjusted down, because we need to operate
219-
* based on the largest sized record we've ever seen.</p>
220-
*
221-
* <p>If the limit is adjusted, then the current batch should be flushed, because
222-
* continuing would lead to going over the large memory limit that has already been
223-
* established.</p>
224-
*
225-
* @param multiplier Multiply currently used memory (according to the monitor) before
226-
* checking against past memory limits. This allows for checking the currently used
227-
* memory after processing a fraction of the expected batch limit, but using that as
228-
* a predictor of the full batch's size. For example, if this is checked after half
229-
* the batch size limit's records are processed, then using a multiplier of two will
230-
* do the check under the assumption that processing the full batch limit will use
231-
* twice as much memory.
232-
* @param monitor the Flattener.Monitor instance to use for the current memory usage check
233-
* @param recordsThisCall the number of records processed so far during this call to
234-
* flattenRecords().
235-
* @return true if the batch size limit was adjusted, false otherwise
236-
*/
237-
private boolean adjustBatchLimits(final int multiplier, final Flattener.Monitor monitor,
238-
final int recordsThisCall) {
239-
assert bigRecords : "adjusting batch limits when no big records";
240-
final int bufferSize = multiplier * monitor.getBufferSizeFor(recordsThisCall);
241-
242-
/*
243-
* If the amount of space we've used so far is below the amount that triggered
244-
* the bigRecords mode, then no adjustment is needed.
245-
*/
246-
if (bufferSize <= bigRecordsBufferSize) {
247-
return false;
248-
}
249-
250-
/*
251-
* We've used more space than we've used for big records in the past, we've seen
252-
* even bigger records, so we need to adjust our limits, and flush what we've got so far.
253-
*
254-
* We should reduce the outputLimit proportionately to get the predicted
255-
* amount of memory used back down to bigRecordsBufferSize.
256-
*
257-
* The number of records to limit is therefore
258-
* outputLimit *
259-
* (1 - (bufferSize - bigRecordsBufferSize) / bigRecordsBufferSize)
260-
*
261-
* Doing some algebra on the multiplier:
262-
* (bigRecordsBufferSize - (bufferSize - bigRecordsBufferSize)) / bigRecordsBufferSize
263-
* (bigRecordsBufferSize - bufferSize + bigRecordsBufferSize) / bigRecordsBufferSize
264-
* (2 * bigRecordsBufferSize - bufferSize) / bigRecordsBufferSize
265-
*
266-
* If bufferSize has gotten so big that this would be negative, we'll
267-
* just go down to one record per batch. We need to check for that on
268-
* outputLimit anyway, in order to make sure that we make progress.
269-
*/
270-
final int newLimit = (int)
271-
(outputLimit * (2.0 * ((double) bigRecordsBufferSize) - bufferSize) / bigRecordsBufferSize);
272-
outputLimit = Math.max(1, newLimit);
273-
return true;
274-
}
275-
276152
@Override
277153
public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{
278154

exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ interface Monitor {
4545
int flattenRecords(int recordCount, int firstOutputIndex, Monitor monitor);
4646

4747
void setFlattenField(RepeatedValueVector repeatedColumn);
48+
void setOutputCount(int outputCount);
4849

4950
RepeatedValueVector getFlattenField();
5051

exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,18 @@ public static class ColumnSize {
9393

9494
public final int elementCount;
9595

96+
/**
97+
* Size of the top level value vector. For map and repeated list,
98+
* this is just size of offset vector.
99+
*/
96100
public int dataSize;
97101

102+
/**
103+
* Total size of the column includes the sum total of memory for all
104+
* value vectors representing the column.
105+
*/
106+
public int netSize;
107+
98108
/**
99109
* The estimated, average number of elements per parent value.
100110
* Always 1 for a non-repeated type. For a repeated type,
@@ -131,9 +141,15 @@ public ColumnSize(ValueVector v, String prefix) {
131141
break;
132142
default:
133143
dataSize = v.getPayloadByteCount(valueCount);
134-
stdSize = TypeHelper.getSize(metadata.getType()) * elementCount;
144+
try {
145+
stdSize = TypeHelper.getSize(metadata.getType()) * elementCount;
146+
} catch (Exception e) {
147+
// For unsupported types, just set stdSize to 0.
148+
stdSize = 0;
149+
}
135150
}
136151
estSize = safeDivide(dataSize, valueCount);
152+
netSize = v.getPayloadByteCount(valueCount);
137153
}
138154

139155
@SuppressWarnings("resource")
@@ -154,8 +170,14 @@ private int buildRepeated(ValueVector v) {
154170
return childCount;
155171
}
156172

173+
@SuppressWarnings("resource")
157174
private void buildList(ValueVector v) {
158-
@SuppressWarnings("resource")
175+
// complex ListVector cannot be casted to RepeatedListVector.
176+
// check the mode.
177+
if (v.getField().getDataMode() != DataMode.REPEATED) {
178+
dataSize = v.getPayloadByteCount(valueCount);
179+
return;
180+
}
159181
UInt4Vector offsetVector = ((RepeatedListVector) v).getOffsetVector();
160182
dataSize = offsetVector.getPayloadByteCount(valueCount);
161183
}
@@ -232,6 +254,10 @@ else if (width > 0) {
232254
}
233255
}
234256

257+
public static ColumnSize getColumn(ValueVector v, String prefix) {
258+
return new ColumnSize(v, prefix);
259+
}
260+
235261
public static final int MAX_VECTOR_SIZE = ValueVector.MAX_BUFFER_SIZE; // 16 MiB
236262

237263
private List<ColumnSize> columnSizes = new ArrayList<>();
@@ -380,14 +406,18 @@ private void measureColumn(ValueVector v, String prefix) {
380406
// vectors do consume space, so visit columns recursively.
381407

382408
switch (v.getField().getType().getMinorType()) {
383-
case MAP:
384-
expandMap((AbstractMapVector) v, prefix + v.getField().getName() + ".");
385-
break;
386-
case LIST:
387-
expandList((RepeatedListVector) v, prefix + v.getField().getName() + ".");
388-
break;
389-
default:
390-
v.collectLedgers(ledgers);
409+
case MAP:
410+
expandMap((AbstractMapVector) v, prefix + v.getField().getName() + ".");
411+
break;
412+
case LIST:
413+
// complex ListVector cannot be casted to RepeatedListVector.
414+
// do not expand the list if it is not repeated mode.
415+
if (v.getField().getDataMode() == DataMode.REPEATED) {
416+
expandList((RepeatedListVector) v, prefix + v.getField().getName() + ".");
417+
}
418+
break;
419+
default:
420+
v.collectLedgers(ledgers);
391421
}
392422

393423
netRowWidth += colSize.estSize;

0 commit comments

Comments (0)