-
Notifications
You must be signed in to change notification settings - Fork 200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add an option to drop the request #732
Conversation
is it possible to cover such scenario with tests ? |
GenerationOutputs back(); | ||
// Reads result of a generation for single iteration | ||
GenerationOutputs read(); | ||
// Reads all generated tokens for all sequences | ||
std::vector<GenerationOutput> read_all(); | ||
}; | ||
|
||
using GenerationHandle = std::unique_ptr<GenerationHandleImpl>; | ||
using GenerationHandle = std::shared_ptr<GenerationHandleImpl>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With that change we can have multiple handles pointing to a single stream and we give an explicit option to drop generation via method call. This means that one handle can call drop
and invalidate handle for not only itself but potentially other handles.
I think that if we go this way we should block any calls on handle that has been dropped (throw errors for example).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it is safe to say that current approach with unique_ptr did not restrict GenAI API users from handle misusage, one could simply take the reference of the handle and do whatever.
Changing to shared_ptr and exposing explicit drop()
method gives more flexibilty which OVMS needed - dropping the request in HTTP client disconnection callback (look up OVMS pull request). Now, multiple threads can use the handle (http thread and the mediapipe thread) and the generation is dropped once all handle shared references are gone.
I also agree with you that we could verify nobody calls read/read_all methods after handle is dropped. Will add that
// Notify the last time even if there will be no results | ||
// This causes read_all() to unblock in all situations | ||
request->notify_handle(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we can do it that way. The idea is that we should have only one notification per step() (if any). Here we add another call and when called in such circumstances, notify_handle()
will always send tokens to handle. So we can end up sending the same results twice. For example when generation finishes it notifies handle in sampler and then here in cleanup method. In the streaming scenario, if generation handle doesn't read() token and check status between these calls it will read last token twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have removed notify_handle call in case there is out of memory error. Now it is not duplicated, those will be notified in _free_non_running_requests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Separated the empty push to dropped handles into another step in stepping method.
4654360
to
eba3844
Compare
@@ -60,6 +60,15 @@ class ContinuousBatchingPipeline::Impl { | |||
ChatHistory m_history; | |||
|
|||
|
|||
void _notify_requests_dropped_by_handle() { | |||
// Notify the last time by pushing empty output | |||
// This causes read_all() to unblock by adding anything to the queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// This causes read_all() to unblock by adding anything to the queue | |
// This causes read() to unblock when called before status change by adding anything to the queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
void GenerationHandleImpl::drop() { | ||
m_generation_stream->drop(); | ||
} | ||
|
||
std::unordered_map<uint64_t, GenerationOutput> GenerationHandleImpl::back() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should block all methods not just read
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/cpp/src/generation_handle.cpp
Outdated
std::unordered_map<uint64_t, GenerationOutput> GenerationHandleImpl::back() { | ||
return m_generation_stream->back(); | ||
} | ||
|
||
std::unordered_map<uint64_t, GenerationOutput> GenerationHandleImpl::read() { | ||
OPENVINO_ASSERT(!is_dropped(), "Read cannot be called while underlying GenerationStream is already in dropped by handle state."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OPENVINO_ASSERT(!is_dropped(), "Read cannot be called while underlying GenerationStream is already in dropped by handle state."); | |
OPENVINO_ASSERT(!is_dropped(), "GenerationHandle cannot be used after it is dropped."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
I don't think we can test it on unit test level, we would need to work with real models. |
You can use real models in our tests - we use them in a lot of our tests. |
Real models are used in Python tests, so we would need to expose GenerationHandle via python, so new tests can use lower level API since we can't test it when running pipeline via generate() calls. |
@@ -60,6 +60,15 @@ class ContinuousBatchingPipeline::Impl { | |||
ChatHistory m_history; | |||
|
|||
|
|||
void _notify_requests_dropped_by_handle() { | |||
// Notify the last time by pushing empty output | |||
// This causes read() to unblock by adding anything to the queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even with this comment I don't fully understand why do we need to send empty outputs?
If handle is dropped by user, user should not expect any empty outputs from this request / handle
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
…d streaming) (#2610) * Patch tensorflow net_http to allow for installing client disconnection callbacks * Use new genai, add tests, fix building without mediapipe, disconnect unary as well * Tests CVS-148134 Modifications to GenAI: openvinotoolkit/openvino.genai#732
This enables to drop user request in case the client is disconnected (when used in OVMS).
OVMS commit using this:
openvinotoolkit/model_server#2610