-
Notifications
You must be signed in to change notification settings - Fork 8.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature: add single server rate limit #6756
Conversation
# Conflicts: # common/src/main/java/org/apache/seata/common/ConfigurationKeys.java # server/pom.xml
…ture/flow-limit-xjl
/** | ||
* The constant RATE_LIMIT_BUCKET_TOKEN_SECOND_NUM. | ||
*/ | ||
String RATE_LIMIT_BUCKET_TOKEN_SECOND_NUM = RATE_LIMIT_PREFIX + "bucketTokenSecondNum"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bucketTokenNumPerSecond
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The rate limit has to immediate effect when it enable or change.
@@ -28,6 +26,8 @@ | |||
import org.slf4j.Logger; | |||
import org.slf4j.LoggerFactory; | |||
|
|||
import java.util.concurrent.TimeUnit; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import java.util.concurrent.TimeUnit; |
@@ -16,8 +16,6 @@ | |||
*/ | |||
package io.seata.tm.api; | |||
|
|||
import java.util.concurrent.TimeUnit; | |||
|
|||
import io.netty.util.HashedWheelTimer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import io.netty.util.HashedWheelTimer; | |
import java.util.concurrent.TimeUnit; | |
import io.netty.util.HashedWheelTimer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it
import java.lang.reflect.Method; | ||
import java.util.LinkedHashSet; | ||
import java.util.Set; | ||
import java.util.concurrent.ScheduledThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import java.lang.reflect.Method; | |
import java.util.LinkedHashSet; | |
import java.util.Set; | |
import java.util.concurrent.ScheduledThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it
import java.util.concurrent.ScheduledThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import com.google.common.eventbus.Subscribe; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import com.google.common.eventbus.Subscribe; | |
import java.lang.reflect.Method; | |
import java.util.LinkedHashSet; | |
import java.util.Set; | |
import java.util.concurrent.ScheduledThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import com.google.common.eventbus.Subscribe; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it
|
||
String CLIENT_ID_KEY = "clientId"; | ||
|
||
String SERVER_IP_ADDRESS_AND_PORT_KEY = "serverIpAddressAndPort"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't 'hostAndPort' more concise?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it
script/config-center/config.txt
Outdated
server.ratelimit.bucketTokenMaxNum = 1 | ||
server.ratelimit.bucketTokenInitialNum = 1 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some users may blindly copy this configuration, so it's better to set the threshold to a larger value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it
import java.util.concurrent.ScheduledThreadPoolExecutor; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import io.netty.channel.Channel; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import io.netty.channel.Channel; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.Comparator; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Objects; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.ScheduledThreadPoolExecutor; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import io.netty.channel.Channel; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.concurrent.ArrayBlockingQueue; | ||
import java.util.concurrent.ScheduledThreadPoolExecutor; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.Comparator; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Objects; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.ScheduledThreadPoolExecutor; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it
Had added hot config. |
GlobalBeginResponse response = new GlobalBeginResponse(); | ||
response.setTransactionExceptionCode(TransactionExceptionCode.BeginFailed); | ||
response.setResultCode(ResultCode.Failed); | ||
RateLimitInfo rateLimitInfo = RateLimitInfo.generateRateLimitInfo(rpcContext.getApplicationId(), | ||
RateLimitInfo.GLOBAL_BEGIN_FAILED, rpcContext.getClientId(), XID.getIpAddressAndPort()); | ||
MetricsPublisher.postRateLimitEvent(rateLimitInfo); | ||
response.setMsg(String.format("TransactionException[rate limit exception, rate limit info: %s]", rateLimitInfo)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
限流器应该只是限流作用,为什么要跟业务逻辑耦合在一起?
The role of limited viewership should only be limited viewership, why should it be coupled with business logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it.
server/src/main/resources/file.conf
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it.
return null; | ||
} | ||
|
||
if (request instanceof GlobalBeginRequest) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
为什么不使用MessageType来判断,未来是否限流器可以作用于更多的request?只要对外提供一个配置项,允许用户配置一个需要限流request的集合,这里只要判断MessageType是否包含在其中即可
Why not use MessageType to determine whether the limit viewership of requests can be used in the future? Just provide a configuration item to allow users to configure a collection that needs to limit viewership of requests. Here, just determine whether MessageType is included in it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个当时是说的后面来优化,限流request的集合是指的比如是分支开始?如果是配置集合,我理解那就必须在客户端也看看是否做对应修改,以免不兼容?
This was the later optimization. Is the collection of current restriction Request refers to the start of the branch? If it is a configuration collection, you must also see the corresponding modification on the client, so as not to be incompatible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pr需要登记
@@ -159,6 +159,11 @@ seata: | |||
session: | |||
branch-async-queue-size: 5000 #branch async remove queue size | |||
enable-branch-async-remove: false #enable to asynchronous remove branchSession | |||
ratelimit: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
默认值调大
Default value up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it
@@ -131,6 +131,11 @@ seata: | |||
session: | |||
branch-async-queue-size: 5000 #branch async remove queue size | |||
enable-branch-async-remove: false #enable to asynchronous remove branchSession | |||
ratelimit: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
默认值调大
Default value up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it
if (ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM.equals(dataId)) { | ||
config.setBucketTokenInitialNum(NumberUtils.toInt(newValue, config.getBucketTokenInitialNum())); | ||
} | ||
rateLimiter.reInit(config); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
为什么不直接在TokenBucketLimiter中进行监听?
Why not listen directly in TokenBucketLimiter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
因为想的是ratelimithandler下面可以通过spi加载不同的限流器,然后具体的限流器重新加载由具体的限流器实现
By ratelimithandler, you can load different current limiter through spi, and then the specific current limiter reloads by the new config from Ratelimithandler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
# Conflicts: # changes/en-us/2.x.md # changes/zh-cn/2.x.md # common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
# Conflicts: # changes/en-us/2.x.md # changes/zh-cn/2.x.md # seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java
Ⅰ. Describe what this PR did
Support flow limiting control for a single server:
Ⅱ. Does this pull request fix one issue?
Ⅲ. Why don't you add test cases (unit test/integration test)?
Ⅳ. Describe how to verify it
Ⅴ. Special notes for reviews