diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 8a71b9e88db1a..abff85e0cc038 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -16,7 +16,6 @@ */ package kafka.log.remote; -import kafka.cluster.EndPoint; import kafka.cluster.Partition; import kafka.log.AsyncOffsetReadFutureHolder; import kafka.log.UnifiedLog; @@ -25,6 +24,7 @@ import kafka.server.StopPartition; import kafka.server.TopicPartitionOperationKey; +import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; @@ -201,7 +201,7 @@ public class RemoteLogManager implements Closeable { private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass()); // The endpoint for remote log metadata manager to connect to - private Optional endpoint = Optional.empty(); + private Optional endpoint = Optional.empty(); private boolean closed = false; private volatile boolean remoteLogManagerConfigured = false; @@ -373,7 +373,7 @@ RemoteLogMetadataManager createRemoteLogMetadataManager() { }); } - public void onEndPointCreated(EndPoint endpoint) { + public void onEndPointCreated(Endpoint endpoint) { this.endpoint = Optional.of(endpoint); } @@ -672,7 +672,7 @@ public Optional findOffsetByTimestamp(TopicParti throw new KafkaException("Topic id does not exist for topic partition: " + tp); } Optional unifiedLogOptional = fetchLog.apply(tp); - if (!unifiedLogOptional.isPresent()) { + if (unifiedLogOptional.isEmpty()) { throw new KafkaException("UnifiedLog does not exist for topic partition: " + tp); } UnifiedLog unifiedLog = unifiedLogOptional.get(); @@ -775,7 +775,7 @@ public void run() { try { Optional unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition()); - if (!unifiedLogOptional.isPresent()) { + if (unifiedLogOptional.isEmpty()) { return; } @@ -839,7 +839,7 @@ private void maybeUpdateLogStartOffsetOnBecomingLeader(UnifiedLog log) throws Re } private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageException { - if (!copiedOffsetOption.isPresent()) { + if (copiedOffsetOption.isEmpty()) { // This is found by traversing from the latest leader epoch from leader epoch history and find the highest offset // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader @@ -983,7 +983,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment producerStateSnapshotFile.toPath(), leaderEpochsIndex); brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark(); brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark(); - Optional customMetadata = Optional.empty(); + Optional customMetadata; try { customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData); @@ -1098,7 +1098,7 @@ public RemoteLogRetentionHandler(Optional retentionSizeData, private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metadata) { boolean shouldDeleteSegment = false; - if (!retentionSizeData.isPresent()) { + if (retentionSizeData.isEmpty()) { return shouldDeleteSegment; } // Assumption that segments contain size >= 0 @@ -1110,7 +1110,7 @@ private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metada } } if (shouldDeleteSegment) { - if (!logStartOffset.isPresent() || logStartOffset.getAsLong() < metadata.endOffset() + 1) { + if (logStartOffset.isEmpty() || logStartOffset.getAsLong() < metadata.endOffset() + 1) { logStartOffset = OptionalLong.of(metadata.endOffset() + 1); } logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", @@ -1121,7 +1121,7 @@ private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metada public boolean isSegmentBreachedByRetentionTime(RemoteLogSegmentMetadata metadata) { boolean shouldDeleteSegment = false; - if (!retentionTimeData.isPresent()) { + if (retentionTimeData.isEmpty()) { return shouldDeleteSegment; } shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs; @@ -1129,7 +1129,7 @@ public boolean isSegmentBreachedByRetentionTime(RemoteLogSegmentMetadata metadat remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals // are ascending with in an epoch. - if (!logStartOffset.isPresent() || logStartOffset.getAsLong() < metadata.endOffset() + 1) { + if (logStartOffset.isEmpty() || logStartOffset.getAsLong() < metadata.endOffset() + 1) { logStartOffset = OptionalLong.of(metadata.endOffset() + 1); } logger.info("About to delete remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", @@ -1196,7 +1196,7 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE } final Optional logOptional = fetchLog.apply(topicIdPartition.topicPartition()); - if (!logOptional.isPresent()) { + if (logOptional.isEmpty()) { logger.debug("No UnifiedLog instance available for partition: {}", topicIdPartition); return; } @@ -1642,7 +1642,7 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) : Optional.empty(); - if (!rlsMetadataOptional.isPresent()) { + if (rlsMetadataOptional.isEmpty()) { String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " + epochStr + " and partition " + tp + " which does not exist in remote tier."); @@ -1854,7 +1854,7 @@ long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throw OptionalInt earliestEpochOpt = cache.earliestEntry() .map(epochEntry -> OptionalInt.of(epochEntry.epoch)) .orElseGet(OptionalInt::empty); - while (!logStartOffset.isPresent() && earliestEpochOpt.isPresent()) { + while (logStartOffset.isEmpty() && earliestEpochOpt.isPresent()) { Iterator iterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, earliestEpochOpt.getAsInt()); if (iterator.hasNext()) { diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index c779482b50bb0..f11b90a697413 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -17,7 +17,6 @@ package kafka.server -import kafka.cluster.EndPoint import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter, GroupCoordinatorAdapter} import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator} import kafka.log.LogManager @@ -481,7 +480,7 @@ class BrokerServer( .findFirst() .orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, listenerName, "Should be set as a listener name within valid broker listener name list: " + listenerInfo.listeners().values())) - rlm.onEndPointCreated(EndPoint.fromJava(endpoint)) + rlm.onEndPointCreated(endpoint) } rlm.startup() } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 0b8d3a1dd81db..f545eae4d7cbb 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -570,7 +570,7 @@ class KafkaServer( .orElse(throw new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, listenerName, "Should be set as a listener name within valid broker listener name list: " + brokerInfo.broker.endPoints.map(_.listenerName).mkString(","))) - .foreach(e => rlm.onEndPointCreated(e)) + .foreach(e => rlm.onEndPointCreated(e.toJava)) } rlm.startup() } diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index d78ee7359c7bb..d9ff616b15bd6 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -16,12 +16,12 @@ */ package kafka.log.remote; -import kafka.cluster.EndPoint; import kafka.cluster.Partition; import kafka.log.UnifiedLog; import kafka.server.KafkaConfig; import kafka.server.StopPartition; +import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; @@ -30,7 +30,6 @@ import org.apache.kafka.common.errors.ReplicaNotAvailableException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.RecordBatch; @@ -375,10 +374,9 @@ void testRemoteStorageManagerWithUserDefinedConfigs() { @Test void testRemoteLogMetadataManagerWithEndpointConfig() { String host = "localhost"; - String port = "1234"; + int port = 1234; String securityProtocol = "PLAINTEXT"; - EndPoint endPoint = new EndPoint(host, Integer.parseInt(port), new ListenerName(securityProtocol), - SecurityProtocol.PLAINTEXT); + Endpoint endPoint = new Endpoint(securityProtocol, SecurityProtocol.PLAINTEXT, host, port); remoteLogManager.onEndPointCreated(endPoint); remoteLogManager.startup(); @@ -417,11 +415,10 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() { }) { String host = "localhost"; - String port = "1234"; + int port = 1234; String securityProtocol = "PLAINTEXT"; - EndPoint endPoint = new EndPoint(host, Integer.parseInt(port), new ListenerName(securityProtocol), - SecurityProtocol.PLAINTEXT); - remoteLogManager.onEndPointCreated(endPoint); + Endpoint endpoint = new Endpoint(securityProtocol, SecurityProtocol.PLAINTEXT, host, port); + remoteLogManager.onEndPointCreated(endpoint); remoteLogManager.startup(); ArgumentCaptor> capture = ArgumentCaptor.forClass(Map.class); @@ -1365,16 +1362,16 @@ public RemoteStorageManager createRemoteStorageManager() { } private void verifyInCache(TopicIdPartition... topicIdPartitions) { - Arrays.stream(topicIdPartitions).forEach(topicIdPartition -> { - assertDoesNotThrow(() -> remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), 0, 0L)); - }); + Arrays.stream(topicIdPartitions).forEach(topicIdPartition -> + assertDoesNotThrow(() -> remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), 0, 0L)) + ); } private void verifyNotInCache(TopicIdPartition... topicIdPartitions) { - Arrays.stream(topicIdPartitions).forEach(topicIdPartition -> { + Arrays.stream(topicIdPartitions).forEach(topicIdPartition -> assertThrows(KafkaException.class, () -> - remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), 0, 0L)); - }); + remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), 0, 0L)) + ); } @Test @@ -1693,10 +1690,10 @@ void testIdempotentClose() throws IOException { @Test public void testRemoveMetricsOnClose() throws IOException { - MockedConstruction mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class); - try { + try (MockedConstruction mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class)) { RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, - time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { + time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { + }, brokerTopicStats, metrics) { public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; } @@ -1727,8 +1724,6 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() { verifyNoMoreInteractions(mockRlmMetricsGroup); verifyNoMoreInteractions(mockThreadPoolMetricsGroup); - } finally { - mockMetricsGroupCtor.close(); } } @@ -3380,8 +3375,7 @@ private RemoteLogManager.RLMCopyTask setupRLMTask(boolean quotaExceeded) throws when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaExceeded ? 1000L : 0L); doNothing().when(rlmCopyQuotaManager).record(anyInt()); - RemoteLogManager.RLMCopyTask task = remoteLogManager.new RLMCopyTask(leaderTopicIdPartition, 128); - return task; + return remoteLogManager.new RLMCopyTask(leaderTopicIdPartition, 128); } @Test