diff --git a/qendpoint-backend/src/main/java/com/the_qa_company/qendpoint/controller/Sparql.java b/qendpoint-backend/src/main/java/com/the_qa_company/qendpoint/controller/Sparql.java index d896de4c..050fa7c3 100644 --- a/qendpoint-backend/src/main/java/com/the_qa_company/qendpoint/controller/Sparql.java +++ b/qendpoint-backend/src/main/java/com/the_qa_company/qendpoint/controller/Sparql.java @@ -426,7 +426,7 @@ void initializeEndpointStore(boolean finishLoading) throws IOException { CompiledSailOptions options = new CompiledSailOptions(); options.setPort(port); options.setEndpointThreshold(threshold); - options.setHdtSpec(hdtSpec); + options.setHdtSpec(HDTOptions.of(hdtSpec)); options.setTimeoutQuery(maxTimeoutCfg); options.setTimeoutUpdate(maxTimeoutUpdateCfg); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/compact/bitmap/MultiLayerBitmapWrapper.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/compact/bitmap/MultiLayerBitmapWrapper.java index 7ad3450e..548426c4 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/compact/bitmap/MultiLayerBitmapWrapper.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/compact/bitmap/MultiLayerBitmapWrapper.java @@ -19,7 +19,7 @@ private MultiLayerModBitmapWrapper(ModifiableBitmap handle, long graphs) { @Override public void set(long layer, long position, boolean value) { - ((ModifiableBitmap) handle).set(graphs * layer + position, value); + ((ModifiableBitmap) handle).set(graphs * position + layer, value); } } @@ -51,13 +51,14 @@ private MultiLayerBitmapWrapper(Bitmap handle, long graphs) { this.graphs = graphs; } - public Bitmap getHandle() { - return handle; + @SuppressWarnings("unchecked") + public T getHandle() { + return (T)handle; } @Override public boolean access(long layer, long position) { - return handle.access(graphs * layer + position); + return handle.access(graphs * position + layer); } @Override @@ -132,7 +133,7 @@ public long getLayersCount() { @Override public boolean access(long position) { - return handle.access(position); + return access(0, position); } @Override diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/GraphFilteringTripleId.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/GraphFilteringTripleId.java new file mode 100644 index 00000000..04185bac --- /dev/null +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/GraphFilteringTripleId.java @@ -0,0 +1,103 @@ +package com.the_qa_company.qendpoint.core.iterator.utils; + +import com.the_qa_company.qendpoint.core.enums.ResultEstimationType; +import com.the_qa_company.qendpoint.core.enums.TripleComponentOrder; +import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException; +import com.the_qa_company.qendpoint.core.triples.IteratorTripleID; +import com.the_qa_company.qendpoint.core.triples.TripleID; + +public class GraphFilteringTripleId implements IteratorTripleID { + private final IteratorTripleID iterator; + private final long[] graphIds; + private TripleID next; + + public GraphFilteringTripleId(IteratorTripleID iterator, long[] graphIds) { + this.iterator = iterator; + this.graphIds = graphIds; + } + + @Override + public boolean hasPrevious() { + throw new NotImplementedException(); + } + + @Override + public TripleID previous() { + throw new NotImplementedException(); + } + + @Override + public void goToStart() { + throw new NotImplementedException(); + } + + @Override + public boolean canGoTo() { + return false; + } + + @Override + public void goTo(long pos) { + throw new NotImplementedException(); + } + + @Override + public long estimatedNumResults() { + return iterator.estimatedNumResults(); + } + + @Override + public ResultEstimationType numResultEstimation() { + return iterator.numResultEstimation(); + } + + @Override + public TripleComponentOrder getOrder() { + return iterator.getOrder(); + } + + @Override + public long getLastTriplePosition() { + return iterator.getLastTriplePosition(); + } + + @Override + public boolean isLastTriplePositionBoundToOrder() { + return iterator.isLastTriplePositionBoundToOrder(); + } + + @Override + public boolean hasNext() { + if (next != null) { + return true; + } + while (iterator.hasNext()) { + TripleID val = iterator.next(); + + long g = val.getGraph(); + for (long graphId : graphIds) { + if (graphId == g) { + next = val; + return true; + } + } + // can't find valid graph + } + return false; + } + + @Override + public TripleID next() { + if (!hasNext()) { + return null; + } + TripleID newVal = next; + next = null; + return newVal; + } + + @Override + public void remove() { + throw new NotImplementedException(); + } +} diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptions.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptions.java index 260947bf..13615376 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptions.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptions.java @@ -30,7 +30,6 @@ import java.io.Writer; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; @@ -92,7 +91,7 @@ static HDTOptions empty() { */ static HDTOptions of(HDTOptions other) { HDTOptions opt = of(); - other.getKeys().forEach(key -> opt.set(String.valueOf(key), other.get(String.valueOf(key)))); + opt.setOptions(other); return opt; } @@ -154,6 +153,18 @@ static HDTOptions of(Object... data) { return opt; } + /** + * create modifiable options with a string config + * @param cfg config + * @return options + */ + static HDTOptions of(String cfg) { + Objects.requireNonNull(cfg, "cfg can't be null!"); + HDTOptions opt = of(); + opt.setOptions(cfg); + return opt; + } + /** * get options or {@link #EMPTY} * @@ -248,13 +259,27 @@ default Path getPath(String key, Supplier defaultValue) { /** * @return the keys of the options - * @throws NotImplementedException if the implemented class do not implement + * @throws NotImplementedException if the implementation does not implement * this method (backward compatibility) */ default Set getKeys() { throw new NotImplementedException("getKeys"); } + /** + * @return the options to be used with {@link #setOptions(String)} + */ + default String getOptions() { + StringBuilder bld = new StringBuilder(); + + for (Object k : getKeys()) { + String keyStr = String.valueOf(k); + bld.append(keyStr).append("=").append(get(keyStr)).append(";"); + } + + return bld.toString(); + } + /** * get a value * @@ -480,8 +505,7 @@ default void load(String filename) throws IOException { void set(String key, String value); /** - * set a value, same as using {@link String#valueOf(Object)} with - * {@link #set(String, String)} + * set a value, check the type of the value to serialize it. * * @param key key * @param value value @@ -495,6 +519,11 @@ default void set(String key, Object value) { set(key, p); } else if (value instanceof File f) { set(key, f.getAbsolutePath()); + } else if (value instanceof HDTOptions opt) { + for (Object optKey : opt.getKeys()) { + String optKeyStr = String.valueOf(optKey); + set(key + "." + optKeyStr, opt.get(optKeyStr)); + } } else { set(key, String.valueOf(value)); } @@ -556,6 +585,14 @@ default void setOptions(Map options) { options.forEach((k, v) -> set(String.valueOf(k), v)); } + /** + * add options from another set + * @param other other set + */ + default void setOptions(HDTOptions other) { + other.getKeys().forEach(key -> set(String.valueOf(key), other.get(String.valueOf(key)))); + } + /** * add options, each param should be in the format (key, value)* * diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/quad/impl/BitmapTriplesIteratorGraph.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/quad/impl/BitmapTriplesIteratorGraph.java index 7eceb0df..aa296be6 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/quad/impl/BitmapTriplesIteratorGraph.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/quad/impl/BitmapTriplesIteratorGraph.java @@ -49,6 +49,7 @@ protected TripleID getNext() { return id; } // search another + tid = null; continue; } @@ -105,6 +106,11 @@ public TripleComponentOrder getOrder() { return tidIt.getOrder(); } + @Override + public boolean isLastTriplePositionBoundToOrder() { + return tidIt.isLastTriplePositionBoundToOrder(); + } + @Override public long getLastTriplePosition() { return posZ; diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/triples/impl/BitmapQuadTriples.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/triples/impl/BitmapQuadTriples.java index d1259350..04f260a8 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/triples/impl/BitmapQuadTriples.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/triples/impl/BitmapQuadTriples.java @@ -218,28 +218,6 @@ public void save(OutputStream output, ControlInfo ci, ProgressListener listener) graphs.save(output, iListener); } - @Override - public SuppliableIteratorTripleID search(TripleID pattern) { - if (isClosed) { - throw new IllegalStateException("Cannot search on BitmapTriples if it's already closed"); - } - - if (getNumberOfElements() == 0 || pattern.isNoMatch()) { - return new EmptyTriplesIterator(order); - } - - TripleID reorderedPat = new TripleID(pattern); - TripleOrderConvert.swapComponentOrder(reorderedPat, TripleComponentOrder.SPO, order); - String patternString = reorderedPat.getPatternString(); - - if (hasFOQIndex() && patternString.equals("???G")) { - return new BitmapTriplesIteratorGraphG(this, pattern); - } - - return new BitmapTriplesIteratorGraph(this, super.search(pattern.copyNoGraph()), - pattern.isQuad() ? pattern.getGraph() : 0); - } - @Override public SuppliableIteratorTripleID search(TripleID pattern, int searchMask) { if (isClosed) { diff --git a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/compiler/CompiledSail.java b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/compiler/CompiledSail.java index 42408295..67c3e319 100644 --- a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/compiler/CompiledSail.java +++ b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/compiler/CompiledSail.java @@ -350,7 +350,7 @@ public static class CompiledSailCompiler { private NotifyingSail sourceSail; private EndpointFiles endpointFiles; private HDTOptions spec; - private String hdtSpec; + private HDTOptions hdtSpec; private final Map stringConfig = new HashMap<>(); private final List stringObject = new ArrayList<>(); private CompiledSailOptions options; @@ -518,6 +518,21 @@ public CompiledSailCompiler withEndpointFiles(EndpointFiles endpointFiles) { * @throws java.lang.NullPointerException a parameter is null */ public CompiledSailCompiler withHDTSpec(String hdtSpec) { + HDTOptions spec = HDTOptions.of(); + spec.setOptions(hdtSpec); + return withHDTSpec(spec); + } + + + /** + * set the hdt spec for the endpoint store, won't be used if the source + * is defined or if the generated source isn't an endpoint store + * + * @param hdtSpec the spec + * @return this + * @throws java.lang.NullPointerException a parameter is null + */ + public CompiledSailCompiler withHDTSpec(HDTOptions hdtSpec) { this.hdtSpec = Objects.requireNonNull(hdtSpec, "hdtSpec can't be null!"); return this; } diff --git a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/compiler/CompiledSailOptions.java b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/compiler/CompiledSailOptions.java index c5425ff5..1d5451d1 100644 --- a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/compiler/CompiledSailOptions.java +++ b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/compiler/CompiledSailOptions.java @@ -36,7 +36,7 @@ public class CompiledSailOptions { private int rdf4jSplitUpdate; private int endpointThreshold; private int port; - private String hdtSpec; + private HDTOptions hdtSpec; private int timeoutUpdate; private int timeoutQuery; private Map hdtOptions; @@ -74,7 +74,7 @@ public CompiledSailOptions() { rdf4jSplitUpdate = SailCompilerSchema.RDF_STORE_SPLIT_STORAGE.getHandler().defaultValue(); endpointThreshold = SailCompilerSchema.ENDPOINT_THRESHOLD.getHandler().defaultValue(); port = SailCompilerSchema.SERVER_PORT.getHandler().defaultValue(); - hdtSpec = ""; + hdtSpec = HDTOptions.empty(); timeoutUpdate = SailCompilerSchema.TIMEOUT_UPDATE.getHandler().defaultValue(); timeoutQuery = SailCompilerSchema.TIMEOUT_QUERY.getHandler().defaultValue(); hdtOptions = Map.of(); @@ -96,7 +96,7 @@ void readOptions(SailCompiler.SailCompilerReader reader) { endpointThreshold = reader.searchPropertyValue(SailCompilerSchema.MAIN, SailCompilerSchema.ENDPOINT_THRESHOLD); hdtReadMode = reader.searchPropertyValue(SailCompilerSchema.MAIN, SailCompilerSchema.HDT_READ_MODE_PROPERTY); port = reader.searchPropertyValue(SailCompilerSchema.MAIN, SailCompilerSchema.SERVER_PORT); - hdtSpec = reader.searchPropertyValue(SailCompilerSchema.MAIN, SailCompilerSchema.HDT_SPEC_PROPERTY); + hdtSpec = HDTOptions.of(reader.searchPropertyValue(SailCompilerSchema.MAIN, SailCompilerSchema.HDT_SPEC_PROPERTY)); timeoutUpdate = reader.searchPropertyValue(SailCompilerSchema.MAIN, SailCompilerSchema.TIMEOUT_UPDATE); timeoutQuery = reader.searchPropertyValue(SailCompilerSchema.MAIN, SailCompilerSchema.TIMEOUT_QUERY); hdtOptions = reader.search(SailCompilerSchema.MAIN, SailCompilerSchema.GEN_HDT_OPTION_PARAM).stream() @@ -224,7 +224,7 @@ public void setPort(int port) { this.port = port; } - public String getHdtSpec() { + public HDTOptions getHdtSpec() { return hdtSpec; } @@ -238,10 +238,7 @@ public void setDumpLocation(Path dumpLocation) { * @return HDTOptions */ public HDTOptions createSpecHDTOptions() { - HDTOptions opt = new HDTOptionsBase(); - - // set hdtspec config - opt.setOptions(getHdtSpec()); + HDTOptions opt = HDTOptions.of(getHdtSpec()); // set model config getHdtOptions().forEach(opt::set); @@ -277,7 +274,7 @@ public HDTOptions createHDTOptions(Path endHDT, Path workLocation) { return opt; } - public void setHdtSpec(String hdtSpec) { + public void setHdtSpec(HDTOptions hdtSpec) { this.hdtSpec = hdtSpec; } diff --git a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/model/SimpleIRIHDT.java b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/model/SimpleIRIHDT.java index 5d30f41d..7ba529bf 100644 --- a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/model/SimpleIRIHDT.java +++ b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/model/SimpleIRIHDT.java @@ -18,8 +18,8 @@ public class SimpleIRIHDT extends AbstractIRI implements HDTValue { public static final byte SUBJECT_POS = 1; public static final byte PREDICATE_POS = 2; public static final byte OBJECT_POS = 3; - public static final byte SHARED_POS = 4; - public static final byte GRAPH_POS = 5; + public static final byte GRAPH_POS = 4; + public static final byte SHARED_POS = 5; public static byte getPos(DictionarySectionRole role) { return switch (role) { @@ -96,6 +96,8 @@ public String stringValue() { charSequence = hdt.getDictionary().idToString(this.id, TripleComponentRole.OBJECT); } else if (this.postion == PREDICATE_POS) { charSequence = hdt.getDictionary().idToString(this.id, TripleComponentRole.PREDICATE); + } else if (this.postion == GRAPH_POS) { + charSequence = hdt.getDictionary().idToString(this.id, TripleComponentRole.GRAPH); } else { throw new EndpointStoreException("bad postion value: " + postion); } @@ -157,6 +159,8 @@ public int hashCode() { prefix += "P"; } else if (this.postion == OBJECT_POS) { prefix += "O"; + } else if (this.postion == GRAPH_POS) { + prefix += "G"; } else { if (iriString != null) { prefix = iriString; diff --git a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStore.java b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStore.java index fcc54e72..7188027a 100644 --- a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStore.java +++ b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStore.java @@ -81,6 +81,10 @@ public class EndpointStore extends AbstractNotifyingSail { * set the user locales */ public static final String QUERY_CONFIG_USER_LOCALES = "user_locales"; + /** + * enable the merge join, default true + */ + public static final String OPTION_QENDPOINT_MERGE_JOIN = "qendpoint.mergejoin"; private static final AtomicLong ENDPOINT_DEBUG_ID_GEN = new AtomicLong(); private static final Logger logger = LoggerFactory.getLogger(EndpointStore.class); private final long debugId; @@ -438,9 +442,20 @@ public void setFreezedStoreStore(AbstractNotifyingSail sail) { } } - // force access to the store via reflection, the library does not allow - // directly since the method is protected + /** + * force access to the store via reflection, the library does not allow directly since the method is protected + * @return sailstore + * @deprecated use {@link #getCurrentSailStore()} instead + */ + @Deprecated public SailStore getCurrentSaliStore() { + return getCurrentSailStore(); + } + /** + * force access to the store via reflection, the library does not allow directly since the method is protected + * @return sailstore + */ + public SailStore getCurrentSailStore() { try { Sail sail = getChangingStore(); Method method = sail.getClass().getDeclaredMethod("getSailStore"); @@ -650,7 +665,7 @@ public void initTempDeleteArray() throws IOException { } for (MultiLayerBitmapWrapper b : this.tempdeleteBitMap) { if (b != null) { - ((BitArrayDisk) b.getHandle()).force(false); + b.getHandle().force(false); } } } @@ -790,9 +805,9 @@ public void resetDeleteArray(HDT newHdt) throws IOException { TripleID tid = next.next(); long newIndex = next.getLastTriplePosition(); - int graph; + long graph; if (newHdt.getDictionary().supportGraphs()) { - graph = (int) tid.getGraph(); + graph = tid.getGraph(); } else { graph = 1; } @@ -808,8 +823,7 @@ public void resetDeleteArray(HDT newHdt) throws IOException { Closer.closeSingle(getDeleteBitMaps()); try { for (TripleComponentOrder sorder : validOrders) { - MultiLayerBitmapWrapper.MultiLayerModBitmapWrapper bitArrayDisks = newDeleteArray[sorder.ordinal()]; - ((BitArrayDisk) bitArrayDisks.getHandle()) + newDeleteArray[sorder.ordinal()].getHandle() .changeToInDisk(new File(endpointFiles.getTripleDeleteArr(sorder))); } } catch (Throwable t) { @@ -851,9 +865,11 @@ public void markDeletedTempTriples() throws IOException { searchId = new TripleID(sid, pid, oid); } + logger.info("search triple {}", searchId); IteratorTripleID search = this.hdt.getTriples().search(searchId); while (search.hasNext()) { TripleID ts = search.next(); + logger.info("-> {}", ts); long index = search.getLastTriplePosition(); if (index >= 0) { TripleComponentOrder order; @@ -1083,14 +1099,14 @@ public long countTriplesNativeStore() { public void flushWrites() throws IOException { for (MultiLayerBitmapWrapper.MultiLayerModBitmapWrapper b : deleteBitMap) { if (b != null) { - ((BitArrayDisk) b.getHandle()).force(true); + b.getHandle().force(true); } } if (isMerging()) { getRdfWriterTempTriples().getWriter().flush(); for (MultiLayerBitmapWrapper.MultiLayerModBitmapWrapper b : tempdeleteBitMap) { if (b != null) { - ((BitArrayDisk) b.getHandle()).force(true); + b.getHandle().force(true); } } } diff --git a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStoreConnection.java b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStoreConnection.java index bd8a4314..c5439fbf 100644 --- a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStoreConnection.java +++ b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStoreConnection.java @@ -2,7 +2,6 @@ import com.the_qa_company.qendpoint.compiler.ConfigSailConnection; import com.the_qa_company.qendpoint.core.enums.TripleComponentOrder; -import com.the_qa_company.qendpoint.model.HDTValue; import com.the_qa_company.qendpoint.store.exception.EndpointTimeoutException; import org.eclipse.rdf4j.common.concurrent.locks.Lock; import org.eclipse.rdf4j.common.iteration.CloseableIteration; @@ -61,7 +60,7 @@ public class EndpointStoreConnection extends SailSourceConnection implements Con private final Map config = new HashMap<>(); public EndpointStoreConnection(EndpointStore endpoint) throws InterruptedException { - super(endpoint, endpoint.getCurrentSaliStore(), new StrictEvaluationStrategyFactory()); + super(endpoint, endpoint.getCurrentSailStore(), new StrictEvaluationStrategyFactory()); this.debugId = DEBUG_ID_STORE.getAndIncrement(); this.endpoint = endpoint; EndpointStoreUtils.openConnection(this); @@ -420,7 +419,7 @@ public void flush() throws SailException { try { endpoint.flushWrites(); } catch (IOException e) { - throw new SailException("Can't flush enpoint store writes", e); + throw new SailException("Can't flush endpoint store writes", e); } } this.connA_write.flush(); @@ -558,7 +557,7 @@ public void removeStatement(UpdateContext op, Resource subj, IRI pred, Value obj this.getCurrentConnectionWrite().removeStatement(op, newSubj, newPred, newObj, contexts); } - assignBitMapDeletes(tid, newSubj, newPred, newObj, contexts, null); + assignBitMapDeletes(tid, subj, pred, obj, contexts, null); } else { long[] contextIds = new long[contexts.length]; Resource[] newcontexts = this.endpoint.getHdtConverter().graphIdToIRI(contexts, contextIds); @@ -616,16 +615,17 @@ private void assignBitMapDeletes(TripleID tid, Resource subj, IRI pred, Value ob long o = tid.getObject(); TripleID tripleID = new TripleID(s, p, o); + boolean supportGraphs = endpoint.getHdt().getDictionary().supportGraphs(); if (s != -1 && p != -1 && o != -1) { - if (contexts.length == 0 || !endpoint.getHdt().getDictionary().supportGraphs()) { - if (endpoint.getHdt().getDictionary().supportGraphs()) { - tripleID.setGraph(endpoint.getHdtProps().getDefaultGraph()); + if (contexts.length == 0 || !supportGraphs) { + if (supportGraphs) { + tripleID.setGraph(0); // all patterns } for (TripleComponentOrder order : endpoint.getValidOrders()) { IteratorTripleID iter = endpoint.getHdt().getTriples().search(tripleID, order.mask); - if (iter.hasNext()) { - iter.next(); + while (iter.hasNext()) { + TripleID removedId = iter.next(); long index = iter.getLastTriplePosition(); assert iter.isLastTriplePositionBoundToOrder(); @@ -633,10 +633,17 @@ private void assignBitMapDeletes(TripleID tid, Resource subj, IRI pred, Value ob assert sorder == order; - if (!this.endpoint.getDeleteBitMap(sorder).access(0, index)) { - this.endpoint.getDeleteBitMap(sorder).set(0, index, true); + long layer; + if (supportGraphs) { + layer = removedId.getGraph() - 1; + } else { + layer = 0; + } + + if (!this.endpoint.getDeleteBitMap(sorder).access(layer, index)) { + this.endpoint.getDeleteBitMap(sorder).set(layer, index, true); if (this.endpoint.isMerging()) { - this.endpoint.getTempDeleteBitMap(sorder).set(0, index, true); + this.endpoint.getTempDeleteBitMap(sorder).set(layer, index, true); } if (order == TripleComponentOrder.SPO) { notifyStatementRemoved(this.endpoint.getValueFactory().createStatement(subj, pred, obj)); @@ -647,11 +654,7 @@ private void assignBitMapDeletes(TripleID tid, Resource subj, IRI pred, Value ob } else { for (int i = 0; i < contexts.length; i++) { Resource context = contexts[i]; - if (context == null) { - tripleID.setGraph(endpoint.getHdtProps().getDefaultGraph()); - } else { - tripleID.setGraph(contextIds[i]); - } + tripleID.setGraph(contextIds[i]); if (tripleID.getGraph() == -1) { continue; // bad context } @@ -659,16 +662,17 @@ private void assignBitMapDeletes(TripleID tid, Resource subj, IRI pred, Value ob IteratorTripleID iter = endpoint.getHdt().getTriples().search(tripleID, order.mask); if (iter.hasNext()) { - iter.next(); + TripleID removedId = iter.next(); long index = iter.getLastTriplePosition(); assert iter.isLastTriplePositionBoundToOrder(); TripleComponentOrder sorder = iter.getOrder(); - if (!this.endpoint.getDeleteBitMap(sorder).access(index)) { - this.endpoint.getDeleteBitMap(sorder).set(tripleID.getGraph() - 1, index, true); + + if (!this.endpoint.getDeleteBitMap(sorder).access(removedId.getGraph() - 1, index)) { + this.endpoint.getDeleteBitMap(sorder).set(removedId.getGraph() - 1, index, true); if (this.endpoint.isMerging()) { - this.endpoint.getTempDeleteBitMap(sorder).set(tripleID.getGraph() - 1, index, true); + this.endpoint.getTempDeleteBitMap(sorder).set(removedId.getGraph() - 1, index, true); } if (order == TripleComponentOrder.SPO) { notifyStatementRemoved(this.endpoint.getValueFactory().createStatement(subj, pred, obj, context)); @@ -687,7 +691,7 @@ private void assignBitMapDeletes(TripleID tid, Resource subj, IRI pred, Value ob NTriplesWriter writer = this.endpoint.getRdfWriterTempTriples(); if (writer != null) { synchronized (writer) { - if (contexts.length == 0 || !this.endpoint.getHdt().getDictionary().supportGraphs()) { + if (contexts.length == 0 || !supportGraphs) { writer.handleStatement(this.endpoint.getValueFactory().createStatement(subj, pred, obj)); } else { for (Resource ctx : contexts) { diff --git a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStoreQueryPreparer.java b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStoreQueryPreparer.java index d5e66d27..1ddf2780 100644 --- a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStoreQueryPreparer.java +++ b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStoreQueryPreparer.java @@ -60,7 +60,7 @@ public EndpointStoreQueryPreparer(EndpointStore endpoint, EndpointTripleSource t cloneTupleExpression = true; evaluationStatistics = new EndpointStoreEvaluationStatistics(new EndpointStoreEvaluationStatisticsHDT(endpoint), - endpoint.getCurrentSaliStore().getEvaluationStatistics()); + endpoint.getCurrentSailStore().getEvaluationStatistics()); } public void setExplanationLevel(Explanation.Level level) { diff --git a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStoreTripleIterator.java b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStoreTripleIterator.java index 650acee6..1c082838 100644 --- a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStoreTripleIterator.java +++ b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointStoreTripleIterator.java @@ -50,6 +50,7 @@ public boolean hasNext() { if (connection.isTimeout()) { throw new EndpointTimeoutException(); } + boolean supportGraphs = endpoint.getHdt().getDictionary().supportGraphs(); // iterate over the result of hdt while (iterator.hasNext()) { TripleID tripleID = iterator.next(); @@ -63,7 +64,12 @@ public boolean hasNext() { if (logger.isTraceEnabled()) { logger.trace("From HDT {} {} {} ", subject, predicate, object); } - next = endpointTripleSource.getValueFactory().createStatement(subject, predicate, object); + if (supportGraphs) { + Resource ctx = tripleID.isQuad() ? endpoint.getHdtConverter().idToGraphHDTResource(tripleID.getGraph()) : null; + next = endpointTripleSource.getValueFactory().createStatement(subject, predicate, object, ctx); + } else { + next = endpointTripleSource.getValueFactory().createStatement(subject, predicate, object); + } return true; } } @@ -73,8 +79,10 @@ public boolean hasNext() { Resource newSubj = endpoint.getHdtConverter().rdf4jToHdtIDsubject(stm.getSubject()); IRI newPred = endpoint.getHdtConverter().rdf4jToHdtIDpredicate(stm.getPredicate()); Value newObject = endpoint.getHdtConverter().rdf4jToHdtIDobject(stm.getObject()); + Resource newContext = endpoint.getHdtConverter().rdf4jToHdtIDcontext(stm.getContext()); + next = endpointTripleSource.getValueFactory().createStatement(newSubj, newPred, newObject, - stm.getContext()); + newContext); if (logger.isTraceEnabled()) { logger.trace("From RDF4j {} {} {}", next.getSubject(), next.getPredicate(), next.getObject()); } diff --git a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointTripleSource.java b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointTripleSource.java index 93b3807d..94f1b58b 100644 --- a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointTripleSource.java +++ b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/EndpointTripleSource.java @@ -2,6 +2,7 @@ import com.the_qa_company.qendpoint.core.enums.TripleComponentOrder; import com.the_qa_company.qendpoint.core.enums.TripleComponentRole; +import com.the_qa_company.qendpoint.core.iterator.utils.GraphFilteringTripleId; import com.the_qa_company.qendpoint.core.triples.IteratorTripleID; import com.the_qa_company.qendpoint.core.triples.TripleID; import com.the_qa_company.qendpoint.core.triples.impl.EmptyTriplesIterator; @@ -48,7 +49,7 @@ public EndpointTripleSource(EndpointStoreConnection endpointStoreConnection, End this.endpoint = endpoint; this.numberOfCurrentTriples = endpoint.getHdt().getTriples().getNumberOfElements(); this.endpointStoreConnection = endpointStoreConnection; - this.enableMergeJoin = endpoint.getHDTSpec().getBoolean("qendpoint.mergejoin", true); + this.enableMergeJoin = endpoint.getHDTSpec().getBoolean(EndpointStore.OPTION_QENDPOINT_MERGE_JOIN, true); } private void initHDTIndex() { @@ -91,13 +92,17 @@ public CloseableIteration getStatements(StatementOrder stat initHDTIndex(); } + boolean graph = endpoint.getHdt().getDictionary().supportGraphs(); + // convert uris into ids if needed Resource newSubj; IRI newPred; Value newObj; + Resource[] newContextes; long subjectID = this.endpoint.getHdtConverter().subjectToID(subj); long predicateID = this.endpoint.getHdtConverter().predicateToID(pred); long objectID = this.endpoint.getHdtConverter().objectToID(obj); + long[] graphID; if (subjectID == 0 || subjectID == -1) { newSubj = subj; @@ -115,7 +120,15 @@ public CloseableIteration getStatements(StatementOrder stat newObj = this.endpoint.getHdtConverter().objectIdToIRI(objectID); } - logger.debug("SEARCH {} {} {}", newSubj, newPred, newObj); + if (graph) { + graphID = new long[contexts.length]; + newContextes = this.endpoint.getHdtConverter().graphIdToIRI(contexts, graphID); + } else { + graphID = null; + newContextes = contexts; + } + + //logger.debug("SEARCH {} {} {}", newSubj, newPred, newObj); // check if we need to search over the delta and if yes, search CloseableIteration repositoryResult; @@ -130,15 +143,15 @@ public CloseableIteration getStatements(StatementOrder stat // query both native stores logger.debug("Query both RDF4j stores!"); CloseableIteration repositoryResult1 = this.endpointStoreConnection.getConnA_read() - .getStatements(newSubj, newPred, newObj, false, contexts); + .getStatements(newSubj, newPred, newObj, false, newContextes); CloseableIteration repositoryResult2 = this.endpointStoreConnection.getConnB_read() - .getStatements(newSubj, newPred, newObj, false, contexts); + .getStatements(newSubj, newPred, newObj, false, newContextes); repositoryResult = new CombinedNativeStoreResult(repositoryResult1, repositoryResult2); } else { logger.debug("Query only one RDF4j stores!"); repositoryResult = this.endpointStoreConnection.getCurrentConnectionRead().getStatements(newSubj, - newPred, newObj, false, contexts); + newPred, newObj, false, newContextes); } } else { logger.debug("Not searching over native store"); @@ -148,18 +161,34 @@ public CloseableIteration getStatements(StatementOrder stat // iterate over the HDT file IteratorTripleID iterator; if (subjectID != -1 && predicateID != -1 && objectID != -1) { - logger.debug("Searching over HDT {} {} {}", subjectID, predicateID, objectID); + //logger.debug("Searching over HDT {} {} {}", subjectID, predicateID, objectID); TripleID t = new TripleID(subjectID, predicateID, objectID); - if (statementOrder != null) { - int indexMaskMatchingStatementOrder = getIndexMaskMatchingStatementOrder(statementOrder, subj, pred, - obj, t); + if (graph && contexts.length > 1) { + if (statementOrder != null) { + int indexMaskMatchingStatementOrder = getIndexMaskMatchingStatementOrder(statementOrder, subj, pred, + obj, t); - // search with the ID to check if the triples has been deleted - iterator = this.endpoint.getHdt().getTriples().search(t, indexMaskMatchingStatementOrder); + // search with the ID to check if the triples has been deleted + iterator = new GraphFilteringTripleId(this.endpoint.getHdt().getTriples().search(t, indexMaskMatchingStatementOrder), graphID); + } else { + // search with the ID to check if the triples has been deleted + iterator = new GraphFilteringTripleId(this.endpoint.getHdt().getTriples().search(t), graphID); + } } else { - // search with the ID to check if the triples has been deleted - iterator = this.endpoint.getHdt().getTriples().search(t); + if (graph && contexts.length == 1) { + t.setGraph(graphID[0]); + } + if (statementOrder != null) { + int indexMaskMatchingStatementOrder = getIndexMaskMatchingStatementOrder(statementOrder, subj, pred, + obj, t); + + // search with the ID to check if the triples has been deleted + iterator = this.endpoint.getHdt().getTriples().search(t, indexMaskMatchingStatementOrder); + } else { + // search with the ID to check if the triples has been deleted + iterator = this.endpoint.getHdt().getTriples().search(t); + } } } else {// no need to search over hdt diff --git a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/HDTConverter.java b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/HDTConverter.java index e6229410..5a50b9d2 100644 --- a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/HDTConverter.java +++ b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/HDTConverter.java @@ -124,20 +124,18 @@ public IRI graphIdToIRI(long id) { } public Resource[] graphIdToIRI(Resource[] contexts, long[] contextIds) { - if (contexts.length == 0 || Arrays.stream(contexts).noneMatch(Objects::nonNull)) { + if (contexts.length == 0) { return contexts; // nothing to remap } Resource[] newcontexts = new Resource[contexts.length]; for (int i = 0; i < newcontexts.length; i++) { - if (contexts[i] != null) { - contextIds[i] = contextToID(contexts[i]); - if (contextIds[i] > 0) { - newcontexts[i] = graphIdToIRI(contextIds[i]); - } else { - newcontexts[i] = contexts[i]; - } + contextIds[i] = contextToID(contexts[i]); + if (contextIds[i] > 0) { + newcontexts[i] = graphIdToIRI(contextIds[i]); + } else { + newcontexts[i] = contexts[i]; } } @@ -170,7 +168,7 @@ public Value rdf4jToHdtIDobject(Value object) { public Resource rdf4jToHdtIDcontext(Resource ctx) { if (ctx == null || !hdt.getDictionary().supportGraphs()) { - return null; + return ctx; } long id = rdf4jContextToHdtID(ctx); if (id != -1) { @@ -180,6 +178,9 @@ public Resource rdf4jToHdtIDcontext(Resource ctx) { } public long rdf4jSubjectToHdtID(Resource subj) { + if (subj == null) { + return -1; + } String iriString = subj.stringValue(); if (iriString.startsWith((HDT_URI))) { if (iriString.startsWith("SO", HDT_URI.length())) { @@ -192,6 +193,9 @@ public long rdf4jSubjectToHdtID(Resource subj) { } public long rdf4jPredicateToHdtID(IRI pred) { + if (pred == null) { + return -1; + } String iriString = pred.stringValue(); if (iriString.startsWith((HDT_URI))) { if (iriString.startsWith("P", HDT_URI.length())) { @@ -202,6 +206,9 @@ public long rdf4jPredicateToHdtID(IRI pred) { } public long rdf4jObjectToHdtID(Value object) { + if (object == null) { + return -1; + } String iriString = object.stringValue(); if (iriString.startsWith(HDT_URI)) { if (iriString.startsWith("SO", HDT_URI.length())) { @@ -215,7 +222,7 @@ public long rdf4jObjectToHdtID(Value object) { public long rdf4jContextToHdtID(Resource ctx) { if (ctx == null) { - return -1; + return endpoint.getHdtProps().getDefaultGraph(); } String iriString = ctx.stringValue(); if (iriString.startsWith((HDT_URI))) { @@ -300,12 +307,14 @@ private Value idToObjectHDTResource0(long objectID) { } public Resource idToGraphHDTResource(long graphID) { + if (graphID == endpoint.getHdtProps().getDefaultGraph()) { + return null; + } if ((graphID >= endpoint.getHdtProps().getStartBlankGraph() && graphID <= endpoint.getHdtProps().getEndBlankGraph())) { return new SimpleBNodeHDT(hdt, SimpleIRIHDT.GRAPH_POS, graphID); - } else { - return new SimpleIRIHDT(hdt, SimpleIRIHDT.GRAPH_POS, graphID); } + return new SimpleIRIHDT(hdt, SimpleIRIHDT.GRAPH_POS, graphID); } public Resource subjectHdtResourceToResource(Resource subject) { diff --git a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/MergeRunnable.java b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/MergeRunnable.java index 2e405d86..2d11b0d5 100644 --- a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/MergeRunnable.java +++ b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/store/MergeRunnable.java @@ -640,10 +640,10 @@ private synchronized void step2(boolean restarting, Lock lock, EndpointStoreDump logger.debug("Create HDT index from dumped file"); createHDTDump(endpointFiles.getRDFTempOutput(graph), endpointFiles.getHDTTempOutput()); // cat the original index and the temp index - logger.debug("HDT Cat/Diff"); + logger.debug("HDT diffcat"); catDiffIndexes(endpointFiles.getHDTIndex(), endpointFiles.getTripleDeleteCopyArr(TripleComponentOrder.SPO), endpointFiles.getHDTTempOutput(), endpointFiles.getHDTNewIndex()); - logger.debug("CAT completed!!!!! " + endpointFiles.getLocationHdt()); + logger.debug("CAT completed {}", endpointFiles.getLocationHdt()); // #391: save DUMP HDT if (dumpInfo != null) { @@ -686,12 +686,12 @@ private Step3SubStep getStep3SubStep() { boolean existsOldTripleDeleteTempArr = endpoint.getValidOrders().stream() .anyMatch(order -> existsOld(endpointFiles.getTripleDeleteTempArr(order))); - if (!exists(endpointFiles.getHDTNewIndexV11()) && existsOldTripleDeleteTempArr) { + if (existsOldTripleDeleteTempArr && !exists(endpointFiles.getHDTNewIndexV11())) { // after rename(endpointFiles.getHDTNewIndexV11(), // endpointFiles.getHDTIndexV11()); return Step3SubStep.AFTER_INDEX_V11_RENAME; } - if (!exists(endpointFiles.getHDTNewIndex()) && existsOldTripleDeleteTempArr) { + if (existsOldTripleDeleteTempArr && !exists(endpointFiles.getHDTNewIndex())) { // after rename(endpointFiles.getHDTNewIndex(), // endpointFiles.getHDTIndex()); return Step3SubStep.AFTER_INDEX_RENAME; @@ -904,7 +904,7 @@ private void createHDTDump(String rdfInput, String hdtOutput) throws IOException oopt.setOverride(HDTOptionsKeys.LOADER_DISK_LOCATION_KEY, location.resolve("gen")); oopt.setOverride(HDTOptionsKeys.LOADER_DISK_FUTURE_HDT_LOCATION_KEY, location.resolve("wip.hdt")); try { - try (HDT hdt = HDTManager.generateHDT(new File(rdfInput).getAbsolutePath(), baseURI, RDFNotation.NTRIPLES, + try (HDT hdt = HDTManager.generateHDT(new File(rdfInput).getAbsolutePath(), baseURI, RDFNotation.guess(rdfInput), oopt, null)) { logger.info("File converted in: " + sw.stopAndShow()); hdt.saveToHDT(hdtOutput, null); @@ -945,7 +945,7 @@ private void writeTempFile(RepositoryConnection connection, String file, boolean Statement stmConverted; if (graph) { - Resource newCtxIRI = this.endpoint.getHdtConverter().rdf4jToHdtIDsubject(stm.getContext()); + Resource newCtxIRI = this.endpoint.getHdtConverter().rdf4jToHdtIDcontext(stm.getContext()); newCtxIRI = this.endpoint.getHdtConverter().subjectHdtResourceToResource(newCtxIRI); stmConverted = this.endpoint.getValueFactory().createStatement( @@ -982,7 +982,7 @@ private void convertOldToNew(HDT newHDT, boolean graph) throws IOException { Resource oldSubject = iriConverter.rdf4jToHdtIDsubject(s.getSubject()); IRI oldPredicate = iriConverter.rdf4jToHdtIDpredicate(s.getPredicate()); Value oldObject = iriConverter.rdf4jToHdtIDobject(s.getObject()); - Resource oldContext = graph ? s.getContext() : null; + Resource oldContext = graph ? iriConverter.rdf4jToHdtIDcontext(s.getContext()) : null; // if the old string cannot be converted than we can // keep the diff --git a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/utils/BinarySearch.java b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/utils/BinarySearch.java index bbe87e8d..e83e14cf 100644 --- a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/utils/BinarySearch.java +++ b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/utils/BinarySearch.java @@ -57,9 +57,11 @@ public static long last(Dictionary dictionary, long low, long high, long n, Stri long mid = (high + low) / 2; int c = -1; if (mid != n) { - c = COMPARATOR.compare(string, dictionary.idToString(mid + 1, role).toString().subSequence(0, 1)); + String s = dictionary.idToString(mid + 1, role).toString(); + c = COMPARATOR.compare(string, s.isEmpty() ? "" : s.subSequence(0, 1)); } - int c2 = COMPARATOR.compare(string, dictionary.idToString(mid, role).toString().subSequence(0, 1)); + String s = dictionary.idToString(mid, role).toString(); + int c2 = COMPARATOR.compare(string, s.isEmpty() ? "": s.subSequence(0, 1)); // System.out.println("c"+c); // System.out.println("c2 "+c2); if ((mid == n || c != 0) && c2 == 0) diff --git a/qendpoint-store/src/test/java/com/the_qa_company/qendpoint/store/EndpointStoreGraphTest.java b/qendpoint-store/src/test/java/com/the_qa_company/qendpoint/store/EndpointStoreGraphTest.java new file mode 100644 index 00000000..655bbea1 --- /dev/null +++ b/qendpoint-store/src/test/java/com/the_qa_company/qendpoint/store/EndpointStoreGraphTest.java @@ -0,0 +1,296 @@ +package com.the_qa_company.qendpoint.store; + +import com.the_qa_company.qendpoint.compiler.CompiledSail; +import com.the_qa_company.qendpoint.compiler.SparqlRepository; +import com.the_qa_company.qendpoint.core.compact.bitmap.MultiLayerBitmapWrapper; +import com.the_qa_company.qendpoint.core.enums.TripleComponentOrder; +import com.the_qa_company.qendpoint.core.exceptions.NotFoundException; +import com.the_qa_company.qendpoint.core.options.HDTOptions; +import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys; +import com.the_qa_company.qendpoint.model.HDTValue; +import com.the_qa_company.qendpoint.model.SimpleIRIHDT; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.Value; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.repository.RepositoryResult; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +public class EndpointStoreGraphTest { + + @Rule + public TemporaryFolder tempDir = TemporaryFolder.builder().assureDeletion().build(); + + + public static String posStr(int pos) { + return switch (pos) { + case SimpleIRIHDT.SUBJECT_POS -> "s"; + case SimpleIRIHDT.PREDICATE_POS -> "p"; + case SimpleIRIHDT.OBJECT_POS -> "o"; + case SimpleIRIHDT.SHARED_POS -> "sh"; + case SimpleIRIHDT.GRAPH_POS -> "g"; + default -> "unk" + pos; + }; + } + + public static String printVal(Value val) { + if (val instanceof HDTValue hv) { + return hv + "(" + hv.getHDTId() + "/" + posStr(hv.getHDTPosition()) + ")"; + } + return val.toString(); + } + public static void printStmt(Statement stmt) { + System.out.print(printVal(stmt.getSubject()) + ", " + printVal(stmt.getPredicate()) + ", " + printVal(stmt.getObject())); + if (stmt.getContext() != null) { + System.out.println(" [" + printVal(stmt.getContext()) + "]"); + } else { + System.out.println(); + } + } + + @Before + public void setupTest() { + MergeRunnableStopPoint.debug = true; + } + + @After + public void clearTest() { + MergeRunnableStopPoint.debug = false; + MergeRunnableStopPoint.unlockAll(); + } + + @Test + public void graphTest() throws IOException, InterruptedException, NotFoundException { + File roo = tempDir.getRoot(); + HDTOptions spec = HDTOptions.of( + // dict + HDTOptionsKeys.DICTIONARY_TYPE_KEY, HDTOptionsKeys.DICTIONARY_TYPE_VALUE_MULTI_OBJECTS_LANG_QUAD + // temp dict + , HDTOptionsKeys.TEMP_DICTIONARY_IMPL_KEY, HDTOptionsKeys.TEMP_DICTIONARY_IMPL_VALUE_HASH_QUAD + // merge join? + //, EndpointStore.OPTION_QENDPOINT_MERGE_JOIN, false + + ); + SparqlRepository sparqlRepository = CompiledSail.compiler() + .withEndpointFiles(new EndpointFiles(roo.toPath())) + .withHDTSpec(spec).compileToSparqlRepository(); + + EndpointStore endpointStore = (EndpointStore) ((CompiledSail) sparqlRepository.getRepository().getSail()).getSource(); + + ValueFactory vf = sparqlRepository.getRepository().getValueFactory(); + try { + { + System.out.println("ADD"); + try (SailRepositoryConnection conn = sparqlRepository.getConnection()) { + conn.begin(); + conn.add(vf.createStatement( + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "o"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "graph") + )); + conn.add(vf.createStatement( + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "o"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa") + )); + conn.add(vf.createStatement( + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "ss"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "p"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa5") + )); + conn.add(vf.createStatement( + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "ss"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "p"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa6") + )); + conn.commit(); + } + } + System.out.println("PRE MERGE"); + { + try (SailRepositoryConnection conn = sparqlRepository.getConnection()) { + try (RepositoryResult st = conn.getStatements(null, null, null, false)) { + st.stream().forEach(EndpointStoreGraphTest::printStmt); + } + + } + } + System.out.println("MERGE"); + endpointStore.mergeStore(); + MergeRunnable.debugWaitMerge(); + { + try (SailRepositoryConnection conn = sparqlRepository.getConnection()) { + System.out.println("SEARCH GRAPH"); + try (RepositoryResult st = conn.getStatements(null, null, null, false, vf.createIRI(Utility.EXAMPLE_NAMESPACE + "graph"))) { + st.stream().forEach(EndpointStoreGraphTest::printStmt); + } + System.out.println("SEARCH NO GRAPH"); + try (RepositoryResult st = conn.getStatements(null, null, null, false, (Resource) null)) { + st.stream().forEach(EndpointStoreGraphTest::printStmt); + } + System.out.println("SEARCH BOTH GRAPH"); + try (RepositoryResult st = conn.getStatements(null, null, null, false, vf.createIRI(Utility.EXAMPLE_NAMESPACE + "graph"), null)) { + st.stream().forEach(EndpointStoreGraphTest::printStmt); + } + + } + } + { + System.out.println("REM DEFAULT"); + try (SailRepositoryConnection conn = sparqlRepository.getConnection()) { + conn.begin(); + conn.remove( + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "o"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa"), + (Resource) null); + conn.commit(); + } + } + { + try (SailRepositoryConnection conn = sparqlRepository.getConnection()) { + try (RepositoryResult st = conn.getStatements(null, null, null, false)) { + st.stream().forEach(EndpointStoreGraphTest::printStmt); + } + + } + } + { + System.out.println("REM GRAPH"); + try (SailRepositoryConnection conn = sparqlRepository.getConnection()) { + conn.begin(); + conn.remove(vf.createStatement( + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "o"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa") + ), vf.createIRI(Utility.EXAMPLE_NAMESPACE + "graph")); + conn.commit(); + } + } + { + try (SailRepositoryConnection conn = sparqlRepository.getConnection()) { + try (RepositoryResult st = conn.getStatements(null, null, null, false)) { + st.stream().forEach(EndpointStoreGraphTest::printStmt); + } + + } + } + System.out.println("deletes:"); + long size = endpointStore.getHdt().getTriples().getNumberOfElements(); + long graphs = endpointStore.getGraphsCount(); + for (int i = 0; i < endpointStore.getDeleteBitMaps().length; i++) { + TripleComponentOrder order = TripleComponentOrder.values()[i]; + MultiLayerBitmapWrapper.MultiLayerModBitmapWrapper bm = endpointStore.getDeleteBitMaps()[i]; + if (bm == null) { + continue; + } + + System.out.println(order); + for (int g = 0; g < graphs; g++) { + System.out.print("g" + (g + 1) + ": "); + for (long t = 0; t < size; t++) { + System.out.print(bm.access(g, t) ? "1" : "0"); + } + System.out.println(); + } + } + System.out.println("ADD"); + { + try (SailRepositoryConnection conn = sparqlRepository.getConnection()) { + conn.begin(); + conn.add(vf.createStatement( + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "o"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa2"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "graph2") + )); + conn.commit(); + } + } + System.out.println("MERGE"); + MergeRunnableStopPoint.STEP2_START.debugLock(); + MergeRunnableStopPoint.STEP2_START.debugLockTest(); + MergeRunnableStopPoint.STEP2_END.debugLock(); + MergeRunnableStopPoint.STEP2_END.debugLockTest(); + endpointStore.mergeStore(); + + MergeRunnableStopPoint.STEP2_START.debugWaitForEvent(); + { + System.out.println("ADD merge"); + { + try (SailRepositoryConnection conn = sparqlRepository.getConnection()) { + conn.begin(); + conn.add(vf.createStatement( + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "o"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa3"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "graph2") + )); + conn.commit(); + } + } + } + MergeRunnableStopPoint.STEP2_START.debugUnlockTest(); + MergeRunnableStopPoint.STEP2_END.debugWaitForEvent(); + { + System.out.println("ADD merge 2"); + { + try (SailRepositoryConnection conn = sparqlRepository.getConnection()) { + conn.begin(); + conn.add(vf.createStatement( + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "o"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa4"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "graph2") + )); + conn.commit(); + } + try (SailRepositoryConnection conn = sparqlRepository.getConnection()) { + conn.begin(); + conn.add(vf.createStatement( + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "o"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa3"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "graph2") + )); + conn.commit(); + } + System.out.println("REM 6"); + try (SailRepositoryConnection conn = sparqlRepository.getConnection()) { + conn.begin(); + conn.remove( + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "ss"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "p"), + vf.createIRI(Utility.EXAMPLE_NAMESPACE + "aa6"), + (Resource) null); + conn.commit(); + } + } + } + MergeRunnableStopPoint.STEP2_END.debugUnlockTest(); + + MergeRunnable.debugWaitMerge(); + { + try (SailRepositoryConnection conn = sparqlRepository.getConnection()) { + try (RepositoryResult st = conn.getStatements(null, null, null, false)) { + st.stream().forEach(EndpointStoreGraphTest::printStmt); + } + } + } + System.out.println("triples: " + endpointStore.getHdt().getTriples().getNumberOfElements()); + endpointStore.getHdt().searchAll().forEachRemaining(System.out::println); + } finally { + sparqlRepository.shutDown(); + } + + } +} diff --git a/qendpoint-store/src/test/java/com/the_qa_company/qendpoint/store/MergeRestartTest.java b/qendpoint-store/src/test/java/com/the_qa_company/qendpoint/store/MergeRestartTest.java index 98301a70..63561da6 100644 --- a/qendpoint-store/src/test/java/com/the_qa_company/qendpoint/store/MergeRestartTest.java +++ b/qendpoint-store/src/test/java/com/the_qa_company/qendpoint/store/MergeRestartTest.java @@ -1,16 +1,23 @@ package com.the_qa_company.qendpoint.store; +import com.the_qa_company.qendpoint.core.compact.bitmap.MultiLayerBitmapWrapper; +import com.the_qa_company.qendpoint.core.enums.TripleComponentOrder; import com.the_qa_company.qendpoint.utils.BitArrayDisk; import org.apache.commons.io.FileUtils; +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.model.Resource; import org.eclipse.rdf4j.model.Statement; import org.eclipse.rdf4j.model.ValueFactory; import org.eclipse.rdf4j.repository.RepositoryConnection; import org.eclipse.rdf4j.repository.RepositoryException; import org.eclipse.rdf4j.repository.RepositoryResult; import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.util.Connections; +import org.eclipse.rdf4j.repository.util.Repositories; import org.eclipse.rdf4j.rio.RDFFormat; import org.eclipse.rdf4j.rio.RDFWriter; import org.eclipse.rdf4j.rio.Rio; +import org.eclipse.rdf4j.sail.NotifyingSailConnection; import org.eclipse.rdf4j.sail.memory.model.MemValueFactory; import org.junit.After; import org.junit.Assert; @@ -27,36 +34,58 @@ import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys; import com.the_qa_company.qendpoint.core.triples.IteratorTripleString; import com.the_qa_company.qendpoint.core.util.io.Closer; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.Closeable; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.nio.file.FileVisitResult; import java.nio.file.FileVisitor; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.BasicFileAttributes; +import java.util.Collection; +import java.util.List; import java.util.function.BiConsumer; +import java.util.function.Consumer; import static java.lang.String.format; import static org.junit.Assert.fail; +@RunWith(Parameterized.class) public class MergeRestartTest { + @Parameterized.Parameters(name = "quads: {0}") + public static Collection params() { + return List.of(true, false); + } private static final Logger logger = LoggerFactory.getLogger(MergeRestartTest.class); private static final File HALT_TEST_DIR = new File("tests", "halt_test_dir"); @Rule public TemporaryFolder tempDir = TemporaryFolder.builder().assureDeletion().build(); HDTOptions spec; + @Parameterized.Parameter + public boolean quadTest; @Before public void setUp() { - spec = HDTOptions.of(HDTOptionsKeys.TEMP_DICTIONARY_IMPL_KEY, - HDTOptionsKeys.TEMP_DICTIONARY_IMPL_VALUE_MULT_HASH, HDTOptionsKeys.DICTIONARY_TYPE_KEY, - HDTOptionsKeys.DICTIONARY_TYPE_VALUE_MULTI_OBJECTS_LANG); + if (quadTest) { + spec = HDTOptions.of(HDTOptionsKeys.TEMP_DICTIONARY_IMPL_KEY, + HDTOptionsKeys.TEMP_DICTIONARY_IMPL_VALUE_HASH_QUAD, HDTOptionsKeys.DICTIONARY_TYPE_KEY, + HDTOptionsKeys.DICTIONARY_TYPE_VALUE_MULTI_OBJECTS_LANG_QUAD); + } else { + spec = HDTOptions.of(HDTOptionsKeys.TEMP_DICTIONARY_IMPL_KEY, + HDTOptionsKeys.TEMP_DICTIONARY_IMPL_VALUE_MULT_HASH, HDTOptionsKeys.DICTIONARY_TYPE_KEY, + HDTOptionsKeys.DICTIONARY_TYPE_VALUE_MULTI_OBJECTS_LANG); + } // set the MergeRunnable in test mode MergeRunnableStopPoint.debug = true; MergeRunnableStopPoint.unlockAll(); @@ -459,86 +488,14 @@ private void mergeRestartTest2(MergeRunnableStopPoint point, File root, Closer c executeTestCount(countFile, endpointStore2, store2); } - /** - * basic synced files/value class - */ - private static class FileStore { - File root1; - File root2; - boolean switchValue = false; - - public FileStore(File root1, File root2) { - this.root1 = root1; - this.root2 = root2; - } - - /** - * switch the file - */ - public synchronized void switchValue() { - switchValue = !switchValue; - } - - /** - * @return the root file - */ - public synchronized File getRoot() { - return switchValue ? root2 : root1; - } - - /** - * @return the root store dir - */ - public synchronized File getHdtStore() { - return new File(getRoot(), "hdt-store"); - } - } - public void mergeRestartTest(MergeRunnableStopPoint stopPoint) throws IOException, InterruptedException { MergeRunnableStopPoint.disableRequest = false; MergeRunnableStopPoint.unlockAll(); File testRoot = tempDir.newFolder(); File root1 = new File(testRoot, "root1"); File root2 = new File(testRoot, "root2"); - Thread knowledgeThread = null; try (Closer closer = Closer.of()) { // create a store to tell which dir we are using - FileStore store = new FileStore(root1, root2); - knowledgeThread = new Thread(() -> { - // lock the points we need, this is done before and after the - // crash, - // so we don't have to check - // if this is before or after the stop point - MergeRunnableStopPoint.STEP1_TEST_BITMAP1.debugLock(); - MergeRunnableStopPoint.STEP1_TEST_BITMAP1.debugLockTest(); - MergeRunnableStopPoint.STEP1_TEST_BITMAP2.debugLock(); - MergeRunnableStopPoint.STEP1_TEST_BITMAP2.debugLockTest(); - - MergeRunnableStopPoint.STEP1_TEST_BITMAP1.debugWaitForEvent(); - { - // log the bitmap state at STEP1_TEST_BITMAP1 - try (BitArrayDisk bitmap = new BitArrayDisk(4, - store.getHdtStore().getAbsolutePath() + "/triples-delete.arr")) { - logger.debug("STEP1_TEST_BITMAP1: {}", bitmap.printInfo()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - MergeRunnableStopPoint.STEP1_TEST_BITMAP1.debugUnlockTest(); - - MergeRunnableStopPoint.STEP1_TEST_BITMAP2.debugWaitForEvent(); - { - // log the bitmap state at STEP1_TEST_BITMAP2 - try (BitArrayDisk bitmap = new BitArrayDisk(4, - store.getHdtStore().getAbsolutePath() + "/triples-delete.arr")) { - logger.debug("STEP1_TEST_BITMAP2: {}", bitmap.printInfo()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - MergeRunnableStopPoint.STEP1_TEST_BITMAP2.debugUnlockTest(); - }, "KnowledgeThread"); - knowledgeThread.start(); // start the first phase mergeRestartTest1(stopPoint, root1, closer); @@ -549,15 +506,20 @@ public void mergeRestartTest(MergeRunnableStopPoint stopPoint) throws IOExceptio // switch the directory we are using swapDir(root1, root2); - store.switchValue(); + // start the second phase mergeRestartTest2(stopPoint, root2, closer); + } catch (Throwable t) { + try { + FileUtils.deleteDirectory(testRoot); + } catch (IOException e) { + t.addSuppressed(e); + } + throw t; } finally { - FileUtils.deleteDirectory(testRoot); - assert knowledgeThread != null; - knowledgeThread.interrupt(); MergeRunnableStopPoint.disableRequest = false; } + FileUtils.deleteDirectory(testRoot); } /** @@ -930,6 +892,10 @@ private void executeTestRemoveHDT(File out, SailRepository repo, int id, int cou connection.remove(stm); }); writeInfoCount(out, count); + try (OutputStream buff = new BufferedOutputStream(new FileOutputStream(out.getAbsolutePath() + ".delta", true))) { + buff.write(("REM HDT " + id + " / " + count + "\n").getBytes(StandardCharsets.UTF_8)); + } + } /** @@ -950,6 +916,9 @@ private void executeTestRemoveRDF(File out, SailRepository repo, int id, int cou connection.remove(stm); }); writeInfoCount(out, count); + try (OutputStream buff = new BufferedOutputStream(new FileOutputStream(out.getAbsolutePath() + ".delta", true))) { + buff.write(("REM RDF " + id + " / " + count + "\n").getBytes(StandardCharsets.UTF_8)); + } } /** @@ -970,6 +939,9 @@ private void executeTestAddRDF(File out, SailRepository repo, int id, int count) connection.add(stm); }); writeInfoCount(out, count); + try (OutputStream buff = new BufferedOutputStream(new FileOutputStream(out.getAbsolutePath() + ".delta", true))) { + buff.write(("ADD RDF " + id + " / " + count + "\n").getBytes(StandardCharsets.UTF_8)); + } } /** @@ -991,6 +963,9 @@ private void executeTestAddHDT(File out, SailRepository repo, int id, int count) connection.add(stm); }); writeInfoCount(out, count); + try (OutputStream buff = new BufferedOutputStream(new FileOutputStream(out.getAbsolutePath() + ".delta", true))) { + buff.write(("ADD HDT " + id + " / " + count + "\n").getBytes(StandardCharsets.UTF_8)); + } } /** @@ -1003,15 +978,66 @@ private void executeTestAddHDT(File out, SailRepository repo, int id, int count) */ private void executeTestCount(File out, SailRepository repo, EndpointStore store) throws IOException { int excepted = getInfoCount(out); - if (store != null) { - printHDT(store.getHdt()); - } openConnection(repo, (vf, connection) -> { int count = count(connection); + System.out.println("values:"); + if (count != excepted) { - try (RepositoryResult query = connection.getStatements(null, null, null)) { - query.forEach(System.out::println); + try (RepositoryResult query = connection.getStatements(null, null, null, false)) { + query.forEach(EndpointStoreGraphTest::printStmt); + } + if (store != null) { + System.out.println("curr ns: " + (store.switchStore ? "2" : "1")); + System.out.println("ns1:"); + Consumer printer = stmt -> { + HDTConverter converter = store.getHdtConverter(); + EndpointStoreGraphTest.printStmt(converter.rdf4ToHdt(stmt)); + }; + try (NotifyingSailConnection conn = store.getNativeStoreA().getConnection()) { + try (CloseableIteration it = conn.getStatements(null, null, null, false)) { + it.forEachRemaining(printer); + } } + System.out.println("ns2:"); + try (NotifyingSailConnection conn = store.getNativeStoreB().getConnection()) { + try (CloseableIteration it = conn.getStatements(null, null, null, false)) { + it.forEachRemaining(printer); + } + } + System.out.println("hdt:"); + try { + store.getHdt().searchAll().forEachRemaining(System.out::println); + } catch (NotFoundException e) { + throw new RuntimeException(e); + } + System.out.println("bitmaps:"); + + long size = store.getHdt().getTriples().getNumberOfElements(); + long graphs = store.getGraphsCount(); + for (int i = 0; i < store.getDeleteBitMaps().length; i++) { + TripleComponentOrder order = TripleComponentOrder.values()[i]; + MultiLayerBitmapWrapper.MultiLayerModBitmapWrapper bm = store.getDeleteBitMaps()[i]; + if (bm == null) { + continue; + } + + System.out.println(order); + for (int g = 0; g < graphs; g++) { + System.out.print("g" + (g + 1) + ": "); + for (long t = 0; t < size; t++) { + System.out.print(bm.access(g, t) ? "1" : "0"); + } + System.out.println(); + } + } + } + + System.out.println("operations:"); + try { + System.out.println(Files.readString(Path.of(out.getAbsolutePath() + ".delta"))); + } catch (IOException e) { + throw new RuntimeException(e); + } fail(format("count:%d != excepted:%d : Invalid test count", count, excepted)); } });