Skip to content

Commit

Permalink
paused priority support
Browse files Browse the repository at this point in the history
Reviewed By: afrind

Differential Revision: D67988645

fbshipit-source-id: 465f8c56177e5b703dbc1511b73521e6c26c1f3e
  • Loading branch information
pfarcasanu authored and facebook-github-bot committed Jan 24, 2025
1 parent d438b82 commit 38d77ba
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 4 deletions.
54 changes: 54 additions & 0 deletions quic/api/test/QuicPacketSchedulerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2818,4 +2818,58 @@ TEST_F(QuicPacketSchedulerTest, RstStreamSchedulerReliableReset) {
EXPECT_FALSE(conn.pendingEvents.resets.contains(stream->id));
}

TEST_F(QuicPacketSchedulerTest, PausedPriorityInitial) {
static const auto kSequentialPriority = Priority(3, false);
static const auto kPausedPriority = Priority(0, false, 0, true /* paused */);
QuicServerConnectionState conn(
FizzServerQuicHandshakeContext::Builder().build());
conn.streamManager->setMaxLocalBidirectionalStreams(10);
conn.flowControlState.peerAdvertisedMaxOffset = 100000;
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000;
auto pausedStreamId =
(*conn.streamManager->createNextBidirectionalStream())->id;
auto regularStreamId =
(*conn.streamManager->createNextBidirectionalStream())->id;
auto pausedStream = conn.streamManager->findStream(pausedStreamId);
auto regularStream = conn.streamManager->findStream(regularStreamId);
pausedStream->priority = kPausedPriority;
regularStream->priority = kSequentialPriority;

writeDataToQuicStream(
*conn.streamManager->findStream(pausedStream->id),
folly::IOBuf::copyBuffer("paused_data"),
false);
writeDataToQuicStream(
*conn.streamManager->findStream(regularStream->id),
folly::IOBuf::copyBuffer("regular_data"),
false);

// Should write frames for only regular stream.
StreamFrameScheduler scheduler(conn);
NiceMock<MockQuicPacketBuilder> mockBuilder;
EXPECT_CALL(mockBuilder, remainingSpaceInPkt()).WillRepeatedly(Return(4096));
EXPECT_CALL(mockBuilder, appendFrame(_)).WillRepeatedly(Invoke([&](auto f) {
mockBuilder.frames_.push_back(f);
}));
scheduler.writeStreams(mockBuilder);
auto& frames = mockBuilder.frames_;
ASSERT_EQ(frames.size(), 1);
WriteStreamFrame regularFrame(regularStream->id, 0, 12, false);
ASSERT_TRUE(frames[0].asWriteStreamFrame());
EXPECT_EQ(*frames[0].asWriteStreamFrame(), regularFrame);
conn.streamManager->removeWritable(*regularStream);

// Unpause the stream. Expect the scheduleor to write the data.
conn.streamManager->setStreamPriority(pausedStreamId, kSequentialPriority);
scheduler.writeStreams(mockBuilder);
ASSERT_EQ(frames.size(), 2);
WriteStreamFrame pausedFrame(pausedStream->id, 0, 11, false);
ASSERT_TRUE(frames[1].asWriteStreamFrame());
EXPECT_EQ(*frames[1].asWriteStreamFrame(), pausedFrame);

// Pause the stream again. Expect no more data writable.
conn.streamManager->setStreamPriority(pausedStreamId, kPausedPriority);
ASSERT_FALSE(conn.streamManager->hasWritable());
}

} // namespace quic::test
11 changes: 7 additions & 4 deletions quic/state/QuicPriorityQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ using OrderedStreamSet = std::set<OrderedStream, ordered_stream_cmp>;
struct Priority {
uint8_t level : 3;
bool incremental : 1;
OrderId orderId : 58;
OrderId orderId : 57;
bool paused : 1;

Priority(uint8_t l, bool i, OrderId o = 0)
: level(l), incremental(i), orderId(o) {}
Priority(uint8_t l, bool i, OrderId o = 0, bool p = false)
: level(l), incremental(i), orderId(o), paused(p) {}

bool operator==(Priority other) const noexcept {
return level == other.level && incremental == other.incremental &&
orderId == other.orderId;
orderId == other.orderId && paused == other.paused;
}
};

Expand Down Expand Up @@ -212,11 +213,13 @@ struct PriorityQueue {
void updateIfExist(StreamId id, Priority priority) {
auto iter = writableStreamsToLevel_.find(id);
if (iter != writableStreamsToLevel_.end()) {
CHECK(!priority.paused);
updateExistingStreamPriority(iter, priority);
}
}

void insertOrUpdate(StreamId id, Priority pri) {
CHECK(!pri.paused);
auto it = writableStreamsToLevel_.find(id);
auto index = priority2index(pri);
if (it != writableStreamsToLevel_.end()) {
Expand Down
5 changes: 5 additions & 0 deletions quic/state/QuicStreamManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ bool QuicStreamManager::setStreamPriority(StreamId id, Priority newPriority) {
}
notifyStreamPriorityChanges();
}
updateWritableStreams(*stream);
writeQueue_.updateIfExist(id, stream->priority);
return true;
}
Expand Down Expand Up @@ -669,6 +670,10 @@ void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) {
removeWritable(stream);
return;
}
if (stream.priority.paused) {
removeWritable(stream);
return;
}
if (stream.hasWritableData()) {
writableStreams_.emplace(stream.id);
} else {
Expand Down

0 comments on commit 38d77ba

Please sign in to comment.