From 03071d16557a44daa50d0aab16a8c94c7f65fb94 Mon Sep 17 00:00:00 2001 From: Juha Mynttinen Date: Mon, 5 Aug 2024 09:19:37 +0300 Subject: [PATCH] Fix compatibility with Kafka >= 3.8 Stop using io.aiven.kafka.auth.audit.Session from Kafka that is no longer present in Kafka >= 3.8. Add io.aiven.kafka.auth.audit.Session to replace that. In the new class, add properly named getter methods and make the fields private. --- .../kafka/auth/AivenAclAuthorizerV2.java | 2 +- .../io/aiven/kafka/auth/audit/Auditor.java | 1 - .../io/aiven/kafka/auth/audit/AuditorAPI.java | 4 +- .../io/aiven/kafka/auth/audit/NoAuditor.java | 4 +- .../io/aiven/kafka/auth/audit/Session.java | 40 +++++++++++++++++++ .../kafka/auth/audit/UserActivityAuditor.java | 5 +-- .../audit/UserOperationsActivityAuditor.java | 11 +++-- .../kafka/auth/audit/FormatterTestBase.java | 20 +++++----- .../audit/PrincipalAndIpFormatterTest.java | 4 +- .../auth/audit/PrincipalFormatterTest.java | 4 +- .../auth/audit/UserActivityAuditorTest.java | 1 - .../UserOperationsActivityAuditorTest.java | 11 +++-- 12 files changed, 68 insertions(+), 39 deletions(-) create mode 100644 src/main/java/io/aiven/kafka/auth/audit/Session.java diff --git a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java index ced090f..1a9c1ad 100644 --- a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java +++ b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java @@ -52,6 +52,7 @@ import org.apache.kafka.server.authorizer.AuthorizerServerInfo; import io.aiven.kafka.auth.audit.AuditorAPI; +import io.aiven.kafka.auth.audit.Session; import io.aiven.kafka.auth.json.AivenAcl; import io.aiven.kafka.auth.json.reader.AclJsonReader; import io.aiven.kafka.auth.json.reader.JsonReaderException; @@ -59,7 +60,6 @@ import io.aiven.kafka.auth.nameformatters.LegacyResourceTypeNameFormatter; import io.aiven.kafka.auth.nativeacls.AclAivenToNativeConverter; -import kafka.network.RequestChannel.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/io/aiven/kafka/auth/audit/Auditor.java b/src/main/java/io/aiven/kafka/auth/audit/Auditor.java index 7a3fae9..caa69c5 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/Auditor.java +++ b/src/main/java/io/aiven/kafka/auth/audit/Auditor.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.security.auth.KafkaPrincipal; -import kafka.network.RequestChannel.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/io/aiven/kafka/auth/audit/AuditorAPI.java b/src/main/java/io/aiven/kafka/auth/audit/AuditorAPI.java index d580e1d..b6907ae 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/AuditorAPI.java +++ b/src/main/java/io/aiven/kafka/auth/audit/AuditorAPI.java @@ -20,10 +20,8 @@ import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.resource.ResourcePattern; -import kafka.network.RequestChannel; - public interface AuditorAPI extends Configurable { - void addActivity(final RequestChannel.Session session, + void addActivity(final Session session, final AclOperation operation, final ResourcePattern resource, final boolean hasAccess); diff --git a/src/main/java/io/aiven/kafka/auth/audit/NoAuditor.java b/src/main/java/io/aiven/kafka/auth/audit/NoAuditor.java index d479b6d..56fd507 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/NoAuditor.java +++ b/src/main/java/io/aiven/kafka/auth/audit/NoAuditor.java @@ -21,8 +21,6 @@ import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.resource.ResourcePattern; -import kafka.network.RequestChannel; - /** * A no-op {@link AuditorAPI}. */ @@ -33,7 +31,7 @@ public NoAuditor() { } @Override - public void addActivity(final RequestChannel.Session session, + public void addActivity(final Session session, final AclOperation operation, final ResourcePattern resource, final boolean hasAccess) { diff --git a/src/main/java/io/aiven/kafka/auth/audit/Session.java b/src/main/java/io/aiven/kafka/auth/audit/Session.java new file mode 100644 index 0000000..e04824e --- /dev/null +++ b/src/main/java/io/aiven/kafka/auth/audit/Session.java @@ -0,0 +1,40 @@ +/* + * Copyright 2024 Aiven Oy https://aiven.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.auth.audit; + +import java.net.InetAddress; + +import org.apache.kafka.common.security.auth.KafkaPrincipal; + + +public class Session { + private final KafkaPrincipal principal; + private final InetAddress clientAddress; + + public Session(final KafkaPrincipal principal, final InetAddress clientAddress) { + this.principal = principal; + this.clientAddress = clientAddress; + } + + public KafkaPrincipal getPrincipal() { + return principal; + } + + public InetAddress getClientAddress() { + return clientAddress; + } +} diff --git a/src/main/java/io/aiven/kafka/auth/audit/UserActivityAuditor.java b/src/main/java/io/aiven/kafka/auth/audit/UserActivityAuditor.java index 50bbdb0..f2d5293 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/UserActivityAuditor.java +++ b/src/main/java/io/aiven/kafka/auth/audit/UserActivityAuditor.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.resource.ResourcePattern; -import kafka.network.RequestChannel; import org.slf4j.Logger; public class UserActivityAuditor extends Auditor { @@ -46,11 +45,11 @@ protected UserActivityAuditor(final Logger logger) { } @Override - protected void addActivity0(final RequestChannel.Session session, + protected void addActivity0(final Session session, final AclOperation operation, final ResourcePattern resource, final boolean hasAccess) { - final AuditKey auditKey = new AuditKey(session.principal(), session.clientAddress()); + final AuditKey auditKey = new AuditKey(session.getPrincipal(), session.getClientAddress()); auditStorage.compute(auditKey, (key, userActivity) -> Objects.isNull(userActivity) ? new UserActivity.UserActivityOperations() diff --git a/src/main/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditor.java b/src/main/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditor.java index 4ebc7ca..6946780 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditor.java +++ b/src/main/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditor.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.resource.ResourcePattern; -import kafka.network.RequestChannel; import org.slf4j.Logger; public class UserOperationsActivityAuditor extends Auditor { @@ -35,7 +34,7 @@ protected UserOperationsActivityAuditor(final Logger logger) { } @Override - protected void addActivity0(final RequestChannel.Session session, + protected void addActivity0(final Session session, final AclOperation operation, final ResourcePattern resource, final boolean hasAccess) { @@ -46,18 +45,18 @@ protected void addActivity0(final RequestChannel.Session session, } else { ua = userActivity; } - ua.addOperation(new UserOperation(session.clientAddress(), operation, resource, hasAccess)); + ua.addOperation(new UserOperation(session.getClientAddress(), operation, resource, hasAccess)); return ua; }); } - private AuditKey createAuditKey(final RequestChannel.Session session) { + private AuditKey createAuditKey(final Session session) { final var grouping = auditorConfig.getAggregationGrouping(); switch (grouping) { case USER: - return new AuditKey(session.principal(), null); + return new AuditKey(session.getPrincipal(), null); case USER_AND_IP: - return new AuditKey(session.principal(), session.clientAddress()); + return new AuditKey(session.getPrincipal(), session.getClientAddress()); default: throw new IllegalArgumentException("Unknown aggregation grouping type: " + grouping); } diff --git a/src/test/java/io/aiven/kafka/auth/audit/FormatterTestBase.java b/src/test/java/io/aiven/kafka/auth/audit/FormatterTestBase.java index 5fe9c5a..6674618 100644 --- a/src/test/java/io/aiven/kafka/auth/audit/FormatterTestBase.java +++ b/src/test/java/io/aiven/kafka/auth/audit/FormatterTestBase.java @@ -30,13 +30,11 @@ import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; -import kafka.network.RequestChannel; - import static org.junit.jupiter.api.Assertions.assertEquals; public class FormatterTestBase { - protected RequestChannel.Session session; + protected Session session; protected AclOperation operation; @@ -48,7 +46,7 @@ public class FormatterTestBase { protected ResourcePattern anotherResource; - protected RequestChannel.Session anotherSession; + protected Session anotherSession; protected InetAddress anotherInetAddress; @@ -60,9 +58,9 @@ protected FormatterTestBase(final AuditorConfig.AggregationGrouping aggregationG void setUp() throws Exception { final KafkaPrincipal principal = new KafkaPrincipal("PRINCIPAL_TYPE", "PRINCIPAL_NAME"); - session = new RequestChannel.Session(principal, InetAddress.getLocalHost()); + session = new Session(principal, InetAddress.getLocalHost()); anotherInetAddress = InetAddress.getByName("192.168.0.1"); - anotherSession = new RequestChannel.Session(principal, anotherInetAddress); + anotherSession = new Session(principal, anotherInetAddress); resource = new ResourcePattern( ResourceType.CLUSTER, @@ -88,20 +86,20 @@ protected void zeroOperations(final ZonedDateTime now, final String expected) { protected void twoOperations(final ZonedDateTime now, final String expected) { final Map dump = new HashMap<>(); final UserActivity userActivity = createUserActivity(now); - userActivity.addOperation(new UserOperation(session.clientAddress(), operation, resource, false)); + userActivity.addOperation(new UserOperation(session.getClientAddress(), operation, resource, false)); userActivity.addOperation( - new UserOperation(session.clientAddress(), anotherOperation, anotherResource, true)); + new UserOperation(session.getClientAddress(), anotherOperation, anotherResource, true)); dump.put(createAuditKey(session), userActivity); formatAndAssert(dump, expected); } - protected Auditor.AuditKey createAuditKey(final RequestChannel.Session session) { + protected Auditor.AuditKey createAuditKey(final Session session) { switch (aggregationGrouping) { case USER: - return new Auditor.AuditKey(session.principal(), null); + return new Auditor.AuditKey(session.getPrincipal(), null); case USER_AND_IP: - return new Auditor.AuditKey(session.principal(), session.clientAddress()); + return new Auditor.AuditKey(session.getPrincipal(), session.getClientAddress()); default: throw new IllegalArgumentException("Unknown aggregation grouping: " + aggregationGrouping); } diff --git a/src/test/java/io/aiven/kafka/auth/audit/PrincipalAndIpFormatterTest.java b/src/test/java/io/aiven/kafka/auth/audit/PrincipalAndIpFormatterTest.java index e03c699..4f43e32 100644 --- a/src/test/java/io/aiven/kafka/auth/audit/PrincipalAndIpFormatterTest.java +++ b/src/test/java/io/aiven/kafka/auth/audit/PrincipalAndIpFormatterTest.java @@ -86,12 +86,12 @@ protected void twoOperationsTwoIpAddresses(final ZonedDateTime now, final String final Map dump = new HashMap<>(); final UserActivity userActivity = createUserActivity(now); - userActivity.addOperation(new UserOperation(session.clientAddress(), operation, resource, false)); + userActivity.addOperation(new UserOperation(session.getClientAddress(), operation, resource, false)); dump.put(createAuditKey(session), userActivity); final UserActivity anotherUserActivity = createUserActivity(now); anotherUserActivity.addOperation( - new UserOperation(anotherSession.clientAddress(), anotherOperation, anotherResource, true)); + new UserOperation(anotherSession.getClientAddress(), anotherOperation, anotherResource, true)); dump.put(createAuditKey(anotherSession), anotherUserActivity); formatAndAssert(dump, expected); diff --git a/src/test/java/io/aiven/kafka/auth/audit/PrincipalFormatterTest.java b/src/test/java/io/aiven/kafka/auth/audit/PrincipalFormatterTest.java index 279db84..fa082ac 100644 --- a/src/test/java/io/aiven/kafka/auth/audit/PrincipalFormatterTest.java +++ b/src/test/java/io/aiven/kafka/auth/audit/PrincipalFormatterTest.java @@ -78,9 +78,9 @@ protected void twoOperationsTwoIpAddresses(final ZonedDateTime now, final String final UserActivity userActivity = createUserActivity(now); userActivity.addOperation( - new UserOperation(session.clientAddress(), operation, resource, false)); + new UserOperation(session.getClientAddress(), operation, resource, false)); userActivity.addOperation( - new UserOperation(anotherSession.clientAddress(), anotherOperation, anotherResource, true)); + new UserOperation(anotherSession.getClientAddress(), anotherOperation, anotherResource, true)); dump.put(createAuditKey(session), userActivity); formatAndAssert(dump, expected); diff --git a/src/test/java/io/aiven/kafka/auth/audit/UserActivityAuditorTest.java b/src/test/java/io/aiven/kafka/auth/audit/UserActivityAuditorTest.java index ebc34b8..1e562ba 100644 --- a/src/test/java/io/aiven/kafka/auth/audit/UserActivityAuditorTest.java +++ b/src/test/java/io/aiven/kafka/auth/audit/UserActivityAuditorTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; -import kafka.network.RequestChannel.Session; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; diff --git a/src/test/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditorTest.java b/src/test/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditorTest.java index 7cccf13..24d9885 100644 --- a/src/test/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditorTest.java +++ b/src/test/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditorTest.java @@ -31,7 +31,6 @@ import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; -import kafka.network.RequestChannel.Session; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -115,16 +114,16 @@ void shouldAggregateOperationsForSameUser() throws Exception { 2, cast(auditor.auditStorage.get( new Auditor.AuditKey( - session.principal(), - session.clientAddress()) + session.getPrincipal(), + session.getClientAddress()) ), UserActivity.UserActivityOperations.class).operations.size() ); assertEquals( 1, cast(auditor.auditStorage.get( new Auditor.AuditKey( - anotherSession.principal(), - anotherSession.clientAddress()) + anotherSession.getPrincipal(), + anotherSession.getClientAddress()) ), UserActivity.UserActivityOperations.class).operations.size() ); auditor.dump(); @@ -152,7 +151,7 @@ void shouldAggregateOperationsForSameUserAndPrincipalGrouping() throws Exception 2, cast(auditor.auditStorage.get( new Auditor.AuditKey( - session.principal(), + session.getPrincipal(), null) ), UserActivity.UserActivityOperationsGropedByIP.class).operations.size() );