diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index e489129676a..a5f0f6bb9dd 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -76,6 +76,7 @@ import io.grpc.grpclb.GrpclbState.ErrorEntry; import io.grpc.grpclb.GrpclbState.IdleSubchannelEntry; import io.grpc.grpclb.GrpclbState.Mode; +import io.grpc.grpclb.GrpclbState.RoundRobinEntry; import io.grpc.grpclb.GrpclbState.RoundRobinPicker; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; @@ -161,16 +162,16 @@ public boolean shouldAccept(Runnable command) { delegatesTo(new CachedSubchannelPool(helper))); private final ArrayList logs = new ArrayList<>(); private final ChannelLogger channelLogger = new ChannelLogger() { - @Override - public void log(ChannelLogLevel level, String msg) { - logs.add(level + ": " + msg); - } + @Override + public void log(ChannelLogLevel level, String msg) { + logs.add(level + ": " + msg); + } - @Override - public void log(ChannelLogLevel level, String template, Object... args) { - log(level, MessageFormat.format(template, args)); - } - }; + @Override + public void log(ChannelLogLevel level, String template, Object... args) { + log(level, MessageFormat.format(template, args)); + } + }; private SubchannelPicker currentPicker; private LoadBalancerGrpc.LoadBalancerImplBase mockLbService; @Captor @@ -216,12 +217,12 @@ public StreamObserver balanceLoad( StreamObserver requestObserver = mock(StreamObserver.class); Answer closeRpc = new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - responseObserver.onCompleted(); - return null; - } - }; + @Override + public Void answer(InvocationOnMock invocation) { + responseObserver.onCompleted(); + return null; + } + }; doAnswer(closeRpc).when(requestObserver).onCompleted(); lbRequestObservers.add(requestObserver); return requestObserver; @@ -246,11 +247,11 @@ public void tearDown() { try { if (balancer != null) { syncContext.execute(new Runnable() { - @Override - public void run() { - balancer.shutdown(); - } - }); + @Override + public void run() { + balancer.shutdown(); + } + }); } for (ManagedChannel channel : oobChannelTracker) { assertTrue(channel + " is shutdown", channel.isShutdown()); @@ -406,6 +407,14 @@ public void roundRobinPickerWithIdleEntry_andDrop() { verify(subchannel).requestConnection(); } + StreamObserver lbResponseObserver; + StreamObserver lbRequestObserver; + InOrder inOrder; + InOrder helperInOrder; + Subchannel subchannel1; + Subchannel subchannel2; + RoundRobinPicker picker; + @Test public void loadReporting() { Metadata headers = new Metadata(); @@ -417,20 +426,7 @@ public void loadReporting() { deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); // Fallback timer is started as soon as address is resolved. - assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - - assertEquals(1, fakeOobChannels.size()); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - InOrder inOrder = inOrder(lbRequestObserver); - InOrder helperInOrder = inOrder(helper, subchannelPool); - - inOrder.verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + startFallbackTimer(); // Simulate receiving LB response assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); @@ -446,107 +442,29 @@ public void loadReporting() { new ServerEntry("127.0.0.1", 2010, "token0002"), new ServerEntry("token0003")); // drop - lbResponseObserver.onNext(buildLbResponse(backends)); - - assertEquals(2, mockSubchannels.size()); - Subchannel subchannel1 = mockSubchannels.poll(); - Subchannel subchannel2 = mockSubchannels.poll(); - deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); - deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); - deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); - deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); - - helperInOrder.verify(helper, atLeast(1)) - .updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker.dropList).containsExactly( - null, - new DropEntry(getLoadRecorder(), "token0001"), - null, - new DropEntry(getLoadRecorder(), "token0003")).inOrder(); - assertThat(picker.pickList).containsExactly( - new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), - new BackendEntry(subchannel2, getLoadRecorder(), "token0002")).inOrder(); + scheduleLoadReporting(backends); // Report, no data - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder().build()); - - PickResult pick1 = picker.pickSubchannel(args); - assertSame(subchannel1, pick1.getSubchannel()); - assertSame(getLoadRecorder(), pick1.getStreamTracerFactory()); + PickResult pick1 = getPickResult(loadReportIntervalMillis, + ClientStats.newBuilder(), args, subchannel1); // Merely the pick will not be recorded as upstart. - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder().build()); - - ClientStreamTracer tracer1 = - pick1.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); - tracer1.streamCreated(Attributes.EMPTY, new Metadata()); - - PickResult pick2 = picker.pickSubchannel(args); - assertNull(pick2.getSubchannel()); - assertSame(DROP_PICK_RESULT, pick2); + ClientStreamTracer tracer1 = getClientStreamTracer(args, loadReportIntervalMillis, pick1); // Report includes upstart of pick1 and the drop of pick2 - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder() - .setNumCallsStarted(2) - .setNumCallsFinished(1) // pick2 - .addCallsFinishedWithDrop( - ClientStatsPerToken.newBuilder() - .setLoadBalanceToken("token0001") - .setNumCalls(1) // pick2 - .build()) - .build()); - - PickResult pick3 = picker.pickSubchannel(args); - assertSame(subchannel2, pick3.getSubchannel()); - assertSame(getLoadRecorder(), pick3.getStreamTracerFactory()); - ClientStreamTracer tracer3 = - pick3.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); - tracer3.streamCreated(Attributes.EMPTY, new Metadata()); + ClientStreamTracer tracer3 = getClientStreamTracer(args, loadReportIntervalMillis); // pick3 has sent out headers tracer3.outboundHeaders(); // 3rd report includes pick3's upstart - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder() - .setNumCallsStarted(1) - .build()); - - PickResult pick4 = picker.pickSubchannel(args); - assertNull(pick4.getSubchannel()); - assertSame(DROP_PICK_RESULT, pick4); + pickReportUpstart(args, loadReportIntervalMillis); // pick1 ended without sending anything tracer1.streamClosed(Status.CANCELLED); // 4th report includes end of pick1 and drop of pick4 - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder() - .setNumCallsStarted(1) // pick4 - .setNumCallsFinished(2) - .setNumCallsFinishedWithClientFailedToSend(1) // pick1 - .addCallsFinishedWithDrop( - ClientStatsPerToken.newBuilder() - .setLoadBalanceToken("token0003") - .setNumCalls(1) // pick4 - .build()) - .build()); - - PickResult pick5 = picker.pickSubchannel(args); - assertSame(subchannel1, pick1.getSubchannel()); - assertSame(getLoadRecorder(), pick5.getStreamTracerFactory()); - ClientStreamTracer tracer5 = - pick5.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); - tracer5.streamCreated(Attributes.EMPTY, new Metadata()); + ClientStreamTracer tracer5 = getStreamTracer(args, loadReportIntervalMillis, pick1); // pick3 ended without receiving response headers tracer3.streamClosed(Status.DEADLINE_EXCEEDED); @@ -556,40 +474,16 @@ public void loadReporting() { tracer5.inboundHeaders(); // 5th report includes pick3's end and pick5's upstart - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder() - .setNumCallsStarted(1) // pick5 - .setNumCallsFinished(1) // pick3 - .build()); + upstartPick5(loadReportIntervalMillis); // pick5 ends tracer5.streamClosed(Status.OK); // 6th report includes pick5's end - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder() - .setNumCallsFinished(1) - .setNumCallsFinishedKnownReceived(1) - .build()); - - assertEquals(1, fakeClock.numPendingTasks()); - // Balancer closes the stream, scheduled reporting task cancelled - lbResponseObserver.onError(Status.UNAVAILABLE.asException()); - assertEquals(0, fakeClock.numPendingTasks()); + includePick5(loadReportIntervalMillis); // New stream created - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); - lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); - inOrder = inOrder(lbRequestObserver); - - inOrder.verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + createNewStream(); // Load reporting is also requested lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); @@ -600,12 +494,7 @@ public void loadReporting() { // Make a new pick on that picker. It will not show up on the report of the new stream, because // that picker is associated with the previous stream. - PickResult pick6 = picker.pickSubchannel(args); - assertNull(pick6.getSubchannel()); - assertSame(DROP_PICK_RESULT, pick6); - assertNextReport( - inOrder, lbRequestObserver, loadReportIntervalMillis, - ClientStats.newBuilder().build()); + newPicker(args, loadReportIntervalMillis); // New stream got the list update lbResponseObserver.onNext(buildLbResponse(backends)); @@ -615,6 +504,28 @@ public void loadReporting() { any(EquivalentAddressGroup.class), any(Attributes.class)); // But the new RoundRobinEntries have a new loadRecorder, thus considered different from // the previous list, thus a new picker is created + newRoundRobinEntries(args, loadReportIntervalMillis); + } + + private void upstartPick5(long loadReportIntervalMillis) { + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + ClientStats.newBuilder() + .setNumCallsStarted(1) // pick5 + .setNumCallsFinished(1) // pick3 + .build()); + } + + private void newPicker(PickSubchannelArgs args, long loadReportIntervalMillis) { + PickResult pick6 = picker.pickSubchannel(args); + assertNull(pick6.getSubchannel()); + assertSame(DROP_PICK_RESULT, pick6); + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + ClientStats.newBuilder().build()); + } + + private void newRoundRobinEntries(PickSubchannelArgs args, long loadReportIntervalMillis) { helperInOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); picker = (RoundRobinPicker) pickerCaptor.getValue(); @@ -634,71 +545,221 @@ public void loadReporting() { verifyNoMoreInteractions(args); } - @Test - public void abundantInitialResponse() { - List grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); - assertEquals(1, fakeOobChannels.size()); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + private void createNewStream() { + verify(mockLbService, times(2)) + .balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + inOrder = inOrder(lbRequestObserver); - // Simulate LB initial response - assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); - lbResponseObserver.onNext(buildInitialResponse(1983)); + inOrder.verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + } - // Load reporting task is scheduled - assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); - FakeClock.ScheduledTask scheduledTask = - Iterables.getOnlyElement(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER)); - assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); + private void includePick5(long loadReportIntervalMillis) { + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + ClientStats.newBuilder() + .setNumCallsFinished(1) + .setNumCallsFinishedKnownReceived(1) + .build()); - logs.clear(); - // Simulate an abundant LB initial response, with a different report interval - lbResponseObserver.onNext(buildInitialResponse(9097)); + assertEquals(1, fakeClock.numPendingTasks()); + // Balancer closes the stream, scheduled reporting task cancelled + lbResponseObserver.onError(Status.UNAVAILABLE.asException()); + assertEquals(0, fakeClock.numPendingTasks()); + } - // This incident is logged - assertThat(logs).containsExactly( - "DEBUG: [grpclb-] Got an LB response: " + buildInitialResponse(9097), - "WARNING: [grpclb-] " - + "Ignoring unexpected response type: INITIAL_RESPONSE") - .inOrder(); + private ClientStreamTracer getStreamTracer(PickSubchannelArgs args, long loadReportIntervalMillis, + PickResult pick1) { + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + ClientStats.newBuilder() + .setNumCallsStarted(1) // pick4 + .setNumCallsFinished(2) + .setNumCallsFinishedWithClientFailedToSend(1) // pick1 + .addCallsFinishedWithDrop( + ClientStatsPerToken.newBuilder() + .setLoadBalanceToken("token0003") + .setNumCalls(1) // pick4 + .build()) + .build()); - // It doesn't affect load-reporting at all - assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER)) - .containsExactly(scheduledTask); - assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); + PickResult pick5 = picker.pickSubchannel(args); + assertSame(subchannel1, pick1.getSubchannel()); + assertSame(getLoadRecorder(), pick5.getStreamTracerFactory()); + ClientStreamTracer tracer5 = + pick5.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); + tracer5.streamCreated(Attributes.EMPTY, new Metadata()); + return tracer5; } - @SuppressWarnings("unchecked") - @Test - public void raceBetweenHandleAddressesAndLbStreamClosure() { - InOrder inOrder = inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1); - deliverResolvedAddresses(Collections.emptyList(), - createResolvedBalancerAddresses(1)); - assertEquals(1, fakeOobChannels.size()); - inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + private void pickReportUpstart(PickSubchannelArgs args, long loadReportIntervalMillis) { + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + ClientStats.newBuilder() + .setNumCallsStarted(1) + .build()); - // Close lbStream - lbResponseObserver.onCompleted(); - inOrder.verify(backoffPolicyProvider).get(); - inOrder.verify(backoffPolicy1).nextBackoffNanos(); - // Retry task scheduled - assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); - FakeClock.ScheduledTask retryTask = - Iterables.getOnlyElement(fakeClock.getPendingTasks(LB_RPC_RETRY_TASK_FILTER)); - assertEquals(10L, retryTask.getDelay(TimeUnit.NANOSECONDS)); + PickResult pick4 = picker.pickSubchannel(args); + assertNull(pick4.getSubchannel()); + assertSame(DROP_PICK_RESULT, pick4); + } - // Receive the same Lb address again - deliverResolvedAddresses(Collections.emptyList(), - createResolvedBalancerAddresses(1)); - // Retry task cancelled + private ClientStreamTracer getClientStreamTracer(PickSubchannelArgs args, + long loadReportIntervalMillis, PickResult pick1) { + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + ClientStats.newBuilder().build()); + + ClientStreamTracer tracer1 = + pick1.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); + tracer1.streamCreated(Attributes.EMPTY, new Metadata()); + + PickResult pick2 = picker.pickSubchannel(args); + assertNull(pick2.getSubchannel()); + assertSame(DROP_PICK_RESULT, pick2); + return tracer1; + } + + private ClientStreamTracer getClientStreamTracer(PickSubchannelArgs args, + long loadReportIntervalMillis) { + PickResult pick3 = getPickResult(loadReportIntervalMillis, + ClientStats.newBuilder() + .setNumCallsStarted(2) + .setNumCallsFinished(1) // pick2 + .addCallsFinishedWithDrop( + ClientStatsPerToken.newBuilder() + .setLoadBalanceToken("token0001") + .setNumCalls(1) // pick2 + .build()), args, subchannel2); + ClientStreamTracer tracer3 = + pick3.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); + tracer3.streamCreated(Attributes.EMPTY, new Metadata()); + return tracer3; + } + + private PickResult getPickResult(long loadReportIntervalMillis, ClientStats.Builder newBuilder, + PickSubchannelArgs args, Subchannel subchannel1) { + assertNextReport( + inOrder, lbRequestObserver, loadReportIntervalMillis, + newBuilder.build()); + + PickResult pick1 = picker.pickSubchannel(args); + assertSame(subchannel1, pick1.getSubchannel()); + assertSame(getLoadRecorder(), pick1.getStreamTracerFactory()); + return pick1; + } + + private void scheduleLoadReporting(List backends) { + lbResponseObserver.onNext(buildLbResponse(backends)); + + assertEquals(2, mockSubchannels.size()); + subchannel1 = mockSubchannels.poll(); + subchannel2 = mockSubchannels.poll(); + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); + + helperInOrder.verify(helper, atLeast(1)) + .updateBalancingState(eq(READY), pickerCaptor.capture()); + picker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker.dropList).containsExactly( + null, + new DropEntry(getLoadRecorder(), "token0001"), + null, + new DropEntry(getLoadRecorder(), "token0003")).inOrder(); + assertThat(picker.pickList).containsExactly( + new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), + new BackendEntry(subchannel2, getLoadRecorder(), "token0002")).inOrder(); + } + + private void startFallbackTimer() { + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + + assertEquals(1, fakeOobChannels.size()); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + inOrder = inOrder(lbRequestObserver); + helperInOrder = inOrder(helper, subchannelPool); + + inOrder.verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + } + + @Test + public void abundantInitialResponse() { + List grpclbBalancerList = createResolvedBalancerAddresses(1); + deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); + assertEquals(1, fakeOobChannels.size()); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + + // Simulate LB initial response + assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); + lbResponseObserver.onNext(buildInitialResponse(1983)); + + // Load reporting task is scheduled + assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); + FakeClock.ScheduledTask scheduledTask = + Iterables.getOnlyElement(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER)); + assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); + + logs.clear(); + // Simulate an abundant LB initial response, with a different report interval + lbResponseObserver.onNext(buildInitialResponse(9097)); + + // This incident is logged + assertThat(logs).containsExactly( + "DEBUG: [grpclb-] Got an LB response: " + buildInitialResponse(9097), + "WARNING: [grpclb-] " + + "Ignoring unexpected response type: INITIAL_RESPONSE") + .inOrder(); + + // It doesn't affect load-reporting at all + assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER)) + .containsExactly(scheduledTask); + assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); + } + + @SuppressWarnings("unchecked") + @Test + public void raceBetweenHandleAddressesAndLbStreamClosure() { + InOrder inOrder = inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1); + deliverResolvedAddresses(Collections.emptyList(), + createResolvedBalancerAddresses(1)); + assertEquals(1, fakeOobChannels.size()); + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + + // Close lbStream + lbResponseObserver.onCompleted(); + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + // Retry task scheduled + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + FakeClock.ScheduledTask retryTask = + Iterables.getOnlyElement(fakeClock.getPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + assertEquals(10L, retryTask.getDelay(TimeUnit.NANOSECONDS)); + + // Receive the same Lb address again + deliverResolvedAddresses(Collections.emptyList(), + createResolvedBalancerAddresses(1)); + // Retry task cancelled assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); // Reuse the existing OOB channel assertEquals(1, fakeOobChannels.size()); @@ -708,7 +769,7 @@ public void raceBetweenHandleAddressesAndLbStreamClosure() { lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate a race condition where the task has just started when it's cancelled @@ -729,7 +790,7 @@ public void raceBetweenLoadReportingAndLbStreamClosure() { inOrder.verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -894,6 +955,10 @@ public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() { assertEquals(1, lbRequestObservers.size()); // No additional RPC } + List backends1 = Arrays.asList( + new ServerEntry("127.0.0.1", 2000, "token0001"), + new ServerEntry("127.0.0.1", 2010, "token0002")); + @Test public void grpclbWorking() { InOrder inOrder = inOrder(helper, subchannelPool); @@ -901,31 +966,16 @@ public void grpclbWorking() { deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); // Fallback timer is started as soon as the addresses are resolved. - assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - - verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), - eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); - assertEquals(1, fakeOobChannels.size()); - ManagedChannel oobChannel = fakeOobChannels.poll(); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + initializeSubchannel(grpclbBalancerList); // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); logs.clear(); lbResponseObserver.onNext(buildInitialResponse()); assertThat(logs).containsExactly( - "INFO: [grpclb-] Got an LB initial response: " + buildInitialResponse()); + "INFO: [grpclb-] Got an LB initial response: " + + buildInitialResponse()); logs.clear(); lbResponseObserver.onNext(buildLbResponse(backends1)); @@ -938,48 +988,15 @@ public void grpclbWorking() { assertEquals(2, mockSubchannels.size()); Subchannel subchannel1 = mockSubchannels.poll(); Subchannel subchannel2 = mockSubchannels.poll(); - verify(subchannel1).requestConnection(); - verify(subchannel2).requestConnection(); - assertEquals( - new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS), - subchannel1.getAddresses()); - assertEquals( - new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS), - subchannel2.getAddresses()); + verifySubchannelRequestConnection(backends1.get(0), backends1.get(1), subchannel1, subchannel2); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); - deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); - - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker0.dropList).containsExactly(null, null); - assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY); - inOrder.verifyNoMoreInteractions(); - - assertThat(logs).containsExactly( - "DEBUG: [grpclb-] Got an LB response: " + buildLbResponse(backends1)) - .inOrder(); - logs.clear(); + connectingBalanceState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING), inOrder, + CONNECTING, BUFFER_ENTRY); + inOrderVerifySubchannel(inOrder, backends1); // Let subchannels be connected - deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - - RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - - assertThat(picker1.dropList).containsExactly(null, null); - assertThat(picker1.pickList).containsExactly( - new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); - - deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - - RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker2.dropList).containsExactly(null, null); - assertThat(picker2.pickList).containsExactly( - new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), - new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) - .inOrder(); + connectSubchannels(inOrder, subchannel1, subchannel2); // Disconnected subchannels verify(subchannel1).requestConnection(); @@ -1014,21 +1031,7 @@ public void grpclbWorking() { assertThat(picker4.pickList).containsExactly(BUFFER_ENTRY); // Update backends, with a drop entry - List backends2 = - Arrays.asList( - new ServerEntry("127.0.0.1", 2030, "token0003"), // New address - new ServerEntry("token0003"), // drop - new ServerEntry("127.0.0.1", 2010, "token0004"), // Existing address with token changed - new ServerEntry("127.0.0.1", 2030, "token0005"), // New address appearing second time - new ServerEntry("token0006")); // drop - verify(subchannelPool, never()) - .returnSubchannel(same(subchannel1), any(ConnectivityStateInfo.class)); - - lbResponseObserver.onNext(buildLbResponse(backends2)); - assertThat(logs).containsExactly( - "DEBUG: [grpclb-] Got an LB response: " + buildLbResponse(backends2)) - .inOrder(); - logs.clear(); + List backends2 = getServerEntries(subchannel1); // not in backends2, closed verify(subchannelPool).returnSubchannel(same(subchannel1), same(errorState1)); @@ -1101,6 +1104,43 @@ public void grpclbWorking() { .returnSubchannel(same(subchannel3), any(ConnectivityStateInfo.class)); // Update backends, with no entry + updateBackendWithNoEntry(inOrder, oobChannel, lbResponseObserver, lbRequestObserver, + subchannel2, subchannel3); + + // Load reporting was not requested, thus never scheduled + assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); + + verify(subchannelPool, never()).clear(); + balancer.shutdown(); + verify(subchannelPool).clear(); + } + + private List getServerEntries(Subchannel subchannel1) { + List backends2 = + Arrays.asList( + new ServerEntry("127.0.0.1", 2030, "token0003"), // New address + new ServerEntry("token0003"), // drop + // Existing address with token changed + new ServerEntry("127.0.0.1", 2010, "token0004"), + // New address appearing second time + new ServerEntry("127.0.0.1", 2030, "token0005"), + new ServerEntry("token0006")); // drop + verify(subchannelPool, never()) + .returnSubchannel(same(subchannel1), any(ConnectivityStateInfo.class)); + + lbResponseObserver.onNext(buildLbResponse(backends2)); + assertThat(logs).containsExactly( + "DEBUG: [grpclb-] Got an LB response: " + + buildLbResponse(backends2)) + .inOrder(); + logs.clear(); + return backends2; + } + + private void updateBackendWithNoEntry(InOrder inOrder, ManagedChannel oobChannel, + StreamObserver lbResponseObserver, + StreamObserver lbRequestObserver, Subchannel subchannel2, + Subchannel subchannel3) { lbResponseObserver.onNext(buildLbResponse(Collections.emptyList())); verify(subchannelPool) .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(READY))); @@ -1116,13 +1156,6 @@ public void grpclbWorking() { assertEquals(0, lbRequestObservers.size()); verify(lbRequestObserver, never()).onCompleted(); verify(lbRequestObserver, never()).onError(any(Throwable.class)); - - // Load reporting was not requested, thus never scheduled - assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); - - verify(subchannelPool, never()).clear(); - balancer.shutdown(); - verify(subchannelPool).clear(); } @Test @@ -1134,9 +1167,6 @@ public void roundRobinMode_subchannelStayTransientFailureUntilReady() { StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(backends1)); assertEquals(2, mockSubchannels.size()); @@ -1200,8 +1230,8 @@ private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { // Create balancer and backend addresses List backendList = createResolvedBackendAddresses(2); List grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses( - backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); + deliverResolvedAddresses(backendList, grpclbBalancerList, + GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); @@ -1211,16 +1241,8 @@ private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); - // We don't care if these methods have been run. - inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); - inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + StreamObserver lbRequestObserver = verifyLbRequestObserver( + loadReportIntervalMillis, inOrder, lbResponseObserver); inOrder.verifyNoMoreInteractions(); @@ -1267,14 +1289,7 @@ private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { } // RPC error status includes message of balancer RPC timeout - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); - assertThat(result.getStatus().getCode()) - .isEqualTo(Code.UNAVAILABLE); - assertThat(result.getStatus().getDescription()) - .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); - assertThat(result.getStatus().getDescription()) - .contains(GrpclbState.BALANCER_TIMEOUT_STATUS.getDescription()); + balancerRequestFallbackErrorStatus(inOrder, GrpclbState.BALANCER_TIMEOUT_STATUS); } //////////////////////////////////////////////////////////////// @@ -1284,8 +1299,8 @@ private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { subchannelPool.clear(); backendList = createResolvedBackendAddresses(2); grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses( - backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); + deliverResolvedAddresses(backendList, grpclbBalancerList, + GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); // New LB address is updated to the OobChannel inOrder.verify(helper).updateOobChannelAddresses( @@ -1308,13 +1323,14 @@ private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); // A new stream is created - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)) + .balanceLoad(lbResponseObserverCaptor.capture()); lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); } @@ -1335,8 +1351,8 @@ private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { /////////////////////////////////////////////////////////////// backendList = createResolvedBackendAddresses(1); grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses( - backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); + deliverResolvedAddresses(backendList, grpclbBalancerList, + GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); // Will not affect the round robin list at all inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); @@ -1363,16 +1379,8 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); assertThat(lbRequestObservers).hasSize(1); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); - // We don't care if these methods have been run. - inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); - inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + verifyLbRequestObserver(loadReportIntervalMillis, inOrder, lbResponseObserver); + StreamObserver lbRequestObserver; inOrder.verifyNoMoreInteractions(); @@ -1392,12 +1400,13 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { inOrder, Arrays.asList(backendList.get(0), backendList.get(1))); // A new stream is created - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)) + .balanceLoad(lbResponseObserverCaptor.capture()); assertThat(lbRequestObservers).hasSize(1); lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); ////////////////////////////////////////////////////////////////////// @@ -1411,12 +1420,7 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { } // RPC error status includes error of balancer stream - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); - assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(result.getStatus().getDescription()) - .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); - assertThat(result.getStatus().getDescription()).contains(streamError.getDescription()); + balancerRequestFallbackErrorStatus(inOrder, streamError); } @Test @@ -1529,21 +1533,8 @@ private void subtestGrpclbFallbackConnectionLost( eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); // Attempted to connect to balancer - assertEquals(1, fakeOobChannels.size()); - fakeOobChannels.poll(); - inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); - // We don't care if these methods have been run. - inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); - inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + StreamObserver lbResponseObserver = getLoadBalanceResponseStreamObserver( + loadReportIntervalMillis, inOrder); inOrder.verifyNoMoreInteractions(); @@ -1563,7 +1554,6 @@ private void subtestGrpclbFallbackConnectionLost( inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); inOrder.verify(helper).refreshNameResolution(); } if (allSubchannelsBroken) { @@ -1619,21 +1609,8 @@ public void grpclbFallback_allLost_failToFallback() { eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); // Attempted to connect to balancer - assertEquals(1, fakeOobChannels.size()); - fakeOobChannels.poll(); - inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); - // We don't care if these methods have been run. - inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); - inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + StreamObserver lbResponseObserver = getLoadBalanceResponseStreamObserver( + loadReportIntervalMillis, inOrder); inOrder.verifyNoMoreInteractions(); @@ -1664,12 +1641,52 @@ public void grpclbFallback_allLost_failToFallback() { } // RPC error status includes errors of subchannels to balancer-provided backends - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); - assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(result.getStatus().getDescription()) - .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); - assertThat(result.getStatus().getDescription()).contains(error.getDescription()); + balancerRequestFallbackErrorStatus(inOrder, error); + } + + private StreamObserver getLoadBalanceResponseStreamObserver() { + List grpclbBalancerList = createResolvedBalancerAddresses(1); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, + GrpclbConfig.create(Mode.PICK_FIRST)); + + assertEquals(1, fakeOobChannels.size()); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + return lbResponseObserver; + } + + private StreamObserver getLoadBalanceResponseStreamObserver( + long loadReportIntervalMillis, InOrder inOrder) { + assertEquals(1, fakeOobChannels.size()); + fakeOobChannels.poll(); + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + verifyLbRequestObserver(loadReportIntervalMillis, inOrder, lbResponseObserver); + return lbResponseObserver; + } + + private StreamObserver verifyLbRequestObserver(long loadReportIntervalMillis, + InOrder inOrder, StreamObserver lbResponseObserver) { + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); + // We don't care if these methods have been run. + inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); + inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + return lbRequestObserver; } private List fallbackTestVerifyUseOfFallbackBackendLists( @@ -1698,7 +1715,8 @@ private List fallbackTestVerifyUseOfBackendLists( inOrder.verify(subchannelPool).takeOrCreateSubchannel(eq(addr), any(Attributes.class)); } RoundRobinPicker picker = (RoundRobinPicker) currentPicker; - assertThat(picker.dropList).containsExactlyElementsIn(Collections.nCopies(addrs.size(), null)); + assertThat(picker.dropList) + .containsExactlyElementsIn(Collections.nCopies(addrs.size(), null)); assertThat(picker.pickList).containsExactly(GrpclbState.BUFFER_ENTRY); assertEquals(addrs.size(), mockSubchannels.size()); ArrayList subchannels = new ArrayList<>(mockSubchannels); @@ -1748,7 +1766,7 @@ public void grpclbMultipleAuthorities() throws Exception { new FakeSocketAddress("fake-address-3"), lbAttributes("fake-authority-1").toBuilder() .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, "value").build() - )); + )); deliverResolvedAddresses(backendList, grpclbBalancerList); List goldenOobEagList = @@ -1780,27 +1798,14 @@ public void grpclbMultipleAuthorities() throws Exception { @Test public void grpclbBalancerStreamClosedAndRetried() throws Exception { - LoadBalanceRequest expectedInitialRequest = - LoadBalanceRequest.newBuilder() - .setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build(); + LoadBalanceRequest expectedInitialRequest = getIntialLoadBalanceRequest(); + InOrder inOrder = inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2, helper); - List grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); - - assertEquals(1, fakeOobChannels.size()); - @SuppressWarnings("unused") - ManagedChannel oobChannel = fakeOobChannels.poll(); - // First balancer RPC - inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); - assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + StreamObserver lbRequestObserver; + StreamObserver lbResponseObserver; + lbResponseObserver = getFirstBalancerRpc(expectedInitialRequest, inOrder); // Balancer closes it immediately (erroneously) lbResponseObserver.onCompleted(); @@ -1811,17 +1816,7 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); inOrder.verify(helper).refreshNameResolution(); - // Fast-forward to a moment before the retry - fakeClock.forwardNanos(9); - verifyNoMoreInteractions(mockLbService); - // Then time for retry - fakeClock.forwardNanos(1); - inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); - assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + lbResponseObserver = fastForwardAndRetry(9, inOrder, expectedInitialRequest); // Balancer closes it with an error. lbResponseObserver.onError(Status.UNAVAILABLE.asException()); @@ -1832,16 +1827,7 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { inOrder.verify(helper).refreshNameResolution(); // Fast-forward to a moment before the retry - fakeClock.forwardNanos(100 - 1); - verifyNoMoreInteractions(mockLbService); - // Then time for retry - fakeClock.forwardNanos(1); - inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); - assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + lbResponseObserver = fastForwardAndRetry(100 - 1, inOrder, expectedInitialRequest); // Balancer sends initial response. lbResponseObserver.onNext(buildInitialResponse()); @@ -1868,121 +1854,99 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { inOrder.verify(helper).refreshNameResolution(); // Fast-forward to a moment before the retry, the time spent in the last try is deducted. - fakeClock.forwardNanos(10 - 4 - 1); + fastForwardAndRetry(10 - 4 - 1, inOrder, expectedInitialRequest); + + // Wrapping up + verify(backoffPolicyProvider, times(2)).get(); + verify(backoffPolicy1, times(2)).nextBackoffNanos(); + verify(backoffPolicy2, times(1)).nextBackoffNanos(); + verify(helper, times(4)).refreshNameResolution(); + } + + private StreamObserver fastForwardAndRetry(int nanos, InOrder inOrder, + LoadBalanceRequest expectedInitialRequest) { + // Fast-forward to a moment before the retry + fakeClock.forwardNanos(nanos); verifyNoMoreInteractions(mockLbService); // Then time for retry fakeClock.forwardNanos(1); inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + verifyRequestProcessed(expectedInitialRequest); + return lbResponseObserver; + } + + private void verifyRequestProcessed(LoadBalanceRequest expectedInitialRequest) { assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + } - // Wrapping up - verify(backoffPolicyProvider, times(2)).get(); - verify(backoffPolicy1, times(2)).nextBackoffNanos(); - verify(backoffPolicy2, times(1)).nextBackoffNanos(); - verify(helper, times(4)).refreshNameResolution(); + private StreamObserver getFirstBalancerRpc(LoadBalanceRequest + expectedInitialRequest, InOrder inOrder) { + // First balancer RPC + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + verifyRequestProcessed(expectedInitialRequest); + return lbResponseObserver; } - @Test - public void grpclbWorking_pickFirstMode() throws Exception { - InOrder inOrder = inOrder(helper); + private LoadBalanceRequest getIntialLoadBalanceRequest() { + LoadBalanceRequest expectedInitialRequest = + LoadBalanceRequest.newBuilder() + .setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build(); List grpclbBalancerList = createResolvedBalancerAddresses(1); - - deliverResolvedAddresses( - Collections.emptyList(), - grpclbBalancerList, - GrpclbConfig.create(Mode.PICK_FIRST)); + deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); assertEquals(1, fakeOobChannels.size()); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + @SuppressWarnings("unused") + ManagedChannel oobChannel = fakeOobChannels.poll(); + return expectedInitialRequest; + } - // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - lbResponseObserver.onNext(buildInitialResponse()); - lbResponseObserver.onNext(buildLbResponse(backends1)); + @Test + public void grpclbWorking_pickFirstMode() throws Exception { + InOrder inOrder = inOrder(helper); - inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); - CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); - assertThat(createSubchannelArgs.getAddresses()) - .containsExactly( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); + StreamObserver lbResponseObserver = getLoadBalanceResponseStreamObserver(); + + // Simulate receiving LB response + simulateReceivingLbResponse(inOrder, lbResponseObserver); // Initially IDLE inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); // Only one subchannel is created - assertThat(mockSubchannels).hasSize(1); - Subchannel subchannel = mockSubchannels.poll(); - assertThat(picker0.dropList).containsExactly(null, null); - assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); + Subchannel subchannel = getOneSubchannel(picker0); // PICK_FIRST doesn't eagerly connect verify(subchannel, never()).requestConnection(); // CONNECTING - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker1.dropList).containsExactly(null, null); - assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY); + connectingBalanceState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING), inOrder, + CONNECTING, BUFFER_ENTRY); // TRANSIENT_FAILURE Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker2.dropList).containsExactly(null, null); - assertThat(picker2.pickList).containsExactly(new ErrorEntry(error)); + connectingBalanceState(subchannel, ConnectivityStateInfo.forTransientFailure(error), inOrder, + TRANSIENT_FAILURE, new ErrorEntry(error)); // READY - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker3.dropList).containsExactly(null, null); - assertThat(picker3.pickList).containsExactly( + connectingBalanceState(subchannel, ConnectivityStateInfo.forNonError(READY), inOrder, READY, new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); - // New server list with drops - List backends2 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("token0003"), // drop - new ServerEntry("127.0.0.1", 2020, "token0004")); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - lbResponseObserver.onNext(buildLbResponse(backends2)); + List backends2 = testRecoverNewLbResponseAddress( + inOrder, lbResponseObserver); // new addresses will be updated to the existing subchannel // createSubchannel() has ever been called only once - verify(helper, times(1)).createSubchannel(any(CreateSubchannelArgs.class)); - assertThat(mockSubchannels).isEmpty(); - verify(subchannel).updateAddresses( - eq(Arrays.asList( - new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends2.get(2).addr, - eagAttrsWithToken("token0004"))))); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker4.dropList).containsExactly( - null, new DropEntry(getLoadRecorder(), "token0003"), null); - assertThat(picker4.pickList).containsExactly( - new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); + createNewAddressSubchannel(inOrder, subchannel, backends2); // Subchannel goes IDLE, but PICK_FIRST will not try to reconnect deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); @@ -2007,76 +1971,52 @@ public void grpclbWorking_pickFirstMode() throws Exception { .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); } + private void createNewAddressSubchannel(InOrder inOrder, Subchannel subchannel, + List backends2) { + verify(helper, times(1)) + .createSubchannel(any(CreateSubchannelArgs.class)); + assertThat(mockSubchannels).isEmpty(); + verify(subchannel).updateAddresses( + eq(Arrays.asList( + new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends2.get(2).addr, + eagAttrsWithToken("token0004"))))); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker4.dropList).containsExactly( + null, new DropEntry(getLoadRecorder(), "token0003"), null); + assertThat(picker4.pickList).containsExactly( + new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); + } + @Test public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { InOrder inOrder = inOrder(helper); - List grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses( - Collections.emptyList(), - grpclbBalancerList, - GrpclbConfig.create(Mode.PICK_FIRST)); - - assertEquals(1, fakeOobChannels.size()); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + StreamObserver lbResponseObserver = getLoadBalanceResponseStreamObserver(); - // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - lbResponseObserver.onNext(buildInitialResponse()); - lbResponseObserver.onNext(buildLbResponse(backends1)); - - inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); - CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); - assertThat(createSubchannelArgs.getAddresses()) - .containsExactly( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); + simulateReceivingLbResponse(inOrder, lbResponseObserver); // Initially IDLE inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); - // Only one subchannel is created - assertThat(mockSubchannels).hasSize(1); - Subchannel subchannel = mockSubchannels.poll(); - assertThat(picker0.dropList).containsExactly(null, null); - assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); + Subchannel subchannel = getOneSubchannel(picker0); // PICK_FIRST doesn't eagerly connect verify(subchannel, never()).requestConnection(); // CONNECTING - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker1.dropList).containsExactly(null, null); - assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY); + connectingBalanceState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING), inOrder, + CONNECTING, BUFFER_ENTRY); // TRANSIENT_FAILURE Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker2.dropList).containsExactly(null, null); - assertThat(picker2.pickList).containsExactly(new ErrorEntry(error)); + connectingBalanceState(subchannel, ConnectivityStateInfo.forTransientFailure(error), inOrder, + TRANSIENT_FAILURE, new ErrorEntry(error)); // READY - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker3.dropList).containsExactly(null, null); - assertThat(picker3.pickList).containsExactly( + connectingBalanceState(subchannel, ConnectivityStateInfo.forNonError(READY), inOrder, READY, new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); inOrder.verify(helper, never()) @@ -2091,14 +2031,23 @@ public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { assertThat(mockSubchannels).isEmpty(); verify(subchannel).shutdown(); - // RPC error status includes message of no backends provided by balancer - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(errorPicker.pickList) - .containsExactly(new ErrorEntry(GrpclbState.NO_AVAILABLE_BACKENDS_STATUS)); + noAvailableBalanceStatus(inOrder); lbResponseObserver.onNext(buildLbResponse(Collections.emptyList())); + testRecoverNewLbResponseAddress(inOrder, lbResponseObserver); + + // new addresses will be updated to the existing subchannel + inOrder.verify(helper, times(1)) + .createSubchannel(any(CreateSubchannelArgs.class)); + inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + subchannel = mockSubchannels.poll(); + + readySubchannel(inOrder, subchannel); + } + + private List testRecoverNewLbResponseAddress(InOrder inOrder, + StreamObserver lbResponseObserver) { // Test recover from new LB response with addresses // New server list with drops List backends2 = Arrays.asList( @@ -2108,12 +2057,10 @@ public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildLbResponse(backends2)); + return backends2; + } - // new addresses will be updated to the existing subchannel - inOrder.verify(helper, times(1)).createSubchannel(any(CreateSubchannelArgs.class)); - inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); - subchannel = mockSubchannels.poll(); - + private void readySubchannel(InOrder inOrder, Subchannel subchannel) { // Subchannel became READY deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); @@ -2123,6 +2070,71 @@ public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); } + private void noAvailableBalanceStatus(InOrder inOrder) { + // RPC error status includes message of no backends provided by balancer + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(errorPicker.pickList) + .containsExactly(new ErrorEntry(GrpclbState.NO_AVAILABLE_BACKENDS_STATUS)); + } + + private void connectingBalanceState(Subchannel subchannel, ConnectivityStateInfo connectingInfo, + InOrder inOrder, ConnectivityState connecting, RoundRobinEntry bufferEntry) { + deliverSubchannelState(subchannel, connectingInfo); + inOrder.verify(helper).updateBalancingState(eq(connecting), pickerCaptor.capture()); + RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker1.dropList).containsExactly(null, null); + assertThat(picker1.pickList).containsExactly(bufferEntry); + } + + private Subchannel getOneSubchannel(RoundRobinPicker picker0) { + // Only one subchannel is created + assertThat(mockSubchannels).hasSize(1); + Subchannel subchannel = mockSubchannels.poll(); + assertThat(picker0.dropList).containsExactly(null, null); + assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); + return subchannel; + } + + private void simulateReceivingLbResponse(InOrder inOrder, + StreamObserver lbResponseObserver) { + // Simulate receiving LB response + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + lbResponseObserver.onNext(buildInitialResponse()); + lbResponseObserver.onNext(buildLbResponse(backends1)); + + inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); + CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); + assertThat(createSubchannelArgs.getAddresses()) + .containsExactly( + new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); + } + + private List simulateReceivingLbResponse(InOrder inOrder, + StreamObserver lbResponseObserver, ServerEntry backend1a, + ServerEntry backend1b) { + List backends1 = Arrays.asList(backend1a, backend1b); + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + logs.clear(); + lbResponseObserver.onNext(buildInitialResponse()); + assertThat(logs).containsExactly( + "INFO: [grpclb-] Got an LB initial response: " + + buildInitialResponse()); + logs.clear(); + lbResponseObserver.onNext(buildLbResponse(backends1)); + + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + return backends1; + } + @Test public void shutdownWithoutSubchannel_roundRobin() throws Exception { subtestShutdownWithoutSubchannel(GrpclbConfig.create(Mode.ROUND_ROBIN)); @@ -2167,8 +2179,8 @@ private void pickFirstModeFallback(long timeout) throws Exception { // Name resolver returns balancer and backend addresses List backendList = createResolvedBackendAddresses(2); List grpclbBalancerList = createResolvedBalancerAddresses(1); - deliverResolvedAddresses( - backendList, grpclbBalancerList, GrpclbConfig.create(Mode.PICK_FIRST, null, timeout)); + deliverResolvedAddresses(backendList, grpclbBalancerList, GrpclbConfig.create(Mode.PICK_FIRST, + null, timeout)); // Attempted to connect to balancer assertEquals(1, fakeOobChannels.size()); @@ -2193,11 +2205,7 @@ private void pickFirstModeFallback(long timeout) throws Exception { RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); // READY - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker1.dropList).containsExactly(null, null); - assertThat(picker1.pickList).containsExactly( + connectingBalanceState(subchannel, ConnectivityStateInfo.forNonError(READY), inOrder, READY, new BackendEntry(subchannel, new TokenAttachingTracerFactory(null))); assertThat(picker0.dropList).containsExactly(null, null); @@ -2205,9 +2213,6 @@ private void pickFirstModeFallback(long timeout) throws Exception { // Finally, an LB response, which brings us out of fallback - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); @@ -2245,46 +2250,22 @@ public void switchMode() throws Exception { grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN)); - assertEquals(1, fakeOobChannels.size()); - ManagedChannel oobChannel = fakeOobChannels.poll(); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + initializeChannel(); // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(backends1)); // ROUND_ROBIN: create one subchannel per server - verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); - assertEquals(2, mockSubchannels.size()); - Subchannel subchannel1 = mockSubchannels.poll(); - Subchannel subchannel2 = mockSubchannels.poll(); - verify(subchannelPool, never()) - .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); + roundRobinCreateOneSubchannel(inOrder, backends1); // Switch to PICK_FIRST deliverResolvedAddresses( Collections.emptyList(), grpclbBalancerList, GrpclbConfig.create(Mode.PICK_FIRST)); - // GrpclbState will be shutdown, and a new one will be created assertThat(oobChannel.isShutdown()).isTrue(); verify(subchannelPool) @@ -2294,13 +2275,14 @@ public void switchMode() throws Exception { // A new LB stream is created assertEquals(1, fakeOobChannels.size()); - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)) + .balanceLoad(lbResponseObserverCaptor.capture()); lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -2310,6 +2292,10 @@ public void switchMode() throws Exception { lbResponseObserver.onNext(buildLbResponse(backends1)); // PICK_FIRST Subchannel + pickFirstSubchannel(inOrder, backends1); + } + + private void pickFirstSubchannel(InOrder inOrder, List backends1) { inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); assertThat(createSubchannelArgs.getAddresses()) @@ -2320,6 +2306,34 @@ public void switchMode() throws Exception { inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); } + private void roundRobinCreateOneSubchannel(InOrder inOrder, List backends1) { + verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + assertEquals(2, mockSubchannels.size()); + subchannel1 = mockSubchannels.poll(); + subchannel2 = mockSubchannels.poll(); + verify(subchannelPool, never()) + .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); + } + + private void initializeChannel() { + assertEquals(1, fakeOobChannels.size()); + oobChannel = fakeOobChannels.poll(); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + } + private static Attributes eagAttrsWithToken(String token) { return LB_BACKEND_ATTRS.toBuilder().set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, token).build(); } @@ -2333,39 +2347,16 @@ public void switchMode_nullLbPolicy() throws Exception { Collections.emptyList(), grpclbBalancerList); - assertEquals(1, fakeOobChannels.size()); - ManagedChannel oobChannel = fakeOobChannels.poll(); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + initializeChannel(); // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(backends1)); // ROUND_ROBIN: create one subchannel per server - verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); - assertEquals(2, mockSubchannels.size()); - Subchannel subchannel1 = mockSubchannels.poll(); - Subchannel subchannel2 = mockSubchannels.poll(); - verify(subchannelPool, never()) - .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); + roundRobinCreateOneSubchannel(inOrder, backends1); // Switch to PICK_FIRST deliverResolvedAddresses( @@ -2382,13 +2373,14 @@ public void switchMode_nullLbPolicy() throws Exception { // A new LB stream is created assertEquals(1, fakeOobChannels.size()); - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)) + .balanceLoad(lbResponseObserverCaptor.capture()); lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -2398,14 +2390,7 @@ public void switchMode_nullLbPolicy() throws Exception { lbResponseObserver.onNext(buildLbResponse(backends1)); // PICK_FIRST Subchannel - inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); - CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); - assertThat(createSubchannelArgs.getAddresses()) - .containsExactly( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); - - inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + pickFirstSubchannel(inOrder, backends1); } @Test @@ -2428,31 +2413,17 @@ public void switchServiceName() throws Exception { StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(serviceName).build()) + InitialLoadBalanceRequest.newBuilder().setName(serviceName).build()) .build())); // Simulate receiving LB response - List backends1 = Arrays.asList( - new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("127.0.0.1", 2010, "token0002")); inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(backends1)); // ROUND_ROBIN: create one subchannel per server - verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); - assertEquals(2, mockSubchannels.size()); - Subchannel subchannel1 = mockSubchannels.poll(); - Subchannel subchannel2 = mockSubchannels.poll(); - verify(subchannelPool, never()) - .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); + roundRobinCreateOneSubchannel(inOrder, backends1); // Switch to different serviceName serviceName = "bar.google.com"; @@ -2470,15 +2441,18 @@ public void switchServiceName() throws Exception { .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE))); assertEquals(1, fakeOobChannels.size()); - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)) + .balanceLoad(lbResponseObserverCaptor.capture()); assertEquals(1, lbRequestObservers.size()); lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(serviceName).build()) + InitialLoadBalanceRequest.newBuilder().setName(serviceName).build()) .build())); } + ManagedChannel oobChannel; + @Test public void grpclbWorking_lbSendsFallbackMessage() { InOrder inOrder = inOrder(helper, subchannelPool); @@ -2487,88 +2461,27 @@ public void grpclbWorking_lbSendsFallbackMessage() { deliverResolvedAddresses(backendList, grpclbBalancerList); // Fallback timer is started as soon as the addresses are resolved. - assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), - eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); - assertEquals(1, fakeOobChannels.size()); - ManagedChannel oobChannel = fakeOobChannels.poll(); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder() - .setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); + initializeSubchannel(grpclbBalancerList); // Simulate receiving LB response ServerEntry backend1a = new ServerEntry("127.0.0.1", 2000, "token0001"); ServerEntry backend1b = new ServerEntry("127.0.0.1", 2010, "token0002"); - List backends1 = Arrays.asList(backend1a, backend1b); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - logs.clear(); - lbResponseObserver.onNext(buildInitialResponse()); - assertThat(logs).containsExactly( - "INFO: [grpclb-] Got an LB initial response: " + buildInitialResponse()); - logs.clear(); - lbResponseObserver.onNext(buildLbResponse(backends1)); - - inOrder.verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - inOrder.verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); + List backends1 = simulateReceivingLbResponse( + inOrder, lbResponseObserver, backend1a, backend1b); assertEquals(2, mockSubchannels.size()); Subchannel subchannel1 = mockSubchannels.poll(); Subchannel subchannel2 = mockSubchannels.poll(); - verify(subchannel1).requestConnection(); - verify(subchannel2).requestConnection(); - assertEquals( - new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS), - subchannel1.getAddresses()); - assertEquals( - new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS), - subchannel2.getAddresses()); + verifySubchannelRequestConnection(backend1a, backend1b, subchannel1, subchannel2); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); - deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); - - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker0.dropList).containsExactly(null, null); - assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY); - inOrder.verifyNoMoreInteractions(); - - assertThat(logs) - .containsExactly( - "DEBUG: [grpclb-] Got an LB response: " + buildLbResponse(backends1)) - .inOrder(); - logs.clear(); + connectingBalanceState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING), inOrder, + CONNECTING, BUFFER_ENTRY); + inOrderVerifySubchannel(inOrder, backends1); // Let subchannels be connected - deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - - RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - - assertThat(picker1.dropList).containsExactly(null, null); - assertThat(picker1.pickList).containsExactly( - new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); - - deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - - RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker2.dropList).containsExactly(null, null); - assertThat(picker2.pickList).containsExactly( - new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), - new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) - .inOrder(); + connectSubchannels(inOrder, subchannel1, subchannel2); // Balancer forces entering fallback mode lbResponseObserver.onNext(buildLbFallbackResponse()); @@ -2595,66 +2508,78 @@ public void grpclbWorking_lbSendsFallbackMessage() { verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); } - // RPC error status includes message of fallback requested by balancer - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); - assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(result.getStatus().getDescription()) - .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); - assertThat(result.getStatus().getDescription()) - .contains(GrpclbState.BALANCER_REQUESTED_FALLBACK_STATUS.getDescription()); + balancerRequestFallbackErrorStatus(inOrder, GrpclbState.BALANCER_REQUESTED_FALLBACK_STATUS); // exit fall back by providing two new backends ServerEntry backend2a = new ServerEntry("127.0.0.1", 8000, "token1001"); ServerEntry backend2b = new ServerEntry("127.0.0.1", 8010, "token1002"); List backends2 = Arrays.asList(backend2a, backend2b); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - logs.clear(); - lbResponseObserver.onNext(buildLbResponse(backends2)); - - inOrder.verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - inOrder.verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); + exitFallbackWithNewBackends(inOrder, lbResponseObserver, backend2a, backend2b, backends2); assertEquals(2, mockSubchannels.size()); Subchannel subchannel3 = mockSubchannels.poll(); Subchannel subchannel4 = mockSubchannels.poll(); - verify(subchannel3).requestConnection(); - verify(subchannel4).requestConnection(); - assertEquals( - new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS), - subchannel3.getAddresses()); - assertEquals( - new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS), - subchannel4.getAddresses()); + verifySubchannelRequestConnection(backend2a, backend2b, subchannel3, subchannel4); deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(CONNECTING)); - deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(CONNECTING)); + connectingBalanceState(subchannel4, ConnectivityStateInfo.forNonError(CONNECTING), inOrder, + CONNECTING, BUFFER_ENTRY); + inOrderVerifySubchannel(inOrder, backends2); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - RoundRobinPicker picker6 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker6.dropList).containsExactly(null, null); - assertThat(picker6.pickList).containsExactly(BUFFER_ENTRY); + // Let new subchannels be connected + connectNewSubchannels(inOrder, subchannel3, subchannel4); + } + + private void inOrderVerifySubchannel(InOrder inOrder, List backends1) { inOrder.verifyNoMoreInteractions(); assertThat(logs) .containsExactly( - "DEBUG: [grpclb-] Got an LB response: " + buildLbResponse(backends2)) + "DEBUG: [grpclb-] Got an LB response: " + + buildLbResponse(backends1)) .inOrder(); logs.clear(); + } - // Let new subchannels be connected - deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + private void initializeSubchannel(List grpclbBalancerList) { + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), + eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); + initializeChannel(); + } - RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker3.dropList).containsExactly(null, null); - assertThat(picker3.pickList).containsExactly( + private static void verifySubchannelRequestConnection(ServerEntry backend1a, + ServerEntry backend1b, Subchannel subchannel1, Subchannel subchannel2) { + verify(subchannel1).requestConnection(); + verify(subchannel2).requestConnection(); + assertEquals( + new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS), + subchannel1.getAddresses()); + assertEquals( + new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS), + subchannel2.getAddresses()); + } + + private void exitFallbackWithNewBackends(InOrder inOrder, + StreamObserver lbResponseObserver, + ServerEntry backend2a, ServerEntry backend2b, List backends2) { + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + logs.clear(); + lbResponseObserver.onNext(buildLbResponse(backends2)); + + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + } + + private void connectNewSubchannels(InOrder inOrder, Subchannel subchannel3, + Subchannel subchannel4) { + connectingBalanceState(subchannel3, ConnectivityStateInfo.forNonError(READY), inOrder, READY, new BackendEntry(subchannel3, getLoadRecorder(), "token1001")); deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(READY)); @@ -2663,8 +2588,37 @@ public void grpclbWorking_lbSendsFallbackMessage() { RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker4.dropList).containsExactly(null, null); assertThat(picker4.pickList).containsExactly( - new BackendEntry(subchannel3, getLoadRecorder(), "token1001"), - new BackendEntry(subchannel4, getLoadRecorder(), "token1002")) + new BackendEntry(subchannel3, getLoadRecorder(), "token1001"), + new BackendEntry(subchannel4, getLoadRecorder(), "token1002")) + .inOrder(); + } + + private void balancerRequestFallbackErrorStatus(InOrder inOrder, + Status balancerRequestedFallbackStatus) { + // RPC error status includes message of fallback requested by balancer + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); + assertThat(result.getStatus().getDescription()) + .contains(balancerRequestedFallbackStatus.getDescription()); + } + + private void connectSubchannels(InOrder inOrder, Subchannel subchannel1, + Subchannel subchannel2) { + // Let subchannels be connected + connectingBalanceState(subchannel2, ConnectivityStateInfo.forNonError(READY), inOrder, READY, + new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); + + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + + RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker2.dropList).containsExactly(null, null); + assertThat(picker2.pickList).containsExactly( + new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), + new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) .inOrder(); }