Merge join #428
Conversation
I now have a mostly functional merge join implementation. It took a bit of work to get it running in qEndpoint, especially since I needed to migrate the code from RDF4J 4.x to 5.x. A big change is that the checked exception support in CloseableIteration has been removed.

To try out merge join you first have to check out the branch in the RDF4J PR here: eclipse-rdf4j/rdf4j#4822 and build it. Then you can build the branch of this PR to try it out in qEndpoint.

I've assumed that all data is sorted by subject, since the default index in HDT is SPO. I've also assumed that we can do a string comparison on the values.
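For context, this is the shape of the technique: a minimal sort-merge join sketch over two inputs that are both sorted by the join key and compared as strings, matching the two assumptions above. The names are illustrative, not the actual qEndpoint classes.

```java
import java.util.ArrayList;
import java.util.List;

final class MergeJoinSketch {
	// Joins two lists of (key, value) pairs, both sorted by key, into
	// (key, leftValue, rightValue) rows. Duplicate keys are handled by
	// buffering the right-hand group that shares the current key.
	static List<String[]> mergeJoin(List<String[]> left, List<String[]> right) {
		List<String[]> out = new ArrayList<>();
		int i = 0, j = 0;
		while (i < left.size() && j < right.size()) {
			int cmp = left.get(i)[0].compareTo(right.get(j)[0]); // string comparison, as assumed above
			if (cmp < 0) {
				i++;
			} else if (cmp > 0) {
				j++;
			} else {
				String key = left.get(i)[0];
				int groupEnd = j;
				while (groupEnd < right.size() && right.get(groupEnd)[0].equals(key)) {
					groupEnd++;
				}
				// Emit the cross product of the matching left rows and the right group.
				while (i < left.size() && left.get(i)[0].equals(key)) {
					for (int k = j; k < groupEnd; k++) {
						out.add(new String[] { key, left.get(i)[1], right.get(k)[1] });
					}
					i++;
				}
				j = groupEnd;
			}
		}
		return out;
	}
}
```

The real implementation would stream from the store's sorted iterators rather than materialized lists, but the cursor movement is the same.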
qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/experimental/QEPSailStore.java
Hello, the implementation in the package com/the_qa_company/qendpoint/store/experimental isn't actually the one we're using; it is a prototype for a future store. The one we are currently using is inside the store package: EndpointStore is the sail class and EndpointStoreConnection the connection. Inside the store we use custom Values, which all implement HDTValue; the main difference is that you can get a numerical ID for the element inside the HDT file.

Another big issue is that the iterators resolved from an HDT file aren't "sorted" by string, but by the ID inside the HDT. For the subjects and objects this is to compress the elements shared between these two sections, and for the predicate section it is because the HDT string sort is done on the UTF-8 bytes and not on the Java String (see this issue: rdfhdt/hdt-java#177).
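A small self-contained illustration of that ordering mismatch (my example, not from the linked issue): Java's String.compareTo compares UTF-16 code units, while a byte-wise UTF-8 sort follows code-point order, and the two disagree for supplementary characters.

```java
public class Utf8VsUtf16Order {
	public static void main(String[] args) {
		// U+10000, a supplementary character encoded as the surrogate pair D800 DC00
		String supplementary = new String(Character.toChars(0x10000));
		// U+FF61 (halfwidth ideographic full stop), a single BMP code unit
		String bmp = "\uFF61";

		// UTF-16 code-unit order: 0xD800 < 0xFF61, so the supplementary char sorts first.
		System.out.println(supplementary.compareTo(bmp) < 0); // true

		// Code-point order (what a byte-wise UTF-8 sort yields): 0x10000 > 0xFF61.
		System.out.println(supplementary.codePointAt(0) < bmp.codePointAt(0)); // false
	}
}
```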
I'll take a look and add the new methods to the non-experimental store. If predicates have a different comparator to the subject/object/context, that could be a bit problematic. During the SPARQL evaluation we use a binding set where we only retain the variable name and value. Even if we did retain whether the value was originally a subject, predicate or object, we would still run into problems when the variable is a predicate in one statement and a subject in another.
For the predicate vs subject/object part, I think you can ignore it or consider the two streams as not easily "mergeable". I'm not a triple store user, but I don't think this case comes up a lot.
qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointTripleSource.java
@ate47 is there any chance that we could have a PSOC index? It would be great for the fairly common pattern |
@hmottestad: @ate47 has been working on this since Friday, see #429. We are on it... we just need some more time... (and it is a holiday today in France...)
Force-pushed from 0b2a0a9 to d90cb0f
```diff
 public QueryCloseableIterator search(CharSequence subject, CharSequence predicate, CharSequence object)
 		throws QEPCoreException {
 	QEPDatasetContext ctx = createContext();
-	return search(ctx, subject, predicate, object).attach(ctx);
+	return (QueryCloseableIterator) search(ctx, subject, predicate, object).attach(ctx);
 }
```
This may not be the correct fix, but it was the best I could manage when migrating to RDF4J 5.0.0.
```java
public class QEPDataset implements Closeable, Serializable {

	@Serial
	private static final long serialVersionUID = 7525689572432598258L;
```
There are some requirements for serializable classes in RDF4J that were not explicitly tested for. From what I remember when migrating the code to RDF4J 5.0.0, this class is used transitively from within a query plan: it is used in QEPComponent, which is used in QEPCoreBNode, which is used in StatementPattern through Var.
```diff
 @SafeVarargs
 public static QueryCloseableIterator of(QueryCloseableIterator it,
-		AutoCloseableGeneric<QEPCoreException>... closeables) {
+		AutoCloseableGeneric<? extends RuntimeException>... closeables) {
 	Objects.requireNonNull(it, "it can't be null!");
 	if (closeables.length == 0) {
 		return it;
```
Again, I'm not sure whether this is the correct approach to fixing the issues when migrating to RDF4J 5.0.0.
```diff
@@ -472,6 +474,7 @@ public void load(InputStream input, ControlInfo ci, ProgressListener listener) t
 
 	@Override
 	public void mapFromFile(CountInputStream input, File f, ProgressListener listener) throws IOException {
+		log.info("Mapping BitmapTriples from {}", f.getName());
```
I found this useful during application startup, to see that the application is not hanging but in fact loading a large file.
```diff
 /**
  * @author mario.arias
  */
-public class BitmapTriplesIterator implements SuppliableIteratorTripleID {
+public class BitmapTriplesIterator implements SuppliableIteratorTripleID, IndexReportingIterator {
```
I've added a new interface to RDF4J which allows the query explanation to include the name of the index used in StatementPattern.
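For readers following along, the interface is presumably something along these lines; I'm sketching the method name, so treat it as an assumption rather than the exact RDF4J signature.

```java
// Assumed shape of the new RDF4J interface; the real accessor name may differ.
public interface IndexReportingIterator {
	// Human-readable name of the index backing this iterator (e.g. "SPO"),
	// surfaced in the query explanation of the enclosing StatementPattern.
	String getIndexName();
}
```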
```diff
@@ -400,8 +400,7 @@ public LinkedSail<? extends NotifyingSail> compileNode(Value node) throws SailCo
  */
 public Value searchOne(Resource subject, IRI predicate) throws SailCompilerException {
 	Value out;
-	try (CloseableIteration<? extends Statement, SailCompilerException> it = connection.getStatements(subject,
-			predicate, null)) {
+	try (CloseableIteration<? extends Statement> it = connection.getStatements(subject, predicate, null)) {
```
In RDF4J 5.0.0 the CloseableIteration interface and all other iterators/iterations no longer support generic declaration of exceptions. You can still throw an exception, but it can no longer be declared, so it needs to be a RuntimeException.
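In practice the migration pattern looks roughly like this (a sketch; QEPCoreException is assumed here to be unchecked, which matches the `? extends RuntimeException` bound in the earlier diff):

```java
// RDF4J 4.x: the exception type was part of the iteration's generic signature.
// CloseableIteration<? extends Statement, QEPCoreException> it = ...

// RDF4J 5.x: only the element type remains, so checked exceptions from lower
// layers must be wrapped in an unchecked exception before crossing an
// iterator boundary.
try {
	riskyLowLevelRead(); // hypothetical method that throws IOException
} catch (java.io.IOException e) {
	throw new QEPCoreException(e); // assumed to extend RuntimeException
}
```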
```diff
@@ -13,7 +13,7 @@ static int compare(HDTValue v1, HDTValue v2) {
 		return c;
 	}
 
-	return Long.compare(v1.getHDTPosition(), v2.getHDTPosition());
+	return Long.compare(v1.getHDTId(), v2.getHDTId());
```
The HDTPosition is checked above, so I assume that the ID should be compared if the positions are equal.
```java
if (o instanceof IRI) {
	return toString().equals(o.toString());
} else {
	return false;
}
```
A small performance improvement.
```diff
 	private volatile boolean isMerging = false;
 
-	public boolean isMergeTriggered = false;
+	public volatile boolean isMergeTriggered = false;
```
These fields needed to be volatile so that values written by one thread are visible when read from another.
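A minimal standalone illustration of the visibility problem (not qEndpoint code): without volatile, the JIT is free to hoist the field read out of the loop, and the reader can spin forever.

```java
public class VolatileVisibilityDemo {
	// Remove 'volatile' here and the reader thread may never terminate.
	static volatile boolean mergeTriggered = false;

	public static void main(String[] args) throws InterruptedException {
		Thread reader = new Thread(() -> {
			while (!mergeTriggered) {
				Thread.onSpinWait(); // same spin-wait pattern used in the test further down
			}
			System.out.println("observed the update");
		});
		reader.start();
		Thread.sleep(100);
		mergeTriggered = true; // volatile write: guaranteed visible to the reader
		reader.join();
	}
}
```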
```diff
@@ -760,7 +760,7 @@ public void markDeletedTempTriples() throws IOException {
 	try (InputStream inputStream = new FileInputStream(endpointFiles.getTempTriples())) {
 		RDFParser rdfParser = Rio.createParser(RDFFormat.NTRIPLES);
 		rdfParser.getParserConfig().set(BasicParserSettings.VERIFY_URI_SYNTAX, false);
-		try (GraphQueryResult res = QueryResults.parseGraphBackground(inputStream, null, rdfParser, null)) {
+		try (GraphQueryResult res = QueryResults.parseGraphBackground(inputStream, null, rdfParser)) {
```
The last argument to this RDF4J method has been removed in RDF4J 5.0.0.
```diff
@@ -128,15 +128,17 @@ protected CloseableIteration<? extends BindingSet, QueryEvaluationException> eva
 	new DisjunctiveConstraintOptimizer().optimize(tupleExpr, dataset, bindings);
 	new SameTermFilterOptimizer().optimize(tupleExpr, dataset, bindings);
 	new QueryModelNormalizerOptimizer().optimize(tupleExpr, dataset, bindings);
-	new QueryJoinOptimizer(evaluationStatistics).optimize(tupleExpr, dataset, bindings);
+	new QueryJoinOptimizer(evaluationStatistics, tripleSource).optimize(tupleExpr, dataset, bindings);
```
Including the tripleSource here is what allows the QueryJoinOptimizer to check the possible statement orders from the underlying store.
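As a sketch of the idea (my illustration, not the RDF4J optimizer code, using the getSupportedOrders signature shown further down in this diff): the optimizer can ask the store which orders both sides of a join can be iterated in, and pick a merge join when a shared order exists.

```java
// Hypothetical check for two statement patterns joined on their subject
// variable. StatementOrder.S is assumed to denote subject order; leftPred
// and rightPred are hypothetical bound predicates.
static boolean canMergeJoinOnSubject(TripleSource tripleSource, IRI leftPred, IRI rightPred) {
	Set<StatementOrder> left = tripleSource.getSupportedOrders(null, leftPred, null);
	Set<StatementOrder> right = tripleSource.getSupportedOrders(null, rightPred, null);
	return left.contains(StatementOrder.S) && right.contains(StatementOrder.S);
}
```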
```java
QueryEvaluationStep precompile = strategy.precompile(tupleExpr);

return precompile.evaluate(bindings);
```
Here is also a small performance fix that uses the precompile stage in RDF4J. I think precompilation was introduced in RDF4J 4.0.0.
```java
CloseableIteration<? extends Statement> repositoryResult1 = this.endpointStoreConnection.getConnA_read()
		.getStatements(newSubj, newPred, newObj, false, contexts);
CloseableIteration<? extends Statement> repositoryResult2 = this.endpointStoreConnection.getConnB_read()
		.getStatements(newSubj, newPred, newObj, false, contexts);
repositoryResult = new CombinedNativeStoreResult(repositoryResult1, repositoryResult2);
```
If there is a failure when getting repositoryResult2, then repositoryResult1 would not be closed. I've fixed similar issues in RDF4J and just realised that the same could happen here. I haven't tried to fix the code, but wanted to point it out now that I've seen it.
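A possible shape of the fix, based on the snippet above (untested sketch):

```java
CloseableIteration<? extends Statement> repositoryResult1 = this.endpointStoreConnection.getConnA_read()
		.getStatements(newSubj, newPred, newObj, false, contexts);
CloseableIteration<? extends Statement> repositoryResult2;
try {
	repositoryResult2 = this.endpointStoreConnection.getConnB_read()
			.getStatements(newSubj, newPred, newObj, false, contexts);
} catch (Throwable t) {
	repositoryResult1.close(); // don't leak the first result if the second fails
	throw t;
}
repositoryResult = new CombinedNativeStoreResult(repositoryResult1, repositoryResult2);
```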
```java
}

@Override
public Set<StatementOrder> getSupportedOrders(Resource subj, IRI pred, Value obj, Resource... contexts) {
```
Most of the code here was copied from the getStatements method.
```diff
@@ -29,7 +24,7 @@ public class HDTConverter {
 	public static final String HDT_URI = "http://hdt.org/";
 	private final EndpointStore endpoint;
 	private final HDT hdt;
-	private final ValueFactory valueFactory = new MemValueFactory();
+	private final ValueFactory valueFactory = SimpleValueFactory.getInstance();
```
Performance improvement, since we don't need the deduplication aspects of the MemValueFactory.
```java
Var var1 = new Var(var.getName(), converter.idToHDTValue(id, position), var.isAnonymous(),
		var.isConstant());
var.replaceWith(var1);
```
In order to reduce bugs in RDF4J we now require that Var objects are not reused.
```java
//@formatter:off

//package com.the_qa_company.qendpoint;
//
//import com.the_qa_company.qendpoint.core.options.HDTOptions;
//import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys;
```
I've commented this class out since it requires the Wikidata HDT files.
```java
//	public EndpointSPARQL11QueryComplianceTest(String displayName, String testURI, String name, String queryFileURL,
//			String resultFileURL, Dataset dataset, boolean ordered, boolean laxCardinality)
//			throws ParserException, NotFoundException, IOException {
//		super(displayName, testURI, name, queryFileURL, resultFileURL, null, ordered, laxCardinality);
//		setUpHDT(dataset);
//		List<String> testToIgnore = new ArrayList<>();
//		// @todo these tests are failing and should not, they are skipped so
//		// that we can be sure that we see when
//		// currently passing tests are not failing. Many of these tests are not
//		// so problematic since we do not support
//		// named graphs anyway
//		testToIgnore.add("constructwhere02 - CONSTRUCT WHERE");
//		testToIgnore.add("constructwhere03 - CONSTRUCT WHERE");
//		testToIgnore.add("constructwhere04 - CONSTRUCT WHERE");
//		testToIgnore.add("Exists within graph pattern");
//		testToIgnore.add("(pp07) Path with one graph");
//		testToIgnore.add("(pp35) Named Graph 2");
//		testToIgnore.add("sq01 - Subquery within graph pattern");
//		testToIgnore.add("sq02 - Subquery within graph pattern, graph variable is bound");
//		testToIgnore.add("sq03 - Subquery within graph pattern, graph variable is not bound");
//		testToIgnore.add("sq04 - Subquery within graph pattern, default graph does not apply");
//		testToIgnore.add("sq05 - Subquery within graph pattern, from named applies");
//		testToIgnore.add("sq06 - Subquery with graph pattern, from named applies");
//		testToIgnore.add("sq07 - Subquery with from ");
//		testToIgnore.add("sq11 - Subquery limit per resource");
//		testToIgnore.add("sq13 - Subqueries don't inject bindings");
//		testToIgnore.add("sq14 - limit by resource");
//
//		this.setIgnoredTests(testToIgnore);
//	}
```
The test class in RDF4J has changed quite a bit in RDF4J 5.0.0, and there is no longer a constructor that provides the dataset. I've simply commented out the code since I wasn't sure how to fix it on the RDF4J side.
It would be good to understand how it works before merging.
I took some time to figure out the RDF4J side of things, and I've created a method that gets called with all the test arguments so that we can listen in and read the dataset variable to set up the HDT store.

I just merged it into RDF4J, so it should be available from the snapshot repo within 24 hours.
```java
while (store.isMergeTriggered || store.isMerging()) {
	Thread.onSpinWait();
}
```
This is where we read the two variables I had to make volatile.
```diff
@@ -150,7 +152,7 @@ public static Statement getFakeStatement(ValueFactory vf, int id) {
 
 	private static void writeBigIndex(File file) throws IOException {
 		ValueFactory vf = new MemValueFactory();
-		try (FileOutputStream out = new FileOutputStream(file)) {
+		try (OutputStream out = new BufferedOutputStream(new FileOutputStream(file))) {
```
Performance improvement for tests.
```xml
<repositories>
	<repository>
		<id>oss.sonatype.org-snapshot</id>
		<url>https://oss.sonatype.org/content/repositories/snapshots</url>
		<releases>
			<enabled>false</enabled>
		</releases>
		<snapshots>
			<enabled>true</enabled>
		</snapshots>
	</repository>
</repositories>
```
I have not published a new milestone build of RDF4J 5.0.0 yet, so we need to use the snapshots repo in the meantime.
Fine for me, except for the small parts I commented on.
qendpoint-core/pom.xml
```xml
	<groupId>org.eclipse.rdf4j</groupId>
	<artifactId>rdf4j-common-iterator</artifactId>
	<version>${rdf4j.version}</version>
</dependency>
```
Our goal was to keep the core, which is a clone of the rdfhdt/hdt-java repository, as close as possible to the original library. Is it mandatory to add RDF4J to it?
I've managed to move the code to the qEndpoint store model, so we don't need to reference RDF4J from the core model anymore.
```java
/**
 * This flag can be set to false in order to disable the use of merge join.
 * This can be useful for comparing performance.
 */
private static boolean ENABLE_MERGE_JOIN = true;
```
I think it would be better to use the config of the endpoint with a key. You can access the options using:

```java
HDTOptions spec = endpoint.getHDTSpec();
enableMergeJoin = spec.getBoolean("qendpoint.mergejoin", false);
```
Thanks. I've changed it to use the HDT spec, and I've made true the default.
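Presumably the final lookup ends up along these lines (a sketch based on the snippet above, with the default flipped to true):

```java
HDTOptions spec = endpoint.getHDTSpec();
boolean enableMergeJoin = spec.getBoolean("qendpoint.mergejoin", true); // enabled by default
```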
Force-pushed from 9af4b87 to 74f46d4
@ate47 could you run the tests? I'm seeing a test failure locally, but I'm not sure why it's failing.
Issue resolved (if any): #

Description of this pull request:

Please check all the lines before posting the pull request:
- I have formatted my code (`mvn formatter:format` on the backend, `npm run format` on the frontend) before posting my pull request; use `mvn formatter:validate` to validate the formatting on the backend, `npm run validate` on the frontend.