From 8945924bdccc3db1b942d523ca77031f31d5282d Mon Sep 17 00:00:00 2001 From: Piotr Wielgolaski Date: Mon, 10 Jan 2022 14:42:51 +0100 Subject: [PATCH] fix: handle properly removal of information point it was failing, leading to connection leaks --- .../io/InformationPointDTO.java | 22 ++++++ .../io/InformationPointYamlDTO.java | 28 ++++++++ .../kubernetes/KubernetesController.java | 25 +++---- .../kubernetes/KubernetesControllerTest.java | 72 ++++++++++++++++++- 4 files changed, 131 insertions(+), 16 deletions(-) diff --git a/james-agent-io/src/main/java/com/tomtom/james/store/informationpoints/io/InformationPointDTO.java b/james-agent-io/src/main/java/com/tomtom/james/store/informationpoints/io/InformationPointDTO.java index f866ec3..13d394f 100644 --- a/james-agent-io/src/main/java/com/tomtom/james/store/informationpoints/io/InformationPointDTO.java +++ b/james-agent-io/src/main/java/com/tomtom/james/store/informationpoints/io/InformationPointDTO.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.tomtom.james.common.api.informationpoint.InformationPoint; import com.tomtom.james.common.api.informationpoint.Metadata; +import java.util.Objects; public abstract class InformationPointDTO { @@ -35,4 +36,25 @@ public InformationPointDTO withMethodReference(String classAndMethodName){ public String getMethodReference(){ return className+"!"+methodName; } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final InformationPointDTO that = (InformationPointDTO)o; + return Objects.equals(className, that.className) && Objects.equals(methodName, that.methodName) + && Objects.equals(sampleRate, that.sampleRate) && Objects.equals(successSampleRate, + that.successSampleRate) + && Objects.equals(errorSampleRate, that.errorSampleRate) && Objects.equals( + successExecutionThreshold, that.successExecutionThreshold) && Objects.equals(metadata, that.metadata) + && Objects.equals(baseScriptPath, that.baseScriptPath) && Objects.equals(scriptPath, + that.scriptPath) + && Objects.equals(version, that.version); + } + + @Override + public int hashCode() { + return Objects.hash(className, methodName, sampleRate, successSampleRate, errorSampleRate, successExecutionThreshold, + metadata, baseScriptPath, scriptPath, version); + } } diff --git a/james-agent-io/src/main/java/com/tomtom/james/store/informationpoints/io/InformationPointYamlDTO.java b/james-agent-io/src/main/java/com/tomtom/james/store/informationpoints/io/InformationPointYamlDTO.java index c2356d0..2023ab7 100644 --- a/james-agent-io/src/main/java/com/tomtom/james/store/informationpoints/io/InformationPointYamlDTO.java +++ b/james-agent-io/src/main/java/com/tomtom/james/store/informationpoints/io/InformationPointYamlDTO.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.ObjectIdGenerators; import com.tomtom.james.common.api.informationpoint.InformationPoint; +import java.util.Objects; @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) @JsonInclude(JsonInclude.Include.NON_NULL) @@ -69,6 +70,19 @@ static class BaseScript { public String getScript() { return script; } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final BaseScript that = (BaseScript)o; + return Objects.equals(script, that.script); + } + + @Override + public int hashCode() { + return Objects.hash(script); + } } private String safeTrim(String script){ @@ -77,4 +91,18 @@ private String safeTrim(String script){ } return null; } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + final InformationPointYamlDTO that = (InformationPointYamlDTO)o; + return Objects.equals(baseScript, that.baseScript) && Objects.equals(script, that.script); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), baseScript, script); + } } diff --git a/james-controller-kubernetes/src/main/java/com/tomtom/james/controller/kubernetes/KubernetesController.java b/james-controller-kubernetes/src/main/java/com/tomtom/james/controller/kubernetes/KubernetesController.java index 4e38cb9..5370a0f 100644 --- a/james-controller-kubernetes/src/main/java/com/tomtom/james/controller/kubernetes/KubernetesController.java +++ b/james-controller-kubernetes/src/main/java/com/tomtom/james/controller/kubernetes/KubernetesController.java @@ -49,11 +49,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -100,9 +98,8 @@ public void initialize(final JamesControllerConfiguration jamesControllerConfigu apiClient = createApiClient(configuration.getUrl(), configuration.getToken()); executor.execute(() -> { while (!Thread.interrupted()) { - try { - final Watch watch = - createConfigMapWatch(apiClient, configuration.getNamespace(), configuration.getLabels()); + try(final Watch watch = + createConfigMapWatch(apiClient, configuration.getNamespace(), configuration.getLabels())) { watchConfigMapChanges(watch, informationPointService); } catch (final Exception e) { LOG.info("Unable to setup k8s watcher", e); @@ -255,23 +252,23 @@ private void processUpdate(final String configName, final Collection difference = Maps.difference(informationPointsMap, cache); difference.entriesOnlyOnLeft() - .forEach((name, value) -> onInformationPointAdded(name, value, informationPointService)); + .forEach((name, value) -> onInformationPointAdded(value, informationPointService)); difference.entriesDiffering() - .forEach((name, value) -> onInformationPointModified(name, value.leftValue(), informationPointService)); - difference.entriesOnlyOnRight().forEach((name, value) -> onInformationPointRemoved(name, informationPointService)); + .forEach((name, value) -> onInformationPointModified(value.leftValue(), informationPointService)); + difference.entriesOnlyOnRight().forEach((name, value) -> onInformationPointRemoved(value, informationPointService)); cache.clear(); cache.putAll(informationPointsMap); } - private void onInformationPointAdded(final String methodReference, final InformationPointDTO informationPointDto, + private void onInformationPointAdded(final InformationPointDTO informationPointDto, final InformationPointService informationPointService) { final InformationPoint ip = informationPointDto.toInformationPoint(); informationPointService.addInformationPoint(ip); LOG.debug(() -> "Information point " + ip + " added"); } - private void onInformationPointModified(final String methodReference, final InformationPointDTO informationPointDto, + private void onInformationPointModified(final InformationPointDTO informationPointDto, final InformationPointService informationPointService) { final InformationPoint ip = informationPointDto.toInformationPoint(); informationPointService.removeInformationPoint(ip); @@ -280,11 +277,11 @@ private void onInformationPointModified(final String methodReference, final Info } - private void onInformationPointRemoved(final String methodReference, + private void onInformationPointRemoved(final InformationPointDTO informationPointDto, final InformationPointService informationPointService) { - final InformationPoint informationPoint = InformationPoint.builder().withMethodReference(methodReference).build(); - informationPointService.removeInformationPoint(informationPoint); - LOG.debug(() -> "Information point " + informationPoint + " removed"); + final InformationPoint ip = informationPointDto.toInformationPoint(); + informationPointService.removeInformationPoint(ip); + LOG.debug(() -> "Information point " + ip + " removed"); } } diff --git a/james-controller-kubernetes/src/test/java/com/tomtom/james/controller/kubernetes/KubernetesControllerTest.java b/james-controller-kubernetes/src/test/java/com/tomtom/james/controller/kubernetes/KubernetesControllerTest.java index 0a841e2..608e67e 100644 --- a/james-controller-kubernetes/src/test/java/com/tomtom/james/controller/kubernetes/KubernetesControllerTest.java +++ b/james-controller-kubernetes/src/test/java/com/tomtom/james/controller/kubernetes/KubernetesControllerTest.java @@ -30,10 +30,12 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -46,7 +48,7 @@ public class KubernetesControllerTest { private InformationPointService informationPointService; @Mock private JamesControllerConfiguration jamesControllerConfiguration; - private final List informationPoints = new ArrayList<>(); + private final List informationPoints = new CopyOnWriteArrayList<>(); private final ObjectMapper objectMapper = new ObjectMapper(); public KubernetesController startKubernetesController() { @@ -208,4 +210,70 @@ public void shouldRegisterYamlWithFiles() throws InterruptedException, IOExcepti assertThat(ip.getBaseScript()).hasValue("base"); }); } + + @Test + public void shouldRegisterAndRemoveYaml() throws InterruptedException, IOException { + final V1ConfigMap configMap = new V1ConfigMapBuilder() + .withApiVersion("v1") + .withKind("ConfigMap") + .withMetadata(new V1ObjectMetaBuilder().withName("james-test").addToLabels("app", "qa-webservice").build()) + .addToData("app.yaml", "Class!methodY:\n" + + " baseScript:\n" + + " script: base\n" + + " script: |\n" + + " boom\n" + + " version: 1") + .build(); + + final V1ConfigMap modifiedConfigMap = new V1ConfigMapBuilder() + .withApiVersion("v1") + .withKind("ConfigMap") + .withMetadata(new V1ObjectMetaBuilder().withName("james-test").addToLabels("app", "qa-webservice").build()) + .addToData("app.yaml", "Class!methodX:\n" + + " baseScript:\n" + + " script: base\n" + + " script: |\n" + + " boom\n" + + " version: 1") + .build(); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + doAnswer(invocationOnMock -> { + informationPoints.add(invocationOnMock.getArgument(0, InformationPoint.class)); + countDownLatch.countDown(); + return null; + }).when(informationPointService).addInformationPoint(any(InformationPoint.class)); + final CountDownLatch countDownModifiedLatch = new CountDownLatch(1); + doAnswer(invocationOnMock -> { + informationPoints.remove(invocationOnMock.getArgument(0, InformationPoint.class)); + countDownModifiedLatch.countDown(); + return null; + }).when(informationPointService).removeInformationPoint(any(InformationPoint.class)); + + final KubernetesController controller = startKubernetesController(); + stubFor(get(urlEqualTo("/api/v1/namespaces/dev/configmaps?labelSelector=app%3Dmy-app&watch=true")) + .willReturn(aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody(objectMapper.writeValueAsString(new Watch.Response<>("ADDED", configMap))))); + + countDownLatch.await(3, TimeUnit.SECONDS); + // modify + stubFor(get(urlEqualTo("/api/v1/namespaces/dev/configmaps?labelSelector=app%3Dmy-app&watch=true")) + .willReturn(aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody(objectMapper.writeValueAsString(new Watch.Response<>("MODIFIED", modifiedConfigMap))))); + + countDownModifiedLatch.await(3, TimeUnit.SECONDS); + controller.close(); + assertThat(informationPoints) + .hasSize(1) + .allSatisfy(ip -> { + assertThat(ip.getClassName()).isEqualTo("Class"); + assertThat(ip.getMethodName()).isEqualTo("methodX"); + assertThat(ip.getScript()).hasValue("boom"); + assertThat(ip.getBaseScript()).hasValue("base"); + }); + } }