From c0abd3c2c0379c74c13f939d63e71556dcef491c Mon Sep 17 00:00:00 2001 From: wanghongbing Date: Sat, 20 Jul 2024 16:15:30 +0800 Subject: [PATCH 1/6] HDDS-11209. Avoid insufficient EC pipelines in the container pipeline cache --- .../org/apache/hadoop/ozone/om/ScmClient.java | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java index 77ee0d5851f..0d08e263916 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java @@ -21,6 +21,8 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader.InvalidCacheLoadException; import com.google.common.cache.LoadingCache; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -33,9 +35,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_CONTAINER_LOCATION_CACHE_SIZE; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_CONTAINER_LOCATION_CACHE_SIZE_DEFAULT; @@ -113,12 +117,26 @@ public Map getContainerLocations(Iterable containerIds, } try { Map result = containerLocationCache.getAll(containerIds); - // Don't keep empty pipelines in the cache. - List emptyPipelines = result.entrySet().stream() - .filter(e -> e.getValue().isEmpty()) + // Don't keep empty pipelines or insufficient EC pipelines in the cache. + List unsatisfiedCachePipelines = result.entrySet().stream() + .filter(e -> { + Pipeline pipeline = e.getValue(); + // filter empty pipelines + if (pipeline.isEmpty()) { + return true; + } + // filter insufficient EC pipelines + ReplicationConfig repConfig = pipeline.getReplicationConfig(); + if (repConfig instanceof ECReplicationConfig) { + int d = ((ECReplicationConfig) repConfig).getData(); + Set dataIndexes = IntStream.rangeClosed(1, d).boxed().collect(Collectors.toSet()); + return !pipeline.getReplicaIndexes().values().containsAll(dataIndexes); + } + return false; + }) .map(Map.Entry::getKey) .collect(Collectors.toList()); - containerLocationCache.invalidateAll(emptyPipelines); + containerLocationCache.invalidateAll(unsatisfiedCachePipelines); return result; } catch (ExecutionException e) { return handleCacheExecutionException(e); From 166d31993ace65ba4e7e6e2b6efd61d57716c9bb Mon Sep 17 00:00:00 2001 From: wanghongbing Date: Mon, 29 Jul 2024 15:16:19 +0800 Subject: [PATCH 2/6] ec data indexes add to map --- .../main/java/org/apache/hadoop/ozone/om/ScmClient.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java index 0d08e263916..ffe14c8fe10 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java @@ -55,6 +55,7 @@ public class ScmClient { private final StorageContainerLocationProtocol containerClient; private final LoadingCache containerLocationCache; private final CacheMetrics containerCacheMetrics; + private static final Map> EC_DATA_INDEXES = new HashMap<>(); ScmClient(ScmBlockLocationProtocol blockClient, StorageContainerLocationProtocol containerClient, @@ -118,7 +119,7 @@ public Map getContainerLocations(Iterable containerIds, try { Map result = containerLocationCache.getAll(containerIds); // Don't keep empty pipelines or insufficient EC pipelines in the cache. - List unsatisfiedCachePipelines = result.entrySet().stream() + List uncachePipelines = result.entrySet().stream() .filter(e -> { Pipeline pipeline = e.getValue(); // filter empty pipelines @@ -129,14 +130,15 @@ public Map getContainerLocations(Iterable containerIds, ReplicationConfig repConfig = pipeline.getReplicationConfig(); if (repConfig instanceof ECReplicationConfig) { int d = ((ECReplicationConfig) repConfig).getData(); - Set dataIndexes = IntStream.rangeClosed(1, d).boxed().collect(Collectors.toSet()); + Set dataIndexes = EC_DATA_INDEXES.computeIfAbsent(d, k -> + IntStream.rangeClosed(1, d).boxed().collect(Collectors.toSet())); return !pipeline.getReplicaIndexes().values().containsAll(dataIndexes); } return false; }) .map(Map.Entry::getKey) .collect(Collectors.toList()); - containerLocationCache.invalidateAll(unsatisfiedCachePipelines); + containerLocationCache.invalidateAll(uncachePipelines); return result; } catch (ExecutionException e) { return handleCacheExecutionException(e); From b4d26b2e800564f8fb85615e8a5e790e9c35f2f0 Mon Sep 17 00:00:00 2001 From: wanghongbing Date: Sun, 4 Aug 2024 21:30:58 +0800 Subject: [PATCH 3/6] add test --- .../om/TestOmContainerLocationCache.java | 110 +++++++++++++++++- 1 file changed, 109 insertions(+), 1 deletion(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java index 2ae69dc3c96..4ea4c1b48a2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java @@ -18,10 +18,12 @@ package org.apache.hadoop.ozone.om; +import com.google.common.collect.ImmutableMap; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.client.ContainerBlockID; +import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -89,8 +91,11 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -145,10 +150,17 @@ public class TestOmContainerLocationCache { private static ObjectStore objectStore; private static XceiverClientGrpc mockDn1Protocol; private static XceiverClientGrpc mockDn2Protocol; + private static XceiverClientGrpc mockDnEcProtocol; private static final DatanodeDetails DN1 = MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID()); private static final DatanodeDetails DN2 = MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID()); + private static final DatanodeDetails DN3 = + MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID()); + private static final DatanodeDetails DN4 = + MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID()); + private static final DatanodeDetails DN5 = + MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID()); private static final AtomicLong CONTAINER_ID = new AtomicLong(1); @@ -200,6 +212,8 @@ private static XceiverClientManager mockDataNodeClientFactory() throws IOException { mockDn1Protocol = spy(new XceiverClientGrpc(createPipeline(DN1), conf)); mockDn2Protocol = spy(new XceiverClientGrpc(createPipeline(DN2), conf)); + mockDnEcProtocol = spy(new XceiverClientGrpc(createEcPipeline( + ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4, DN5, 5)), conf)); XceiverClientManager manager = mock(XceiverClientManager.class); when(manager.acquireClient(argThat(matchEmptyPipeline()))) .thenCallRealMethod(); @@ -217,6 +231,11 @@ private static XceiverClientManager mockDataNodeClientFactory() .thenReturn(mockDn2Protocol); when(manager.acquireClientForReadData(argThat(matchPipeline(DN2)))) .thenReturn(mockDn2Protocol); + + when(manager.acquireClient(argThat(matchEcPipeline()))) + .thenReturn(mockDnEcProtocol); + when(manager.acquireClientForReadData(argThat(matchEcPipeline()))) + .thenReturn(mockDnEcProtocol); return manager; } @@ -231,6 +250,11 @@ private static ArgumentMatcher matchPipeline(DatanodeDetails dn) { && argument.getNodes().get(0).getUuid().equals(dn.getUuid()); } + private static ArgumentMatcher matchEcPipeline() { + return argument -> argument != null && !argument.getNodes().isEmpty() + && argument.getReplicationConfig() instanceof ECReplicationConfig; + } + private static void createBucket(String volumeName, String bucketName, boolean isVersionEnabled) throws IOException { @@ -256,12 +280,14 @@ private static void createVolume(String volumeName) throws IOException { public void beforeEach() throws IOException { CONTAINER_ID.getAndIncrement(); reset(mockScmBlockLocationProtocol, mockScmContainerClient, - mockDn1Protocol, mockDn2Protocol); + mockDn1Protocol, mockDn2Protocol, mockDnEcProtocol); InnerNode.Factory factory = InnerNodeImpl.FACTORY; when(mockScmBlockLocationProtocol.getNetworkTopology()).thenReturn( factory.newInnerNode("", "", null, NetConstants.ROOT_LEVEL, 1)); when(mockDn1Protocol.getPipeline()).thenReturn(createPipeline(DN1)); when(mockDn2Protocol.getPipeline()).thenReturn(createPipeline(DN2)); + when(mockDnEcProtocol.getPipeline()).thenReturn(createEcPipeline( + ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4, DN5, 5))); } /** @@ -575,6 +601,48 @@ public void containerRefreshedOnEmptyPipelines() throws Exception { .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get())); } + @Test + public void containerRefreshedOnInsufficientEcPipelines() throws Exception { + int chunkSize = 1024 * 1024; + int dataBlocks = 3; + int parityBlocks = 2; + int inputSize = chunkSize * dataBlocks; + byte[][] inputChunks = new byte[dataBlocks][chunkSize]; + + mockScmAllocationEcPipeline(CONTAINER_ID.get(), 1L); + mockWriteChunkResponse(mockDnEcProtocol); + mockPutBlockResponse(mockDnEcProtocol, CONTAINER_ID.get(), 1L, null); + + OzoneBucket bucket = objectStore.getVolume(VOLUME_NAME).getBucket(BUCKET_NAME); + + String keyName = "ecKey"; + try (OzoneOutputStream os = bucket.createKey(keyName, inputSize, + new ECReplicationConfig(dataBlocks, parityBlocks, ECReplicationConfig.EcCodec.RS, + chunkSize), new HashMap<>())) { + for (int i = 0; i < dataBlocks; i++) { + os.write(inputChunks[i]); + } + } + + // case1: pipeline replicaIndexes missing some data indexes, not cache + mockScmGetContainerEcPipeline(CONTAINER_ID.get(), ImmutableMap.of(DN1, 1, DN2, 2, DN4, 4)); + bucket.getKey(keyName); + verify(mockScmContainerClient, times(1)) + .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get())); + bucket.getKey(keyName); + verify(mockScmContainerClient, times(2)) + .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get())); + + // case2: pipeline replicaIndexes contain all data indexes, cache + mockScmGetContainerEcPipeline(CONTAINER_ID.get(), ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4)); + bucket.getKey(keyName); + verify(mockScmContainerClient, times(3)) + .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get())); + bucket.getKey(keyName); + verify(mockScmContainerClient, times(3)) + .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get())); + } + private void mockPutBlockResponse(XceiverClientSpi mockDnProtocol, long containerId, long localId, byte[] data) @@ -639,6 +707,22 @@ private void mockScmAllocationOnDn1(long containerID, .thenReturn(Collections.singletonList(block)); } + private void mockScmAllocationEcPipeline(long containerID, long localId) + throws IOException { + ContainerBlockID blockId = new ContainerBlockID(containerID, localId); + AllocatedBlock block = new AllocatedBlock.Builder() + .setPipeline(createEcPipeline(ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4, DN5, 5))) + .setContainerBlockID(blockId) + .build(); + when(mockScmBlockLocationProtocol + .allocateBlock(anyLong(), anyInt(), + any(ECReplicationConfig.class), + anyString(), + any(ExcludeList.class), + anyString())) + .thenReturn(Collections.singletonList(block)); + } + private void mockScmGetContainerPipeline(long containerId, DatanodeDetails dn) throws IOException { @@ -668,6 +752,20 @@ private void mockScmGetContainerPipelineEmpty(long containerId) newHashSet(containerId))).thenReturn(containerWithPipelines); } + private void mockScmGetContainerEcPipeline(long containerId, Map indexes) + throws IOException { + Pipeline pipeline = createEcPipeline(indexes); + ContainerInfo containerInfo = new ContainerInfo.Builder() + .setContainerID(containerId) + .setPipelineID(pipeline.getId()).build(); + List containerWithPipelines = + Collections.singletonList( + new ContainerWithPipeline(containerInfo, pipeline)); + + when(mockScmContainerClient.getContainerWithPipelineBatch( + newHashSet(containerId))).thenReturn(containerWithPipelines); + } + private void mockGetBlock(XceiverClientGrpc mockDnProtocol, long containerId, long localId, byte[] data, @@ -766,4 +864,14 @@ private static Pipeline createPipeline(List nodes) { .setNodes(nodes) .build(); } + + private static Pipeline createEcPipeline(Map indexes) { + return Pipeline.newBuilder() + .setState(Pipeline.PipelineState.OPEN) + .setId(PipelineID.randomId()) + .setReplicationConfig(new ECReplicationConfig(3, 2)) + .setReplicaIndexes(indexes) + .setNodes(new ArrayList<>(indexes.keySet())) + .build(); + } } From 3dffac1806696fee349e0b697d38075c6d9e5509 Mon Sep 17 00:00:00 2001 From: wanghongbing Date: Sat, 17 Aug 2024 11:25:49 +0800 Subject: [PATCH 4/6] improve compare logic --- .../ozone/om/TestOmContainerLocationCache.java | 4 ++-- .../java/org/apache/hadoop/ozone/om/ScmClient.java | 12 +++++------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java index 4ea4c1b48a2..ee5c6c667f2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java @@ -624,7 +624,7 @@ public void containerRefreshedOnInsufficientEcPipelines() throws Exception { } } - // case1: pipeline replicaIndexes missing some data indexes, not cache + // case1: pipeline replicaIndexes missing some data indexes, should not cache mockScmGetContainerEcPipeline(CONTAINER_ID.get(), ImmutableMap.of(DN1, 1, DN2, 2, DN4, 4)); bucket.getKey(keyName); verify(mockScmContainerClient, times(1)) @@ -633,7 +633,7 @@ public void containerRefreshedOnInsufficientEcPipelines() throws Exception { verify(mockScmContainerClient, times(2)) .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get())); - // case2: pipeline replicaIndexes contain all data indexes, cache + // case2: pipeline replicaIndexes contain all data indexes, should cache mockScmGetContainerEcPipeline(CONTAINER_ID.get(), ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4)); bucket.getKey(keyName); verify(mockScmContainerClient, times(3)) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java index ffe14c8fe10..5bfb4b540a5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java @@ -35,11 +35,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import java.util.stream.IntStream; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_CONTAINER_LOCATION_CACHE_SIZE; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_CONTAINER_LOCATION_CACHE_SIZE_DEFAULT; @@ -55,7 +53,6 @@ public class ScmClient { private final StorageContainerLocationProtocol containerClient; private final LoadingCache containerLocationCache; private final CacheMetrics containerCacheMetrics; - private static final Map> EC_DATA_INDEXES = new HashMap<>(); ScmClient(ScmBlockLocationProtocol blockClient, StorageContainerLocationProtocol containerClient, @@ -126,13 +123,14 @@ public Map getContainerLocations(Iterable containerIds, if (pipeline.isEmpty()) { return true; } - // filter insufficient EC pipelines + // filter insufficient EC pipelines which missing any data index ReplicationConfig repConfig = pipeline.getReplicationConfig(); if (repConfig instanceof ECReplicationConfig) { int d = ((ECReplicationConfig) repConfig).getData(); - Set dataIndexes = EC_DATA_INDEXES.computeIfAbsent(d, k -> - IntStream.rangeClosed(1, d).boxed().collect(Collectors.toSet())); - return !pipeline.getReplicaIndexes().values().containsAll(dataIndexes); + List indexes = pipeline.getReplicaIndexes().values().stream() + .sorted() + .collect(Collectors.toList()); + return !(indexes.size() >= d && indexes.get(d - 1) == d); } return false; }) From 17b54688bcd7edc9df6851555bd960470606677a Mon Sep 17 00:00:00 2001 From: wanghongbing Date: Sun, 18 Aug 2024 17:22:44 +0800 Subject: [PATCH 5/6] distinct index --- .../src/main/java/org/apache/hadoop/ozone/om/ScmClient.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java index 5bfb4b540a5..b4c8676c8fd 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java @@ -129,6 +129,7 @@ public Map getContainerLocations(Iterable containerIds, int d = ((ECReplicationConfig) repConfig).getData(); List indexes = pipeline.getReplicaIndexes().values().stream() .sorted() + .distinct() .collect(Collectors.toList()); return !(indexes.size() >= d && indexes.get(d - 1) == d); } From 71618f5b1800bd25ff62cc24425cf2ce8de4baf7 Mon Sep 17 00:00:00 2001 From: wanghongbing Date: Wed, 21 Aug 2024 15:33:12 +0800 Subject: [PATCH 6/6] simply contains index logic --- .../java/org/apache/hadoop/ozone/om/ScmClient.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java index b4c8676c8fd..e64a32a45e5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java @@ -127,11 +127,11 @@ public Map getContainerLocations(Iterable containerIds, ReplicationConfig repConfig = pipeline.getReplicationConfig(); if (repConfig instanceof ECReplicationConfig) { int d = ((ECReplicationConfig) repConfig).getData(); - List indexes = pipeline.getReplicaIndexes().values().stream() - .sorted() - .distinct() - .collect(Collectors.toList()); - return !(indexes.size() >= d && indexes.get(d - 1) == d); + for (int i = 1; i <= d; i++) { + if (!pipeline.getReplicaIndexes().containsValue(i)) { + return true; + } + } } return false; })