diff --git a/build/pom.xml b/build/pom.xml index 3d8709ee683..a04576aa48b 100644 --- a/build/pom.xml +++ b/build/pom.xml @@ -89,6 +89,7 @@ 3.6.1 2.0 1.9.13 + 8.1.0 5.8.2 diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index fb389bf5cfd..007da4ca2db 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -5,6 +5,7 @@ Add changes here for all PR submitted to the 2.x branch. ### feature: - [[#7073](https://github.com/apache/incubator-seata/pull/7073)] support virtual thread,replace the usages of synchronized with ReentrantLock +- [[#6756](https://github.com/apache/incubator-seata/pull/6756)] feature: add single server rate limit - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] support fury undolog parser - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster mode supports address translation - [[#7038](https://github.com/apache/incubator-seata/pull/7038)] support fury serializer @@ -49,6 +50,7 @@ Thanks to these contributors for their code commits. Please report an unintended - [lightClouds917](https://github.com/lightClouds917) - [GoodBoyCoder](https://github.com/GoodBoyCoder) - [PeppaO](https://github.com/PeppaO) +- [xjlgod](https://github.com/xjlgod) - [funky-eyes](https://github.com/funky-eyes) - [psxjoy](https://github.com/psxjoy) - [xiaoxiangyeyu0](https://github.com/xiaoxiangyeyu0) diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index b2b1f20c95d..0bb65daaa8d 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -4,6 +4,7 @@ ### feature: +- [[#6756](https://github.com/apache/incubator-seata/pull/6756)] seata服务单点限流支持 - [[#7073](https://github.com/apache/incubator-seata/pull/7073)] 支持虚拟线程,用ReentrantLock替换synchronized的用法 - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] 支持UndoLog的fury序列化方式 - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换 @@ -45,6 +46,7 @@ - [slievrly](https://github.com/slievrly) - [lyl2008dsg](https://github.com/lyl2008dsg) - [remind](https://github.com/remind) +- [xjlgod](https://github.com/xjlgod) - [lightClouds917](https://github.com/lightClouds917) - [GoodBoyCoder](https://github.com/GoodBoyCoder) - [PeppaO](https://github.com/PeppaO) diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index 297d64167b5..cbb8b0086e5 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -1121,6 +1121,31 @@ public interface ConfigurationKeys { */ String META_PREFIX = SEATA_FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + "metadata."; + /** + * The constant RATE_LIMIT_PREFIX. + */ + String RATE_LIMIT_PREFIX = SERVER_PREFIX + "ratelimit"; + + /** + * The constant RATE_LIMIT_BUCKET_TOKEN_NUM_PER_SECOND. + */ + String RATE_LIMIT_BUCKET_TOKEN_NUM_PER_SECOND = RATE_LIMIT_PREFIX + ".bucketTokenNumPerSecond"; + + /** + * The constant RATE_LIMIT_ENABLE. + */ + String RATE_LIMIT_ENABLE = RATE_LIMIT_PREFIX + ".enable"; + + /** + * The constant RATE_LIMIT_BUCKET_TOKEN_MAX_NUM. + */ + String RATE_LIMIT_BUCKET_TOKEN_MAX_NUM = RATE_LIMIT_PREFIX + ".bucketTokenMaxNum"; + + /** + * The constant RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM. + */ + String RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM = RATE_LIMIT_PREFIX + ".bucketTokenInitialNum"; + /** * The constant SERVER_REGISTRY_METADATA_PREFIX */ diff --git a/common/src/main/java/org/apache/seata/common/DefaultValues.java b/common/src/main/java/org/apache/seata/common/DefaultValues.java index 85ed496430d..228223a7ec8 100644 --- a/common/src/main/java/org/apache/seata/common/DefaultValues.java +++ b/common/src/main/java/org/apache/seata/common/DefaultValues.java @@ -497,6 +497,12 @@ public interface DefaultValues { */ int DEFAULT_ROCKET_MQ_MSG_TIMEOUT = 60 * 1000; + /** + * The constant DEFAULT_RATE_LIMIT_ENABLE. + */ + boolean DEFAULT_RATE_LIMIT_ENABLE = false; + + /** * The constant DEFAULT_RAFT_SSL_ENABLED. */ diff --git a/compatible/src/main/java/io/seata/tm/api/DefaultFailureHandlerImpl.java b/compatible/src/main/java/io/seata/tm/api/DefaultFailureHandlerImpl.java index f56ad5cad43..fc3998d9868 100644 --- a/compatible/src/main/java/io/seata/tm/api/DefaultFailureHandlerImpl.java +++ b/compatible/src/main/java/io/seata/tm/api/DefaultFailureHandlerImpl.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * The type Default failure handler. */ diff --git a/core/src/main/java/org/apache/seata/core/event/RateLimitEvent.java b/core/src/main/java/org/apache/seata/core/event/RateLimitEvent.java new file mode 100644 index 00000000000..f11a7158506 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/event/RateLimitEvent.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.event; + +public class RateLimitEvent implements Event { + + /** + * The Trace id. + */ + private String traceId; + + /** + * The Limit type (like GlobalBeginFailed). + */ + private String limitType; + + /** + * The Application id. + */ + private String applicationId; + + /** + * The Client id. + */ + private String clientId; + + /** + * The Server ip address and port. + */ + private String serverIpAddressAndPort; + + public String getTraceId() { + return traceId; + } + + public void setTraceId(String traceId) { + this.traceId = traceId; + } + + public String getLimitType() { + return limitType; + } + + public void setLimitType(String limitType) { + this.limitType = limitType; + } + + public String getApplicationId() { + return applicationId; + } + + public void setApplicationId(String applicationId) { + this.applicationId = applicationId; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getServerIpAddressAndPort() { + return serverIpAddressAndPort; + } + + public void setServerIpAddressAndPort(String serverIpAddressAndPort) { + this.serverIpAddressAndPort = serverIpAddressAndPort; + } + + public RateLimitEvent(String traceId, String limitType, String applicationId, String clientId, String serverIpAddressAndPort) { + this.traceId = traceId; + this.limitType = limitType; + this.applicationId = applicationId; + this.clientId = clientId; + this.serverIpAddressAndPort = serverIpAddressAndPort; + } + + @Override + public String toString() { + return "RateLimitEvent{" + + "traceId='" + traceId + '\'' + + ", limitType='" + limitType + '\'' + + ", applicationId='" + applicationId + '\'' + + ", clientId='" + clientId + '\'' + + ", serverIpAddressAndPort='" + serverIpAddressAndPort + '\'' + + '}'; + } +} diff --git a/core/src/main/java/org/apache/seata/core/exception/TransactionExceptionCode.java b/core/src/main/java/org/apache/seata/core/exception/TransactionExceptionCode.java index 16a2e899dc6..962a9486ec4 100644 --- a/core/src/main/java/org/apache/seata/core/exception/TransactionExceptionCode.java +++ b/core/src/main/java/org/apache/seata/core/exception/TransactionExceptionCode.java @@ -31,7 +31,6 @@ public enum TransactionExceptionCode { * BeginFailed */ BeginFailed, - /** * Lock key conflict transaction exception code. */ diff --git a/core/src/main/java/org/apache/seata/core/protocol/MessageType.java b/core/src/main/java/org/apache/seata/core/protocol/MessageType.java index 58adbfbce6f..43a2108f39d 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/MessageType.java +++ b/core/src/main/java/org/apache/seata/core/protocol/MessageType.java @@ -22,6 +22,11 @@ */ public interface MessageType { + /** + * The constant TYPE_NOT_EXIST. + */ + short TYPE_NOT_EXIST = 0; + /** * The constant TYPE_GLOBAL_BEGIN. */ diff --git a/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java b/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java index b1c7ef3e741..6039fbc6351 100644 --- a/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java +++ b/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java @@ -45,7 +45,8 @@ void getInt() { @Test void values() { - Assertions.assertArrayEquals(new ResultCode[]{ResultCode.Failed, ResultCode.Success}, ResultCode.values()); + Assertions.assertArrayEquals(new ResultCode[]{ResultCode.Failed, ResultCode.Success}, + ResultCode.values()); } @Test diff --git a/dependencies/pom.xml b/dependencies/pom.xml index 2b567d6927f..a68d94704aa 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -68,6 +68,7 @@ 6.3.0 1.0.0 1.82 + 8.1.0 1.21 1.10.12 1.7.1 @@ -618,6 +619,11 @@ ${jcommander.version} + + com.bucket4j + bucket4j_jdk8-core + ${bucket4j.version} + io.grpc grpc-testing diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java index 288bd5dd782..e2b77967deb 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java @@ -56,6 +56,7 @@ import org.apache.seata.tm.api.transaction.NoRollbackRule; import org.apache.seata.tm.api.transaction.RollbackRule; import org.apache.seata.tm.api.transaction.TransactionInfo; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/metrics/seata-metrics-api/src/main/java/org/apache/seata/metrics/IdConstants.java b/metrics/seata-metrics-api/src/main/java/org/apache/seata/metrics/IdConstants.java index 507bafafee6..f6ee3e60280 100644 --- a/metrics/seata-metrics-api/src/main/java/org/apache/seata/metrics/IdConstants.java +++ b/metrics/seata-metrics-api/src/main/java/org/apache/seata/metrics/IdConstants.java @@ -25,6 +25,8 @@ public interface IdConstants { String SEATA_EXCEPTION = "seata.exception"; + String SEATA_RATE_LIMIT = "seata.rate.limit"; + String APP_ID_KEY = "applicationId"; String GROUP_KEY = "group"; @@ -79,4 +81,9 @@ public interface IdConstants { String STATUS_VALUE_AFTER_ROLLBACKED_KEY = "AfterRollbacked"; + String LIMIT_TYPE_KEY = "limitType"; + + String CLIENT_ID_KEY = "clientId"; + + String HOST_AND_PORT = "hostAndPort"; } diff --git a/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/tm/DefaultSagaTransactionalTemplate.java b/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/tm/DefaultSagaTransactionalTemplate.java index 869f38bf1a6..3f0234c060f 100644 --- a/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/tm/DefaultSagaTransactionalTemplate.java +++ b/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/tm/DefaultSagaTransactionalTemplate.java @@ -16,15 +16,14 @@ */ package org.apache.seata.saga.engine.tm; -import java.util.List; - import org.apache.seata.common.exception.FrameworkErrorCode; +import org.apache.seata.common.util.StringUtils; import org.apache.seata.core.exception.TransactionException; import org.apache.seata.core.model.BranchStatus; import org.apache.seata.core.model.BranchType; import org.apache.seata.core.model.GlobalStatus; -import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; import org.apache.seata.core.rpc.ShutdownHook; +import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; import org.apache.seata.rm.DefaultResourceManager; import org.apache.seata.rm.RMClient; @@ -39,7 +38,6 @@ import org.apache.seata.tm.api.transaction.TransactionHook; import org.apache.seata.tm.api.transaction.TransactionHookManager; import org.apache.seata.tm.api.transaction.TransactionInfo; -import org.apache.seata.common.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; @@ -49,6 +47,8 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; +import java.util.List; + /** * Template of executing business logic with a global transaction for SAGA mode */ @@ -93,7 +93,6 @@ public GlobalTransaction beginTransaction(TransactionInfo txInfo) throws Transac triggerAfterBegin(tx); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); - } return tx; } diff --git a/script/config-center/config.txt b/script/config-center/config.txt index af831bc5d23..52b62cc3c13 100644 --- a/script/config-center/config.txt +++ b/script/config-center/config.txt @@ -162,7 +162,10 @@ server.raft.serialization=jackson server.raft.compressor=none server.raft.sync=true - +server.ratelimit.enable=false +server.ratelimit.bucketTokenNumPerSecond = 999999 +server.ratelimit.bucketTokenMaxNum = 999999 +server.ratelimit.bucketTokenInitialNum = 999999 #Metrics configuration, only for the server metrics.enabled=true diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java index 7a6c3c24e41..d9b7b0849af 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java @@ -71,6 +71,7 @@ public interface StarterConstants { String SERVER_PREFIX = SEATA_PREFIX + ".server"; + String SERVER_RATELIMIT_PREFIX = SERVER_PREFIX + ".ratelimit"; String SERVER_UNDO_PREFIX = SERVER_PREFIX + ".undo"; String SERVER_RAFT_PREFIX = SERVER_PREFIX + ".raft"; String SERVER_RAFT_SSL_PREFIX = SERVER_RAFT_PREFIX + ".ssl"; diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java index f20268cf0dd..d1f66832f70 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java @@ -17,6 +17,8 @@ package org.apache.seata.spring.boot.autoconfigure; import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.seata.spring.boot.autoconfigure.properties.server.ServerRateLimitProperties; import org.apache.seata.spring.boot.autoconfigure.properties.server.raft.ServerRaftSSLClientProperties; import org.apache.seata.spring.boot.autoconfigure.properties.server.raft.ServerRaftSSLProperties; import org.apache.seata.spring.boot.autoconfigure.properties.server.raft.ServerRaftSSLServerProperties; @@ -41,6 +43,7 @@ import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.PROPERTY_BEAN_MAP; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RAFT_PREFIX; +import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RATELIMIT_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RAFT_SSL_CLIENT_KEYSTORE_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RAFT_SSL_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RAFT_SSL_SERVER_KEYSTORE_PREFIX; @@ -90,6 +93,7 @@ public static void init() { PROPERTY_BEAN_MAP.put(SERVER_RAFT_SSL_CLIENT_KEYSTORE_PREFIX, ServerRaftSSLClientProperties.class); PROPERTY_BEAN_MAP.put(SESSION_PREFIX, SessionProperties.class); PROPERTY_BEAN_MAP.put(STORE_PREFIX, StoreProperties.class); + PROPERTY_BEAN_MAP.put(SERVER_RATELIMIT_PREFIX, ServerRateLimitProperties.class); } } diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerRateLimitProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerRateLimitProperties.java new file mode 100644 index 00000000000..d9249eede82 --- /dev/null +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerRateLimitProperties.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.spring.boot.autoconfigure.properties.server; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RATELIMIT_PREFIX; + +@Component +@ConfigurationProperties(prefix = SERVER_RATELIMIT_PREFIX) +public class ServerRateLimitProperties { + /** + * whether enable server rate limit + */ + private boolean enable; + + /** + * limit token number of bucket per second + */ + private Integer bucketTokenNumPerSecond; + + /** + * limit token max number of bucket + */ + private Integer bucketTokenMaxNum; + + /** + * limit token initial number of bucket + */ + private Integer bucketTokenInitialTime; + + public boolean isEnable() { + return enable; + } + + public void setEnable(boolean enable) { + this.enable = enable; + } + + public Integer getBucketTokenNumPerSecond() { + return bucketTokenNumPerSecond; + } + + public void setBucketTokenNumPerSecond(Integer bucketTokenNumPerSecond) { + this.bucketTokenNumPerSecond = bucketTokenNumPerSecond; + } + + public Integer getBucketTokenMaxNum() { + return bucketTokenMaxNum; + } + + public void setBucketTokenMaxNum(Integer bucketTokenMaxNum) { + this.bucketTokenMaxNum = bucketTokenMaxNum; + } + + public Integer getBucketTokenInitialTime() { + return bucketTokenInitialTime; + } + + public void setBucketTokenInitialTime(Integer bucketTokenInitialTime) { + this.bucketTokenInitialTime = bucketTokenInitialTime; + } +} diff --git a/server/pom.xml b/server/pom.xml index 5ba3e8dd9e9..7b8a4b1b94d 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -329,6 +329,13 @@ jackson-mapper-asl ${jackson-mapper.version} + + + + com.bucket4j + bucket4j_jdk8-core + ${bucket4j.version} + diff --git a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java index 468e9ccc5ff..f6a5e6cb8b5 100644 --- a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java +++ b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java @@ -66,6 +66,7 @@ import org.apache.seata.core.rpc.netty.ChannelManager; import org.apache.seata.core.rpc.netty.NettyRemotingServer; import org.apache.seata.server.AbstractTCInboundHandler; +import org.apache.seata.server.limit.LimitRequestDecorator; import org.apache.seata.server.metrics.MetricsPublisher; import org.apache.seata.server.session.BranchSession; import org.apache.seata.server.session.GlobalSession; @@ -643,7 +644,8 @@ public AbstractResultMessage onRequest(AbstractMessage request, RpcContext conte AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request; transactionRequest.setTCInboundHandler(this); - return transactionRequest.handle(context); + LimitRequestDecorator limitRequestDecorator = new LimitRequestDecorator(transactionRequest); + return limitRequestDecorator.handle(context); } @Override diff --git a/server/src/main/java/org/apache/seata/server/limit/AbstractTransactionRequestHandler.java b/server/src/main/java/org/apache/seata/server/limit/AbstractTransactionRequestHandler.java new file mode 100644 index 00000000000..55fff518d59 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/limit/AbstractTransactionRequestHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.limit; + +import org.apache.seata.core.protocol.transaction.AbstractTransactionRequestToTC; +import org.apache.seata.core.protocol.transaction.AbstractTransactionResponse; +import org.apache.seata.core.rpc.RpcContext; + +/** + * TransactionRequestLimitHandler + */ +public abstract class AbstractTransactionRequestHandler { + + /** + * limit handler + */ + protected AbstractTransactionRequestHandler abstractTransactionRequestHandler; + + public AbstractTransactionRequestHandler() { + } + + /** + * next handler handle + * @param context + * @return + */ + protected AbstractTransactionResponse next(AbstractTransactionRequestToTC originRequest, RpcContext context) { + if (abstractTransactionRequestHandler != null) { + return abstractTransactionRequestHandler.next(originRequest, context); + } + return originRequest.handle(context); + } + + public abstract AbstractTransactionResponse handle(AbstractTransactionRequestToTC originRequest, RpcContext context); + + public void setTransactionRequestLimitHandler(AbstractTransactionRequestHandler abstractTransactionRequestHandler) { + this.abstractTransactionRequestHandler = abstractTransactionRequestHandler; + } +} diff --git a/server/src/main/java/org/apache/seata/server/limit/LimitRequestDecorator.java b/server/src/main/java/org/apache/seata/server/limit/LimitRequestDecorator.java new file mode 100644 index 00000000000..edfde9e2013 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/limit/LimitRequestDecorator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.limit; + +import org.apache.seata.core.protocol.transaction.AbstractTransactionRequestToTC; +import org.apache.seata.core.protocol.transaction.AbstractTransactionResponse; +import org.apache.seata.core.rpc.RpcContext; +import org.apache.seata.server.limit.ratelimit.RateLimiterHandler; + +/** + * LimitRequestDecorator decorate AbstractTransactionRequestToTC to use limiter + */ +public class LimitRequestDecorator extends AbstractTransactionRequestToTC { + + private AbstractTransactionRequestToTC originalRequest; + + private AbstractTransactionRequestHandler requestLimitHandler; + + public LimitRequestDecorator(AbstractTransactionRequestToTC originalRequest) { + this.originalRequest = originalRequest; + + // create server rate limter + RateLimiterHandler rateLimiterHandler = RateLimiterHandler.getInstance(); + rateLimiterHandler.setTransactionRequestLimitHandler(null); + requestLimitHandler = rateLimiterHandler; + } + + + @Override + public AbstractTransactionResponse handle(RpcContext rpcContext) { + return requestLimitHandler.handle(originalRequest, rpcContext); + } + + @Override + public short getTypeCode() { + return originalRequest.getTypeCode(); + } +} diff --git a/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimitInfo.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimitInfo.java new file mode 100644 index 00000000000..1d1fd570fd8 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimitInfo.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.limit.ratelimit; + +import org.apache.seata.common.util.UUIDGenerator; + +/** + * The type Rate limit info. + */ +public class RateLimitInfo { + + /** + * The constant ROLE_TC. + */ + public static final String GLOBAL_BEGIN_FAILED = "globalBeginFailed"; + + /** + * The Trace id. + */ + private String traceId; + + /** + * The Limit type (like GlobalBeginFailed). + */ + private String limitType; + + /** + * The Application id. + */ + private String applicationId; + + /** + * The Client id. + */ + private String clientId; + + /** + * The Server ip address and port. + */ + private String serverIpAddressAndPort; + + private RateLimitInfo() { + } + + public static RateLimitInfo generateRateLimitInfo(String applicationId, String type, + String clientId, String serverIpAddressAndPort) { + RateLimitInfo rateLimitInfo = new RateLimitInfo(); + rateLimitInfo.setTraceId(String.valueOf(UUIDGenerator.generateUUID())); + rateLimitInfo.setLimitType(type); + rateLimitInfo.setApplicationId(applicationId); + rateLimitInfo.setClientId(clientId); + rateLimitInfo.setServerIpAddressAndPort(serverIpAddressAndPort); + return rateLimitInfo; + } + + public String getTraceId() { + return traceId; + } + + public void setTraceId(String traceId) { + this.traceId = traceId; + } + + public String getLimitType() { + return limitType; + } + + public void setLimitType(String limitType) { + this.limitType = limitType; + } + + public String getApplicationId() { + return applicationId; + } + + public void setApplicationId(String applicationId) { + this.applicationId = applicationId; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getServerIpAddressAndPort() { + return serverIpAddressAndPort; + } + + public void setServerIpAddressAndPort(String serverIpAddressAndPort) { + this.serverIpAddressAndPort = serverIpAddressAndPort; + } + + @Override + public String toString() { + return "RateLimitInfo{" + + "traceId='" + traceId + '\'' + + ", limitType='" + limitType + '\'' + + ", applicationId='" + applicationId + '\'' + + ", clientId='" + clientId + '\'' + + ", serverIpAddressAndPort='" + serverIpAddressAndPort + '\'' + + '}'; + } +} diff --git a/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiter.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiter.java new file mode 100644 index 00000000000..ffddedd58e0 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.limit.ratelimit; + +/** + * RateLimiter + */ +public interface RateLimiter { + /** + * check whether the request can pass + * + * @return the boolean + */ + boolean canPass(); + + /** + * reInit reinitialize the rate limiter + */ + void reInit(RateLimiterHandlerConfig config); + + /** + * obtainConfig obtain the config of rate limiter + * + * @return + */ + RateLimiterHandlerConfig obtainConfig(); + + /** + * whether the rate limiter is enabled + * + * @return the boolean + */ + boolean isEnable(); +} diff --git a/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandler.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandler.java new file mode 100644 index 00000000000..a52dd74dd8d --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandler.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.limit.ratelimit; + +import org.apache.seata.common.XID; +import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.common.util.NumberUtils; +import org.apache.seata.config.CachedConfigurationChangeListener; +import org.apache.seata.config.Configuration; +import org.apache.seata.config.ConfigurationChangeEvent; +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.exception.TransactionExceptionCode; +import org.apache.seata.core.protocol.MessageType; +import org.apache.seata.core.protocol.ResultCode; +import org.apache.seata.core.protocol.transaction.AbstractTransactionRequestToTC; +import org.apache.seata.core.protocol.transaction.AbstractTransactionResponse; +import org.apache.seata.core.protocol.transaction.GlobalBeginResponse; +import org.apache.seata.core.rpc.RpcContext; +import org.apache.seata.server.limit.AbstractTransactionRequestHandler; +import org.apache.seata.server.metrics.MetricsPublisher; + +/** + * RateLimiterHandler + */ +public class RateLimiterHandler extends AbstractTransactionRequestHandler implements CachedConfigurationChangeListener { + /** + * The instance of RateLimiterHandler + */ + private static volatile RateLimiterHandler instance; + + /** + * The instance of RateLimiter + */ + private final RateLimiter rateLimiter; + + /** + * The config of RateLimiterHandler + */ + private final RateLimiterHandlerConfig config; + + public RateLimiterHandler(RateLimiter rateLimiter) { + this.rateLimiter = rateLimiter; + this.config = new RateLimiterHandlerConfig(); + } + + private RateLimiterHandler() { + rateLimiter = EnhancedServiceLoader.load(RateLimiter.class); + config = rateLimiter.obtainConfig(); + + Configuration config = ConfigurationFactory.getInstance(); + config.addConfigListener(ConfigurationKeys.RATE_LIMIT_ENABLE, this); + config.addConfigListener(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_NUM_PER_SECOND, this); + config.addConfigListener(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_MAX_NUM, this); + config.addConfigListener(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM, this); + } + + @Override + public AbstractTransactionResponse handle(AbstractTransactionRequestToTC originRequest, RpcContext context) { + if (!rateLimiter.isEnable()) { + return next(originRequest, context); + } + + if (MessageType.TYPE_GLOBAL_BEGIN == originRequest.getTypeCode()) { + if (!rateLimiter.canPass()) { + GlobalBeginResponse response = new GlobalBeginResponse(); + response.setTransactionExceptionCode(TransactionExceptionCode.BeginFailed); + response.setResultCode(ResultCode.Failed); + RateLimitInfo rateLimitInfo = RateLimitInfo.generateRateLimitInfo(context.getApplicationId(), + RateLimitInfo.GLOBAL_BEGIN_FAILED, context.getClientId(), XID.getIpAddressAndPort()); + MetricsPublisher.postRateLimitEvent(rateLimitInfo); + response.setMsg(String.format("TransactionException[rate limit exception, rate limit info: %s]", rateLimitInfo)); + return response; + } + } + return next(originRequest, context); + } + + public static RateLimiterHandler getInstance() { + if (instance == null) { + synchronized (RateLimiterHandler.class) { + if (instance == null) { + instance = new RateLimiterHandler(); + } + } + } + return instance; + } + + @Override + public void onChangeEvent(ConfigurationChangeEvent event) { + String dataId = event.getDataId(); + String newValue = event.getNewValue(); + if (ConfigurationKeys.RATE_LIMIT_ENABLE.equals(dataId)) { + config.setEnable(Boolean.parseBoolean(newValue)); + } + if (ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_NUM_PER_SECOND.equals(dataId)) { + config.setBucketTokenNumPerSecond(NumberUtils.toInt(newValue, config.getBucketTokenNumPerSecond())); + } + if (ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_MAX_NUM.equals(dataId)) { + config.setBucketTokenMaxNum(NumberUtils.toInt(newValue, config.getBucketTokenMaxNum())); + } + if (ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM.equals(dataId)) { + config.setBucketTokenInitialNum(NumberUtils.toInt(newValue, config.getBucketTokenInitialNum())); + } + rateLimiter.reInit(config); + } +} diff --git a/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandlerConfig.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandlerConfig.java new file mode 100644 index 00000000000..89db918e2f1 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandlerConfig.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.limit.ratelimit; + +/** + * RateLimiterHandlerConfig + */ +public class RateLimiterHandlerConfig { + /** + * whether enable server rate limit + */ + private boolean enable; + + /** + * limit token number of bucket per second + */ + private int bucketTokenNumPerSecond; + + /** + * limit token max number of bucket + */ + private int bucketTokenMaxNum; + + /** + * limit token initial number of bucket + */ + private int bucketTokenInitialNum; + + public boolean isEnable() { + return enable; + } + + public void setEnable(boolean enable) { + this.enable = enable; + } + + public int getBucketTokenNumPerSecond() { + return bucketTokenNumPerSecond; + } + + public void setBucketTokenNumPerSecond(int bucketTokenNumPerSecond) { + this.bucketTokenNumPerSecond = bucketTokenNumPerSecond; + } + + public int getBucketTokenMaxNum() { + return bucketTokenMaxNum; + } + + public void setBucketTokenMaxNum(int bucketTokenMaxNum) { + this.bucketTokenMaxNum = bucketTokenMaxNum; + } + + public int getBucketTokenInitialNum() { + return bucketTokenInitialNum; + } + + public void setBucketTokenInitialNum(int bucketTokenInitialNum) { + this.bucketTokenInitialNum = bucketTokenInitialNum; + } +} diff --git a/server/src/main/java/org/apache/seata/server/limit/ratelimit/TokenBucketLimiter.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/TokenBucketLimiter.java new file mode 100644 index 00000000000..419b4fe7050 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/TokenBucketLimiter.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.limit.ratelimit; + +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.executor.Initialize; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.common.loader.Scope; +import org.apache.seata.common.util.NumberUtils; +import org.apache.seata.config.Configuration; +import org.apache.seata.config.ConfigurationFactory; + +import io.github.bucket4j.Bandwidth; +import io.github.bucket4j.Bucket; +import io.github.bucket4j.Refill; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; + +/** + * TokenBucketLimiter based on Bucket4j + */ +@LoadLevel(name = "token-bucket-limiter", scope = Scope.SINGLETON) +public class TokenBucketLimiter implements RateLimiter, Initialize { + private static final Logger LOGGER = LoggerFactory.getLogger(TokenBucketLimiter.class); + + /** + * whether enable server rate limit + */ + private boolean enable; + + /** + * limit token number of bucket per second + */ + private Integer bucketTokenNumPerSecond; + + /** + * limit token max number of bucket + */ + private Integer bucketTokenMaxNum; + + /** + * limit token initial number of bucket + */ + private Integer bucketTokenInitialNum; + + /** + * the Bucket + */ + private Bucket bucket; + + private static final int DEFAULT_BUCKET_TOKEN_NUM_PER_SECOND = Integer.MAX_VALUE; + private static final int DEFAULT_BUCKET_TOKEN_MAX_NUM = Integer.MAX_VALUE; + private static final int DEFAULT_BUCKET_TOKEN_INITIAL_NUM = Integer.MAX_VALUE; + + public TokenBucketLimiter() {} + + public TokenBucketLimiter(boolean enable, Integer bucketTokenNumPerSecond, + Integer bucketTokenMaxNum, Integer bucketTokenInitialNum) { + this.enable = enable; + this.bucketTokenNumPerSecond = bucketTokenNumPerSecond; + this.bucketTokenMaxNum = bucketTokenMaxNum; + this.bucketTokenInitialNum = bucketTokenInitialNum; + initBucket(); + } + + @Override + public void init() { + final Configuration config = ConfigurationFactory.getInstance(); + this.enable = config.getBoolean(ConfigurationKeys.RATE_LIMIT_ENABLE); + this.bucketTokenNumPerSecond = NumberUtils.toInt( + config.getConfig(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_NUM_PER_SECOND), + DEFAULT_BUCKET_TOKEN_NUM_PER_SECOND + ); + this.bucketTokenMaxNum = NumberUtils.toInt( + config.getConfig(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_MAX_NUM), + DEFAULT_BUCKET_TOKEN_MAX_NUM + ); + this.bucketTokenInitialNum = NumberUtils.toInt( + config.getConfig(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM), + DEFAULT_BUCKET_TOKEN_INITIAL_NUM + ); + + if (this.enable) { + initBucket(); + LOGGER.info("TokenBucketLimiter init success, bucketTokenNumPerSecond: {}, tokenMaxNum: {}, tokenInitialNum: {}", + this.bucketTokenNumPerSecond, this.bucketTokenMaxNum, this.bucketTokenInitialNum); + } + } + + @Override + public boolean canPass() { + return bucket.tryConsume(1); + } + + @Override + public void reInit(RateLimiterHandlerConfig config) { + this.enable = config.isEnable(); + this.bucketTokenNumPerSecond = config.getBucketTokenNumPerSecond(); + this.bucketTokenMaxNum = config.getBucketTokenMaxNum(); + this.bucketTokenInitialNum = config.getBucketTokenInitialNum(); + + if (this.enable) { + initBucket(); + LOGGER.info("TokenBucketLimiter reInit success, bucketTokenNumPerSecond: {}, tokenMaxNum: {}, tokenInitialNum: {}", + this.bucketTokenNumPerSecond, this.bucketTokenMaxNum, this.bucketTokenInitialNum); + return; + } + LOGGER.info("TokenBucketLimiter reInit success, The limiter is disabled"); + } + + @Override + public RateLimiterHandlerConfig obtainConfig() { + RateLimiterHandlerConfig config = new RateLimiterHandlerConfig(); + config.setEnable(this.enable); + config.setBucketTokenNumPerSecond(this.bucketTokenNumPerSecond); + config.setBucketTokenMaxNum(this.bucketTokenMaxNum); + config.setBucketTokenInitialNum(this.bucketTokenInitialNum); + return config; + } + + @Override + public boolean isEnable() { + return this.enable; + } + + private void initBucket() { + Bandwidth limit = Bandwidth.classic(this.bucketTokenMaxNum, Refill.greedy(this.bucketTokenNumPerSecond, + Duration.ofSeconds(1))); + Bucket bucket = Bucket.builder().addLimit(limit).build(); + if (this.bucketTokenInitialNum > 0) { + bucket.addTokens(this.bucketTokenInitialNum); + } + this.bucket = bucket; + } + +} diff --git a/server/src/main/java/org/apache/seata/server/metrics/MeterIdConstants.java b/server/src/main/java/org/apache/seata/server/metrics/MeterIdConstants.java index 18787594f8f..78ea4fca537 100644 --- a/server/src/main/java/org/apache/seata/server/metrics/MeterIdConstants.java +++ b/server/src/main/java/org/apache/seata/server/metrics/MeterIdConstants.java @@ -109,4 +109,8 @@ public interface MeterIdConstants { Id SUMMARY_EXP = new Id(IdConstants.SEATA_EXCEPTION) .withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_TC) .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_SUMMARY); + + Id SUMMARY_RATE_LIMIT = new Id(IdConstants.SEATA_RATE_LIMIT) + .withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_TC) + .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_SUMMARY); } diff --git a/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java b/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java index d7dc1beef13..0f51809986a 100644 --- a/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java +++ b/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java @@ -18,8 +18,10 @@ import org.apache.seata.core.event.EventBus; import org.apache.seata.core.event.GlobalTransactionEvent; +import org.apache.seata.core.event.RateLimitEvent; import org.apache.seata.core.model.GlobalStatus; import org.apache.seata.server.event.EventBusManager; +import org.apache.seata.server.limit.ratelimit.RateLimitInfo; import org.apache.seata.server.session.GlobalSession; /** @@ -94,4 +96,14 @@ public static void postSessionDoingEvent(final GlobalSession globalSession, Stri globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(), globalSession.getBeginTime(), null, status, retryGlobal, retryBranch)); } + + /** + * Post rate limit event. + * + * @param rateLimitInfo the rate limit info + */ + public static void postRateLimitEvent(RateLimitInfo rateLimitInfo) { + EVENT_BUS.post(new RateLimitEvent(rateLimitInfo.getTraceId(), rateLimitInfo.getLimitType(), rateLimitInfo.getApplicationId(), + rateLimitInfo.getClientId(), rateLimitInfo.getServerIpAddressAndPort())); + } } diff --git a/server/src/main/java/org/apache/seata/server/metrics/MetricsSubscriber.java b/server/src/main/java/org/apache/seata/server/metrics/MetricsSubscriber.java index 2a937d0b98c..74d6fbe8816 100644 --- a/server/src/main/java/org/apache/seata/server/metrics/MetricsSubscriber.java +++ b/server/src/main/java/org/apache/seata/server/metrics/MetricsSubscriber.java @@ -16,14 +16,10 @@ */ package org.apache.seata.server.metrics; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; - import com.google.common.eventbus.Subscribe; import org.apache.seata.core.event.ExceptionEvent; import org.apache.seata.core.event.GlobalTransactionEvent; +import org.apache.seata.core.event.RateLimitEvent; import org.apache.seata.core.model.GlobalStatus; import org.apache.seata.metrics.Id; import org.apache.seata.metrics.registry.Registry; @@ -31,8 +27,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + import static org.apache.seata.metrics.IdConstants.APP_ID_KEY; +import static org.apache.seata.metrics.IdConstants.CLIENT_ID_KEY; import static org.apache.seata.metrics.IdConstants.GROUP_KEY; +import static org.apache.seata.metrics.IdConstants.LIMIT_TYPE_KEY; +import static org.apache.seata.metrics.IdConstants.HOST_AND_PORT; import static org.apache.seata.metrics.IdConstants.STATUS_VALUE_AFTER_COMMITTED_KEY; import static org.apache.seata.metrics.IdConstants.STATUS_VALUE_AFTER_ROLLBACKED_KEY; @@ -193,6 +197,15 @@ public void exceptionEventForMetrics(ExceptionEvent event) { .withTag(APP_ID_KEY, event.getName())).increase(1); } + @Subscribe + public void recordRateLimitEventForMetrics(RateLimitEvent event) { + registry.getSummary(MeterIdConstants.SUMMARY_RATE_LIMIT + .withTag(LIMIT_TYPE_KEY, event.getLimitType()) + .withTag(APP_ID_KEY, event.getApplicationId()) + .withTag(CLIENT_ID_KEY, event.getClientId()) + .withTag(HOST_AND_PORT, event.getServerIpAddressAndPort())).increase(1); + } + @Override public boolean equals(Object obj) { return this.getClass().getName().equals(obj.getClass().getName()); diff --git a/server/src/main/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter b/server/src/main/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter new file mode 100644 index 00000000000..ef355142ff1 --- /dev/null +++ b/server/src/main/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.apache.seata.server.limit.ratelimit.TokenBucketLimiter \ No newline at end of file diff --git a/server/src/main/resources/application.example.yml b/server/src/main/resources/application.example.yml index 059312ae856..7231b11180c 100644 --- a/server/src/main/resources/application.example.yml +++ b/server/src/main/resources/application.example.yml @@ -159,6 +159,11 @@ seata: session: branch-async-queue-size: 5000 #branch async remove queue size enable-branch-async-remove: false #enable to asynchronous remove branchSession + ratelimit: + enable: false + bucketTokenSecondNum: 999999 + bucketTokenMaxNum: 999999 + bucketTokenInitialNum: 999999 store: # support: file 、 db 、 redis 、 raft mode: file diff --git a/server/src/main/resources/application.raft.example.yml b/server/src/main/resources/application.raft.example.yml index c427dce5760..dea10b382fe 100644 --- a/server/src/main/resources/application.raft.example.yml +++ b/server/src/main/resources/application.raft.example.yml @@ -129,6 +129,11 @@ seata: session: branch-async-queue-size: 5000 #branch async remove queue size enable-branch-async-remove: false #enable to asynchronous remove branchSession + ratelimit: + enable: false + bucketTokenNumPerSecond: 999999 + bucketTokenMaxNum: 999999 + bucketTokenInitialNum: 999999 store: # support: file mode: raft diff --git a/server/src/test/java/org/apache/seata/server/ratelimiter/RateLimiterHandlerTest.java b/server/src/test/java/org/apache/seata/server/ratelimiter/RateLimiterHandlerTest.java new file mode 100644 index 00000000000..4ef8d0bf4e5 --- /dev/null +++ b/server/src/test/java/org/apache/seata/server/ratelimiter/RateLimiterHandlerTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.ratelimiter; + +import org.apache.seata.core.protocol.transaction.GlobalBeginRequest; +import org.apache.seata.core.rpc.RpcContext; +import org.apache.seata.server.limit.ratelimit.RateLimiter; +import org.apache.seata.server.limit.ratelimit.RateLimiterHandler; +import org.apache.seata.server.limit.ratelimit.TokenBucketLimiter; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.context.SpringBootTest; + +/** + * RateLimiterHandlerTest + */ +@SpringBootTest +public class RateLimiterHandlerTest { + + /** + * Logger for TokenBucketLimiterTest + **/ + private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterHandlerTest.class); + + private static RateLimiterHandler rateLimiterHandler; + + @Test + public void testHandlePass() { + RateLimiter rateLimiter = new TokenBucketLimiter(true, 1, + 10, 10); + rateLimiterHandler = new RateLimiterHandler(rateLimiter); + GlobalBeginRequest request = new GlobalBeginRequest(); + RpcContext rpcContext = new RpcContext(); + Assertions.assertThrowsExactly(NullPointerException.class, () -> rateLimiterHandler.handle(request, rpcContext)); + } + + @Test + public void testHandleNotPass() { + RateLimiter rateLimiter = new TokenBucketLimiter(true, 1, + 1, 0); + rateLimiterHandler = new RateLimiterHandler(rateLimiter); + GlobalBeginRequest request = new GlobalBeginRequest(); + RpcContext rpcContext = new RpcContext(); + Assertions.assertThrowsExactly(NullPointerException.class, () -> rateLimiterHandler.handle(request, rpcContext)); + } + +} diff --git a/server/src/test/java/org/apache/seata/server/ratelimiter/TokenBucketLimiterTest.java b/server/src/test/java/org/apache/seata/server/ratelimiter/TokenBucketLimiterTest.java new file mode 100644 index 00000000000..7f386665ca5 --- /dev/null +++ b/server/src/test/java/org/apache/seata/server/ratelimiter/TokenBucketLimiterTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.ratelimiter; + +import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.server.limit.ratelimit.RateLimiter; +import org.apache.seata.server.limit.ratelimit.TokenBucketLimiter; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.util.StopWatch; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * TokenBucketLimiterTest + */ +@SpringBootTest +public class TokenBucketLimiterTest { + + /** + * Logger for TokenBucketLimiterTest + **/ + private static final Logger LOGGER = LoggerFactory.getLogger(TokenBucketLimiterTest.class); + + @Test + public void testPerformanceOfTokenBucketLimiter() throws InterruptedException { + RateLimiter rateLimiter = new TokenBucketLimiter(true, 1, + 10, 10); + int threads = 10; + final int count = 100; + final CountDownLatch cnt = new CountDownLatch(count * threads); + + final ThreadPoolExecutor service1 = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, + new SynchronousQueue(), new NamedThreadFactory("test1", false)); + AtomicInteger totalPass = new AtomicInteger(); + AtomicInteger totalReject = new AtomicInteger(); + StopWatch totalStopWatch = new StopWatch(); + totalStopWatch.start(); + for (int i = 0; i < threads; i++) { + service1.execute(() -> { + int pass = 0; + int reject = 0; + StopWatch w = new StopWatch(); + w.start(); + for (int u = 0; u < count; u++) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + boolean result = rateLimiter.canPass(); + if (result) { + pass++; + totalPass.getAndIncrement(); + } else { + reject++; + totalReject.getAndIncrement(); + } + cnt.countDown(); + } + w.stop(); + LOGGER.info("total time:{}ms, pass:{}, reject:{}", w.getLastTaskTimeMillis(), pass, reject); + }); + } + cnt.await(); + totalStopWatch.stop(); + LOGGER.info("total time:{}ms, total pass:{}, total reject:{}", totalStopWatch.getLastTaskTimeMillis(), + totalPass.get(), totalReject.get()); + Assertions.assertNotEquals(0, totalReject.get()); + } +} diff --git a/server/src/test/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter b/server/src/test/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter new file mode 100644 index 00000000000..ef355142ff1 --- /dev/null +++ b/server/src/test/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.apache.seata.server.limit.ratelimit.TokenBucketLimiter \ No newline at end of file diff --git a/server/src/test/resources/file.conf b/server/src/test/resources/file.conf index 422c6dd5836..1a03672ddf6 100644 --- a/server/src/test/resources/file.conf +++ b/server/src/test/resources/file.conf @@ -56,6 +56,12 @@ server { #schedule delete expired undo_log in milliseconds logDeletePeriod = 86400000 } + ratelimit { + enable = false + bucketTokenNumPerSecond = 999999 + bucketTokenMaxNum = 999999 + bucketTokenInitialNum = 999999 + } } ## metrics settings metrics {