Skip to content

Commit

Permalink
Modified mock server for OpenAI within LangChain
Browse files Browse the repository at this point in the history
Co-authored-by: Hannah Stepanek <[email protected]>
  • Loading branch information
lrafeei and hmstepanek committed Dec 20, 2023
1 parent c4964aa commit abfdb1c
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

import json

import pytest
from testing_support.mock_external_http_server import MockExternalHTTPServer

from newrelic.common.package_version_utils import get_package_version_tuple

# This defines an external server test apps can make requests to instead of
# the real LangChain backend. This provides 3 features:
# the real OpenAI backend. This provides 3 features:
#
# 1) This removes dependencies on external websites.
# 2) Provides a better mechanism for making an external call in a test app than
Expand All @@ -27,13 +30,12 @@
# created by an external call.
# 3) This app runs on a separate thread meaning it won't block the test app.


RESPONSES = {
RESPONSES_V1 = {
"9906": [
{
"Content-Type": "application/json",
"content-type": "application/json",
"openai-organization": "new-relic-nkmd8b",
"openai-processing-ms": "24",
"openai-processing-ms": "23",
"openai-version": "2020-10-01",
"x-ratelimit-limit-requests": "3000",
"x-ratelimit-limit-tokens": "1000000",
Expand All @@ -43,6 +45,7 @@
"x-ratelimit-reset-tokens": "0s",
"x-request-id": "058b2dd82590aa4145e97c2e59681f62",
},
200,
{
"object": "list",
"data": [
Expand All @@ -58,9 +61,9 @@
],
"12833": [
{
"Content-Type": "application/json",
"content-type": "application/json",
"openai-organization": "new-relic-nkmd8b",
"openai-processing-ms": "16",
"openai-processing-ms": "26",
"openai-version": "2020-10-01",
"x-ratelimit-limit-requests": "3000",
"x-ratelimit-limit-tokens": "1000000",
Expand All @@ -70,6 +73,7 @@
"x-ratelimit-reset-tokens": "0s",
"x-request-id": "d5d71019880e25a94de58b927045a202",
},
200,
{
"object": "list",
"data": [
Expand All @@ -86,61 +90,87 @@
}


def simple_get(self):
content_len = int(self.headers.get("content-length"))
content = json.loads(self.rfile.read(content_len).decode("utf-8"))
@pytest.fixture(scope="session")
def simple_get(openai_version, extract_shortened_prompt):
def _simple_get(self):
content_len = int(self.headers.get("content-length"))
content = json.loads(self.rfile.read(content_len).decode("utf-8"))

prompt = extract_shortened_prompt(content)
if not prompt:
self.send_response(500)
self.end_headers()
self.wfile.write("Could not parse prompt.".encode("utf-8"))
return
prompt = extract_shortened_prompt(content)
if not prompt:
self.send_response(500)
self.end_headers()
self.wfile.write("Could not parse prompt.".encode("utf-8"))
return

headers, response = ({}, "")

mocked_responses = RESPONSES_V1

for k, v in mocked_responses.items():
if prompt.startswith(k):
headers, status_code, response = v
break
else: # If no matches found
self.send_response(500)
self.end_headers()
self.wfile.write(("Unknown Prompt:\n%s" % prompt).encode("utf-8"))
return

headers, response = ({}, "")
for k, v in RESPONSES.items():
if prompt.startswith(k):
headers, response = v
break
else: # If no matches found
self.send_response(500)
# Send response code
self.send_response(status_code)

# Send headers
for k, v in headers.items():
self.send_header(k, v)
self.end_headers()
self.wfile.write(("Unknown Prompt:\n%s" % prompt).encode("utf-8"))

# Send response body
self.wfile.write(json.dumps(response).encode("utf-8"))
return

# Send response code
self.send_response(200)
return _simple_get


@pytest.fixture(scope="session")
def MockExternalOpenAIServer(simple_get):
class _MockExternalOpenAIServer(MockExternalHTTPServer):
# To use this class in a test one needs to start and stop this server
# before and after making requests to the test app that makes the external
# calls.

def __init__(self, handler=simple_get, port=None, *args, **kwargs):
super(_MockExternalOpenAIServer, self).__init__(handler=handler, port=port, *args, **kwargs)

return _MockExternalOpenAIServer


# Send headers
for k, v in headers.items():
self.send_header(k, v)
self.end_headers()
@pytest.fixture(scope="session")
def extract_shortened_prompt(openai_version):
def _extract_shortened_prompt(content):
_input = content.get("input", None)
prompt = (_input and str(_input[0][0])) or content.get("messages")[0]["content"]
return prompt

# Send response body
self.wfile.write(json.dumps(response).encode("utf-8"))
return
return _extract_shortened_prompt


def extract_shortened_prompt(content):
prompt = (
content.get("prompt", None)
or "\n".join(str(m) for m in content.get("input")[0])
or "\n".join(m["content"] for m in content.get("messages"))
)
return prompt.lstrip().split("\n")[0]
def get_openai_version():
# Import OpenAI so that get package version can catpure the version from the
# system module. OpenAI does not have a package version in v0.
import openai # noqa: F401; pylint: disable=W0611

return get_package_version_tuple("openai")

class MockExternalLangChainServer(MockExternalHTTPServer):
# To use this class in a test one needs to start and stop this server
# before and after making requests to the test app that makes the external
# calls.

def __init__(self, handler=simple_get, port=None, *args, **kwargs):
super(MockExternalLangChainServer, self).__init__(handler=handler, port=port, *args, **kwargs)
@pytest.fixture(scope="session")
def openai_version():
return get_openai_version()


if __name__ == "__main__":
with MockExternalLangChainServer() as server:
print("MockExternalLangChainServer serving on port %s" % str(server.port))
_MockExternalOpenAIServer = MockExternalOpenAIServer()
with MockExternalOpenAIServer() as server:
print("MockExternalOpenAIServer serving on port %s" % str(server.port))
while True:
pass # Serve forever
158 changes: 80 additions & 78 deletions tests/mlmodel_langchain/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@
import os

import pytest
from _mock_external_langchain_server import (
MockExternalLangChainServer,
from _mock_external_openai_server import ( # noqa: F401; pylint: disable=W0611
MockExternalOpenAIServer,
extract_shortened_prompt,
get_openai_version,
openai_version,
simple_get,
)
from langchain_community.embeddings.openai import OpenAIEmbeddings
from testing_support.fixture.event_loop import ( # noqa: F401; pylint: disable=W0611
event_loop as loop,
)
from testing_support.fixtures import ( # noqa: F401, pylint: disable=W0611
collector_agent_registration_fixture,
collector_available_fixture,
)

from newrelic.api.time_trace import current_trace
from newrelic.api.transaction import current_transaction
from newrelic.common.object_wrapper import wrap_function_wrapper

Expand All @@ -44,112 +50,108 @@
linked_applications=["Python Agent Test (mlmodel_langchain)"],
)

LANGCHAIN_AUDIT_LOG_FILE = os.path.join(os.path.realpath(os.path.dirname(__file__)), "langchain_audit.log")
LANGCHAIN_AUDIT_LOG_CONTENTS = {}

OPENAI_AUDIT_LOG_FILE = os.path.join(os.path.realpath(os.path.dirname(__file__)), "openai_audit.log")
OPENAI_AUDIT_LOG_CONTENTS = {}
# Intercept outgoing requests and log to file for mocking
RECORDED_HEADERS = set(["x-request-id", "content-type"])


@pytest.fixture(scope="session")
def openai_clients(openai_version, MockExternalOpenAIServer): # noqa: F811
"""
This configures the openai client and returns it for openai v1 and only configures
openai for v0 since there is no client.
"""
from newrelic.core.config import _environ_as_bool

if not _environ_as_bool("NEW_RELIC_TESTING_RECORD_OPENAI_RESPONSES", False):
with MockExternalOpenAIServer() as server:
yield OpenAIEmbeddings(
openai_api_key="NOT-A-REAL-SECRET", openai_api_base="http://localhost:%d" % server.port
)
else:
openai_api_key = os.environ.get("OPENAI_API_KEY")
if not openai_api_key:
raise RuntimeError("OPENAI_API_KEY environment variable required.")

yield OpenAIEmbeddings(openai_api_key=openai_api_key)


@pytest.fixture(scope="session")
def embeding_openai_client(openai_clients):
embedding_client = openai_clients
return embedding_client


# In practice this changes with each run, so to account for this
# in testing, we will set it ourselves
@pytest.fixture
def set_trace_info():
def set_info():
txn = current_transaction()
if txn:
txn.guid = "transaction-id"
txn._trace_id = "trace-id"
trace = current_trace()
if trace:
trace.guid = "span-id"

return set_info


@pytest.fixture(autouse=True, scope="session")
def langchain_server():
def openai_server(
openai_version, # noqa: F811
openai_clients,
wrap_httpx_client_send,
):
"""
This fixture will either create a mocked backend for testing purposes, or will
set up an audit log file to log responses of the real OpenAI backend to a file.
The behavior can be controlled by setting NEW_RELIC_TESTING_RECORD_LANGCHAIN_RESPONSES=1 as
The behavior can be controlled by setting NEW_RELIC_TESTING_RECORD_OPENAI_RESPONSES=1 as
an environment variable to run using the real OpenAI backend. (Default: mocking)
"""

from newrelic.core.config import _environ_as_bool

if not _environ_as_bool("NEW_RELIC_TESTING_RECORD_LANGCHAIN_RESPONSES", False):
# Use mocked OpenAI backend and prerecorded responses
with MockExternalLangChainServer() as server:
os.environ["OPENAI_API_BASE"] = "http://localhost:%d" % server.port
os.environ["OPENAI_API_KEY"] = "NOT-A-REAL-SECRET"
yield
else:
# Use real OpenAI backend and record responses
openai_api_key = os.environ.get("OPENAI_API_KEY", "")
if not openai_api_key:
raise RuntimeError("OPENAI_API_KEY environment variable required.")

# Apply function wrappers to record data
wrap_function_wrapper("openai.api_requestor", "APIRequestor.request", wrap_openai_api_requestor_request)
wrap_function_wrapper(
"openai.api_requestor", "APIRequestor._interpret_response", wrap_openai_api_requestor_interpret_response
)
if _environ_as_bool("NEW_RELIC_TESTING_RECORD_OPENAI_RESPONSES", False):
wrap_function_wrapper("httpx._client", "Client.send", wrap_httpx_client_send)
yield # Run tests

# Write responses to audit log
with open(LANGCHAIN_AUDIT_LOG_FILE, "w") as audit_log_fp:
json.dump(LANGCHAIN_AUDIT_LOG_CONTENTS, fp=audit_log_fp, indent=4)

with open(OPENAI_AUDIT_LOG_FILE, "w") as audit_log_fp:
json.dump(OPENAI_AUDIT_LOG_CONTENTS, fp=audit_log_fp, indent=4)
else:
# We are mocking openai responses so we don't need to do anything in this case.
yield

# Intercept outgoing requests and log to file for mocking
RECORDED_HEADERS = set(["x-request-id", "content-type"])

def bind_send_params(request, *, stream=False, **kwargs):
return request

def wrap_openai_api_requestor_interpret_response(wrapped, instance, args, kwargs):
rbody, rcode, rheaders = bind_request_interpret_response_params(*args, **kwargs)
headers = dict(
filter(
lambda k: k[0].lower() in RECORDED_HEADERS
or k[0].lower().startswith("openai")
or k[0].lower().startswith("x-ratelimit"),
rheaders.items(),
)
)

if rcode >= 400 or rcode < 200:
rbody = json.loads(rbody)
LANGCHAIN_AUDIT_LOG_CONTENTS["error"] = headers, rcode, rbody # Append response data to audit log
return wrapped(*args, **kwargs)
@pytest.fixture(scope="session")
def wrap_httpx_client_send(extract_shortened_prompt): # noqa: F811
def _wrap_httpx_client_send(wrapped, instance, args, kwargs):
request = bind_send_params(*args, **kwargs)
if not request:
return wrapped(*args, **kwargs)

params = json.loads(request.content.decode("utf-8"))
prompt = extract_shortened_prompt(params)

def wrap_openai_api_requestor_request(wrapped, instance, args, kwargs):
params = bind_request_params(*args, **kwargs)
if not params:
return wrapped(*args, **kwargs)
# Send request
response = wrapped(*args, **kwargs)

prompt = extract_shortened_prompt(params)
if response.status_code >= 400 or response.status_code < 200:
prompt = "error"

# Send request
result = wrapped(*args, **kwargs)
rheaders = getattr(response, "headers")

# Clean up data
data = result[0].data
headers = result[0]._headers
headers = dict(
filter(
lambda k: k[0].lower() in RECORDED_HEADERS
or k[0].lower().startswith("openai")
or k[0].lower().startswith("x-ratelimit"),
headers.items(),
headers = dict(
filter(
lambda k: k[0].lower() in RECORDED_HEADERS
or k[0].lower().startswith("openai")
or k[0].lower().startswith("x-ratelimit"),
rheaders.items(),
)
)
)

# Log response
LANGCHAIN_AUDIT_LOG_CONTENTS[prompt] = headers, data # Append response data to audit log
return result


def bind_request_params(method, url, params=None, *args, **kwargs):
return params

body = json.loads(response.content.decode("utf-8"))
OPENAI_AUDIT_LOG_CONTENTS[prompt] = headers, response.status_code, body # Append response data to log
return response

def bind_request_interpret_response_params(result, stream):
return result.content.decode("utf-8"), result.headers
return _wrap_httpx_client_send
Loading

0 comments on commit abfdb1c

Please sign in to comment.