Skip to content

Commit

Permalink
GH-494 Fix HDT file loading issues
Browse files Browse the repository at this point in the history
  • Loading branch information
ate47 committed Oct 22, 2024
1 parent 63cb547 commit 9e68f57
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.eclipse.rdf4j.rio.Rio;
import org.eclipse.rdf4j.sail.NotifyingSail;
import com.the_qa_company.qendpoint.core.enums.CompressionType;
import com.the_qa_company.qendpoint.core.enums.RDFNotation;
import com.the_qa_company.qendpoint.core.exceptions.ParserException;
import com.the_qa_company.qendpoint.core.hdt.HDT;
import com.the_qa_company.qendpoint.core.hdt.HDTManager;
Expand All @@ -43,7 +42,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -463,8 +461,9 @@ public LoadFileResult loadFile(InputStream input, String filename) throws IOExce
if (sparqlRepository.getOptions().getStorageMode().equals(SailCompilerSchema.ENDPOINTSTORE_STORAGE)) {
shutdown();

RDFFormat format = Rio.getParserFormatForFileName(filename)
.orElseThrow(() -> new ServerWebInputException("file format not supported " + filename));
RDFFormat format = filename.toLowerCase().endsWith(".hdt") ? RDFFormat.HDT
: Rio.getParserFormatForFileName(filename).orElseThrow(
() -> new ServerWebInputException("file format not supported " + filename));

EndpointStore endpoint = (EndpointStore) compiledSail.getSource();
EndpointFiles files = endpoint.getEndpointFiles();
Expand Down Expand Up @@ -524,7 +523,7 @@ public LoadFileResult loadFile(InputStream input, String filename) throws IOExce
} else {
shutdown();
initializeEndpointStore(false);
sendUpdates(input, baseURI, filename);
sendUpdates(input, filename);
}
try {
sparqlRepository.reindexLuceneSails();
Expand Down Expand Up @@ -575,7 +574,7 @@ public Map<String, String> getPrefixes() {
return prefixes;
}

private void sendUpdates(InputStream inputStream, String baseURI, String filename) throws IOException {
private void sendUpdates(InputStream inputStream, String filename) throws IOException {
StopWatch timeWatch = new StopWatch();

// uncompress the file if required
Expand Down Expand Up @@ -613,43 +612,6 @@ private void sendUpdates(InputStream inputStream, String baseURI, String filenam
logger.info("NT file loaded in {}", timeWatch.stopAndShow());
}

/**
 * Generate an HDT file from a stream of triples.
 * <p>
 * In {@code HDT_TWO_PASS_MODE} the triples are first dumped to a temporary
 * NT file next to {@code hdtOutput} so the HDT generator can do two passes
 * over the data; otherwise the iterator is streamed directly into the
 * generator.
 *
 * @param it        triples to index
 * @param baseURI   base URI passed to the HDT generator
 * @param spec      HDT generation options
 * @param hdtOutput path of the HDT file to create
 * @throws IOException on any I/O failure, or if HDT generation fails
 *                     (the {@code ParserException} is wrapped)
 */
private void generateHDT(Iterator<TripleString> it, String baseURI, HDTOptions spec, String hdtOutput)
		throws IOException {
	if (sparqlRepository.getOptions().getPassMode().equals(SailCompilerSchema.HDT_TWO_PASS_MODE)) {
		// dump the file to the disk to allow 2 passes
		Path tempNTFile = Paths.get(hdtOutput + "-tmp.nt");
		logger.info("Create TEMP NT file '{}'", tempNTFile);
		try {
			try (PrintWriter stream = new PrintWriter(tempNTFile.toFile())) {
				while (it.hasNext()) {
					TripleString ts = it.next();
					ts.dumpNtriple(stream);
				}
			}
			logger.info("NT file created, generating HDT...");
			// try-with-resources: the HDT must be closed even if saveToHDT
			// throws (the original called close() only on the success path)
			try (HDT hdtDump = HDTManager.generateHDT(tempNTFile.toFile().getAbsolutePath(), baseURI,
					RDFNotation.NTRIPLES, spec, null)) {
				hdtDump.saveToHDT(hdtOutput, null);
			} catch (ParserException e) {
				throw new IOException("Can't generate HDT", e);
			}
		} finally {
			// always remove the temporary dump, even on failure
			Files.deleteIfExists(tempNTFile);
		}
	} else {
		// directly use the TripleString stream to generate the HDT
		try (HDT hdtDump = HDTManager.generateHDT(it, baseURI, spec, null)) {
			hdtDump.saveToHDT(hdtOutput, null);
		} catch (ParserException e) {
			throw new IOException("Can't generate HDT", e);
		}
	}
}

/**
 * Get the configured port — presumably the HTTP port the endpoint listens
 * on (the {@code port} field is assigned outside this view; verify against
 * the server setup).
 *
 * @return the port value
 */
public int getPort() {
	return port;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package com.the_qa_company.qendpoint.controller;

import com.the_qa_company.qendpoint.Application;
import com.the_qa_company.qendpoint.core.enums.RDFNotation;
import com.the_qa_company.qendpoint.core.exceptions.ParserException;
import com.the_qa_company.qendpoint.core.hdt.HDT;
import com.the_qa_company.qendpoint.core.hdt.HDTManager;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.store.EndpointStore;
import com.the_qa_company.qendpoint.utils.LargeFakeDataSetStreamSupplier;
import com.the_qa_company.qendpoint.utils.RDFStreamUtils;
Expand Down Expand Up @@ -60,7 +66,9 @@ public class FileUploadTest {

/**
 * Parameters for the test: every RDF format with a registered RDF4J
 * parser, plus HDT, which is handled by a dedicated code path rather than
 * an RDF4J parser.
 *
 * @return the formats to run the upload tests against
 */
@Parameterized.Parameters(name = "{0}")
public static Collection<Object> params() {
	ArrayList<Object> list = new ArrayList<>(RDFParserRegistry.getInstance().getKeys());
	// HDT has no RDF4J parser, add it explicitly
	list.add(RDFFormat.HDT);
	return list;
}

@Autowired
Expand All @@ -69,7 +77,7 @@ public static Collection<Object> params() {
private final String fileName;
private final RDFFormat format;

public FileUploadTest(RDFFormat format) throws IOException {
public FileUploadTest(RDFFormat format) throws IOException, ParserException {
this.format = format;
RDFFormat originalFormat = Rio.getParserFormatForFileName(COKTAILS_NT).orElseThrow();

Expand All @@ -79,9 +87,16 @@ public FileUploadTest(RDFFormat format) throws IOException {
Path RDFFile = testDir.resolve(COKTAILS_NT + "." + format.getDefaultFileExtension());
if (!Files.exists(RDFFile)) {
try (OutputStream os = new FileOutputStream(RDFFile.toFile()); InputStream is = stream(COKTAILS_NT)) {
RDFWriter writer = Rio.createWriter(format, os);
parser.setRDFHandler(noBNode(writer));
parser.parse(is);
if (format == RDFFormat.HDT) {
try (HDT hdt = HDTManager.generateHDT(is, "http://example.org/#", RDFNotation.TURTLE,
HDTOptions.empty(), ProgressListener.ignore())) {
hdt.saveToHDT(os);
}
} else {
RDFWriter writer = Rio.createWriter(format, os);
parser.setRDFHandler(noBNode(writer));
parser.parse(is);
}
}
}

Expand Down Expand Up @@ -127,18 +142,6 @@ private InputStream streamOut(String file) throws FileNotFoundException {
return new FileInputStream(file);
}

/**
 * Compute the size in bytes of a file by reading it to the end.
 *
 * @param file path of the file to measure
 * @return the number of bytes read
 * @throws IOException on read failure
 */
private long fileSize(String file) throws IOException {
	// try-with-resources: the original never closed the stream (leak)
	try (InputStream testNt = streamOut(file)) {
		byte[] buff = new byte[1024];
		long size = 0;
		long r;
		while ((r = testNt.read(buff)) != -1) {
			size += r;
		}
		return size;
	}
}

/**
 * Normalize text that contains only whitespace to the empty string;
 * any other text is returned unchanged.
 *
 * @param text text to normalize
 * @return {@code ""} if {@code text} is purely whitespace, otherwise
 *         {@code text} itself
 */
private String clearSpaces(String text) {
	// purely-whitespace payloads count as "no content"
	if (text.matches("(\\s|[\\n\\r])*")) {
		return "";
	}
	return text;
}
Expand Down Expand Up @@ -222,6 +225,8 @@ public void loadTest() throws IOException {
@Test
@Ignore("large test")
public void loadLargeTest() throws IOException {
if (format == RDFFormat.HDT)
return;
long size = Sparql.getMaxChunkSize() * 10;
LargeFakeDataSetStreamSupplier supplier = new LargeFakeDataSetStreamSupplier(size, 42);
sparql.loadFile(supplier.createRDFStream(format), "fake." + format.getDefaultFileExtension());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.the_qa_company.qendpoint.utils;

import com.the_qa_company.qendpoint.core.hdt.HDT;
import com.the_qa_company.qendpoint.core.hdt.HDTManager;
import com.the_qa_company.qendpoint.core.triples.TripleString;
import com.the_qa_company.qendpoint.core.util.LiteralsUtils;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
Expand All @@ -9,6 +11,7 @@
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.RDFHandler;
import org.eclipse.rdf4j.rio.RDFHandlerException;
Expand All @@ -17,6 +20,9 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
Expand Down Expand Up @@ -61,6 +67,22 @@ public static InputStream uncompressedStream(InputStream stream, String filename
*/
public static void readRDFStream(InputStream stream, RDFFormat format, boolean keepBNode,
Consumer<Statement> statementConsumer) throws IOException {
if (format == RDFFormat.HDT) {
// write HDT into a temp file, map it and iterate over it
Path path = Files.createTempFile(RDFStreamUtils.class.getName(), ".hdt");
try {
Files.copy(stream, path, StandardCopyOption.REPLACE_EXISTING);
try (HDT hdt = HDTManager.mapHDT(path)) {
for (TripleString ts : hdt) {
SimpleValueFactory vf = SimpleValueFactory.getInstance();
statementConsumer.accept(convertStatement(vf, ts));
}
}
} finally {
Files.deleteIfExists(path);
}
return;
}
RDFParser parser = Rio.createParser(format);
parser.setPreserveBNodeIDs(keepBNode);
parser.setRDFHandler(new RDFHandler() {
Expand Down

0 comments on commit 9e68f57

Please sign in to comment.