Skip to content

Commit

Permalink
Merge pull request #463 from the-qa-company/GH-462-DeltaFile
Browse files Browse the repository at this point in the history
GH-462 add support for delta file
  • Loading branch information
ate47 authored Mar 26, 2024
2 parents 85b7426 + 88f426b commit 7a56479
Show file tree
Hide file tree
Showing 11 changed files with 384 additions and 2 deletions.
2 changes: 2 additions & 0 deletions qendpoint-cli/bin/qepSearch.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ param(
[Parameter()]
[Switch]
$memory,
[Switch]
$nocrc,
[Parameter(ValueFromRemainingArguments, Position = 0)]
[string[]]
$OtherParams
Expand Down
28 changes: 28 additions & 0 deletions qendpoint-core/datastructures/deltafile.abs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// delta file structure version 0, com.the_qa_company.qendpoint.core.rdf.parsers.RDFDeltaFileParser

// Flavor identifier, stored as a single byte in the delta file header.
WikidataChangesFlavor : byte {
DUMP = 0x63,
SIMPLE = 0x64,
FULL = 0x65
}

// Layout of a delta file: a fixed-size header, header.count named elements
// and a trailing checksum.
DeltaFile {
    // header, protected by its own 8-bit checksum
    struct {
        string magic = "$DltF0\n\r";
        // number of entries in `elements`
        long count;
        long start;
        long end;
        WikidataChangesFlavor flavor;
        byte __pad[3];
        crc8 crc;
    } header;
    // each element is a size-prefixed name followed by a size-prefixed
    // data buffer
    struct {
        vlong sizeName;
        byte name[sizeName];
        vlong sizeBuff;
        byte buff[sizeBuff];
    } elements[header.count];
    // checksum located after the elements
    crc32 crc;
}

file(DeltaFile, ".*\\.df");
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public enum RDFNotation {
*/
JSONLD,

/**
* Delta file
*/
DELTAFILE,

/**
* List of URIs with RDF content in other RDF Formats
*/
Expand Down Expand Up @@ -148,6 +153,9 @@ public static RDFNotation parse(String str) {
case "trix" -> {
return TRIX;
}
case "df", "deltafile" -> {
return DELTAFILE;
}
}
throw new IllegalArgumentException();
}
Expand Down Expand Up @@ -195,6 +203,8 @@ public static RDFNotation guess(String fileName) throws IllegalArgumentException
return TRIG;
} else if (str.endsWith("trix")) {
return TRIX;
} else if (str.endsWith("df") || str.endsWith("deltafile")) {
return DELTAFILE;
}

throw new IllegalArgumentException("Could not guess the format for " + fileName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.the_qa_company.qendpoint.core.enums;

import java.util.HashMap;

/**
 * Flavors of a Wikidata change export. Each flavor carries a title, a flag
 * telling whether it should be specified, a human readable description and
 * the byte id identifying it.
 */
public enum WikidataChangesFlavor {
    /**
     * Excludes descriptions of entities referred to in the data
     */
    DUMP("dump", true, "Excludes descriptions of entities referred to in the data.", (byte) 0x63),
    /**
     * Provides only truthy statements, along with sitelinks and version
     * information.
     */
    SIMPLE("simple", true, "Provides only truthy statements, along with sitelinks and version information.",
            (byte) 0x64),
    /**
     * An argument of "full" returns all data.
     */
    FULL("full", false, "An argument of \"full\" returns all data.", (byte) 0x65);

    /**
     * Lookup table from a flavor id to the flavor, built once when the enum
     * is loaded.
     */
    private static final HashMap<Byte, WikidataChangesFlavor> BY_ID = indexById();

    /** flavor title */
    public final String title;
    /** whether this flavor should be specified */
    public final boolean shouldSpecify;
    /** human readable description of the flavor */
    public final String description;
    /** byte id of the flavor */
    public final byte id;

    WikidataChangesFlavor(String title, boolean shouldSpecify, String description, byte id) {
        this.id = id;
        this.title = title;
        this.description = description;
        this.shouldSpecify = shouldSpecify;
    }

    /**
     * @return a map associating the id of each declared flavor to the flavor
     */
    private static HashMap<Byte, WikidataChangesFlavor> indexById() {
        HashMap<Byte, WikidataChangesFlavor> index = new HashMap<>();
        for (WikidataChangesFlavor flavor : values()) {
            index.put(flavor.id, flavor);
        }
        return index;
    }

    /**
     * @return the default flavor
     */
    public static WikidataChangesFlavor getDefaultFlavor() {
        return FULL;
    }

    /**
     * get a flavor from its id
     *
     * @param id id
     * @return flavor or null if no flavor is using this id
     */
    public static WikidataChangesFlavor getFromId(byte id) {
        return BY_ID.get(id);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public ParserException(String message) {
super(message);
}

/**
 * Create a parser exception with a message and a cause, both forwarded to
 * the superclass constructor.
 *
 * @param message description of the error
 * @param e       cause of the error
 */
public ParserException(String message, Throwable e) {
super(message, e);
}

/**
 * Create a parser exception whose message is the result of
 * createMessage(message, line, location).
 *
 * @param message  description of the error
 * @param line     line passed to createMessage (presumably the line being
 *                 parsed when the error occurred)
 * @param location location passed to createMessage (presumably the
 *                 position of the error inside the line)
 */
public ParserException(String message, String line, int location) {
this(createMessage(message, line, location));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.the_qa_company.qendpoint.core.hdt.HDTManager;
import com.the_qa_company.qendpoint.core.rdf.RDFFluxStop;
import com.the_qa_company.qendpoint.core.util.Profiler;
import com.the_qa_company.qendpoint.core.util.UnicodeEscape;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -617,7 +618,7 @@ default void write(Writer w, boolean withComment) throws IOException {
w.write("# " + opt.getKeyInfo().desc() + "\n# Type: " + opt.getKeyInfo().type().getTitle() + "\n");
}
}
w.write(key + "=" + value + "\n");
w.write(key + "=" + UnicodeEscape.escapeString(value) + "\n");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,17 @@ public class HDTOptionsKeys {
*/
@Key(type = Key.Type.BOOLEAN, desc = "Use the canonical NT file parser, removing checks")
public static final String NT_SIMPLE_PARSER_KEY = "parser.ntSimpleParser";
/**
* Disable the CRC checks of the deltafile reader, default to false. Boolean
* value
*/
@Key(type = Key.Type.BOOLEAN, desc = "No crc check with deltafile reader")
public static final String PARSER_DELTAFILE_NO_CRC = "parser.deltafile.nocrc";
/**
* Do not throw an exception when the deltafile reader fails while iterating
* over the elements, only stop the reading, default to false. Boolean value
*/
@Key(type = Key.Type.BOOLEAN, desc = "No exception, only a stop with deltafile reader")
public static final String PARSER_DELTAFILE_NO_EXCEPTION = "parser.deltafile.noExceptionOnlyStop";
/**
* Key for setting the maximum amount of file loaded with the directory
* parser, 1 for no async parsing, 0 for the number of processors, default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys;
import com.the_qa_company.qendpoint.core.rdf.parsers.RDFDeltaFileParser;
import com.the_qa_company.qendpoint.core.rdf.parsers.RDFParserDir;
import com.the_qa_company.qendpoint.core.rdf.parsers.RDFParserHDT;
import com.the_qa_company.qendpoint.core.rdf.parsers.RDFParserList;
Expand Down Expand Up @@ -72,6 +73,8 @@ public static RDFParserCallback getParserCallback(RDFNotation notation, HDTOptio
return new RDFParserRAR(spec);
case HDT:
return new RDFParserHDT();
case DELTAFILE:
return new RDFDeltaFileParser(spec);
case JSONLD:
// FIXME: Implement
throw new NotImplementedException("RDFParserJSONLD not implemented");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package com.the_qa_company.qendpoint.core.rdf.parsers;

import com.the_qa_company.qendpoint.core.enums.RDFNotation;
import com.the_qa_company.qendpoint.core.enums.WikidataChangesFlavor;
import com.the_qa_company.qendpoint.core.exceptions.ParserException;
import com.the_qa_company.qendpoint.core.iterator.utils.FetcherExceptionIterator;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys;
import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback;
import com.the_qa_company.qendpoint.core.rdf.RDFParserFactory;
import com.the_qa_company.qendpoint.core.util.crc.CRC32;
import com.the_qa_company.qendpoint.core.util.crc.CRC8;
import com.the_qa_company.qendpoint.core.util.crc.CRCInputStream;
import com.the_qa_company.qendpoint.core.util.io.IOUtil;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;

import static java.nio.charset.StandardCharsets.US_ASCII;

/**
 * Parser for delta files. A delta file is a header followed by a sequence of
 * named buffers; each non-empty buffer is gzip-decompressed and parsed with
 * the parser guessed from the buffer name.
 */
public class RDFDeltaFileParser implements RDFParserCallback {
    /**
     * Magic bytes expected at the start of a delta file
     */
    public static final byte[] COOKIE = "$DltF0\n\r".getBytes(US_ASCII);

    /**
     * One element of a delta file
     *
     * @param fileName name of the element, decoded as US-ASCII
     * @param data     raw bytes of the element, an empty array is handled
     *                 as a deleted element by the parser
     */
    public record DeltaFileComponent(String fileName, byte[] data) {}

    /**
     * Iterator over the elements of a delta file. The header is read by the
     * constructor, the elements are then read lazily from the stream.
     */
    public static class DeltaFileReader extends FetcherExceptionIterator<DeltaFileComponent, IOException>
            implements Closeable {
        private final long count;
        private long id;
        private final InputStream stream;
        private final long start;
        private final long end;
        private final WikidataChangesFlavor flavor;
        private boolean noExceptionOnlyStop;
        // set once an element read failed and the failure was suppressed,
        // the stream position is then unknown so nothing else can be read
        private boolean stopped;

        /**
         * Read the header of a delta file: the cookie, the element count,
         * the start and end values, the flavor byte, 3 padding bytes and a
         * one byte header CRC.
         *
         * @param is   stream to read, closed by {@link #close()}
         * @param spec options, reads
         *             {@link HDTOptionsKeys#PARSER_DELTAFILE_NO_CRC} and
         *             {@link HDTOptionsKeys#PARSER_DELTAFILE_NO_EXCEPTION}
         * @throws IOException bad cookie, unknown flavor, bad header CRC or
         *                     read error
         */
        public DeltaFileReader(InputStream is, HDTOptions spec) throws IOException {
            boolean nocrc = spec.getBoolean(HDTOptionsKeys.PARSER_DELTAFILE_NO_CRC, false);
            noExceptionOnlyStop = spec.getBoolean(HDTOptionsKeys.PARSER_DELTAFILE_NO_EXCEPTION, false);

            // the header is checked with a CRC8
            stream = nocrc ? is : new CRCInputStream(is, new CRC8());

            if (!Arrays.equals(stream.readNBytes(8), COOKIE)) {
                throw new IOException("Bad cookie");
            }

            this.count = IOUtil.readLong(stream);
            this.start = IOUtil.readLong(stream);
            this.end = IOUtil.readLong(stream);
            this.flavor = WikidataChangesFlavor.getFromId((byte) stream.read());
            if (flavor == null) {
                throw new IOException("Bad flavor");
            }
            stream.skipNBytes(3); // padding

            if (!nocrc) {
                CRCInputStream crcis = (CRCInputStream) stream;
                if (!crcis.readCRCAndCheck()) {
                    throw new IOException("Bad header crc");
                }
                // the data after the header is checked with a CRC32
                crcis.setCRC(new CRC32());
            } else {
                stream.skipNBytes(1); // skip header crc
            }
        }

        /**
         * @return the start value of the header, read as a number of
         *         microseconds since the epoch
         */
        public Instant getStart() {
            return Instant.ofEpochSecond(start / 1000000, (start % 1000000) * 1000);
        }

        /**
         * @return the end value of the header, read as a number of
         *         microseconds since the epoch
         */
        public Instant getEnd() {
            return Instant.ofEpochSecond(end / 1000000, (end % 1000000) * 1000);
        }

        /**
         * @return the number of elements announced by the header
         */
        @Override
        public long getSize() {
            return count;
        }

        /**
         * @param noExceptionOnlyStop if true, an error while reading the
         *                            elements or the final CRC ends the
         *                            iteration instead of being thrown
         */
        public void setNoExceptionOnlyStop(boolean noExceptionOnlyStop) {
            this.noExceptionOnlyStop = noExceptionOnlyStop;
        }

        /**
         * @return the flavor read from the header
         */
        public WikidataChangesFlavor getFlavor() {
            return flavor;
        }

        /**
         * Read the next element of the file. Once the last element was
         * returned, the next call reads the data CRC (and checks it if the
         * CRC checks are enabled).
         *
         * @return the next element, or null if there are no more elements
         *         or if an error was suppressed by noExceptionOnlyStop
         * @throws IOException read error or bad CRC, only if
         *                     noExceptionOnlyStop is false
         */
        @Override
        protected DeltaFileComponent getNext() throws IOException {
            if (stopped) {
                // a previous element read failed, the stream can't be
                // trusted anymore
                return null;
            }
            if (id >= count) {
                if (id == count) { // last
                    // increase the id to read the crc only once
                    id++;
                    try {
                        if (stream instanceof CRCInputStream crcis) {
                            if (!crcis.readCRCAndCheck()) {
                                throw new IOException("Bad data crc!");
                            }
                        } else {
                            stream.readNBytes(4); // read crc
                        }
                    } catch (Throwable t) {
                        if (noExceptionOnlyStop) {
                            return null;
                        }
                        throw t;
                    }
                }
                return null;
            }
            id++;

            try {
                // name of the element (presumably a title with an
                // extension like .ttl, the parser guesses the format from
                // it)
                byte[] name = IOUtil.readSizedBuffer(stream, ProgressListener.ignore());
                // buffer
                byte[] bytes = IOUtil.readSizedBuffer(stream, ProgressListener.ignore());

                return new DeltaFileComponent(new String(name, US_ASCII), bytes);
            } catch (Throwable e) {
                if (noExceptionOnlyStop) {
                    // keep the reader stopped, otherwise a later call would
                    // resume the reading in the middle of an element
                    stopped = true;
                    return null;
                }
                throw e;
            }
        }

        /**
         * Close the underlying stream
         *
         * @throws IOException close error
         */
        @Override
        public void close() throws IOException {
            stream.close();
        }
    }

    private final HDTOptions spec;

    /**
     * @param spec options used by the delta file reader and to create the
     *             parsers of the elements
     */
    public RDFDeltaFileParser(HDTOptions spec) {
        this.spec = spec;
    }

    /**
     * Parse the delta file stored at a location
     *
     * @throws ParserException parsing or I/O error
     */
    @Override
    public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback)
            throws ParserException {
        try (InputStream is = IOUtil.getFileInputStream(fileName)) {
            doParse(is, baseUri, notation, keepBNode, callback);
        } catch (IOException e) {
            throw new ParserException(e);
        }
    }

    /**
     * Parse a delta file from a stream. The elements with an empty buffer
     * are ignored, the other ones are gzip-decompressed and sent to the
     * parser guessed from their name. The stream isn't closed by this
     * method.
     *
     * @throws ParserException parsing or I/O error
     */
    @Override
    public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback)
            throws ParserException {
        try {
            // read df file, the reader isn't closed because it would close
            // the stream owned by the caller
            DeltaFileReader reader = new DeltaFileReader(in, spec);
            while (reader.hasNext()) {
                DeltaFileComponent next = reader.next();
                if (next.data.length == 0) {
                    continue; // deleted
                }
                RDFNotation not = RDFNotation.guess(next.fileName);
                RDFParserCallback parser = RDFParserFactory.getParserCallback(not, spec);
                // close the gzip stream to release its inflater as soon as
                // the element is parsed
                try (InputStream gzip = new GZIPInputStream(new ByteArrayInputStream(next.data))) {
                    parser.doParse(gzip, baseUri, not, keepBNode, callback);
                } catch (IOException e) {
                    throw new ParserException("Error when reading " + next.fileName + " size: " + next.data.length, e);
                }
            }
        } catch (IOException e) {
            throw new ParserException(e);
        }
    }

}
Loading

0 comments on commit 7a56479

Please sign in to comment.