Skip to content

Commit

Permalink
Clean temp atomic (#141)
Browse files Browse the repository at this point in the history
* Update README.md

* clean tmp graphs in case of error and on startup

* relax jacoco

* relax jacoco

* elastic delete old instance after harvester
  • Loading branch information
ndc-dxc authored Oct 2, 2024
1 parent d6e1c0a commit 111d083
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 46 deletions.
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
plugins {
id 'org.springframework.boot' version '3.3.2'
id 'org.springframework.boot' version '3.3.4'
id 'io.spring.dependency-management' version '1.1.6'
id 'java'
id 'checkstyle'
Expand Down Expand Up @@ -42,6 +42,8 @@ dependencies {

implementation 'org.mapstruct:mapstruct:1.5.5.Final'

implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.25.5'

implementation 'org.springframework.data:spring-data-elasticsearch'
implementation 'org.apache.jena:apache-jena-libs:4.9.0'
implementation 'org.apache.jena:jena-querybuilder:4.9.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import it.gov.innovazione.ndc.harvester.SemanticAssetType;
import it.gov.innovazione.ndc.harvester.model.Instance;
import it.gov.innovazione.ndc.harvester.model.index.SemanticAssetMetadata;
import it.gov.innovazione.ndc.repository.SemanticAssetMetadataDeleter;
import it.gov.innovazione.ndc.repository.SemanticAssetMetadataRepository;
import org.elasticsearch.client.RestClient;
import org.jetbrains.annotations.NotNull;
Expand All @@ -27,6 +28,7 @@ public class SemanticAssetMetadataRepositoryIntegrationTest {
public static final int INDEX_COUNT = 2;
public static final int ASSET_COUNT = 5;
private static SemanticAssetMetadataRepository repository;
private static SemanticAssetMetadataDeleter deleter;
private static ElasticsearchOperations elasticsearchOperations;

@BeforeAll
Expand All @@ -35,7 +37,8 @@ public static void beforeAll() {
elasticsearchOperations = buildElasticsearchOps();
InMemoryInstanceManager instanceManager = new InMemoryInstanceManager();
instanceManager.setAllInstances(Instance.PRIMARY);
repository = new SemanticAssetMetadataRepository(elasticsearchOperations, instanceManager);
deleter = new SemanticAssetMetadataDeleter(elasticsearchOperations);
repository = new SemanticAssetMetadataRepository(elasticsearchOperations, deleter, instanceManager);
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import it.gov.innovazione.ndc.harvester.model.exception.InvalidModelException;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import lombok.extern.slf4j.Slf4j;
import org.apache.jena.rdf.model.Property;
import org.apache.jena.rdf.model.Resource;
import org.apache.jena.rdf.model.Statement;
Expand All @@ -17,10 +17,13 @@
import static it.gov.innovazione.ndc.harvester.model.BaseSemanticAssetModel.maybeThrowInvalidModelException;
import static it.gov.innovazione.ndc.harvester.model.SemanticAssetModelValidationContext.NO_VALIDATION;
import static java.lang.String.format;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public class LiteralExtractor {

public static String extractOptional(Resource mainResource, Property versionInfo) {
Expand Down Expand Up @@ -58,10 +61,26 @@ public static String extract(Resource resource, Property property, SemanticAsset
}

public static Map<String, String> extractAllLanguages(Resource resource, Property property) {
return resource.listProperties(property).toList().stream()
Map<String, List<String>> collect = resource.listProperties(property).toList().stream()
.filter(s -> s.getObject().isLiteral())
.map(s -> Pair.of(s.getLanguage(), s.getString()))
.collect(toMap(Pair::getLeft, Pair::getRight));
.collect(groupingBy(
Statement::getLanguage,
mapping(Statement::getString, toList())));

// log if any entry has more than one value
collect.entrySet().stream()
.filter(e -> e.getValue().size() > 1)
.forEach(e -> {
String message = format("Found multiple values for language '%s' in property '%s' for resource '%s': %s",
e.getKey(), property, resource, e.getValue());
log.warn(message);
});

return collect
.entrySet().stream()
.collect(toMap(
Map.Entry::getKey,
e -> e.getValue().stream().sorted().toList().get(0)));
}

public static List<String> extractAll(Resource resource, Property property) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package it.gov.innovazione.ndc.repository;

import it.gov.innovazione.ndc.harvester.model.Instance;
import it.gov.innovazione.ndc.harvester.model.index.SemanticAssetMetadata;
import lombok.RequiredArgsConstructor;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class SemanticAssetMetadataDeleter {
private final ElasticsearchOperations esOps;

public long deleteByRepoUrl(String repoUrl, Instance instance) {
return esOps.delete(
SemanticAssetMetadataQuery.getDeleteQueryBuilder(repoUrl, instance)
.build(),
SemanticAssetMetadata.class).getDeleted();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package it.gov.innovazione.ndc.repository;

import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryVariant;
import co.elastic.clients.elasticsearch._types.query_dsl.TermsQuery;
import it.gov.innovazione.ndc.harvester.model.Instance;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.core.query.DeleteQuery;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;

public class SemanticAssetMetadataQuery {
static BoolQuery getBoolQueryForRepo(String url, Instance instance) {
List<Query> termsQueries = Stream.of(
termsQuery("repoUrl", url),
termsQuery("instance", instance.name()))
.map(QueryVariant::_toQuery)
.toList();

return BoolQuery.of(bq -> bq.must(termsQueries));
}

static TermsQuery termsQuery(String field, String value) {
return termsQuery(field, Set.of(value));
}

static TermsQuery termsQuery(String field, Set<String> values) {
return TermsQuery.of(t -> t.field(field).terms(
terms -> terms.value(values.stream()
.filter(Objects::nonNull)
.map(FieldValue::of)
.toList())));
}

static DeleteQuery.Builder getDeleteQueryBuilder(String repoUrl, Instance instance) {
return DeleteQuery.builder(
NativeQuery.builder()
.withQuery(getBoolQueryForRepo(repoUrl, instance)._toQuery())
.build());
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package it.gov.innovazione.ndc.repository;

import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
Expand All @@ -19,17 +18,14 @@
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.SearchPage;
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import org.springframework.stereotype.Repository;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static it.gov.innovazione.ndc.harvester.SemanticAssetType.CONTROLLED_VOCABULARY;
import static java.util.Objects.nonNull;
Expand All @@ -41,6 +37,7 @@
@Slf4j
public class SemanticAssetMetadataRepository {
private final ElasticsearchOperations esOps;
private final SemanticAssetMetadataDeleter semanticAssetMetadataDeleter;
private final InstanceManager instanceManager;

public SearchPage<SemanticAssetMetadata> search(String queryPattern, Set<String> types,
Expand Down Expand Up @@ -74,7 +71,7 @@ private Optional<BoolQuery> getConditionForInstances() {
List<RepositoryInstance> currentRepoInstances = instanceManager.getCurrentInstances();

List<Query> repoInstanceQueries = currentRepoInstances.stream()
.map(repositoryInstance -> getBoolQueryForRepo(
.map(repositoryInstance -> SemanticAssetMetadataQuery.getBoolQueryForRepo(
repositoryInstance.getUrl(),
repositoryInstance.getInstance()))
.map(QueryVariant::_toQuery)
Expand All @@ -87,38 +84,14 @@ private Optional<BoolQuery> getConditionForInstances() {
return Optional.of(BoolQuery.of(bq -> bq.should(repoInstanceQueries)));
}

private BoolQuery getBoolQueryForRepo(String url, Instance instance) {
List<Query> termsQueries = Stream.of(
termsQuery("repoUrl", url),
termsQuery("instance", instance.name()))
.map(QueryVariant::_toQuery)
.toList();

return BoolQuery.of(bq -> bq.must(termsQueries));
}


private List<TermsQuery> getQueriesForParams(Set<String> types, Set<String> themes, Set<String> rightsHolder) {
return Map.of("type", types, "themes", themes, "agencyId", rightsHolder).entrySet().stream()
.filter(e -> nonNull(e.getValue()))
.filter(e -> !e.getValue().isEmpty())
.map(e -> termsQuery(e.getKey(), e.getValue()))
.map(e -> SemanticAssetMetadataQuery.termsQuery(e.getKey(), e.getValue()))
.toList();
}

private TermsQuery termsQuery(String field, String value) {
return termsQuery(field, Set.of(value));
}


private TermsQuery termsQuery(String field, Set<String> values) {
return TermsQuery.of(t -> t.field(field).terms(
terms -> terms.value(values.stream()
.filter(Objects::nonNull)
.map(FieldValue::of)
.toList())));
}

public Optional<SemanticAssetMetadata> findByIri(String iri) {
List<Query> queries = new ArrayList<>();
queries.add(termQuery("iri", iri)._toQuery());
Expand All @@ -138,13 +111,7 @@ public Optional<SemanticAssetMetadata> findByIri(String iri) {
}

public long deleteByRepoUrl(String repoUrl, Instance instance) {
return esOps.delete(
DeleteQuery.builder(
NativeQuery.builder()
.withQuery(getBoolQueryForRepo(repoUrl, instance)._toQuery())
.build())
.build(),
SemanticAssetMetadata.class).getDeleted();
return semanticAssetMetadataDeleter.deleteByRepoUrl(repoUrl, instance);
}

public void save(SemanticAssetMetadata metadata) {
Expand All @@ -156,7 +123,7 @@ public List<SemanticAssetMetadata> findVocabulariesForRepoUrl(String repoUrl, In
bq -> bq.must(
List.of(
termQuery("repoUrl", repoUrl)._toQuery(),
termsQuery("instance", instance.name())._toQuery(),
SemanticAssetMetadataQuery.termsQuery("instance", instance.name())._toQuery(),
termQuery("type", CONTROLLED_VOCABULARY.name())._toQuery())));

NativeQuery query = NativeQuery.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import it.gov.innovazione.ndc.harvester.model.Instance;
import it.gov.innovazione.ndc.harvester.service.RepositoryService;
import it.gov.innovazione.ndc.model.harvester.Repository;
import it.gov.innovazione.ndc.repository.SemanticAssetMetadataDeleter;
import it.gov.innovazione.ndc.repository.TripleStoreRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
Expand All @@ -21,6 +22,7 @@ public class DefaultInstanceManager implements InstanceManager {
private final ConfigService configService;
private final RepositoryService repositoryService;
private final TripleStoreRepository tripleStoreRepository;
private final SemanticAssetMetadataDeleter deleter;

public Instance getNextOnlineInstance(String repoUrl) {
Optional<Repository> repository = repositoryService.findActiveRepoByUrl(repoUrl);
Expand All @@ -45,7 +47,12 @@ public Instance getCurrentInstance(Repository repository) {

public void switchInstances(Repository repository) {
// switch instance on Repositories
configService.writeConfigKey(ACTIVE_INSTANCE, "system", getNextOnlineInstance(repository), repository.getId());
Instance newInstance = getNextOnlineInstance(repository);
configService.writeConfigKey(ACTIVE_INSTANCE, "system", newInstance, repository.getId());

Instance instanceToDelete = newInstance.switchInstance();

deleter.deleteByRepoUrl(repository.getUrl(), instanceToDelete);

// switch instance on Virtuoso
tripleStoreRepository.switchInstances(repository);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class SemanticAssetMetadataRepositoryTest {

@InjectMocks
private SemanticAssetMetadataRepository repository;
@InjectMocks
private SemanticAssetMetadataDeleter deleter;

@Test
void shouldFindById() {
Expand Down Expand Up @@ -87,7 +89,7 @@ void shouldDeleteByIri() {

when(esOps.delete(captor.capture(), any(Class.class))).thenReturn(deletedResponse);

long deleteCount = repository.deleteByRepoUrl("someRepoUrl", Instance.PRIMARY);
long deleteCount = deleter.deleteByRepoUrl("someRepoUrl", Instance.PRIMARY);

assertThat(deleteCount).isEqualTo(1);
Query query = captor.getValue().getQuery();
Expand Down

0 comments on commit 111d083

Please sign in to comment.