Skip to content

Commit

Permalink
chore: bump s3stream to 0.11.0-SNAPSHOT (#614)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Dec 29, 2023
1 parent d89b85c commit c3eb519
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ object ElasticLog extends Logging {
} else {
val metaStreamId = Unpooled.wrappedBuffer(value.get()).readLong()
// open partition meta stream
val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.newBuilder().epoch(leaderEpoch).build())
val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.builder().epoch(leaderEpoch).build())
.thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
.get()
info(s"${logIdent}opened existing meta stream: stream_id=$metaStreamId")
Expand Down Expand Up @@ -628,15 +628,15 @@ object ElasticLog extends Logging {

private def openStreamWithRetry(client: Client, streamId: Long, epoch: Long, logIdent: String): MetaStream = {
client.streamClient()
.openStream(streamId, OpenStreamOptions.newBuilder().epoch(epoch).build())
.openStream(streamId, OpenStreamOptions.builder().epoch(epoch).build())
.exceptionally(_ => client.streamClient()
.openStream(streamId, OpenStreamOptions.newBuilder().build()).join()
.openStream(streamId, OpenStreamOptions.builder().build()).join()
).thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
.join()
}

private[streamaspect] def createMetaStream(client: Client, key: String, replicaCount: Int, leaderEpoch: Long, logIdent: String): MetaStream = {
val metaStream = client.streamClient().createAndOpenStream(CreateStreamOptions.newBuilder()
val metaStream = client.streamClient().createAndOpenStream(CreateStreamOptions.builder()
.replicaCount(replicaCount)
.epoch(leaderEpoch).build()
).thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/streamaspect/LazyStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public LazyStream(String name, long streamId, StreamClient client, int replicaCo
if (streamId != NOOP_STREAM_ID) {
try {
// open exist stream
inner = client.openStream(streamId, OpenStreamOptions.newBuilder().epoch(epoch).build()).get();
inner = client.openStream(streamId, OpenStreamOptions.builder().epoch(epoch).build()).get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
Expand All @@ -75,7 +75,7 @@ public LazyStream(String name, long streamId, StreamClient client, int replicaCo
public void warmUp() throws IOException {
if (this.inner == NOOP_STREAM) {
try {
this.inner = client.createAndOpenStream(CreateStreamOptions.newBuilder().replicaCount(replicaCount)
this.inner = client.createAndOpenStream(CreateStreamOptions.builder().replicaCount(replicaCount)
.epoch(epoch).build()).get();
LOGGER.info("warmup, created and opened a new stream: stream_id={}, epoch={}, name={}", this.inner.streamId(), epoch, name);
notifyListener(ElasticStreamMetaEvent.STREAM_DO_CREATE);
Expand Down Expand Up @@ -110,7 +110,7 @@ public long nextOffset() {
public synchronized CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch) {
if (this.inner == NOOP_STREAM) {
try {
this.inner = client.createAndOpenStream(CreateStreamOptions.newBuilder().replicaCount(replicaCount)
this.inner = client.createAndOpenStream(CreateStreamOptions.builder().replicaCount(replicaCount)
.epoch(epoch).build()).get();
LOGGER.info("created and opened a new stream: stream_id={}, epoch={}, name={}", this.inner.streamId(), epoch, name);
notifyListener(ElasticStreamMetaEvent.STREAM_DO_CREATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void basicAppendAndFetch() throws ExecutionException, InterruptedExceptio
client.start();
Stream stream = client
.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
.createAndOpenStream(CreateStreamOptions.builder().epoch(0).replicaCount(1).build())
.get();
List<byte[]> payloads = List.of("hello".getBytes(), "world".getBytes());
CompletableFuture.allOf(
Expand Down Expand Up @@ -110,7 +110,7 @@ public void testStreamOperationHalt() {

Stream stream = client
.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
.createAndOpenStream(CreateStreamOptions.builder().epoch(0).replicaCount(1).build())
.join();

AtomicInteger exceptionCount = new AtomicInteger(0);
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testNormalExceptionHandling() {
private CompletableFuture<Stream> openStream(long streamId) {
return client
.streamClient()
.openStream(streamId, OpenStreamOptions.newBuilder().epoch(1).build());
.openStream(streamId, OpenStreamOptions.builder().epoch(1).build());
}

private void checkAppendAndFetch(List<byte[]> rawPayloads, FetchResult fetched) {
Expand Down Expand Up @@ -396,4 +396,4 @@ public ExceptionHint moveToNext() {
}
}
}
}
}
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ versions += [
zookeeper: "3.6.3",
zstd: "1.5.2-1",
commonLang: "3.12.0",
s3stream: "0.10.0-SNAPSHOT",
s3stream: "0.11.0-SNAPSHOT",
opentelemetry: "1.32.0",
opentelemetryAlpha: "1.32.0-alpha",
oshi: "6.4.7"
Expand Down

0 comments on commit c3eb519

Please sign in to comment.