Skip to content

Commit

Permalink
Use unbound queue for transfer thread pool (#5700)
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso authored Jan 23, 2025
1 parent 5325e5a commit 9031968
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class BlockingThreadExecutorFactory {
Integer.MAX_VALUE,
keepAlive.getMillis(),
TimeUnit.MILLISECONDS,
new BlockingBlockingQueue<Runnable>(maxQueueSize-maxThreads),
new HardBlockingQueue<Runnable>(maxQueueSize-maxThreads),
new CustomThreadFactory(prefix),
new ThreadPoolExecutor.CallerRunsPolicy() )
// ^^^^^
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import groovy.util.logging.Slf4j
*/
@Slf4j
@CompileStatic
class BlockingBlockingQueue<E> extends LinkedBlockingQueue<E> {
class HardBlockingQueue<E> extends LinkedBlockingQueue<E> {

BlockingBlockingQueue(int maxSize) {
HardBlockingQueue(int maxSize) {
super(maxSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ class ThreadPoolBuilder {

ThreadPoolBuilder withQueueSize(int size) {
this.queueSize = size
this.workQueue = new LinkedBlockingQueue<Runnable>(size)
if( size>0 )
this.workQueue = new HardBlockingQueue<Runnable>(size)
return this
}

Expand Down Expand Up @@ -141,7 +142,7 @@ class ThreadPoolBuilder {
if( workQueue==null )
workQueue = new LinkedBlockingQueue<>()
if( rejectionPolicy==null )
rejectionPolicy = new ThreadPoolExecutor.CallerRunsPolicy()
rejectionPolicy = new ThreadPoolExecutor.AbortPolicy()
if( threadFactory==null )
threadFactory = new CustomThreadFactory(name)

Expand All @@ -160,19 +161,4 @@ class ThreadPoolBuilder {
return result
}


static ThreadPoolExecutor io(String name=null) {
io(10, 100, 10_000, name)
}


static ThreadPoolExecutor io(int min, int max, int queue, String name=null) {
new ThreadPoolBuilder()
.withMinSize(min)
.withMaxSize(max)
.withQueueSize(queue)
.withName(name)
.build()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ThreadPoolManager {

final static public int DEFAULT_MIN_THREAD = 10
final static public int DEFAULT_MAX_THREAD = Math.max(DEFAULT_MIN_THREAD, Runtime.runtime.availableProcessors()*3)
final static public int DEFAULT_QUEUE_SIZE = 10_000
final static public int DEFAULT_QUEUE_SIZE = -1 // use -1 for using an unbound queue
final static public Duration DEFAULT_KEEP_ALIVE = Duration.of('60sec')
final static public Duration DEFAULT_MAX_AWAIT = Duration.of('12 hour')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ class ThreadPoolBuilderTest extends Specification {
pool.getMaximumPoolSize() == 10
pool.getKeepAliveTime(TimeUnit.MILLISECONDS) == 60_000
pool.getThreadFactory() instanceof CustomThreadFactory
pool.getRejectedExecutionHandler() instanceof ThreadPoolExecutor.CallerRunsPolicy
pool.getRejectedExecutionHandler() instanceof ThreadPoolExecutor.AbortPolicy
and:
builder.getName().startsWith('nf-thread-pool-')
builder.getWorkQueue() instanceof LinkedBlockingQueue
builder.getWorkQueue().remainingCapacity() == Integer.MAX_VALUE
}

def 'should create pool with all settings' () {
Expand All @@ -66,6 +67,8 @@ class ThreadPoolBuilderTest extends Specification {
builder.queueSize == 1000
builder.allowCoreThreadTimeout
builder.rejectionPolicy instanceof ThreadPoolExecutor.AbortPolicy
builder.workQueue instanceof HardBlockingQueue
builder.workQueue.remainingCapacity() == 1000

when:
def pool = builder.build()
Expand Down

0 comments on commit 9031968

Please sign in to comment.