Merge branch 'master' into HDDS-10239-container-reconciliation
* master: (181 commits)
  HDDS-11289. Bump docker-maven-plugin to 0.45.0 (apache#7024)
  HDDS-11287. Code cleanup in XceiverClientSpi (apache#7043)
  HDDS-11283. Refactor KeyValueStreamDataChannel to avoid spurious IDE build issues (apache#7040)
  HDDS-11257. Ozone write does not work when http proxy is set for the JVM. (apache#7036)
  HDDS-11249. Bump ozone-runner to 20240729-jdk17-1 (apache#7003)
  HDDS-10517. Recon - Add a UI component for showing DN decommissioning detailed status and info (apache#6724)
  HDDS-11270. [hsync] Add DN layout version (HBASE_SUPPORT/version 8) upgrade test. (apache#7021)
  HDDS-11272. Statistics some node status information (apache#7025)
  HDDS-11278. Move code out of Hadoop util package (apache#7028)
  HDDS-11274. (addendum) Replace Hadoop annotations/configs with Ozone-specific ones
  HDDS-11274. Replace Hadoop annotations/configs with Ozone-specific ones (apache#7026)
  HDDS-11280. Add Synchronize in AbstractCommitWatcher.addAckDataLength (apache#7032)
  HDDS-11235. Spare InfoBucket RPC call in FileSystem#mkdir() call. (apache#6990)
  HDDS-11273. Bump commons-compress to 1.26.2 (apache#7023)
  HDDS-11225. Increase ipc.server.read.threadpool.size (apache#7007)
  HDDS-11224. Increase hdds.datanode.handler.count (apache#7011)
  HDDS-11259. [hsync] DataNode should verify HBASE_SUPPORT layout version for every PutBlock. (apache#7012)
  HDDS-11214. Added config to set rocksDB's max log file size and num of log files (apache#7014)
  HDDS-11226. Make ExponentialBackoffPolicy maxRetries configurable (apache#6985)
  HDDS-11260. [hsync] Add Ozone Manager protocol version (apache#7015)
  ...

Conflicts:
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
errose28 committed Aug 7, 2024
2 parents ab35173 + 51ba4c8 commit 98369a8
Showing 388 changed files with 12,544 additions and 2,844 deletions.
@@ -57,6 +57,9 @@ public final class ContainerClientMetrics {
   private MutableQuantiles[] getCommittedBlockLengthLatency;
   private MutableQuantiles[] readChunkLatency;
   private MutableQuantiles[] getSmallFileLatency;
+  private MutableQuantiles[] hsyncLatencyNs;
+  private MutableQuantiles[] omHsyncLatencyNs;
+  private MutableQuantiles[] datanodeHsyncLatencyNs;
   private final Map<PipelineID, MutableCounterLong> writeChunkCallsByPipeline;
   private final Map<PipelineID, MutableCounterLong> writeChunkBytesByPipeline;
   private final Map<UUID, MutableCounterLong> writeChunksCallsByLeaders;
@@ -96,6 +99,9 @@ private ContainerClientMetrics() {
     getCommittedBlockLengthLatency = new MutableQuantiles[3];
     readChunkLatency = new MutableQuantiles[3];
     getSmallFileLatency = new MutableQuantiles[3];
+    hsyncLatencyNs = new MutableQuantiles[3];
+    omHsyncLatencyNs = new MutableQuantiles[3];
+    datanodeHsyncLatencyNs = new MutableQuantiles[3];
     int[] intervals = {60, 300, 900};
     for (int i = 0; i < intervals.length; i++) {
       int interval = intervals[i];
@@ -119,6 +125,18 @@ private ContainerClientMetrics() {
           .newQuantiles("getSmallFileLatency" + interval
               + "s", "GetSmallFile latency in microseconds", "ops",
               "latency", interval);
+      hsyncLatencyNs[i] = registry
+          .newQuantiles("hsyncLatency" + interval
+              + "s", "client hsync latency in nanoseconds", "ops",
+              "latency", interval);
+      omHsyncLatencyNs[i] = registry
+          .newQuantiles("omHsyncLatency" + interval
+              + "s", "client hsync latency to OM in nanoseconds", "ops",
+              "latency", interval);
+      datanodeHsyncLatencyNs[i] = registry
+          .newQuantiles("dnHsyncLatency" + interval
+              + "s", "client hsync latency to DN in nanoseconds", "ops",
+              "latency", interval);
     }
   }
 
@@ -155,6 +173,14 @@ public void addListBlockLatency(long latency) {
     }
   }
 
+  public void addHsyncLatency(long hsyncLatencyTime) {
+    for (MutableQuantiles q : hsyncLatencyNs) {
+      if (q != null) {
+        q.add(hsyncLatencyTime);
+      }
+    }
+  }
+
   public void addGetBlockLatency(long latency) {
     for (MutableQuantiles q : getBlockLatency) {
       if (q != null) {
@@ -163,6 +189,14 @@ public void addGetBlockLatency(long latency) {
     }
   }
 
+  public void addOMHsyncLatency(long hsyncLatencyTime) {
+    for (MutableQuantiles q : omHsyncLatencyNs) {
+      if (q != null) {
+        q.add(hsyncLatencyTime);
+      }
+    }
+  }
+
   public void addGetCommittedBlockLengthLatency(long latency) {
     for (MutableQuantiles q : getCommittedBlockLengthLatency) {
       if (q != null) {
@@ -187,6 +221,14 @@ public void addGetSmallFileLatency(long latency) {
     }
   }
 
+  public void addDataNodeHsyncLatency(long hsyncLatencyTime) {
+    for (MutableQuantiles q : datanodeHsyncLatencyNs) {
+      if (q != null) {
+        q.add(hsyncLatencyTime);
+      }
+    }
+  }
+
   @VisibleForTesting
   public MutableCounterLong getTotalWriteChunkBytes() {
     return totalWriteChunkBytes;
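The new quantile metrics follow the file's existing pattern: one MutableQuantiles per rolling window (60 s, 300 s, 900 s), each null-guarded before add(). A minimal sketch of feeding the client-side hsync quantiles, timed in nanoseconds to match the metric names; the acquire() accessor and the surrounding call flow are assumptions for illustration, not part of this diff:

import org.apache.hadoop.hdds.scm.ContainerClientMetrics;

public final class HsyncTimingSketch {
  // Times one hsync round trip and feeds the 60s/300s/900s quantile windows.
  static void timedHsync(Runnable hsyncCall) {
    ContainerClientMetrics metrics = ContainerClientMetrics.acquire(); // assumed accessor
    long start = System.nanoTime();
    try {
      hsyncCall.run(); // the actual hsync call
    } finally {
      metrics.addHsyncLatency(System.nanoTime() - start); // value in nanoseconds
    }
  }
}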
@@ -247,6 +247,23 @@ public enum ChecksumCombineMode {
       tags = ConfigTag.CLIENT)
   private String fsDefaultBucketLayout = "FILE_SYSTEM_OPTIMIZED";
 
+  @Config(key = "incremental.chunk.list",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      description = "Client PutBlock request can choose incremental chunk " +
+          "list rather than full chunk list to optimize performance. " +
+          "Critical to HBase.",
+      tags = ConfigTag.CLIENT)
+  private boolean incrementalChunkList = true;
+
+  @Config(key = "stream.putblock.piggybacking",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      description = "Allow PutBlock to be piggybacked in WriteChunk " +
+          "requests if the chunk is small.",
+      tags = ConfigTag.CLIENT)
+  private boolean enablePutblockPiggybacking = false;
+
   @PostConstruct
   public void validate() {
     Preconditions.checkState(streamBufferSize > 0);
@@ -445,11 +462,27 @@ public String getFsDefaultBucketLayout() {
     return fsDefaultBucketLayout;
   }
 
+  public void setEnablePutblockPiggybacking(boolean enablePutblockPiggybacking) {
+    this.enablePutblockPiggybacking = enablePutblockPiggybacking;
+  }
+
+  public boolean getEnablePutblockPiggybacking() {
+    return enablePutblockPiggybacking;
+  }
+
   public boolean isDatastreamPipelineMode() {
     return datastreamPipelineMode;
   }
 
   public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
     this.datastreamPipelineMode = datastreamPipelineMode;
   }
+
+  public void setIncrementalChunkList(boolean enable) {
+    this.incrementalChunkList = enable;
+  }
+
+  public boolean getIncrementalChunkList() {
+    return this.incrementalChunkList;
+  }
 }
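Judging by the ChecksumCombineMode context, these additions land in the typed client configuration class (OzoneClientConfig); assuming the usual "ozone.client." group prefix, the keys would surface as ozone.client.incremental.chunk.list and ozone.client.stream.putblock.piggybacking. A hedged sketch of enabling both through the typed-config API, using the setters added above:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;

public final class ClientConfigSketch {
  static OzoneConfiguration hsyncFriendlyConf() {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Materialize the typed config object, flip the new flags, write it back.
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    clientConfig.setIncrementalChunkList(true);       // PutBlock carries only new chunks
    clientConfig.setEnablePutblockPiggybacking(true); // small-chunk PutBlock rides on WriteChunk
    conf.setFromObject(clientConfig);
    return conf;
  }
}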
@@ -174,10 +174,9 @@ private synchronized void connectToDatanode(DatanodeDetails dn)
           OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
     }
 
-    // Add credential context to the client call
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Nodes in pipeline : {}", pipeline.getNodes());
-      LOG.debug("Connecting to server : {}", dn.getIpAddress());
+      LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ",
+          dn, pipeline.getNodes());
     }
     ManagedChannel channel = createChannel(dn, port).build();
     XceiverClientProtocolServiceStub asyncStub =
@@ -191,6 +190,7 @@ protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port)
     NettyChannelBuilder channelBuilder =
         NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
             .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
+            .proxyDetector(uri -> null)
             .intercept(new GrpcClientInterceptor());
     if (secConfig.isSecurityEnabled() && secConfig.isGrpcTlsEnabled()) {
       SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
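The one-line proxyDetector addition is the HDDS-11257 fix: by default gRPC consults the JVM's ProxySelector, so a process-wide -Dhttp.proxyHost setting could swallow datanode traffic, while a detector that always returns null forces direct connections. An illustrative standalone sketch; host and port are placeholders:

import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;

public final class DirectChannelSketch {
  // Builds a plaintext channel that ignores any JVM-wide HTTP proxy settings.
  static ManagedChannel directChannel(String host, int port) {
    return NettyChannelBuilder.forAddress(host, port)
        .usePlaintext()
        .proxyDetector(addr -> null) // null => connect directly, no proxy
        .build();
  }
}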
@@ -320,7 +320,6 @@ public ContainerCommandResponseProto sendCommand(
   /**
    * @param request
    * @param dn
-   * @param pipeline
    * In case of getBlock for EC keys, it is required to set replicaIndex for
    * every request with the replicaIndex for that DN for which the request is
    * sent to. This method unpacks proto and reconstructs request after setting
@@ -345,8 +344,7 @@ public ContainerCommandResponseProto sendCommand(
       ContainerCommandRequestProto request, List<Validator> validators)
       throws IOException {
     try {
-      XceiverClientReply reply;
-      reply = sendCommandWithTraceIDAndRetry(request, validators);
+      XceiverClientReply reply = sendCommandWithTraceIDAndRetry(request, validators);
       return reply.getResponse().get();
     } catch (ExecutionException e) {
       throw getIOExceptionForSendCommand(request, e);
@@ -490,7 +488,7 @@ private XceiverClientReply sendCommandWithRetry(
         LOG.debug(message + " on the pipeline {}.",
             processForDebug(request), pipeline);
       } else {
-        LOG.error(message + " on the pipeline {}.",
+        LOG.warn(message + " on the pipeline {}.",
             request.getCmdType(), pipeline);
       }
       throw ioException;
@@ -45,7 +45,7 @@
 import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE;
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.NO_REPLICA_FOUND;
 
-import org.apache.hadoop.util.CacheMetrics;
+import org.apache.hadoop.ozone.util.CacheMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,7 +31,7 @@
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.util.PerformanceMetrics;
+import org.apache.hadoop.ozone.util.PerformanceMetrics;
 
 import java.util.EnumMap;
 
@@ -172,8 +172,8 @@ public long updateCommitInfosMap(
       // been replicating data successfully.
     } else {
       stream = commitInfoProtos.stream().map(proto -> commitInfoMap
-          .computeIfPresent(RatisHelper.toDatanodeId(proto.getServer()),
-              (address, index) -> proto.getCommitIndex()))
+              .computeIfPresent(RatisHelper.toDatanodeId(proto.getServer()),
+                  (address, index) -> proto.getCommitIndex()))
           .filter(Objects::nonNull);
     }
     if (level == ReplicationLevel.ALL_COMMITTED) {
@@ -38,6 +38,7 @@
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This class executes watchForCommit on ratis pipeline and releases
@@ -62,7 +63,7 @@ abstract class AbstractCommitWatcher<BUFFER> {
 
   private final XceiverClientSpi client;
 
-  private long totalAckDataLength;
+  private final AtomicLong totalAckDataLength = new AtomicLong();
 
   AbstractCommitWatcher(XceiverClientSpi client) {
     this.client = client;
@@ -80,12 +81,11 @@ synchronized void updateCommitInfoMap(long index, List<BUFFER> buffers) {
 
   /** @return the total data which has been acknowledged. */
   long getTotalAckDataLength() {
-    return totalAckDataLength;
+    return totalAckDataLength.get();
   }
 
   long addAckDataLength(long acked) {
-    totalAckDataLength += acked;
-    return totalAckDataLength;
+    return totalAckDataLength.addAndGet(acked);
   }
 
   /**
@@ -141,8 +141,7 @@ XceiverClientReply watchForCommit(long commitIndex)
     try {
       final XceiverClientReply reply = client.watchForCommit(commitIndex);
       f.complete(reply);
-      final CompletableFuture<XceiverClientReply> removed
-          = replies.remove(commitIndex);
+      final CompletableFuture<XceiverClientReply> removed = replies.remove(commitIndex);
       Preconditions.checkState(removed == f);
 
       final long index = reply != null ? reply.getLogIndex() : 0;
@@ -166,7 +165,7 @@ List<BUFFER> remove(long i) {
   /** Release the buffers for the given index. */
   abstract void releaseBuffers(long index);
 
-  void adjustBuffers(long commitIndex) {
+  synchronized void adjustBuffers(long commitIndex) {
     commitIndexMap.keySet().stream()
         .filter(p -> p <= commitIndex)
         .forEach(this::releaseBuffers);
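The AbstractCommitWatcher change (HDDS-11280) swaps a plain long accumulator for an AtomicLong and makes adjustBuffers synchronized: totalAckDataLength += acked is a read-modify-write that can drop updates when ack callbacks land on different threads, whereas addAndGet is atomic. A self-contained sketch of the before/after behavior the fix targets:

import java.util.concurrent.atomic.AtomicLong;

public final class AckCounterSketch {
  private long unsafeTotal;                              // lost-update prone
  private final AtomicLong safeTotal = new AtomicLong(); // pattern adopted in the diff

  void onAck(long acked) {
    unsafeTotal += acked;        // two threads can read the same stale value
    safeTotal.addAndGet(acked);  // atomic read-modify-write, no lock needed
  }

  long total() {
    return safeTotal.get();
  }
}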
