Skip to content

Commit

Permalink
chore: Update gRPC to 1.65.1 (#6301)
Browse files Browse the repository at this point in the history
This version matches what Arrow 18 depends on, plus a few bugfix
releases. All upstream gRPC-java changes to grpc-servlet are applied
here, and some conflicts resolved.
  • Loading branch information
niloc132 authored Nov 8, 2024
1 parent dfc8e0d commit 3cc66ff
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 75 deletions.
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ google-findbugs = "3.0.2"
google-java-allocation-instrumenter = "3.3.4"
groovy = "3.0.22"
# Only bump this in concert with boringssl
grpc = "1.58.0"
grpc = "1.65.1"
guava = "33.3.1-jre"
gwt = "2.11.0"
# used by GwtTools
Expand Down Expand Up @@ -176,6 +176,7 @@ grpc-protobuf = { module = "io.grpc:grpc-protobuf" }
grpc-services = { module = "io.grpc:grpc-services" }
grpc-stub = { module = "io.grpc:grpc-services" }
grpc-testing = { module = "io.grpc:grpc-testing" }
grpc-inprocess = { module = "io.grpc:grpc-inprocess" }
grpc-util = { module = "io.grpc:grpc-util" }

guava = { module = "com.google.guava:guava", version.ref = "guava" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.grpc.servlet.jakarta;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.InternalLogId;
import io.grpc.servlet.jakarta.ServletServerStream.ServletTransportState;
import jakarta.servlet.AsyncContext;
Expand All @@ -26,6 +27,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.logging.Logger;

import static com.google.common.base.Preconditions.checkState;
Expand All @@ -36,9 +39,6 @@
/** Handles write actions from the container thread and the application thread. */
final class AsyncServletOutputStreamWriter {

private static final Logger logger =
Logger.getLogger(AsyncServletOutputStreamWriter.class.getName());

/**
* Memory boundary for write actions.
*
Expand All @@ -61,11 +61,11 @@ final class AsyncServletOutputStreamWriter {
*/
private final AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);

private final ServletOutputStream outputStream;
private final ServletTransportState transportState;
private final InternalLogId logId;
private final Log log;
private final BiFunction<byte[], Integer, ActionItem> writeAction;
private final ActionItem flushAction;
private final ActionItem completeAction;
private final BooleanSupplier isReady;

/**
* New write actions will be buffered into this queue if the servlet output stream is not ready or the queue is not
Expand All @@ -82,38 +82,75 @@ final class AsyncServletOutputStreamWriter {
AsyncContext asyncContext,
ServletTransportState transportState,
InternalLogId logId) throws IOException {
this.outputStream = asyncContext.getResponse().getOutputStream();
this.transportState = transportState;
this.logId = logId;
Logger logger = Logger.getLogger(AsyncServletOutputStreamWriter.class.getName());
this.log = new Log() {
@Override
public void fine(String str, Object... params) {
if (logger.isLoggable(FINE)) {
logger.log(FINE, "[" + logId + "]" + str, params);
}
}

@Override
public void finest(String str, Object... params) {
if (logger.isLoggable(FINEST)) {
logger.log(FINEST, "[" + logId + "] " + str, params);
}
}

@Override
public boolean isFinestEnabled() {
return logger.isLoggable(FINEST);
}
};

ServletOutputStream outputStream = asyncContext.getResponse().getOutputStream();
this.writeAction = (byte[] bytes, Integer numBytes) -> () -> {
outputStream.write(bytes, 0, numBytes);
transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes));
if (log.isFinestEnabled()) {
log.finest("outbound data: length={0}, bytes={1}", numBytes, toHexString(bytes, numBytes));
}
};
this.flushAction = () -> {
logger.log(FINEST, "[{0}] flushBuffer", logId);
log.finest("flushBuffer");
asyncContext.getResponse().flushBuffer();
};
this.completeAction = () -> {
logger.log(FINE, "[{0}] call is completing", logId);
log.fine("call is completing");
transportState.runOnTransportThread(
() -> {
transportState.complete();
asyncContext.complete();
logger.log(FINE, "[{0}] call completed", logId);
log.fine("call completed");
});
};
this.isReady = () -> outputStream.isReady();
}

/**
* Constructor without java.util.logging and jakarta.servlet.* dependency, so that Lincheck can run.
*
* @param writeAction Provides an {@link ActionItem} to write given bytes with specified length.
* @param isReady Indicates whether the writer can write bytes at the moment (asynchronously).
*/
@VisibleForTesting
AsyncServletOutputStreamWriter(
BiFunction<byte[], Integer, ActionItem> writeAction,
ActionItem flushAction,
ActionItem completeAction,
BooleanSupplier isReady,
Log log) {
this.writeAction = writeAction;
this.flushAction = flushAction;
this.completeAction = completeAction;
this.isReady = isReady;
this.log = log;
}

/** Called from application thread. */
void writeBytes(byte[] bytes, int numBytes) throws IOException {
runOrBuffer(
// write bytes action
() -> {
outputStream.write(bytes, 0, numBytes);
transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes));
if (logger.isLoggable(FINEST)) {
logger.log(
FINEST,
"[{0}] outbound data: length = {1}, bytes = {2}",
new Object[] {logId, numBytes, toHexString(bytes, numBytes)});
}
});
runOrBuffer(writeAction.apply(bytes, numBytes));
}

/** Called from application thread. */
Expand All @@ -132,10 +169,9 @@ void complete() {

/** Called from the container thread {@link jakarta.servlet.WriteListener#onWritePossible()}. */
void onWritePossible() throws IOException {
logger.log(
FINEST, "[{0}] onWritePossible: ENTRY. The servlet output stream becomes ready", logId);
log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready");
assureReadyAndDrainedTurnsFalse();
while (outputStream.isReady()) {
while (isReady.getAsBoolean()) {
WriteState curState = writeState.get();

ActionItem actionItem = writeChain.poll();
Expand All @@ -146,18 +182,15 @@ void onWritePossible() throws IOException {

if (writeState.compareAndSet(curState, curState.withReadyAndDrained(true))) {
// state has not changed since.
logger.log(
FINEST,
"[{0}] onWritePossible: EXIT. All data available now is sent out and the servlet output"
+ " stream is still ready",
logId);
log.finest(
"onWritePossible: EXIT. All data available now is sent out and the servlet output"
+ " stream is still ready");
return;
}
// else, state changed by another thread (runOrBuffer()), need to drain the writeChain
// again
}
logger.log(
FINEST, "[{0}] onWritePossible: EXIT. The servlet output stream becomes not ready", logId);
log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
}

private void assureReadyAndDrainedTurnsFalse() {
Expand All @@ -166,6 +199,8 @@ private void assureReadyAndDrainedTurnsFalse() {
// being set to false by runOrBuffer() concurrently.
while (writeState.get().readyAndDrained) {
parkingThread = Thread.currentThread();
// Try to sleep for an extremely long time to avoid writeState being changed at exactly
// the time when sleep time expires (in extreme scenario, such as #9917).
LockSupport.parkNanos(Duration.ofHours(1).toNanos()); // should return immediately
}
parkingThread = null;
Expand All @@ -184,12 +219,12 @@ private void runOrBuffer(ActionItem actionItem) throws IOException {
if (actionItem == completeAction) {
return;
}
if (!outputStream.isReady()) {
if (!isReady.getAsBoolean()) {
boolean successful =
writeState.compareAndSet(curState, curState.withReadyAndDrained(false));
LockSupport.unpark(parkingThread);
checkState(successful, "Bug: curState is unexpectedly changed by another thread");
logger.log(FINEST, "[{0}] the servlet output stream becomes not ready", logId);
log.finest("the servlet output stream becomes not ready");
}
} else { // buffer to the writeChain
writeChain.offer(actionItem);
Expand All @@ -208,10 +243,22 @@ private void runOrBuffer(ActionItem actionItem) throws IOException {

/** Write actions, e.g. writeBytes, flush, complete. */
@FunctionalInterface
private interface ActionItem {
@VisibleForTesting
interface ActionItem {
void run() throws IOException;
}

@VisibleForTesting // Lincheck test can not run with java.util.logging dependency.
interface Log {
default void fine(String str, Object...params) {}

default void finest(String str, Object...params) {}

default boolean isFinestEnabled() {
return false;
}
}

private static final class WriteState {

static final WriteState DEFAULT = new WriteState(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

package io.grpc.servlet.jakarta;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.BindableService;
import io.grpc.ExperimentalApi;
Expand Down Expand Up @@ -44,7 +43,6 @@ public class GrpcServlet extends HttpServlet {

private final ServletAdapter servletAdapter;

@VisibleForTesting
GrpcServlet(ServletAdapter servletAdapter) {
this.servletAdapter = servletAdapter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -116,7 +115,7 @@ public <T> T otherAdapter(AdapterConstructor<T> constructor) {
* {@code resp.setBufferSize()} before invocation is allowed.
*/
public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
// TODO(zdapeng)
resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED, "GET method not supported");
}

/**
Expand Down Expand Up @@ -172,10 +171,8 @@ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOEx
getAuthority(req),
logId);

stream.transportState().runOnTransportThread(() -> {
transportListener.streamCreated(stream, method, headers);
stream.transportState().onStreamAllocated();
});
transportListener.streamCreated(stream, method, headers);
stream.transportState().runOnTransportThread(stream.transportState()::onStreamAllocated);

asyncCtx.getRequest().getInputStream()
.setReadListener(new GrpcReadListener(stream, asyncCtx, logId));
Expand Down Expand Up @@ -217,7 +214,7 @@ private static String getAuthority(HttpServletRequest req) {
try {
return new URI(req.getRequestURL().toString()).getAuthority();
} catch (URISyntaxException e) {
logger.log(FINE, "Error getting authority from the request URL {0}" + req.getRequestURL());
logger.log(FINE, "Error getting authority from the request URL {0}", req.getRequestURL());
return req.getServerName() + ":" + req.getServerPort();
}
}
Expand Down Expand Up @@ -282,7 +279,6 @@ private static final class GrpcReadListener implements ReadListener {
final AsyncContext asyncCtx;
final ServletInputStream input;
final InternalLogId logId;
private final AtomicBoolean closed = new AtomicBoolean(false);

GrpcReadListener(
ServletServerStream stream,
Expand Down Expand Up @@ -325,16 +321,8 @@ public void onDataAvailable() throws IOException {
@Override
public void onAllDataRead() {
logger.log(FINE, "[{0}] onAllDataRead", logId);
if (!closed.compareAndSet(false, true)) {
// https://github.com/eclipse/jetty.project/issues/8405
// Note that while this can be mitigated by setting
// AbstractHTTP2ServerConnectionFactory.getStreamIdleTimeout to zero, we allow this to be customized, so
// this workaround is being left in place.
logger.log(FINE, "[{0}] onAllDataRead already called, skipping this one", logId);
return;
}
stream.transportState().runOnTransportThread(
() -> stream.transportState().inboundDataReceived(ReadableBuffers.empty(), true));
stream.transportState().runOnTransportThread(() ->
stream.transportState().inboundDataReceived(ReadableBuffers.empty(), true));
}

@Override
Expand Down
Loading

0 comments on commit 3cc66ff

Please sign in to comment.