diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java index a71bb93963a..ee3556f2b0e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.container.metadata; import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -30,7 +32,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK; import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST; import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK; @@ -43,6 +48,8 @@ * 3. A Delete Transaction Table. */ public class DatanodeStoreWithIncrementalChunkList extends AbstractDatanodeStore { + private final Cache> cachedBlockData; + private final Cache> cachedLastChunkData; /** * Constructs the metadata store and starts the DB services. * @@ -52,31 +59,42 @@ public class DatanodeStoreWithIncrementalChunkList extends AbstractDatanodeStore public DatanodeStoreWithIncrementalChunkList(ConfigurationSource config, AbstractDatanodeDBDefinition dbDef, boolean openReadOnly) throws IOException { super(config, dbDef, openReadOnly); + + this.cachedBlockData = CacheBuilder.newBuilder() + .recordStats() + .expireAfterAccess(60000, MILLISECONDS) + .maximumSize(1024) + .build(); + this.cachedLastChunkData = CacheBuilder.newBuilder() + .recordStats() + .expireAfterAccess(60000, MILLISECONDS) + .maximumSize(1024) + .build(); } @Override public BlockData getCompleteBlockData(BlockData blockData, BlockID blockID, String blockKey) throws IOException { - BlockData lastChunk = null; + Optional lastChunk = Optional.empty(); if (blockData == null || isPartialChunkList(blockData)) { // check last chunk table - lastChunk = getLastChunkInfoTable().get(blockKey); + lastChunk = getLastChunkData(blockKey); } if (blockData == null) { - if (lastChunk == null) { + if (!lastChunk.isPresent()) { throw new StorageContainerException( NO_SUCH_BLOCK_ERR_MSG + " BlockID : " + blockID, NO_SUCH_BLOCK); } else { if (LOG.isDebugEnabled()) { - LOG.debug("blockData=(null), lastChunk={}", lastChunk.getChunks()); + LOG.debug("blockData=(null), lastChunk={}", lastChunk.get().getChunks()); } - return lastChunk; + return lastChunk.get(); } } else { - if (lastChunk != null) { - reconcilePartialChunks(lastChunk, blockData); + if (lastChunk.isPresent()) { + reconcilePartialChunks(lastChunk.get(), blockData); } else { if (LOG.isDebugEnabled()) { LOG.debug("blockData={}, lastChunk=(null)", blockData.getChunks()); @@ -140,8 +158,7 @@ public void putBlockByID(BatchOperation batch, boolean incremental, boolean endOfBlock) throws IOException { if (!incremental || !isPartialChunkList(data)) { // Case (1) old client: override chunk list. - getBlockDataTable().putWithBatch( - batch, containerData.getBlockKey(localID), data); + putBlockData(batch, containerData.getBlockKey(localID), Optional.of(data)); } else if (shouldAppendLastChunk(endOfBlock, data)) { moveLastChunkToBlockData(batch, localID, data, containerData); } else { @@ -155,55 +172,49 @@ private void moveLastChunkToBlockData(BatchOperation batch, long localID, BlockData data, KeyValueContainerData containerData) throws IOException { // if data has no chunks, fetch the last chunk info from lastChunkInfoTable if (data.getChunks().isEmpty()) { - BlockData lastChunk = getLastChunkInfoTable().get(containerData.getBlockKey(localID)); - if (lastChunk != null) { - reconcilePartialChunks(lastChunk, data); - } + Optional lastChunk = getLastChunkData(containerData.getBlockKey(localID)); + lastChunk.ifPresent(blockData -> reconcilePartialChunks(blockData, data)); } // if eob or if the last chunk is full, // the 'data' is full so append it to the block table's chunk info // and then remove from lastChunkInfo - BlockData blockData = getBlockDataTable().get( - containerData.getBlockKey(localID)); - if (blockData == null) { + Optional blockData = getBlockData(containerData.getBlockKey(localID)); + if (!blockData.isPresent()) { // Case 2.1 if the block did not have full chunks before // the block's chunk is what received from client this time, plus the chunks in lastChunkInfoTable - blockData = data; + blockData = Optional.of(data); } else { // case 2.2 the block already has some full chunks - List chunkInfoList = blockData.getChunks(); - blockData.setChunks(new ArrayList<>(chunkInfoList)); + List chunkInfoList = blockData.get().getChunks(); + blockData.get().setChunks(new ArrayList<>(chunkInfoList)); for (ContainerProtos.ChunkInfo chunk : data.getChunks()) { - blockData.addChunk(chunk); + blockData.get().addChunk(chunk); } - blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId()); + blockData.get().setBlockCommitSequenceId(data.getBlockCommitSequenceId()); } // delete the entry from last chunk info table - getLastChunkInfoTable().deleteWithBatch( - batch, containerData.getBlockKey(localID)); + deleteLastChunkData(batch, containerData.getBlockKey(localID)); // update block data table - getBlockDataTable().putWithBatch(batch, - containerData.getBlockKey(localID), blockData); + putBlockData(batch, containerData.getBlockKey(localID), blockData); } private void putBlockWithPartialChunks(BatchOperation batch, long localID, BlockData data, KeyValueContainerData containerData) throws IOException { String blockKey = containerData.getBlockKey(localID); - BlockData blockData = getBlockDataTable().get(blockKey); + Optional blockData = getBlockData(blockKey); if (data.getChunks().size() == 1) { // Case (3.1) replace/update the last chunk info table - getLastChunkInfoTable().putWithBatch( - batch, blockKey, data); + putLastChunkData(batch, blockKey, data); // If the block does not exist in the block data table because it is the first chunk // and the chunk is not full, then add an empty block data to populate the block table. // This is required because some of the test code and utilities expect the block to be // present in the block data table, they don't check the last chunk info table. - if (blockData == null) { + if (!blockData.isPresent()) { // populate blockDataTable with empty chunk list - blockData = new BlockData(data.getBlockID()); - blockData.addMetadata(INCREMENTAL_CHUNK_LIST, ""); - blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId()); - getBlockDataTable().putWithBatch(batch, blockKey, blockData); + blockData = Optional.of(new BlockData(data.getBlockID())); + blockData.get().addMetadata(INCREMENTAL_CHUNK_LIST, ""); + blockData.get().setBlockCommitSequenceId(data.getBlockCommitSequenceId()); + putBlockData(batch, blockKey, blockData); } } else { int lastChunkIndex = data.getChunks().size() - 1; @@ -211,20 +222,22 @@ private void putBlockWithPartialChunks(BatchOperation batch, long localID, List lastChunkInfo = Collections.singletonList( data.getChunks().get(lastChunkIndex)); - if (blockData == null) { + if (!blockData.isPresent()) { // Case 3.2: if the block does not exist in the block data table List chunkInfos = new ArrayList<>(data.getChunks()); chunkInfos.remove(lastChunkIndex); - data.setChunks(chunkInfos); - blockData = data; + blockData = Optional.of(new BlockData(data.getBlockID())); + blockData.get().addMetadata(INCREMENTAL_CHUNK_LIST, ""); + blockData.get().setBlockCommitSequenceId(data.getBlockCommitSequenceId()); + blockData.get().setChunks(chunkInfos); LOG.debug("block {} does not have full chunks yet. Adding the " + "chunks to it {}", localID, blockData); } else { // Case 3.3: if the block exists in the block data table, // append chunks till except the last one (supposedly partial) List chunkInfos = - new ArrayList<>(blockData.getChunks()); + new ArrayList<>(blockData.get().getChunks()); LOG.debug("blockData.getChunks()={}", chunkInfos); LOG.debug("data.getChunks()={}", data.getChunks()); @@ -232,15 +245,49 @@ private void putBlockWithPartialChunks(BatchOperation batch, long localID, for (int i = 0; i < lastChunkIndex; i++) { chunkInfos.add(data.getChunks().get(i)); } - blockData.setChunks(chunkInfos); - blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId()); + blockData.get().setChunks(chunkInfos); + blockData.get().setBlockCommitSequenceId(data.getBlockCommitSequenceId()); } - getBlockDataTable().putWithBatch(batch, - containerData.getBlockKey(localID), blockData); + putBlockData(batch, containerData.getBlockKey(localID), blockData); // update the last partial chunk data.setChunks(lastChunkInfo); - getLastChunkInfoTable().putWithBatch( - batch, containerData.getBlockKey(localID), data); + putLastChunkData(batch, containerData.getBlockKey(localID), data); + } + } + + private void putBlockData(BatchOperation batch, String key, Optional blockData) + throws IOException { + getBlockDataTable().putWithBatch(batch, key, blockData.get()); + cachedBlockData.put(key, blockData); + } + + private Optional getBlockData(String key) + throws IOException { + try { + return cachedBlockData.get(key, () -> Optional.ofNullable(getBlockDataTable().get(key))); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + private void putLastChunkData(BatchOperation batch, String key, BlockData blockData) + throws IOException { + getLastChunkInfoTable().putWithBatch(batch, key, blockData); + cachedLastChunkData.put(key, Optional.of(blockData)); + } + + private void deleteLastChunkData(BatchOperation batch, String key) + throws IOException { + getLastChunkInfoTable().deleteWithBatch(batch, key); + cachedLastChunkData.invalidate(key); + } + + private Optional getLastChunkData(String key) + throws IOException { + try { + return cachedLastChunkData.get(key, () -> Optional.ofNullable(getLastChunkInfoTable().get(key))); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); } } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java index afe6c952e91..edddd2b040d 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java @@ -395,7 +395,6 @@ public void testFlush3(ContainerTestVersionInfo versionInfo) // verify that the four chunks are full BlockData getBlockData = blockManager.getBlock(keyValueContainer, new BlockID(containerID, blockNo)); - assertEquals(chunkLimit * 4, getBlockData.getSize()); assertEquals(2, getBlockData.getBlockCommitSequenceId()); List chunkInfos = getBlockData.getChunks(); assertEquals(4, chunkInfos.size()); @@ -403,5 +402,7 @@ public void testFlush3(ContainerTestVersionInfo versionInfo) assertEquals(chunkLimit, chunkInfos.get(i).getLen()); assertEquals(chunkLimit * i, chunkInfos.get(i).getOffset()); } + assertEquals(chunkLimit * 4, getBlockData.getSize()); + } }