Vijay 2b3b4e0f60 Add the ability to run the map_reduce chains process results step as async (#6181)
This adds the ability to attach an AsyncCallbackManager (handler) to the
reducer chain, so that the reducer can stream its tokens via the
`async def on_llm_new_token` callback method.



Fixes [#5532](https://github.com/hwchase17/langchain/issues/5532)


 @hwchase17  @agola11 
I have tested this change and it works for the streaming use case of
reducer responses. I am happy to share more information if this
solution makes sense.

The following code snippet shows how this change would be used to
enable `reduce_llm` with streaming support in a `map_reduce` chain:

```

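from typing import Any

from langchain.callbacks.base import AsyncCallbackHandler
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.chains import ConversationalRetrievalChain
from langchain.chains.question_answering import load_qa_chain
from langchain.chat_models import ChatOpenAI
from langchain.llms import OpenAI

# NOTE: `ChatResponse` is the application's own response schema
# (not part of langchain) and must be imported from the application code.


# AsyncHandler
# ............
class StreamingLLMCallbackHandler(AsyncCallbackHandler):
    """Callback handler for streaming LLM responses."""

    def __init__(self, websocket):
        self.websocket = websocket

    # This callback is a coroutine, so the caller has to await it.
    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        resp = ChatResponse(sender="bot", message=token, type="stream")
        await self.websocket.send_json(resp.dict())


# Chain
# .....
# The rest of the snippet runs inside an async function (it uses `await`).
# `websocket`, `vectorstore`, `question_generator`, `manager`, `question`
# and `chat_history` are defined elsewhere in the application.
stream_handler = StreamingLLMCallbackHandler(websocket)
stream_manager = AsyncCallbackManager([stream_handler])

# Only the reduce step streams; the map step uses a non-streaming LLM.
streaming_llm = ChatOpenAI(
    streaming=True,
    callback_manager=stream_manager,
    verbose=False,
    temperature=0,
)
main_llm = OpenAI(
    temperature=0,
    verbose=False,
)

doc_chain = load_qa_chain(
    llm=main_llm,
    reduce_llm=streaming_llm,
    chain_type="map_reduce",
    callback_manager=manager,
)
qa_chain = ConversationalRetrievalChain(
    retriever=vectorstore.as_retriever(),
    combine_docs_chain=doc_chain,
    question_generator=question_generator,
    callback_manager=manager,
)

# Here `acall` triggers `acombine_docs` on the `map_reduce` chain, which
# then calls `_aprocess_result`, which in turn calls
# `self.combine_document_chain.arun`, so the async callback is awaited.
result = await qa_chain.acall(
    {"question": question, "chat_history": chat_history}
)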
```
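
The reason the reduce step needs an async path can be shown with a small standalone sketch. It does not use langchain and is not langchain's implementation: `RecordingAsyncHandler`, `ToyReduceChain` and `demo` are hypothetical names made up for the illustration, and the toy `run` method only models a synchronous function that has no way to await a coroutine callback. Under those assumptions, the handler receives tokens only when the reduce step is awaited via `arun`:

```python
import asyncio
from typing import List, Tuple


class RecordingAsyncHandler:
    """Toy stand-in for an async callback handler; records streamed tokens."""

    def __init__(self) -> None:
        self.tokens: List[str] = []

    async def on_llm_new_token(self, token: str) -> None:
        await asyncio.sleep(0)  # stands in for e.g. `websocket.send_json`
        self.tokens.append(token)


class ToyReduceChain:
    """Toy stand-in for the reduce (combine) chain; not langchain code."""

    def __init__(self, handler: RecordingAsyncHandler) -> None:
        self.handler = handler

    def run(self, summaries: List[str]) -> str:
        # Synchronous path: calling the callback only creates a coroutine.
        # A plain function cannot await it, so the handler body never runs.
        for word in summaries:
            coro = self.handler.on_llm_new_token(word)
            coro.close()  # discard the un-awaited coroutine explicitly
        return " ".join(summaries)

    async def arun(self, summaries: List[str]) -> str:
        # Asynchronous path: every callback is awaited, so tokens arrive.
        for word in summaries:
            await self.handler.on_llm_new_token(word)
        return " ".join(summaries)


async def reduce_sync_inside_async(chain: ToyReduceChain, summaries: List[str]) -> str:
    # Models an async entry point whose reduce step is still synchronous.
    return chain.run(summaries)


async def reduce_async(chain: ToyReduceChain, summaries: List[str]) -> str:
    # Models an async entry point whose reduce step is awaited.
    return await chain.arun(summaries)


def demo() -> Tuple[str, List[str], str, List[str]]:
    summaries = ["alpha", "beta", "gamma"]
    sync_handler = RecordingAsyncHandler()
    async_handler = RecordingAsyncHandler()
    sync_result = asyncio.run(
        reduce_sync_inside_async(ToyReduceChain(sync_handler), summaries)
    )
    async_result = asyncio.run(
        reduce_async(ToyReduceChain(async_handler), summaries)
    )
    return sync_result, sync_handler.tokens, async_result, async_handler.tokens


if __name__ == "__main__":
    print(demo())
```

Both paths return the same final string in this sketch; the difference is only whether the handler saw the intermediate tokens, which is the part that matters for streaming.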
2023-06-18 13:19:56 -07:00