Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract inbound tcp and websocket connection flow to separate classes, unify retrying #773

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ THE SOFTWARE.
<spotless.check.skip>false</spotless.check.skip>
<bc-version>1.79</bc-version>
<argLine>-Xms256M -Xmx256M -XX:+TieredCompilation -XX:TieredStopAtLevel=1</argLine>
<!-- TODO until we are ready to drop support for Java 11 agents -->
<maven.compiler.release>11</maven.compiler.release>
</properties>

<dependencies>
Expand Down
866 changes: 60 additions & 806 deletions src/main/java/hudson/remoting/Engine.java

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/main/java/hudson/remoting/Launcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.jenkinsci.remoting.engine.WorkDirManager;
import org.jenkinsci.remoting.util.DurationFormatter;
import org.jenkinsci.remoting.util.PathUtils;
import org.jenkinsci.remoting.util.SSLUtils;
import org.jenkinsci.remoting.util.https.NoCheckHostnameVerifier;
import org.kohsuke.args4j.Argument;
import org.kohsuke.args4j.CmdLineException;
Expand Down Expand Up @@ -517,7 +518,7 @@ private synchronized void initialize() throws IOException {
// Initialize certificates
createX509Certificates();
try {
sslSocketFactory = Engine.getSSLSocketFactory(x509Certificates, noCertificateCheck);
sslSocketFactory = SSLUtils.getSSLSocketFactory(x509Certificates, noCertificateCheck);
} catch (GeneralSecurityException | PrivilegedActionException e) {
throw new RuntimeException(e);
}
Expand Down
41 changes: 41 additions & 0 deletions src/main/java/org/jenkinsci/remoting/engine/EndpointConnector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.jenkinsci.remoting.engine;

import edu.umd.cs.findbugs.annotations.CheckForNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import hudson.remoting.Channel;
import java.io.Closeable;
import java.net.URL;
import java.util.concurrent.Future;

/**
* Represents a connection to a remote endpoint to open a {@link Channel}.
* @since TODO
*/
public interface EndpointConnector extends Closeable {

/**
* @return a future to the channel to be established. Returns null if the connection cannot be established at all.
* @throws Exception
*/
@CheckForNull
Future<Channel> connect() throws Exception;

/**
* Waits until the connection can be established.
* @return true if the connection is ready, null if the connection never got ready
* @throws InterruptedException if the thread is interrupted
*/
@CheckForNull
Boolean waitUntilReady() throws InterruptedException;

/**
* @return The name of the protocol used by this connection.
*/
String getProtocol();

/**
* @return the URL of the endpoint, if {@link #waitUntilReady()} returned {@code true}.
*/
@Nullable
URL getUrl();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.jenkinsci.remoting.engine;

import hudson.remoting.EngineListenerSplitter;
import hudson.remoting.JarCache;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;

/**
* Captures the data needed to connect to any endpoint.
* @param agentName the agent name
* @param secretKey the secret key
* @param executor the thread pool to use for handling TCP connections
* @param events the listener to log to
* @param noReconnectAfter Specifies the duration after which the connection should not be re-established.
* @param candidateCertificates the list of certificates to be used for the connection
* @param disableHttpsCertValidation whether to disable HTTPS certificate validation
* @param jarCache Where to store the jar cache
* @param proxyCredentials Credentials to use for proxy authentication, if any.
* @since TODO
*/
public record EndpointConnectorData(
String agentName,
String secretKey,
ExecutorService executor,
EngineListenerSplitter events,
Duration noReconnectAfter,
List<X509Certificate> candidateCertificates,
boolean disableHttpsCertValidation,
JarCache jarCache,
String proxyCredentials) {}
256 changes: 256 additions & 0 deletions src/main/java/org/jenkinsci/remoting/engine/InboundTCPConnector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package org.jenkinsci.remoting.engine;

import edu.umd.cs.findbugs.annotations.CheckForNull;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import hudson.remoting.Channel;
import hudson.remoting.ChannelBuilder;
import hudson.remoting.Engine;
import java.io.Closeable;
import java.io.IOException;
import java.net.Socket;
import java.net.URL;
import java.security.cert.X509Certificate;
import java.security.interfaces.RSAPublicKey;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.jenkinsci.remoting.protocol.IOHub;
import org.jenkinsci.remoting.protocol.cert.DelegatingX509ExtendedTrustManager;
import org.jenkinsci.remoting.protocol.cert.PublicKeyMatchingX509ExtendedTrustManager;
import org.jenkinsci.remoting.protocol.impl.ConnectionRefusalException;
import org.jenkinsci.remoting.util.KeyUtils;
import org.jenkinsci.remoting.util.SSLUtils;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;

/**
* Connects to a controller using inbound TCP.
*/
@Restricted(NoExternalUse.class)
public class InboundTCPConnector implements EndpointConnector {
private static final Logger LOGGER = Logger.getLogger(InboundTCPConnector.class.getName());

private final JnlpEndpointResolver jnlpEndpointResolver;
private final List<URL> candidateUrls;
private final DelegatingX509ExtendedTrustManager agentTrustManager;
private final boolean keepAlive;
private final EndpointConnectorData data;

private URL url;
/**
* Name of the protocol that was used to successfully connect to the controller.
*/
private String protocolName;

/**
* Tracks {@link Closeable} resources that need to be closed when this connector is closed.
*/
@NonNull
private final List<Closeable> closeables = new ArrayList<>();

@Override
public URL getUrl() {
return url;
}

@SuppressFBWarnings(value = "HARD_CODE_PASSWORD", justification = "Password doesn't need to be protected.")
public InboundTCPConnector(
EndpointConnectorData data,
@NonNull List<URL> candidateUrls,
@CheckForNull DelegatingX509ExtendedTrustManager agentTrustManager,
boolean keepAlive,
@NonNull JnlpEndpointResolver jnlpEndpointResolver) {
this.data = data;
this.candidateUrls = new ArrayList<>(candidateUrls);
this.agentTrustManager = agentTrustManager;
this.keepAlive = keepAlive;
this.jnlpEndpointResolver = jnlpEndpointResolver;
}

private class EngineJnlpConnectionStateListener extends JnlpConnectionStateListener {

private final RSAPublicKey publicKey;
private final Map<String, String> headers;

public EngineJnlpConnectionStateListener(RSAPublicKey publicKey, Map<String, String> headers) {
this.publicKey = publicKey;
this.headers = headers;
}
Comment on lines +76 to +82
Copy link
Member

@jglick jglick Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be a record. (see #749)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too bad JnlpConnectionStateListener is an abstract class (for no good reason)


@Override
public void beforeProperties(@NonNull JnlpConnectionState event) {
if (event instanceof Jnlp4ConnectionState) {
X509Certificate certificate = ((Jnlp4ConnectionState) event).getCertificate();
if (certificate != null) {
String fingerprint = KeyUtils.fingerprint(certificate.getPublicKey());
if (!KeyUtils.equals(publicKey, certificate.getPublicKey())) {
event.reject(new ConnectionRefusalException("Expecting identity " + fingerprint));
}
data.events().status("Remote identity confirmed: " + fingerprint);
}
}
}

@Override
public void afterProperties(@NonNull JnlpConnectionState event) {
event.approve();
}

@Override
public void beforeChannel(@NonNull JnlpConnectionState event) {
ChannelBuilder bldr = event.getChannelBuilder().withMode(Channel.Mode.BINARY);
if (data.jarCache() != null) {
bldr.withJarCache(data.jarCache());
}
}

@Override
public void afterChannel(@NonNull JnlpConnectionState event) {
// store the new cookie for next connection attempt
String cookie = event.getProperty(JnlpConnectionState.COOKIE_KEY);
if (cookie == null) {
headers.remove(JnlpConnectionState.COOKIE_KEY);
} else {
headers.put(JnlpConnectionState.COOKIE_KEY, cookie);
}
}
}

@Override
public Future<Channel> connect() throws Exception {
var hub = IOHub.create(data.executor());
closeables.add(hub);
var context = SSLUtils.createSSLContext(agentTrustManager);

final JnlpAgentEndpoint endpoint = RetryUtils.succeedsWithRetries(
jnlpEndpointResolver::resolve,
data.noReconnectAfter(),
data.events(),
x -> "Could not locate server among " + candidateUrls + ": " + x.getMessage());
if (endpoint == null) {
data.events().status("Could not resolve server among " + this.candidateUrls);
return null;
}
url = endpoint.getServiceUrl();

data.events()
.status(String.format(
"Agent discovery successful%n"
+ " Agent address: %s%n"
+ " Agent port: %d%n"
+ " Identity: %s",
endpoint.getHost(), endpoint.getPort(), KeyUtils.fingerprint(endpoint.getPublicKey())));
PublicKeyMatchingX509ExtendedTrustManager delegate = new PublicKeyMatchingX509ExtendedTrustManager();
RSAPublicKey publicKey = endpoint.getPublicKey();
if (publicKey != null) {
// This is so that JNLP4-connect will only connect if the public key matches
// if the public key is not published then JNLP4-connect will refuse to connect
delegate.add(publicKey);
}
this.agentTrustManager.setDelegate(delegate);

data.events().status("Handshaking");
// must be read-write
final Map<String, String> headers = new HashMap<>();
headers.put(JnlpConnectionState.CLIENT_NAME_KEY, data.agentName());
headers.put(JnlpConnectionState.SECRET_KEY, data.secretKey());
// Create the protocols that will be attempted to connect to the controller.
var clientProtocols = new JnlpProtocolHandlerFactory(data.executor())
.withIOHub(hub)
.withSSLContext(context)
.withPreferNonBlockingIO(false) // we only have one connection, prefer blocking I/O
.handlers();
var negotiatedProtocols = clientProtocols.stream()
.filter(JnlpProtocolHandler::isEnabled)
.filter(p -> endpoint.isProtocolSupported(p.getName()))
.collect(Collectors.toSet());
var serverProtocols = endpoint.getProtocols() == null ? "?" : String.join(",", endpoint.getProtocols());
LOGGER.info(buildDebugProtocolsMessage(serverProtocols, clientProtocols, negotiatedProtocols));
Comment on lines +162 to +172
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gives clearer output to users on which protocols are supported on either side of the connection.

for (var protocol : negotiatedProtocols) {
var jnlpSocket = RetryUtils.succeedsWithRetries(
() -> {
data.events().status("Connecting to " + endpoint.describe() + " using " + protocol.getName());
// default is 30 mins. See PingThread for the ping interval
final Socket s = endpoint.open(Engine.SOCKET_TIMEOUT);
s.setKeepAlive(keepAlive);
return s;
},
data.noReconnectAfter(),
data.events());
if (jnlpSocket == null) {
return null;
}
closeables.add(jnlpSocket);
try {
protocolName = protocol.getName();
return protocol.connect(
jnlpSocket, headers, new EngineJnlpConnectionStateListener(endpoint.getPublicKey(), headers));
} catch (IOException ioe) {
data.events().status("Protocol " + protocol.getName() + " failed to establish channel", ioe);
protocolName = null;
} catch (RuntimeException e) {
data.events().status("Protocol " + protocol.getName() + " encountered a runtime error", e);
protocolName = null;
}
// On failure form a new connection.
jnlpSocket.close();
closeables.remove(jnlpSocket);
}
if (negotiatedProtocols.isEmpty()) {
data.events()
.status(
"reconnect rejected",
new Exception("The server rejected the connection: None of the protocols were accepted"));
} else {
data.events()
.status(
"reconnect rejected",
new Exception("The server rejected the connection: None of the protocols are enabled"));
}
return null;
}

@NonNull
private static String buildDebugProtocolsMessage(
String serverProtocols,
List<JnlpProtocolHandler<? extends JnlpConnectionState>> clientProtocols,
Set<JnlpProtocolHandler<? extends JnlpConnectionState>> negotiatedProtocols) {
return "Protocols support: Server " + "[" + serverProtocols + "]"
+ ", Client " + "["
+ clientProtocols.stream()
.map(p -> p.getName() + (!p.isEnabled() ? " (disabled)" : ""))
.collect(Collectors.joining(","))
+ "]"
+ ", Negociated: " + "["
+ negotiatedProtocols.stream().map(JnlpProtocolHandler::getName).collect(Collectors.joining(","))
+ "]";
}

@Override
public Boolean waitUntilReady() throws InterruptedException {
jnlpEndpointResolver.waitForReady();
return true;
}

@Override
public String getProtocol() {
return protocolName;
}

@Override
public void close() {
closeables.forEach(c -> {
try {
c.close();
} catch (IOException e) {
data.events().status("Failed to close resource " + c, e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ public JnlpAgentEndpoint(
this.proxyCredentials = proxyCredentials;
}

String describe() {
return getHost() + ':' + getPort();
}

/**
* Gets the socket address.
*
Expand Down
Loading