Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use NIO scheduleCallback API for connection handler timers #28

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ let dependencies: [Package.Dependency] = [
),
.package(
url: "https://github.com/apple/swift-nio.git",
from: "2.65.0"
from: "2.75.0"
),
.package(
url: "https://github.com/apple/swift-nio-http2.git",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,14 @@ package final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutbo
/// The `EventLoop` of the `Channel` this handler exists in.
private let eventLoop: any EventLoop

/// The maximum amount of time the connection may be idle for. If the connection remains idle
/// (i.e. has no open streams) for this period of time then the connection will be gracefully
/// closed.
private var maxIdleTimer: Timer?
/// The timer used to gracefully close idle connections.
private var maxIdleTimerHandler: Timer<MaxIdleTimerHandlerView>?

/// The amount of time to wait before sending a keep alive ping.
private var keepaliveTimer: Timer?
/// The timer used to send keep-alive pings.
private var keepaliveTimerHandler: Timer<KeepaliveTimerHandlerView>?

/// The amount of time the client has to reply after sending a keep alive ping. Only used if
/// `keepaliveTimer` is set.
private var keepaliveTimeoutTimer: Timer
/// The timer used to detect keep alive timeouts, if keep-alive pings are enabled.
private var keepaliveTimeoutHandler: Timer<KeepaliveTimeoutHandlerView>?

/// Opaque data sent in keep alive pings.
private let keepalivePingData: HTTP2PingData
Expand Down Expand Up @@ -110,14 +107,34 @@ package final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutbo
keepaliveWithoutCalls: Bool
) {
self.eventLoop = eventLoop
self.maxIdleTimer = maxIdleTime.map { Timer(delay: $0) }
self.keepaliveTimer = keepaliveTime.map { Timer(delay: $0, repeat: true) }
self.keepaliveTimeoutTimer = Timer(delay: keepaliveTimeout ?? .seconds(20))
self.keepalivePingData = HTTP2PingData(withInteger: .random(in: .min ... .max))
self.state = StateMachine(allowKeepaliveWithoutCalls: keepaliveWithoutCalls)

self.flushPending = false
self.inReadLoop = false
if let maxIdleTime {
self.maxIdleTimerHandler = Timer(
eventLoop: eventLoop,
duration: maxIdleTime,
repeating: false,
handler: MaxIdleTimerHandlerView(self)
)
}
if let keepaliveTime {
let keepaliveTimeout = keepaliveTimeout ?? .seconds(20)
self.keepaliveTimerHandler = Timer(
eventLoop: eventLoop,
duration: keepaliveTime,
repeating: true,
handler: KeepaliveTimerHandlerView(self)
)
self.keepaliveTimeoutHandler = Timer(
eventLoop: eventLoop,
duration: keepaliveTimeout,
repeating: false,
handler: KeepaliveTimeoutHandlerView(self)
)
}
}

package func handlerAdded(context: ChannelHandlerContext) {
Expand All @@ -142,8 +159,8 @@ package final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutbo
promise.succeed()
}

self.keepaliveTimer?.cancel()
self.keepaliveTimeoutTimer.cancel()
self.keepaliveTimerHandler?.cancel()
self.keepaliveTimeoutHandler?.cancel()
context.fireChannelInactive()
}

Expand Down Expand Up @@ -222,11 +239,8 @@ package final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutbo
// Pings are ack'd by the HTTP/2 handler so we only pay attention to acks here, and in
// particular only those carrying the keep-alive data.
if ack, data == self.keepalivePingData {
let loopBound = LoopBoundView(handler: self, context: context)
self.keepaliveTimeoutTimer.cancel()
self.keepaliveTimer?.schedule(on: context.eventLoop) {
loopBound.keepaliveTimerFired()
}
self.keepaliveTimeoutHandler?.cancel()
self.keepaliveTimerHandler?.start()
}

case .settings(.settings(_)):
Expand All @@ -236,15 +250,8 @@ package final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutbo
// becoming active is insufficient as, for example, a TLS handshake may fail after
// establishing the TCP connection, or the server isn't configured for gRPC (or HTTP/2).
if isInitialSettings {
let loopBound = LoopBoundView(handler: self, context: context)
self.keepaliveTimer?.schedule(on: context.eventLoop) {
loopBound.keepaliveTimerFired()
}

self.maxIdleTimer?.schedule(on: context.eventLoop) {
loopBound.maxIdleTimerFired()
}

self.keepaliveTimerHandler?.start()
self.maxIdleTimerHandler?.start()
context.fireChannelRead(self.wrapInboundOut(.ready))
}

Expand Down Expand Up @@ -290,29 +297,44 @@ package final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutbo
}
}

// Timer handler views.
extension ClientConnectionHandler {
struct LoopBoundView: @unchecked Sendable {
final class MaxIdleTimerHandlerView: @unchecked Sendable, NIOScheduledCallbackHandler {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple of questions:

  1. Does this need to be a class?
  2. Does it need to be Sendable? The previous LoopBoundView was Sendable because it needed to be pass to the schedule(on:) closure, I don't think we have a similar constraint any more (?)

If it doesn't need to be Sendable then I think we can remove the assertInEventLoop checks as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Does this need to be a class?

These view types can be structs. That they are classes, is a hangover from before I pushed this logic into Timer. Timer itself needs to be a class because the protocol requirement for handleScheduledCallback is non-mutating.

I've added a commit to address that.

  • Does it need to be Sendable? The previous LoopBoundView was Sendable because it needed to be pass to the schedule(on:) closure, I don't think we have a similar constraint any more (?)

It wasn't the case when the API was added to NIO, but it looks like it got changed since and now it is a requirement that the handler be Sendable:

    @preconcurrency
    @discardableResult
    func scheduleCallback(
        in amount: TimeAmount,
        handler: some (NIOScheduledCallbackHandler & Sendable)
    ) throws -> NIOScheduledCallback

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying 👍

private let handler: ClientConnectionHandler
private let context: ChannelHandlerContext

init(handler: ClientConnectionHandler, context: ChannelHandlerContext) {
init(_ handler: ClientConnectionHandler) {
self.handler = handler
self.context = context
}

func keepaliveTimerFired() {
self.context.eventLoop.assertInEventLoop()
self.handler.keepaliveTimerFired(context: self.context)
func handleScheduledCallback(eventLoop: some EventLoop) {
self.handler.eventLoop.assertInEventLoop()
self.handler.maxIdleTimerFired()
}
}

final class KeepaliveTimerHandlerView: @unchecked Sendable, NIOScheduledCallbackHandler {
private let handler: ClientConnectionHandler

func keepaliveTimeoutExpired() {
self.context.eventLoop.assertInEventLoop()
self.handler.keepaliveTimeoutExpired(context: self.context)
init(_ handler: ClientConnectionHandler) {
self.handler = handler
}

func maxIdleTimerFired() {
self.context.eventLoop.assertInEventLoop()
self.handler.maxIdleTimerFired(context: self.context)
func handleScheduledCallback(eventLoop: some EventLoop) {
self.handler.eventLoop.assertInEventLoop()
self.handler.keepaliveTimerFired()
}
}

final class KeepaliveTimeoutHandlerView: @unchecked Sendable, NIOScheduledCallbackHandler {
private let handler: ClientConnectionHandler

init(_ handler: ClientConnectionHandler) {
self.handler = handler
}

func handleScheduledCallback(eventLoop: some EventLoop) {
self.handler.eventLoop.assertInEventLoop()
self.handler.keepaliveTimeoutExpired()
}
}
}
Expand Down Expand Up @@ -356,7 +378,7 @@ extension ClientConnectionHandler {
self.eventLoop.assertInEventLoop()

// Stream created, so the connection isn't idle.
self.maxIdleTimer?.cancel()
self.maxIdleTimerHandler?.cancel()
self.state.streamOpened(id)
}

Expand All @@ -368,13 +390,10 @@ extension ClientConnectionHandler {
case .startIdleTimer(let cancelKeepalive):
// All streams are closed, restart the idle timer, and stop the keep-alive timer (it may
// not stop if keep-alive is allowed when there are no active calls).
let loopBound = LoopBoundView(handler: self, context: context)
self.maxIdleTimer?.schedule(on: context.eventLoop) {
loopBound.maxIdleTimerFired()
}
self.maxIdleTimerHandler?.start()

if cancelKeepalive {
self.keepaliveTimer?.cancel()
self.keepaliveTimerHandler?.cancel()
}

case .close:
Expand All @@ -397,34 +416,31 @@ extension ClientConnectionHandler {
}
}

private func keepaliveTimerFired(context: ChannelHandlerContext) {
guard self.state.sendKeepalivePing() else { return }
private func keepaliveTimerFired() {
guard self.state.sendKeepalivePing(), let context = self.context else { return }

// Cancel the keep alive timer when the client sends a ping. The timer is resumed when the ping
// is acknowledged.
self.keepaliveTimer?.cancel()
self.keepaliveTimerHandler?.cancel()

let ping = HTTP2Frame(streamID: .rootStream, payload: .ping(self.keepalivePingData, ack: false))
context.write(self.wrapOutboundOut(ping), promise: nil)
self.maybeFlush(context: context)

// Schedule a timeout on waiting for the response.
let loopBound = LoopBoundView(handler: self, context: context)
self.keepaliveTimeoutTimer.schedule(on: context.eventLoop) {
loopBound.keepaliveTimeoutExpired()
}
self.keepaliveTimeoutHandler?.start()
}

private func keepaliveTimeoutExpired(context: ChannelHandlerContext) {
guard self.state.beginClosing() else { return }
private func keepaliveTimeoutExpired() {
guard self.state.beginClosing(), let context = self.context else { return }

context.fireChannelRead(self.wrapInboundOut(.closing(.keepaliveExpired)))
self.writeAndFlushGoAway(context: context, message: "keepalive_expired")
context.close(promise: nil)
}

private func maxIdleTimerFired(context: ChannelHandlerContext) {
guard self.state.beginClosing() else { return }
private func maxIdleTimerFired() {
guard self.state.beginClosing(), let context = self.context else { return }

context.fireChannelRead(self.wrapInboundOut(.closing(.idle)))
self.writeAndFlushGoAway(context: context, message: "idle")
Expand Down
83 changes: 41 additions & 42 deletions Sources/GRPCNIOTransportCore/Internal/Timer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,55 +16,54 @@

package import NIOCore

package struct Timer {
/// The delay to wait before running the task.
private let delay: TimeAmount
/// The task to run, if scheduled.
private var task: Kind?
/// Whether the task to schedule is repeated.
private let `repeat`: Bool
/// A timer backed by `NIOScheduledCallback`.
package final class Timer<Handler: NIOScheduledCallbackHandler> where Handler: Sendable {
/// The event loop on which to run this timer.
private let eventLoop: any EventLoop

private enum Kind {
case once(Scheduled<Void>)
case repeated(RepeatedTask)
/// The duration of the timer.
private let duration: TimeAmount

func cancel() {
switch self {
case .once(let task):
task.cancel()
case .repeated(let task):
task.cancel()
}
}
}
/// Whether this timer should repeat.
private let repeating: Bool

/// The handler to call when the timer fires.
private let handler: Handler

/// The currently scheduled callback if the timer is running.
private var scheduledCallback: NIOScheduledCallback?

package init(delay: TimeAmount, repeat: Bool = false) {
self.delay = delay
self.task = nil
self.repeat = `repeat`
package init(eventLoop: any EventLoop, duration: TimeAmount, repeating: Bool, handler: Handler) {
self.eventLoop = eventLoop
self.duration = duration
self.repeating = repeating
self.handler = handler
self.scheduledCallback = nil
}

/// Schedule a task on the given `EventLoop`.
package mutating func schedule(
on eventLoop: any EventLoop,
work: @escaping @Sendable () throws -> Void
) {
self.task?.cancel()
/// Cancel the timer, if it is running.
package func cancel() {
self.eventLoop.assertInEventLoop()
guard let scheduledCallback = self.scheduledCallback else { return }
scheduledCallback.cancel()
}

if self.repeat {
let task = eventLoop.scheduleRepeatedTask(initialDelay: self.delay, delay: self.delay) { _ in
try work()
}
self.task = .repeated(task)
} else {
let task = eventLoop.scheduleTask(in: self.delay, work)
self.task = .once(task)
}
/// Start or restart the timer.
package func start() {
self.eventLoop.assertInEventLoop()
self.scheduledCallback?.cancel()
// Only throws if the event loop is shutting down, so we'll just swallow the error here.
self.scheduledCallback = try? self.eventLoop.scheduleCallback(in: self.duration, handler: self)
}
}

/// Cancels the task, if one was scheduled.
package mutating func cancel() {
self.task?.cancel()
self.task = nil
extension Timer: NIOScheduledCallbackHandler, @unchecked Sendable where Handler: Sendable {
/// For repeated timer support, the timer itself proxies the callback and restarts the timer.
///
/// - NOTE: Users should not call this function directly.
package func handleScheduledCallback(eventLoop: some EventLoop) {
self.eventLoop.assertInEventLoop()
self.handler.handleScheduledCallback(eventLoop: eventLoop)
if self.repeating { self.start() }
Comment on lines +64 to +67
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remind me what happens w.r.t. the callback if it's cancelled? Just wondering if it works how we expect for the repeating timers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the NIO scheduled callback API is always one-shot. The repeated timer API here is layered on top.

The semantics of this timer is that if you cancel it you cancel all future recurrences too, but it can then be reused/restarted by calling start again.

There's a test case that covers this behaviour.

}
}
Loading
Loading