上一篇讲了 Model I/O——模型调用、提示词模板、输出解析器、部署服务。这篇接着往下走,聊 LangChain 的链式表达语言 LCEL,以及怎么给大模型加上"记忆"。代码都跑通了,直接拿去用。
1. LCEL 是什么
LCEL 全称 LangChain Expression Language。说白了就是一套用管道符 | 把组件串起来的写法。你有一个 prompt、一个模型、一个输出解析器,用 | 连起来就是一条链。
这个设计思路很 Unix——每个组件只干一件事,数据从左往右流。
上一篇已经用过最基本的形式:
chain = prompt | model | parser
result = chain.invoke({"language": "中文", "text": "hello"})
这篇深入一点,看看 LCEL 还能怎么玩。
2. 顺序链:RunnableSequence
最简单的链就是把 prompt、model、parser 串起来:
from langchain_community.chat_models import ChatTongyi
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableSequence
model = ChatTongyi()
prompt = PromptTemplate(
input_variables=["topic"],
template="用5句话来介绍{topic}"
)
out = StrOutputParser()
# 两种写法等价
chain = RunnableSequence(prompt, model, out)
# chain = prompt | model | out
print(chain.invoke({"topic": "人工智能"}))
invoke 是同步调用。输入一个字典,数据依次经过 prompt 格式化、模型推理、字符串解析,最后拿到结果。管道符写法是语法糖,底层就是 RunnableSequence。
3. 并行链:RunnableParallel
有时候你想对同一个输入做不同的处理。RunnableParallel 让多个函数同时执行,结果用字典返回:
from langchain_core.runnables import RunnableParallel, RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
def mul_three(x: int) -> int:
return x * 3
chain = RunnableParallel(
a=add_one,
b=mul_two,
c=mul_three,
)
print(chain.invoke(1)) # {'a': 2, 'b': 2, 'c': 3}
也可以把并行链接在顺序链后面:
chain1 = RunnableLambda(add_one)
chain2 = chain1 | RunnableParallel(
a=mul_two,
b=mul_three,
)
print(chain2.invoke(2)) # 先 +1 得到 3,再并行 *2 和 *3 → {'a': 6, 'b': 9}
4. RunnablePassthrough:数据透传和增强
这个组件我一开始没搞懂它的用处。其实它做两件事:原样传递数据,或者在传递的同时给数据加点料。
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
# 原样传递
chain = RunnableParallel(
passed=RunnablePassthrough(),
)
print(chain.invoke("hello world")) # {'passed': 'hello world'}
# 数据增强:保留原数据的同时追加一个字段
chain = RunnableParallel(
passed=RunnablePassthrough().assign(
modified=lambda x: x["k1"] + "!!!"
),
)
print(chain.invoke({"k1": "hello world"}))
# {'passed': {'k1': 'hello world', 'modified': 'hello world!!!'}}
.assign() 很实用。比如你想在调用模型前,把用户输入做一些预处理然后一起传下去,就能用这招。
5. RunnableLambda:把普通函数变成链的节点
任何 Python 函数都能通过 RunnableLambda 变成链的一部分:
from langchain_core.runnables import RunnableLambda, chain
from operator import itemgetter
def length_function(text):
return len(text)
# 方式一:直接包装
chain = RunnableLambda(length_function)
print(chain.invoke("hello")) # 5
# 方式二:用装饰器 @chain
@chain
def multiple_length_function(_dict):
return len(_dict["text1"]) * len(_dict["text2"])
装饰器写法更简洁,适合逻辑稍复杂的场景。
实际用的时候经常配合 itemgetter 从字典里取值,串成更复杂的数据流:
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_template("{a} + {b} = ? 计算结果是多少?")
model = ChatTongyi()
out = StrOutputParser()
chain = (
{
"a": itemgetter("k1") | RunnableLambda(length_function), # 5
"b": {"text1": itemgetter("k1"), "text2": itemgetter("k2")} | multiple_length_function # 25
}
| prompt | model | out
)
print(chain.invoke({"k1": "hello", "k2": "world"}))
这个例子有点绕,但能看出 LCEL 的组合能力——字典里的每个 key 都是一条独立的数据处理管道,最后汇总给 prompt。
6. 批量调用和流式输出
LCEL 链自带 batch 和 stream 方法,不用额外写循环。
batch:并发请求多个输入
from langchain_core.prompts import ChatPromptTemplate
chain = (
ChatPromptTemplate.from_template("用一句话介绍{topic}")
| ChatTongyi()
| StrOutputParser()
)
topics = ["人工智能", "区块链", "量子计算", "基因编辑"]
inputs = [{"topic": topic} for topic in topics]
# 并发执行,比逐个调用快不少
results = chain.batch(inputs)
注意:API 供应商可能有请求频率限制。输入列表里的字典必须有相同的 key 结构。
stream:流式输出
用户能看到模型一个字一个字地往外蹦,体验好很多:
model = ChatTongyi(streaming=True)
prompt = PromptTemplate(input_variables=["topic"], template="用5句话来介绍{topic}")
out = StrOutputParser()
chain = prompt | model | out
for chunk in chain.stream({"topic": "人工智能"}):
print(chunk, end="", flush=True)
flush=True 别忘了加,不然输出会攒一堆再打出来。
三种调用方式总结
| 方法 | 用途 | 适合场景 |
|---|---|---|
invoke | 单次同步调用 | 普通请求 |
batch | 批量并发调用 | 多个独立输入 |
stream | 流式逐步输出 | 用户界面实时展示 |
7. 大模型没有记忆
到这里 LCEL 链的基本用法差不多了。接下来聊记忆。
先说一个事实:大模型本身是无状态的。每次调用都是独立的,它不记得你上一轮说了什么。
验证一下:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate
from models import get_lc_model_client
client = get_lc_model_client()
chat_template = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template("你是人工智能助手"),
('human', '{text}')
])
parser = StrOutputParser()
chain = chat_template | client | parser
print(chain.invoke({'text': '你好,我是大白'}))
# 模型会打招呼
print(chain.invoke({'text': '我是谁?'}))
# 模型不知道你是大白,因为每次调用是独立的
所以"记忆"这个能力,得我们自己实现。思路很简单:把历史对话存下来,每次调用时一起发给模型。
8. ChatMessageHistory:手动管理对话历史
最直接的方案——用 ChatMessageHistory 存消息,每次把所有历史消息塞进 prompt:
from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_community.chat_message_histories import ChatMessageHistory
from models import get_ali_model_client
chat_template = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template("你是人工智能助手"),
# MessagesPlaceholder 会把历史消息列表插到这个位置
MessagesPlaceholder(variable_name="messages"),
])
client = get_ali_model_client()
parser = StrOutputParser()
chain = chat_template | client | parser
# 创建历史记录存储
chat_history = ChatMessageHistory()
while True:
user_input = input("用户:")
if user_input == "exit":
break
chat_history.add_user_message(user_input)
response = chain.invoke({'messages': chat_history.messages})
print(f"助手:{response}")
chat_history.add_ai_message(response)
这样模型每次都能"看到"之前的对话。代价是 token 消耗会随对话轮数增长。
MessagesPlaceholder 是关键组件——它在 prompt 模板里占个位,运行时把消息列表填进去。
这个方案有几个问题:历史存在内存里,程序重启就没了;多用户的话没法隔离对话。
9. Redis 持久化:对话历史不丢失
把 ChatMessageHistory 换成 RedisChatMessageHistory,历史记录就存到 Redis 里了:
from langchain_community.chat_message_histories import RedisChatMessageHistory
# session_id 区分不同用户或对话
history = RedisChatMessageHistory(
session_id="my_session_id",
url="redis://localhost:6379"
)
# 用法和 ChatMessageHistory 一样
history.add_user_message("你是谁?")
ai_response = client.invoke(history.messages)
history.add_ai_message(ai_response)
程序重启后,只要 Redis 里的数据还在,对话就能接着聊。
10. RunnableWithMessageHistory:自动化记忆管理
手动管理历史记录写起来啰嗦——每次要手动存、手动取。LangChain 提供了 RunnableWithMessageHistory,把这些操作自动化了:
from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableWithMessageHistory
from langchain_community.chat_message_histories import RedisChatMessageHistory
from models import get_lc_model_client
client = get_lc_model_client()
prompt_template = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template("你是一个聊天助手,用中文回答所有的问题"),
HumanMessagePromptTemplate.from_template("{input}"),
])
parser = StrOutputParser()
chain = prompt_template | client | parser
def get_session_history(session_id):
return RedisChatMessageHistory(
session_id=session_id,
url="redis://localhost:6379",
ttl=300 # 5分钟过期
)
# 把链和记忆绑在一起
chatbot = RunnableWithMessageHistory(
chain,
get_session_history,
input_messages_key="input",
)
用 session_id 区分不同用户的对话:
# 用户 A 的对话
resp = chatbot.invoke(
{"input": "我喜欢音乐,推荐一首轻音乐"},
config={"configurable": {"session_id": "user_a"}},
)
print(resp)
# 用户 B 的对话,互不干扰
resp = chatbot.invoke(
{"input": "我喜欢篮球,猜猜我喜欢哪个NBA球星"},
config={"configurable": {"session_id": "user_b"}},
)
print(resp)
# 用户 A 再次提问,模型记得他喜欢音乐
resp = chatbot.invoke(
{"input": "我喜欢什么?"},
config={"configurable": {"session_id": "user_a"}},
)
print(resp) # 会提到音乐
整个流程:
用户输入 + session_id
↓
从 Redis 取该用户的历史消息
↓
历史 + 当前输入 → 一起发给模型
↓
模型回复
↓
回复存回 Redis
ttl=300 表示对话数据 5 分钟后过期,适合短会话场景。生产环境可以根据业务调整。
调试:看看链里到底发生了什么
链式调用写起来简洁,但出了问题不好排查。打开全局调试开关,每一步的输入输出都会打印出来:
from langchain_core.globals import set_debug
set_debug(True) # 开发时打开
# set_debug(False) # 部署时关掉
环境配置备忘
pip install langchain langchain-openai langchain-community redis
Redis 需要本地启动:
# macOS
brew install redis
brew services start redis
# 或者用 Docker
docker run -d -p 6379:6379 redis
写到这里差不多了。这篇覆盖了 LCEL 的各种组合方式和对话记忆的实现路径——从手动管理到自动化