Skip to content

Commit

Permalink
Merge pull request #50 from Myyyvothrr/main
Browse files Browse the repository at this point in the history
async reader: filter files in target location
  • Loading branch information
Myyyvothrr authored Aug 29, 2023
2 parents 01c7fae + d3d2883 commit f6276f7
Showing 1 changed file with 74 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -64,6 +62,11 @@ public class AsyncCollectionReader {

private int debugCount = 25;

/**
* If a target location is specified, documents in the source directory that already exist in the target are skipped automatically
*/
private String targetLocation = null;

public AsyncCollectionReader(String folder, String ending) {
this(folder, ending, 25, -1, false, "", false, null);
}
Expand Down Expand Up @@ -117,11 +120,15 @@ public AsyncCollectionReader(String folder, String ending, int debugCount, int s
}

public AsyncCollectionReader(String folder, String ending, int debugCount, int sampleSize, DUUI_ASYNC_COLLECTION_READER_SAMPLE_MODE sampleMode, String savePath, boolean bAddMetadata, String language, int skipSmallerFiles) {
this(folder, ending, debugCount, getRandomFromMode(sampleMode, sampleSize), getSortFromMode(sampleMode), savePath, bAddMetadata, language, skipSmallerFiles);
this(folder, ending, debugCount, getRandomFromMode(sampleMode, sampleSize), getSortFromMode(sampleMode), savePath, bAddMetadata, language, skipSmallerFiles, null, "");
}

public AsyncCollectionReader(String folder, String ending, int debugCount, int iRandom, boolean bSort, String savePath, boolean bAddMetadata, String language) {
this(folder, ending, debugCount, iRandom, bSort, savePath, bAddMetadata, language, 0);
this(folder, ending, debugCount, iRandom, bSort, savePath, bAddMetadata, language, 0, null, "");
}

public AsyncCollectionReader(String folder, String ending, int debugCount, int iRandom, boolean bSort, String savePath, boolean bAddMetadata, String language, String targetLocation, String targetEnding) {
this(folder, ending, debugCount, iRandom, bSort, savePath, bAddMetadata, language, 0, targetLocation, targetEnding);
}

/***
Expand All @@ -135,9 +142,10 @@ public AsyncCollectionReader(String folder, String ending, int debugCount, int i
* @param bAddMetadata Add metadata to the documents
* @param language Add language to the documents
* @param skipSmallerFiles Skip files smaller than this value in bytes
* @param targetLocation If a target location is specified, documents in the source directory that already exist in the target are skipped automatically
*/
public AsyncCollectionReader(String folder, String ending, int debugCount, int iRandom, boolean bSort, String savePath, boolean bAddMetadata, String language, int skipSmallerFiles) {

public AsyncCollectionReader(String folder, String ending, int debugCount, int iRandom, boolean bSort, String savePath, boolean bAddMetadata, String language, int skipSmallerFiles, String targetLocation, String targetEnding) {
this.targetLocation = targetLocation;
_addMetadata = bAddMetadata;
_language = language;
_filePaths = new ConcurrentLinkedQueue<>();
Expand Down Expand Up @@ -205,6 +213,12 @@ else if(iRandom>0){
}
}

// remove files that are already in the target location
// NOTE we do this after saving the file list, as we do not want to change anything but only avoid processing files multiple times
if (this.targetLocation != null) {
_filePaths = removeIfInTarget(_filePaths, this.targetLocation, targetEnding, this._path, ending);
}

_filePathsBackup.addAll(_filePaths);

this.debugCount = debugCount;
Expand Down Expand Up @@ -452,6 +466,59 @@ else if (n < 0){
return rQueue;
}

/***
* Removes files that are present in the target location
* @param paths List of paths
* @param targetLocation Target location where to check for files
* @return A new queue without files that are present in the target location
*/
public static ConcurrentLinkedQueue<String> removeIfInTarget(ConcurrentLinkedQueue<String> paths, String targetLocation, String targetEnding, String sourceLocation, String sourceEnding){
System.out.println("Chacking target location for files: " + targetLocation);
ConcurrentLinkedQueue<String> targetFilePaths = new ConcurrentLinkedQueue<>();
File targetDir = new File(targetLocation);
if (!targetDir.exists()) {
// This might not be an error, e.g. if it is the first run
System.err.println("The targetLocation " + targetFilePaths + " does not exist! Continuing without removing files from target location.");
}
else if (targetDir.exists() && !targetDir.isDirectory()) {
throw new RuntimeException("The targetLocation " + targetFilePaths + " is not a directory!");
}
else {
addFilesToConcurrentList(targetDir, targetEnding, targetFilePaths);
}
System.out.println("Found " + targetFilePaths.size() + " files in target location");

List<String> cleanList = new ArrayList<>();
if (!targetFilePaths.isEmpty()) {
System.out.println("Checking against " + targetFilePaths.size() + " files in target location");
Set<String> existingFiles = targetFilePaths.stream()
.map(Paths::get)
.filter(Files::isRegularFile)
.map(f -> targetDir.toPath().relativize(f).toString())
.map(f -> f.replaceAll(targetEnding, ""))
.map(f -> f.replaceAll(sourceEnding, ""))
.collect(Collectors.toSet());

Path sourceDir = Paths.get(sourceLocation);
for (String f : paths) {
Path p = Paths.get(f);
String fn = sourceDir.relativize(p).toString();
fn = fn.replaceAll(sourceEnding, "");
boolean found = existingFiles.contains(fn);
if (!found) {
cleanList.add(f);
}
}
}
else {
System.out.println("No files in target location found, keeping all files from source location");
cleanList.addAll(paths);
}
System.out.println("Removed " + (paths.size() - cleanList.size()) + " files from source location that are already present in target location, keeping " + cleanList.size() + " files");

return new ConcurrentLinkedQueue<>(cleanList);
}

public static String getSize(String sPath){
return FileUtils.byteCountToDisplaySize(new File(sPath).length());
}
Expand Down

0 comments on commit f6276f7

Please sign in to comment.