From 7fd0cae0c86f05261bcc39b8e92405559dce8f5b Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Fri, 13 Dec 2024 09:37:36 +0800 Subject: [PATCH] feat(telemetry): support gzip compression on uploading metrics & logs to s3 Signed-off-by: Shichao Nie --- .../com/automq/shell/log/LogUploader.java | 3 +- .../shell/metrics/S3MetricsExporter.java | 3 +- .../java/com/automq/shell/util/Utils.java | 61 +++++++++++++++++++ .../java/com/automq/shell/util/UtilsTest.java | 40 ++++++++++++ 4 files changed, 105 insertions(+), 2 deletions(-) create mode 100644 automq-shell/src/main/java/com/automq/shell/util/Utils.java create mode 100644 automq-shell/src/test/java/com/automq/shell/util/UtilsTest.java diff --git a/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java b/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java index 67dd6d1a43..ca55437e6d 100644 --- a/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java +++ b/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java @@ -12,6 +12,7 @@ package com.automq.shell.log; import com.automq.shell.AutoMQApplication; +import com.automq.shell.util.Utils; import com.automq.stream.s3.operator.ObjectStorage; import com.automq.stream.s3.operator.ObjectStorage.ObjectInfo; import com.automq.stream.s3.operator.ObjectStorage.ObjectPath; @@ -201,7 +202,7 @@ private void upload(long now) { try { String objectKey = getObjectKey(); - objectStorage.write(WriteOptions.DEFAULT, objectKey, uploadBuffer.retainedSlice().asReadOnly()).get(); + objectStorage.write(WriteOptions.DEFAULT, objectKey, Utils.compress(uploadBuffer.slice().asReadOnly())).get(); break; } catch (Exception e) { e.printStackTrace(System.err); diff --git a/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java b/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java index 55ac77ce5a..f6fed6d66a 100644 --- a/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java +++ b/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java @@ -11,6 +11,7 @@ package com.automq.shell.metrics; +import com.automq.shell.util.Utils; import com.automq.stream.s3.operator.ObjectStorage; import com.automq.stream.s3.operator.ObjectStorage.ObjectInfo; import com.automq.stream.s3.operator.ObjectStorage.ObjectPath; @@ -239,7 +240,7 @@ public CompletableResultCode flush() { synchronized (uploadBuffer) { if (uploadBuffer.readableBytes() > 0) { try { - objectStorage.write(WriteOptions.DEFAULT, getObjectKey(), uploadBuffer.retainedSlice().asReadOnly()).get(); + objectStorage.write(WriteOptions.DEFAULT, getObjectKey(), Utils.compress(uploadBuffer.slice().asReadOnly())).get(); } catch (Exception e) { LOGGER.error("Failed to upload metrics to s3", e); return CompletableResultCode.ofFailure(); diff --git a/automq-shell/src/main/java/com/automq/shell/util/Utils.java b/automq-shell/src/main/java/com/automq/shell/util/Utils.java new file mode 100644 index 0000000000..9972c9f435 --- /dev/null +++ b/automq-shell/src/main/java/com/automq/shell/util/Utils.java @@ -0,0 +1,61 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.shell.util; + +import com.automq.stream.s3.ByteBufAlloc; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import io.netty.buffer.ByteBuf; + +public class Utils { + + public static ByteBuf compress(ByteBuf input) throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream); + + byte[] buffer = new byte[input.readableBytes()]; + input.readBytes(buffer); + gzipOutputStream.write(buffer); + gzipOutputStream.close(); + + ByteBuf compressed = ByteBufAlloc.byteBuffer(byteArrayOutputStream.size()); + compressed.writeBytes(byteArrayOutputStream.toByteArray()); + return compressed; + } + + public static ByteBuf decompress(ByteBuf input) throws IOException { + byte[] compressedData = new byte[input.readableBytes()]; + input.readBytes(compressedData); + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedData); + GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + byte[] buffer = new byte[1024]; + int bytesRead; + while ((bytesRead = gzipInputStream.read(buffer)) != -1) { + byteArrayOutputStream.write(buffer, 0, bytesRead); + } + + gzipInputStream.close(); + byteArrayOutputStream.close(); + + byte[] uncompressedData = byteArrayOutputStream.toByteArray(); + ByteBuf output = ByteBufAlloc.byteBuffer(uncompressedData.length); + output.writeBytes(uncompressedData); + return output; + } +} diff --git a/automq-shell/src/test/java/com/automq/shell/util/UtilsTest.java b/automq-shell/src/test/java/com/automq/shell/util/UtilsTest.java new file mode 100644 index 0000000000..8d0d032ee9 --- /dev/null +++ b/automq-shell/src/test/java/com/automq/shell/util/UtilsTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.shell.util; + +import com.automq.stream.s3.ByteBufAlloc; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import io.netty.buffer.ByteBuf; + +@Tag("S3Unit") +public class UtilsTest { + + @Test + public void testCompression() { + String testStr = "This is a test string"; + ByteBuf input = ByteBufAlloc.byteBuffer(testStr.length()); + input.writeBytes(testStr.getBytes()); + try { + ByteBuf compressed = Utils.compress(input); + ByteBuf decompressed = Utils.decompress(compressed); + String decompressedStr = decompressed.toString(io.netty.util.CharsetUtil.UTF_8); + System.out.printf("Original: %s, Decompressed: %s\n", testStr, decompressedStr); + Assertions.assertEquals(testStr, decompressedStr); + } catch (Exception e) { + Assertions.fail("Exception occurred during compression/decompression: " + e.getMessage()); + } + } +}