MINOR: Use Endpoint instead of EndPoint in RemoteLogManager (apache#17514)


Reviewers: Luke Chen <[email protected]>
mimaison authored Oct 18, 2024
1 parent e01f6a5 commit 715606e
Showing 4 changed files with 32 additions and 39 deletions.
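
The substance of the change: RemoteLogManager previously took the internal Scala class kafka.cluster.EndPoint and now takes the public Java class org.apache.kafka.common.Endpoint, which removes the Scala/Java conversions at the call sites in BrokerServer and KafkaServer. Several !Optional.isPresent() checks are modernized to Optional.isEmpty() along the way. A minimal sketch of constructing the Java Endpoint, using the same argument order the updated test exercises (listener name, security protocol, host, port):

import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.security.auth.SecurityProtocol;

public class EndpointSketch {
    public static void main(String[] args) {
        // Listener name first, then security protocol, host, and port.
        Endpoint endpoint = new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 1234);
        System.out.println(endpoint.host() + ":" + endpoint.port()); // localhost:1234
    }
}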
28 changes: 14 additions & 14 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -16,7 +16,6 @@
*/
package kafka.log.remote;

-import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.AsyncOffsetReadFutureHolder;
import kafka.log.UnifiedLog;
@@ -25,6 +24,7 @@
import kafka.server.StopPartition;
import kafka.server.TopicPartitionOperationKey;

+import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
@@ -201,7 +201,7 @@ public class RemoteLogManager implements Closeable {
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());

// The endpoint for remote log metadata manager to connect to
-private Optional<EndPoint> endpoint = Optional.empty();
+private Optional<Endpoint> endpoint = Optional.empty();
private boolean closed = false;

private volatile boolean remoteLogManagerConfigured = false;
@@ -373,7 +373,7 @@ RemoteLogMetadataManager createRemoteLogMetadataManager() {
});
}

-public void onEndPointCreated(EndPoint endpoint) {
+public void onEndPointCreated(Endpoint endpoint) {
this.endpoint = Optional.of(endpoint);
}

@@ -672,7 +672,7 @@ public Optional<FileRecords.TimestampAndOffset> findOffsetByTimestamp(TopicPartition
throw new KafkaException("Topic id does not exist for topic partition: " + tp);
}
Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(tp);
-if (!unifiedLogOptional.isPresent()) {
+if (unifiedLogOptional.isEmpty()) {
throw new KafkaException("UnifiedLog does not exist for topic partition: " + tp);
}
UnifiedLog unifiedLog = unifiedLogOptional.get();
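
Several hunks in this file make the same mechanical cleanup: Optional.isEmpty(), available since Java 11, replaces the double negative !isPresent(). A self-contained illustration:

import java.util.Optional;

public class IsEmptySketch {
    public static void main(String[] args) {
        Optional<String> unifiedLogOptional = Optional.empty();
        // Before: if (!unifiedLogOptional.isPresent()) { ... }
        // After: the check reads directly, with no negation to parse.
        if (unifiedLogOptional.isEmpty()) {
            System.out.println("UnifiedLog does not exist for this partition");
        }
    }
}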
@@ -775,7 +775,7 @@ public void run() {
try {
Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition());

-if (!unifiedLogOptional.isPresent()) {
+if (unifiedLogOptional.isEmpty()) {
return;
}

@@ -839,7 +839,7 @@ private void maybeUpdateLogStartOffsetOnBecomingLeader(UnifiedLog log) throws RemoteStorageException {
}

private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageException {
-if (!copiedOffsetOption.isPresent()) {
+if (copiedOffsetOption.isEmpty()) {
// This is found by traversing from the latest leader epoch from leader epoch history and find the highest offset
// of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
// previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
@@ -983,7 +983,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
-Optional<CustomMetadata> customMetadata = Optional.empty();
+Optional<CustomMetadata> customMetadata;

try {
customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
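
Dropping the placeholder Optional.empty() initializer is a small correctness win as well as a style fix: with a blank declaration, the compiler's definite-assignment check guarantees customMetadata is assigned on every path before it is read. A sketch of the pattern, with a hypothetical stand-in for copyLogSegmentData:

import java.util.Optional;

public class DefiniteAssignmentSketch {
    // Hypothetical stand-in for remoteLogStorageManager.copyLogSegmentData(...).
    static Optional<String> copyLogSegmentData() {
        return Optional.of("custom-metadata");
    }

    public static void main(String[] args) {
        Optional<String> customMetadata; // no placeholder value
        try {
            customMetadata = copyLogSegmentData();
        } catch (RuntimeException e) {
            // Any path that could reach a read of customMetadata without
            // assigning it first is now a compile error, not a silent empty value.
            customMetadata = Optional.empty();
        }
        System.out.println(customMetadata.orElse("none"));
    }
}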
@@ -1098,7 +1098,7 @@ public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData,

private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metadata) {
boolean shouldDeleteSegment = false;
-if (!retentionSizeData.isPresent()) {
+if (retentionSizeData.isEmpty()) {
return shouldDeleteSegment;
}
// Assumption that segments contain size >= 0
@@ -1110,7 +1110,7 @@ private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metadata) {
}
}
if (shouldDeleteSegment) {
-if (!logStartOffset.isPresent() || logStartOffset.getAsLong() < metadata.endOffset() + 1) {
+if (logStartOffset.isEmpty() || logStartOffset.getAsLong() < metadata.endOffset() + 1) {
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
}
logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
@@ -1121,15 +1121,15 @@ private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metadata) {

public boolean isSegmentBreachedByRetentionTime(RemoteLogSegmentMetadata metadata) {
boolean shouldDeleteSegment = false;
-if (!retentionTimeData.isPresent()) {
+if (retentionTimeData.isEmpty()) {
return shouldDeleteSegment;
}
shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
if (shouldDeleteSegment) {
remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
// are ascending with in an epoch.
-if (!logStartOffset.isPresent() || logStartOffset.getAsLong() < metadata.endOffset() + 1) {
+if (logStartOffset.isEmpty() || logStartOffset.getAsLong() < metadata.endOffset() + 1) {
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
}
logger.info("About to delete remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
@@ -1196,7 +1196,7 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException
}

final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
-if (!logOptional.isPresent()) {
+if (logOptional.isEmpty()) {
logger.debug("No UnifiedLog instance available for partition: {}", topicIdPartition);
return;
}
@@ -1642,7 +1642,7 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
: Optional.empty();

-if (!rlsMetadataOptional.isPresent()) {
+if (rlsMetadataOptional.isEmpty()) {
String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+ epochStr + " and partition " + tp + " which does not exist in remote tier.");
@@ -1854,7 +1854,7 @@ long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws
OptionalInt earliestEpochOpt = cache.earliestEntry()
.map(epochEntry -> OptionalInt.of(epochEntry.epoch))
.orElseGet(OptionalInt::empty);
-while (!logStartOffset.isPresent() && earliestEpochOpt.isPresent()) {
+while (logStartOffset.isEmpty() && earliestEpochOpt.isPresent()) {
Iterator<RemoteLogSegmentMetadata> iterator =
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, earliestEpochOpt.getAsInt());
if (iterator.hasNext()) {
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -17,7 +17,6 @@

package kafka.server

-import kafka.cluster.EndPoint
import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter, GroupCoordinatorAdapter}
import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
import kafka.log.LogManager
@@ -481,7 +480,7 @@
.findFirst()
.orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
listenerName, "Should be set as a listener name within valid broker listener name list: " + listenerInfo.listeners().values()))
-rlm.onEndPointCreated(EndPoint.fromJava(endpoint))
+rlm.onEndPointCreated(endpoint)
}
rlm.startup()
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
@@ -570,7 +570,7 @@ class KafkaServer(
.orElse(throw new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
listenerName, "Should be set as a listener name within valid broker listener name list: "
+ brokerInfo.broker.endPoints.map(_.listenerName).mkString(",")))
-.foreach(e => rlm.onEndPointCreated(e))
+.foreach(e => rlm.onEndPointCreated(e.toJava))
}
rlm.startup()
}
38 changes: 16 additions & 22 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -16,12 +16,12 @@
*/
package kafka.log.remote;

-import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.UnifiedLog;
import kafka.server.KafkaConfig;
import kafka.server.StopPartition;

+import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
@@ -30,7 +30,6 @@
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
@@ -375,10 +374,9 @@ void testRemoteStorageManagerWithUserDefinedConfigs() {
@Test
void testRemoteLogMetadataManagerWithEndpointConfig() {
String host = "localhost";
String port = "1234";
int port = 1234;
String securityProtocol = "PLAINTEXT";
-EndPoint endPoint = new EndPoint(host, Integer.parseInt(port), new ListenerName(securityProtocol),
-    SecurityProtocol.PLAINTEXT);
+Endpoint endPoint = new Endpoint(securityProtocol, SecurityProtocol.PLAINTEXT, host, port);
remoteLogManager.onEndPointCreated(endPoint);
remoteLogManager.startup();

@@ -417,11 +415,10 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
}) {

String host = "localhost";
-String port = "1234";
+int port = 1234;
String securityProtocol = "PLAINTEXT";
-EndPoint endPoint = new EndPoint(host, Integer.parseInt(port), new ListenerName(securityProtocol),
-    SecurityProtocol.PLAINTEXT);
-remoteLogManager.onEndPointCreated(endPoint);
+Endpoint endpoint = new Endpoint(securityProtocol, SecurityProtocol.PLAINTEXT, host, port);
+remoteLogManager.onEndPointCreated(endpoint);
remoteLogManager.startup();

ArgumentCaptor<Map<String, Object>> capture = ArgumentCaptor.forClass(Map.class);
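
The captor above collects the properties handed to the remote log metadata manager, so the test can check that the endpoint's host, port, and security protocol made it into the client configuration. A hedged sketch of the idea; the property prefix and keys here are illustrative, not the exact constants:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.security.auth.SecurityProtocol;

public class RlmmConfigSketch {
    public static void main(String[] args) {
        Optional<Endpoint> endpoint =
                Optional.of(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 1234));

        // Hypothetical prefix for illustration; the real key constants live in
        // the remote log metadata manager config classes.
        String prefix = "remote.log.metadata.common.client.";
        Map<String, Object> props = new HashMap<>();
        endpoint.ifPresent(e -> {
            props.put(prefix + "bootstrap.servers", e.host() + ":" + e.port());
            props.put(prefix + "security.protocol", e.securityProtocol().name);
        });
        System.out.println(props); // {...bootstrap.servers=localhost:1234, ...security.protocol=PLAINTEXT}
    }
}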
@@ -1365,16 +1362,16 @@ public RemoteStorageManager createRemoteStorageManager() {
}

private void verifyInCache(TopicIdPartition... topicIdPartitions) {
-Arrays.stream(topicIdPartitions).forEach(topicIdPartition -> {
-    assertDoesNotThrow(() -> remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), 0, 0L));
-});
+Arrays.stream(topicIdPartitions).forEach(topicIdPartition ->
+    assertDoesNotThrow(() -> remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), 0, 0L))
+);
}

private void verifyNotInCache(TopicIdPartition... topicIdPartitions) {
-Arrays.stream(topicIdPartitions).forEach(topicIdPartition -> {
+Arrays.stream(topicIdPartitions).forEach(topicIdPartition ->
    assertThrows(KafkaException.class, () ->
-        remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), 0, 0L));
-});
+        remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), 0, 0L))
+);
}

@Test
@@ -1693,10 +1690,10 @@ void testIdempotentClose() throws IOException {

@Test
public void testRemoveMetricsOnClose() throws IOException {
-MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class);
-try {
+try (MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class)) {
    RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId,
-        time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) {
+        time, tp -> Optional.of(mockLog), (topicPartition, offset) -> {
+        }, brokerTopicStats, metrics) {
public RemoteStorageManager createRemoteStorageManager() {
return remoteStorageManager;
}
Expand Down Expand Up @@ -1727,8 +1724,6 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {

verifyNoMoreInteractions(mockRlmMetricsGroup);
verifyNoMoreInteractions(mockThreadPoolMetricsGroup);
-} finally {
-    mockMetricsGroupCtor.close();
}
}
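
MockedConstruction is AutoCloseable, so the try-with-resources form above deregisters the construction hook even when an assertion fails, which is exactly what the removed try/finally block did by hand. A minimal sketch with a hypothetical stand-in class:

import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.verify;

import org.mockito.MockedConstruction;

public class ConstructionMockSketch {
    // Hypothetical stand-in for KafkaMetricsGroup.
    static class MetricsGroup {
        void removeMetric(String name) { }
    }

    static void shutDown() {
        new MetricsGroup().removeMetric("tasks-avg-idle-percent");
    }

    public static void main(String[] args) {
        // Every MetricsGroup constructed inside the block is a mock; the hook
        // is removed when the block exits, normally or exceptionally.
        try (MockedConstruction<MetricsGroup> ctor = mockConstruction(MetricsGroup.class)) {
            shutDown();
            verify(ctor.constructed().get(0)).removeMetric("tasks-avg-idle-percent");
        }
    }
}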

@@ -3380,8 +3375,7 @@ private RemoteLogManager.RLMCopyTask setupRLMTask(boolean quotaExceeded) throws
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaExceeded ? 1000L : 0L);
doNothing().when(rlmCopyQuotaManager).record(anyInt());

-RemoteLogManager.RLMCopyTask task = remoteLogManager.new RLMCopyTask(leaderTopicIdPartition, 128);
-return task;
+return remoteLogManager.new RLMCopyTask(leaderTopicIdPartition, 128);
}

@Test
