Skip to content

Commit

Permalink
Make sure ConcurrentApproximatePriorityQueue#poll never returns `nu…
Browse files Browse the repository at this point in the history
…ll` on a non-empty queue.

Before this change, `ConcurrentApproximatePriorityQueue#poll` could sometimes
return `null` even though the queue was empty at no point in time. The
practical implication is that we can end up with more DWPTs in memory than
indexing threads, which, while not a bug, is probably not efficient.

I ran luceneutil's `IndexGeonames` with this change and could not notice a
slowdown while this benchmark has been good at exhibiting contention issues in
the past. This is likely due to the fact that most of the time, a DWPT can be
obtained from the pool using the optimistic path, which doesn't block.

Closes apache#12649 apache#12916
  • Loading branch information
jpountz committed Dec 21, 2023
1 parent 91002d0 commit 2a413b8
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,28 @@ T poll(Predicate<T> predicate) {
}
}
}
for (int i = 0; i < concurrency; ++i) {
final int index = (threadHash + i) % concurrency;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];

// We want to make sure we return a non-null entry if this queue is not empty. This requires us
// to take all the locks at once, otherwise if there is a single non-empty sub queue, as we
// iterate through all sub queues, there is a chance that an entry gets added to a queue we just
// checked and that the existing entry gets removed from a queue we haven't checked yet. This
// would make this method return `null` even though the queue was empty at no point in time.
for (Lock lock : locks) {
lock.lock();
try {
}
try {
for (ApproximatePriorityQueue<T> queue : queues) {
T entry = queue.poll(predicate);
if (entry != null) {
return entry;
}
} finally {
}
} finally {
for (Lock lock : locks) {
lock.unlock();
}
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,41 @@ public void run() {
assertEquals(Integer.valueOf(3), pq.poll(x -> true));
assertNull(pq.poll(x -> true));
}

public void testNeverReturnNullOnNonEmptyQueue() throws Exception {
final int iters = atLeast(10);
for (int iter = 0; iter < iters; ++iter) {
final int concurrency = TestUtil.nextInt(random(), 1, 16);
final ConcurrentApproximatePriorityQueue<Integer> queue =
new ConcurrentApproximatePriorityQueue<>(concurrency);
final int numThreads = TestUtil.nextInt(random(), 2, 16);
final Thread[] threads = new Thread[numThreads];
final CountDownLatch startingGun = new CountDownLatch(1);
for (int t = 0; t < threads.length; ++t) {
threads[t] =
new Thread(
() -> {
try {
startingGun.await();
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
Integer v = TestUtil.nextInt(random(), 0, 100);
queue.add(v, v);
for (int i = 0; i < 1_000; ++i) {
v = queue.poll(x -> true);
assertNotNull(v);
queue.add(v, v);
}
});
}
for (Thread thread : threads) {
thread.start();
}
startingGun.countDown();
for (Thread thread : threads) {
thread.join();
}
}
}
}

0 comments on commit 2a413b8

Please sign in to comment.