Skip to content

Commit

Permalink
Merge branch 'HDDS-10239-container-reconciliation' into HDDS-11253-corrupt-files
Browse files Browse the repository at this point in the history

* HDDS-10239-container-reconciliation: (61 commits)
  HDDS-11081. Use thread-local instance of FileSystem in Freon tests (apache#7091)
  HDDS-11333. Avoid hard-coded current version in upgrade/xcompat tests (apache#7089)
  Mark TestPipelineManagerMXBean#testPipelineInfo as flaky
  Mark TestAddRemoveOzoneManager#testForceBootstrap as flaky
  HDDS-11352. HDDS-11353. Mark TestOzoneManagerHAWithStoppedNodes as flaky
  HDDS-11354. Mark TestOzoneManagerSnapshotAcl#testLookupKeyWithNotAllowedUserForPrefixAcl as flaky
  HDDS-11355. Mark TestMultiBlockWritesWithDnFailures#testMultiBlockWritesWithIntermittentDnFailures as flaky
  HDDS-11227. Use server default key provider to encrypt/decrypt keys from multiple OMs. (apache#7081)
  HDDS-11316. Improve Create Key and Chunk IO Dashboards (apache#7075)
  HDDS-11239. Fix KeyOutputStream's exception handling when calling hsync concurrently (apache#7047)
  HDDS-11254. Reconcile commands should be handled by datanode ReplicationSupervisor (apache#7076)
  HDDS-11331. Fix Datanode unable to report for a long time (apache#7090)
  HDDS-11346. FS CLI gives incorrect recursive volume deletion prompt (apache#7102)
  HDDS-11349. Add NullPointer handling when volume/bucket tables are not initialized (apache#7103)
  HDDS-11209. Avoid insufficient EC pipelines in the container pipeline cache (apache#6974)
  HDDS-11284. refactor quota repair non-blocking while upgrade (apache#7035)
  HDDS-9790. Add tests for Overview page (apache#6983)
  HDDS-10904. [hsync] Enable PutBlock piggybacking and incremental chunk list by default (apache#7074)
  HDDS-11322. [hsync] Block ECKeyOutputStream from calling hsync and hflush (apache#7098)
  HDDS-11325. Intermittent failure in TestBlockOutputStreamWithFailures#testContainerClose (apache#7099)
  ...

Conflicts:
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
  • Loading branch information
errose28 committed Aug 28, 2024
2 parents 7bd832f + 5cd6a07 commit 426beac
Show file tree
Hide file tree
Showing 261 changed files with 10,899 additions and 2,702 deletions.
57 changes: 57 additions & 0 deletions hadoop-hdds/client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,67 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-config</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-erasurecode</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-interface-client</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-grpc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-proto</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-thirdparty-misc</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-util</artifactId>
</dependency>

<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;

/**
 * Client side error injector allowing simulating receiving errors from server side.
 *
 * <p>Intended for tests: an implementation can intercept an outgoing container
 * command and supply a synthetic {@link RaftClientReply} in place of a real
 * server response.
 */
@FunctionalInterface
public interface ErrorInjector {

  /**
   * Produces a (possibly synthetic) reply for the given container command.
   *
   * @param request the container command request about to be sent
   * @param id the Ratis client id issuing the request
   * @param pipeline the pipeline the request targets
   * @return the reply to use for this request; presumably {@code null} means
   *         "do not inject, proceed normally" — TODO confirm against the
   *         XceiverClientRatis call site
   */
  RaftClientReply getResponse(ContainerProtos.ContainerCommandRequestProto request, ClientId id, Pipeline pipeline);
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,21 +248,29 @@ public enum ChecksumCombineMode {
private String fsDefaultBucketLayout = "FILE_SYSTEM_OPTIMIZED";

@Config(key = "incremental.chunk.list",
defaultValue = "false",
defaultValue = "true",
type = ConfigType.BOOLEAN,
description = "Client PutBlock request can choose incremental chunk " +
"list rather than full chunk list to optimize performance. " +
"Critical to HBase.",
"Critical to HBase. EC does not support this feature.",
tags = ConfigTag.CLIENT)
private boolean incrementalChunkList = true;

@Config(key = "stream.putblock.piggybacking",
defaultValue = "false",
defaultValue = "true",
type = ConfigType.BOOLEAN,
description = "Allow PutBlock to be piggybacked in WriteChunk " +
"requests if the chunk is small.",
tags = ConfigTag.CLIENT)
private boolean enablePutblockPiggybacking = false;
private boolean enablePutblockPiggybacking = true;

@Config(key = "key.write.concurrency",
defaultValue = "1",
description = "Maximum concurrent writes allowed on each key. " +
"Defaults to 1 which matches the behavior before HDDS-9844. " +
"For unlimited write concurrency, set this to -1 or any negative integer value.",
tags = ConfigTag.CLIENT)
private int maxConcurrentWritePerKey = 1;

@PostConstruct
public void validate() {
Expand Down Expand Up @@ -485,4 +493,12 @@ public void setIncrementalChunkList(boolean enable) {
/**
 * Returns whether client PutBlock requests may send an incremental chunk
 * list instead of the full chunk list (config key
 * {@code incremental.chunk.list}).
 */
public boolean getIncrementalChunkList() {
return this.incrementalChunkList;
}

/**
 * Sets the maximum number of concurrent writes allowed on each key.
 *
 * @param maxConcurrentWritePerKey maximum concurrent writes per key;
 *        a negative value means unlimited (per the config description)
 */
public void setMaxConcurrentWritePerKey(int maxConcurrentWritePerKey) {
this.maxConcurrentWritePerKey = maxConcurrentWritePerKey;
}

/**
 * Returns the maximum number of concurrent writes allowed on each key
 * (config key {@code key.write.concurrency}); negative means unlimited.
 */
public int getMaxConcurrentWritePerKey() {
return this.maxConcurrentWritePerKey;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneSecurityUtil;

import java.io.IOException;

/**
 * Factory for XceiverClientSpi implementations. Client instances are not
 * cached: every acquire call creates and connects a brand-new client, and
 * every release call simply closes it.
 */
public class XceiverClientCreator implements XceiverClientFactory {
  // volatile: the injector is typically installed by a test thread while
  // clients are created on other threads; without volatile there is no
  // guarantee the write is ever visible to them.
  private static volatile ErrorInjector errorInjector;

  /**
   * Installs a process-wide error injector used by all Ratis clients created
   * after this call. Intended for tests; pass {@code null} to disable.
   *
   * @param injector the injector to install, or {@code null} to clear it
   */
  public static void enableErrorInjection(ErrorInjector injector) {
    errorInjector = injector;
  }

  private final ConfigurationSource conf;
  private final boolean topologyAwareRead;
  private final ClientTrustManager trustManager;
  private final boolean securityEnabled;

  public XceiverClientCreator(ConfigurationSource conf) {
    this(conf, null);
  }

  /**
   * @param conf configuration source for client settings
   * @param trustManager trust manager for secure connections; must be
   *        non-null when Ozone security is enabled
   */
  public XceiverClientCreator(ConfigurationSource conf, ClientTrustManager trustManager) {
    this.conf = conf;
    this.securityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
    topologyAwareRead = conf.getBoolean(
        OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
        OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
    this.trustManager = trustManager;
    if (securityEnabled) {
      Preconditions.checkNotNull(trustManager);
    }
  }

  public boolean isSecurityEnabled() {
    return securityEnabled;
  }

  /**
   * Creates and connects a new client appropriate for the pipeline type.
   *
   * @param pipeline the pipeline the client will talk to
   * @return a connected client; the caller owns it and must release it
   * @throws IOException for unsupported pipeline types or connection failures
   */
  protected XceiverClientSpi newClient(Pipeline pipeline) throws IOException {
    XceiverClientSpi client;
    switch (pipeline.getType()) {
    case RATIS:
      client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, trustManager, errorInjector);
      break;
    case STAND_ALONE:
      client = new XceiverClientGrpc(pipeline, conf, trustManager);
      break;
    case EC:
      client = new ECXceiverClientGrpc(pipeline, conf, trustManager);
      break;
    case CHAINED:
    default:
      throw new IOException("not implemented " + pipeline.getType());
    }
    try {
      client.connect();
    } catch (IOException e) {
      // already the right exception type; don't wrap it in another IOException
      throw e;
    } catch (Exception e) {
      throw new IOException(e);
    }
    return client;
  }

  @Override
  public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException {
    return acquireClient(pipeline, false);
  }

  @Override
  public void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient) {
    releaseClient(xceiverClient, invalidateClient, false);
  }

  @Override
  public XceiverClientSpi acquireClientForReadData(Pipeline pipeline) throws IOException {
    return acquireClient(pipeline);
  }

  @Override
  public void releaseClientForReadData(XceiverClientSpi xceiverClient, boolean invalidateClient) {
    releaseClient(xceiverClient, invalidateClient, topologyAwareRead);
  }

  @Override
  public XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware) throws IOException {
    // no caching: topologyAware and invalidation flags are irrelevant here
    return newClient(pipeline);
  }

  @Override
  public void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient, boolean topologyAware) {
    IOUtils.closeQuietly(xceiverClient);
  }

  @Override
  public void close() throws Exception {
    // clients are not tracked, closing each client is the responsibility of users of this class
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,53 @@
*/
public interface XceiverClientFactory extends AutoCloseable {

/**
 * Acquires a XceiverClientSpi connected to a container capable of
 * storing the specified key. It does not consider the topology
 * of the datanodes in the pipeline (e.g. the datanode closest to the
 * client).
 *
 * @param pipeline the container pipeline for the client connection
 * @return XceiverClientSpi connected to a container
 * @throws IOException if a XceiverClientSpi cannot be acquired
 */
XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException;

void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient);
/**
* Releases a XceiverClientSpi after use.
*
* @param client client to release
* @param invalidateClient if true, invalidates the client in cache
*/
void releaseClient(XceiverClientSpi client, boolean invalidateClient);

/**
 * Acquires a XceiverClientSpi connected to a container for read.
 * Implementations may take datanode topology into account when choosing
 * the connection (compare releaseClientForReadData).
 *
 * @param pipeline the container pipeline for the client connection
 * @return XceiverClientSpi connected to a container
 * @throws IOException if a XceiverClientSpi cannot be acquired
 */
XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
throws IOException;

void releaseClientForReadData(XceiverClientSpi xceiverClient,
/**
* Releases a read XceiverClientSpi after use.
*
* @param client client to release
* @param invalidateClient if true, invalidates the client in cache
*/
void releaseClientForReadData(XceiverClientSpi client,
boolean invalidateClient);

/**
 * Acquires a XceiverClientSpi connected to a container capable of
 * storing the specified key.
 *
 * @param pipeline the container pipeline for the client connection
 * @param topologyAware if true, implementations may pick a connection
 *        based on datanode topology (e.g. closest to the client)
 * @return XceiverClientSpi connected to a container
 * @throws IOException if a XceiverClientSpi cannot be acquired
 */
XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware)
throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import java.util.concurrent.TimeoutException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -649,14 +648,6 @@ private void reconnect(DatanodeDetails dn)
}
}

/**
 * No-op for standalone pipelines: there is no replicated log, hence no
 * commit index to watch.
 *
 * @param index ignored
 * @return always {@code null}
 */
@Override
public XceiverClientReply watchForCommit(long index)
throws InterruptedException, ExecutionException, TimeoutException,
IOException {
// there is no notion of watch for commit index in standalone pipeline
return null;
}

@Override
public long getReplicatedMinCommitIndex() {
return 0;
Expand Down
Loading

0 comments on commit 426beac

Please sign in to comment.