diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index baf4e2963b6..073b7333b79 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -31,7 +31,7 @@ google-findbugs = "3.0.2" google-java-allocation-instrumenter = "3.3.4" groovy = "3.0.22" # Only bump this in concert with boringssl -grpc = "1.58.0" +grpc = "1.65.1" guava = "33.3.1-jre" gwt = "2.11.0" # used by GwtTools @@ -176,6 +176,7 @@ grpc-protobuf = { module = "io.grpc:grpc-protobuf" } grpc-services = { module = "io.grpc:grpc-services" } grpc-stub = { module = "io.grpc:grpc-services" } grpc-testing = { module = "io.grpc:grpc-testing" } +grpc-inprocess = { module = "io.grpc:grpc-inprocess" } grpc-util = { module = "io.grpc:grpc-util" } guava = { module = "com.google.guava:guava", version.ref = "guava" } diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/AsyncServletOutputStreamWriter.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/AsyncServletOutputStreamWriter.java index 9a819c2426e..8b2c1da5412 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/AsyncServletOutputStreamWriter.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/AsyncServletOutputStreamWriter.java @@ -13,6 +13,7 @@ package io.grpc.servlet.jakarta; +import com.google.common.annotations.VisibleForTesting; import io.grpc.InternalLogId; import io.grpc.servlet.jakarta.ServletServerStream.ServletTransportState; import jakarta.servlet.AsyncContext; @@ -26,6 +27,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; +import java.util.function.BiFunction; +import java.util.function.BooleanSupplier; import java.util.logging.Logger; import static com.google.common.base.Preconditions.checkState; @@ -36,9 +39,6 @@ /** Handles write actions from the container thread and the application thread. */ final class AsyncServletOutputStreamWriter { - private static final Logger logger = - Logger.getLogger(AsyncServletOutputStreamWriter.class.getName()); - /** * Memory boundary for write actions. * @@ -61,11 +61,11 @@ final class AsyncServletOutputStreamWriter { */ private final AtomicReference writeState = new AtomicReference<>(WriteState.DEFAULT); - private final ServletOutputStream outputStream; - private final ServletTransportState transportState; - private final InternalLogId logId; + private final Log log; + private final BiFunction writeAction; private final ActionItem flushAction; private final ActionItem completeAction; + private final BooleanSupplier isReady; /** * New write actions will be buffered into this queue if the servlet output stream is not ready or the queue is not @@ -82,38 +82,75 @@ final class AsyncServletOutputStreamWriter { AsyncContext asyncContext, ServletTransportState transportState, InternalLogId logId) throws IOException { - this.outputStream = asyncContext.getResponse().getOutputStream(); - this.transportState = transportState; - this.logId = logId; + Logger logger = Logger.getLogger(AsyncServletOutputStreamWriter.class.getName()); + this.log = new Log() { + @Override + public void fine(String str, Object... params) { + if (logger.isLoggable(FINE)) { + logger.log(FINE, "[" + logId + "]" + str, params); + } + } + + @Override + public void finest(String str, Object... params) { + if (logger.isLoggable(FINEST)) { + logger.log(FINEST, "[" + logId + "] " + str, params); + } + } + + @Override + public boolean isFinestEnabled() { + return logger.isLoggable(FINEST); + } + }; + + ServletOutputStream outputStream = asyncContext.getResponse().getOutputStream(); + this.writeAction = (byte[] bytes, Integer numBytes) -> () -> { + outputStream.write(bytes, 0, numBytes); + transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes)); + if (log.isFinestEnabled()) { + log.finest("outbound data: length={0}, bytes={1}", numBytes, toHexString(bytes, numBytes)); + } + }; this.flushAction = () -> { - logger.log(FINEST, "[{0}] flushBuffer", logId); + log.finest("flushBuffer"); asyncContext.getResponse().flushBuffer(); }; this.completeAction = () -> { - logger.log(FINE, "[{0}] call is completing", logId); + log.fine("call is completing"); transportState.runOnTransportThread( () -> { transportState.complete(); asyncContext.complete(); - logger.log(FINE, "[{0}] call completed", logId); + log.fine("call completed"); }); }; + this.isReady = () -> outputStream.isReady(); + } + + /** + * Constructor without java.util.logging and jakarta.servlet.* dependency, so that Lincheck can run. + * + * @param writeAction Provides an {@link ActionItem} to write given bytes with specified length. + * @param isReady Indicates whether the writer can write bytes at the moment (asynchronously). + */ + @VisibleForTesting + AsyncServletOutputStreamWriter( + BiFunction writeAction, + ActionItem flushAction, + ActionItem completeAction, + BooleanSupplier isReady, + Log log) { + this.writeAction = writeAction; + this.flushAction = flushAction; + this.completeAction = completeAction; + this.isReady = isReady; + this.log = log; } /** Called from application thread. */ void writeBytes(byte[] bytes, int numBytes) throws IOException { - runOrBuffer( - // write bytes action - () -> { - outputStream.write(bytes, 0, numBytes); - transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes)); - if (logger.isLoggable(FINEST)) { - logger.log( - FINEST, - "[{0}] outbound data: length = {1}, bytes = {2}", - new Object[] {logId, numBytes, toHexString(bytes, numBytes)}); - } - }); + runOrBuffer(writeAction.apply(bytes, numBytes)); } /** Called from application thread. */ @@ -132,10 +169,9 @@ void complete() { /** Called from the container thread {@link jakarta.servlet.WriteListener#onWritePossible()}. */ void onWritePossible() throws IOException { - logger.log( - FINEST, "[{0}] onWritePossible: ENTRY. The servlet output stream becomes ready", logId); + log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready"); assureReadyAndDrainedTurnsFalse(); - while (outputStream.isReady()) { + while (isReady.getAsBoolean()) { WriteState curState = writeState.get(); ActionItem actionItem = writeChain.poll(); @@ -146,18 +182,15 @@ void onWritePossible() throws IOException { if (writeState.compareAndSet(curState, curState.withReadyAndDrained(true))) { // state has not changed since. - logger.log( - FINEST, - "[{0}] onWritePossible: EXIT. All data available now is sent out and the servlet output" - + " stream is still ready", - logId); + log.finest( + "onWritePossible: EXIT. All data available now is sent out and the servlet output" + + " stream is still ready"); return; } // else, state changed by another thread (runOrBuffer()), need to drain the writeChain // again } - logger.log( - FINEST, "[{0}] onWritePossible: EXIT. The servlet output stream becomes not ready", logId); + log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready"); } private void assureReadyAndDrainedTurnsFalse() { @@ -166,6 +199,8 @@ private void assureReadyAndDrainedTurnsFalse() { // being set to false by runOrBuffer() concurrently. while (writeState.get().readyAndDrained) { parkingThread = Thread.currentThread(); + // Try to sleep for an extremely long time to avoid writeState being changed at exactly + // the time when sleep time expires (in extreme scenario, such as #9917). LockSupport.parkNanos(Duration.ofHours(1).toNanos()); // should return immediately } parkingThread = null; @@ -184,12 +219,12 @@ private void runOrBuffer(ActionItem actionItem) throws IOException { if (actionItem == completeAction) { return; } - if (!outputStream.isReady()) { + if (!isReady.getAsBoolean()) { boolean successful = writeState.compareAndSet(curState, curState.withReadyAndDrained(false)); LockSupport.unpark(parkingThread); checkState(successful, "Bug: curState is unexpectedly changed by another thread"); - logger.log(FINEST, "[{0}] the servlet output stream becomes not ready", logId); + log.finest("the servlet output stream becomes not ready"); } } else { // buffer to the writeChain writeChain.offer(actionItem); @@ -208,10 +243,22 @@ private void runOrBuffer(ActionItem actionItem) throws IOException { /** Write actions, e.g. writeBytes, flush, complete. */ @FunctionalInterface - private interface ActionItem { + @VisibleForTesting + interface ActionItem { void run() throws IOException; } + @VisibleForTesting // Lincheck test can not run with java.util.logging dependency. + interface Log { + default void fine(String str, Object...params) {} + + default void finest(String str, Object...params) {} + + default boolean isFinestEnabled() { + return false; + } + } + private static final class WriteState { static final WriteState DEFAULT = new WriteState(false); diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/GrpcServlet.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/GrpcServlet.java index 89b518112a9..933c1dcf4b4 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/GrpcServlet.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/GrpcServlet.java @@ -13,7 +13,6 @@ package io.grpc.servlet.jakarta; -import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; import io.grpc.BindableService; import io.grpc.ExperimentalApi; @@ -44,7 +43,6 @@ public class GrpcServlet extends HttpServlet { private final ServletAdapter servletAdapter; - @VisibleForTesting GrpcServlet(ServletAdapter servletAdapter) { this.servletAdapter = servletAdapter; } diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java index 3fb2655b43b..5be92f0989e 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java @@ -45,7 +45,6 @@ import java.util.Enumeration; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import static com.google.common.base.Preconditions.checkArgument; @@ -116,7 +115,7 @@ public T otherAdapter(AdapterConstructor constructor) { * {@code resp.setBufferSize()} before invocation is allowed. */ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException { - // TODO(zdapeng) + resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED, "GET method not supported"); } /** @@ -172,10 +171,8 @@ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOEx getAuthority(req), logId); - stream.transportState().runOnTransportThread(() -> { - transportListener.streamCreated(stream, method, headers); - stream.transportState().onStreamAllocated(); - }); + transportListener.streamCreated(stream, method, headers); + stream.transportState().runOnTransportThread(stream.transportState()::onStreamAllocated); asyncCtx.getRequest().getInputStream() .setReadListener(new GrpcReadListener(stream, asyncCtx, logId)); @@ -217,7 +214,7 @@ private static String getAuthority(HttpServletRequest req) { try { return new URI(req.getRequestURL().toString()).getAuthority(); } catch (URISyntaxException e) { - logger.log(FINE, "Error getting authority from the request URL {0}" + req.getRequestURL()); + logger.log(FINE, "Error getting authority from the request URL {0}", req.getRequestURL()); return req.getServerName() + ":" + req.getServerPort(); } } @@ -282,7 +279,6 @@ private static final class GrpcReadListener implements ReadListener { final AsyncContext asyncCtx; final ServletInputStream input; final InternalLogId logId; - private final AtomicBoolean closed = new AtomicBoolean(false); GrpcReadListener( ServletServerStream stream, @@ -325,16 +321,8 @@ public void onDataAvailable() throws IOException { @Override public void onAllDataRead() { logger.log(FINE, "[{0}] onAllDataRead", logId); - if (!closed.compareAndSet(false, true)) { - // https://github.com/eclipse/jetty.project/issues/8405 - // Note that while this can be mitigated by setting - // AbstractHTTP2ServerConnectionFactory.getStreamIdleTimeout to zero, we allow this to be customized, so - // this workaround is being left in place. - logger.log(FINE, "[{0}] onAllDataRead already called, skipping this one", logId); - return; - } - stream.transportState().runOnTransportThread( - () -> stream.transportState().inboundDataReceived(ReadableBuffers.empty(), true)); + stream.transportState().runOnTransportThread(() -> + stream.transportState().inboundDataReceived(ReadableBuffers.empty(), true)); } @Override diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletServerBuilder.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletServerBuilder.java index e149cb5eb56..25c7b40c498 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletServerBuilder.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletServerBuilder.java @@ -15,12 +15,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.Attributes; import io.grpc.ExperimentalApi; import io.grpc.ForwardingServerBuilder; import io.grpc.Internal; import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalInstrumented; import io.grpc.InternalLogId; +import io.grpc.Metadata; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerStreamTracer; @@ -29,6 +31,7 @@ import io.grpc.internal.InternalServer; import io.grpc.internal.ServerImplBuilder; import io.grpc.internal.ServerListener; +import io.grpc.internal.ServerStream; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; import io.grpc.internal.SharedResourceHolder; @@ -97,10 +100,18 @@ public ServletAdapter buildServletAdapter() { return new ServletAdapter(buildAndStart(), streamTracerFactories, maxInboundMessageSize); } + /** + * Creates a {@link GrpcServlet}. + */ + public GrpcServlet buildServlet() { + return new GrpcServlet(buildServletAdapter()); + } + private ServerTransportListener buildAndStart() { + Server server; try { internalCaller = true; - build().start(); + server = build().start(); } catch (IOException e) { // actually this should never happen throw new RuntimeException(e); @@ -115,9 +126,29 @@ private ServerTransportListener buildAndStart() { // Create only one "transport" for all requests because it has no knowledge of which request is // associated with which client socket. This "transport" does not do socket connection, the // container does. - ServerTransportImpl serverTransport = - new ServerTransportImpl(scheduler, usingCustomScheduler); - return internalServer.serverListener.transportCreated(serverTransport); + ServerTransportImpl serverTransport = new ServerTransportImpl(scheduler); + ServerTransportListener delegate = + internalServer.serverListener.transportCreated(serverTransport); + return new ServerTransportListener() { + @Override + public void streamCreated(ServerStream stream, String method, Metadata headers) { + delegate.streamCreated(stream, method, headers); + } + + @Override + public Attributes transportReady(Attributes attributes) { + return delegate.transportReady(attributes); + } + + @Override + public void transportTerminated() { + server.shutdown(); + delegate.transportTerminated(); + if (!usingCustomScheduler) { + SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler); + } + } + }; } @VisibleForTesting @@ -211,24 +242,17 @@ static final class ServerTransportImpl implements ServerTransport { private final InternalLogId logId = InternalLogId.allocate(ServerTransportImpl.class, null); private final ScheduledExecutorService scheduler; - private final boolean usingCustomScheduler; - ServerTransportImpl( - ScheduledExecutorService scheduler, boolean usingCustomScheduler) { + ServerTransportImpl(ScheduledExecutorService scheduler) { this.scheduler = checkNotNull(scheduler, "scheduler"); - this.usingCustomScheduler = usingCustomScheduler; } @Override public void shutdown() { - if (!usingCustomScheduler) { - SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler); - } } @Override public void shutdownNow(Status reason) { - shutdown(); } @Override diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletServerStream.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletServerStream.java index cd04cf3b41e..2f3bce41f9a 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletServerStream.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletServerStream.java @@ -152,8 +152,8 @@ public void bytesRead(int numBytes) { @Override public void deframeFailed(Throwable cause) { - if (logger.isLoggable(FINE)) { - logger.log(FINE, String.format("[{%s}] Exception processing message", logId), cause); + if (logger.isLoggable(WARNING)) { + logger.log(WARNING, String.format("[{%s}] Exception processing message", logId), cause); } cancel(Status.fromThrowable(cause)); } @@ -223,7 +223,7 @@ private final class Sink implements AbstractServerStream.Sink { final TrailerSupplier trailerSupplier = new TrailerSupplier(); @Override - public void writeHeaders(Metadata headers) { + public void writeHeaders(Metadata headers, boolean flush) { writeHeadersToServletResponse(headers); try { resp.setTrailerFields(trailerSupplier); diff --git a/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/MultiplexedWebsocketStreamImpl.java b/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/MultiplexedWebsocketStreamImpl.java index e2aa3b59901..f8f58d19ae5 100644 --- a/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/MultiplexedWebsocketStreamImpl.java +++ b/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/MultiplexedWebsocketStreamImpl.java @@ -56,7 +56,7 @@ protected AbstractServerStream.Sink abstractServerStreamSink() { private final class Sink implements AbstractServerStream.Sink { @Override - public void writeHeaders(Metadata headers) { + public void writeHeaders(Metadata headers, boolean flush) { writeMetadataToStream(headers, false); } diff --git a/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/WebsocketStreamImpl.java b/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/WebsocketStreamImpl.java index 259e230c171..04d953eb877 100644 --- a/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/WebsocketStreamImpl.java +++ b/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/WebsocketStreamImpl.java @@ -49,7 +49,7 @@ protected Sink abstractServerStreamSink() { private final class Sink implements AbstractServerStream.Sink { @Override - public void writeHeaders(Metadata headers) { + public void writeHeaders(Metadata headers, boolean flush) { // headers/trailers are always sent as asci, colon-delimited pairs, with \r\n separating them. The // trailer response must be prefixed with 0x80 (0r 0x81 if compressed), followed by the length of the // message diff --git a/java-client/session/build.gradle b/java-client/session/build.gradle index ef210ce3c64..b875569f9df 100644 --- a/java-client/session/build.gradle +++ b/java-client/session/build.gradle @@ -36,6 +36,7 @@ dependencies { testImplementation libs.junit4 testImplementation libs.grpc.testing + testImplementation libs.grpc.inprocess testImplementation libs.assertj diff --git a/server/test-utils/build.gradle b/server/test-utils/build.gradle index 54133d912b0..6f3c56e1af0 100644 --- a/server/test-utils/build.gradle +++ b/server/test-utils/build.gradle @@ -19,6 +19,7 @@ dependencies { api platform(libs.grpc.bom) api libs.grpc.testing + api libs.grpc.inprocess compileOnly project(':util-immutables') annotationProcessor libs.immutables.value