
Commit b365587

Merge branch 'apache:master' into HDDS-11243
slfan1989 authored Aug 21, 2024
2 parents c5ece58 + 3db5ad3 commit b365587
Showing 68 changed files with 3,638 additions and 333 deletions.
@@ -248,21 +248,21 @@ public enum ChecksumCombineMode {
private String fsDefaultBucketLayout = "FILE_SYSTEM_OPTIMIZED";

@Config(key = "incremental.chunk.list",
defaultValue = "false",
defaultValue = "true",
type = ConfigType.BOOLEAN,
description = "Client PutBlock request can choose incremental chunk " +
"list rather than full chunk list to optimize performance. " +
"Critical to HBase.",
"Critical to HBase. EC does not support this feature.",
tags = ConfigTag.CLIENT)
private boolean incrementalChunkList = true;

@Config(key = "stream.putblock.piggybacking",
defaultValue = "false",
defaultValue = "true",
type = ConfigType.BOOLEAN,
description = "Allow PutBlock to be piggybacked in WriteChunk " +
"requests if the chunk is small.",
tags = ConfigTag.CLIENT)
private boolean enablePutblockPiggybacking = false;
private boolean enablePutblockPiggybacking = true;

@Config(key = "key.write.concurrency",
defaultValue = "1",
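Both defaults above flip from false to true in this commit. A minimal sketch of pinning them explicitly on the client side is shown below; the fully qualified key names and the OzoneClientConfig package are assumptions (the @Config annotations in this hunk only show the unprefixed keys), so treat it as illustrative rather than authoritative.

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
// Package is an assumption; OzoneClientConfig lives in the hdds client module.
import org.apache.hadoop.hdds.scm.OzoneClientConfig;

public final class ClientConfigSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Assumed full key names: the unprefixed keys from the @Config annotations
    // combined with the usual "ozone.client" prefix.
    conf.setBoolean("ozone.client.incremental.chunk.list", true);
    conf.setBoolean("ozone.client.stream.putblock.piggybacking", true);
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    System.out.println("incremental chunk list: " + clientConfig.getIncrementalChunkList());
    System.out.println("putblock piggybacking: " + clientConfig.getEnablePutblockPiggybacking());
  }
}
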
@@ -27,6 +27,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -94,6 +95,9 @@ public class BlockOutputStream extends OutputStream {
KeyValue.newBuilder().setKey(FULL_CHUNK).build();

private AtomicReference<BlockID> blockID;
// planned full size of the block; -1 if unknown (e.g. for EC)
private long blockSize;
private AtomicBoolean eofSent = new AtomicBoolean(false);
private final AtomicReference<ChunkInfo> previousChunkInfo
= new AtomicReference<>();

@@ -149,6 +153,7 @@ public class BlockOutputStream extends OutputStream {
private Pipeline pipeline;
private final ContainerClientMetrics clientMetrics;
private boolean allowPutBlockPiggybacking;
private boolean supportIncrementalChunkList;

private CompletableFuture<Void> lastFlushFuture;
private CompletableFuture<Void> allPendingFlushFutures = CompletableFuture.completedFuture(null);
@@ -164,6 +169,7 @@
@SuppressWarnings("checkstyle:ParameterNumber")
public BlockOutputStream(
BlockID blockID,
long blockSize,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
BufferPool bufferPool,
@@ -175,6 +181,7 @@ public BlockOutputStream(
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
this.blockID = new AtomicReference<>(blockID);
this.blockSize = blockSize;
replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
@@ -189,8 +196,13 @@ public BlockOutputStream(
}
this.containerBlockData = BlockData.newBuilder().setBlockID(
blkIDBuilder.build()).addMetadata(keyValue);
this.pipeline = pipeline;
// tell DataNode I will send incremental chunk list
if (config.getIncrementalChunkList()) {
// EC does not support incremental chunk list.
this.supportIncrementalChunkList = config.getIncrementalChunkList() &&
this instanceof RatisBlockOutputStream && allDataNodesSupportPiggybacking();
LOG.debug("incrementalChunkList is {}", supportIncrementalChunkList);
if (supportIncrementalChunkList) {
this.containerBlockData.addMetadata(INCREMENTAL_CHUNK_LIST_KV);
this.lastChunkBuffer = DIRECT_BUFFER_POOL.getBuffer(config.getStreamBufferSize());
this.lastChunkOffset = 0;
@@ -223,16 +235,17 @@ public BlockOutputStream(
checksum = new Checksum(config.getChecksumType(),
config.getBytesPerChecksum());
this.clientMetrics = clientMetrics;
this.pipeline = pipeline;
this.streamBufferArgs = streamBufferArgs;
this.allowPutBlockPiggybacking = config.getEnablePutblockPiggybacking() &&
allDataNodesSupportPiggybacking();
LOG.debug("PutBlock piggybacking is {}", allowPutBlockPiggybacking);
}

private boolean allDataNodesSupportPiggybacking() {
// return true only if all DataNodes in the pipeline are on a version
// that supports PutBlock piggybacking.
for (DatanodeDetails dn : pipeline.getNodes()) {
LOG.debug("dn = {}, version = {}", dn, dn.getCurrentVersion());
if (dn.getCurrentVersion() <
COMBINED_PUTBLOCK_WRITECHUNK_RPC.toProtoValue()) {
return false;
@@ -530,15 +543,17 @@ CompletableFuture<PutBlockResult> executePutBlock(boolean close,
final XceiverClientReply asyncReply;
try {
BlockData blockData = containerBlockData.build();
LOG.debug("sending PutBlock {}", blockData);
LOG.debug("sending PutBlock {} flushPos {}", blockData, flushPos);

if (config.getIncrementalChunkList()) {
if (supportIncrementalChunkList) {
// remove any chunks in the containerBlockData list,
// since they have already been sent.
containerBlockData.clearChunks();
}

asyncReply = putBlockAsync(xceiverClient, blockData, close, tokenString);
// if the block is full, send the EOF flag
boolean isBlockFull = (blockSize != -1 && flushPos == blockSize);
asyncReply = putBlockAsync(xceiverClient, blockData, close || isBlockFull, tokenString);
CompletableFuture<ContainerCommandResponseProto> future = asyncReply.getResponse();
flushFuture = future.thenApplyAsync(e -> {
try {
Expand All @@ -550,6 +565,7 @@ CompletableFuture<PutBlockResult> executePutBlock(boolean close,
if (getIoException() == null && !force) {
handleSuccessfulPutBlock(e.getPutBlock().getCommittedBlockLength(),
asyncReply, flushPos, byteBufferList);
eofSent.set(close || isBlockFull);
}
return e;
}, responseExecutor).exceptionally(e -> {
@@ -690,7 +706,7 @@ private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boo
// There is no pending written data, but there is uncommitted data.
updatePutBlockLength();
putBlockResultFuture = executePutBlock(close, false);
} else if (close) {
} else if (close && !eofSent.get()) {
// forcing an "empty" putBlock if stream is being closed without new
// data since latest flush - we need to send the "EOF" flag
updatePutBlockLength();
@@ -866,7 +882,7 @@ private CompletableFuture<PutBlockResult> writeChunkToContainer(
try {
BlockData blockData = null;

if (config.getIncrementalChunkList()) {
if (supportIncrementalChunkList) {
updateBlockDataForWriteChunk(chunk);
} else {
containerBlockData.addChunks(chunkInfo);
Expand All @@ -880,7 +896,7 @@ private CompletableFuture<PutBlockResult> writeChunkToContainer(
blockData = containerBlockData.build();
LOG.debug("piggyback chunk list {}", blockData);

if (config.getIncrementalChunkList()) {
if (supportIncrementalChunkList) {
// remove any chunks in the containerBlockData list,
// since they have already been sent.
containerBlockData.clearChunks();
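Two threads run through the BlockOutputStream changes above: incremental chunk lists are now enabled only for Ratis streams whose DataNodes all advertise COMBINED_PUTBLOCK_WRITECHUNK_RPC, and a PutBlock is flagged EOF as soon as the flushed position reaches the planned block size, with eofSent guarding against a redundant empty PutBlock on close. A hedged, standalone sketch of that EOF rule (the helper shape is illustrative, not the actual method):

final class EofRuleSketch {
  // Sketch of the condition used in executePutBlock(): blockSize == -1
  // (what ECBlockOutputStream passes) disables the "block is full" path;
  // otherwise EOF is sent on close or once flushPos reaches blockSize.
  static boolean shouldSendEof(boolean close, long blockSize, long flushPos) {
    boolean isBlockFull = blockSize != -1 && flushPos == blockSize;
    return close || isBlockFull;
  }
}
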
@@ -80,7 +80,7 @@ public ECBlockOutputStream(
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> executorServiceSupplier
) throws IOException {
super(blockID, xceiverClientManager,
super(blockID, -1, xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier);
// In EC stream, there will be only one node in pipeline.
this.datanodeDetails = pipeline.getClosestNode();
@@ -72,6 +72,7 @@ public class RatisBlockOutputStream extends BlockOutputStream
@SuppressWarnings("checkstyle:ParameterNumber")
public RatisBlockOutputStream(
BlockID blockID,
long blockSize,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
BufferPool bufferPool,
@@ -80,7 +81,7 @@ public RatisBlockOutputStream(
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> blockOutputStreamResourceProvider
) throws IOException {
super(blockID, xceiverClientManager, pipeline,
super(blockID, blockSize, xceiverClientManager, pipeline,
bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider);
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
}
@@ -174,6 +174,7 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)

return new RatisBlockOutputStream(
new BlockID(1L, 1L),
-1,
xcm,
pipeline,
bufferPool,
@@ -391,6 +391,9 @@ public static DatanodeDetails.Builder newBuilder(
}
if (datanodeDetailsProto.hasCurrentVersion()) {
builder.setCurrentVersion(datanodeDetailsProto.getCurrentVersion());
} else {
// fall back to version 1 (SEPARATE_RATIS_PORTS_AVAILABLE) if currentVersion is not set
builder.setCurrentVersion(DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE.toProtoValue());
}
return builder;
}
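This fallback feeds the client-side capability checks above: a DataNode whose proto predates the currentVersion field (Ozone 1.4.0 and earlier) now deserializes to SEPARATE_RATIS_PORTS_AVAILABLE, which is assumed to sort below COMBINED_PUTBLOCK_WRITECHUNK_RPC, so PutBlock piggybacking and incremental chunk lists stay disabled for pipelines containing such nodes. A hedged sanity-check sketch of that ordering:

import org.apache.hadoop.hdds.DatanodeVersion;

final class VersionFallbackSketch {
  public static void main(String[] args) {
    // Assumption: the fallback version predates the combined
    // PutBlock/WriteChunk RPC, keeping the new client optimizations off
    // when older DataNodes are in the pipeline.
    assert DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE.toProtoValue()
        < DatanodeVersion.COMBINED_PUTBLOCK_WRITECHUNK_RPC.toProtoValue();
  }
}
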
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.protocol;

import org.apache.hadoop.hdds.DatanodeVersion;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.junit.jupiter.api.Test;
@@ -48,6 +49,24 @@ void protoIncludesNewPortsOnlyForV1() {
assertPorts(protoV1, ALL_PORTS);
}

@Test
public void testNewBuilderCurrentVersion() {
// test that if the current version is not set (Ozone 1.4.0 and earlier),
// it falls back to SEPARATE_RATIS_PORTS_AVAILABLE
DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
HddsProtos.DatanodeDetailsProto.Builder protoBuilder =
dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
protoBuilder.clearCurrentVersion();
DatanodeDetails dn2 = DatanodeDetails.newBuilder(protoBuilder.build()).build();
assertEquals(DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE.toProtoValue(), dn2.getCurrentVersion());

// test that if the current version is set, it is used
protoBuilder =
dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
DatanodeDetails dn3 = DatanodeDetails.newBuilder(protoBuilder.build()).build();
assertEquals(DatanodeVersion.CURRENT.toProtoValue(), dn3.getCurrentVersion());
}

public static void assertPorts(HddsProtos.DatanodeDetailsProto dn,
Set<Port.Name> expectedPorts) throws IllegalArgumentException {
assertEquals(expectedPorts.size(), dn.getPortsCount());
@@ -35,6 +35,7 @@
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreWithIncrementalChunkList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -501,8 +502,6 @@ static PendingDelete countPendingDeletesSchemaV2(

Table<Long, DeletedBlocksTransaction> delTxTable =
schemaTwoStore.getDeleteTransactionTable();
final Table<String, BlockData> blockDataTable
= schemaTwoStore.getBlockDataTable();

try (TableIterator<Long, ? extends Table.KeyValue<Long,
DeletedBlocksTransaction>> iterator = delTxTable.iterator()) {
@@ -515,7 +514,7 @@ static PendingDelete countPendingDeletesSchemaV2(
// counted towards bytes used and total block count above.
pendingDeleteBlockCountTotal += localIDs.size();
pendingDeleteBytes += computePendingDeleteBytes(
localIDs, containerData, blockDataTable);
localIDs, containerData, schemaTwoStore);
}
}

@@ -525,12 +524,12 @@ static PendingDelete countPendingDeletesSchemaV2(

static long computePendingDeleteBytes(List<Long> localIDs,
KeyValueContainerData containerData,
Table<String, BlockData> blockDataTable) {
DatanodeStoreWithIncrementalChunkList store) {
long pendingDeleteBytes = 0;
for (long id : localIDs) {
try {
final String blockKey = containerData.getBlockKey(id);
final BlockData blockData = blockDataTable.get(blockKey);
final BlockData blockData = store.getBlockByID(null, blockKey);
if (blockData != null) {
pendingDeleteBytes += blockData.getSize();
}
@@ -544,23 +543,21 @@ static long computePendingDeleteBytes(List<Long> localIDs,
}

static PendingDelete countPendingDeletesSchemaV3(
DatanodeStoreSchemaThreeImpl schemaThreeStore,
DatanodeStoreSchemaThreeImpl store,
KeyValueContainerData containerData) throws IOException {
long pendingDeleteBlockCountTotal = 0;
long pendingDeleteBytes = 0;
final Table<String, BlockData> blockDataTable
= schemaThreeStore.getBlockDataTable();
try (
TableIterator<String, ? extends Table.KeyValue<String,
DeletedBlocksTransaction>>
iter = schemaThreeStore.getDeleteTransactionTable()
iter = store.getDeleteTransactionTable()
.iterator(containerData.containerPrefix())) {
while (iter.hasNext()) {
DeletedBlocksTransaction delTx = iter.next().getValue();
final List<Long> localIDs = delTx.getLocalIDList();
pendingDeleteBlockCountTotal += localIDs.size();
pendingDeleteBytes += computePendingDeleteBytes(
localIDs, containerData, blockDataTable);
localIDs, containerData, store);
}
}
return new PendingDelete(pendingDeleteBlockCountTotal,
@@ -343,7 +343,7 @@ public List<BlockData> listBlock(Container container, long startLocalID, int
.getSequentialRangeKVs(startKey, count,
cData.containerPrefix(), cData.getUnprefixedKeyFilter());
for (Table.KeyValue<String, BlockData> entry : range) {
result.add(entry.getValue());
result.add(db.getStore().getCompleteBlockData(entry.getValue(), null, entry.getKey()));
}
return result;
}
@@ -327,6 +327,8 @@ private ContainerBackgroundTaskResult deleteViaTransactionStore(
try {
Table<String, BlockData> blockDataTable =
meta.getStore().getBlockDataTable();
Table<String, BlockData> lastChunkInfoTable =
meta.getStore().getLastChunkInfoTable();
DeleteTransactionStore<?> txnStore =
(DeleteTransactionStore<?>) meta.getStore();
Table<?, DeletedBlocksTransaction> deleteTxns =
@@ -376,8 +378,11 @@ private ContainerBackgroundTaskResult deleteViaTransactionStore(
for (DeletedBlocksTransaction delTx : deletedBlocksTxs) {
deleter.apply(deleteTxns, batch, delTx.getTxID());
for (Long blk : delTx.getLocalIDList()) {
// delete from both blockDataTable and lastChunkInfoTable.
blockDataTable.deleteWithBatch(batch,
containerData.getBlockKey(blk));
lastChunkInfoTable.deleteWithBatch(batch,
containerData.getBlockKey(blk));
}
}

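The extra delete above exists because, under incremental chunk lists, a block's trailing chunk may be recorded only in the lastChunkInfoTable (an inference from the getCompleteBlockData merge further below); removing the block from both tables inside the same batch avoids orphaned tail-chunk entries. A hedged sketch of the per-block cleanup as a standalone helper (the helper itself is illustrative, not part of the patch):

import java.io.IOException;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;

final class BlockCleanupSketch {
  // Delete a block's entries from both tables in the same batch, mirroring
  // the loop in the hunk above.
  static void deleteBlockEverywhere(BatchOperation batch,
      Table<String, BlockData> blockDataTable,
      Table<String, BlockData> lastChunkInfoTable,
      String blockKey) throws IOException {
    blockDataTable.deleteWithBatch(batch, blockKey);
    lastChunkInfoTable.deleteWithBatch(batch, blockKey);
  }
}
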
@@ -135,6 +135,11 @@ default BlockData getBlockByID(BlockID blockID,
// check block data table
BlockData blockData = getBlockDataTable().get(blockKey);

return getCompleteBlockData(blockData, blockID, blockKey);
}

default BlockData getCompleteBlockData(BlockData blockData,
BlockID blockID, String blockKey) throws IOException {
if (blockData == null) {
throw new StorageContainerException(
NO_SUCH_BLOCK_ERR_MSG + " BlockID : " + blockID, NO_SUCH_BLOCK);
@@ -20,6 +20,7 @@
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec;
Expand All @@ -31,6 +32,7 @@
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.rocksdb.LiveFileMetaData;

import java.io.File;
@@ -106,6 +108,9 @@ public void removeKVContainerData(long containerID) throws IOException {
try (BatchOperation batch = getBatchHandler().initBatchOperation()) {
getMetadataTable().deleteBatchWithPrefix(batch, prefix);
getBlockDataTable().deleteBatchWithPrefix(batch, prefix);
if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
getLastChunkInfoTable().deleteBatchWithPrefix(batch, prefix);
}
getDeleteTransactionTable().deleteBatchWithPrefix(batch, prefix);
getBatchHandler().commitBatchOperation(batch);
}
@@ -118,6 +123,10 @@ public void dumpKVContainerData(long containerID, File dumpDir)
getTableDumpFile(getMetadataTable(), dumpDir), prefix);
getBlockDataTable().dumpToFileWithPrefix(
getTableDumpFile(getBlockDataTable(), dumpDir), prefix);
if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
getLastChunkInfoTable().dumpToFileWithPrefix(
getTableDumpFile(getLastChunkInfoTable(), dumpDir), prefix);
}
getDeleteTransactionTable().dumpToFileWithPrefix(
getTableDumpFile(getDeleteTransactionTable(), dumpDir),
prefix);
@@ -129,6 +138,10 @@ public void loadKVContainerData(File dumpDir)
getTableDumpFile(getMetadataTable(), dumpDir));
getBlockDataTable().loadFromFile(
getTableDumpFile(getBlockDataTable(), dumpDir));
if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
getLastChunkInfoTable().loadFromFile(
getTableDumpFile(getLastChunkInfoTable(), dumpDir));
}
getDeleteTransactionTable().loadFromFile(
getTableDumpFile(getDeleteTransactionTable(), dumpDir));
}
