LangChain 中文教程LangChain 中文教程
首页
  • 什么是 LangChain
  • 环境搭建
  • 第一个应用
  • 模型调用
  • 提示词模板
  • 链式调用
  • 记忆功能
  • 工具使用
  • 检索增强生成(RAG)
  • Agent 智能体
  • LangGraph 入门
  • LangSmith 监控
  • 部署与优化
LangChain 官网
首页
  • 什么是 LangChain
  • 环境搭建
  • 第一个应用
  • 模型调用
  • 提示词模板
  • 链式调用
  • 记忆功能
  • 工具使用
  • 检索增强生成(RAG)
  • Agent 智能体
  • LangGraph 入门
  • LangSmith 监控
  • 部署与优化
LangChain 官网
  • 高级篇

    • Agent 智能体
    • LangGraph 入门
    • LangSmith 监控
    • 部署与优化

部署与优化

将 LangChain 应用部署到生产环境需要考虑性能、成本、可靠性等多个方面。本章将介绍最佳实践和优化策略。

部署架构

典型架构

                    ┌─────────────────────┐
                    │     负载均衡器       │
                    └──────────┬──────────┘
                               │
           ┌───────────────────┼───────────────────┐
           │                   │                   │
    ┌──────▼──────┐     ┌──────▼──────┐     ┌──────▼──────┐
    │   应用实例1  │     │   应用实例2  │     │   应用实例3  │
    └──────┬──────┘     └──────┬──────┘     └──────┬──────┘
           │                   │                   │
           └───────────────────┼───────────────────┘
                               │
    ┌──────────────────────────┼──────────────────────────┐
    │                          │                          │
┌───▼───┐              ┌───────▼───────┐           ┌──────▼──────┐
│ Redis │              │  向量数据库    │           │   LLM API   │
│ 缓存  │              │  (Pinecone等) │           │  (OpenAI等) │
└───────┘              └───────────────┘           └─────────────┘

使用 LangServe 部署

LangServe 是官方推荐的部署方案:

安装

pip install langserve[all] fastapi uvicorn

创建服务

# server.py
from fastapi import FastAPI
from langserve import add_routes
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv

load_dotenv()

# 创建应用
app = FastAPI(
    title="LangChain API",
    version="1.0",
    description="LangChain 应用 API"
)

# 创建链
prompt = ChatPromptTemplate.from_template("你是一个助手。{input}")
llm = ChatOpenAI(model="gpt-3.5-turbo")
chain = prompt | llm | StrOutputParser()

# 添加路由
add_routes(
    app,
    chain,
    path="/chat"
)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

运行服务

python server.py
# 或使用 uvicorn
uvicorn server:app --host 0.0.0.0 --port 8000 --reload

访问端点

LangServe 自动创建多个端点:

  • POST /chat/invoke - 同步调用
  • POST /chat/batch - 批量调用
  • POST /chat/stream - 流式调用
  • GET /chat/playground - 交互式测试界面
  • GET /chat/input_schema - 输入模式
  • GET /chat/output_schema - 输出模式

客户端调用

from langserve import RemoteRunnable

# 连接远程服务
chain = RemoteRunnable("http://localhost:8000/chat")

# 调用
result = chain.invoke({"input": "你好"})
print(result)

# 流式调用
for chunk in chain.stream({"input": "讲个故事"}):
    print(chunk, end="", flush=True)

使用 FastAPI 自定义部署

更灵活的部署方式:

# app.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from fastapi.responses import StreamingResponse
import asyncio
from dotenv import load_dotenv

load_dotenv()

app = FastAPI(title="AI Chat API")

# CORS 配置
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 请求模型
class ChatRequest(BaseModel):
    message: str
    temperature: float = 0.7

class ChatResponse(BaseModel):
    response: str
    tokens_used: int = None

# 创建链
def create_chain(temperature: float = 0.7):
    llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=temperature)
    prompt = ChatPromptTemplate.from_template("{message}")
    return prompt | llm | StrOutputParser()

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """同步聊天端点"""
    try:
        chain = create_chain(request.temperature)
        response = await chain.ainvoke({"message": request.message})
        return ChatResponse(response=response)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """流式聊天端点"""
    async def generate():
        chain = create_chain(request.temperature)
        async for chunk in chain.astream({"message": request.message}):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

@app.get("/health")
async def health_check():
    """健康检查"""
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Docker 部署

Dockerfile

FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY . .

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

docker-compose.yml

version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LANGCHAIN_TRACING_V2=true
      - LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY}
    restart: unless-stopped
    
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

volumes:
  redis_data:

构建和运行

# 构建镜像
docker build -t langchain-app .

# 运行容器
docker-compose up -d

性能优化

1. 缓存策略

from langchain_core.globals import set_llm_cache
from langchain_community.cache import RedisCache
import redis

# Redis 缓存
redis_client = redis.Redis(host="localhost", port=6379)
set_llm_cache(RedisCache(redis_client))

# 语义缓存(相似问题返回缓存结果)
from langchain_community.cache import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings

set_llm_cache(RedisSemanticCache(
    redis_url="redis://localhost:6379",
    embedding=OpenAIEmbeddings()
))

2. 批量处理

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-3.5-turbo")

# 批量调用(并行处理)
questions = ["问题1", "问题2", "问题3"]
responses = llm.batch(questions, config={"max_concurrency": 5})

3. 异步处理

import asyncio
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-3.5-turbo")

async def process_messages(messages: list):
    tasks = [llm.ainvoke(msg) for msg in messages]
    return await asyncio.gather(*tasks)

# 使用
messages = ["问题1", "问题2", "问题3"]
results = asyncio.run(process_messages(messages))

4. 连接池

from langchain_openai import ChatOpenAI
import httpx

# 使用自定义 HTTP 客户端
http_client = httpx.Client(
    limits=httpx.Limits(
        max_connections=100,
        max_keepalive_connections=20
    ),
    timeout=httpx.Timeout(30.0)
)

llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    http_client=http_client
)

成本优化

1. 模型选择策略

class ModelRouter:
    """智能模型路由"""
    
    def __init__(self):
        self.cheap_model = ChatOpenAI(model="gpt-3.5-turbo")
        self.powerful_model = ChatOpenAI(model="gpt-4")
    
    def route(self, query: str):
        # 简单查询用便宜模型
        if len(query) < 100 and "简单" in query:
            return self.cheap_model
        # 复杂查询用强大模型
        return self.powerful_model

2. Token 限制

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    max_tokens=500,  # 限制输出长度
)

3. 提示词优化

# 精简提示词减少 Token
concise_prompt = ChatPromptTemplate.from_template(
    "简短回答:{question}"
)

# 而不是
verbose_prompt = ChatPromptTemplate.from_template(
    """你是一个非常专业的助手,请根据你的知识,
    详细、全面、准确地回答以下问题:{question}
    请确保回答完整且有条理。"""
)

4. 监控成本

from langchain_core.callbacks import get_openai_callback

with get_openai_callback() as cb:
    result = chain.invoke({"input": "问题"})
    
    print(f"输入 Token: {cb.prompt_tokens}")
    print(f"输出 Token: {cb.completion_tokens}")
    print(f"总成本: ${cb.total_cost:.4f}")

可靠性优化

1. 重试机制

from langchain_openai import ChatOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    max_retries=3,
    request_timeout=30
)

# 链级别重试
chain_with_retry = chain.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True
)

2. 降级策略

class ResilientLLM:
    """具有降级能力的 LLM"""
    
    def __init__(self):
        self.primary = ChatOpenAI(model="gpt-4")
        self.fallback = ChatOpenAI(model="gpt-3.5-turbo")
    
    def invoke(self, message: str):
        try:
            return self.primary.invoke(message)
        except Exception as e:
            print(f"主模型失败,使用降级模型: {e}")
            return self.fallback.invoke(message)

3. 超时处理

import asyncio

async def invoke_with_timeout(chain, input_data, timeout=30):
    """带超时的调用"""
    try:
        return await asyncio.wait_for(
            chain.ainvoke(input_data),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        return {"error": "请求超时"}

4. 限流

from asyncio import Semaphore

class RateLimiter:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = Semaphore(max_concurrent)
    
    async def __aenter__(self):
        await self.semaphore.acquire()
    
    async def __aexit__(self, *args):
        self.semaphore.release()

rate_limiter = RateLimiter(max_concurrent=10)

async def rate_limited_call(chain, input_data):
    async with rate_limiter:
        return await chain.ainvoke(input_data)

安全最佳实践

1. 输入验证

from pydantic import BaseModel, validator

class UserInput(BaseModel):
    message: str
    
    @validator("message")
    def validate_message(cls, v):
        if len(v) > 10000:
            raise ValueError("消息过长")
        # 过滤危险内容
        dangerous_patterns = ["system:", "ignore previous"]
        for pattern in dangerous_patterns:
            if pattern.lower() in v.lower():
                raise ValueError("包含不允许的内容")
        return v

2. 输出过滤

def filter_output(response: str) -> str:
    """过滤敏感输出"""
    # 移除可能的敏感信息
    import re
    
    # 移除邮箱
    response = re.sub(r'\b[\w.-]+@[\w.-]+\.\w+\b', '[EMAIL]', response)
    # 移除电话
    response = re.sub(r'\b\d{11}\b', '[PHONE]', response)
    
    return response

3. API Key 管理

import os
from functools import lru_cache

@lru_cache
def get_api_key(key_name: str) -> str:
    """安全获取 API Key"""
    key = os.getenv(key_name)
    if not key:
        raise ValueError(f"Missing {key_name}")
    return key

# 使用
llm = ChatOpenAI(api_key=get_api_key("OPENAI_API_KEY"))

完整部署示例

"""
生产级 LangChain 应用
"""

from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, validator
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.globals import set_llm_cache
from langchain_community.cache import RedisCache
from contextlib import asynccontextmanager
import redis
import os
from dotenv import load_dotenv

load_dotenv()

# 配置
class Settings:
    REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
    MAX_MESSAGE_LENGTH = 5000
    DEFAULT_MODEL = "gpt-3.5-turbo"

settings = Settings()

# 生命周期管理
@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动时:设置缓存
    redis_client = redis.from_url(settings.REDIS_URL)
    set_llm_cache(RedisCache(redis_client))
    print("缓存已初始化")
    yield
    # 关闭时:清理资源
    print("应用关闭")

app = FastAPI(
    title="Production LangChain API",
    version="1.0.0",
    lifespan=lifespan
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# 请求模型
class ChatRequest(BaseModel):
    message: str
    model: str = "gpt-3.5-turbo"
    temperature: float = 0.7
    
    @validator("message")
    def validate_message(cls, v):
        if len(v) > settings.MAX_MESSAGE_LENGTH:
            raise ValueError(f"消息长度不能超过 {settings.MAX_MESSAGE_LENGTH}")
        return v.strip()
    
    @validator("temperature")
    def validate_temperature(cls, v):
        if not 0 <= v <= 2:
            raise ValueError("temperature 必须在 0-2 之间")
        return v

# 依赖注入
def get_chain(request: ChatRequest):
    llm = ChatOpenAI(
        model=request.model,
        temperature=request.temperature,
        max_retries=3
    )
    prompt = ChatPromptTemplate.from_template(
        "你是一个有帮助的助手。\n\n用户:{message}\n\n助手:"
    )
    return prompt | llm | StrOutputParser()

# 端点
@app.post("/chat")
async def chat(request: ChatRequest, chain=Depends(get_chain)):
    try:
        response = await chain.ainvoke({"message": request.message})
        return {"response": response, "status": "success"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    return {"status": "healthy"}

@app.get("/models")
async def list_models():
    return {
        "models": ["gpt-3.5-turbo", "gpt-4", "gpt-4-turbo"]
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

小结

本章介绍了:

✅ 典型部署架构
✅ 使用 LangServe 快速部署
✅ FastAPI 自定义部署
✅ Docker 容器化
✅ 性能优化策略
✅ 成本控制方法
✅ 可靠性优化
✅ 安全最佳实践
✅ 完整的生产级示例

恭喜完成!

🎉 恭喜你完成了 LangChain 中文教程的全部内容!

现在你已经掌握了:

  • 基础篇:环境搭建、模型调用、提示词模板
  • 进阶篇:链式调用、记忆功能、工具使用、RAG
  • 高级篇:Agent 智能体、LangGraph、LangSmith、生产部署

继续学习

  • 阅读 LangChain 官方文档
  • 参与 LangChain 社区
  • 关注最新的 AI 技术动态
  • 动手实践,构建你自己的 AI 应用!

祝你在 AI 应用开发的道路上越走越远!🚀

Prev
LangSmith 监控