Skip to content

Commit

Permalink
HDDS-10377. Allow datanodes to do chunk level modifications to closed…
Browse files Browse the repository at this point in the history
… containers. (#7111)
  • Loading branch information
aswinshakil authored Oct 9, 2024
1 parent 01b4437 commit 04c196c
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class ContainerMetrics {

private final EnumMap<ContainerProtos.Type, MutableCounterLong> numOpsArray;
private final EnumMap<ContainerProtos.Type, MutableCounterLong> opsBytesArray;
private final EnumMap<ContainerProtos.Type, MutableCounterLong> opsForClosedContainer;
private final EnumMap<ContainerProtos.Type, MutableRate> opsLatency;
private final EnumMap<ContainerProtos.Type, MutableQuantiles[]> opsLatQuantiles;
private MetricsRegistry registry = null;
Expand All @@ -69,6 +70,7 @@ public ContainerMetrics(int[] intervals) {
MutableQuantiles[] latQuantiles = new MutableQuantiles[len];
this.numOpsArray = new EnumMap<>(ContainerProtos.Type.class);
this.opsBytesArray = new EnumMap<>(ContainerProtos.Type.class);
this.opsForClosedContainer = new EnumMap<>(ContainerProtos.Type.class);
this.opsLatency = new EnumMap<>(ContainerProtos.Type.class);
this.opsLatQuantiles = new EnumMap<>(ContainerProtos.Type.class);
this.registry = new MetricsRegistry("StorageContainerMetrics");
Expand All @@ -77,7 +79,9 @@ public ContainerMetrics(int[] intervals) {
numOpsArray.put(type, registry.newCounter(
"num" + type, "number of " + type + " ops", (long) 0));
opsBytesArray.put(type, registry.newCounter(
"bytes" + type, "bytes used by " + type + "op", (long) 0));
"bytes" + type, "bytes used by " + type + " op", (long) 0));
opsForClosedContainer.put(type, registry.newCounter("bytesForClosedContainer" + type,
"bytes used by " + type + " for closed container op", (long) 0));
opsLatency.put(type, registry.newRate("latencyNs" + type, type + " op"));

for (int j = 0; j < len; j++) {
Expand Down Expand Up @@ -121,6 +125,10 @@ public void incContainerBytesStats(ContainerProtos.Type type, long bytes) {
opsBytesArray.get(type).incr(bytes);
}

public void incClosedContainerBytesStats(ContainerProtos.Type type, long bytes) {
opsForClosedContainer.get(type).incr(bytes);
}

public void incContainerDeleteFailedBlockCountNotZero() {
containerDeleteFailedBlockCountNotZero.incr();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,58 @@ ContainerCommandResponseProto handleWriteChunk(
return getWriteChunkResponseSuccess(request, blockDataProto);
}

/**
* Handle Write Chunk operation for closed container. Calls ChunkManager to process the request.
*
*/
public void writeChunkForClosedContainer(ChunkInfo chunkInfo, BlockID blockID,
ChunkBuffer data, KeyValueContainer kvContainer)
throws IOException {
Preconditions.checkNotNull(kvContainer);
Preconditions.checkNotNull(chunkInfo);
Preconditions.checkNotNull(data);
long writeChunkStartTime = Time.monotonicNowNanos();
if (!checkContainerClose(kvContainer)) {
throw new IOException("Container #" + kvContainer.getContainerData().getContainerID() +
" is not in closed state, Container state is " + kvContainer.getContainerState());
}

DispatcherContext dispatcherContext = DispatcherContext.getHandleWriteChunk();
chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
dispatcherContext);

// Increment write stats for WriteChunk after write.
metrics.incClosedContainerBytesStats(Type.WriteChunk, chunkInfo.getLen());
metrics.incContainerOpsLatencies(Type.WriteChunk, Time.monotonicNowNanos() - writeChunkStartTime);
}

/**
* Handle Put Block operation for closed container. Calls BlockManager to process the request.
*
*/
public void putBlockForClosedContainer(List<ContainerProtos.ChunkInfo> chunkInfos, KeyValueContainer kvContainer,
BlockData blockData, long blockCommitSequenceId)
throws IOException {
Preconditions.checkNotNull(kvContainer);
Preconditions.checkNotNull(blockData);
long startTime = Time.monotonicNowNanos();

if (!checkContainerClose(kvContainer)) {
throw new IOException("Container #" + kvContainer.getContainerData().getContainerID() +
" is not in closed state, Container state is " + kvContainer.getContainerState());
}
blockData.setChunks(chunkInfos);
// To be set from the Replica's BCSId
blockData.setBlockCommitSequenceId(blockCommitSequenceId);

blockManager.putBlock(kvContainer, blockData, false);
ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage();
final long numBytes = blockDataProto.getSerializedSize();
// Increment write stats for PutBlock after write.
metrics.incClosedContainerBytesStats(Type.PutBlock, numBytes);
metrics.incContainerOpsLatencies(Type.PutBlock, Time.monotonicNowNanos() - startTime);
}

/**
* Handle Put Small File operation. Writes the chunk and associated key
* using a single RPC. Calls BlockManager and ChunkManager to process the
Expand Down Expand Up @@ -1198,6 +1250,19 @@ private void checkContainerOpen(KeyValueContainer kvContainer)
throw new StorageContainerException(msg, result);
}

/**
* Check if container is Closed.
* @param kvContainer
*/
private boolean checkContainerClose(KeyValueContainer kvContainer) {

final State containerState = kvContainer.getContainerState();
if (containerState == State.QUASI_CLOSED || containerState == State.CLOSED || containerState == State.UNHEALTHY) {
return true;
}
return false;
}

@Override
public Container importContainer(ContainerData originalContainerData,
final InputStream rawContainerStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,41 @@

import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Stream;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE;
Expand Down Expand Up @@ -133,6 +154,141 @@ public void testPartialRead() throws Exception {
readData2.rewind().toByteString());
}

@ParameterizedTest
@MethodSource("getNonClosedStates")
public void testWriteChunkAndPutBlockFailureForNonClosedContainer(
ContainerProtos.ContainerDataProto.State state) throws IOException {
KeyValueContainer keyValueContainer = getKeyValueContainer();
keyValueContainer.getContainerData().setState(state);
ContainerSet containerSet = new ContainerSet(100);
containerSet.addContainer(keyValueContainer);
KeyValueHandler keyValueHandler = createKeyValueHandler(containerSet);
ChunkBuffer.wrap(getData());
Assertions.assertThrows(IOException.class, () -> keyValueHandler.writeChunkForClosedContainer(
getChunkInfo(), getBlockID(), ChunkBuffer.wrap(getData()), keyValueContainer));
Assertions.assertThrows(IOException.class, () -> keyValueHandler.putBlockForClosedContainer(
null, keyValueContainer, new BlockData(getBlockID()), 0L));
}

@Test
public void testWriteChunkForClosedContainer()
throws IOException {
ChunkBuffer writeChunkData = ChunkBuffer.wrap(getData());
KeyValueContainer kvContainer = getKeyValueContainer();
KeyValueContainerData containerData = kvContainer.getContainerData();
closedKeyValueContainer();
ContainerSet containerSet = new ContainerSet(100);
containerSet.addContainer(kvContainer);
KeyValueHandler keyValueHandler = createKeyValueHandler(containerSet);
keyValueHandler.writeChunkForClosedContainer(getChunkInfo(), getBlockID(), writeChunkData, kvContainer);
ChunkBuffer readChunkData = keyValueHandler.getChunkManager().readChunk(kvContainer,
getBlockID(), getChunkInfo(), WRITE_STAGE);
rewindBufferToDataStart();
Assertions.assertEquals(writeChunkData, readChunkData);
Assertions.assertEquals(containerData.getWriteBytes(), writeChunkData.remaining());
Assertions.assertEquals(containerData.getBytesUsed(), writeChunkData.remaining());

// Test Overwrite
keyValueHandler.writeChunkForClosedContainer(getChunkInfo(), getBlockID(),
writeChunkData, kvContainer);
readChunkData = keyValueHandler.getChunkManager().readChunk(kvContainer,
getBlockID(), getChunkInfo(), WRITE_STAGE);
rewindBufferToDataStart();
Assertions.assertEquals(writeChunkData, readChunkData);
Assertions.assertEquals(containerData.getWriteBytes(), 2L * writeChunkData.remaining());
// Overwrites won't increase the bytesUsed of a Container
Assertions.assertEquals(containerData.getBytesUsed(), writeChunkData.remaining());

// Test new write chunk after overwrite
byte[] bytes = "testing write chunks with after overwrite".getBytes(UTF_8);
ChunkBuffer newWriteChunkData = ChunkBuffer.allocate(bytes.length).put(bytes);
newWriteChunkData.rewind();

// Write chunk after the previous overwrite chunk.
ChunkInfo newChunkInfo = new ChunkInfo(String.format("%d.data.%d", getBlockID()
.getLocalID(), writeChunkData.remaining()), writeChunkData.remaining(), bytes.length);
keyValueHandler.writeChunkForClosedContainer(newChunkInfo, getBlockID(),
newWriteChunkData, kvContainer);
readChunkData = keyValueHandler.getChunkManager().readChunk(kvContainer,
getBlockID(), newChunkInfo, WRITE_STAGE);
newWriteChunkData.rewind();
Assertions.assertEquals(newWriteChunkData, readChunkData);
Assertions.assertEquals(containerData.getWriteBytes(), 2L * writeChunkData.remaining()
+ newWriteChunkData.remaining());
Assertions.assertEquals(containerData.getBytesUsed(), writeChunkData.remaining() + newWriteChunkData.remaining());
}

@Test
public void testPutBlockForClosedContainer() throws IOException {
KeyValueContainer kvContainer = getKeyValueContainer();
KeyValueContainerData containerData = kvContainer.getContainerData();
closedKeyValueContainer();
ContainerSet containerSet = new ContainerSet(100);
containerSet.addContainer(kvContainer);
KeyValueHandler keyValueHandler = createKeyValueHandler(containerSet);
List<ContainerProtos.ChunkInfo> chunkInfoList = new ArrayList<>();
chunkInfoList.add(getChunkInfo().getProtoBufMessage());
BlockData putBlockData = new BlockData(getBlockID());
keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer, putBlockData, 1L);
Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 1L);
Assertions.assertEquals(containerData.getBlockCount(), 1L);

try (DBHandle dbHandle = BlockUtils.getDB(containerData, new OzoneConfiguration())) {
long localID = putBlockData.getLocalID();
BlockData getBlockData = dbHandle.getStore().getBlockDataTable()
.get(containerData.getBlockKey(localID));
Assertions.assertTrue(blockDataEquals(putBlockData, getBlockData));
}

// Add another chunk and check the put block data
ChunkInfo newChunkInfo = new ChunkInfo(String.format("%d.data.%d", getBlockID()
.getLocalID(), 1L), 0, 20L);
chunkInfoList.add(newChunkInfo.getProtoBufMessage());
keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer, putBlockData, 2L);
Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 2L);
Assertions.assertEquals(containerData.getBlockCount(), 1L);

try (DBHandle dbHandle = BlockUtils.getDB(containerData, new OzoneConfiguration())) {
long localID = putBlockData.getLocalID();
BlockData getBlockData = dbHandle.getStore().getBlockDataTable()
.get(containerData.getBlockKey(localID));
Assertions.assertTrue(blockDataEquals(putBlockData, getBlockData));
}

// Put block on bcsId <= containerBcsId should be a no-op
keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer, putBlockData, 2L);
Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 2L);
}

private boolean blockDataEquals(BlockData putBlockData, BlockData getBlockData) {
return getBlockData.getSize() == putBlockData.getSize() &&
Objects.equals(getBlockData.getBlockID(), putBlockData.getBlockID()) &&
Objects.equals(getBlockData.getMetadata(), putBlockData.getMetadata()) &&
Objects.equals(getBlockData.getChunks(), putBlockData.getChunks());
}


private static Stream<Arguments> getNonClosedStates() {
return Stream.of(
Arguments.of(ContainerProtos.ContainerDataProto.State.OPEN),
Arguments.of(ContainerProtos.ContainerDataProto.State.RECOVERING),
Arguments.of(ContainerProtos.ContainerDataProto.State.CLOSING),
Arguments.of(ContainerProtos.ContainerDataProto.State.INVALID));
}

public KeyValueHandler createKeyValueHandler(ContainerSet containerSet)
throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
String dnUuid = UUID.randomUUID().toString();
MutableVolumeSet volumeSet = new MutableVolumeSet(dnUuid, conf,
null, StorageVolume.VolumeType.DATA_VOLUME, null);
return ContainerTestUtils.getKeyValueHandler(conf, dnUuid, containerSet, volumeSet);
}

public void closedKeyValueContainer() {
getKeyValueContainer().getContainerData().setState(ContainerProtos.ContainerDataProto.State.CLOSED);
}

@Override
protected ContainerLayoutTestInfo getStrategy() {
return ContainerLayoutTestInfo.FILE_PER_BLOCK;
Expand Down

0 comments on commit 04c196c

Please sign in to comment.