Forward-merge branch-24.03 into branch-24.06 (nv-morpheus#1591)
Forward-merge triggered by push to branch-24.03 that creates a PR to
keep branch-24.06 up-to-date. If this PR is unable to be immediately
merged due to conflicts, it will remain open for the team to manually
merge. See [forward-merger
docs](https://docs.rapids.ai/maintainers/forward-merger/) for more info.
mdemoret-nv authored Apr 5, 2024
2 parents 82a80aa + 10922a4 commit 3c23991
Showing 18 changed files with 1,365 additions and 573 deletions.
4 changes: 3 additions & 1 deletion .clang-tidy
@@ -15,7 +15,9 @@ Checks: >
 WarningsAsErrors: >
   *,
-  -clang-diagnostic-unused-command-line-argument
+  -clang-diagnostic-unused-command-line-argument,
+  -Wno-ignored-optimization-argument,
+  -Qunused-arguments
 #WarningsAsErrors: '*'
 HeaderFilterRegex: '.*\/include\/morpheus\/.*'
10 changes: 9 additions & 1 deletion examples/log_parsing/inference.py
@@ -180,4 +180,12 @@ def _convert_one_response(output: MultiResponseMessage, inf: MultiInferenceNLPMe
         return MultiResponseMessage.from_message(inf, memory=memory, offset=inf.offset, count=inf.mess_count)

     def _get_inference_worker(self, inf_queue: ProducerConsumerQueue) -> TritonInferenceLogParsing:
-        return TritonInferenceLogParsing(inf_queue=inf_queue, c=self._config, **self._kwargs)
+        return TritonInferenceLogParsing(inf_queue=inf_queue,
+                                         c=self._config,
+                                         server_url=self._server_url,
+                                         model_name=self._model_name,
+                                         force_convert_inputs=self._force_convert_inputs,
+                                         use_shared_memory=self._use_shared_memory,
+                                         input_mapping=self._input_mapping,
+                                         output_mapping=self._output_mapping,
+                                         needs_logits=self._needs_logits)
1 change: 1 addition & 0 deletions morpheus/_lib/cmake/libmorpheus.cmake
@@ -71,6 +71,7 @@ add_library(morpheus
   src/stages/file_source.cpp
   src/stages/filter_detection.cpp
   src/stages/http_server_source_stage.cpp
+  src/stages/inference_client_stage.cpp
   src/stages/kafka_source.cpp
   src/stages/preprocess_fil.cpp
   src/stages/preprocess_nlp.cpp
168 changes: 168 additions & 0 deletions morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp
@@ -0,0 +1,168 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "morpheus/export.h"
#include "morpheus/messages/multi_inference.hpp"
#include "morpheus/messages/multi_response.hpp"
#include "morpheus/types.hpp"

#include <mrc/coroutines/async_generator.hpp>
#include <mrc/coroutines/scheduler.hpp>
#include <mrc/coroutines/task.hpp>
#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <pybind11/pybind11.h>
#include <pymrc/asyncio_runnable.hpp>

#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

namespace morpheus {

struct MORPHEUS_EXPORT TensorModelMapping
{
    /**
     * @brief The field name to/from the model used for mapping
     */
    std::string model_field_name;

    /**
     * @brief The field name to/from the tensor used for mapping
     */
    std::string tensor_field_name;
};

class MORPHEUS_EXPORT IInferenceClientSession
{
  public:
    virtual ~IInferenceClientSession() = default;

    /**
      @brief Gets the inference input mappings
    */
    virtual std::vector<TensorModelMapping> get_input_mappings(std::vector<TensorModelMapping> input_map_overrides) = 0;

    /**
      @brief Gets the inference output mappings
    */
    virtual std::vector<TensorModelMapping> get_output_mappings(
        std::vector<TensorModelMapping> output_map_overrides) = 0;

    /**
      @brief Invokes a single tensor inference
    */
    virtual mrc::coroutines::Task<TensorMap> infer(TensorMap&& inputs) = 0;
};

class MORPHEUS_EXPORT IInferenceClient
{
  public:
    virtual ~IInferenceClient() = default;

    /**
      @brief Creates an inference session.
    */
    virtual std::unique_ptr<IInferenceClientSession> create_session() = 0;
};

/**
 * @addtogroup stages
 * @{
 * @file
 */

/**
 * @brief Perform inference with Triton Inference Server.
 * This class specifies which inference implementation category (e.g. NLP/FIL) is needed for inferencing.
 */
class MORPHEUS_EXPORT InferenceClientStage
  : public mrc::pymrc::AsyncioRunnable<std::shared_ptr<MultiInferenceMessage>, std::shared_ptr<MultiResponseMessage>>
{
  public:
    using sink_type_t   = std::shared_ptr<MultiInferenceMessage>;
    using source_type_t = std::shared_ptr<MultiResponseMessage>;

    /**
     * @brief Construct a new Inference Client Stage object
     *
     * @param client : Inference client instance.
     * @param model_name : Name of the model which handles the inference requests sent to the Triton inference
     * server.
     * @param needs_logits : Determines if logits are required.
     * @param input_mapping : Mapping used to map pipeline input names to model input names. Use this if the
     * Morpheus names do not match the model.
     * @param output_mapping : Mapping used to map pipeline output names to model output names. Use this if the
     * Morpheus names do not match the model.
     */
    InferenceClientStage(std::unique_ptr<IInferenceClient>&& client,
                         std::string model_name,
                         bool needs_logits,
                         std::vector<TensorModelMapping> input_mapping,
                         std::vector<TensorModelMapping> output_mapping);

    /**
     * Processes a single MultiInferenceMessage by running the constructor-provided inference client against its
     * tensors, and yields the result as a MultiResponseMessage.
     */
    mrc::coroutines::AsyncGenerator<std::shared_ptr<MultiResponseMessage>> on_data(
        std::shared_ptr<MultiInferenceMessage>&& data, std::shared_ptr<mrc::coroutines::Scheduler> on) override;

  private:
    std::string m_model_name;
    std::shared_ptr<IInferenceClient> m_client;
    std::shared_ptr<IInferenceClientSession> m_session;
    bool m_needs_logits{true};
    std::vector<TensorModelMapping> m_input_mapping;
    std::vector<TensorModelMapping> m_output_mapping;
    std::mutex m_session_mutex;

    int32_t m_retry_max = 10;
};

/****** InferenceClientStageInterfaceProxy ******/
/**
 * @brief Interface proxy, used to insulate python bindings.
 */
struct MORPHEUS_EXPORT InferenceClientStageInterfaceProxy
{
    /**
     * @brief Create and initialize an InferenceClientStage, and return the result
     *
     * @param builder : Pipeline context object reference
     * @param name : Name of a stage reference
     * @param model_name : Name of the model which handles the inference requests sent to the Triton inference
     * server.
     * @param server_url : Triton server URL.
     * @param needs_logits : Determines if logits are required.
     * @param input_mapping : Dictionary used to map pipeline input names to model input names. Use this if the
     * Morpheus names do not match the model.
     * @param output_mapping : Dictionary used to map pipeline output names to model output names. Use this if the
     * Morpheus names do not match the model.
     * @return std::shared_ptr<mrc::segment::Object<InferenceClientStage>>
     */
    static std::shared_ptr<mrc::segment::Object<InferenceClientStage>> init(
        mrc::segment::Builder& builder,
        const std::string& name,
        std::string model_name,
        std::string server_url,
        bool needs_logits,
        std::map<std::string, std::string> input_mapping,
        std::map<std::string, std::string> output_mapping);
};
/** @} */  // end of group

} // namespace morpheus
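
To make the new interfaces concrete, here is a minimal sketch (not part of this commit) of an IInferenceClient/IInferenceClientSession pair that simply echoes its input tensors back. The EchoSession and EchoClient names and the example namespace are hypothetical; a real session would forward the tensors to an inference server inside infer() and co_return the server's response.

#include "morpheus/stages/inference_client_stage.hpp"
#include "morpheus/types.hpp"

#include <mrc/coroutines/task.hpp>

#include <memory>
#include <utility>
#include <vector>

namespace example {

// Hypothetical session: returns the caller-supplied mapping overrides unchanged and
// echoes the input tensors back as the "inference" result.
class EchoSession : public morpheus::IInferenceClientSession
{
  public:
    std::vector<morpheus::TensorModelMapping> get_input_mappings(
        std::vector<morpheus::TensorModelMapping> input_map_overrides) override
    {
        return input_map_overrides;
    }

    std::vector<morpheus::TensorModelMapping> get_output_mappings(
        std::vector<morpheus::TensorModelMapping> output_map_overrides) override
    {
        return output_map_overrides;
    }

    mrc::coroutines::Task<morpheus::TensorMap> infer(morpheus::TensorMap&& inputs) override
    {
        // A real implementation would send `inputs` to the server and co_return its response tensors.
        co_return std::move(inputs);
    }
};

// Hypothetical client: hands out a fresh EchoSession whenever the stage asks for one.
class EchoClient : public morpheus::IInferenceClient
{
  public:
    std::unique_ptr<morpheus::IInferenceClientSession> create_session() override
    {
        return std::make_unique<EchoSession>();
    }
};

}  // namespace example

Given the constructor declared in the header above, such a client could then be moved into the stage, for example: InferenceClientStage(std::make_unique<example::EchoClient>(), "my_model", /*needs_logits=*/false, {}, {}).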