-
Notifications
You must be signed in to change notification settings - Fork 200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add an option to drop the request #732
Changes from all commits
4909204
b95638f
1a9c89d
63059c9
3eddb8f
673be16
933a7fd
4e21aa2
08476d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,6 +60,15 @@ class ContinuousBatchingPipeline::Impl { | |
ChatHistory m_history; | ||
|
||
|
||
void _notify_requests_dropped_by_handle() { | ||
// Notify the last time by pushing empty output | ||
// This causes read() to unblock by adding anything to the queue | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even with this comment I don't fully understand why do we need to send empty outputs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
for (SequenceGroup::Ptr& request : m_requests) { | ||
if (request->handle_dropped()) | ||
request->push_empty_outputs(); | ||
} | ||
} | ||
|
||
void _free_non_running_requests() { | ||
std::vector<SequenceGroup::Ptr>::iterator requests_iterator = m_requests.begin(); | ||
while (requests_iterator != m_requests.end()) { | ||
|
@@ -136,7 +145,7 @@ class ContinuousBatchingPipeline::Impl { | |
std::lock_guard<std::mutex> lock{m_awaiting_requests_mutex}; | ||
m_awaiting_requests.push_back(sequence_group); | ||
} | ||
return std::make_unique<GenerationHandleImpl>(sequence_group->get_generation_stream(), sampling_params); | ||
return std::make_shared<GenerationHandleImpl>(sequence_group->get_generation_stream(), sampling_params); | ||
} | ||
|
||
GenerationHandle add_request(uint64_t request_id, const std::string& prompt, ov::genai::GenerationConfig sampling_params) { | ||
|
@@ -227,6 +236,15 @@ class ContinuousBatchingPipeline::Impl { | |
timer.end(); | ||
} | ||
|
||
// notify requests dropped by handle | ||
|
||
{ | ||
static ManualTimer timer("notify requests dropped by handle"); | ||
timer.start(); | ||
_notify_requests_dropped_by_handle(); | ||
timer.end(); | ||
} | ||
|
||
// free non running requests for current step | ||
|
||
{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,22 +9,32 @@ | |
using namespace ov::genai; | ||
|
||
GenerationHandleImpl::~GenerationHandleImpl() { | ||
m_generation_stream->drop(); | ||
drop(); | ||
} | ||
|
||
GenerationStatus GenerationHandleImpl::get_status() { | ||
return m_generation_stream->get_status(); | ||
} | ||
|
||
bool GenerationHandleImpl::can_read() { | ||
return m_generation_stream->can_read(); | ||
return !is_dropped() && m_generation_stream->can_read(); | ||
} | ||
|
||
bool GenerationHandleImpl::is_dropped() { | ||
return get_status() == GenerationStatus::DROPPED_BY_HANDLE; | ||
} | ||
|
||
void GenerationHandleImpl::drop() { | ||
m_generation_stream->drop(); | ||
} | ||
|
||
std::unordered_map<uint64_t, GenerationOutput> GenerationHandleImpl::back() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should block all methods not just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
OPENVINO_ASSERT(!is_dropped(), "GenerationHandle cannot be used after it is dropped."); | ||
return m_generation_stream->back(); | ||
} | ||
|
||
std::unordered_map<uint64_t, GenerationOutput> GenerationHandleImpl::read() { | ||
OPENVINO_ASSERT(!is_dropped(), "GenerationHandle cannot be used after it is dropped."); | ||
return m_generation_stream->read(); | ||
} | ||
|
||
|
@@ -41,6 +51,7 @@ void add_partial_result(std::unordered_map<uint64_t, GenerationOutput>& partial_ | |
} | ||
|
||
std::vector<GenerationOutput> GenerationHandleImpl::read_all() { | ||
OPENVINO_ASSERT(!is_dropped(), "GenerationHandle cannot be used after it is dropped."); | ||
std::vector<GenerationOutput> results; | ||
std::unordered_map<uint64_t, GenerationOutput> partial_results; | ||
// We iterate until generation is running or there are tokens we haven't read yet | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With that change we can have multiple handles pointing to a single stream and we give an explicit option to drop generation via method call. This means that one handle can call
drop
and invalidate handle for not only itself but potentially other handles.I think that if we go this way we should block any calls on handle that has been dropped (throw errors for example).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it is safe to say that current approach with unique_ptr did not restrict GenAI API users from handle misusage, one could simply take the reference of the handle and do whatever.
Changing to shared_ptr and exposing explicit
drop()
method gives more flexibilty which OVMS needed - dropping the request in HTTP client disconnection callback (look up OVMS pull request). Now, multiple threads can use the handle (http thread and the mediapipe thread) and the generation is dropped once all handle shared references are gone.I also agree with you that we could verify nobody calls read/read_all methods after handle is dropped. Will add that