From 903196818deae4fe52949fbfcc9645abbaaf262e Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Thu, 23 Jan 2025 14:46:05 +0100 Subject: [PATCH] Use unbound queue for transfer thread pool (#5700) Signed-off-by: Paolo Di Tommaso --- .../util/BlockingThreadExecutorFactory.groovy | 2 +- ...gQueue.groovy => HardBlockingQueue.groovy} | 4 ++-- .../nextflow/util/ThreadPoolBuilder.groovy | 20 +++---------------- .../nextflow/util/ThreadPoolManager.groovy | 2 +- .../util/ThreadPoolBuilderTest.groovy | 5 ++++- 5 files changed, 11 insertions(+), 22 deletions(-) rename modules/nextflow/src/main/groovy/nextflow/util/{BlockingBlockingQueue.groovy => HardBlockingQueue.groovy} (93%) diff --git a/modules/nextflow/src/main/groovy/nextflow/util/BlockingThreadExecutorFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/util/BlockingThreadExecutorFactory.groovy index 925bfb6f51..77a7b9af3b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/BlockingThreadExecutorFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/BlockingThreadExecutorFactory.groovy @@ -114,7 +114,7 @@ class BlockingThreadExecutorFactory { Integer.MAX_VALUE, keepAlive.getMillis(), TimeUnit.MILLISECONDS, - new BlockingBlockingQueue(maxQueueSize-maxThreads), + new HardBlockingQueue(maxQueueSize-maxThreads), new CustomThreadFactory(prefix), new ThreadPoolExecutor.CallerRunsPolicy() ) // ^^^^^ diff --git a/modules/nextflow/src/main/groovy/nextflow/util/BlockingBlockingQueue.groovy b/modules/nextflow/src/main/groovy/nextflow/util/HardBlockingQueue.groovy similarity index 93% rename from modules/nextflow/src/main/groovy/nextflow/util/BlockingBlockingQueue.groovy rename to modules/nextflow/src/main/groovy/nextflow/util/HardBlockingQueue.groovy index b27e0d6e4f..db792a474d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/BlockingBlockingQueue.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/HardBlockingQueue.groovy @@ -30,9 +30,9 @@ import groovy.util.logging.Slf4j */ @Slf4j @CompileStatic -class BlockingBlockingQueue extends LinkedBlockingQueue { +class HardBlockingQueue extends LinkedBlockingQueue { - BlockingBlockingQueue(int maxSize) { + HardBlockingQueue(int maxSize) { super(maxSize); } diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolBuilder.groovy index 20298106df..ac140a826d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolBuilder.groovy @@ -106,7 +106,8 @@ class ThreadPoolBuilder { ThreadPoolBuilder withQueueSize(int size) { this.queueSize = size - this.workQueue = new LinkedBlockingQueue(size) + if( size>0 ) + this.workQueue = new HardBlockingQueue(size) return this } @@ -141,7 +142,7 @@ class ThreadPoolBuilder { if( workQueue==null ) workQueue = new LinkedBlockingQueue<>() if( rejectionPolicy==null ) - rejectionPolicy = new ThreadPoolExecutor.CallerRunsPolicy() + rejectionPolicy = new ThreadPoolExecutor.AbortPolicy() if( threadFactory==null ) threadFactory = new CustomThreadFactory(name) @@ -160,19 +161,4 @@ class ThreadPoolBuilder { return result } - - static ThreadPoolExecutor io(String name=null) { - io(10, 100, 10_000, name) - } - - - static ThreadPoolExecutor io(int min, int max, int queue, String name=null) { - new ThreadPoolBuilder() - .withMinSize(min) - .withMaxSize(max) - .withQueueSize(queue) - .withName(name) - .build() - } - } diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy index 892d81998d..fabdf3e013 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy @@ -42,7 +42,7 @@ class ThreadPoolManager { final static public int DEFAULT_MIN_THREAD = 10 final static public int DEFAULT_MAX_THREAD = Math.max(DEFAULT_MIN_THREAD, Runtime.runtime.availableProcessors()*3) - final static public int DEFAULT_QUEUE_SIZE = 10_000 + final static public int DEFAULT_QUEUE_SIZE = -1 // use -1 for using an unbound queue final static public Duration DEFAULT_KEEP_ALIVE = Duration.of('60sec') final static public Duration DEFAULT_MAX_AWAIT = Duration.of('12 hour') diff --git a/modules/nextflow/src/test/groovy/nextflow/util/ThreadPoolBuilderTest.groovy b/modules/nextflow/src/test/groovy/nextflow/util/ThreadPoolBuilderTest.groovy index 4a2d016bd4..103d98bcfe 100644 --- a/modules/nextflow/src/test/groovy/nextflow/util/ThreadPoolBuilderTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/util/ThreadPoolBuilderTest.groovy @@ -42,10 +42,11 @@ class ThreadPoolBuilderTest extends Specification { pool.getMaximumPoolSize() == 10 pool.getKeepAliveTime(TimeUnit.MILLISECONDS) == 60_000 pool.getThreadFactory() instanceof CustomThreadFactory - pool.getRejectedExecutionHandler() instanceof ThreadPoolExecutor.CallerRunsPolicy + pool.getRejectedExecutionHandler() instanceof ThreadPoolExecutor.AbortPolicy and: builder.getName().startsWith('nf-thread-pool-') builder.getWorkQueue() instanceof LinkedBlockingQueue + builder.getWorkQueue().remainingCapacity() == Integer.MAX_VALUE } def 'should create pool with all settings' () { @@ -66,6 +67,8 @@ class ThreadPoolBuilderTest extends Specification { builder.queueSize == 1000 builder.allowCoreThreadTimeout builder.rejectionPolicy instanceof ThreadPoolExecutor.AbortPolicy + builder.workQueue instanceof HardBlockingQueue + builder.workQueue.remainingCapacity() == 1000 when: def pool = builder.build()