feat(actions): enable streaming in custom actions #735

Open: wants to merge 2 commits into base: develop
93 changes: 93 additions & 0 deletions examples/configs/rag/custom_rag_streaming/README.md
@@ -0,0 +1,93 @@
# Custom RAG with streaming

Custom actions, such as RAG, can stream their output.

This works because the streaming handler is defined and available as a context variable.

```python
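import contextvars

from nemoguardrails.streaming import StreamingHandler

# Defined in nemoguardrails.context
streaming_handler_var = contextvars.ContextVar("streaming_handler", default=None)

# Inside an action, retrieve the handler for the current context
streaming_handler: StreamingHandler = streaming_handler_var.get()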
```
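
To illustrate why a context variable is enough here, the following standalone sketch uses only the standard library. The names `handler_var`, `action`, and `run_request` are hypothetical and are not part of NeMo Guardrails; the point is only that a value set by the caller is visible inside an awaited coroutine without being passed as an argument.

```python
import asyncio
import contextvars

# Hypothetical stand-in for nemoguardrails.context.streaming_handler_var
handler_var = contextvars.ContextVar("handler", default=None)


async def action() -> str:
    # The action reads the handler from the context, not from its arguments.
    handler = handler_var.get()
    return "no handler" if handler is None else f"using {handler}"


async def run_request(name: str) -> str:
    # Each request sets its own handler before running the action.
    handler_var.set(name)
    return await action()


async def main() -> list:
    # gather() runs each coroutine in a task with its own copy of the context,
    # so concurrent requests do not see each other's handler.
    return await asyncio.gather(run_request("handler-A"), run_request("handler-B"))


results = asyncio.run(main())
```

Because each task works on a copy of the context, setting the variable in one request should not leak into another, which is what makes this pattern workable for per-request streaming handlers.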

But let's first clarify the folder structure of this example:

- `kb/` - A folder containing our knowledge base to retrieve context from. This folder includes the March 2023 US Jobs
report in `kb/report.md`.
- `rails/output.co` - A Colang file that contains a flow that routes all user messages into our
  custom RAG.
- `config.py` - The config file containing the custom RAG action, the disclaimer action, and the init function that gets
called as part of the initialization of the LLMRails instance.
- `config.yml` - The config file holding all the configuration options.

The following code samples demonstrate the core of this example in action:

```colang
# output.co

define flow answer report question
  user ...
  $answer = execute rag
  bot $answer
  $disclaimer = execute disclaimer
  bot $disclaimer
```

```python
# config.py

class ContinuousStreamingHandler(StreamingHandler):
    async def _process(self, chunk: str):
        """Processes a chunk of text.

        Stops the stream if the chunk is `""` or `None` (stopping chunks).
        In case you want to keep the stream open, all non-stopping chunks can be piped to a specified handler.
        """
        if chunk is None or chunk == "":
            await self.queue.put(chunk)
            self.streaming_finished_event.set()
            self.top_k_nonempty_lines_event.set()
            return

        await super()._process(chunk)


async def rag(context: dict, llm: BaseLLM, kb: KnowledgeBase) -> ActionResult:

    # ...

    chain = prompt_template | llm | output_parser

    # 💡 Enable streaming
    streaming_handler: StreamingHandler = streaming_handler_var.get()
    local_streaming_handler = ContinuousStreamingHandler()
    local_streaming_handler.set_pipe_to(streaming_handler)

    config = RunnableConfig(callbacks=[local_streaming_handler])
    answer = await chain.ainvoke(input_variables, config)

    return ActionResult(return_value=answer, context_updates=context_updates)
```

Here's what's happening, step by step:

1. We define a custom RAG chain using LangChain's LCEL, but it could be any library. For example, you could call the
`openai` library directly.
2. We then define a `RunnableConfig` with a local streaming handler as a callback. The local handler is configured to
pipe the stream to the main streaming handler. The idea behind this is to handle stream-stopping chunks (`""` or
`None`) only locally, while keeping the main streaming handler running. This enables streaming results from multiple
actions.
3. We then invoke the chain with the config, which will trigger the streaming handler to be called.
4. Finally, we return the final answer as `ActionResult` which enables downstream processing. In this example, we define
a `disclaimer` action that just prints a sentence; it could also access the final answer or other context
variables we define as `context_updates`.
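
The piping idea in step 2 can be sketched without any library. This is a simplified model under stated assumptions, not the actual `StreamingHandler` implementation: `MainHandler`, `LocalHandler`, `run_action`, and `demo` are hypothetical names, and the real handler uses queues and events rather than a plain list.

```python
import asyncio
from typing import List, Optional


class MainHandler:
    """Hypothetical stand-in for the main streaming handler."""

    def __init__(self) -> None:
        self.chunks: List[str] = []
        self.finished = False

    async def push_chunk(self, chunk: Optional[str]) -> None:
        # An empty or None chunk would end the stream for this handler.
        if chunk is None or chunk == "":
            self.finished = True
            return
        self.chunks.append(chunk)


class LocalHandler:
    """Hypothetical per-action handler: forwards text, swallows stop chunks."""

    def __init__(self, pipe_to: MainHandler) -> None:
        self.pipe_to = pipe_to
        self.finished = False

    async def push_chunk(self, chunk: Optional[str]) -> None:
        if chunk is None or chunk == "":
            # Stop only the local stream; the main handler stays open.
            self.finished = True
            return
        await self.pipe_to.push_chunk(chunk)


async def run_action(main: MainHandler, tokens: List[str]) -> None:
    local = LocalHandler(pipe_to=main)
    for token in tokens:
        await local.push_chunk(token)
    await local.push_chunk("")  # end of this action's stream


async def demo() -> MainHandler:
    main = MainHandler()
    await run_action(main, ["The rate ", "was 3.5%."])
    await run_action(main, [" Answers may be imperfect."])
    return main


main_handler = asyncio.run(demo())
```

In this model, both actions contribute chunks to the same main stream, and neither action's end-of-stream marker closes it, which mirrors the behavior the local handler is meant to provide.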

_Note: For simplicity, we re-use the LLM instance configured in [config.yml](./config.yml) as well as the
built-in retrieval via the knowledge base._

## Run the example

```shell
$ export OPENAI_API_KEY='sk-xxx'
$ python -m nemoguardrails.__main__ chat --config /<path_to>/examples/configs/rag/custom_rag_streaming --streaming
```
100 changes: 100 additions & 0 deletions examples/configs/rag/custom_rag_streaming/config.py
@@ -0,0 +1,100 @@
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain.llms.base import BaseLLM
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableConfig

from nemoguardrails import LLMRails
from nemoguardrails.actions.actions import ActionResult
from nemoguardrails.context import streaming_handler_var
from nemoguardrails.kb.kb import KnowledgeBase
from nemoguardrails.streaming import StreamingHandler

TEMPLATE = """Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Use three sentences maximum and keep the answer as concise as possible.
Always say "thanks for asking!" at the end of the answer.

{context}

Question: {question}

Helpful Answer:"""


class ContinuousStreamingHandler(StreamingHandler):
    async def _process(self, chunk: str):
        """Processes a chunk of text.

        Stops the stream if the chunk is `""` or `None` (stopping chunks).
        In case you want to keep the stream open, all non-stopping chunks can be piped to a specified handler.
        """
        if chunk is None or chunk == "":
            await self.queue.put(chunk)
            self.streaming_finished_event.set()
            self.top_k_nonempty_lines_event.set()
            return

        await super()._process(chunk)


async def rag(context: dict, llm: BaseLLM, kb: KnowledgeBase) -> ActionResult:
    user_message = context.get("last_user_message")
    context_updates = {}

    # For our custom RAG, we re-use the built-in retrieval
    chunks = await kb.search_relevant_chunks(user_message)
    relevant_chunks = "\n".join([chunk["body"] for chunk in chunks])
    # Store the chunks for downstream use
    context_updates["relevant_chunks"] = relevant_chunks

    # Use a custom prompt template
    prompt_template = PromptTemplate.from_template(TEMPLATE)
    input_variables = {"question": user_message, "context": relevant_chunks}
    # Store the template for downstream use
    context_updates["_last_bot_prompt"] = prompt_template.format(**input_variables)

    print(f"💬 RAG :: prompt_template: {context_updates['_last_bot_prompt']}")

    # Put together a simple LangChain chain
    output_parser = StrOutputParser()
    chain = prompt_template | llm | output_parser

    # 💡 Enable streaming
    streaming_handler: StreamingHandler = streaming_handler_var.get()
    local_streaming_handler = ContinuousStreamingHandler()
    local_streaming_handler.set_pipe_to(streaming_handler)

    config = RunnableConfig(callbacks=[local_streaming_handler])
    answer = await chain.ainvoke(input_variables, config)

    context_updates["streamed_bot_message"] = answer

    return ActionResult(return_value=None, context_updates=context_updates)


async def disclaimer(context: dict) -> ActionResult:
    last_bot_message = context["streamed_bot_message"]

    return ActionResult(
        return_value="I learn something new every day, so my answers may not always be perfect."
    )


def init(app: LLMRails):
    app.register_action(rag, "rag")
    app.register_action(disclaimer, "disclaimer")
6 changes: 6 additions & 0 deletions examples/configs/rag/custom_rag_streaming/config.yml
@@ -0,0 +1,6 @@
streaming: True

models:
  - type: main
    engine: openai
    model: gpt-3.5-turbo
135 changes: 135 additions & 0 deletions examples/configs/rag/custom_rag_streaming/kb/report.md
@@ -0,0 +1,135 @@
# Jobs Report - March 2023

Technical information:
Household data: (202) 691-6378 * [email protected] * www.bls.gov/cps
Establishment data: (202) 691-6555 * [email protected] * www.bls.gov/ces

Media contact: (202) 691-5902 * [email protected]


THE EMPLOYMENT SITUATION -- MARCH 2023


Total nonfarm payroll employment rose by 236,000 in March, and the unemployment rate
changed little at 3.5 percent, the U.S. Bureau of Labor Statistics reported today.
Employment continued to trend up in leisure and hospitality, government, professional
and business services, and health care.

This news release presents statistics from two monthly surveys. The household survey
measures labor force status, including unemployment, by demographic characteristics.
The establishment survey measures nonfarm employment, hours, and earnings by industry.
For more information about the concepts and statistical methodology used in these two
surveys, see the Technical Note.

## Household Survey Data

Both the unemployment rate, at 3.5 percent, and the number of unemployed persons, at
5.8 million, changed little in March. These measures have shown little net movement
since early 2022. (See table A-1.)

Among the major worker groups, the unemployment rate for Hispanics decreased to 4.6
percent in March, essentially offsetting an increase in the prior month. The
unemployment rates for adult men (3.4 percent), adult women (3.1 percent), teenagers
(9.8 percent), Whites (3.2 percent), Blacks (5.0 percent), and Asians (2.8 percent)
showed little or no change over the month. (See tables A-1, A-2, and A-3.)

Among the unemployed, the number of permanent job losers increased by 172,000 to 1.6
million in March, and the number of reentrants to the labor force declined by 182,000
to 1.7 million. (Reentrants are persons who previously worked but were not in the
labor force prior to beginning their job search.) (See table A-11.)

The number of long-term unemployed (those jobless for 27 weeks or more) was little
changed at 1.1 million in March. These individuals accounted for 18.9 percent of all
unemployed persons. (See table A-12.)

The labor force participation rate, at 62.6 percent, continued to trend up in March.
The employment-population ratio edged up over the month to 60.4 percent. These
measures remain below their pre-pandemic February 2020 levels (63.3 percent and 61.1
percent, respectively). (See table A-1.)

The number of persons employed part time for economic reasons was essentially
unchanged at 4.1 million in March. These individuals, who would have preferred
full-time employment, were working part time because their hours had been reduced or
they were unable to find full-time jobs. (See table A-8.)

The number of persons not in the labor force who currently want a job was little
changed at 4.9 million in March and has returned to its February 2020 level. These
individuals were not counted as unemployed because they were not actively looking
for work during the 4 weeks preceding the survey or were unavailable to take a job.
(See table A-1.)

Among those not in the labor force who wanted a job, the number of persons marginally
attached to the labor force was little changed at 1.3 million in March. These
individuals wanted and were available for work and had looked for a job sometime
in the prior 12 months but had not looked for work in the 4 weeks preceding the
survey. The number of discouraged workers, a subset of the marginally attached who
believed that no jobs were available for them, also was little changed over the month
at 351,000. (See Summary table A.)

## Establishment Survey Data

Total nonfarm payroll employment increased by 236,000 in March, compared with the
average monthly gain of 334,000 over the prior 6 months. In March, employment
continued to trend up in leisure and hospitality, government, professional and
business services, and health care. (See table B-1.)

Leisure and hospitality added 72,000 jobs in March, lower than the average monthly
gain of 95,000 over the prior 6 months. Most of the job growth occurred in food
services and drinking places, where employment rose by 50,000 in March. Employment
in leisure and hospitality is below its pre-pandemic February 2020 level by 368,000,
or 2.2 percent.

Government employment increased by 47,000 in March, the same as the average monthly
gain over the prior 6 months. Overall, employment in government is below its February
2020 level by 314,000, or 1.4 percent.

Employment in professional and business services continued to trend up in March
(+39,000), in line with the average monthly growth over the prior 6 months (+34,000).
Within the industry, employment in professional, scientific, and technical services
continued its upward trend in March (+26,000).

Over the month, health care added 34,000 jobs, lower than the average monthly gain
of 54,000 over the prior 6 months. In March, job growth occurred in home health
care services (+15,000) and hospitals (+11,000). Employment continued to trend up
in nursing and residential care facilities (+8,000).

Employment in social assistance continued to trend up in March (+17,000), in line
with the average monthly growth over the prior 6 months (+22,000).

In March, employment in transportation and warehousing changed little (+10,000).
Couriers and messengers (+7,000) and air transportation (+6,000) added jobs, while
warehousing and storage lost jobs (-12,000). Employment in transportation and
warehousing has shown little net change in recent months.

Employment in retail trade changed little in March (-15,000). Job losses in building
material and garden equipment and supplies dealers (-9,000) and in furniture, home
furnishings, electronics, and appliance retailers (-9,000) were partially offset
by a job gain in department stores (+15,000). Retail trade employment is little
changed on net over the year.

Employment showed little change over the month in other major industries, including
mining, quarrying, and oil and gas extraction; construction; manufacturing; wholesale
trade; information; financial activities; and other services.

In March, average hourly earnings for all employees on private nonfarm payrolls
rose by 9 cents, or 0.3 percent, to $33.18. Over the past 12 months, average hourly
earnings have increased by 4.2 percent. In March, average hourly earnings of
private-sector production and nonsupervisory employees rose by 9 cents, or 0.3
percent, to $28.50. (See tables B-3 and B-8.)

The average workweek for all employees on private nonfarm payrolls edged down by
0.1 hour to 34.4 hours in March. In manufacturing, the average workweek was unchanged
at 40.3 hours, and overtime remained at 3.0 hours. The average workweek for production
and nonsupervisory employees on private nonfarm payrolls was unchanged at 33.9 hours.
(See tables B-2 and B-7.)

The change in total nonfarm payroll employment for January was revised down by
32,000, from +504,000 to +472,000, and the change for February was revised up by
15,000, from +311,000 to +326,000. With these revisions, employment in January and
February combined is 17,000 lower than previously reported. (Monthly revisions result
from additional reports received from businesses and government agencies since the
last published estimates and from the recalculation of seasonal factors.)

_____________
The Employment Situation for April is scheduled to be released on Friday,
May 5, 2023, at 8:30 a.m. (ET).
11 changes: 11 additions & 0 deletions examples/configs/rag/custom_rag_streaming/rails/output.co
@@ -0,0 +1,11 @@
define user ask about report
  "What was last month's unemployment rate?"
  "Which industry added the most jobs?"
  "How many jobs were added in the transportation industry?"

define flow answer report question
  user ...
  $answer = execute rag
  bot $answer
  $disclaimer = execute disclaimer
  bot $disclaimer
13 changes: 2 additions & 11 deletions nemoguardrails/actions/llm/generation.py
@@ -951,14 +951,8 @@ async def generate_bot_message(
context_updates=context_updates,
)
else:
# In streaming mode, we also push this.
bot_utterance = "I'm not sure what to say."
if streaming_handler:
await streaming_handler.push_chunk(bot_utterance)

return ActionResult(
events=[new_event_dict("BotMessage", text=bot_utterance)],
context_updates=context_updates,
log.info(
"Generated bot message: %s", bot_utterance if bot_utterance else "None"
)

@action(is_system_action=True)
@@ -1264,9 +1258,6 @@ async def generate_intent_steps_message(
else:
bot_intent = "general response"

if not bot_message:
bot_message = "I'm not sure what to say."

log.info(
"Canonical form for user intent: "
+ (user_intent if user_intent else "None")