Skip to content

Commit

Permalink
harvester will stop for fatal errors (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
ndc-dxc authored Oct 3, 2024
1 parent c2f3980 commit 589d1fb
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@

import java.io.File;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import static it.gov.innovazione.ndc.harvester.service.ActualConfigService.ConfigKey.MAX_FILE_SIZE_BYTES;

@Slf4j
@RequiredArgsConstructor
public abstract class BaseSemanticAssetHarvester<P extends SemanticAssetPath> implements SemanticAssetHarvester {

private static final List<String> INFRASTRUCTURE_EXCEPTIONS = List.of("java.net", "org.apache.http");

private final SemanticAssetType type;
private final NdcEventPublisher eventPublisher;
private final ConfigService configService;
Expand All @@ -40,6 +45,10 @@ public SemanticAssetType getType() {
return type;
}

/**
 * Tells whether the runtime class of the given throwable looks like an infrastructure failure.
 *
 * <p>The check is a plain substring match of the fully qualified class name against each
 * entry of {@code INFRASTRUCTURE_EXCEPTIONS}.
 *
 * @param cause the throwable to classify; must not be {@code null}
 * @return {@code true} if the class name contains at least one of the configured fragments
 */
private static boolean isInfrastructureTypeException(Throwable cause) {
    String className = cause.getClass().getName();
    for (String packageFragment : INFRASTRUCTURE_EXCEPTIONS) {
        if (className.contains(packageFragment)) {
            return true;
        }
    }
    return false;
}

@Override
public void harvest(Repository repository, Path rootPath) {
log.debug("Looking for {} paths", type);
Expand All @@ -58,14 +67,15 @@ public void harvest(Repository repository, Path rootPath) {
processPath(repository.getUrl(), path);
log.debug("Path {} processed correctly for {}", path, type);
} catch (SinglePathProcessingException e) {
boolean isInfrastuctureError = checkInfrastructureError(e);
Optional.ofNullable(HarvestExecutionContextUtils.getContext())
.ifPresent(context -> context.addHarvestingError(repository, e, path.getAllFiles()));
eventPublisher.publishAlertableEvent(
"harvester",
DefaultAlertableEvent.builder()
.name("Harvester Single Path Processing Error")
.description("Error processing " + type + " " + path + " in repo " + repository.getUrl())
.category(EventCategory.SEMANTIC)
.category(isInfrastuctureError ? EventCategory.INFRASTRUCTURE : EventCategory.SEMANTIC)
.severity(Severity.ERROR)
.context(Map.of(
"error", e.getRealErrorMessage(),
Expand All @@ -81,6 +91,22 @@ public void harvest(Repository repository, Path rootPath) {
}
}

/**
 * Walks the cause chain of the given exception, starting from the exception itself, and
 * reports whether any link is classified as an infrastructure error by
 * {@code isInfrastructureTypeException} (e.g. a {@code java.net} or {@code org.apache.http}
 * exception).
 *
 * @param e the processing exception whose cause chain is inspected
 * @return {@code true} as soon as one throwable in the chain is an infrastructure error;
 *         {@code false} if the chain ends, or revisits an already seen throwable, first
 */
private boolean checkInfrastructureError(SinglePathProcessingException e) {
    Set<Throwable> visited = new HashSet<>();
    for (Throwable current = e; current != null; current = current.getCause()) {
        boolean firstVisit = visited.add(current);
        if (!firstVisit) {
            // The chain loops back on a throwable we already inspected: stop here.
            return false;
        }
        if (isInfrastructureTypeException(current)) {
            return true;
        }
    }
    return false;
}

private void notifyIfSizeExceed(P path, Long maxFileSizeBytes) {
HarvestExecutionContext context = HarvestExecutionContextUtils.getContext();
if (Objects.nonNull(context) && Objects.nonNull(path)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ public TripleStoreRepository(VirtuosoClient virtuosoClient) {
this.virtuosoClient = virtuosoClient;
}

/**
 * Logs the given update command at INFO level and hands it back unchanged, so that command
 * builders can wrap their result in a single expression.
 *
 * @param command the command text to log
 * @return the same {@code command} instance that was passed in
 */
private static String getCommandAndLog(String command) {
log.info("Update command: {}", command);
return command;
}

/**
 * Builds the graph rename command from the {@code RENAME_GRAPH} template and logs it.
 *
 * <p>The stale pre-change {@code return format(...)} statement has been removed: it made the
 * logging {@code return} unreachable, which is a compile error and also meant the command
 * was never logged.
 *
 * @param oldGraph name of the graph to rename
 * @param newGraph name the graph should have afterwards
 * @return the formatted rename command
 */
private static String getRenameCommand(String oldGraph, String newGraph) {
    String renameCommand = format(RENAME_GRAPH, oldGraph, newGraph);
    return getCommandAndLog(renameCommand);
}

private void saveWithConnection(String graphName, Model model, RDFConnection connection) {
Expand All @@ -48,7 +53,7 @@ private void saveWithConnection(String graphName, Model model, RDFConnection con
}

/**
 * Builds the command from the {@code DROP_SILENT_GRAPH_WITH_LOG_ENABLE_3} template for the
 * graph name produced by {@code reworkRepoUrlIfNecessary(repoUrl, repoUrlPrefix)}, and logs it.
 *
 * <p>The stale pre-change {@code return format(...)} statement has been removed: it made the
 * logging {@code return} unreachable, which is a compile error and also meant the command
 * was never logged.
 *
 * @param repoUrl       URL of the repository whose graph is targeted
 * @param repoUrlPrefix prefix passed on to {@code reworkRepoUrlIfNecessary}
 * @return the formatted command
 */
private static String getUpdateCommand(String repoUrl, String repoUrlPrefix) {
    String graphName = reworkRepoUrlIfNecessary(repoUrl, repoUrlPrefix);
    return getCommandAndLog(format(DROP_SILENT_GRAPH_WITH_LOG_ENABLE_3, graphName));
}

@SneakyThrows
Expand All @@ -62,11 +67,13 @@ private static String reworkRepoUrlIfNecessary(String repoUrl, String repoUrlPre
}

/**
 * Clears the named graph of the given repository, delegating to
 * {@link #clearExistingNamedGraph(String, String)} with {@code ONLINE_GRAPH_PREFIX}.
 *
 * @param repoUrl URL of the repository whose named graph is cleared
 */
public void clearExistingNamedGraph(String repoUrl) {
log.info("Clearing existing named graph for {}", repoUrl);
clearExistingNamedGraph(repoUrl, ONLINE_GRAPH_PREFIX);
}

public void clearExistingNamedGraph(String repoUrl, String prefix) {
try {
log.info("Clearing existing named graph for {} with prefix {}", repoUrl, prefix);
String sparqlEndpoint = virtuosoClient.getSparqlEndpoint();
UpdateExecution
.service(sparqlEndpoint)
Expand All @@ -92,6 +99,7 @@ public void save(String graphName, Model model) {

/**
 * Replaces the repository's current graph with its temporary one: the existing named graph
 * for the repository URL is cleared, then the temporary graph is renamed to that URL.
 *
 * @param repository the repository whose graphs are switched
 */
public void switchInstances(it.gov.innovazione.ndc.model.harvester.Repository repository) {
    String repoUrl = repository.getUrl();
    String temporaryGraph = reworkRepoUrlIfNecessary(repoUrl, TMP_GRAPH_PREFIX);
    log.info("Switching instances on Virtuoso ({}, {})", repoUrl, temporaryGraph);

    // Drop the current graph first so the temporary graph can take over its name.
    clearExistingNamedGraph(repoUrl);
    rename(temporaryGraph, repoUrl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import it.gov.innovazione.ndc.repository.SemanticAssetMetadataDeleter;
import it.gov.innovazione.ndc.repository.TripleStoreRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.List;
Expand All @@ -17,6 +18,7 @@

@Service
@RequiredArgsConstructor
@Slf4j
public class DefaultInstanceManager implements InstanceManager {

private final ConfigService configService;
Expand Down Expand Up @@ -47,13 +49,21 @@ public Instance getCurrentInstance(Repository repository) {

/**
 * Promotes the next instance of the given repository to active and cleans up the other one.
 *
 * <p>Steps, in order: store the next online instance under {@code ACTIVE_INSTANCE}, delete
 * the metadata of the instance returned by {@code switchInstance()} on the new instance,
 * then ask the triple store repository to switch its instances as well.
 *
 * @param repository the repository whose active instance is switched
 */
public void switchInstances(Repository repository) {
    String repoUrl = repository.getUrl();

    // Step 1: flip the active instance stored in the configuration.
    log.info("Switching instance for repository {}", repoUrl);
    Instance activated = getNextOnlineInstance(repository);
    log.info("Switching Elastic search to instance {} for repo {}", activated, repoUrl);
    configService.writeConfigKey(ACTIVE_INSTANCE, "system", activated, repository.getId());

    // Step 2: remove the metadata belonging to the other instance.
    Instance retired = activated.switchInstance();
    log.info("Deleting metadata for instance {} for repo {}", retired, repoUrl);
    deleter.deleteByRepoUrl(repoUrl, retired);

    // Step 3: switch instance on Virtuoso.
    log.info("Switching instances on Virtuoso for repo {}", repoUrl);
    tripleStoreRepository.switchInstances(repository);
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/application-local.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
harvester.repositories=https://github.com/istat/ts-ontologie-vocabolari-controllati
harvester.repositories=https://github.com/FrankMaverick/Leo-OpenData

virtuoso.sparql=http://localhost:8890/sparql-auth
virtuoso.sparql-graph-store=http://localhost:8890/sparql-graph-crud-auth
Expand Down

0 comments on commit 589d1fb

Please sign in to comment.