From ab9704560525468d28294c7c6c0e4266e79efab4 Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Fri, 20 Sep 2024 14:25:17 +0000 Subject: [PATCH 01/17] SynchronizationContextTest changes for scheduleFixedDelay with Duration --- .../java/io/grpc/SynchronizationContext.java | 33 ++++ .../io/grpc/SynchronizationContextTest.java | 172 +++++++++++------- 2 files changed, 135 insertions(+), 70 deletions(-) diff --git a/api/src/main/java/io/grpc/SynchronizationContext.java b/api/src/main/java/io/grpc/SynchronizationContext.java index 5a7677ac15f..046988b4779 100644 --- a/api/src/main/java/io/grpc/SynchronizationContext.java +++ b/api/src/main/java/io/grpc/SynchronizationContext.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; import java.lang.Thread.UncaughtExceptionHandler; +import java.time.Duration; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -194,6 +195,38 @@ public String toString() { } + public final ScheduledHandle scheduleWithFixedDelay( + final Runnable task, Duration initialDelay, Duration delay, TimeUnit unit, + ScheduledExecutorService timerService) { + final ManagedRunnable runnable = new ManagedRunnable(task); + System.out.println("Inside Durationcall"); + ScheduledFuture future = timerService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + execute(runnable); + } + + @Override + public String toString() { + return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay + + ")"; + } + }, toNanosSaturated(initialDelay), toNanosSaturated(delay), unit); + return new ScheduledHandle(runnable, future); + } + static long toNanosSaturated(Duration duration) { + // Using a try/catch seems lazy, but the catch block will rarely get invoked (except for + // durations longer than approximately +/- 292 years). + try { + //long delay = TimeUnit.MILLISECONDS.convert(500, TimeUnit.SECONDS); // Converts 500 seconds to milliseconds + return duration.toNanos(); + //return TimeUnit.NANOSECONDS.convert(duration); + + } catch (ArithmeticException tooBig) { + return duration.isNegative() ? Long.MIN_VALUE : Long.MAX_VALUE; + } + } + private static class ManagedRunnable implements Runnable { final Runnable task; boolean isCancelled; diff --git a/api/src/test/java/io/grpc/SynchronizationContextTest.java b/api/src/test/java/io/grpc/SynchronizationContextTest.java index 3d5e7fa42b9..a4d9e80837f 100644 --- a/api/src/test/java/io/grpc/SynchronizationContextTest.java +++ b/api/src/test/java/io/grpc/SynchronizationContextTest.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.testing.TestingExecutors; import io.grpc.SynchronizationContext.ScheduledHandle; +import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -52,6 +53,7 @@ */ @RunWith(JUnit4.class) public class SynchronizationContextTest { + private final BlockingQueue uncaughtErrors = new LinkedBlockingQueue<>(); private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @@ -72,8 +74,9 @@ public void uncaughtException(Thread t, Throwable e) { @Mock private Runnable task3; - - @After public void tearDown() { + + @After + public void tearDown() { assertThat(uncaughtErrors).isEmpty(); } @@ -105,36 +108,36 @@ public void multiThread() throws Exception { final AtomicReference task2Thread = new AtomicReference<>(); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Thread.set(Thread.currentThread()); - task1Running.countDown(); - try { - assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return null; + @Override + public Void answer(InvocationOnMock invocation) { + task1Thread.set(Thread.currentThread()); + task1Running.countDown(); + try { + assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }).when(task1).run(); + return null; + } + }).when(task1).run(); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task2Thread.set(Thread.currentThread()); - return null; - } - }).when(task2).run(); + @Override + public Void answer(InvocationOnMock invocation) { + task2Thread.set(Thread.currentThread()); + return null; + } + }).when(task2).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.executeLater(task1); - task1Added.countDown(); - syncContext.drain(); - sideThreadDone.countDown(); - } - }; + @Override + public void run() { + syncContext.executeLater(task1); + task1Added.countDown(); + syncContext.drain(); + sideThreadDone.countDown(); + } + }; sideThread.start(); assertTrue(task1Added.await(5, TimeUnit.SECONDS)); @@ -162,26 +165,26 @@ public void throwIfNotInThisSynchronizationContext() throws Exception { final CountDownLatch task1Proceed = new CountDownLatch(1); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Running.countDown(); - syncContext.throwIfNotInThisSynchronizationContext(); - try { - assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - taskSuccess.set(true); - return null; + @Override + public Void answer(InvocationOnMock invocation) { + task1Running.countDown(); + syncContext.throwIfNotInThisSynchronizationContext(); + try { + assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }).when(task1).run(); + taskSuccess.set(true); + return null; + } + }).when(task1).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.execute(task1); - } - }; + @Override + public void run() { + syncContext.execute(task1); + } + }; sideThread.start(); assertThat(task1Running.await(5, TimeUnit.SECONDS)).isTrue(); @@ -215,11 +218,11 @@ public void taskThrows() { InOrder inOrder = inOrder(task1, task2, task3); final RuntimeException e = new RuntimeException("Simulated"); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - throw e; - } - }).when(task2).run(); + @Override + public Void answer(InvocationOnMock invocation) { + throw e; + } + }).when(task2).run(); syncContext.executeLater(task1); syncContext.executeLater(task2); syncContext.executeLater(task3); @@ -246,6 +249,24 @@ public void schedule() { verify(task1).run(); } + @Test + public void testScheduleWithFixedDelay() { + MockScheduledExecutorService executorService = new MockScheduledExecutorService(); + + ScheduledHandle handle = + syncContext.scheduleWithFixedDelay(task1, Duration.ofNanos(110), Duration.ofNanos(110), + TimeUnit.NANOSECONDS, executorService); + + assertThat(executorService.delay) + .isEqualTo(executorService.unit.convert(110, TimeUnit.NANOSECONDS)); + assertThat(handle.isPending()).isTrue(); + verify(task1, never()).run(); + + executorService.command.run(); + assertThat(handle.isPending()).isFalse(); + verify(task1).run(); + } + @Test public void scheduleDueImmediately() { MockScheduledExecutorService executorService = new MockScheduledExecutorService(); @@ -288,28 +309,28 @@ public void scheduledHandle_cancelRacesWithTimerExpiration() throws Exception { final CountDownLatch sideThreadDone = new CountDownLatch(1); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Running.countDown(); - try { - ScheduledHandle task2Handle; - assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull(); - task2Handle.cancel(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - task1Done.set(true); - return null; + @Override + public Void answer(InvocationOnMock invocation) { + task1Running.countDown(); + try { + ScheduledHandle task2Handle; + assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull(); + task2Handle.cancel(); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }).when(task1).run(); + task1Done.set(true); + return null; + } + }).when(task1).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.execute(task1); - sideThreadDone.countDown(); - } - }; + @Override + public void run() { + syncContext.execute(task1); + sideThreadDone.countDown(); + } + }; ScheduledHandle handle = syncContext.schedule(task2, 10, TimeUnit.NANOSECONDS, executorService); // This will execute and block in task1 @@ -340,6 +361,7 @@ public void run() { } static class MockScheduledExecutorService extends ForwardingScheduledExecutorService { + private ScheduledExecutorService delegate = TestingExecutors.noOpScheduledExecutor(); Runnable command; @@ -347,15 +369,25 @@ static class MockScheduledExecutorService extends ForwardingScheduledExecutorSer TimeUnit unit; ScheduledFuture future; - @Override public ScheduledExecutorService delegate() { + @Override + public ScheduledExecutorService delegate() { return delegate; } - @Override public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { this.command = command; this.delay = delay; this.unit = unit; return future = super.schedule(command, delay, unit); } + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long intialDelay, long delay, + TimeUnit unit) { + this.command = command; + this.delay = delay; + this.unit = unit; + return future = super.scheduleWithFixedDelay(command, intialDelay, delay, unit); + } } } From fef4c923f04864bbb8ef9420da910f687bb0da6d Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Fri, 20 Sep 2024 14:47:45 +0000 Subject: [PATCH 02/17] Revert "SynchronizationContextTest changes for scheduleFixedDelay with Duration" This reverts commit ab9704560525468d28294c7c6c0e4266e79efab4. --- .../java/io/grpc/SynchronizationContext.java | 33 ---- .../io/grpc/SynchronizationContextTest.java | 172 +++++++----------- 2 files changed, 70 insertions(+), 135 deletions(-) diff --git a/api/src/main/java/io/grpc/SynchronizationContext.java b/api/src/main/java/io/grpc/SynchronizationContext.java index 046988b4779..5a7677ac15f 100644 --- a/api/src/main/java/io/grpc/SynchronizationContext.java +++ b/api/src/main/java/io/grpc/SynchronizationContext.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkState; import java.lang.Thread.UncaughtExceptionHandler; -import java.time.Duration; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -195,38 +194,6 @@ public String toString() { } - public final ScheduledHandle scheduleWithFixedDelay( - final Runnable task, Duration initialDelay, Duration delay, TimeUnit unit, - ScheduledExecutorService timerService) { - final ManagedRunnable runnable = new ManagedRunnable(task); - System.out.println("Inside Durationcall"); - ScheduledFuture future = timerService.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - execute(runnable); - } - - @Override - public String toString() { - return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay - + ")"; - } - }, toNanosSaturated(initialDelay), toNanosSaturated(delay), unit); - return new ScheduledHandle(runnable, future); - } - static long toNanosSaturated(Duration duration) { - // Using a try/catch seems lazy, but the catch block will rarely get invoked (except for - // durations longer than approximately +/- 292 years). - try { - //long delay = TimeUnit.MILLISECONDS.convert(500, TimeUnit.SECONDS); // Converts 500 seconds to milliseconds - return duration.toNanos(); - //return TimeUnit.NANOSECONDS.convert(duration); - - } catch (ArithmeticException tooBig) { - return duration.isNegative() ? Long.MIN_VALUE : Long.MAX_VALUE; - } - } - private static class ManagedRunnable implements Runnable { final Runnable task; boolean isCancelled; diff --git a/api/src/test/java/io/grpc/SynchronizationContextTest.java b/api/src/test/java/io/grpc/SynchronizationContextTest.java index a4d9e80837f..3d5e7fa42b9 100644 --- a/api/src/test/java/io/grpc/SynchronizationContextTest.java +++ b/api/src/test/java/io/grpc/SynchronizationContextTest.java @@ -27,7 +27,6 @@ import com.google.common.util.concurrent.testing.TestingExecutors; import io.grpc.SynchronizationContext.ScheduledHandle; -import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -53,7 +52,6 @@ */ @RunWith(JUnit4.class) public class SynchronizationContextTest { - private final BlockingQueue uncaughtErrors = new LinkedBlockingQueue<>(); private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @@ -74,9 +72,8 @@ public void uncaughtException(Thread t, Throwable e) { @Mock private Runnable task3; - - @After - public void tearDown() { + + @After public void tearDown() { assertThat(uncaughtErrors).isEmpty(); } @@ -108,36 +105,36 @@ public void multiThread() throws Exception { final AtomicReference task2Thread = new AtomicReference<>(); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Thread.set(Thread.currentThread()); - task1Running.countDown(); - try { - assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - throw new RuntimeException(e); + @Override + public Void answer(InvocationOnMock invocation) { + task1Thread.set(Thread.currentThread()); + task1Running.countDown(); + try { + assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return null; } - return null; - } - }).when(task1).run(); + }).when(task1).run(); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task2Thread.set(Thread.currentThread()); - return null; - } - }).when(task2).run(); + @Override + public Void answer(InvocationOnMock invocation) { + task2Thread.set(Thread.currentThread()); + return null; + } + }).when(task2).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.executeLater(task1); - task1Added.countDown(); - syncContext.drain(); - sideThreadDone.countDown(); - } - }; + @Override + public void run() { + syncContext.executeLater(task1); + task1Added.countDown(); + syncContext.drain(); + sideThreadDone.countDown(); + } + }; sideThread.start(); assertTrue(task1Added.await(5, TimeUnit.SECONDS)); @@ -165,26 +162,26 @@ public void throwIfNotInThisSynchronizationContext() throws Exception { final CountDownLatch task1Proceed = new CountDownLatch(1); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Running.countDown(); - syncContext.throwIfNotInThisSynchronizationContext(); - try { - assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - throw new RuntimeException(e); + @Override + public Void answer(InvocationOnMock invocation) { + task1Running.countDown(); + syncContext.throwIfNotInThisSynchronizationContext(); + try { + assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + taskSuccess.set(true); + return null; } - taskSuccess.set(true); - return null; - } - }).when(task1).run(); + }).when(task1).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.execute(task1); - } - }; + @Override + public void run() { + syncContext.execute(task1); + } + }; sideThread.start(); assertThat(task1Running.await(5, TimeUnit.SECONDS)).isTrue(); @@ -218,11 +215,11 @@ public void taskThrows() { InOrder inOrder = inOrder(task1, task2, task3); final RuntimeException e = new RuntimeException("Simulated"); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - throw e; - } - }).when(task2).run(); + @Override + public Void answer(InvocationOnMock invocation) { + throw e; + } + }).when(task2).run(); syncContext.executeLater(task1); syncContext.executeLater(task2); syncContext.executeLater(task3); @@ -249,24 +246,6 @@ public void schedule() { verify(task1).run(); } - @Test - public void testScheduleWithFixedDelay() { - MockScheduledExecutorService executorService = new MockScheduledExecutorService(); - - ScheduledHandle handle = - syncContext.scheduleWithFixedDelay(task1, Duration.ofNanos(110), Duration.ofNanos(110), - TimeUnit.NANOSECONDS, executorService); - - assertThat(executorService.delay) - .isEqualTo(executorService.unit.convert(110, TimeUnit.NANOSECONDS)); - assertThat(handle.isPending()).isTrue(); - verify(task1, never()).run(); - - executorService.command.run(); - assertThat(handle.isPending()).isFalse(); - verify(task1).run(); - } - @Test public void scheduleDueImmediately() { MockScheduledExecutorService executorService = new MockScheduledExecutorService(); @@ -309,28 +288,28 @@ public void scheduledHandle_cancelRacesWithTimerExpiration() throws Exception { final CountDownLatch sideThreadDone = new CountDownLatch(1); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Running.countDown(); - try { - ScheduledHandle task2Handle; - assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull(); - task2Handle.cancel(); - } catch (InterruptedException e) { - throw new RuntimeException(e); + @Override + public Void answer(InvocationOnMock invocation) { + task1Running.countDown(); + try { + ScheduledHandle task2Handle; + assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull(); + task2Handle.cancel(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + task1Done.set(true); + return null; } - task1Done.set(true); - return null; - } - }).when(task1).run(); + }).when(task1).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.execute(task1); - sideThreadDone.countDown(); - } - }; + @Override + public void run() { + syncContext.execute(task1); + sideThreadDone.countDown(); + } + }; ScheduledHandle handle = syncContext.schedule(task2, 10, TimeUnit.NANOSECONDS, executorService); // This will execute and block in task1 @@ -361,7 +340,6 @@ public void run() { } static class MockScheduledExecutorService extends ForwardingScheduledExecutorService { - private ScheduledExecutorService delegate = TestingExecutors.noOpScheduledExecutor(); Runnable command; @@ -369,25 +347,15 @@ static class MockScheduledExecutorService extends ForwardingScheduledExecutorSer TimeUnit unit; ScheduledFuture future; - @Override - public ScheduledExecutorService delegate() { + @Override public ScheduledExecutorService delegate() { return delegate; } - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + @Override public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { this.command = command; this.delay = delay; this.unit = unit; return future = super.schedule(command, delay, unit); } - @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long intialDelay, long delay, - TimeUnit unit) { - this.command = command; - this.delay = delay; - this.unit = unit; - return future = super.scheduleWithFixedDelay(command, intialDelay, delay, unit); - } } } From d53e504a659696cba3b93846d813086ebcd6140f Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Fri, 8 Nov 2024 11:26:51 +0000 Subject: [PATCH 03/17] grpclb-test: Refactored grpclbBalancerStreamClosedAndRetried method --- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 99 ++++++++++--------- 1 file changed, 51 insertions(+), 48 deletions(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index e489129676a..bf506839ded 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -1780,30 +1780,15 @@ public void grpclbMultipleAuthorities() throws Exception { @Test public void grpclbBalancerStreamClosedAndRetried() throws Exception { - LoadBalanceRequest expectedInitialRequest = - LoadBalanceRequest.newBuilder() - .setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build(); + LoadBalanceRequest expectedInitialRequest = getIntialLoadBalanceRequest(); + InOrder inOrder = inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2, helper); - List grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); - assertEquals(1, fakeOobChannels.size()); - @SuppressWarnings("unused") - ManagedChannel oobChannel = fakeOobChannels.poll(); + StreamObserver lbRequestObserver; + StreamObserver lbResponseObserver; - // First balancer RPC - inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); - assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); - - // Balancer closes it immediately (erroneously) - lbResponseObserver.onCompleted(); + getFirstBalancerRPC(expectedInitialRequest, inOrder); // Will start backoff sequence 1 (10ns) inOrder.verify(backoffPolicyProvider).get(); @@ -1811,17 +1796,7 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); inOrder.verify(helper).refreshNameResolution(); - // Fast-forward to a moment before the retry - fakeClock.forwardNanos(9); - verifyNoMoreInteractions(mockLbService); - // Then time for retry - fakeClock.forwardNanos(1); - inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); - assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + lbResponseObserver = getFastForwardBeforeRetry(9, inOrder, expectedInitialRequest); // Balancer closes it with an error. lbResponseObserver.onError(Status.UNAVAILABLE.asException()); @@ -1832,16 +1807,7 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { inOrder.verify(helper).refreshNameResolution(); // Fast-forward to a moment before the retry - fakeClock.forwardNanos(100 - 1); - verifyNoMoreInteractions(mockLbService); - // Then time for retry - fakeClock.forwardNanos(1); - inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); - assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + lbResponseObserver = getFastForwardBeforeRetry(100 - 1, inOrder, expectedInitialRequest); // Balancer sends initial response. lbResponseObserver.onNext(buildInitialResponse()); @@ -1868,21 +1834,58 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { inOrder.verify(helper).refreshNameResolution(); // Fast-forward to a moment before the retry, the time spent in the last try is deducted. - fakeClock.forwardNanos(10 - 4 - 1); + getFastForwardBeforeRetry(10 -4 -1, inOrder, expectedInitialRequest); + + // Wrapping up + verify(backoffPolicyProvider, times(2)).get(); + verify(backoffPolicy1, times(2)).nextBackoffNanos(); + verify(backoffPolicy2, times(1)).nextBackoffNanos(); + verify(helper, times(4)).refreshNameResolution(); + } + + private StreamObserver getFastForwardBeforeRetry(int nanos, InOrder inOrder, + LoadBalanceRequest expectedInitialRequest) { + // Fast-forward to a moment before the retry + fakeClock.forwardNanos(nanos); verifyNoMoreInteractions(mockLbService); // Then time for retry fakeClock.forwardNanos(1); inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + return lbResponseObserver; + } - // Wrapping up - verify(backoffPolicyProvider, times(2)).get(); - verify(backoffPolicy1, times(2)).nextBackoffNanos(); - verify(backoffPolicy2, times(1)).nextBackoffNanos(); - verify(helper, times(4)).refreshNameResolution(); + private void getFirstBalancerRPC(LoadBalanceRequest expectedInitialRequest, InOrder inOrder) { + // First balancer RPC + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + // Balancer closes it immediately (erroneously) + lbResponseObserver.onCompleted(); + } + + private LoadBalanceRequest getIntialLoadBalanceRequest() { + LoadBalanceRequest expectedInitialRequest = + LoadBalanceRequest.newBuilder() + .setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build(); + + List grpclbBalancerList = createResolvedBalancerAddresses(1); + deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); + + assertEquals(1, fakeOobChannels.size()); + @SuppressWarnings("unused") + ManagedChannel oobChannel = fakeOobChannels.poll(); + return expectedInitialRequest; } @Test From 8882e0f2f48f5b6f0f31fb9ecf358b144214c726 Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Fri, 8 Nov 2024 11:47:51 +0000 Subject: [PATCH 04/17] grpclb: Checkstyle issues --- .../test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index bf506839ded..baf3cfe9e29 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -1788,7 +1788,7 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { StreamObserver lbRequestObserver; StreamObserver lbResponseObserver; - getFirstBalancerRPC(expectedInitialRequest, inOrder); + getFirstBalancerRpc(expectedInitialRequest, inOrder); // Will start backoff sequence 1 (10ns) inOrder.verify(backoffPolicyProvider).get(); @@ -1834,7 +1834,7 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { inOrder.verify(helper).refreshNameResolution(); // Fast-forward to a moment before the retry, the time spent in the last try is deducted. - getFastForwardBeforeRetry(10 -4 -1, inOrder, expectedInitialRequest); + getFastForwardBeforeRetry(10 - 4 - 1, inOrder, expectedInitialRequest); // Wrapping up verify(backoffPolicyProvider, times(2)).get(); @@ -1859,7 +1859,7 @@ private StreamObserver getFastForwardBeforeRetry(int nanos, return lbResponseObserver; } - private void getFirstBalancerRPC(LoadBalanceRequest expectedInitialRequest, InOrder inOrder) { + private void getFirstBalancerRpc(LoadBalanceRequest expectedInitialRequest, InOrder inOrder) { // First balancer RPC inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); From 77143530ba88cb66a54b724cd4ffe3994498abda Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Tue, 12 Nov 2024 10:14:26 +0000 Subject: [PATCH 05/17] grpclb: Changes for extracted methods as per review comments --- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index baf3cfe9e29..0cd52954a04 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -1788,7 +1788,10 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { StreamObserver lbRequestObserver; StreamObserver lbResponseObserver; - getFirstBalancerRpc(expectedInitialRequest, inOrder); + lbResponseObserver = getFirstBalancerRpc(expectedInitialRequest, inOrder); + + // Balancer closes it immediately (erroneously) + lbResponseObserver.onCompleted(); // Will start backoff sequence 1 (10ns) inOrder.verify(backoffPolicyProvider).get(); @@ -1796,7 +1799,7 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); inOrder.verify(helper).refreshNameResolution(); - lbResponseObserver = getFastForwardBeforeRetry(9, inOrder, expectedInitialRequest); + lbResponseObserver = fastForwardAndRetry(9, inOrder, expectedInitialRequest); // Balancer closes it with an error. lbResponseObserver.onError(Status.UNAVAILABLE.asException()); @@ -1807,7 +1810,7 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { inOrder.verify(helper).refreshNameResolution(); // Fast-forward to a moment before the retry - lbResponseObserver = getFastForwardBeforeRetry(100 - 1, inOrder, expectedInitialRequest); + lbResponseObserver = fastForwardAndRetry(100 - 1, inOrder, expectedInitialRequest); // Balancer sends initial response. lbResponseObserver.onNext(buildInitialResponse()); @@ -1834,7 +1837,7 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { inOrder.verify(helper).refreshNameResolution(); // Fast-forward to a moment before the retry, the time spent in the last try is deducted. - getFastForwardBeforeRetry(10 - 4 - 1, inOrder, expectedInitialRequest); + fastForwardAndRetry(10 - 4 - 1, inOrder, expectedInitialRequest); // Wrapping up verify(backoffPolicyProvider, times(2)).get(); @@ -1843,7 +1846,7 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { verify(helper, times(4)).refreshNameResolution(); } - private StreamObserver getFastForwardBeforeRetry(int nanos, InOrder inOrder, + private StreamObserver fastForwardAndRetry(int nanos, InOrder inOrder, LoadBalanceRequest expectedInitialRequest) { // Fast-forward to a moment before the retry fakeClock.forwardNanos(nanos); @@ -1852,24 +1855,23 @@ private StreamObserver getFastForwardBeforeRetry(int nanos, fakeClock.forwardNanos(1); inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + verifyRequestProcessed(expectedInitialRequest); + return lbResponseObserver; + } + + private void verifyRequestProcessed(LoadBalanceRequest expectedInitialRequest) { assertEquals(1, lbRequestObservers.size()); StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); - return lbResponseObserver; } - private void getFirstBalancerRpc(LoadBalanceRequest expectedInitialRequest, InOrder inOrder) { + private StreamObserver getFirstBalancerRpc(LoadBalanceRequest expectedInitialRequest, InOrder inOrder) { // First balancer RPC inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); - assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); - - // Balancer closes it immediately (erroneously) - lbResponseObserver.onCompleted(); + verifyRequestProcessed(expectedInitialRequest); + return lbResponseObserver; } private LoadBalanceRequest getIntialLoadBalanceRequest() { From 958f4002a1e43654267996513571ed9cc1fa3cca Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Tue, 12 Nov 2024 11:32:51 +0000 Subject: [PATCH 06/17] grpclb: Checkstyle issues --- .../src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 0cd52954a04..1b27151bc1b 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -1866,7 +1866,8 @@ private void verifyRequestProcessed(LoadBalanceRequest expectedInitialRequest) { assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); } - private StreamObserver getFirstBalancerRpc(LoadBalanceRequest expectedInitialRequest, InOrder inOrder) { + private StreamObserver getFirstBalancerRpc(LoadBalanceRequest + expectedInitialRequest, InOrder inOrder) { // First balancer RPC inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); From 012661e13f68058ed37c40001bd4dc4a063d1c51 Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Tue, 12 Nov 2024 16:37:54 +0000 Subject: [PATCH 07/17] grpclb: checkstyle fix --- grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 1b27151bc1b..e094f0a3fd5 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -1787,7 +1787,6 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { StreamObserver lbRequestObserver; StreamObserver lbResponseObserver; - lbResponseObserver = getFirstBalancerRpc(expectedInitialRequest, inOrder); // Balancer closes it immediately (erroneously) From 1ef7a9fc86aee9dd4ce0f558e38a7221b6c57def Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Mon, 18 Nov 2024 11:47:40 +0000 Subject: [PATCH 08/17] grpclb: Refactoring for grpclbWorking_pickFirstMode and grpclbWorking_pickFirstMode_lbSendsEmptyAddress --- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 633 +++++++++--------- 1 file changed, 298 insertions(+), 335 deletions(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index e094f0a3fd5..14a57c36f5d 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -76,6 +76,7 @@ import io.grpc.grpclb.GrpclbState.ErrorEntry; import io.grpc.grpclb.GrpclbState.IdleSubchannelEntry; import io.grpc.grpclb.GrpclbState.Mode; +import io.grpc.grpclb.GrpclbState.RoundRobinEntry; import io.grpc.grpclb.GrpclbState.RoundRobinPicker; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; @@ -161,16 +162,16 @@ public boolean shouldAccept(Runnable command) { delegatesTo(new CachedSubchannelPool(helper))); private final ArrayList logs = new ArrayList<>(); private final ChannelLogger channelLogger = new ChannelLogger() { - @Override - public void log(ChannelLogLevel level, String msg) { - logs.add(level + ": " + msg); - } + @Override + public void log(ChannelLogLevel level, String msg) { + logs.add(level + ": " + msg); + } - @Override - public void log(ChannelLogLevel level, String template, Object... args) { - log(level, MessageFormat.format(template, args)); - } - }; + @Override + public void log(ChannelLogLevel level, String template, Object... args) { + log(level, MessageFormat.format(template, args)); + } + }; private SubchannelPicker currentPicker; private LoadBalancerGrpc.LoadBalancerImplBase mockLbService; @Captor @@ -216,12 +217,12 @@ public StreamObserver balanceLoad( StreamObserver requestObserver = mock(StreamObserver.class); Answer closeRpc = new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - responseObserver.onCompleted(); - return null; - } - }; + @Override + public Void answer(InvocationOnMock invocation) { + responseObserver.onCompleted(); + return null; + } + }; doAnswer(closeRpc).when(requestObserver).onCompleted(); lbRequestObservers.add(requestObserver); return requestObserver; @@ -246,11 +247,11 @@ public void tearDown() { try { if (balancer != null) { syncContext.execute(new Runnable() { - @Override - public void run() { - balancer.shutdown(); - } - }); + @Override + public void run() { + balancer.shutdown(); + } + }); } for (ManagedChannel channel : oobChannelTracker) { assertTrue(channel + " is shutdown", channel.isShutdown()); @@ -429,7 +430,7 @@ public void loadReporting() { inOrder.verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -588,7 +589,7 @@ public void loadReporting() { inOrder.verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Load reporting is also requested @@ -682,7 +683,7 @@ public void raceBetweenHandleAddressesAndLbStreamClosure() { StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Close lbStream @@ -708,7 +709,7 @@ public void raceBetweenHandleAddressesAndLbStreamClosure() { lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate a race condition where the task has just started when it's cancelled @@ -729,7 +730,7 @@ public void raceBetweenLoadReportingAndLbStreamClosure() { inOrder.verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -913,7 +914,7 @@ public void grpclbWorking() { StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -925,7 +926,8 @@ public void grpclbWorking() { logs.clear(); lbResponseObserver.onNext(buildInitialResponse()); assertThat(logs).containsExactly( - "INFO: [grpclb-] Got an LB initial response: " + buildInitialResponse()); + "INFO: [grpclb-] Got an LB initial response: " + + buildInitialResponse()); logs.clear(); lbResponseObserver.onNext(buildLbResponse(backends1)); @@ -938,48 +940,21 @@ public void grpclbWorking() { assertEquals(2, mockSubchannels.size()); Subchannel subchannel1 = mockSubchannels.poll(); Subchannel subchannel2 = mockSubchannels.poll(); - verify(subchannel1).requestConnection(); - verify(subchannel2).requestConnection(); - assertEquals( - new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS), - subchannel1.getAddresses()); - assertEquals( - new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS), - subchannel2.getAddresses()); + verifySubchannelRequestConnection(backends1.get(0), backends1.get(1), subchannel1, subchannel2); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); - deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); - - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker0.dropList).containsExactly(null, null); - assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY); + connectingBalanceState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING), inOrder, + CONNECTING, BUFFER_ENTRY); inOrder.verifyNoMoreInteractions(); assertThat(logs).containsExactly( - "DEBUG: [grpclb-] Got an LB response: " + buildLbResponse(backends1)) + "DEBUG: [grpclb-] Got an LB response: " + + buildLbResponse(backends1)) .inOrder(); logs.clear(); // Let subchannels be connected - deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - - RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - - assertThat(picker1.dropList).containsExactly(null, null); - assertThat(picker1.pickList).containsExactly( - new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); - - deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - - RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker2.dropList).containsExactly(null, null); - assertThat(picker2.pickList).containsExactly( - new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), - new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) - .inOrder(); + connectSubchannels(inOrder, subchannel1, subchannel2); // Disconnected subchannels verify(subchannel1).requestConnection(); @@ -1018,15 +993,18 @@ public void grpclbWorking() { Arrays.asList( new ServerEntry("127.0.0.1", 2030, "token0003"), // New address new ServerEntry("token0003"), // drop - new ServerEntry("127.0.0.1", 2010, "token0004"), // Existing address with token changed - new ServerEntry("127.0.0.1", 2030, "token0005"), // New address appearing second time + // Existing address with token changed + new ServerEntry("127.0.0.1", 2010, "token0004"), + // New address appearing second time + new ServerEntry("127.0.0.1", 2030, "token0005"), new ServerEntry("token0006")); // drop verify(subchannelPool, never()) .returnSubchannel(same(subchannel1), any(ConnectivityStateInfo.class)); lbResponseObserver.onNext(buildLbResponse(backends2)); assertThat(logs).containsExactly( - "DEBUG: [grpclb-] Got an LB response: " + buildLbResponse(backends2)) + "DEBUG: [grpclb-] Got an LB response: " + + buildLbResponse(backends2)) .inOrder(); logs.clear(); @@ -1200,8 +1178,8 @@ private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { // Create balancer and backend addresses List backendList = createResolvedBackendAddresses(2); List grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses( - backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); + deliverResolvedAddresses(backendList, grpclbBalancerList, + GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); @@ -1215,7 +1193,7 @@ private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); // We don't care if these methods have been run. @@ -1267,14 +1245,7 @@ private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { } // RPC error status includes message of balancer RPC timeout - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); - assertThat(result.getStatus().getCode()) - .isEqualTo(Code.UNAVAILABLE); - assertThat(result.getStatus().getDescription()) - .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); - assertThat(result.getStatus().getDescription()) - .contains(GrpclbState.BALANCER_TIMEOUT_STATUS.getDescription()); + balancerRequestFallbackErrorStatus(inOrder, GrpclbState.BALANCER_TIMEOUT_STATUS); } //////////////////////////////////////////////////////////////// @@ -1284,8 +1255,8 @@ private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { subchannelPool.clear(); backendList = createResolvedBackendAddresses(2); grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses( - backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); + deliverResolvedAddresses(backendList, grpclbBalancerList, + GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); // New LB address is updated to the OobChannel inOrder.verify(helper).updateOobChannelAddresses( @@ -1308,13 +1279,14 @@ private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); // A new stream is created - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)) + .balanceLoad(lbResponseObserverCaptor.capture()); lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); } @@ -1335,8 +1307,8 @@ private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { /////////////////////////////////////////////////////////////// backendList = createResolvedBackendAddresses(1); grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses( - backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); + deliverResolvedAddresses(backendList, grpclbBalancerList, + GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); // Will not affect the round robin list at all inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); @@ -1367,7 +1339,7 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); // We don't care if these methods have been run. @@ -1392,12 +1364,13 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { inOrder, Arrays.asList(backendList.get(0), backendList.get(1))); // A new stream is created - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)) + .balanceLoad(lbResponseObserverCaptor.capture()); assertThat(lbRequestObservers).hasSize(1); lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); ////////////////////////////////////////////////////////////////////// @@ -1411,12 +1384,7 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { } // RPC error status includes error of balancer stream - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); - assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(result.getStatus().getDescription()) - .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); - assertThat(result.getStatus().getDescription()).contains(streamError.getDescription()); + balancerRequestFallbackErrorStatus(inOrder, streamError); } @Test @@ -1538,7 +1506,7 @@ private void subtestGrpclbFallbackConnectionLost( verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); // We don't care if these methods have been run. @@ -1628,7 +1596,7 @@ public void grpclbFallback_allLost_failToFallback() { verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); // We don't care if these methods have been run. @@ -1664,12 +1632,7 @@ public void grpclbFallback_allLost_failToFallback() { } // RPC error status includes errors of subchannels to balancer-provided backends - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); - assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(result.getStatus().getDescription()) - .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); - assertThat(result.getStatus().getDescription()).contains(error.getDescription()); + balancerRequestFallbackErrorStatus(inOrder, error); } private List fallbackTestVerifyUseOfFallbackBackendLists( @@ -1698,7 +1661,8 @@ private List fallbackTestVerifyUseOfBackendLists( inOrder.verify(subchannelPool).takeOrCreateSubchannel(eq(addr), any(Attributes.class)); } RoundRobinPicker picker = (RoundRobinPicker) currentPicker; - assertThat(picker.dropList).containsExactlyElementsIn(Collections.nCopies(addrs.size(), null)); + assertThat(picker.dropList) + .containsExactlyElementsIn(Collections.nCopies(addrs.size(), null)); assertThat(picker.pickList).containsExactly(GrpclbState.BUFFER_ENTRY); assertEquals(addrs.size(), mockSubchannels.size()); ArrayList subchannels = new ArrayList<>(mockSubchannels); @@ -1748,7 +1712,7 @@ public void grpclbMultipleAuthorities() throws Exception { new FakeSocketAddress("fake-address-3"), lbAttributes("fake-authority-1").toBuilder() .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, "value").build() - )); + )); deliverResolvedAddresses(backendList, grpclbBalancerList); List goldenOobEagList = @@ -1894,100 +1858,41 @@ private LoadBalanceRequest getIntialLoadBalanceRequest() { public void grpclbWorking_pickFirstMode() throws Exception { InOrder inOrder = inOrder(helper); - List grpclbBalancerList = createResolvedBalancerAddresses(1); - - deliverResolvedAddresses( - Collections.emptyList(), - grpclbBalancerList, - GrpclbConfig.create(Mode.PICK_FIRST)); - - assertEquals(1, fakeOobChannels.size()); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + StreamObserver lbResponseObserver = getLoadBalanceResponseStreamObserver(); // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - lbResponseObserver.onNext(buildInitialResponse()); - lbResponseObserver.onNext(buildLbResponse(backends1)); - - inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); - CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); - assertThat(createSubchannelArgs.getAddresses()) - .containsExactly( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); + simulateReceivingLbResponse(inOrder, lbResponseObserver); // Initially IDLE inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); // Only one subchannel is created - assertThat(mockSubchannels).hasSize(1); - Subchannel subchannel = mockSubchannels.poll(); - assertThat(picker0.dropList).containsExactly(null, null); - assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); + Subchannel subchannel = getOneSubchannel(picker0); // PICK_FIRST doesn't eagerly connect verify(subchannel, never()).requestConnection(); // CONNECTING - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker1.dropList).containsExactly(null, null); - assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY); + connectingBalanceState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING), inOrder, + CONNECTING, BUFFER_ENTRY); // TRANSIENT_FAILURE Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker2.dropList).containsExactly(null, null); - assertThat(picker2.pickList).containsExactly(new ErrorEntry(error)); + connectingBalanceState(subchannel, ConnectivityStateInfo.forTransientFailure(error), inOrder, + TRANSIENT_FAILURE, new ErrorEntry(error)); // READY - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker3.dropList).containsExactly(null, null); - assertThat(picker3.pickList).containsExactly( + connectingBalanceState(subchannel, ConnectivityStateInfo.forNonError(READY), inOrder, READY, new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); - // New server list with drops - List backends2 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("token0003"), // drop - new ServerEntry("127.0.0.1", 2020, "token0004")); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - lbResponseObserver.onNext(buildLbResponse(backends2)); + List backends2 = testRecoverNewLbResponseAddress( + inOrder, lbResponseObserver); // new addresses will be updated to the existing subchannel // createSubchannel() has ever been called only once - verify(helper, times(1)).createSubchannel(any(CreateSubchannelArgs.class)); - assertThat(mockSubchannels).isEmpty(); - verify(subchannel).updateAddresses( - eq(Arrays.asList( - new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends2.get(2).addr, - eagAttrsWithToken("token0004"))))); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker4.dropList).containsExactly( - null, new DropEntry(getLoadRecorder(), "token0003"), null); - assertThat(picker4.pickList).containsExactly( - new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); + createNewAddressSubchannel(inOrder, subchannel, backends2); // Subchannel goes IDLE, but PICK_FIRST will not try to reconnect deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); @@ -2012,76 +1917,52 @@ public void grpclbWorking_pickFirstMode() throws Exception { .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); } + private void createNewAddressSubchannel(InOrder inOrder, Subchannel subchannel, + List backends2) { + verify(helper, times(1)) + .createSubchannel(any(CreateSubchannelArgs.class)); + assertThat(mockSubchannels).isEmpty(); + verify(subchannel).updateAddresses( + eq(Arrays.asList( + new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends2.get(2).addr, + eagAttrsWithToken("token0004"))))); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker4.dropList).containsExactly( + null, new DropEntry(getLoadRecorder(), "token0003"), null); + assertThat(picker4.pickList).containsExactly( + new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); + } + @Test public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { InOrder inOrder = inOrder(helper); - List grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses( - Collections.emptyList(), - grpclbBalancerList, - GrpclbConfig.create(Mode.PICK_FIRST)); - - assertEquals(1, fakeOobChannels.size()); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - - // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - lbResponseObserver.onNext(buildInitialResponse()); - lbResponseObserver.onNext(buildLbResponse(backends1)); + StreamObserver lbResponseObserver = getLoadBalanceResponseStreamObserver(); - inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); - CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); - assertThat(createSubchannelArgs.getAddresses()) - .containsExactly( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); + simulateReceivingLbResponse(inOrder, lbResponseObserver); // Initially IDLE inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); - // Only one subchannel is created - assertThat(mockSubchannels).hasSize(1); - Subchannel subchannel = mockSubchannels.poll(); - assertThat(picker0.dropList).containsExactly(null, null); - assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); + Subchannel subchannel = getOneSubchannel(picker0); // PICK_FIRST doesn't eagerly connect verify(subchannel, never()).requestConnection(); // CONNECTING - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker1.dropList).containsExactly(null, null); - assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY); + connectingBalanceState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING), inOrder, + CONNECTING, BUFFER_ENTRY); // TRANSIENT_FAILURE Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker2.dropList).containsExactly(null, null); - assertThat(picker2.pickList).containsExactly(new ErrorEntry(error)); + connectingBalanceState(subchannel, ConnectivityStateInfo.forTransientFailure(error), inOrder, + TRANSIENT_FAILURE, new ErrorEntry(error)); // READY - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker3.dropList).containsExactly(null, null); - assertThat(picker3.pickList).containsExactly( + connectingBalanceState(subchannel, ConnectivityStateInfo.forNonError(READY), inOrder, READY, new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); inOrder.verify(helper, never()) @@ -2096,14 +1977,23 @@ public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { assertThat(mockSubchannels).isEmpty(); verify(subchannel).shutdown(); - // RPC error status includes message of no backends provided by balancer - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(errorPicker.pickList) - .containsExactly(new ErrorEntry(GrpclbState.NO_AVAILABLE_BACKENDS_STATUS)); + noAvailableBalanceStatus(inOrder); lbResponseObserver.onNext(buildLbResponse(Collections.emptyList())); + testRecoverNewLbResponseAddress(inOrder, lbResponseObserver); + + // new addresses will be updated to the existing subchannel + inOrder.verify(helper, times(1)) + .createSubchannel(any(CreateSubchannelArgs.class)); + inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + subchannel = mockSubchannels.poll(); + + readySubchannel(inOrder, subchannel); + } + + private List testRecoverNewLbResponseAddress(InOrder inOrder, + StreamObserver lbResponseObserver) { // Test recover from new LB response with addresses // New server list with drops List backends2 = Arrays.asList( @@ -2113,12 +2003,10 @@ public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildLbResponse(backends2)); + return backends2; + } - // new addresses will be updated to the existing subchannel - inOrder.verify(helper, times(1)).createSubchannel(any(CreateSubchannelArgs.class)); - inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); - subchannel = mockSubchannels.poll(); - + private void readySubchannel(InOrder inOrder, Subchannel subchannel) { // Subchannel became READY deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); @@ -2128,6 +2016,70 @@ public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); } + private void noAvailableBalanceStatus(InOrder inOrder) { + // RPC error status includes message of no backends provided by balancer + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(errorPicker.pickList) + .containsExactly(new ErrorEntry(GrpclbState.NO_AVAILABLE_BACKENDS_STATUS)); + } + + private void connectingBalanceState(Subchannel subchannel, ConnectivityStateInfo CONNECTING, + InOrder inOrder, ConnectivityState connecting, RoundRobinEntry bufferEntry) { + deliverSubchannelState(subchannel, CONNECTING); + inOrder.verify(helper).updateBalancingState(eq(connecting), pickerCaptor.capture()); + RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker1.dropList).containsExactly(null, null); + assertThat(picker1.pickList).containsExactly(bufferEntry); + } + + private Subchannel getOneSubchannel(RoundRobinPicker picker0) { + // Only one subchannel is created + assertThat(mockSubchannels).hasSize(1); + Subchannel subchannel = mockSubchannels.poll(); + assertThat(picker0.dropList).containsExactly(null, null); + assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); + return subchannel; + } + + private void simulateReceivingLbResponse(InOrder inOrder, + StreamObserver lbResponseObserver) { + // Simulate receiving LB response + List backends1 = Arrays.asList( + new ServerEntry("127.0.0.1", 2000, "token0001"), + new ServerEntry("127.0.0.1", 2010, "token0002")); + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + lbResponseObserver.onNext(buildInitialResponse()); + lbResponseObserver.onNext(buildLbResponse(backends1)); + + inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); + CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); + assertThat(createSubchannelArgs.getAddresses()) + .containsExactly( + new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); + } + + private StreamObserver getLoadBalanceResponseStreamObserver() { + List grpclbBalancerList = createResolvedBalancerAddresses(1); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, + GrpclbConfig.create(Mode.PICK_FIRST)); + + assertEquals(1, fakeOobChannels.size()); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + return lbResponseObserver; + } + @Test public void shutdownWithoutSubchannel_roundRobin() throws Exception { subtestShutdownWithoutSubchannel(GrpclbConfig.create(Mode.ROUND_ROBIN)); @@ -2172,8 +2124,8 @@ private void pickFirstModeFallback(long timeout) throws Exception { // Name resolver returns balancer and backend addresses List backendList = createResolvedBackendAddresses(2); List grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses( - backendList, grpclbBalancerList, GrpclbConfig.create(Mode.PICK_FIRST, null, timeout)); + deliverResolvedAddresses(backendList, grpclbBalancerList, GrpclbConfig.create(Mode.PICK_FIRST, + null, timeout)); // Attempted to connect to balancer assertEquals(1, fakeOobChannels.size()); @@ -2198,11 +2150,7 @@ private void pickFirstModeFallback(long timeout) throws Exception { RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); // READY - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker1.dropList).containsExactly(null, null); - assertThat(picker1.pickList).containsExactly( + connectingBalanceState(subchannel, ConnectivityStateInfo.forNonError(READY), inOrder, READY, new BackendEntry(subchannel, new TokenAttachingTracerFactory(null))); assertThat(picker0.dropList).containsExactly(null, null); @@ -2258,7 +2206,7 @@ public void switchMode() throws Exception { StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -2299,13 +2247,14 @@ public void switchMode() throws Exception { // A new LB stream is created assertEquals(1, fakeOobChannels.size()); - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)) + .balanceLoad(lbResponseObserverCaptor.capture()); lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -2346,7 +2295,7 @@ public void switchMode_nullLbPolicy() throws Exception { StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -2387,13 +2336,14 @@ public void switchMode_nullLbPolicy() throws Exception { // A new LB stream is created assertEquals(1, fakeOobChannels.size()); - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)) + .balanceLoad(lbResponseObserverCaptor.capture()); lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -2433,7 +2383,7 @@ public void switchServiceName() throws Exception { StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(serviceName).build()) + InitialLoadBalanceRequest.newBuilder().setName(serviceName).build()) .build())); // Simulate receiving LB response @@ -2475,12 +2425,13 @@ public void switchServiceName() throws Exception { .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE))); assertEquals(1, fakeOobChannels.size()); - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)) + .balanceLoad(lbResponseObserverCaptor.capture()); assertEquals(1, lbRequestObservers.size()); lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(serviceName).build()) + InitialLoadBalanceRequest.newBuilder().setName(serviceName).build()) .build())); } @@ -2510,70 +2461,29 @@ public void grpclbWorking_lbSendsFallbackMessage() { // Simulate receiving LB response ServerEntry backend1a = new ServerEntry("127.0.0.1", 2000, "token0001"); ServerEntry backend1b = new ServerEntry("127.0.0.1", 2010, "token0002"); - List backends1 = Arrays.asList(backend1a, backend1b); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - logs.clear(); - lbResponseObserver.onNext(buildInitialResponse()); - assertThat(logs).containsExactly( - "INFO: [grpclb-] Got an LB initial response: " + buildInitialResponse()); - logs.clear(); - lbResponseObserver.onNext(buildLbResponse(backends1)); - - inOrder.verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - inOrder.verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); + List backends1 = simulateReceivingLbResponse( + inOrder, lbResponseObserver, backend1a, backend1b); assertEquals(2, mockSubchannels.size()); Subchannel subchannel1 = mockSubchannels.poll(); Subchannel subchannel2 = mockSubchannels.poll(); - verify(subchannel1).requestConnection(); - verify(subchannel2).requestConnection(); - assertEquals( - new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS), - subchannel1.getAddresses()); - assertEquals( - new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS), - subchannel2.getAddresses()); + verifySubchannelRequestConnection(backend1a, backend1b, subchannel1, subchannel2); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); - deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); - - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker0.dropList).containsExactly(null, null); - assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY); + connectingBalanceState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING), inOrder, + CONNECTING, BUFFER_ENTRY); inOrder.verifyNoMoreInteractions(); assertThat(logs) .containsExactly( - "DEBUG: [grpclb-] Got an LB response: " + buildLbResponse(backends1)) + "DEBUG: [grpclb-] Got an LB response: " + + buildLbResponse(backends1)) .inOrder(); logs.clear(); // Let subchannels be connected - deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - - RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - - assertThat(picker1.dropList).containsExactly(null, null); - assertThat(picker1.pickList).containsExactly( - new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); - - deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - - RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker2.dropList).containsExactly(null, null); - assertThat(picker2.pickList).containsExactly( - new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), - new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) - .inOrder(); + connectSubchannels(inOrder, subchannel1, subchannel2); // Balancer forces entering fallback mode lbResponseObserver.onNext(buildLbFallbackResponse()); @@ -2600,66 +2510,90 @@ public void grpclbWorking_lbSendsFallbackMessage() { verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); } - // RPC error status includes message of fallback requested by balancer - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); - assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(result.getStatus().getDescription()) - .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); - assertThat(result.getStatus().getDescription()) - .contains(GrpclbState.BALANCER_REQUESTED_FALLBACK_STATUS.getDescription()); + balancerRequestFallbackErrorStatus(inOrder, GrpclbState.BALANCER_REQUESTED_FALLBACK_STATUS); // exit fall back by providing two new backends ServerEntry backend2a = new ServerEntry("127.0.0.1", 8000, "token1001"); ServerEntry backend2b = new ServerEntry("127.0.0.1", 8010, "token1002"); List backends2 = Arrays.asList(backend2a, backend2b); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - logs.clear(); - lbResponseObserver.onNext(buildLbResponse(backends2)); - - inOrder.verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - inOrder.verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); + exitFallbackWithNewBackends(inOrder, lbResponseObserver, backend2a, backend2b, backends2); assertEquals(2, mockSubchannels.size()); Subchannel subchannel3 = mockSubchannels.poll(); Subchannel subchannel4 = mockSubchannels.poll(); - verify(subchannel3).requestConnection(); - verify(subchannel4).requestConnection(); - assertEquals( - new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS), - subchannel3.getAddresses()); - assertEquals( - new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS), - subchannel4.getAddresses()); + verifySubchannelRequestConnection(backend2a, backend2b, subchannel3, subchannel4); deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(CONNECTING)); - deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(CONNECTING)); - - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - RoundRobinPicker picker6 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker6.dropList).containsExactly(null, null); - assertThat(picker6.pickList).containsExactly(BUFFER_ENTRY); + connectingBalanceState(subchannel4, ConnectivityStateInfo.forNonError(CONNECTING), inOrder, + CONNECTING, BUFFER_ENTRY); inOrder.verifyNoMoreInteractions(); assertThat(logs) .containsExactly( - "DEBUG: [grpclb-] Got an LB response: " + buildLbResponse(backends2)) + "DEBUG: [grpclb-] Got an LB response: " + + buildLbResponse(backends2)) .inOrder(); logs.clear(); // Let new subchannels be connected - deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + connectNewSubchannels(inOrder, subchannel3, subchannel4); + } - RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker3.dropList).containsExactly(null, null); - assertThat(picker3.pickList).containsExactly( + private static void verifySubchannelRequestConnection(ServerEntry backend1a, + ServerEntry backend1b, Subchannel subchannel1, Subchannel subchannel2) { + verify(subchannel1).requestConnection(); + verify(subchannel2).requestConnection(); + assertEquals( + new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS), + subchannel1.getAddresses()); + assertEquals( + new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS), + subchannel2.getAddresses()); + } + + private List simulateReceivingLbResponse(InOrder inOrder, + StreamObserver lbResponseObserver, ServerEntry backend1a, + ServerEntry backend1b) { + List backends1 = Arrays.asList(backend1a, backend1b); + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + logs.clear(); + lbResponseObserver.onNext(buildInitialResponse()); + assertThat(logs).containsExactly( + "INFO: [grpclb-] Got an LB initial response: " + + buildInitialResponse()); + logs.clear(); + lbResponseObserver.onNext(buildLbResponse(backends1)); + + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + return backends1; + } + + private void exitFallbackWithNewBackends(InOrder inOrder, + StreamObserver lbResponseObserver, + ServerEntry backend2a, ServerEntry backend2b, List backends2) { + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + logs.clear(); + lbResponseObserver.onNext(buildLbResponse(backends2)); + + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + } + + private void connectNewSubchannels(InOrder inOrder, Subchannel subchannel3, + Subchannel subchannel4) { + connectingBalanceState(subchannel3, ConnectivityStateInfo.forNonError(READY), inOrder, READY, new BackendEntry(subchannel3, getLoadRecorder(), "token1001")); deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(READY)); @@ -2668,8 +2602,37 @@ public void grpclbWorking_lbSendsFallbackMessage() { RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker4.dropList).containsExactly(null, null); assertThat(picker4.pickList).containsExactly( - new BackendEntry(subchannel3, getLoadRecorder(), "token1001"), - new BackendEntry(subchannel4, getLoadRecorder(), "token1002")) + new BackendEntry(subchannel3, getLoadRecorder(), "token1001"), + new BackendEntry(subchannel4, getLoadRecorder(), "token1002")) + .inOrder(); + } + + private void balancerRequestFallbackErrorStatus(InOrder inOrder, + Status balancerRequestedFallbackStatus) { + // RPC error status includes message of fallback requested by balancer + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); + assertThat(result.getStatus().getDescription()) + .contains(balancerRequestedFallbackStatus.getDescription()); + } + + private void connectSubchannels(InOrder inOrder, Subchannel subchannel1, + Subchannel subchannel2) { + // Let subchannels be connected + connectingBalanceState(subchannel2, ConnectivityStateInfo.forNonError(READY), inOrder, READY, + new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); + + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + + RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker2.dropList).containsExactly(null, null); + assertThat(picker2.pickList).containsExactly( + new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), + new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) .inOrder(); } From bb5caa84a5da3c4a482bd62268a6b06625e5ec31 Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Mon, 18 Nov 2024 12:15:10 +0000 Subject: [PATCH 09/17] grpclb: Checkstyle issues resolved --- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 14a57c36f5d..2efd422edad 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -2024,9 +2024,9 @@ private void noAvailableBalanceStatus(InOrder inOrder) { .containsExactly(new ErrorEntry(GrpclbState.NO_AVAILABLE_BACKENDS_STATUS)); } - private void connectingBalanceState(Subchannel subchannel, ConnectivityStateInfo CONNECTING, + private void connectingBalanceState(Subchannel subchannel, ConnectivityStateInfo connectingInfo, InOrder inOrder, ConnectivityState connecting, RoundRobinEntry bufferEntry) { - deliverSubchannelState(subchannel, CONNECTING); + deliverSubchannelState(subchannel, connectingInfo); inOrder.verify(helper).updateBalancingState(eq(connecting), pickerCaptor.capture()); RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker1.dropList).containsExactly(null, null); @@ -2061,6 +2061,29 @@ private void simulateReceivingLbResponse(InOrder inOrder, new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); } + private List simulateReceivingLbResponse(InOrder inOrder, + StreamObserver lbResponseObserver, ServerEntry backend1a, + ServerEntry backend1b) { + List backends1 = Arrays.asList(backend1a, backend1b); + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + logs.clear(); + lbResponseObserver.onNext(buildInitialResponse()); + assertThat(logs).containsExactly( + "INFO: [grpclb-] Got an LB initial response: " + + buildInitialResponse()); + logs.clear(); + lbResponseObserver.onNext(buildLbResponse(backends1)); + + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + return backends1; + } + private StreamObserver getLoadBalanceResponseStreamObserver() { List grpclbBalancerList = createResolvedBalancerAddresses(1); deliverResolvedAddresses( @@ -2552,29 +2575,6 @@ private static void verifySubchannelRequestConnection(ServerEntry backend1a, subchannel2.getAddresses()); } - private List simulateReceivingLbResponse(InOrder inOrder, - StreamObserver lbResponseObserver, ServerEntry backend1a, - ServerEntry backend1b) { - List backends1 = Arrays.asList(backend1a, backend1b); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - logs.clear(); - lbResponseObserver.onNext(buildInitialResponse()); - assertThat(logs).containsExactly( - "INFO: [grpclb-] Got an LB initial response: " - + buildInitialResponse()); - logs.clear(); - lbResponseObserver.onNext(buildLbResponse(backends1)); - - inOrder.verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - inOrder.verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - return backends1; - } - private void exitFallbackWithNewBackends(InOrder inOrder, StreamObserver lbResponseObserver, ServerEntry backend2a, ServerEntry backend2b, List backends2) { From a8f71286dac912ac9a2d5b47c93fd300320385a9 Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Tue, 19 Nov 2024 10:50:05 +0000 Subject: [PATCH 10/17] grpclb: Refactored grpclbWorking_lbSendsFallbackMessage testcase --- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 62 +++++++++---------- 1 file changed, 30 insertions(+), 32 deletions(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 2efd422edad..ae8614aebdc 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -945,13 +945,7 @@ public void grpclbWorking() { deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); connectingBalanceState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING), inOrder, CONNECTING, BUFFER_ENTRY); - inOrder.verifyNoMoreInteractions(); - - assertThat(logs).containsExactly( - "DEBUG: [grpclb-] Got an LB response: " - + buildLbResponse(backends1)) - .inOrder(); - logs.clear(); + inOrderVerifySubchannel(inOrder, backends1); // Let subchannels be connected connectSubchannels(inOrder, subchannel1, subchannel2); @@ -2458,6 +2452,9 @@ public void switchServiceName() throws Exception { .build())); } + ManagedChannel oobChannel; + StreamObserver lbResponseObserver; + StreamObserver lbRequestObserver; @Test public void grpclbWorking_lbSendsFallbackMessage() { InOrder inOrder = inOrder(helper, subchannelPool); @@ -2466,20 +2463,7 @@ public void grpclbWorking_lbSendsFallbackMessage() { deliverResolvedAddresses(backendList, grpclbBalancerList); // Fallback timer is started as soon as the addresses are resolved. - assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), - eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); - assertEquals(1, fakeOobChannels.size()); - ManagedChannel oobChannel = fakeOobChannels.poll(); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder() - .setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + initializeSubchannel(grpclbBalancerList); // Simulate receiving LB response ServerEntry backend1a = new ServerEntry("127.0.0.1", 2000, "token0001"); @@ -2496,14 +2480,7 @@ public void grpclbWorking_lbSendsFallbackMessage() { deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); connectingBalanceState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING), inOrder, CONNECTING, BUFFER_ENTRY); - inOrder.verifyNoMoreInteractions(); - - assertThat(logs) - .containsExactly( - "DEBUG: [grpclb-] Got an LB response: " - + buildLbResponse(backends1)) - .inOrder(); - logs.clear(); + inOrderVerifySubchannel(inOrder, backends1); // Let subchannels be connected connectSubchannels(inOrder, subchannel1, subchannel2); @@ -2550,17 +2527,38 @@ public void grpclbWorking_lbSendsFallbackMessage() { deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(CONNECTING)); connectingBalanceState(subchannel4, ConnectivityStateInfo.forNonError(CONNECTING), inOrder, CONNECTING, BUFFER_ENTRY); + inOrderVerifySubchannel(inOrder, backends2); + + // Let new subchannels be connected + connectNewSubchannels(inOrder, subchannel3, subchannel4); + } + + private void inOrderVerifySubchannel(InOrder inOrder, List backends1) { inOrder.verifyNoMoreInteractions(); assertThat(logs) .containsExactly( "DEBUG: [grpclb-] Got an LB response: " - + buildLbResponse(backends2)) + + buildLbResponse(backends1)) .inOrder(); logs.clear(); + } - // Let new subchannels be connected - connectNewSubchannels(inOrder, subchannel3, subchannel4); + private void initializeSubchannel(List grpclbBalancerList) { + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), + eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); + assertEquals(1, fakeOobChannels.size()); + oobChannel = fakeOobChannels.poll(); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder() + .setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); } private static void verifySubchannelRequestConnection(ServerEntry backend1a, From 79a229cc96d5435b6e69acc18ff9056cfea644a3 Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Tue, 19 Nov 2024 11:06:58 +0000 Subject: [PATCH 11/17] grpclb: Resolved checkstyle issue --- grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index ae8614aebdc..2ca66a39ceb 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -2455,6 +2455,7 @@ public void switchServiceName() throws Exception { ManagedChannel oobChannel; StreamObserver lbResponseObserver; StreamObserver lbRequestObserver; + @Test public void grpclbWorking_lbSendsFallbackMessage() { InOrder inOrder = inOrder(helper, subchannelPool); From af3d144f40b0a00dd6a2df8eb8bb7c58ad007f0b Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Tue, 19 Nov 2024 13:40:13 +0000 Subject: [PATCH 12/17] grpclb: Refactored switchMode testcase --- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 114 +++++++++--------- 1 file changed, 54 insertions(+), 60 deletions(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 2ca66a39ceb..b843d8b9a15 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -2205,6 +2205,10 @@ private void pickFirstModeFallback(long timeout) throws Exception { .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); } + + Subchannel subchannel1; + Subchannel subchannel2; + @Test public void switchMode() throws Exception { InOrder inOrder = inOrder(helper); @@ -2215,16 +2219,7 @@ public void switchMode() throws Exception { grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN)); - assertEquals(1, fakeOobChannels.size()); - ManagedChannel oobChannel = fakeOobChannels.poll(); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + initializeChannel(); // Simulate receiving LB response List backends1 = Arrays.asList( @@ -2236,25 +2231,13 @@ public void switchMode() throws Exception { lbResponseObserver.onNext(buildLbResponse(backends1)); // ROUND_ROBIN: create one subchannel per server - verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); - assertEquals(2, mockSubchannels.size()); - Subchannel subchannel1 = mockSubchannels.poll(); - Subchannel subchannel2 = mockSubchannels.poll(); - verify(subchannelPool, never()) - .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); + roundRobinCreateOneSubchannel(inOrder, backends1); // Switch to PICK_FIRST deliverResolvedAddresses( Collections.emptyList(), grpclbBalancerList, GrpclbConfig.create(Mode.PICK_FIRST)); - // GrpclbState will be shutdown, and a new one will be created assertThat(oobChannel.isShutdown()).isTrue(); verify(subchannelPool) @@ -2263,6 +2246,20 @@ public void switchMode() throws Exception { .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE))); // A new LB stream is created + createNewLbStream(lbResponseObserver, lbRequestObserver); + + // Simulate receiving LB response + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + lbResponseObserver.onNext(buildInitialResponse()); + lbResponseObserver.onNext(buildLbResponse(backends1)); + + // PICK_FIRST Subchannel + pickFirstSubchannel(inOrder, backends1); + } + + private void createNewLbStream(StreamObserver lbResponseObserver, + StreamObserver lbRequestObserver) { assertEquals(1, fakeOobChannels.size()); verify(mockLbService, times(2)) .balanceLoad(lbResponseObserverCaptor.capture()); @@ -2273,14 +2270,9 @@ public void switchMode() throws Exception { eq(LoadBalanceRequest.newBuilder().setInitialRequest( InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); + } - // Simulate receiving LB response - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - lbResponseObserver.onNext(buildInitialResponse()); - lbResponseObserver.onNext(buildLbResponse(backends1)); - - // PICK_FIRST Subchannel + private void pickFirstSubchannel(InOrder inOrder, List backends1) { inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); assertThat(createSubchannelArgs.getAddresses()) @@ -2291,6 +2283,34 @@ public void switchMode() throws Exception { inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); } + private void roundRobinCreateOneSubchannel(InOrder inOrder, List backends1) { + verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + assertEquals(2, mockSubchannels.size()); + subchannel1 = mockSubchannels.poll(); + subchannel2 = mockSubchannels.poll(); + verify(subchannelPool, never()) + .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); + } + + private void initializeChannel() { + assertEquals(1, fakeOobChannels.size()); + oobChannel = fakeOobChannels.poll(); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + } + private static Attributes eagAttrsWithToken(String token) { return LB_BACKEND_ATTRS.toBuilder().set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, token).build(); } @@ -2352,16 +2372,7 @@ public void switchMode_nullLbPolicy() throws Exception { .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE))); // A new LB stream is created - assertEquals(1, fakeOobChannels.size()); - verify(mockLbService, times(2)) - .balanceLoad(lbResponseObserverCaptor.capture()); - lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + createNewLbStream(lbResponseObserver, lbRequestObserver); // Simulate receiving LB response inOrder.verify(helper, never()) @@ -2370,14 +2381,7 @@ public void switchMode_nullLbPolicy() throws Exception { lbResponseObserver.onNext(buildLbResponse(backends1)); // PICK_FIRST Subchannel - inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); - CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); - assertThat(createSubchannelArgs.getAddresses()) - .containsExactly( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); - - inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + pickFirstSubchannel(inOrder, backends1); } @Test @@ -2455,7 +2459,7 @@ public void switchServiceName() throws Exception { ManagedChannel oobChannel; StreamObserver lbResponseObserver; StreamObserver lbRequestObserver; - + @Test public void grpclbWorking_lbSendsFallbackMessage() { InOrder inOrder = inOrder(helper, subchannelPool); @@ -2549,17 +2553,7 @@ private void initializeSubchannel(List grpclbBalancerLis assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); - assertEquals(1, fakeOobChannels.size()); - oobChannel = fakeOobChannels.poll(); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder() - .setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + initializeChannel(); } private static void verifySubchannelRequestConnection(ServerEntry backend1a, From b1f8fa8b4dab32e86c6409da0973c2719db80d88 Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Tue, 19 Nov 2024 15:52:03 +0000 Subject: [PATCH 13/17] grpclb: Resolved compilation issue --- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index b843d8b9a15..78f0a10fd95 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -2246,20 +2246,6 @@ public void switchMode() throws Exception { .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE))); // A new LB stream is created - createNewLbStream(lbResponseObserver, lbRequestObserver); - - // Simulate receiving LB response - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - lbResponseObserver.onNext(buildInitialResponse()); - lbResponseObserver.onNext(buildLbResponse(backends1)); - - // PICK_FIRST Subchannel - pickFirstSubchannel(inOrder, backends1); - } - - private void createNewLbStream(StreamObserver lbResponseObserver, - StreamObserver lbRequestObserver) { assertEquals(1, fakeOobChannels.size()); verify(mockLbService, times(2)) .balanceLoad(lbResponseObserverCaptor.capture()); @@ -2270,6 +2256,15 @@ private void createNewLbStream(StreamObserver lbResponseObs eq(LoadBalanceRequest.newBuilder().setInitialRequest( InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); + + // Simulate receiving LB response + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + lbResponseObserver.onNext(buildInitialResponse()); + lbResponseObserver.onNext(buildLbResponse(backends1)); + + // PICK_FIRST Subchannel + pickFirstSubchannel(inOrder, backends1); } private void pickFirstSubchannel(InOrder inOrder, List backends1) { @@ -2372,7 +2367,16 @@ public void switchMode_nullLbPolicy() throws Exception { .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE))); // A new LB stream is created - createNewLbStream(lbResponseObserver, lbRequestObserver); + assertEquals(1, fakeOobChannels.size()); + verify(mockLbService, times(2)) + .balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); // Simulate receiving LB response inOrder.verify(helper, never()) From 5c7f722890cc6ef50e9d9d60437e56ca6ed8f157 Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Wed, 20 Nov 2024 14:42:53 +0000 Subject: [PATCH 14/17] grpclb: Refactored grpclbWorking, grpclbBalancerStreamClosedAndRetried, grpclbFallback_allLost_failToFallback, switchMode and its related testcases --- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 224 +++++++----------- 1 file changed, 80 insertions(+), 144 deletions(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 78f0a10fd95..e86a3daa014 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -895,6 +895,10 @@ public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() { assertEquals(1, lbRequestObservers.size()); // No additional RPC } + List backends1 = Arrays.asList( + new ServerEntry("127.0.0.1", 2000, "token0001"), + new ServerEntry("127.0.0.1", 2010, "token0002")); + @Test public void grpclbWorking() { InOrder inOrder = inOrder(helper, subchannelPool); @@ -902,25 +906,9 @@ public void grpclbWorking() { deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); // Fallback timer is started as soon as the addresses are resolved. - assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - - verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), - eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); - assertEquals(1, fakeOobChannels.size()); - ManagedChannel oobChannel = fakeOobChannels.poll(); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + initializeSubchannel(grpclbBalancerList); // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); logs.clear(); @@ -983,24 +971,7 @@ public void grpclbWorking() { assertThat(picker4.pickList).containsExactly(BUFFER_ENTRY); // Update backends, with a drop entry - List backends2 = - Arrays.asList( - new ServerEntry("127.0.0.1", 2030, "token0003"), // New address - new ServerEntry("token0003"), // drop - // Existing address with token changed - new ServerEntry("127.0.0.1", 2010, "token0004"), - // New address appearing second time - new ServerEntry("127.0.0.1", 2030, "token0005"), - new ServerEntry("token0006")); // drop - verify(subchannelPool, never()) - .returnSubchannel(same(subchannel1), any(ConnectivityStateInfo.class)); - - lbResponseObserver.onNext(buildLbResponse(backends2)); - assertThat(logs).containsExactly( - "DEBUG: [grpclb-] Got an LB response: " - + buildLbResponse(backends2)) - .inOrder(); - logs.clear(); + List backends2 = getServerEntries(subchannel1); // not in backends2, closed verify(subchannelPool).returnSubchannel(same(subchannel1), same(errorState1)); @@ -1073,6 +1044,42 @@ public void grpclbWorking() { .returnSubchannel(same(subchannel3), any(ConnectivityStateInfo.class)); // Update backends, with no entry + updateBackendWithNoEntry(inOrder, oobChannel, lbResponseObserver, lbRequestObserver, subchannel2, subchannel3); + + // Load reporting was not requested, thus never scheduled + assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); + + verify(subchannelPool, never()).clear(); + balancer.shutdown(); + verify(subchannelPool).clear(); + } + + private List getServerEntries(Subchannel subchannel1) { + List backends2 = + Arrays.asList( + new ServerEntry("127.0.0.1", 2030, "token0003"), // New address + new ServerEntry("token0003"), // drop + // Existing address with token changed + new ServerEntry("127.0.0.1", 2010, "token0004"), + // New address appearing second time + new ServerEntry("127.0.0.1", 2030, "token0005"), + new ServerEntry("token0006")); // drop + verify(subchannelPool, never()) + .returnSubchannel(same(subchannel1), any(ConnectivityStateInfo.class)); + + lbResponseObserver.onNext(buildLbResponse(backends2)); + assertThat(logs).containsExactly( + "DEBUG: [grpclb-] Got an LB response: " + + buildLbResponse(backends2)) + .inOrder(); + logs.clear(); + return backends2; + } + + private void updateBackendWithNoEntry(InOrder inOrder, ManagedChannel oobChannel, + StreamObserver lbResponseObserver, + StreamObserver lbRequestObserver, Subchannel subchannel2, + Subchannel subchannel3) { lbResponseObserver.onNext(buildLbResponse(Collections.emptyList())); verify(subchannelPool) .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(READY))); @@ -1088,13 +1095,6 @@ public void grpclbWorking() { assertEquals(0, lbRequestObservers.size()); verify(lbRequestObserver, never()).onCompleted(); verify(lbRequestObserver, never()).onError(any(Throwable.class)); - - // Load reporting was not requested, thus never scheduled - assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); - - verify(subchannelPool, never()).clear(); - balancer.shutdown(); - verify(subchannelPool).clear(); } @Test @@ -1106,9 +1106,6 @@ public void roundRobinMode_subchannelStayTransientFailureUntilReady() { StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(backends1)); assertEquals(2, mockSubchannels.size()); @@ -1183,16 +1180,8 @@ private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); - // We don't care if these methods have been run. - inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); - inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + StreamObserver lbRequestObserver = verifyLbRequestObserver( + loadReportIntervalMillis, inOrder, lbResponseObserver); inOrder.verifyNoMoreInteractions(); @@ -1329,16 +1318,8 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); assertThat(lbRequestObservers).hasSize(1); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); - // We don't care if these methods have been run. - inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); - inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + verifyLbRequestObserver(loadReportIntervalMillis, inOrder, lbResponseObserver); + StreamObserver lbRequestObserver; inOrder.verifyNoMoreInteractions(); @@ -1491,21 +1472,9 @@ private void subtestGrpclbFallbackConnectionLost( eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); // Attempted to connect to balancer - assertEquals(1, fakeOobChannels.size()); - fakeOobChannels.poll(); - inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); - // We don't care if these methods have been run. - inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); - inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + StreamObserver lbResponseObserver = getLoadBalanceResponseStreamObserver( + loadReportIntervalMillis, inOrder); + StreamObserver lbRequestObserver; inOrder.verifyNoMoreInteractions(); @@ -1581,21 +1550,8 @@ public void grpclbFallback_allLost_failToFallback() { eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); // Attempted to connect to balancer - assertEquals(1, fakeOobChannels.size()); - fakeOobChannels.poll(); - inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); - // We don't care if these methods have been run. - inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); - inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + StreamObserver lbResponseObserver = getLoadBalanceResponseStreamObserver( + loadReportIntervalMillis, inOrder); inOrder.verifyNoMoreInteractions(); @@ -1629,6 +1585,32 @@ public void grpclbFallback_allLost_failToFallback() { balancerRequestFallbackErrorStatus(inOrder, error); } + private StreamObserver getLoadBalanceResponseStreamObserver( + long loadReportIntervalMillis, InOrder inOrder) { + assertEquals(1, fakeOobChannels.size()); + fakeOobChannels.poll(); + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + verifyLbRequestObserver(loadReportIntervalMillis, inOrder, lbResponseObserver); + return lbResponseObserver; + } + + private StreamObserver verifyLbRequestObserver(long loadReportIntervalMillis, InOrder inOrder, + StreamObserver lbResponseObserver) { + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); + // We don't care if these methods have been run. + inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); + inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + return lbRequestObserver; + } + private List fallbackTestVerifyUseOfFallbackBackendLists( InOrder inOrder, List addrs) { return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, null); @@ -2039,9 +2021,6 @@ private Subchannel getOneSubchannel(RoundRobinPicker picker0) { private void simulateReceivingLbResponse(InOrder inOrder, StreamObserver lbResponseObserver) { // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); @@ -2175,9 +2154,6 @@ private void pickFirstModeFallback(long timeout) throws Exception { // Finally, an LB response, which brings us out of fallback - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); @@ -2222,9 +2198,6 @@ public void switchMode() throws Exception { initializeChannel(); // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); @@ -2319,39 +2292,16 @@ public void switchMode_nullLbPolicy() throws Exception { Collections.emptyList(), grpclbBalancerList); - assertEquals(1, fakeOobChannels.size()); - ManagedChannel oobChannel = fakeOobChannels.poll(); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + initializeChannel(); // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(backends1)); // ROUND_ROBIN: create one subchannel per server - verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); - assertEquals(2, mockSubchannels.size()); - Subchannel subchannel1 = mockSubchannels.poll(); - Subchannel subchannel2 = mockSubchannels.poll(); - verify(subchannelPool, never()) - .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); + roundRobinCreateOneSubchannel(inOrder, backends1); // Switch to PICK_FIRST deliverResolvedAddresses( @@ -2412,27 +2362,13 @@ public void switchServiceName() throws Exception { .build())); // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(backends1)); // ROUND_ROBIN: create one subchannel per server - verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); - assertEquals(2, mockSubchannels.size()); - Subchannel subchannel1 = mockSubchannels.poll(); - Subchannel subchannel2 = mockSubchannels.poll(); - verify(subchannelPool, never()) - .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); + roundRobinCreateOneSubchannel(inOrder, backends1); // Switch to different serviceName serviceName = "bar.google.com"; From cec8e4579dd104165182d801262a173c5b49e5ac Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Wed, 20 Nov 2024 15:06:16 +0000 Subject: [PATCH 15/17] grpclb: Removed local variable and some checkstyle issues --- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 47 +++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index e86a3daa014..90f6c8d9257 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -1044,7 +1044,8 @@ public void grpclbWorking() { .returnSubchannel(same(subchannel3), any(ConnectivityStateInfo.class)); // Update backends, with no entry - updateBackendWithNoEntry(inOrder, oobChannel, lbResponseObserver, lbRequestObserver, subchannel2, subchannel3); + updateBackendWithNoEntry(inOrder, oobChannel, lbResponseObserver, lbRequestObserver, + subchannel2, subchannel3); // Load reporting was not requested, thus never scheduled assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); @@ -1474,7 +1475,6 @@ private void subtestGrpclbFallbackConnectionLost( // Attempted to connect to balancer StreamObserver lbResponseObserver = getLoadBalanceResponseStreamObserver( loadReportIntervalMillis, inOrder); - StreamObserver lbRequestObserver; inOrder.verifyNoMoreInteractions(); @@ -1494,7 +1494,6 @@ private void subtestGrpclbFallbackConnectionLost( inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); inOrder.verify(helper).refreshNameResolution(); } if (allSubchannelsBroken) { @@ -1585,6 +1584,25 @@ public void grpclbFallback_allLost_failToFallback() { balancerRequestFallbackErrorStatus(inOrder, error); } + private StreamObserver getLoadBalanceResponseStreamObserver() { + List grpclbBalancerList = createResolvedBalancerAddresses(1); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, + GrpclbConfig.create(Mode.PICK_FIRST)); + + assertEquals(1, fakeOobChannels.size()); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + return lbResponseObserver; + } + private StreamObserver getLoadBalanceResponseStreamObserver( long loadReportIntervalMillis, InOrder inOrder) { assertEquals(1, fakeOobChannels.size()); @@ -1596,8 +1614,8 @@ private StreamObserver getLoadBalanceResponseStreamObserver return lbResponseObserver; } - private StreamObserver verifyLbRequestObserver(long loadReportIntervalMillis, InOrder inOrder, - StreamObserver lbResponseObserver) { + private StreamObserver verifyLbRequestObserver(long loadReportIntervalMillis, + InOrder inOrder, StreamObserver lbResponseObserver) { StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( @@ -2057,25 +2075,6 @@ private List simulateReceivingLbResponse(InOrder inOrder, return backends1; } - private StreamObserver getLoadBalanceResponseStreamObserver() { - List grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses( - Collections.emptyList(), - grpclbBalancerList, - GrpclbConfig.create(Mode.PICK_FIRST)); - - assertEquals(1, fakeOobChannels.size()); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - return lbResponseObserver; - } - @Test public void shutdownWithoutSubchannel_roundRobin() throws Exception { subtestShutdownWithoutSubchannel(GrpclbConfig.create(Mode.ROUND_ROBIN)); From e3160d873232cc488bdf0a073bbda1e6184934e0 Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Fri, 22 Nov 2024 10:44:25 +0000 Subject: [PATCH 16/17] grpc-api/stub: Refactored loadReporting test method --- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 329 ++++++++++-------- 1 file changed, 191 insertions(+), 138 deletions(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 78f0a10fd95..d475f5d383e 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -407,6 +407,14 @@ public void roundRobinPickerWithIdleEntry_andDrop() { verify(subchannel).requestConnection(); } + StreamObserver lbResponseObserver; + StreamObserver lbRequestObserver; + InOrder inOrder; + InOrder helperInOrder; + Subchannel subchannel1; + Subchannel subchannel2; + RoundRobinPicker picker; + @Test public void loadReporting() { Metadata headers = new Metadata(); @@ -418,20 +426,7 @@ public void loadReporting() { deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); // Fallback timer is started as soon as address is resolved. - assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - - assertEquals(1, fakeOobChannels.size()); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - InOrder inOrder = inOrder(lbRequestObserver); - InOrder helperInOrder = inOrder(helper, subchannelPool); - - inOrder.verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + startFallbackTimer(); // Simulate receiving LB response assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); @@ -447,107 +442,29 @@ public void loadReporting() { new ServerEntry("127.0.0.1", 2010, "token0002"), new ServerEntry("token0003")); // drop - lbResponseObserver.onNext(buildLbResponse(backends)); - - assertEquals(2, mockSubchannels.size()); - Subchannel subchannel1 = mockSubchannels.poll(); - Subchannel subchannel2 = mockSubchannels.poll(); - deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); - deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); - deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); - deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); - - helperInOrder.verify(helper, atLeast(1)) - .updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker.dropList).containsExactly( - null, - new DropEntry(getLoadRecorder(), "token0001"), - null, - new DropEntry(getLoadRecorder(), "token0003")).inOrder(); - assertThat(picker.pickList).containsExactly( - new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), - new BackendEntry(subchannel2, getLoadRecorder(), "token0002")).inOrder(); + scheduleLoadReporting(backends); // Report, no data - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder().build()); - - PickResult pick1 = picker.pickSubchannel(args); - assertSame(subchannel1, pick1.getSubchannel()); - assertSame(getLoadRecorder(), pick1.getStreamTracerFactory()); + PickResult pick1 = getPickResult(loadReportIntervalMillis, + ClientStats.newBuilder(), args, subchannel1); // Merely the pick will not be recorded as upstart. - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder().build()); - - ClientStreamTracer tracer1 = - pick1.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); - tracer1.streamCreated(Attributes.EMPTY, new Metadata()); - - PickResult pick2 = picker.pickSubchannel(args); - assertNull(pick2.getSubchannel()); - assertSame(DROP_PICK_RESULT, pick2); + ClientStreamTracer tracer1 = getClientStreamTracer(args, loadReportIntervalMillis, pick1); // Report includes upstart of pick1 and the drop of pick2 - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder() - .setNumCallsStarted(2) - .setNumCallsFinished(1) // pick2 - .addCallsFinishedWithDrop( - ClientStatsPerToken.newBuilder() - .setLoadBalanceToken("token0001") - .setNumCalls(1) // pick2 - .build()) - .build()); - - PickResult pick3 = picker.pickSubchannel(args); - assertSame(subchannel2, pick3.getSubchannel()); - assertSame(getLoadRecorder(), pick3.getStreamTracerFactory()); - ClientStreamTracer tracer3 = - pick3.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); - tracer3.streamCreated(Attributes.EMPTY, new Metadata()); + ClientStreamTracer tracer3 = getClientStreamTracer(args, loadReportIntervalMillis); // pick3 has sent out headers tracer3.outboundHeaders(); // 3rd report includes pick3's upstart - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder() - .setNumCallsStarted(1) - .build()); - - PickResult pick4 = picker.pickSubchannel(args); - assertNull(pick4.getSubchannel()); - assertSame(DROP_PICK_RESULT, pick4); + pickReportUpstart(args, loadReportIntervalMillis); // pick1 ended without sending anything tracer1.streamClosed(Status.CANCELLED); // 4th report includes end of pick1 and drop of pick4 - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder() - .setNumCallsStarted(1) // pick4 - .setNumCallsFinished(2) - .setNumCallsFinishedWithClientFailedToSend(1) // pick1 - .addCallsFinishedWithDrop( - ClientStatsPerToken.newBuilder() - .setLoadBalanceToken("token0003") - .setNumCalls(1) // pick4 - .build()) - .build()); - - PickResult pick5 = picker.pickSubchannel(args); - assertSame(subchannel1, pick1.getSubchannel()); - assertSame(getLoadRecorder(), pick5.getStreamTracerFactory()); - ClientStreamTracer tracer5 = - pick5.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); - tracer5.streamCreated(Attributes.EMPTY, new Metadata()); + ClientStreamTracer tracer5 = getStreamTracer(args, loadReportIntervalMillis, pick1); // pick3 ended without receiving response headers tracer3.streamClosed(Status.DEADLINE_EXCEEDED); @@ -557,40 +474,16 @@ public void loadReporting() { tracer5.inboundHeaders(); // 5th report includes pick3's end and pick5's upstart - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder() - .setNumCallsStarted(1) // pick5 - .setNumCallsFinished(1) // pick3 - .build()); + upstartPick5(loadReportIntervalMillis); // pick5 ends tracer5.streamClosed(Status.OK); // 6th report includes pick5's end - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder() - .setNumCallsFinished(1) - .setNumCallsFinishedKnownReceived(1) - .build()); - - assertEquals(1, fakeClock.numPendingTasks()); - // Balancer closes the stream, scheduled reporting task cancelled - lbResponseObserver.onError(Status.UNAVAILABLE.asException()); - assertEquals(0, fakeClock.numPendingTasks()); + includePick5(loadReportIntervalMillis); // New stream created - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); - lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); - inOrder = inOrder(lbRequestObserver); - - inOrder.verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + createNewStream(); // Load reporting is also requested lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); @@ -601,12 +494,7 @@ public void loadReporting() { // Make a new pick on that picker. It will not show up on the report of the new stream, because // that picker is associated with the previous stream. - PickResult pick6 = picker.pickSubchannel(args); - assertNull(pick6.getSubchannel()); - assertSame(DROP_PICK_RESULT, pick6); - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder().build()); + newPicker(args, loadReportIntervalMillis); // New stream got the list update lbResponseObserver.onNext(buildLbResponse(backends)); @@ -616,6 +504,28 @@ public void loadReporting() { any(EquivalentAddressGroup.class), any(Attributes.class)); // But the new RoundRobinEntries have a new loadRecorder, thus considered different from // the previous list, thus a new picker is created + newRoundRobinEntries(args, loadReportIntervalMillis); + } + + private void upstartPick5(long loadReportIntervalMillis) { + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + ClientStats.newBuilder() + .setNumCallsStarted(1) // pick5 + .setNumCallsFinished(1) // pick3 + .build()); + } + + private void newPicker(PickSubchannelArgs args, long loadReportIntervalMillis) { + PickResult pick6 = picker.pickSubchannel(args); + assertNull(pick6.getSubchannel()); + assertSame(DROP_PICK_RESULT, pick6); + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + ClientStats.newBuilder().build()); + } + + private void newRoundRobinEntries(PickSubchannelArgs args, long loadReportIntervalMillis) { helperInOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); picker = (RoundRobinPicker) pickerCaptor.getValue(); @@ -635,6 +545,155 @@ public void loadReporting() { verifyNoMoreInteractions(args); } + private void createNewStream() { + verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + inOrder = inOrder(lbRequestObserver); + + inOrder.verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + } + + private void includePick5(long loadReportIntervalMillis) { + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + ClientStats.newBuilder() + .setNumCallsFinished(1) + .setNumCallsFinishedKnownReceived(1) + .build()); + + assertEquals(1, fakeClock.numPendingTasks()); + // Balancer closes the stream, scheduled reporting task cancelled + lbResponseObserver.onError(Status.UNAVAILABLE.asException()); + assertEquals(0, fakeClock.numPendingTasks()); + } + + private ClientStreamTracer getStreamTracer(PickSubchannelArgs args, long loadReportIntervalMillis, + PickResult pick1) { + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + ClientStats.newBuilder() + .setNumCallsStarted(1) // pick4 + .setNumCallsFinished(2) + .setNumCallsFinishedWithClientFailedToSend(1) // pick1 + .addCallsFinishedWithDrop( + ClientStatsPerToken.newBuilder() + .setLoadBalanceToken("token0003") + .setNumCalls(1) // pick4 + .build()) + .build()); + + PickResult pick5 = picker.pickSubchannel(args); + assertSame(subchannel1, pick1.getSubchannel()); + assertSame(getLoadRecorder(), pick5.getStreamTracerFactory()); + ClientStreamTracer tracer5 = + pick5.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); + tracer5.streamCreated(Attributes.EMPTY, new Metadata()); + return tracer5; + } + + private void pickReportUpstart(PickSubchannelArgs args, long loadReportIntervalMillis) { + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + ClientStats.newBuilder() + .setNumCallsStarted(1) + .build()); + + PickResult pick4 = picker.pickSubchannel(args); + assertNull(pick4.getSubchannel()); + assertSame(DROP_PICK_RESULT, pick4); + } + + private ClientStreamTracer getClientStreamTracer(PickSubchannelArgs args, + long loadReportIntervalMillis, PickResult pick1) { + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + ClientStats.newBuilder().build()); + + ClientStreamTracer tracer1 = + pick1.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); + tracer1.streamCreated(Attributes.EMPTY, new Metadata()); + + PickResult pick2 = picker.pickSubchannel(args); + assertNull(pick2.getSubchannel()); + assertSame(DROP_PICK_RESULT, pick2); + return tracer1; + } + + private ClientStreamTracer getClientStreamTracer(PickSubchannelArgs args, + long loadReportIntervalMillis) { + PickResult pick3 = getPickResult(loadReportIntervalMillis, + ClientStats.newBuilder() + .setNumCallsStarted(2) + .setNumCallsFinished(1) // pick2 + .addCallsFinishedWithDrop( + ClientStatsPerToken.newBuilder() + .setLoadBalanceToken("token0001") + .setNumCalls(1) // pick2 + .build()), args, subchannel2); + ClientStreamTracer tracer3 = + pick3.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); + tracer3.streamCreated(Attributes.EMPTY, new Metadata()); + return tracer3; + } + + private PickResult getPickResult(long loadReportIntervalMillis, ClientStats.Builder newBuilder, + PickSubchannelArgs args, Subchannel subchannel1) { + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + newBuilder.build()); + + PickResult pick1 = picker.pickSubchannel(args); + assertSame(subchannel1, pick1.getSubchannel()); + assertSame(getLoadRecorder(), pick1.getStreamTracerFactory()); + return pick1; + } + + private void scheduleLoadReporting(List backends) { + lbResponseObserver.onNext(buildLbResponse(backends)); + + assertEquals(2, mockSubchannels.size()); + subchannel1 = mockSubchannels.poll(); + subchannel2 = mockSubchannels.poll(); + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); + + helperInOrder.verify(helper, atLeast(1)) + .updateBalancingState(eq(READY), pickerCaptor.capture()); + picker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker.dropList).containsExactly( + null, + new DropEntry(getLoadRecorder(), "token0001"), + null, + new DropEntry(getLoadRecorder(), "token0003")).inOrder(); + assertThat(picker.pickList).containsExactly( + new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), + new BackendEntry(subchannel2, getLoadRecorder(), "token0002")).inOrder(); + } + + private void startFallbackTimer() { + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + + assertEquals(1, fakeOobChannels.size()); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + inOrder = inOrder(lbRequestObserver); + helperInOrder = inOrder(helper, subchannelPool); + + inOrder.verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + } + @Test public void abundantInitialResponse() { List grpclbBalancerList = createResolvedBalancerAddresses(1); @@ -2205,10 +2264,6 @@ private void pickFirstModeFallback(long timeout) throws Exception { .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); } - - Subchannel subchannel1; - Subchannel subchannel2; - @Test public void switchMode() throws Exception { InOrder inOrder = inOrder(helper); @@ -2461,8 +2516,6 @@ public void switchServiceName() throws Exception { } ManagedChannel oobChannel; - StreamObserver lbResponseObserver; - StreamObserver lbRequestObserver; @Test public void grpclbWorking_lbSendsFallbackMessage() { From e845fa5b8765220e4b154515026e80fbfbfebb26 Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Fri, 22 Nov 2024 11:06:03 +0000 Subject: [PATCH 17/17] grpclb: Resolved checkstyle issue --- .../src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index cd2b5dc735e..a5f0f6bb9dd 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -546,7 +546,8 @@ private void newRoundRobinEntries(PickSubchannelArgs args, long loadReportInterv } private void createNewStream() { - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)) + .balanceLoad(lbResponseObserverCaptor.capture()); lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); lbRequestObserver = lbRequestObservers.poll();