跳转至

17 回调函数:在AI应用中引入异步通信机制

你好,我是黄佳,欢迎来到LangChain实战课!

这节课我们一起来学习一下LangChain中的回调函数。

回调函数和异步编程

回调函数,你可能并不陌生。它是函数A作为参数传给另一个函数B,然后在函数B内部执行函数A。当函数B完成某些操作后,会调用(即“回调”)函数A。这种编程模式常见于处理异步操作,如事件监听、定时任务或网络请求。

在编程中,异步通常是指代码不必等待某个操作完成(如I/O操作、网络请求、数据库查询等)就可以继续执行的能力。异步机制的实现涉及事件循环、任务队列和其他复杂的底层机制。这与同步编程形成对比,在同步编程中,操作必须按照它们出现的顺序完成。

下面是回调函数的一个简单示例。

def compute(x, y, callback):
    result = x + y
    callback(result)

def print_result(value):
    print(f"The result is: {value}")

def square_result(value):
    print(f"The squared result is: {value**2}")

# 使用print_result作为回调
compute(3, 4, print_result)  # 输出: The result is: 7

# 使用square_result作为回调
compute(3, 4, square_result)  # 输出: The squared result is: 49

不过,上面这个程序中并没有体现出异步操作。虽然回调函数这种编程模式常见于处理异步操作,但回调函数本身并不代表异步。回调只是一种编程模式,允许你在某个操作完成时(无论是否异步)执行某些代码。

而下面的例子,就是在异步操作时使用回调函数的示例。

import asyncio

async def compute(x, y, callback):
    print("Starting compute...")
    await asyncio.sleep(0.5)  # 模拟异步操作
    result = x + y
    # callback(result)
    print("Finished compute...")

def print_result(value):
    print(f"The result is: {value}")

async def another_task():
    print("Starting another task...")
    await asyncio.sleep(1)
    print("Finished another task...")

async def main():
    print("Main starts...")
    task1 = asyncio.create_task(compute(3, 4, print_result))
    task2 = asyncio.create_task(another_task())
    
    await task1
    await task2
    print("Main ends...")

asyncio.run(main())

这个示例中,当我们调用 asyncio.create_task(compute(3, 4, print_result)),compute函数开始执行。当它遇到 await asyncio.sleep(2) 时,它会暂停,并将控制权交还给事件循环。这时,事件循环可以选择开始执行another_task,这是另一个异步任务。这样,你可以清晰地看到,尽管compute函数还没有完成,another_task函数也得以开始执行并完成。这就是异步编程,允许你同时执行多个操作,而不需要等待一个完成后再开始另一个。

LangChain 中的 Callback 处理器

LangChain 的 Callback 机制允许你在应用程序的不同阶段进行自定义操作,如日志记录、监控和数据流处理,这个机制通过 CallbackHandler(回调处理器)来实现。

回调处理器是LangChain中实现 CallbackHandler 接口的对象,为每类可监控的事件提供一个方法。当该事件被触发时,CallbackManager 会在这些处理器上调用适当的方法。

BaseCallbackHandler是最基本的回调处理器,你可以继承它来创建自己的回调处理器。它包含了多种方法,如on_llm_start/on_chat(当 LLM 开始运行时调用)和on_llm_error(当 LLM 出现错误时调用)等。

LangChain 也提供了一些内置的处理器,例如 StdOutCallbackHandler,它会将所有事件记录到标准输出。还有FileCallbackHandler,会将所有的日志记录到一个指定的文件中。

在组件中使用回调处理器

在 LangChain 的各个组件,如 Chains、Models、Tools、Agents 等,都提供了两种类型的回调设置方法:构造函数回调和请求回调。你可以在初始化 LangChain 时将回调处理器传入,或者在单独的请求中使用回调。例如,当你想要在整个链的所有请求中进行日志记录时,可以在初始化时传入处理器;而当你只想在某个特定请求中使用回调时,可以在请求时传入。

这两者的区别,我给你整理了一下。

下面这段示例代码,使用 LangChain 执行了一个简单的任务,结合使用 LangChain 的回调机制与 loguru 日志库,将相关事件同时输出到标准输出和 "output.log" 文件中。

from loguru import logger

from langchain.callbacks import FileCallbackHandler
from langchain.chains import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate

logfile = "output.log"

logger.add(logfile, colorize=True, enqueue=True)
handler = FileCallbackHandler(logfile)

llm = OpenAI()
prompt = PromptTemplate.from_template("1 + {number} = ")

# this chain will both print to stdout (because verbose=True) and write to 'output.log'
# if verbose=False, the FileCallbackHandler will still write to 'output.log'
chain = LLMChain(llm=llm, prompt=prompt, callbacks=[handler], verbose=True)
answer = chain.run(number=2)
logger.info(answer)

其中,初始化LLMChain时指定的 verbose 参数,就等同于将一个输出到控制台的回调处理器添加到你的对象中。这个在你调试程序时非常有用,因为它会将所有事件的信息输出到控制台。

简而言之,LangChain 通过回调系统提供了一种灵活的方式,来监控和操作应用程序的不同阶段。

自定义回调函数

我们也可以通过BaseCallbackHandler和AsyncCallbackHandler来自定义回调函数。下面是一个示例。

import asyncio
from typing import Any, Dict, List

from langchain.chat_models import ChatOpenAI
from langchain.schema import LLMResult, HumanMessage
from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler

# 创建同步回调处理器
class MyFlowerShopSyncHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"获取花卉数据: token: {token}")

# 创建异步回调处理器
class MyFlowerShopAsyncHandler(AsyncCallbackHandler):

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        print("正在获取花卉数据...")
        await asyncio.sleep(0.5)  # 模拟异步操作
        print("花卉数据获取完毕。提供建议...")

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        print("整理花卉建议...")
        await asyncio.sleep(0.5)  # 模拟异步操作
        print("祝你今天愉快!")

# 主要的异步函数
async def main():
    flower_shop_chat = ChatOpenAI(
        max_tokens=100,
        streaming=True,
        callbacks=[MyFlowerShopSyncHandler(), MyFlowerShopAsyncHandler()],
    )

    # 异步生成聊天回复
    await flower_shop_chat.agenerate([[HumanMessage(content="哪种花卉最适合生日?只简单说3种,不超过50字")]])

# 运行主异步函数
asyncio.run(main())

在这个鲜花店客服的程序中,当客户问及关于鲜花的建议时,我们使用了一个同步和一个异步回调。

MyFlowerShopSyncHandler 是一个同步回调,每当新的Token生成时,它就简单地打印出正在获取的鲜花数据。

而 MyFlowerShopAsyncHandler 则是异步的,当客服开始提供鲜花建议时,它会模拟数据的异步获取。在建议完成后,它还会模拟一个结束的操作,如向客户发出感谢。

这种结合了同步和异步操作的方法,使得程序能够更有效率地处理客户请求,同时提供实时反馈。

这里的异步体现在这样几个方面。

  1. 模拟延时操作:在MyFlowerShopAsyncHandler中,我们使用了await asyncio.sleep(0.5)来模拟其他请求异步获取花卉信息的过程。当执行到这个await语句时,当前的on_llm_start函数会“暂停”,释放控制权回到事件循环。这意味着,在这个sleep期间,其他异步任务(如其他客户的请求)可以被处理。
  2. 回调机制:当ChatOpenAI在处理每个新Token时,它会调用on_llm_new_token方法。因为这是一个同步回调,所以它会立即输出。但是,开始和结束的异步回调on_llm_start和on_llm_end在开始和结束时都有一个小的延时操作,这是通过await asyncio.sleep(0.5)模拟的。
  3. 事件循环:Python的syncio库提供了一个事件循环,允许多个异步任务并发运行。在我们的例子中,虽然看起来所有的操作都是按顺序发生的,但由于我们使用了异步操作和回调,如果有其他并发任务,它们可以在await暂停期间运行。

为了更清晰地展示异步的优势,通常我们会在程序中同时运行多个异步任务,并观察它们如何“并发”执行。但在这个简单的例子中,我们主要是通过模拟延时来展示异步操作的基本机制。

因此说,回调函数为异步操作提供了一个机制,使你可以定义“当操作完成时要做什么”,而异步机制的真正实现涉及更深层次的底层工作,如事件循环和任务调度。

用 get_openai_callback 构造令牌计数器

下面,我带着你使用LangChain中的回调函数来构造一个令牌计数器。这个计数功能对于监控大模型的会话消耗以及成本控制十分重要。

在构造令牌计数器之前,我们来回忆一下第10课中的记忆机制。我们用下面的代码生成了ConversationBufferMemory。

from langchain import OpenAI
from langchain.chains import ConversationChain
from langchain.chains.conversation.memory import ConversationBufferMemory

# 初始化大语言模型
llm = OpenAI(
    temperature=0.5,
    model_name="gpt-3.5-turbo-instruct")

# 初始化对话链
conversation = ConversationChain(
    llm=llm,
    memory=ConversationBufferMemory()
)

# 第一天的对话
# 回合1
conversation("我姐姐明天要过生日,我需要一束生日花束。")
print("第一次对话后的记忆:", conversation.memory.buffer)

# 回合2
conversation("她喜欢粉色玫瑰,颜色是粉色的。")
print("第二次对话后的记忆:", conversation.memory.buffer)

# 回合3 (第二天的对话)
conversation("我又来了,还记得我昨天为什么要来买花吗?")
print("/n第三次对话后时提示:/n",conversation.prompt.template)
print("/n第三次对话后的记忆:/n", conversation.memory.buffer)

同时,我们也给出了各种记忆机制对Token的消耗数量的估算示意图。

不过,这张图毕竟是估算,要真正地衡量出每种记忆机制到底耗费了多少个Token,那就需要回调函数上场了。

下面,我们通过回调函数机制,重构这段程序。为了做到这一点,我们首先需要确保在与大语言模型进行交互时,使用了get_openai_callback上下文管理器。

在Python中,一个上下文管理器通常用于管理资源,如文件或网络连接,这些资源在使用前需要设置,在使用后需要清理。上下文管理器经常与with语句一起使用,以确保资源正确地设置和清理。

get_openai_callback被设计用来监控与OpenAI交互的Token数量。当你进入该上下文时,它会通过监听器跟踪Token的使用。当你退出上下文时,它会清理监听器并提供一个Token的总数。通过这种方式,它充当了一个回调机制,允许你在特定事件发生时执行特定的操作或收集特定的信息。

具体代码如下:

from langchain import OpenAI
from langchain.chains import ConversationChain
from langchain.chains.conversation.memory import ConversationBufferMemory
from langchain.callbacks import get_openai_callback

# 初始化大语言模型
llm = OpenAI(temperature=0.5, model_name="gpt-3.5-turbo-instruct")

# 初始化对话链
conversation = ConversationChain(
    llm=llm,
    memory=ConversationBufferMemory()
)

# 使用context manager进行token counting
with get_openai_callback() as cb:
    # 第一天的对话
    # 回合1
    conversation("我姐姐明天要过生日,我需要一束生日花束。")
    print("第一次对话后的记忆:", conversation.memory.buffer)

    # 回合2
    conversation("她喜欢粉色玫瑰,颜色是粉色的。")
    print("第二次对话后的记忆:", conversation.memory.buffer)

    # 回合3 (第二天的对话)
    conversation("我又来了,还记得我昨天为什么要来买花吗?")
    print("/n第三次对话后时提示:/n",conversation.prompt.template)
    print("/n第三次对话后的记忆:/n", conversation.memory.buffer)

# 输出使用的tokens
print("\n总计使用的tokens:", cb.total_tokens)

这里,我使用了get_openai_callback上下文管理器来监控与ConversationChain的交互。这允许我们计算在这些交互中使用的总Tokens数。

输出:

总计使用的tokens: 966

下面,我再添加了一个additional_interactions异步函数,用于演示如何在多个并发交互中计算Tokens。

当我们讨论异步交互时,指的是我们可以启动多个任务,它们可以并发(而不是并行)地运行,并且不会阻塞主线程。在Python中,这是通过asyncio库实现的,它使用事件循环来管理并发的异步任务。

import asyncio
# 进行更多的异步交互和token计数
async def additional_interactions():
    with get_openai_callback() as cb:
        await asyncio.gather(
            *[llm.agenerate(["我姐姐喜欢什么颜色的花?"]) for _ in range(3)]
        )
    print("\n另外的交互中使用的tokens:", cb.total_tokens)

# 运行异步函数
asyncio.run(additional_interactions())

简单解释一下。

  1. async def:这表示additional_interactions是一个异步函数。它可以使用await关键字在其中挂起执行,允许其他异步任务继续。
  2. await asyncio.gather(...):这是asyncio库提供的一个非常有用的方法,用于并发地运行多个异步任务。它会等待所有任务完成,然后继续执行。
  3. *[llm.agenerate(["我姐姐喜欢什么颜色的花?"]) for _ in range(3)]:这实际上是一个Python列表解析,它生成了3个 llm.agenerate(…)的异步调用。asyncio.gather将并发地运行这3个调用。

由于这3个llm.agenerate调用是并发的,所以它们不会按顺序执行,而是几乎同时启动,并在各自完成时返回。这意味着,即使其中一个调用由于某种原因需要更长时间,其他调用也不会被阻塞,它们会继续并完成。

总结时刻

回调函数是计算机科学中一个重要和广泛应用的概念,它允许我们在特定的时间或条件下执行特定的代码。

回调函数在开发过程中有很多应用场景。

  1. 异步编程:在JavaScript中,回调函数常常用于异步编程。例如,当你发送一个AJAX请求到服务器时,你可以提供一个回调函数,这个函数将在服务器的响应到达时被调用。
  2. 事件处理:在许多编程语言和框架中,回调函数被用作事件处理器。例如,你可能会写一个回调函数来处理用户的点击事件,当用户点击某个按钮时,这个函数就会被调用。
  3. 定时器:你可以使用回调函数来创建定时器。例如,你可以使用JavaScript的setTimeout或setInterval函数,并提供一个回调函数,这个函数会在指定的时间过后被调用。

在 LangChain 中,回调机制同样为用户提供了灵活性和自定义能力,以便更好地控制和响应事件。CallbackHandler允许开发者在链的特定阶段或条件下注入自定义的行为,例如异步编程中的响应处理、事件驱动编程中的事件处理等。这为 LangChain 提供了灵活性和扩展性,使其能够适应各种应用场景。

思考题

  1. 我通过get_openai_callback重构了ConversationBufferMemory的程序,你能否把这个令牌计数器实现到其他记忆机制中?
  2. 在LangChain开发过程中,可以在构造函数中引入回调机制,我给出了一个示例,你能否尝试在请求过程(run/apply方法)中引入回调机制?

提示:请求回调常用在流式传输的实现中。在传统的传输中,我们必须等待这个函数生成所有数据后才能开始处理。在流式传输中,我们可以在数据被生成时立即开始处理。如果你想将单个请求的输出流式传输到一个WebSocket,你可以将一个Callback处理器传递给 call() 方法。

期待在留言区看到你的分享,如果觉得内容对你有帮助,也欢迎分享给有需要的朋友!最后如果你学有余力,可以进一步学习下面的延伸阅读。

延伸阅读

  1. GitHub 代码:CallbackHandler 中的可监控事件和方法
  2. 文档:LangChain中的回调机制
  3. 文档:什么是回调函数(知乎)
精选留言(7)
  • 阿斯蒂芬 👍(1) 💬(1)

    笔记mark: 1.疑似纠错: 第二个代码示例后写道:【compute 函数开始执行。当它遇到 await asyncio.sleep(2) 时,它会暂停】 但是代码中是 await asyncio.sleep(0.5),休眠时长会影响最终程序的打印输出顺序; 后面花卉部分 【在 MyFlowerShopAsyncHandler 中,我们使用了 await asyncio.sleep(0.3) 】也与代码中的 await asyncio.sleep(0.5) 不一致; 如果属实,还是建议修改下,否则容易造成困惑 2. 自定义回调函数 代码报错问题 一开始直接使用老师的代码,未能获得流式响应的打印,出现报错: Retrying langchain.chat_models.openai.acompletion_with_retry.<locals>._completion_with_retry in 4.0 seconds as it raised APIConnectionError: Error communicating with OpenAI. 但是此时非流式的响应,是能够正常完成的; 折腾许久,定位到是openai代理的问题,但是流式响应是通过SSE协议,此时vpn似乎被绕过了,将vpn代理显示添加到 OPENAI_PROXY 环境变量后解决 3. 最后的 additional_interactions() 示例中,可以将 asynio.gather 的返回结果打印出来,能够看到每个任务使用的token数量,与最终的总数是一致的

    2023-10-20

  • 阿斯蒂芬 👍(1) 💬(1)

    来交作业了: 思考题1: 在确保效果相同的三轮交互中,使用了其他记忆机制,并记录令牌使用情况(分别测试三次取范围值) ConversationBufferMemory: 1000 ~ 1500 ConversationBufferWindowMemory(k=2): 1200 ~ 1600 ConversationSummaryMemory: 2000 ~ 2500 ConversationSummaryBufferMemory(max_token_limt=300): 1000~1500 大致看也符合估算示意图第一阶段 0-5 interacitions 的走势,ConversationSummaryMemory 增长较快,其他几个增长速率较为一致 思考题2: 使用LLMChain的 run 方法也可以传递callback class MyCallBackHandler(BaseCallbackHandler): def on_llm_new_token(self, token: str, **kwargs) -> None: print(f"recv token: {token}") llm = ChatOpenAI(streaming=True) prompt = PromptTemplate.from_template("1 + {number} = ") chain = LLMChain(llm=llm, prompt=prompt) chain.run(number=2, callbacks=[MyCallBackHandler()]) 其实这个示例就是从老师的延伸阅读中“拿来”的,不知答对没

    2023-10-20

  • 悟尘 👍(0) 💬(1)

    老师,这节课少了输出结果的演示~理解起来有点费劲

    2023-11-12

  • yanyu-xin 👍(1) 💬(0)

    ###3 将课程代码到大模型修订为国产通义千问模型(03_LangChainOpenAICallback.py) ###“用 get_openai_callback 构造令牌计数器”代码。用 ChatOpenAI和千问模型平替 OpenAI # 原代码3 llm = OpenAI() # 新代码3 from langchain_openai import ChatOpenAI llm = ChatOpenAI( model_name="qwen-turbo", stream_usage=True, api_key= “API_KEY”, #填写你自己的DASHSCOPE_API_KEY base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", ) ###4 修改“additional_interactions 异步函数”代码(03_LangChainOpenAICallback.py),将llm.agenerate中的参数用 langchain_core.messages 修订。 # 旧代码4 import asyncio async def additional_interactions(): with get_openai_callback() as cb: await asyncio.gather( *[llm.agenerate(["我姐姐喜欢什么颜色的花?"]) for _ in range(3)] ) print("\n另外的交互中使用的tokens:", cb.total_tokens) asyncio.run(additional_interactions()) # 新代码4 import asyncio from langchain_core.messages import HumanMessage, SystemMessage messages = [[ SystemMessage(content="你是个花店小助手。"), HumanMessage(content="我姐姐喜欢什么颜色的花?"), ]] #进行更多的异步交互和token计数 async def additional_interactions(): print("\n开始进行更多的交互...") with get_openai_callback() as cb: await asyncio.gather( *[llm.agenerate(messages) for _ in range(3)] ) print(f"交互{i+1}使用的tokens: {cb.total_tokens}") asyncio.run(additional_interactions())

    2024-08-31

  • yanyu-xin 👍(1) 💬(0)

    将课程代码到大模型修订为国产通义千问模型: ###1 修改“在组件中使用回调处理器”代码,用 ChatOpenAI和千问模型平替 OpenAI # 原代码1 llm = OpenAI() # 新代码1 from langchain_openai import ChatOpenAI llm = ChatOpenAI( model_name="qwen-turbo", #用通义模型 api_key=“API_KEY”, #填写你自己的DASHSCOPE_API_KEY base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" ) ———— ###2 修改“自定义回调函数”代码,将通义模型平替OpenAI # 原代码2 flower_shop_chat = ChatOpenAI( max_tokens=100, streaming=True, callbacks=[MyFlowerShopSyncHandler(), MyFlowerShopAsyncHandler()], ) # 新代码2 flower_shop_chat = ChatOpenAI( model_name="qwen-turbo", #用千问模型 api_key=“API_KEY”, #填写你自己的DASHSCOPE_API_KEY base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", max_tokens=100, streaming=True, callbacks=[MyFlowerShopSyncHandler(), MyFlowerShopAsyncHandler()], )

    2024-08-31

  • 张帅 👍(0) 💬(0)

    *[llm.agenerate(["我姐姐喜欢什么颜色的花?"]) for _ in range(3)]这一行运行不了了,会抛出错误【ValueError: Got unsupported message type: 我】,改成 *[llm.agenerate([[HumanMessage("我姐姐喜欢什么颜色的花?")]]) for _ in range(3)] 可以成功运行

    2024-12-29

  • 张申傲 👍(0) 💬(0)

    第17讲打卡~ 可以通过回调机制,将LLM运行过程中产生的日志异步写入文件或日志服务,后续通过ELK等机制进行日志采集和链路追踪

    2024-07-18