上一篇讲了 Model I/O——模型调用、提示词模板、输出解析器、部署服务。这篇接着往下走,聊 LangChain 的链式表达语言 LCEL,以及怎么给大模型加上"记忆"。代码都跑通了,直接拿去用。

1. LCEL 是什么

LCEL 全称 LangChain Expression Language。说白了就是一套用管道符 | 把组件串起来的写法。你有一个 prompt、一个模型、一个输出解析器,用 | 连起来就是一条链。

这个设计思路很 Unix——每个组件只干一件事,数据从左往右流。

上一篇已经用过最基本的形式:

chain = prompt | model | parser
result = chain.invoke({"language": "中文", "text": "hello"})

这篇深入一点,看看 LCEL 还能怎么玩。

2. 顺序链:RunnableSequence

最简单的链就是把 prompt、model、parser 串起来:

from langchain_community.chat_models import ChatTongyi
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableSequence

model = ChatTongyi()

prompt = PromptTemplate(
    input_variables=["topic"],
    template="用5句话来介绍{topic}"
)

out = StrOutputParser()

# 两种写法等价
chain = RunnableSequence(prompt, model, out)
# chain = prompt | model | out

print(chain.invoke({"topic": "人工智能"}))

invoke 是同步调用。输入一个字典,数据依次经过 prompt 格式化、模型推理、字符串解析,最后拿到结果。管道符写法是语法糖,底层就是 RunnableSequence

3. 并行链:RunnableParallel

有时候你想对同一个输入做不同的处理。RunnableParallel 让多个函数同时执行,结果用字典返回:

from langchain_core.runnables import RunnableParallel, RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

def mul_three(x: int) -> int:
    return x * 3

chain = RunnableParallel(
    a=add_one,
    b=mul_two,
    c=mul_three,
)

print(chain.invoke(1))  # {'a': 2, 'b': 2, 'c': 3}

也可以把并行链接在顺序链后面:

chain1 = RunnableLambda(add_one)

chain2 = chain1 | RunnableParallel(
    a=mul_two,
    b=mul_three,
)

print(chain2.invoke(2))  # 先 +1 得到 3,再并行 *2 和 *3 → {'a': 6, 'b': 9}

4. RunnablePassthrough:数据透传和增强

这个组件我一开始没搞懂它的用处。其实它做两件事:原样传递数据,或者在传递的同时给数据加点料。

from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# 原样传递
chain = RunnableParallel(
    passed=RunnablePassthrough(),
)
print(chain.invoke("hello world"))  # {'passed': 'hello world'}

# 数据增强:保留原数据的同时追加一个字段
chain = RunnableParallel(
    passed=RunnablePassthrough().assign(
        modified=lambda x: x["k1"] + "!!!"
    ),
)
print(chain.invoke({"k1": "hello world"}))
# {'passed': {'k1': 'hello world', 'modified': 'hello world!!!'}}

.assign() 很实用。比如你想在调用模型前,把用户输入做一些预处理然后一起传下去,就能用这招。

5. RunnableLambda:把普通函数变成链的节点

任何 Python 函数都能通过 RunnableLambda 变成链的一部分:

from langchain_core.runnables import RunnableLambda, chain
from operator import itemgetter

def length_function(text):
    return len(text)

# 方式一:直接包装
chain = RunnableLambda(length_function)
print(chain.invoke("hello"))  # 5

# 方式二:用装饰器 @chain
@chain
def multiple_length_function(_dict):
    return len(_dict["text1"]) * len(_dict["text2"])

装饰器写法更简洁,适合逻辑稍复杂的场景。

实际用的时候经常配合 itemgetter 从字典里取值,串成更复杂的数据流:

from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("{a} + {b} = ? 计算结果是多少?")
model = ChatTongyi()
out = StrOutputParser()

chain = (
    {
        "a": itemgetter("k1") | RunnableLambda(length_function),  # 5
        "b": {"text1": itemgetter("k1"), "text2": itemgetter("k2")} | multiple_length_function  # 25
    }
    | prompt | model | out
)

print(chain.invoke({"k1": "hello", "k2": "world"}))

这个例子有点绕,但能看出 LCEL 的组合能力——字典里的每个 key 都是一条独立的数据处理管道,最后汇总给 prompt。

6. 批量调用和流式输出

LCEL 链自带 batchstream 方法,不用额外写循环。

batch:并发请求多个输入

from langchain_core.prompts import ChatPromptTemplate

chain = (
    ChatPromptTemplate.from_template("用一句话介绍{topic}")
    | ChatTongyi()
    | StrOutputParser()
)

topics = ["人工智能", "区块链", "量子计算", "基因编辑"]
inputs = [{"topic": topic} for topic in topics]

# 并发执行,比逐个调用快不少
results = chain.batch(inputs)

注意:API 供应商可能有请求频率限制。输入列表里的字典必须有相同的 key 结构。

stream:流式输出

用户能看到模型一个字一个字地往外蹦,体验好很多:

model = ChatTongyi(streaming=True)
prompt = PromptTemplate(input_variables=["topic"], template="用5句话来介绍{topic}")
out = StrOutputParser()

chain = prompt | model | out

for chunk in chain.stream({"topic": "人工智能"}):
    print(chunk, end="", flush=True)

flush=True 别忘了加,不然输出会攒一堆再打出来。

三种调用方式总结

方法用途适合场景
invoke单次同步调用普通请求
batch批量并发调用多个独立输入
stream流式逐步输出用户界面实时展示

7. 大模型没有记忆

到这里 LCEL 链的基本用法差不多了。接下来聊记忆。

先说一个事实:大模型本身是无状态的。每次调用都是独立的,它不记得你上一轮说了什么。

验证一下:

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate
from models import get_lc_model_client

client = get_lc_model_client()

chat_template = ChatPromptTemplate.from_messages([
    SystemMessagePromptTemplate.from_template("你是人工智能助手"),
    ('human', '{text}')
])
parser = StrOutputParser()
chain = chat_template | client | parser

print(chain.invoke({'text': '你好,我是大白'}))
# 模型会打招呼

print(chain.invoke({'text': '我是谁?'}))
# 模型不知道你是大白,因为每次调用是独立的

所以"记忆"这个能力,得我们自己实现。思路很简单:把历史对话存下来,每次调用时一起发给模型。

8. ChatMessageHistory:手动管理对话历史

最直接的方案——用 ChatMessageHistory 存消息,每次把所有历史消息塞进 prompt:

from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_community.chat_message_histories import ChatMessageHistory
from models import get_ali_model_client

chat_template = ChatPromptTemplate.from_messages([
    SystemMessagePromptTemplate.from_template("你是人工智能助手"),
    # MessagesPlaceholder 会把历史消息列表插到这个位置
    MessagesPlaceholder(variable_name="messages"),
])

client = get_ali_model_client()
parser = StrOutputParser()
chain = chat_template | client | parser

# 创建历史记录存储
chat_history = ChatMessageHistory()

while True:
    user_input = input("用户:")
    if user_input == "exit":
        break

    chat_history.add_user_message(user_input)
    response = chain.invoke({'messages': chat_history.messages})
    print(f"助手:{response}")
    chat_history.add_ai_message(response)

这样模型每次都能"看到"之前的对话。代价是 token 消耗会随对话轮数增长。

MessagesPlaceholder 是关键组件——它在 prompt 模板里占个位,运行时把消息列表填进去。

这个方案有几个问题:历史存在内存里,程序重启就没了;多用户的话没法隔离对话。

9. Redis 持久化:对话历史不丢失

ChatMessageHistory 换成 RedisChatMessageHistory,历史记录就存到 Redis 里了:

from langchain_community.chat_message_histories import RedisChatMessageHistory

# session_id 区分不同用户或对话
history = RedisChatMessageHistory(
    session_id="my_session_id",
    url="redis://localhost:6379"
)

# 用法和 ChatMessageHistory 一样
history.add_user_message("你是谁?")
ai_response = client.invoke(history.messages)
history.add_ai_message(ai_response)

程序重启后,只要 Redis 里的数据还在,对话就能接着聊。

10. RunnableWithMessageHistory:自动化记忆管理

手动管理历史记录写起来啰嗦——每次要手动存、手动取。LangChain 提供了 RunnableWithMessageHistory,把这些操作自动化了:

from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableWithMessageHistory
from langchain_community.chat_message_histories import RedisChatMessageHistory
from models import get_lc_model_client

client = get_lc_model_client()

prompt_template = ChatPromptTemplate.from_messages([
    SystemMessagePromptTemplate.from_template("你是一个聊天助手,用中文回答所有的问题"),
    HumanMessagePromptTemplate.from_template("{input}"),
])
parser = StrOutputParser()
chain = prompt_template | client | parser

def get_session_history(session_id):
    return RedisChatMessageHistory(
        session_id=session_id,
        url="redis://localhost:6379",
        ttl=300  # 5分钟过期
    )

# 把链和记忆绑在一起
chatbot = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
)

session_id 区分不同用户的对话:

# 用户 A 的对话
resp = chatbot.invoke(
    {"input": "我喜欢音乐,推荐一首轻音乐"},
    config={"configurable": {"session_id": "user_a"}},
)
print(resp)

# 用户 B 的对话,互不干扰
resp = chatbot.invoke(
    {"input": "我喜欢篮球,猜猜我喜欢哪个NBA球星"},
    config={"configurable": {"session_id": "user_b"}},
)
print(resp)

# 用户 A 再次提问,模型记得他喜欢音乐
resp = chatbot.invoke(
    {"input": "我喜欢什么?"},
    config={"configurable": {"session_id": "user_a"}},
)
print(resp)  # 会提到音乐

整个流程:

用户输入 + session_id
    ↓
从 Redis 取该用户的历史消息
    ↓
历史 + 当前输入 → 一起发给模型
    ↓
模型回复
    ↓
回复存回 Redis

ttl=300 表示对话数据 5 分钟后过期,适合短会话场景。生产环境可以根据业务调整。

调试:看看链里到底发生了什么

链式调用写起来简洁,但出了问题不好排查。打开全局调试开关,每一步的输入输出都会打印出来:

from langchain_core.globals import set_debug

set_debug(True)  # 开发时打开
# set_debug(False)  # 部署时关掉

环境配置备忘

pip install langchain langchain-openai langchain-community redis

Redis 需要本地启动:

# macOS
brew install redis
brew services start redis

# 或者用 Docker
docker run -d -p 6379:6379 redis

写到这里差不多了。这篇覆盖了 LCEL 的各种组合方式和对话记忆的实现路径——从手动管理到自动化