HDDS-11766. DataNode to cache blockDataTable and lastChunkInfoTable #7469

Draft: wants to merge 1 commit into base: master
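Summary of the change: the datanode keeps block metadata in two RocksDB tables, blockDataTable and lastChunkInfoTable. The incremental putBlock paths below re-read the same block keys on every call, so this patch puts a small Guava cache in front of each table. Reads go through the cache and load from RocksDB only on a miss; writes update the cache in the same call that mutates the RocksDB batch; deletes invalidate the cached entry. Values are stored as Optional<BlockData> so that "key absent in RocksDB" can itself be cached as Optional.empty(), since a Guava cache loader must not return null.

For orientation, a minimal self-contained sketch of the read-through pattern the patch applies. This is a sketch only: BackingTable is a hypothetical stand-in for the RocksDB-backed tables (its get() may return null), while the cache bounds match the diff.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class ReadThroughCache<V> {
  /** Hypothetical stand-in for getBlockDataTable()/getLastChunkInfoTable(). */
  public interface BackingTable<V> {
    V get(String key) throws IOException;
  }

  private final BackingTable<V> table;
  // Same bounds as the patch: at most 1024 entries, evicted 60s after last access.
  private final Cache<String, Optional<V>> cache = CacheBuilder.newBuilder()
      .recordStats()
      .expireAfterAccess(60000, MILLISECONDS)
      .maximumSize(1024)
      .build();

  public ReadThroughCache(BackingTable<V> table) {
    this.table = table;
  }

  public Optional<V> get(String key) throws IOException {
    try {
      // Load from the table on a miss; an absent row is cached as Optional.empty().
      return cache.get(key, () -> Optional.ofNullable(table.get(key)));
    } catch (ExecutionException e) {
      // The loader can only throw IOException here, so the cast is safe.
      throw (IOException) e.getCause();
    }
  }
}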
DatanodeStoreWithIncrementalChunkList.java:

@@ -18,6 +18,8 @@
 package org.apache.hadoop.ozone.container.metadata;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -30,7 +32,10 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
 import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
 import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK;
@@ -43,6 +48,8 @@
  * 3. A Delete Transaction Table.
  */
 public class DatanodeStoreWithIncrementalChunkList extends AbstractDatanodeStore {
+  private final Cache<String, Optional<BlockData>> cachedBlockData;
+  private final Cache<String, Optional<BlockData>> cachedLastChunkData;
   /**
    * Constructs the metadata store and starts the DB services.
    *
@@ -52,31 +59,42 @@ public class DatanodeStoreWithIncrementalChunkList
   public DatanodeStoreWithIncrementalChunkList(ConfigurationSource config,
       AbstractDatanodeDBDefinition dbDef, boolean openReadOnly) throws IOException {
     super(config, dbDef, openReadOnly);
+
+    this.cachedBlockData = CacheBuilder.newBuilder()
+        .recordStats()
+        .expireAfterAccess(60000, MILLISECONDS)
+        .maximumSize(1024)
+        .build();
+    this.cachedLastChunkData = CacheBuilder.newBuilder()
+        .recordStats()
+        .expireAfterAccess(60000, MILLISECONDS)
+        .maximumSize(1024)
+        .build();
   }
 
 
   @Override
   public BlockData getCompleteBlockData(BlockData blockData,
       BlockID blockID, String blockKey) throws IOException {
-    BlockData lastChunk = null;
+    Optional<BlockData> lastChunk = Optional.empty();
     if (blockData == null || isPartialChunkList(blockData)) {
       // check last chunk table
-      lastChunk = getLastChunkInfoTable().get(blockKey);
+      lastChunk = getLastChunkData(blockKey);
     }
 
     if (blockData == null) {
-      if (lastChunk == null) {
+      if (!lastChunk.isPresent()) {
         throw new StorageContainerException(
             NO_SUCH_BLOCK_ERR_MSG + " BlockID : " + blockID, NO_SUCH_BLOCK);
       } else {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("blockData=(null), lastChunk={}", lastChunk.getChunks());
+          LOG.debug("blockData=(null), lastChunk={}", lastChunk.get().getChunks());
         }
-        return lastChunk;
+        return lastChunk.get();
       }
     } else {
-      if (lastChunk != null) {
-        reconcilePartialChunks(lastChunk, blockData);
+      if (lastChunk.isPresent()) {
+        reconcilePartialChunks(lastChunk.get(), blockData);
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("blockData={}, lastChunk=(null)", blockData.getChunks());
@@ -140,8 +158,7 @@ public void putBlockByID(BatchOperation batch, boolean incremental,
       boolean endOfBlock) throws IOException {
     if (!incremental || !isPartialChunkList(data)) {
       // Case (1) old client: override chunk list.
-      getBlockDataTable().putWithBatch(
-          batch, containerData.getBlockKey(localID), data);
+      putBlockData(batch, containerData.getBlockKey(localID), Optional.of(data));
     } else if (shouldAppendLastChunk(endOfBlock, data)) {
       moveLastChunkToBlockData(batch, localID, data, containerData);
     } else {
@@ -155,92 +172,122 @@ private void moveLastChunkToBlockData(BatchOperation batch, long localID,
       BlockData data, KeyValueContainerData containerData) throws IOException {
     // if data has no chunks, fetch the last chunk info from lastChunkInfoTable
     if (data.getChunks().isEmpty()) {
-      BlockData lastChunk = getLastChunkInfoTable().get(containerData.getBlockKey(localID));
-      if (lastChunk != null) {
-        reconcilePartialChunks(lastChunk, data);
-      }
+      Optional<BlockData> lastChunk = getLastChunkData(containerData.getBlockKey(localID));
+      lastChunk.ifPresent(blockData -> reconcilePartialChunks(blockData, data));
     }
     // if eob or if the last chunk is full,
     // the 'data' is full so append it to the block table's chunk info
     // and then remove from lastChunkInfo
-    BlockData blockData = getBlockDataTable().get(
-        containerData.getBlockKey(localID));
-    if (blockData == null) {
+    Optional<BlockData> blockData = getBlockData(containerData.getBlockKey(localID));
+    if (!blockData.isPresent()) {
       // Case 2.1 if the block did not have full chunks before
       // the block's chunk is what received from client this time, plus the chunks in lastChunkInfoTable
-      blockData = data;
+      blockData = Optional.of(data);
     } else {
       // case 2.2 the block already has some full chunks
-      List<ContainerProtos.ChunkInfo> chunkInfoList = blockData.getChunks();
-      blockData.setChunks(new ArrayList<>(chunkInfoList));
+      List<ContainerProtos.ChunkInfo> chunkInfoList = blockData.get().getChunks();
+      blockData.get().setChunks(new ArrayList<>(chunkInfoList));
       for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
-        blockData.addChunk(chunk);
+        blockData.get().addChunk(chunk);
       }
-      blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId());
+      blockData.get().setBlockCommitSequenceId(data.getBlockCommitSequenceId());
     }
     // delete the entry from last chunk info table
-    getLastChunkInfoTable().deleteWithBatch(
-        batch, containerData.getBlockKey(localID));
+    deleteLastChunkData(batch, containerData.getBlockKey(localID));
     // update block data table
-    getBlockDataTable().putWithBatch(batch,
-        containerData.getBlockKey(localID), blockData);
+    putBlockData(batch, containerData.getBlockKey(localID), blockData);
   }
 
   private void putBlockWithPartialChunks(BatchOperation batch, long localID,
       BlockData data, KeyValueContainerData containerData) throws IOException {
     String blockKey = containerData.getBlockKey(localID);
-    BlockData blockData = getBlockDataTable().get(blockKey);
+    Optional<BlockData> blockData = getBlockData(blockKey);
     if (data.getChunks().size() == 1) {
       // Case (3.1) replace/update the last chunk info table
-      getLastChunkInfoTable().putWithBatch(
-          batch, blockKey, data);
+      putLastChunkData(batch, blockKey, data);
       // If the block does not exist in the block data table because it is the first chunk
       // and the chunk is not full, then add an empty block data to populate the block table.
       // This is required because some of the test code and utilities expect the block to be
      // present in the block data table, they don't check the last chunk info table.
-      if (blockData == null) {
+      if (!blockData.isPresent()) {
         // populate blockDataTable with empty chunk list
-        blockData = new BlockData(data.getBlockID());
-        blockData.addMetadata(INCREMENTAL_CHUNK_LIST, "");
-        blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId());
-        getBlockDataTable().putWithBatch(batch, blockKey, blockData);
+        blockData = Optional.of(new BlockData(data.getBlockID()));
+        blockData.get().addMetadata(INCREMENTAL_CHUNK_LIST, "");
+        blockData.get().setBlockCommitSequenceId(data.getBlockCommitSequenceId());
+        putBlockData(batch, blockKey, blockData);
       }
     } else {
       int lastChunkIndex = data.getChunks().size() - 1;
       // received more than one chunk this time
       List<ContainerProtos.ChunkInfo> lastChunkInfo =
           Collections.singletonList(
               data.getChunks().get(lastChunkIndex));
-      if (blockData == null) {
+      if (!blockData.isPresent()) {
         // Case 3.2: if the block does not exist in the block data table
         List<ContainerProtos.ChunkInfo> chunkInfos =
             new ArrayList<>(data.getChunks());
         chunkInfos.remove(lastChunkIndex);
-        data.setChunks(chunkInfos);
-        blockData = data;
+        blockData = Optional.of(new BlockData(data.getBlockID()));
+        blockData.get().addMetadata(INCREMENTAL_CHUNK_LIST, "");
+        blockData.get().setBlockCommitSequenceId(data.getBlockCommitSequenceId());
+        blockData.get().setChunks(chunkInfos);
         LOG.debug("block {} does not have full chunks yet. Adding the " +
             "chunks to it {}", localID, blockData);
       } else {
         // Case 3.3: if the block exists in the block data table,
         // append chunks till except the last one (supposedly partial)
         List<ContainerProtos.ChunkInfo> chunkInfos =
-            new ArrayList<>(blockData.getChunks());
+            new ArrayList<>(blockData.get().getChunks());
 
         LOG.debug("blockData.getChunks()={}", chunkInfos);
         LOG.debug("data.getChunks()={}", data.getChunks());
 
         for (int i = 0; i < lastChunkIndex; i++) {
           chunkInfos.add(data.getChunks().get(i));
         }
-        blockData.setChunks(chunkInfos);
-        blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId());
+        blockData.get().setChunks(chunkInfos);
+        blockData.get().setBlockCommitSequenceId(data.getBlockCommitSequenceId());
       }
-      getBlockDataTable().putWithBatch(batch,
-          containerData.getBlockKey(localID), blockData);
+      putBlockData(batch, containerData.getBlockKey(localID), blockData);
       // update the last partial chunk
       data.setChunks(lastChunkInfo);
-      getLastChunkInfoTable().putWithBatch(
-          batch, containerData.getBlockKey(localID), data);
+      putLastChunkData(batch, containerData.getBlockKey(localID), data);
     }
   }
+
+  private void putBlockData(BatchOperation batch, String key, Optional<BlockData> blockData)
+      throws IOException {
+    getBlockDataTable().putWithBatch(batch, key, blockData.get());
+    cachedBlockData.put(key, blockData);
+  }
+
+  private Optional<BlockData> getBlockData(String key)
+      throws IOException {
+    try {
+      return cachedBlockData.get(key, () -> Optional.ofNullable(getBlockDataTable().get(key)));
+    } catch (ExecutionException e) {
+      throw (IOException) e.getCause();
+    }
+  }
+
+  private void putLastChunkData(BatchOperation batch, String key, BlockData blockData)
+      throws IOException {
+    getLastChunkInfoTable().putWithBatch(batch, key, blockData);
+    cachedLastChunkData.put(key, Optional.of(blockData));
+  }
+
+  private void deleteLastChunkData(BatchOperation batch, String key)
+      throws IOException {
+    getLastChunkInfoTable().deleteWithBatch(batch, key);
+    cachedLastChunkData.invalidate(key);
+  }
+
+  private Optional<BlockData> getLastChunkData(String key)
+      throws IOException {
+    try {
+      return cachedLastChunkData.get(key, () -> Optional.ofNullable(getLastChunkInfoTable().get(key)));
+    } catch (ExecutionException e) {
+      throw (IOException) e.getCause();
+    }
+  }
 }
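A note on the Optional wrapping used throughout the helpers above: Guava's Cache.get(key, loader) rejects a null result from the loader, so an absent row has to be represented explicitly. Wrapping the table read with Optional.ofNullable also means "key not in RocksDB" is itself cached, so repeated lookups of a missing key stop hitting the table. A tiny standalone demo of that effect (hypothetical, not part of the patch):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public final class NegativeCachingDemo {
  public static void main(String[] args) throws Exception {
    Cache<String, Optional<String>> cache =
        CacheBuilder.newBuilder().maximumSize(16).build();
    AtomicInteger tableReads = new AtomicInteger();

    for (int i = 0; i < 3; i++) {
      Optional<String> value = cache.get("block#1", () -> {
        tableReads.incrementAndGet();
        String fromDb = null; // stand-in for a table.get(key) that found nothing
        return Optional.ofNullable(fromDb);
      });
      System.out.println("present=" + value.isPresent()); // false every time
    }
    // Prints 1: the backing lookup ran once; later reads hit the cached empty().
    System.out.println("tableReads=" + tableReads.get());
  }
}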
Second changed file (a block manager flush test):

@@ -395,13 +395,14 @@ public void testFlush3(ContainerTestVersionInfo versionInfo)
     // verify that the four chunks are full
     BlockData getBlockData = blockManager.getBlock(keyValueContainer,
         new BlockID(containerID, blockNo));
-    assertEquals(chunkLimit * 4, getBlockData.getSize());
+    assertEquals(2, getBlockData.getBlockCommitSequenceId());
     List<ContainerProtos.ChunkInfo> chunkInfos = getBlockData.getChunks();
     assertEquals(4, chunkInfos.size());
     for (int i = 0; i < 4; i++) {
       assertEquals(chunkLimit, chunkInfos.get(i).getLen());
       assertEquals(chunkLimit * i, chunkInfos.get(i).getOffset());
     }
+    assertEquals(chunkLimit * 4, getBlockData.getSize());
 
   }
 }
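Both caches are built with recordStats(), so hit and miss counters are available at runtime. A hypothetical helper (not in this PR) showing how those counters could be surfaced, for example as a datanode metric or a debug log line:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheStats;

public final class CacheStatsSummary {
  public static String summarize(String name, Cache<?, ?> cache) {
    CacheStats s = cache.stats(); // meaningful only when recordStats() was enabled
    return String.format("%s: hitRate=%.2f hits=%d misses=%d evictions=%d",
        name, s.hitRate(), s.hitCount(), s.missCount(), s.evictionCount());
  }
}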
}