From c3afb09f942e0465b05155fda18fb24eabc212f8 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Sun, 13 Sep 2020 14:13:35 -0700 Subject: [PATCH 1/6] [SPARK-32872][CORE] Prevent BytesToBytesMap from exceeding growth threshold near MAX_CAPACITY --- .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 8eea9db393aff..d1343a2652da4 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -816,6 +816,10 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff } catch (SparkOutOfMemoryError oom) { canGrowArray = false; } + } else if (numKeys >= growthThreshold && longArray.size() / 2 >= MAX_CAPACITY) { + // The map needs to grow, but this would cause it to exceed MAX_CAPACITY. Prevent the map + // from accepting any more new elements. + canGrowArray = false; } } return true; From de0f9fe895bc9a6c14e69a8d4a4717a6d8faaa71 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Sun, 13 Sep 2020 14:39:26 -0700 Subject: [PATCH 2/6] Reword comment based on review feedback --- .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index d1343a2652da4..8d570b6b26230 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -817,8 +817,8 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff canGrowArray = false; } } else if (numKeys >= growthThreshold && longArray.size() / 2 >= MAX_CAPACITY) { - // The map needs to grow, but this would cause it to exceed MAX_CAPACITY. Prevent the map - // from accepting any more new elements. + // The map cannot grow because doing so would cause it to exceed MAX_CAPACITY. Instead, we + // prevent the map from accepting any more new elements. canGrowArray = false; } } From 7d76463e94b62c31b563c00e68752139e34183b7 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Sun, 13 Sep 2020 16:04:58 -0700 Subject: [PATCH 3/6] Better comment wording --- .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 8d570b6b26230..ace91eccc0e85 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -817,8 +817,8 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff canGrowArray = false; } } else if (numKeys >= growthThreshold && longArray.size() / 2 >= MAX_CAPACITY) { - // The map cannot grow because doing so would cause it to exceed MAX_CAPACITY. Instead, we - // prevent the map from accepting any more new elements. + // The map has reached its growth threshold, but is already at MAX_CAPACITY and cannot + // grow. Instead, we prevent the map from accepting any more new elements. canGrowArray = false; } } From ec51e65585e3fb5fa982ea51b7d371672808f583 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Sun, 13 Sep 2020 16:34:57 -0700 Subject: [PATCH 4/6] Adopt comment wording suggested by @viirya --- .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index ace91eccc0e85..0cb17c14230e8 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -818,7 +818,9 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff } } else if (numKeys >= growthThreshold && longArray.size() / 2 >= MAX_CAPACITY) { // The map has reached its growth threshold, but is already at MAX_CAPACITY and cannot - // grow. Instead, we prevent the map from accepting any more new elements. + // grow. Instead, we prevent the map from accepting any more new elements to make sure we + // don't exceed the load factor. If we need to spill later, this allows + // UnsafeKVExternalSorter to reuse the array for sorting. canGrowArray = false; } } From f610fbee9398b59e8bad7b8b2ec72fb72603352d Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Mon, 14 Sep 2020 00:17:09 -0700 Subject: [PATCH 5/6] Address review comment --- .../spark/unsafe/map/BytesToBytesMap.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 0cb17c14230e8..5f37305d95196 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -808,20 +808,22 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff longArray.set(pos * 2 + 1, keyHashcode); isDefined = true; - // We use two array entries per key, so the array size is twice the capacity. - // We should compare the current capacity of the array, instead of its size. - if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) { - try { - growAndRehash(); - } catch (SparkOutOfMemoryError oom) { + // If the map has reached its growth threshold, try to grow it. + if (numKeys >= growthThreshold) { + // We use two array entries per key, so the array size is twice the capacity. + // We should compare the current capacity of the array, instead of its size. + if (longArray.size() / 2 < MAX_CAPACITY) { + try { + growAndRehash(); + } catch (SparkOutOfMemoryError oom) { + canGrowArray = false; + } + } else { + // The map is already at MAX_CAPACITY and cannot grow. Instead, we prevent it from + // accepting any more new elements to make sure we don't exceed the load factor. If we + // need to spill later, this allows UnsafeKVExternalSorter to reuse the array for sorting. canGrowArray = false; } - } else if (numKeys >= growthThreshold && longArray.size() / 2 >= MAX_CAPACITY) { - // The map has reached its growth threshold, but is already at MAX_CAPACITY and cannot - // grow. Instead, we prevent the map from accepting any more new elements to make sure we - // don't exceed the load factor. If we need to spill later, this allows - // UnsafeKVExternalSorter to reuse the array for sorting. - canGrowArray = false; } } return true; From 680c4cf9437f8d8878de46446d4d4c18f31b88e9 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Mon, 14 Sep 2020 09:03:42 -0700 Subject: [PATCH 6/6] Line length --- .../main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 5f37305d95196..d7940fc08e1a5 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -821,7 +821,8 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff } else { // The map is already at MAX_CAPACITY and cannot grow. Instead, we prevent it from // accepting any more new elements to make sure we don't exceed the load factor. If we - // need to spill later, this allows UnsafeKVExternalSorter to reuse the array for sorting. + // need to spill later, this allows UnsafeKVExternalSorter to reuse the array for + // sorting. canGrowArray = false; } }