Skip to content

Commit

Permalink
feat(telemetry): support gzip compression on uploading metrics & logs… (
Browse files Browse the repository at this point in the history
#2224)

feat(telemetry): support gzip compression on uploading metrics & logs to s3

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Dec 13, 2024
1 parent 432c0dc commit 370f947
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package com.automq.shell.log;

import com.automq.shell.AutoMQApplication;
import com.automq.shell.util.Utils;
import com.automq.stream.s3.operator.ObjectStorage;
import com.automq.stream.s3.operator.ObjectStorage.ObjectInfo;
import com.automq.stream.s3.operator.ObjectStorage.ObjectPath;
Expand Down Expand Up @@ -201,7 +202,7 @@ private void upload(long now) {

try {
String objectKey = getObjectKey();
objectStorage.write(WriteOptions.DEFAULT, objectKey, uploadBuffer.retainedSlice().asReadOnly()).get();
objectStorage.write(WriteOptions.DEFAULT, objectKey, Utils.compress(uploadBuffer.slice().asReadOnly())).get();
break;
} catch (Exception e) {
e.printStackTrace(System.err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package com.automq.shell.metrics;

import com.automq.shell.util.Utils;
import com.automq.stream.s3.operator.ObjectStorage;
import com.automq.stream.s3.operator.ObjectStorage.ObjectInfo;
import com.automq.stream.s3.operator.ObjectStorage.ObjectPath;
Expand Down Expand Up @@ -239,7 +240,7 @@ public CompletableResultCode flush() {
synchronized (uploadBuffer) {
if (uploadBuffer.readableBytes() > 0) {
try {
objectStorage.write(WriteOptions.DEFAULT, getObjectKey(), uploadBuffer.retainedSlice().asReadOnly()).get();
objectStorage.write(WriteOptions.DEFAULT, getObjectKey(), Utils.compress(uploadBuffer.slice().asReadOnly())).get();
} catch (Exception e) {
LOGGER.error("Failed to upload metrics to s3", e);
return CompletableResultCode.ofFailure();
Expand Down
61 changes: 61 additions & 0 deletions automq-shell/src/main/java/com/automq/shell/util/Utils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.shell.util;

import com.automq.stream.s3.ByteBufAlloc;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import io.netty.buffer.ByteBuf;

public class Utils {

public static ByteBuf compress(ByteBuf input) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);

byte[] buffer = new byte[input.readableBytes()];
input.readBytes(buffer);
gzipOutputStream.write(buffer);
gzipOutputStream.close();

ByteBuf compressed = ByteBufAlloc.byteBuffer(byteArrayOutputStream.size());
compressed.writeBytes(byteArrayOutputStream.toByteArray());
return compressed;
}

public static ByteBuf decompress(ByteBuf input) throws IOException {
byte[] compressedData = new byte[input.readableBytes()];
input.readBytes(compressedData);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedData);
GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);

ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = gzipInputStream.read(buffer)) != -1) {
byteArrayOutputStream.write(buffer, 0, bytesRead);
}

gzipInputStream.close();
byteArrayOutputStream.close();

byte[] uncompressedData = byteArrayOutputStream.toByteArray();
ByteBuf output = ByteBufAlloc.byteBuffer(uncompressedData.length);
output.writeBytes(uncompressedData);
return output;
}
}
40 changes: 40 additions & 0 deletions automq-shell/src/test/java/com/automq/shell/util/UtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.shell.util;

import com.automq.stream.s3.ByteBufAlloc;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import io.netty.buffer.ByteBuf;

@Tag("S3Unit")
public class UtilsTest {

@Test
public void testCompression() {
String testStr = "This is a test string";
ByteBuf input = ByteBufAlloc.byteBuffer(testStr.length());
input.writeBytes(testStr.getBytes());
try {
ByteBuf compressed = Utils.compress(input);
ByteBuf decompressed = Utils.decompress(compressed);
String decompressedStr = decompressed.toString(io.netty.util.CharsetUtil.UTF_8);
System.out.printf("Original: %s, Decompressed: %s\n", testStr, decompressedStr);
Assertions.assertEquals(testStr, decompressedStr);
} catch (Exception e) {
Assertions.fail("Exception occurred during compression/decompression: " + e.getMessage());
}
}
}

0 comments on commit 370f947

Please sign in to comment.