[SPARK-21517][CORE] Avoid copying memory when transfer chunks remotely#18723
[SPARK-21517][CORE] Avoid copying memory when transfer chunks remotely#18723caneGuy wants to merge 2 commits into
Conversation
|
Can anyone help verify this?Thanks too much. |
| */ | ||
| def toNetty: ByteBuf = { | ||
| Unpooled.wrappedBuffer(getChunks(): _*) | ||
| Unpooled.wrappedBuffer(chunks.length + 1, getChunks(): _*) |
There was a problem hiding this comment.
Why chunks.length+1 instead of chunks.length?
There was a problem hiding this comment.
I have changed the value to chunks.length
|
IIUC, this PR avoid data copy for consolidation in I have two questions to understand this change.
|
|
@kiszk Thanks for your time. |
|
Thank you. 1. makes sense since memory is allocated by 2x at that point. I have another question to understand why it happens. Does this OOM occurs when any OpenBlocks message are received. Or, any specific scenario (e.g. receive a large message, a lot of multiple messages, or so on.) |
|
@kiszk Actually, i am confused with default value 16 too. |
|
Sound good to me |
|
ok to test |
|
LGTM |
|
Test build #79943 has finished for PR 18723 at commit
|
|
Thanks! Merging to master. |
What changes were proposed in this pull request?
In our production cluster,oom happens when NettyBlockRpcServer receive OpenBlocks message.The reason we observed is below:
When BlockManagerManagedBuffer call ChunkedByteBuffer#toNetty, it will use Unpooled.wrappedBuffer(ByteBuffer... buffers) which use default maxNumComponents=16 in low-level CompositeByteBuf.When our component's number is bigger than 16, it will execute consolidateIfNeeded
in CompositeByteBuf which will consume some memory during buffer copy.
We can use another api Unpooled. wrappedBuffer(int maxNumComponents, ByteBuffer... buffers) to avoid this comsuming.
How was this patch tested?
Test in production cluster.