[HADOOP-19341] HDFS Client's direct memory leaks with erasure coding enabled #7168
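In brief, per the diff: the erasure coding read, write, and reconstruction paths cache direct ByteBuffers in static ElasticByteBufferPool instances, and that pool keeps a strong reference to every buffer handed back to it, so the buffers' native memory is never released. This patch makes ElasticByteBufferPool an abstract base class, replacing its TreeMap-backed storage and size(boolean) accessor with an abstract getCurrentBuffersCount(boolean), and switches DFSStripedInputStream, DFSStripedOutputStream, StripedBlockWriter, StripedReconstructor, and the Azure blob append stream to WeakReferencedElasticByteBufferPool, which holds cached buffers behind weak references so idle buffers stay eligible for garbage collection.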

Open: wants to merge 6 commits into trunk
=== ElasticByteBufferPool.java ===
@@ -20,10 +20,6 @@
import org.apache.hadoop.thirdparty.com.google.common.collect.ComparisonChain;
import org.apache.commons.lang3.builder.HashCodeBuilder;

-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.TreeMap;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

@@ -36,7 +32,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public class ElasticByteBufferPool implements ByteBufferPool {
+public abstract class ElasticByteBufferPool implements ByteBufferPool {
protected static final class Key implements Comparable<Key> {
private final int capacity;
private final long insertionTime;
@@ -76,48 +72,6 @@ public int hashCode() {
}
}

-private final TreeMap<Key, ByteBuffer> buffers =
-new TreeMap<Key, ByteBuffer>();
-
-private final TreeMap<Key, ByteBuffer> directBuffers =
-new TreeMap<Key, ByteBuffer>();
-
-private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
-return direct ? directBuffers : buffers;
-}
-
-@Override
-public synchronized ByteBuffer getBuffer(boolean direct, int length) {
-TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
-Map.Entry<Key, ByteBuffer> entry =
-tree.ceilingEntry(new Key(length, 0));
-if (entry == null) {
-return direct ? ByteBuffer.allocateDirect(length) :
-ByteBuffer.allocate(length);
-}
-tree.remove(entry.getKey());
-entry.getValue().clear();
-return entry.getValue();
-}
-
-@Override
-public synchronized void putBuffer(ByteBuffer buffer) {
-buffer.clear();
-TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
-while (true) {
-Key key = new Key(buffer.capacity(), System.nanoTime());
-if (!tree.containsKey(key)) {
-tree.put(key, buffer);
-return;
-}
-// Buffers are indexed by (capacity, time).
-// If our key is not unique on the first try, we try again, since the
-// time will be different. Since we use nanoseconds, it's pretty
-// unlikely that we'll loop even once, unless the system clock has a
-// poor granularity.
-}
-}
-
/**
* Get the size of the buffer pool, for the specified buffer type.
*
@@ -126,7 +80,5 @@ public synchronized void putBuffer(ByteBuffer buffer) {
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public int size(boolean direct) {
-return getBufferTree(direct).size();
-}
+public abstract int getCurrentBuffersCount(boolean direct);
}
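For context, here is a minimal sketch of the weak-reference caching idea that the replacement pool relies on. This is an illustration only, not the Hadoop implementation: the WeakPool class, its single-capacity key, and the direct-only allocation are simplifications. The real WeakReferencedElasticByteBufferPool (as the tests below exercise) keys entries by capacity and insertion time, keeps separate trees for direct and heap buffers, and prunes cleared references.

```java
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: a pool that caches direct buffers behind
// WeakReferences. Unlike the removed TreeMap<Key, ByteBuffer> fields
// above, the pool itself never strongly pins a cached buffer, so the
// GC can reclaim idle buffers and free their native memory.
public final class WeakPool {
  private final TreeMap<Integer, WeakReference<ByteBuffer>> cache = new TreeMap<>();

  public synchronized ByteBuffer getBuffer(int length) {
    Map.Entry<Integer, WeakReference<ByteBuffer>> e = cache.ceilingEntry(length);
    if (e != null) {
      cache.remove(e.getKey());
      ByteBuffer cached = e.getValue().get(); // null if already collected
      if (cached != null) {
        cached.clear();
        return cached;
      }
    }
    return ByteBuffer.allocateDirect(length);
  }

  public synchronized void putBuffer(ByteBuffer buffer) {
    buffer.clear();
    // Only a weak reference is kept. Overwriting on equal capacity is a
    // simplification the real pool avoids by keying on insertion time too.
    cache.put(buffer.capacity(), new WeakReference<>(buffer));
  }
}
```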
=== TestMoreWeakReferencedElasticByteBufferPool.java ===
@@ -36,7 +36,7 @@ public class TestMoreWeakReferencedElasticByteBufferPool

@Test
public void testMixedBuffersInPool() {
-WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
ByteBuffer buffer1 = pool.getBuffer(true, 5);
ByteBuffer buffer2 = pool.getBuffer(true, 10);
ByteBuffer buffer3 = pool.getBuffer(false, 5);
@@ -60,7 +60,7 @@ public void testMixedBuffersInPool() {

@Test
public void testUnexpectedBufferSizes() throws Exception {
-WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
ByteBuffer buffer1 = pool.getBuffer(true, 0);

// try writing a random byte in a 0 length buffer.
@@ -84,7 +84,7 @@
* @param numDirectBuffersExpected expected number of direct buffers.
* @param numHeapBuffersExpected expected number of heap buffers.
*/
-private void assertBufferCounts(WeakReferencedElasticByteBufferPool pool,
+private void assertBufferCounts(ElasticByteBufferPool pool,
int numDirectBuffersExpected,
int numHeapBuffersExpected) {
Assertions.assertThat(pool.getCurrentBuffersCount(true))
=== TestWeakReferencedElasticByteBufferPool.java ===
@@ -53,7 +53,7 @@ public TestWeakReferencedElasticByteBufferPool(String type) {

@Test
public void testGetAndPutBasic() {
-WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
int bufferSize = 5;
ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
Assertions.assertThat(buffer.isDirect())
@@ -83,7 +83,7 @@ public void testGetAndPutBasic() {

@Test
public void testPoolingWithDifferentSizes() {
-WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
ByteBuffer buffer = pool.getBuffer(isDirect, 5);
ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
@@ -121,7 +121,7 @@ public void testPoolingWithDifferentSizes() {

@Test
public void testPoolingWithDifferentInsertionTime() {
-WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
ByteBuffer buffer = pool.getBuffer(isDirect, 10);
ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
@@ -155,7 +155,7 @@ public void testPoolingWithDifferentInsertionTime() {

@Test
public void testGarbageCollection() {
-WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
ByteBuffer buffer = pool.getBuffer(isDirect, 5);
ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
@@ -187,7 +187,7 @@ public void testGarbageCollection() {

@Test
public void testWeakReferencesPruning() {
-WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
ByteBuffer buffer1 = pool.getBuffer(isDirect, 5);
ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
ByteBuffer buffer3 = pool.getBuffer(isDirect, 15);
=== DFSStripedInputStream.java ===
@@ -36,7 +36,7 @@
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripeRange;
import org.apache.hadoop.io.ByteBufferPool;

-import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;

@@ -64,7 +64,7 @@
@InterfaceAudience.Private
public class DFSStripedInputStream extends DFSInputStream {

-private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+private static final ByteBufferPool BUFFER_POOL = new WeakReferencedElasticByteBufferPool();
private final BlockReaderInfo[] blockReaders;
private final int cellSize;
private final short dataBlkNum;
=== DFSStripedOutputStream.java ===
@@ -37,8 +37,8 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
@@ -83,7 +83,7 @@
@InterfaceAudience.Private
public class DFSStripedOutputStream extends DFSOutputStream
implements StreamCapabilities {
-private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+private static final ByteBufferPool BUFFER_POOL = new WeakReferencedElasticByteBufferPool();

/**
* OutputStream level last exception, will be used to indicate the fatal
=== StripedBlockWriter.java ===
@@ -32,8 +32,8 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;

@@ -69,7 +69,7 @@ class StripedBlockWriter {
private ByteBuffer targetBuffer;
private long blockOffset4Target = 0;
private long seqNo4Target = 0;
-private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+private static final ByteBufferPool BUFFER_POOL = new WeakReferencedElasticByteBufferPool();

StripedBlockWriter(StripedWriter stripedWriter, DataNode datanode,
Configuration conf, ExtendedBlock block,
=== StripedReconstructor.java ===
@@ -30,7 +30,7 @@
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
@@ -108,7 +108,7 @@ abstract class StripedReconstructor {
private final ErasureCoderOptions coderOptions;
private RawErasureDecoder decoder;
private final ExtendedBlock blockGroup;
-private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+private static final ByteBufferPool BUFFER_POOL = new WeakReferencedElasticByteBufferPool();

private final boolean isValidationEnabled;
private DecodingValidator validator;
=== (HDFS striped read test) ===
@@ -555,16 +555,16 @@ public void testCloseDoesNotAllocateNewBuffer() throws Exception {
final ElasticByteBufferPool ebbp =
(ElasticByteBufferPool) stream.getBufferPool();
// first clear existing pool
LOG.info("Current pool size: direct: " + ebbp.size(true) + ", indirect: "
+ ebbp.size(false));
LOG.info("Current pool size: direct: " + ebbp.getCurrentBuffersCount(true) + ", indirect: "
+ ebbp.getCurrentBuffersCount(false));
emptyBufferPoolForCurrentPolicy(ebbp, true);
emptyBufferPoolForCurrentPolicy(ebbp, false);
-final int startSizeDirect = ebbp.size(true);
-final int startSizeIndirect = ebbp.size(false);
+final int startSizeDirect = ebbp.getCurrentBuffersCount(true);
+final int startSizeIndirect = ebbp.getCurrentBuffersCount(false);
// close should not allocate new buffers in the pool.
stream.close();
-assertEquals(startSizeDirect, ebbp.size(true));
-assertEquals(startSizeIndirect, ebbp.size(false));
+assertEquals(startSizeDirect, ebbp.getCurrentBuffersCount(true));
+assertEquals(startSizeIndirect, ebbp.getCurrentBuffersCount(false));
}
}

@@ -621,10 +621,10 @@ public void testReadWhenLastIncompleteCellComeInToDecodeAlignedStripe()
private void emptyBufferPoolForCurrentPolicy(ElasticByteBufferPool ebbp,
boolean direct) {
int size;
-while ((size = ebbp.size(direct)) != 0) {
+while ((size = ebbp.getCurrentBuffersCount(direct)) != 0) {
ebbp.getBuffer(direct,
ecPolicy.getCellSize() * ecPolicy.getNumDataUnits());
-if (size == ebbp.size(direct)) {
+if (size == ebbp.getCurrentBuffersCount(direct)) {
// if getBuffer didn't decrease size, it means the pool for the buffer
// corresponding to current ecPolicy is empty
break;
=== (HDFS striped block reader test) ===
@@ -828,7 +828,7 @@ public void interceptFreeBlockReaderBuffer() {

private void assertBufferPoolIsEmpty(ElasticByteBufferPool bufferPool,
boolean direct) {
-while (bufferPool.size(direct) != 0) {
+while (bufferPool.getCurrentBuffersCount(direct) != 0) {
// iterate all ByteBuffers in ElasticByteBufferPool
ByteBuffer byteBuffer = bufferPool.getBuffer(direct, 0);
Assert.assertEquals(0, byteBuffer.position());
@@ -837,7 +837,7 @@ private void assertBufferPoolIsEmpty(ElasticByteBufferPool bufferPool,

private void emptyBufferPool(ElasticByteBufferPool bufferPool,
boolean direct) {
-while (bufferPool.size(direct) != 0) {
+while (bufferPool.getCurrentBuffersCount(direct) != 0) {
bufferPool.getBuffer(direct, 0);
}
}
=== (hadoop-azure blob append stream) ===
@@ -43,6 +43,7 @@

import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
import org.apache.hadoop.util.Preconditions;
import org.apache.commons.lang3.StringUtils;

@@ -239,7 +240,7 @@ void dump() {}
* back to the queue
*/
private final ElasticByteBufferPool poolReadyByteBuffers
-= new ElasticByteBufferPool();
+= new WeakReferencedElasticByteBufferPool();

/**
* The blob's block list.
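Usage stays the same at the call sites; only the pool's reference semantics change. Below is a hedged sketch of the borrow/return pattern the patched classes follow. The class name, the main method, and the 64 KiB length are stand-ins for illustration, not the EC cell-size arithmetic the striped streams actually use.

```java
import java.nio.ByteBuffer;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;

public final class PoolUsageExample {
  public static void main(String[] args) {
    ByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
    // Borrow a direct buffer; the pool allocates a new one when nothing fits.
    ByteBuffer buf = pool.getBuffer(true, 64 * 1024);
    try {
      buf.putLong(42L); // stand-in for real I/O against the buffer
    } finally {
      // Return the buffer for reuse; the pool holds it only weakly,
      // so it can still be garbage collected under memory pressure.
      pool.putBuffer(buf);
    }
  }
}
```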