From 74673356aed83b44ee10db93174d3ab1bf672cfa Mon Sep 17 00:00:00 2001 From: Matthew Blissett Date: Tue, 2 Feb 2021 18:42:49 +0100 Subject: [PATCH] WIP: Upload any download to Azure or GCS. Split Avro-format downloads (simple Avro) into chunks as they are uploaded. Upload contents of Zip files (very rough WIP) --- occurrence-cli/pom.xml | 54 ++ .../avro/file/ParallelAvroSplitter.java | 59 +++ .../apache/avro/file/RawDataFileWriter.java | 487 ++++++++++++++++++ .../apache/avro/file/SerialAvroSplitter.java | 88 ++++ .../cli/download/CloudUploaderCallback.java | 112 ++++ .../cli/download/CloudUploaderCommand.java | 28 + .../download/CloudUploaderConfiguration.java | 56 ++ .../cli/download/CloudUploaderService.java | 86 ++++ .../cli/download/UploaderToAzure.java | 108 ++++ .../cli/download/UploaderToGCS.java | 71 +++ .../cli/download/transfer/UploadToAzure.java | 52 ++ .../transfer/UploadToAzureConfiguration.java | 42 ++ .../cloud-upload-simple-avro.json | 36 ++ pom.xml | 2 +- 14 files changed, 1280 insertions(+), 1 deletion(-) create mode 100644 occurrence-cli/src/main/java/org/apache/avro/file/ParallelAvroSplitter.java create mode 100644 occurrence-cli/src/main/java/org/apache/avro/file/RawDataFileWriter.java create mode 100644 occurrence-cli/src/main/java/org/apache/avro/file/SerialAvroSplitter.java create mode 100644 occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderCallback.java create mode 100644 occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderCommand.java create mode 100644 occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderConfiguration.java create mode 100644 occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderService.java create mode 100644 occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/UploaderToAzure.java create mode 100644 occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/UploaderToGCS.java create mode 100644 occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/transfer/UploadToAzure.java create mode 100644 occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/transfer/UploadToAzureConfiguration.java create mode 100644 occurrence-download/example-jobs/monthly-downloads/cloud-upload-simple-avro.json diff --git a/occurrence-cli/pom.xml b/occurrence-cli/pom.xml index 6e21aa55d..18c71f4ab 100644 --- a/occurrence-cli/pom.xml +++ b/occurrence-cli/pom.xml @@ -20,6 +20,12 @@ + 12.4.0 + 12.3.0 + 12.2.0 + 12.0.0-preview.6 + 16.1.0 + 1.113.4 23.0 2.4 @@ -115,6 +121,18 @@ + + + + com.google.cloud + libraries-bom + ${google-cloud.version} + pom + import + + + + com.beust @@ -256,6 +274,11 @@ variables + + org.apache.avro + avro + + org.kohsuke.metainf-services @@ -323,6 +346,37 @@ junit-jupiter-api test + + + + com.azure + azure-storage-blob + ${azure-storage-blob.version} + + + + com.azure + azure-storage-queue + ${azure-storage-queue.version} + + + + com.azure + azure-storage-file-share + ${azure-storage-file-share.version} + + + + com.azure + azure-storage-file-datalake + ${azure-storage-file-datalake.version} + + + + com.google.cloud + google-cloud-storage + ${google-cloud-storage.verison} + diff --git a/occurrence-cli/src/main/java/org/apache/avro/file/ParallelAvroSplitter.java b/occurrence-cli/src/main/java/org/apache/avro/file/ParallelAvroSplitter.java new file mode 100644 index 000000000..d60a14250 --- /dev/null +++ b/occurrence-cli/src/main/java/org/apache/avro/file/ParallelAvroSplitter.java @@ -0,0 +1,59 @@ +package org.apache.avro.file; + 
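+// Note: this splitter (and the other helpers added under org.apache.avro.file) lives in the Avro
+// package so it can use the package-private raw-block API (DataFileStream.hasNextBlock(),
+// DataFileStream.nextRawBlock(DataBlock) and DataFileStream.DataBlock), which lets compressed data
+// blocks be copied between files without decoding individual records.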
+import org.apache.avro.generic.GenericContainer; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.InputStream; + +/** + * Tool to split an Avro file into N chunks, preserving the schema. + * + * Usage: ParallelAvroSplitter filename outputFileFormat numberChunks + */ +public class ParallelAvroSplitter { + + public static void main(String... args) throws Exception{ + + InputStream is; + if (args[0].equals("-")) { + is = System.in; + } else { + is = new FileInputStream(args[0]); + } + + DatumReader datumReader = new GenericDatumReader<>(); + ReflectDatumWriter rdw = new ReflectDatumWriter<>(GenericContainer.class); + + try (DataFileStream dfr = new DataFileStream(is, datumReader)) { + + int files = Integer.parseInt(args[2]); + RawDataFileWriter[] dfws = new RawDataFileWriter[files]; + for (int i = 0; i < files; i++) { + FileOutputStream output = new FileOutputStream(String.format(args[1], i)); + dfws[i] = new RawDataFileWriter<>(rdw); + dfws[i].setCodec(CodecFactory.deflateCodec(6)); + dfws[i].setFlushOnEveryBlock(false); + dfws[i].create(dfr.getSchema(), output); + } + + int o = 0; + while (dfr.hasNextBlock()) { + DataFileStream.DataBlock nextBlockRaw = null; + nextBlockRaw = dfr.nextRawBlock(nextBlockRaw); + dfws[o%files].writeRawBlock(nextBlockRaw); + + o++; + } + + for (RawDataFileWriter dfw : dfws) { + dfw.close(); + } + } + } +} diff --git a/occurrence-cli/src/main/java/org/apache/avro/file/RawDataFileWriter.java b/occurrence-cli/src/main/java/org/apache/avro/file/RawDataFileWriter.java new file mode 100644 index 000000000..170bef692 --- /dev/null +++ b/occurrence-cli/src/main/java/org/apache/avro/file/RawDataFileWriter.java @@ -0,0 +1,487 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* Copied from Avro: lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java + * with the addition of the writeRawBlock(DataBlock) method. 
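+ *
+ * The copy is needed because DataFileWriter keeps its block-writing internals private and offers no
+ * public way to write a single, already-read DataBlock; writeRawBlock(DataBlock) lets the splitter
+ * tools copy compressed blocks from an input file straight into new output files.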
+ */ +package org.apache.avro.file; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.File; +import java.io.FilterOutputStream; +import java.io.Flushable; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileConstants; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.file.DataFileStream.DataBlock; +import org.apache.avro.file.SeekableFileInput; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.file.SyncableFileOutputStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; + +public class RawDataFileWriter implements Closeable, Flushable { + private Schema schema; + private DatumWriter dout; + + private OutputStream underlyingStream; + + private BufferedFileOutputStream out; + private BinaryEncoder vout; + + private final Map meta = new HashMap(); + + private long blockCount; // # entries in current block + + private NonCopyingByteArrayOutputStream buffer; + private BinaryEncoder bufOut; + + private byte[] sync; // 16 random bytes + private int syncInterval = DataFileConstants.DEFAULT_SYNC_INTERVAL; + + private boolean isOpen; + private Codec codec; + + private boolean flushOnEveryBlock = true; + + /** Construct a writer, not yet open. */ + public RawDataFileWriter(DatumWriter dout) { + this.dout = dout; + } + + private void assertOpen() { + if (!isOpen) throw new AvroRuntimeException("not open"); + } + private void assertNotOpen() { + if (isOpen) throw new AvroRuntimeException("already open"); + } + + /** + * Configures this writer to use the given codec. + * May not be reset after writes have begun. + */ + public RawDataFileWriter setCodec(CodecFactory c) { + assertNotOpen(); + this.codec = c.createInstance(); + setMetaInternal(DataFileConstants.CODEC, codec.getName()); + return this; + } + + /** + * Set the synchronization interval for this file, in bytes. + * Valid values range from 32 to 2^30 + * Suggested values are between 2K and 2M + * + * The stream is flushed by default at the end of each synchronization + * interval. + * + * If {@linkplain #setFlushOnEveryBlock(boolean)} is + * called with param set to false, then the block may not be flushed to the + * stream after the sync marker is written. In this case, + * the {@linkplain #flush()} must be called to flush the stream. + * + * Invalid values throw IllegalArgumentException + * + * @param syncInterval + * the approximate number of uncompressed bytes to write in each block + * @return + * this RawDataFileWriter + */ + public RawDataFileWriter setSyncInterval(int syncInterval) { + if (syncInterval < 32 || syncInterval > (1 << 30)) { + throw new IllegalArgumentException("Invalid syncInterval value: " + syncInterval); + } + this.syncInterval = syncInterval; + return this; + } + + /** Open a new file for data matching a schema with a random sync. 
*/ + public RawDataFileWriter create(Schema schema, File file) throws IOException { + return create(schema, new SyncableFileOutputStream(file), null); + } + + /** Open a new file for data matching a schema with a random sync. */ + public RawDataFileWriter create(Schema schema, OutputStream outs) + throws IOException { + return create(schema, outs, null); + } + + /** Open a new file for data matching a schema with an explicit sync. */ + public RawDataFileWriter create(Schema schema, OutputStream outs, byte[] sync) + throws IOException { + assertNotOpen(); + + this.schema = schema; + setMetaInternal(DataFileConstants.SCHEMA, schema.toString()); + if (sync == null ) { + this.sync = generateSync(); + } else if (sync.length == 16) { + this.sync = sync; + } else { + throw new IOException("sync must be exactly 16 bytes"); + } + + init(outs); + + vout.writeFixed(DataFileConstants.MAGIC); // write magic + + vout.writeMapStart(); // write metadata + vout.setItemCount(meta.size()); + for (Map.Entry entry : meta.entrySet()) { + vout.startItem(); + vout.writeString(entry.getKey()); + vout.writeBytes(entry.getValue()); + } + vout.writeMapEnd(); + vout.writeFixed(this.sync); // write initial sync + vout.flush(); //vout may be buffered, flush before writing to out + return this; + } + + /** + * Set whether this writer should flush the block to the stream every time + * a sync marker is written. By default, the writer will flush the buffer + * each time a sync marker is written (if the block size limit is reached + * or the {@linkplain #sync()} is called. + * @param flushOnEveryBlock - If set to false, this writer will not flush + * the block to the stream until {@linkplain + * #flush()} is explicitly called. + */ + public void setFlushOnEveryBlock(boolean flushOnEveryBlock) { + this.flushOnEveryBlock = flushOnEveryBlock; + } + + /** + * @return - true if this writer flushes the block to the stream every time + * a sync marker is written. Else returns false. + */ + public boolean isFlushOnEveryBlock() { + return this.flushOnEveryBlock; + } + + /** Open a writer appending to an existing file. */ + public RawDataFileWriter appendTo(File file) throws IOException { + return appendTo(new SeekableFileInput(file), + new SyncableFileOutputStream(file, true)); + } + + /** Open a writer appending to an existing file. + * @param in reading the existing file. + * @param out positioned at the end of the existing file. 
+ */ + public RawDataFileWriter appendTo(SeekableInput in, OutputStream out) + throws IOException { + assertNotOpen(); + DataFileReader reader = + new DataFileReader(in, new GenericDatumReader()); + this.schema = reader.getSchema(); + this.sync = reader.getHeader().sync; + this.meta.putAll(reader.getHeader().meta); + byte[] codecBytes = this.meta.get(DataFileConstants.CODEC); + if (codecBytes != null) { + String strCodec = new String(codecBytes, "UTF-8"); + this.codec = CodecFactory.fromString(strCodec).createInstance(); + } else { + this.codec = CodecFactory.nullCodec().createInstance(); + } + reader.close(); + + init(out); + + return this; + } + + private void init(OutputStream outs) throws IOException { + this.underlyingStream = outs; + this.out = new RawDataFileWriter.BufferedFileOutputStream(outs); + EncoderFactory efactory = new EncoderFactory(); + this.vout = efactory.binaryEncoder(out, null); + dout.setSchema(schema); + buffer = new RawDataFileWriter.NonCopyingByteArrayOutputStream( + Math.min((int)(syncInterval * 1.25), Integer.MAX_VALUE/2 -1)); + this.bufOut = efactory.binaryEncoder(buffer, null); + if (this.codec == null) { + this.codec = CodecFactory.nullCodec().createInstance(); + } + this.isOpen = true; + } + + private static byte[] generateSync() { + try { + MessageDigest digester = MessageDigest.getInstance("MD5"); + long time = System.currentTimeMillis(); + digester.update((UUID.randomUUID()+"@"+time).getBytes()); + return digester.digest(); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + + private RawDataFileWriter setMetaInternal(String key, byte[] value) { + assertNotOpen(); + meta.put(key, value); + return this; + } + + private RawDataFileWriter setMetaInternal(String key, String value) { + try { + return setMetaInternal(key, value.getBytes("UTF-8")); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + /** Set a metadata property. */ + public RawDataFileWriter setMeta(String key, byte[] value) { + if (isReservedMeta(key)) { + throw new AvroRuntimeException("Cannot set reserved meta key: " + key); + } + return setMetaInternal(key, value); + } + + public static boolean isReservedMeta(String key) { + return key.startsWith("avro."); + } + + /** Set a metadata property. */ + public RawDataFileWriter setMeta(String key, String value) { + try { + return setMeta(key, value.getBytes("UTF-8")); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + /** Set a metadata property. */ + public RawDataFileWriter setMeta(String key, long value) { + return setMeta(key, Long.toString(value)); + } + + /** Thrown by {@link #append(Object)} when an exception occurs while writing a + * datum to the buffer. When this is thrown, the file is unaltered and may + * continue to be appended to. */ + public static class AppendWriteException extends RuntimeException { + public AppendWriteException(Exception e) { super(e); } + } + + /** Append a datum to the file. 
+ * @see RawDataFileWriter.AppendWriteException + */ + public void append(D datum) throws IOException { + assertOpen(); + int usedBuffer = bufferInUse(); + try { + dout.write(datum, bufOut); + } catch (IOException e) { + resetBufferTo(usedBuffer); + throw new RawDataFileWriter.AppendWriteException(e); + } catch (RuntimeException re) { + resetBufferTo(usedBuffer); + throw new RawDataFileWriter.AppendWriteException(re); + } + blockCount++; + writeIfBlockFull(); + } + + // if there is an error encoding, flush the encoder and then + // reset the buffer position to contain size bytes, discarding the rest. + // Otherwise the file will be corrupt with a partial record. + private void resetBufferTo(int size) throws IOException { + bufOut.flush(); + byte[] data = buffer.toByteArray(); + buffer.reset(); + buffer.write(data, 0, size); + } + + /** Expert: Append a pre-encoded datum to the file. No validation is + * performed to check that the encoding conforms to the file's schema. + * Appending non-conforming data may result in an unreadable file. */ + public void appendEncoded(ByteBuffer datum) throws IOException { + assertOpen(); + bufOut.writeFixed(datum); + blockCount++; + writeIfBlockFull(); + } + + private int bufferInUse() { + return (buffer.size() + bufOut.bytesBuffered()); + } + + private void writeIfBlockFull() throws IOException { + if (bufferInUse() >= syncInterval) + writeBlock(); + } + + /** + * Appends data from another file. otherFile must have the same schema. + * Data blocks will be copied without de-serializing data. If the codecs + * of the two files are compatible, data blocks are copied directly without + * decompression. If the codecs are not compatible, blocks from otherFile + * are uncompressed and then compressed using this file's codec. + *

+ * If the recompress flag is set all blocks are decompressed and then compressed + * using this file's codec. This is useful when the two files have compatible + * compression codecs but different codec options. For example, one might + * append a file compressed with deflate at compression level 1 to a file with + * deflate at compression level 7. If recompress is false, blocks + * will be copied without changing the compression level. If true, they will + * be converted to the new compression level. + * @param otherFile + * @param recompress + * @throws IOException + */ + public void appendAllFrom(DataFileStream otherFile, boolean recompress) throws IOException { + assertOpen(); + // make sure other file has same schema + Schema otherSchema = otherFile.getSchema(); + if (!this.schema.equals(otherSchema)) { + throw new IOException("Schema from file " + otherFile + " does not match"); + } + // flush anything written so far + writeBlock(); + Codec otherCodec = otherFile.resolveCodec(); + DataBlock nextBlockRaw = null; + if (codec.equals(otherCodec) && !recompress) { + // copy raw bytes + while(otherFile.hasNextBlock()) { + nextBlockRaw = otherFile.nextRawBlock(nextBlockRaw); + nextBlockRaw.writeBlockTo(vout, sync); + } + } else { + while(otherFile.hasNextBlock()) { + nextBlockRaw = otherFile.nextRawBlock(nextBlockRaw); + nextBlockRaw.decompressUsing(otherCodec); + nextBlockRaw.compressUsing(codec); + nextBlockRaw.writeBlockTo(vout, sync); + } + } + } + + private void writeBlock() throws IOException { + if (blockCount > 0) { + bufOut.flush(); + ByteBuffer uncompressed = buffer.getByteArrayAsByteBuffer(); + DataBlock block = new DataBlock(uncompressed, blockCount); + block.setFlushOnWrite(flushOnEveryBlock); + block.compressUsing(codec); + block.writeBlockTo(vout, sync); + buffer.reset(); + blockCount = 0; + } + } + + protected void writeRawBlock(DataBlock rawBlock) throws IOException { + rawBlock.writeBlockTo(vout, sync); + } + + /** Return the current position as a value that may be passed to {@link + * DataFileReader#seek(long)}. Forces the end of the current block, + * emitting a synchronization marker. By default, this will also flush the + * block to the stream. + * + * If {@linkplain #setFlushOnEveryBlock(boolean)} is + * called with param set to false, then this method may not flush + * the block. In this case, the {@linkplain #flush()} must be called to + * flush the stream. + */ + public long sync() throws IOException { + assertOpen(); + writeBlock(); + return out.tell(); + } + + /** Calls {@linkplain #sync()} and then flushes the current state of the + * file. + */ + @Override + public void flush() throws IOException { + sync(); + vout.flush(); + } + + /** + * If this writer was instantiated using a File or using an + * {@linkplain Syncable} instance, this method flushes all buffers for this + * writer to disk. In other cases, this method behaves exactly + * like {@linkplain #flush()}. + * + * @throws IOException + */ + public void fSync() throws IOException { + flush(); + if (underlyingStream instanceof Syncable) { + ((Syncable) underlyingStream).sync(); + } + } + + /** Flush and close the file. 
*/ + @Override + public void close() throws IOException { + if (isOpen) { + flush(); + out.close(); + isOpen = false; + } + } + + private class BufferedFileOutputStream extends BufferedOutputStream { + private long position; // start of buffer + + private class PositionFilter extends FilterOutputStream { + public PositionFilter(OutputStream out) throws IOException { super(out); } + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + position += len; // update on write + } + } + + public BufferedFileOutputStream(OutputStream out) throws IOException { + super(null); + this.out = new RawDataFileWriter.BufferedFileOutputStream.PositionFilter(out); + } + + public long tell() { return position+count; } + } + + private static class NonCopyingByteArrayOutputStream extends ByteArrayOutputStream { + NonCopyingByteArrayOutputStream(int initialSize) { + super(initialSize); + } + ByteBuffer getByteArrayAsByteBuffer() { + return ByteBuffer.wrap(buf, 0, count); + } + } + + +} diff --git a/occurrence-cli/src/main/java/org/apache/avro/file/SerialAvroSplitter.java b/occurrence-cli/src/main/java/org/apache/avro/file/SerialAvroSplitter.java new file mode 100644 index 000000000..80c0e3bfe --- /dev/null +++ b/occurrence-cli/src/main/java/org/apache/avro/file/SerialAvroSplitter.java @@ -0,0 +1,88 @@ +package org.apache.avro.file; + +import org.apache.avro.generic.GenericContainer; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Consumer; + +/** + * Splits an Avro file into chunks of a particular length (approximately), and sends them to a + * consumer. The schema is preserved. + * + * The operation is done serially, as it is expected that the consumer is slower than this producer. 
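+ *
+ * Chunks are roughly 100 MB of compressed data (DEFAULT_CHUNK_SIZE); each chunk is written to a
+ * temporary file, handed to the consumer, and deleted as soon as the consumer returns. Minimal usage
+ * sketch, mirroring CloudUploaderCallback (endpoint, sasToken, container and key are placeholders):
+ *
+ *   new SerialAvroSplitter(inputStream, new UploaderToAzure(endpoint, sasToken, container, key + "/occurrence-%05d.avro")).split();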
+ */ +public class SerialAvroSplitter { + private static final Logger LOG = LoggerFactory.getLogger(SerialAvroSplitter.class); + + private static final long DEFAULT_CHUNK_SIZE = 100_000_000; // 100 MB + + private final InputStream inputStream; + private final Consumer fileConsumer; + private final long chunkSize; + + public SerialAvroSplitter(InputStream inputStream, Consumer fileConsumer) { + this.inputStream = inputStream; + this.fileConsumer = fileConsumer; + this.chunkSize = DEFAULT_CHUNK_SIZE; + } + + public void split() throws IOException { + + DatumReader datumReader = new GenericDatumReader<>(); + ReflectDatumWriter rdw = new ReflectDatumWriter<>(GenericContainer.class); + File output = null; + RawDataFileWriter dfw = null; + + int blockCount = 0; + int fileCount = 0; + + LOG.debug("Starting to read input stream"); + try (DataFileStream dfr = new DataFileStream(inputStream, datumReader)) { + + while (dfr.hasNextBlock()) { + if (blockCount == 0 || output.length() > chunkSize) { + if (dfw != null) { + dfw.close(); + LOG.debug("Completed file {} of length {}B, passing to consumer", output, output.length()); + fileConsumer.accept(output); + LOG.debug("File {} consumed, deleting", output); + output.delete(); + } + + // Start a new file + fileCount++; + output = File.createTempFile("avro-splitter-"+fileCount+"-", ".avro"); + output.deleteOnExit(); + LOG.debug("Copying Avro data to new file {}", output.getAbsolutePath()); + + dfw = new RawDataFileWriter<>(rdw); + dfw.setCodec(CodecFactory.deflateCodec(8)); // TODO: Configure compression? + dfw.setFlushOnEveryBlock(false); + dfw.create(dfr.getSchema(), output); + } + + DataFileStream.DataBlock nextBlockRaw = null; + nextBlockRaw = dfr.nextRawBlock(nextBlockRaw); + dfw.writeRawBlock(nextBlockRaw); + + blockCount++; + } + + dfw.close(); + LOG.debug("Completed file {} of length {}B, passing to consumer", output, output.length()); + fileConsumer.accept(output); + LOG.debug("File {} consumed, deleting", output); + output.delete(); + + LOG.info("Input was split to {} files with {} blocks", fileCount, blockCount); + } + } +} diff --git a/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderCallback.java b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderCallback.java new file mode 100644 index 000000000..e85a78ef6 --- /dev/null +++ b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderCallback.java @@ -0,0 +1,112 @@ +package org.gbif.occurrence.cli.download; + +import org.apache.avro.file.SerialAvroSplitter; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.gbif.api.model.occurrence.Download; +import org.gbif.api.model.occurrence.DownloadFormat; +import org.gbif.api.service.registry.OccurrenceDownloadService; +import org.gbif.common.messaging.AbstractMessageCallback; +import org.gbif.common.messaging.api.messages.TransferDownloadToAzureMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +import java.io.IOException; + +/** Callback that is called when the {@link TransferDownloadToAzureMessage} is received. 
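+ *
+ * The callback looks the download up in the registry and requires it to have SUCCEEDED, then opens
+ * the download file from HDFS. SIMPLE_AVRO downloads are split into chunks that are uploaded as one
+ * blob each; any other format is currently assumed to be a zip archive and is streamed to a single
+ * blob.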
*/ +public class CloudUploaderCallback + extends AbstractMessageCallback { + + private static final Logger LOG = LoggerFactory.getLogger(CloudUploaderCallback.class); + + private final OccurrenceDownloadService downloadService; + private final CloudUploaderConfiguration config; + private final FileSystem fs; + + public CloudUploaderCallback(OccurrenceDownloadService downloadService, FileSystem fs, CloudUploaderConfiguration config) { + this.downloadService = downloadService; + this.config = config; + this.fs = fs; + } + + @Override + public void handleMessage(TransferDownloadToAzureMessage message) { + MDC.put("downloadKey", message.getDownloadKey()); + + final String downloadKey = message.getDownloadKey(); + + // Check the download exists + Download download = downloadService.get(downloadKey); + LOG.info("Starting upload of download file {} to Azure", downloadKey); + if (download == null) { + throw new RuntimeException("Download is null"); + } + + if (!download.getStatus().equals(Download.Status.SUCCEEDED)) { + throw new RuntimeException("Download is not succeeded"); + } + + // Check the format is SIMPLE_AVRO + if (download.getRequest().getFormat().equals(DownloadFormat.SIMPLE_AVRO)) { + try { + // Check the file exists in HDFS + Path hdfsPath = new Path("/occurrence-download/dev-downloads/" + downloadKey + DownloadFormat.SIMPLE_AVRO.getExtension()); + if (!fs.exists(hdfsPath)) { + throw new RuntimeException("Download file does not exist on HDFS"); + } + FSDataInputStream fis = fs.open(hdfsPath); + + // Set up an UploaderToAzure with an appropriate destination + String chunkFormat = downloadKey + "/occurrence-%05d.avro"; + LOG.debug("Uploaded chunked file will have paths like {}", chunkFormat); + UploaderToAzure uploader = + new UploaderToAzure(message.getEndpoint(), message.getSasToken(), message.getContainerName(), chunkFormat); + + // Feed the file through the SerialAvroSplitter, connected to the uploader + SerialAvroSplitter avroSplitter = new SerialAvroSplitter(fis, uploader); + avroSplitter.split(); + + // Verify upload was successful etc + + // Send an email or something? + + } catch (IOException e) { + + } + } else { + // TODO: Just assuming a Zip file. + try { + // Check the file exists in HDFS + Path hdfsPath = new Path("/occurrence-download/dev-downloads/" + downloadKey + DownloadFormat.SIMPLE_CSV.getExtension()); + if (!fs.exists(hdfsPath)) { + throw new RuntimeException("Download file does not exist on HDFS"); + } + FSDataInputStream fis = fs.open(hdfsPath); + + // Set up an UploaderToAzure with an appropriate destination + String chunkFormat = downloadKey + DownloadFormat.SIMPLE_CSV.getExtension(); + LOG.debug("Uploaded file will have path {}", chunkFormat); + UploaderToAzure uploader = + new UploaderToAzure(message.getEndpoint(), message.getSasToken(), message.getContainerName(), chunkFormat); + + // Pass the HDFS stream to the uploader + uploader.accept(fis, fs.getFileStatus(hdfsPath).getLen()); + + // Verify upload was successful etc + + // Send an email or something? 
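+        // Note: unlike the SIMPLE_AVRO branch above, no splitting happens here; chunkFormat has no
+        // format specifier, so the archive is streamed to a single blob named after the download key.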
+ + } catch (IOException e) { + + } + + + throw new RuntimeException("Download is not SIMPLE_AVRO"); + } + + LOG.info("Uploading {} to Azure completed.", downloadKey); + } + +} diff --git a/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderCommand.java b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderCommand.java new file mode 100644 index 000000000..cbe853531 --- /dev/null +++ b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderCommand.java @@ -0,0 +1,28 @@ +package org.gbif.occurrence.cli.download; + +import com.google.common.util.concurrent.Service; +import org.gbif.cli.Command; +import org.gbif.cli.service.ServiceCommand; +import org.kohsuke.MetaInfServices; + +/** Entry class for CLI command to start a service that uploads GBIF downloads to cloud storage. */ +@MetaInfServices(Command.class) +public class CloudUploaderCommand extends ServiceCommand { + + private final CloudUploaderConfiguration configuration = + new CloudUploaderConfiguration(); + + public CloudUploaderCommand() { + super("cloud-uploader"); + } + + @Override + protected Service getService() { + return new CloudUploaderService(configuration); + } + + @Override + protected Object getConfigurationObject() { + return configuration; + } +} diff --git a/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderConfiguration.java b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderConfiguration.java new file mode 100644 index 000000000..fcfd3a5c7 --- /dev/null +++ b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderConfiguration.java @@ -0,0 +1,56 @@ +package org.gbif.occurrence.cli.download; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParametersDelegate; +import org.gbif.cli.PropertyName; +import org.gbif.common.messaging.config.MessagingConfiguration; +import org.gbif.occurrence.cli.common.GangliaConfiguration; + +import javax.validation.Valid; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; +import java.util.Arrays; +import java.util.StringJoiner; + +/** Configuration required to upload GBIF downloads to cloud services. 
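+ *
+ * Used by the cloud-uploader service command; an illustrative invocation (queue name and config file
+ * are examples only) would be:
+ *
+ *   java -jar occurrence-cli.jar cloud-uploader --conf cloud-uploader.yaml --queue-name dev_cloud_uploader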
*/ +public class CloudUploaderConfiguration { + + @ParametersDelegate @NotNull @Valid + public MessagingConfiguration messaging = new MessagingConfiguration(); + + @ParametersDelegate @Valid @NotNull + public GangliaConfiguration ganglia = new GangliaConfiguration(); + + @Parameter(names = "--pool-size") + @Min(1) + public int poolSize = 1; + + @Parameter(names = "--queue-name") + @NotNull + public String queueName; + + @Parameter(names = "--ws-url") + @NotNull + public String wsUrl = "http://api.gbif.org/v1/"; + + @Parameter(names = "--hdfs-site-config") + @NotNull + public String hdfsSiteConfig; + + @Parameter(names = "--core-site-config") + @NotNull + public String coreSiteConfig; + + @Override + public String toString() { + return new StringJoiner( + ", ", CloudUploaderConfiguration.class.getSimpleName() + "[", "]") + .add("messaging=" + messaging) + .add("ganglia=" + ganglia) + .add("poolSize=" + poolSize) + .add("queueName=" + queueName) + .add("coreSiteConfig=" + coreSiteConfig) + .add("hdfsSiteConfig=" + hdfsSiteConfig) + .toString(); + } +} diff --git a/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderService.java b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderService.java new file mode 100644 index 000000000..5c3688f3a --- /dev/null +++ b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/CloudUploaderService.java @@ -0,0 +1,86 @@ + +package org.gbif.occurrence.cli.download; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.common.util.concurrent.AbstractIdleService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.gbif.api.service.registry.OccurrenceDownloadService; +import org.gbif.common.messaging.DefaultMessageRegistry; +import org.gbif.common.messaging.MessageListener; +import org.gbif.registry.ws.client.OccurrenceDownloadClient; +import org.gbif.ws.client.ClientFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; + +import java.io.File; +import java.io.IOException; + +/** + * Service that listens to {@link + * org.gbif.common.messaging.api.messages.TransferDownloadToAzureMessage} messages. 
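+ *
+ * On startup it creates an OccurrenceDownloadService web-service client for the configured API URL,
+ * opens an HDFS FileSystem using the supplied hdfs-site.xml, and attaches a CloudUploaderCallback to
+ * the configured queue with the configured pool size.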
+ */ +public class CloudUploaderService extends AbstractIdleService { + + private static final Logger LOG = LoggerFactory.getLogger(CloudUploaderService.class); + + private final CloudUploaderConfiguration config; + private MessageListener listener; + private FileSystem fs; + + public CloudUploaderService(CloudUploaderConfiguration config) { + this.config = config; + } + + @Override + protected void startUp() throws Exception { + LOG.info("Starting cloud-uploader-service service with params: {}", config); + listener = new MessageListener(config.messaging.getConnectionParameters(), new DefaultMessageRegistry(), new ObjectMapper(), 1); + + OccurrenceDownloadService downloadService = occurrenceDownloadService(config.wsUrl); + + fs = createFs(); + + config.ganglia.start(); + + listener.listen( + config.queueName, + config.poolSize, + new CloudUploaderCallback(downloadService, fs, config)); + } + + @Override + protected void shutDown() throws Exception { + if (listener != null) { + listener.close(); + } + if (fs != null) { + fs.close(); + } + } + + private FileSystem createFs() throws IOException { + Configuration cf = new Configuration(); + // check if the hdfs-site.xml is provided + if (!Strings.isNullOrEmpty(config.hdfsSiteConfig)) { + File hdfsSite = new File(config.hdfsSiteConfig); + if (hdfsSite.exists() && hdfsSite.isFile()) { + LOG.info("using hdfs-site.xml"); + cf.addResource(hdfsSite.toURI().toURL()); + } else { + LOG.warn("hdfs-site.xml does not exist"); + } + } + + return FileSystem.get(cf); + } + + // @Bean + public OccurrenceDownloadService occurrenceDownloadService(@Value("${api.url}") String apiUrl) { + ClientFactory clientFactory = new ClientFactory(apiUrl); + return clientFactory.newInstance(OccurrenceDownloadClient.class); + } + +} diff --git a/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/UploaderToAzure.java b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/UploaderToAzure.java new file mode 100644 index 000000000..71fc9ed5a --- /dev/null +++ b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/UploaderToAzure.java @@ -0,0 +1,108 @@ +package org.gbif.occurrence.cli.download; + +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import org.apache.avro.file.SerialAvroSplitter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Consumer; +import java.util.zip.ZipFile; + +/** + * Upload a series of files to an Azure container. + */ +public class UploaderToAzure implements Consumer { + private final BlobContainerClient containerClient; + + private static final Logger LOG = LoggerFactory.getLogger(UploaderToAzure.class); + + /* Just for development */ + public static void main(String... args) throws Exception { + if (args.length != 4) { + System.err.println("Usage: UploaderToAzure file sasToken endpoint containerName chunkFormat"); + System.exit(1); + } + + String filePath = args[0]; // E.g. 
"0000031-201028124655771/occurrence-%05d.avro" + String sasToken = args[1]; // Looks like "?sv=2019-12-12&ss=xxxx&srt=sco&sp=xxxxxxxxx&se=2020-12-05T03:15:48Z&st=2020-11-04T19:15:48Z&spr=https,http&sig=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"; + String endpoint = args[2]; // Looks like "https://gbifdownloadtest2020.blob.core.windows.net/"; + String containerName = args[3]; // "dltest1"; + String chunkFormat = args[4]; // Local file path + + new SerialAvroSplitter( + new FileInputStream(filePath), + new UploaderToAzure(endpoint, sasToken, containerName, chunkFormat)).split(); + } + + private final String chunkFormat; + private int fileCount = 0; + + public UploaderToAzure(String endpoint, String sasToken, String containerName, String chunkFormat) { + /* Create a new BlobServiceClient with a SAS Token */ + BlobServiceClient blobServiceClient = new BlobServiceClientBuilder() + .endpoint(endpoint) + .sasToken(sasToken) + .buildClient(); + + this.containerClient = blobServiceClient.getBlobContainerClient(containerName); + this.chunkFormat = chunkFormat; + } + + @Override + public void accept(File file) { + // try { + fileCount++; + BlobClient blobClient = containerClient.getBlobClient(String.format(chunkFormat, fileCount)); + LOG.debug("Starting uploading chunk file {} of size {} to {}", file, file.length(), String.format(chunkFormat, fileCount)); + blobClient.uploadFromFile(file.getPath(), true); // TODO: Remove overwrite once in prod. + LOG.debug("Completed chunk {}", file); + // } catch (BlobStorageException ex) { + // if (!ex.getErrorCode().equals(BlobErrorCode.CONTAINER_ALREADY_EXISTS)) { + // ... + // } + } + + public void accept(InputStream is, long length) { + // try { + fileCount++; + BlobClient blobClient = containerClient.getBlobClient(String.format(chunkFormat, fileCount)); + LOG.debug("Starting uploading InputStream of length {}", length); + blobClient.upload(is, length, true); // TODO: Remove overwrite once in prod. + LOG.debug("Completed uploading stream"); + // } catch (BlobStorageException ex) { + // if (!ex.getErrorCode().equals(BlobErrorCode.CONTAINER_ALREADY_EXISTS)) { + // ... 
+ // } + } + + /* Idea: could also upload individual files within a zip file, even in parallel */ + public void acceptZipFile(File file) { + try { + ZipFile zipFile = new ZipFile(file); + System.out.println(zipFile.stream().parallel().isParallel()); + + zipFile.stream().parallel().forEach( + zipEntry -> { + try { + BlobClient blobClient = containerClient.getBlobClient(zipEntry.getName() + "aoeu"); + System.out.println("Starting " + zipEntry.getName()); + // IOUtils.skipFully(zipFile.getInputStream(zipEntry), zipEntry.getSize()); + blobClient.upload(zipFile.getInputStream(zipEntry), zipEntry.getSize(), true); + System.out.println("Completed " + zipEntry.getName()); + } catch (IOException e) { + e.printStackTrace(); + } + } + ); + } catch (Exception e) { + + } + } +} diff --git a/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/UploaderToGCS.java b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/UploaderToGCS.java new file mode 100644 index 000000000..5e4abc434 --- /dev/null +++ b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/UploaderToGCS.java @@ -0,0 +1,71 @@ +package org.gbif.occurrence.cli.download; + +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Bucket; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import org.apache.avro.file.SerialAvroSplitter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.util.function.Consumer; + +/** + * Draft code, probably doesn't work. + */ +public class UploaderToGCS implements Consumer { + private static final Logger LOG = LoggerFactory.getLogger(UploaderToGCS.class); + + private final String bucketName; + private final Storage storage; + + public static void main(String... 
args) throws Exception { + + if (args.length != 4) { + System.err.println("Usage: UploaderToAzure sasToken endpoint containerName file"); + System.exit(1); + } + + String sasToken = args[0]; + String endpoint = args[1]; + String containerName = args[2]; + String filePath = args[3]; + + new SerialAvroSplitter( + new FileInputStream(filePath), + new UploaderToGCS(endpoint, sasToken, containerName, "0000031-201028124655771/occurrence-%05d.avro")).split(); + } + + private final String chunkFormat; + private int fileCount = 0; + + public UploaderToGCS(String projectId, String sasToken, String bucketName, String chunkFormat) { + // Instantiates a client + storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService(); + + // Get bucket + Bucket bucket = storage.get(bucketName); + this.bucketName = bucketName; + + this.chunkFormat = chunkFormat; + } + + @Override + public void accept(File file) { + try { + fileCount++; + BlobId blobId = BlobId.of(bucketName, String.format(chunkFormat, fileCount)); + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build(); + LOG.debug("Starting uploading chunk file {} of size {} to {}", file, file.length(), String.format(chunkFormat, fileCount)); + storage.create(blobInfo, Files.readAllBytes(file.toPath())); + LOG.debug("Completed chunk {}", file); + } catch (IOException e) { + + } + } +} diff --git a/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/transfer/UploadToAzure.java b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/transfer/UploadToAzure.java new file mode 100644 index 000000000..512ad7d36 --- /dev/null +++ b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/transfer/UploadToAzure.java @@ -0,0 +1,52 @@ +package org.gbif.occurrence.cli.download.transfer; + +import org.gbif.cli.BaseCommand; +import org.gbif.cli.Command; +import org.gbif.common.messaging.DefaultMessagePublisher; +import org.gbif.common.messaging.api.Message; +import org.gbif.common.messaging.api.MessagePublisher; +import org.gbif.common.messaging.api.messages.TransferDownloadToAzureMessage; +import org.kohsuke.MetaInfServices; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** Starts an upload by sending a message (which then needs to be picked up by the Uploader). 
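+ * The Uploader here is the cloud-uploader service (CloudUploaderService), which listens for the
+ * resulting TransferDownloadToAzureMessage and performs the transfer.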
+ * + * Example: + * java -jar occurrence-cli.jar upload-to-azure --conf ~crap/config/occurrence-cloud-transfer-azure.yaml \ + * --download-key 0000075-201112155919426 \ + * --sas-token '?sv=2019-12-12&ss=XXXX&srt=sco&sp=XXXXXXXXX&se=2020-12-05T03:15:48Z&st=2020-11-04T19:15:48Z&spr=https,http&sig=XXXXXXXX' \ + * --endpoint 'https://gbifdownloadtest2020.blob.core.windows.net/' \ + * --container-name dltest1 + */ +@MetaInfServices(Command.class) +public class UploadToAzure extends BaseCommand { + + private static final Logger LOG = LoggerFactory.getLogger(UploadToAzure.class); + + private final UploadToAzureConfiguration config = new UploadToAzureConfiguration(); + + public UploadToAzure() { + super("upload-to-azure"); + } + + @Override + protected Object getConfigurationObject() { + return config; + } + + @Override + protected void doRun() { + try { + MessagePublisher publisher = + new DefaultMessagePublisher(config.messaging.getConnectionParameters()); + Message message = new TransferDownloadToAzureMessage(config.downloadKey, config.sasToken, config.endpoint, config.containerName); + publisher.send(message); + LOG.info("Sent message to upload {} to Azure", config.downloadKey); + } catch (IOException e) { + LOG.error("Caught exception while sending upload", e); + } + } +} diff --git a/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/transfer/UploadToAzureConfiguration.java b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/transfer/UploadToAzureConfiguration.java new file mode 100644 index 000000000..de40cd5c4 --- /dev/null +++ b/occurrence-cli/src/main/java/org/gbif/occurrence/cli/download/transfer/UploadToAzureConfiguration.java @@ -0,0 +1,42 @@ +package org.gbif.occurrence.cli.download.transfer; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParametersDelegate; +import org.gbif.common.messaging.config.MessagingConfiguration; + +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import java.util.StringJoiner; + +/** Configuration required to upload GBIF downloads to cloud services. 
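+ *
+ * The downloadKey, sasToken, endpoint and containerName values are passed directly into the
+ * TransferDownloadToAzureMessage published by the upload-to-azure command.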
*/ +public class UploadToAzureConfiguration { + + @ParametersDelegate @Valid @NotNull + public MessagingConfiguration messaging = new MessagingConfiguration(); + + @Parameter(names = "--download-key") + @NotNull + public String downloadKey; + + @Parameter(names = "--sas-token") + @NotNull + public String sasToken; + + @Parameter(names = "--endpoint") + @NotNull + public String endpoint; + + @Parameter(names = "--container-name") + @NotNull + public String containerName; + + @Override + public String toString() { + return new StringJoiner( + ", ", UploadToAzureConfiguration.class.getSimpleName() + "[", "]") + .add("sasToken=" + sasToken) + .add("endpoint=" + endpoint) + .add("containerName=" + containerName) + .toString(); + } +} diff --git a/occurrence-download/example-jobs/monthly-downloads/cloud-upload-simple-avro.json b/occurrence-download/example-jobs/monthly-downloads/cloud-upload-simple-avro.json new file mode 100644 index 000000000..d6a245790 --- /dev/null +++ b/occurrence-download/example-jobs/monthly-downloads/cloud-upload-simple-avro.json @@ -0,0 +1,36 @@ +{ + "creator": "download.gbif.org", + "format": "SIMPLE_AVRO", + "sendNotification": true, + "notification_address": ["mblissett@gbif.org"], + "predicate": { + "type": "and", + "predicates": [ + { + "type": "or", + "predicates": [ + { + "type": "equals", + "key": "LICENSE", + "value": "CC_BY_4_0" + }, + { + "type": "equals", + "key": "LICENSE", + "value": "CC0_1_0" + } + ] + }, + { + "type": "equals", + "key": "HAS_COORDINATE", + "value": "true" + }, + { + "type": "equals", + "key": "HAS_GEOSPATIAL_ISSUE", + "value": "false" + } + ] + } +} diff --git a/pom.xml b/pom.xml index bd72d96a8..da0324f10 100644 --- a/pom.xml +++ b/pom.xml @@ -153,7 +153,7 @@ 0.36 0.10 0.44 - 0.46 + 0.48-SNAPSHOT 3.43 0.4 0.31