process time
bstrzele committed Dec 23, 2024
1 parent 88e298e commit 08f22b8
Showing 6 changed files with 111 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/mediapipe_internal/mediapipegraphexecutor.hpp
@@ -27,6 +27,7 @@
#include "../model_metric_reporter.hpp"
#include "../profiler.hpp"
#include "../status.hpp"
#include "../timer.hpp"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#include "mediapipe/framework/calculator_graph.h"
@@ -104,6 +105,12 @@ class MediapipeGraphExecutor {
MetricGaugeGuard currentGraphsGuard(this->mediapipeServableMetricReporter->currentGraphs.get());
::mediapipe::CalculatorGraph graph;
MP_RETURN_ON_FAIL(graph.Initialize(this->config), std::string("failed initialization of MediaPipe graph: ") + this->name, StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR);
enum : unsigned int {
PROCESS,
TIMER_END2
};
Timer<TIMER_END2> timer;
timer.start(PROCESS);
std::unordered_map<std::string, ::mediapipe::OutputStreamPoller> outputPollers;
for (auto& name : this->outputNames) {
if (name.empty()) {
@@ -196,6 +203,9 @@ class MediapipeGraphExecutor {
INCREMENT_IF_ENABLED(this->mediapipeServableMetricReporter->getGraphErrorMetric(executionContext));
}
MP_RETURN_ON_FAIL(status, "graph wait until done", mediapipeAbslToOvmsStatus(status.code()));
timer.stop(PROCESS);
double processTime = timer.template elapsed<std::chrono::microseconds>(PROCESS);
OBSERVE_IF_ENABLED(this->mediapipeServableMetricReporter->getProcessingTimeMetric(executionContext), processTime);
if (outputPollers.size() != outputPollersWithReceivedPacket.size()) {
SPDLOG_DEBUG("Mediapipe failed to execute. Failed to receive all output packets");
return Status(StatusCode::MEDIAPIPE_EXECUTION_ERROR, "Unknown error during mediapipe execution");
@@ -218,6 +228,12 @@
// Init
MP_RETURN_ON_FAIL(graph.Initialize(this->config), "graph initialization", StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR);
}
enum : unsigned int {
PROCESS,
TIMER_END2
};
Timer<TIMER_END2> timer;
timer.start(PROCESS);
{
OVMS_PROFILE_SCOPE("Mediapipe graph installing packet observers");
// Installing observers
@@ -334,6 +350,9 @@
MP_RETURN_ON_FAIL(status, "graph wait until done", mediapipeAbslToOvmsStatus(status.code()));
SPDLOG_DEBUG("Graph {}: Done execution", this->name);
}
timer.stop(PROCESS);
double processTime = timer.template elapsed<std::chrono::microseconds>(PROCESS);
OBSERVE_IF_ENABLED(this->mediapipeServableMetricReporter->getProcessingTimeMetric(executionContext), processTime);
return StatusCode::OK;
} catch (...) {
SPDLOG_DEBUG("Graph {}: Exception while processing MediaPipe graph", this->name);
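The two enum-indexed timers above bracket the whole graph run, from just after graph.Initialize() to just after WaitUntilDone(), and the elapsed time in microseconds feeds the new processing-time histogram via OBSERVE_IF_ENABLED(...getProcessingTimeMetric(executionContext), processTime). Below is a minimal, self-contained sketch of that pattern; the Timer<N> here is an assumed std::chrono-based stand-in matching the start/stop/elapsed usage shown in the diff, not the actual OVMS implementation pulled in via "../timer.hpp".

#include <array>
#include <chrono>
#include <cstdio>
#include <thread>

// Assumed stand-in for OVMS' Timer<N>: one start/stop slot per enum value,
// elapsed() converts the measured difference to the requested duration unit.
template <unsigned int N>
class Timer {
    std::array<std::chrono::high_resolution_clock::time_point, N> begins;
    std::array<std::chrono::high_resolution_clock::time_point, N> ends;

public:
    void start(unsigned int id) { begins[id] = std::chrono::high_resolution_clock::now(); }
    void stop(unsigned int id) { ends[id] = std::chrono::high_resolution_clock::now(); }
    template <typename Unit>
    double elapsed(unsigned int id) const {
        return static_cast<double>(std::chrono::duration_cast<Unit>(ends[id] - begins[id]).count());
    }
};

int main() {
    enum : unsigned int { PROCESS, TIMER_END2 };  // TIMER_END2 doubles as the slot count
    Timer<TIMER_END2> timer;
    timer.start(PROCESS);
    std::this_thread::sleep_for(std::chrono::milliseconds(5));  // stand-in for running the graph
    timer.stop(PROCESS);
    double processTime = timer.elapsed<std::chrono::microseconds>(PROCESS);
    std::printf("process time: %.0f us\n", processTime);  // value that would be observed in the histogram
    return 0;
}

In the executor this yields one histogram sample per unary request, and one per whole stream in the ModelInferStream/V3 streaming paths, since the timer wraps the entire graph lifetime rather than individual packets.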
1 change: 1 addition & 0 deletions src/metric_config.cpp
@@ -59,6 +59,7 @@ const std::string METRIC_NAME_REQUESTS_ACCEPTED = "ovms_requests_accepted";
const std::string METRIC_NAME_REQUESTS_REJECTED = "ovms_requests_rejected";

const std::string METRIC_NAME_GRAPH_ERROR = "ovms_graph_error";
const std::string METRIC_NAME_PROCESSING_TIME = "ovms_processing_time";

bool MetricConfig::validateEndpointPath(const std::string& endpoint) {
std::regex valid_endpoint_regex("^/[a-zA-Z0-9]*$");
2 changes: 2 additions & 0 deletions src/metric_config.hpp
@@ -45,6 +45,7 @@ extern const std::string METRIC_NAME_REQUESTS_ACCEPTED;
extern const std::string METRIC_NAME_REQUESTS_REJECTED;

extern const std::string METRIC_NAME_GRAPH_ERROR;
extern const std::string METRIC_NAME_PROCESSING_TIME;

class Status;
/**
@@ -98,6 +99,7 @@ class MetricConfig {
{METRIC_NAME_REQUESTS_ACCEPTED},
{METRIC_NAME_REQUESTS_REJECTED},
{METRIC_NAME_GRAPH_ERROR},
{METRIC_NAME_PROCESSING_TIME},
{METRIC_NAME_RESPONSES}};
};
} // namespace ovms
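METRIC_NAME_PROCESSING_TIME joins the list of family names that can be switched on per config, alongside the existing MediaPipe request/response counters. A minimal sketch of the enable check, assuming isFamilyEnabled() boils down to a membership test over the configured names (member name and layout here are assumptions, not the real MetricConfig):

#include <cstdio>
#include <set>
#include <string>
#include <utility>

// Sketch only: the real MetricConfig carries endpoint settings and more state.
class MetricConfigSketch {
    std::set<std::string> enabledFamilies;  // assumed member

public:
    explicit MetricConfigSketch(std::set<std::string> families) : enabledFamilies(std::move(families)) {}
    bool isFamilyEnabled(const std::string& family) const {
        return enabledFamilies.count(family) > 0;
    }
};

int main() {
    MetricConfigSketch config({"ovms_requests_accepted", "ovms_processing_time"});
    std::printf("ovms_processing_time enabled: %d\n", config.isFamilyEnabled("ovms_processing_time"));
    std::printf("ovms_graph_error enabled: %d\n", config.isFamilyEnabled("ovms_graph_error"));
    return 0;
}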
47 changes: 47 additions & 0 deletions src/model_metric_reporter.cpp
@@ -333,6 +333,10 @@ MediapipeServableMetricReporter::MediapipeServableMetricReporter(const MetricCon
return;
}

for (int i = 0; i < NUMBER_OF_BUCKETS; i++) {
this->buckets.emplace_back(floor(BUCKET_MULTIPLIER * pow(BUCKET_POWER_BASE, i)));
}

auto familyName = METRIC_NAME_CURRENT_GRAPHS;
if (metricConfig->isFamilyEnabled(familyName)) {
auto family = registry->createFamily<MetricGauge>(familyName,
@@ -552,6 +556,49 @@ MediapipeServableMetricReporter::MediapipeServableMetricReporter(const MetricCon
{"interface", "REST"}});
THROW_IF_NULL(this->requestSuccessRestModelReady, "cannot create metric");
}
familyName = METRIC_NAME_PROCESSING_TIME;
if (metricConfig->isFamilyEnabled(familyName)) {
auto family = registry->createFamily<MetricHistogram>(familyName,
"Time packet spent in the MediaPipe graph.");
THROW_IF_NULL(family, "cannot create family");

// KFS
this->processingTimeGrpcModelInfer = family->addMetric({{"name", graphName},
{"api", "KServe"},
{"method", "ModelInfer"},
{"interface", "gRPC"}},
this->buckets);
THROW_IF_NULL(this->processingTimeGrpcModelInfer, "cannot create metric");

this->processingTimeGrpcModelInferStream = family->addMetric({{"name", graphName},
{"api", "KServe"},
{"method", "ModelInferStream"},
{"interface", "gRPC"}},
this->buckets);
THROW_IF_NULL(this->processingTimeGrpcModelInferStream, "cannot create metric");

this->processingTimeRestModelInfer = family->addMetric({{"name", graphName},
{"api", "KServe"},
{"method", "ModelInfer"},
{"interface", "REST"}},
this->buckets);
THROW_IF_NULL(this->processingTimeRestModelInfer, "cannot create metric");

// V3
this->processingTimeRestV3Unary = family->addMetric({{"name", graphName},
{"api", "V3"},
{"method", "Unary"},
{"interface", "REST"}},
this->buckets);
THROW_IF_NULL(this->processingTimeRestV3Unary, "cannot create metric");

this->processingTimeRestV3Stream = family->addMetric({{"name", graphName},
{"api", "V3"},
{"method", "Stream"},
{"interface", "REST"}},
this->buckets);
THROW_IF_NULL(this->processingTimeRestV3Stream, "cannot create metric");
}
}

} // namespace ovms
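The histogram buckets are generated once in the reporter constructor as floor(BUCKET_MULTIPLIER * pow(BUCKET_POWER_BASE, i)), i.e. geometrically growing upper bounds shared by all five processing-time metrics. A standalone sketch of that loop follows; the constant values are illustrative assumptions, since NUMBER_OF_BUCKETS, BUCKET_MULTIPLIER and BUCKET_POWER_BASE are defined elsewhere in model_metric_reporter and are not part of this diff.

#include <cmath>
#include <cstdio>
#include <vector>

int main() {
    // Illustrative values only; the real constants live outside this diff.
    const int NUMBER_OF_BUCKETS = 10;
    const double BUCKET_MULTIPLIER = 10.0;
    const double BUCKET_POWER_BASE = 2.0;

    std::vector<double> buckets;
    for (int i = 0; i < NUMBER_OF_BUCKETS; i++) {
        // Same expression as in the constructor above: floor(M * B^i).
        buckets.emplace_back(std::floor(BUCKET_MULTIPLIER * std::pow(BUCKET_POWER_BASE, i)));
    }
    for (double upperBound : buckets) {
        std::printf("le=%.0f\n", upperBound);  // 10, 20, 40, 80, ... (microseconds)
    }
    return 0;
}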
30 changes: 30 additions & 0 deletions src/model_metric_reporter.hpp
@@ -156,6 +156,8 @@ class ModelMetricReporter : public ServableMetricReporter {
class MediapipeServableMetricReporter {
MetricRegistry* registry;

protected:
std::vector<double> buckets;
public:
std::unique_ptr<MetricGauge> currentGraphs;

@@ -205,6 +207,34 @@ class MediapipeServableMetricReporter {
std::unique_ptr<MetricCounter> requestErrorRestV3Unary;
std::unique_ptr<MetricCounter> requestErrorRestV3Stream;


std::unique_ptr<MetricHistogram> processingTimeGrpcModelInfer;
std::unique_ptr<MetricHistogram> processingTimeGrpcModelInferStream;
std::unique_ptr<MetricHistogram> processingTimeRestModelInfer;
std::unique_ptr<MetricHistogram> processingTimeRestV3Unary;
std::unique_ptr<MetricHistogram> processingTimeRestV3Stream;

inline MetricHistogram* getProcessingTimeMetric(const ExecutionContext& context) {
if (context.interface == ExecutionContext::Interface::GRPC) {
if (context.method == ExecutionContext::Method::ModelInfer)
return this->processingTimeGrpcModelInfer.get();
if (context.method == ExecutionContext::Method::ModelInferStream)
return this->processingTimeGrpcModelInferStream.get();
return nullptr;
} else if (context.interface == ExecutionContext::Interface::REST) {
if (context.method == ExecutionContext::Method::ModelInfer)
return this->processingTimeRestModelInfer.get();
if (context.method == ExecutionContext::Method::V3Unary)
return this->processingTimeRestV3Unary.get();
if (context.method == ExecutionContext::Method::V3Stream)
return this->processingTimeRestV3Stream.get();
return nullptr;
} else {
return nullptr;
}
return nullptr;
}

inline MetricCounter* getRequestsMetric(const ExecutionContext& context, bool success = true) {
if (context.interface == ExecutionContext::Interface::GRPC) {
if (context.method == ExecutionContext::Method::ModelInfer)
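getProcessingTimeMetric() mirrors the existing getRequestsMetric() dispatch: it maps the ExecutionContext (interface plus method) to one of the five histograms and returns nullptr for unsupported combinations, which OBSERVE_IF_ENABLED is expected to treat as "metric disabled, skip". A minimal sketch of that null-safe observe, assuming the macro reduces to a pointer check plus an observe() call (the real macro lives in OVMS' metric headers and is not shown in this diff):

#include <cstdio>
#include <memory>

// Sketch type only; the real MetricHistogram is backed by the metrics registry.
struct MetricHistogram {
    void observe(double value) { std::printf("observed %.1f us\n", value); }
};

// Assumed shape of OBSERVE_IF_ENABLED: do nothing when the metric was not created.
#define OBSERVE_IF_ENABLED(metric, value) \
    do {                                  \
        if ((metric) != nullptr)          \
            (metric)->observe(value);     \
    } while (0)

int main() {
    auto processingTimeGrpcModelInfer = std::make_unique<MetricHistogram>();
    MetricHistogram* disabledOrUnsupported = nullptr;

    OBSERVE_IF_ENABLED(processingTimeGrpcModelInfer.get(), 1234.0);  // one sample recorded
    OBSERVE_IF_ENABLED(disabledOrUnsupported, 1234.0);               // silently skipped
    return 0;
}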
12 changes: 12 additions & 0 deletions src/test/metrics_flow_test.cpp
@@ -395,6 +395,9 @@ TEST_F(MetricFlowTest, GrpcModelInfer) {
checkMediapipeRequestsCounter(server.collect(), METRIC_NAME_REQUESTS_REJECTED, mpName, "gRPC", "ModelInfer", "KServe", numberOfRejectedRequests);

checkMediapipeRequestsCounter(server.collect(), METRIC_NAME_RESPONSES, mpName, "gRPC", "ModelInfer", "KServe", numberOfAcceptedRequests);

EXPECT_THAT(server.collect(), HasSubstr(METRIC_NAME_PROCESSING_TIME + std::string{"_count{api=\"KServe\",interface=\"gRPC\",method=\"ModelInfer\",name=\""} + mpName + std::string{"\"} "} + std::to_string(numberOfAcceptedRequests)));
EXPECT_THAT(server.collect(), HasSubstr(METRIC_NAME_PROCESSING_TIME + std::string{"_count{api=\"KServe\",interface=\"REST\",method=\"ModelInfer\",name=\""} + mpName + std::string{"\"} "} + std::to_string(0)));
#endif

EXPECT_THAT(server.collect(), HasSubstr(METRIC_NAME_REQUEST_TIME + std::string{"_count{interface=\"gRPC\",name=\""} + modelName + std::string{"\",version=\"1\"} "} + std::to_string(numberOfSuccessRequests)));
@@ -708,6 +711,9 @@ TEST_F(MetricFlowTest, RestModelInfer) {
checkMediapipeRequestsCounter(server.collect(), METRIC_NAME_REQUESTS_REJECTED, mpName, "REST", "ModelInfer", "KServe", numberOfRejectedRequests);

checkMediapipeRequestsCounter(server.collect(), METRIC_NAME_RESPONSES, mpName, "REST", "ModelInfer", "KServe", numberOfAcceptedRequests);

EXPECT_THAT(server.collect(), HasSubstr(METRIC_NAME_PROCESSING_TIME + std::string{"_count{api=\"KServe\",interface=\"gRPC\",method=\"ModelInfer\",name=\""} + mpName + std::string{"\"} "} + std::to_string(0)));
EXPECT_THAT(server.collect(), HasSubstr(METRIC_NAME_PROCESSING_TIME + std::string{"_count{api=\"KServe\",interface=\"REST\",method=\"ModelInfer\",name=\""} + mpName + std::string{"\"} "} + std::to_string(numberOfAcceptedRequests)));
#endif

EXPECT_THAT(server.collect(), HasSubstr(METRIC_NAME_REQUEST_TIME + std::string{"_count{interface=\"gRPC\",name=\""} + modelName + std::string{"\",version=\"1\"} "} + std::to_string(0)));
@@ -828,6 +834,8 @@ TEST_F(MetricFlowTest, RestV3Unary) {
checkMediapipeRequestsCounter(server.collect(), METRIC_NAME_REQUESTS_ACCEPTED, "dummy_gpt", "REST", "Unary", "V3", numberOfAcceptedRequests * 2);
// checkMediapipeRequestsCounter(server.collect(), METRIC_NAME_REQUESTS_REJECTED, "dummy_gpt", "REST", "Unary", "V3", numberOfRejectedRequests);
checkMediapipeRequestsCounter(server.collect(), METRIC_NAME_RESPONSES, "dummy_gpt", "REST", "Unary", "V3", numberOfAcceptedRequests * 2);

EXPECT_THAT(server.collect(), HasSubstr(METRIC_NAME_PROCESSING_TIME + std::string{"_count{api=\"V3\",interface=\"REST\",method=\"Unary\",name=\""} + "dummy_gpt" + std::string{"\"} "} + std::to_string(numberOfAcceptedRequests * 2)));
}
#endif

@@ -880,6 +888,9 @@ TEST_F(MetricFlowTest, RestV3Stream) {
// checkMediapipeRequestsCounter(server.collect(), METRIC_NAME_REQUESTS_REJECTED, "dummy_gpt", "REST", "Stream", "V3", numberOfRejectedRequests);
const int numberOfMockedChunksPerRequest = 9; // Defined in openai_chat_completions_mock_calculator.cpp
checkMediapipeRequestsCounter(server.collect(), METRIC_NAME_RESPONSES, "dummy_gpt", "REST", "Stream", "V3", numberOfAcceptedRequests * numberOfMockedChunksPerRequest * 2);

EXPECT_THAT(server.collect(), HasSubstr(METRIC_NAME_PROCESSING_TIME + std::string{"_count{api=\"V3\",interface=\"REST\",method=\"Stream\",name=\""} + "dummy_gpt" + std::string{"\"} "} + std::to_string(numberOfAcceptedRequests * 2)));

SPDLOG_ERROR(server.collect());
}
#endif
@@ -986,6 +997,7 @@ std::string MetricFlowTest::prepareConfigContent() {
R"(",")" + METRIC_NAME_REQUESTS_REJECTED +
R"(",")" + METRIC_NAME_RESPONSES +
R"(",")" + METRIC_NAME_GRAPH_ERROR +
R"(",")" + METRIC_NAME_PROCESSING_TIME +
R"("]
}
},
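The test assertions above substring-match the Prometheus exposition output, so after numberOfAcceptedRequests accepted gRPC ModelInfer calls against graph mpName the scrape is expected to contain a sample-count line of roughly this shape (label values taken from the assertions; the count placeholder stands for the asserted number, and the full histogram additionally exposes _bucket and _sum series):

ovms_processing_time_count{api="KServe",interface="gRPC",method="ModelInfer",name="<graph name>"} <numberOfAcceptedRequests>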
