diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index 57ada3a56022..88d64af2ba4c 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -27,6 +27,8 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.lucene.codecs.Codec; import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice; @@ -44,7 +46,7 @@ import org.apache.lucene.util.StringHelper; import org.apache.lucene.util.Version; -final class DocumentsWriterPerThread implements Accountable { +final class DocumentsWriterPerThread implements Accountable, Lock { private Throwable abortingException; @@ -694,25 +696,26 @@ long getCommitLastBytesUsedDelta() { return delta; } - /** - * Locks this DWPT for exclusive access. - * - * @see ReentrantLock#lock() - */ - void lock() { + @Override + public void lock() { lock.lock(); } - /** - * Acquires the DWPT's lock only if it is not held by another thread at the time of invocation. - * - * @return true if the lock was acquired. - * @see ReentrantLock#tryLock() - */ - boolean tryLock() { + @Override + public void lockInterruptibly() throws InterruptedException { + lock.lockInterruptibly(); + } + + @Override + public boolean tryLock() { return lock.tryLock(); } + @Override + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + return lock.tryLock(time, unit); + } + /** * Returns true if the DWPT's lock is held by the current thread * @@ -722,15 +725,16 @@ boolean isHeldByCurrentThread() { return lock.isHeldByCurrentThread(); } - /** - * Unlocks the DWPT's lock - * - * @see ReentrantLock#unlock() - */ - void unlock() { + @Override + public void unlock() { lock.unlock(); } + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + /** Returns true iff this DWPT has been flushed */ boolean hasFlushed() { return hasFlushed.get() == Boolean.TRUE; diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java index df2d76b88797..d69a71bfea57 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java @@ -44,8 +44,8 @@ final class DocumentsWriterPerThreadPool implements Iterable dwpts = Collections.newSetFromMap(new IdentityHashMap<>()); - private final ConcurrentApproximatePriorityQueue freeList = - new ConcurrentApproximatePriorityQueue<>(); + private final LockableConcurrentApproximatePriorityQueue freeList = + new LockableConcurrentApproximatePriorityQueue<>(); private final Supplier dwptFactory; private int takenWriterPermits = 0; private volatile boolean closed; @@ -114,10 +114,11 @@ private synchronized DocumentsWriterPerThread newWriter() { */ DocumentsWriterPerThread getAndLock() { ensureOpen(); - DocumentsWriterPerThread dwpt = freeList.poll(DocumentsWriterPerThread::tryLock); + DocumentsWriterPerThread dwpt = freeList.lockAndPoll(); if (dwpt != null) { return dwpt; } + // newWriter() adds the DWPT to the `dwpts` set as a side-effect. However it is not added to // `freeList` at this point, it will be added later on once DocumentsWriter has indexed a // document into this DWPT and then gives it back to the pool by calling @@ -139,8 +140,7 @@ void marksAsFreeAndUnlock(DocumentsWriterPerThread state) { final long ramBytesUsed = state.ramBytesUsed(); assert contains(state) : "we tried to add a DWPT back to the pool but the pool doesn't know about this DWPT"; - freeList.add(state, ramBytesUsed); - state.unlock(); + freeList.addAndUnlock(state, ramBytesUsed); } @Override diff --git a/lucene/core/src/java/org/apache/lucene/index/LockableConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/LockableConcurrentApproximatePriorityQueue.java new file mode 100644 index 000000000000..234fb8871939 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/LockableConcurrentApproximatePriorityQueue.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; + +/** A {@link ConcurrentApproximatePriorityQueue} of {@link Lock} objects. */ +final class LockableConcurrentApproximatePriorityQueue { + + private final ConcurrentApproximatePriorityQueue queue; + private final AtomicInteger addAndUnlockCounter = new AtomicInteger(); + + LockableConcurrentApproximatePriorityQueue(int concurrency) { + this.queue = new ConcurrentApproximatePriorityQueue<>(concurrency); + } + + LockableConcurrentApproximatePriorityQueue() { + this.queue = new ConcurrentApproximatePriorityQueue<>(); + } + + /** + * Lock an entry, and poll it from the queue, in that order. If no entry can be found and locked, + * {@code null} is returned. + */ + T lockAndPoll() { + int addAndUnlockCount; + do { + addAndUnlockCount = addAndUnlockCounter.get(); + T entry = queue.poll(Lock::tryLock); + if (entry != null) { + return entry; + } + // If an entry has been added to the queue in the meantime, try again. + } while (addAndUnlockCount != addAndUnlockCounter.get()); + + return null; + } + + /** Remove an entry from the queue. */ + boolean remove(Object o) { + return queue.remove(o); + } + + // Only used for assertions + boolean contains(Object o) { + return queue.contains(o); + } + + /** Add an entry to the queue and unlock it, in that order. */ + void addAndUnlock(T entry, long weight) { + queue.add(entry, weight); + entry.unlock(); + addAndUnlockCounter.incrementAndGet(); + } +} diff --git a/lucene/core/src/test/org/apache/lucene/index/TestLockableConcurrentApproximatePriorityQueue.java b/lucene/core/src/test/org/apache/lucene/index/TestLockableConcurrentApproximatePriorityQueue.java new file mode 100644 index 000000000000..90af5f2d0833 --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/index/TestLockableConcurrentApproximatePriorityQueue.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.tests.util.TestUtil; +import org.apache.lucene.util.ThreadInterruptedException; + +public class TestLockableConcurrentApproximatePriorityQueue extends LuceneTestCase { + + private static class WeightedLock implements Lock { + + private final Lock lock = new ReentrantLock(); + long weight; + + @Override + public void lock() { + lock.lock(); + } + + @Override + public void lockInterruptibly() throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tryLock() { + return lock.tryLock(); + } + + @Override + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public void unlock() { + lock.unlock(); + } + + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + } + + public void testNeverReturnNullOnNonEmptyQueue() throws Exception { + final int iters = atLeast(10); + for (int iter = 0; iter < iters; ++iter) { + final int concurrency = TestUtil.nextInt(random(), 1, 16); + final LockableConcurrentApproximatePriorityQueue queue = + new LockableConcurrentApproximatePriorityQueue<>(concurrency); + final int numThreads = TestUtil.nextInt(random(), 2, 16); + final Thread[] threads = new Thread[numThreads]; + final CountDownLatch startingGun = new CountDownLatch(1); + for (int t = 0; t < threads.length; ++t) { + threads[t] = + new Thread( + () -> { + try { + startingGun.await(); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } + WeightedLock lock = new WeightedLock(); + lock.lock(); + lock.weight++; // Simulate a DWPT whose RAM usage increases + queue.addAndUnlock(lock, lock.weight); + for (int i = 0; i < 10_000; ++i) { + lock = queue.lockAndPoll(); + assertNotNull(lock); + queue.addAndUnlock(lock, lock.hashCode()); + } + }); + } + for (Thread thread : threads) { + thread.start(); + } + startingGun.countDown(); + for (Thread thread : threads) { + thread.join(); + } + } + } +}