部署与优化
将 LangChain 应用部署到生产环境需要考虑性能、成本、可靠性等多个方面。本章将介绍最佳实践和优化策略。
部署架构
典型架构
┌─────────────────────┐
│ 负载均衡器 │
└──────────┬──────────┘
│
┌───────────────────┼───────────────────┐
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ 应用实例1 │ │ 应用实例2 │ │ 应用实例3 │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└───────────────────┼───────────────────┘
│
┌──────────────────────────┼──────────────────────────┐
│ │ │
┌───▼───┐ ┌───────▼───────┐ ┌──────▼──────┐
│ Redis │ │ 向量数据库 │ │ LLM API │
│ 缓存 │ │ (Pinecone等) │ │ (OpenAI等) │
└───────┘ └───────────────┘ └─────────────┘
使用 LangServe 部署
LangServe 是官方推荐的部署方案:
安装
pip install langserve[all] fastapi uvicorn
创建服务
# server.py
from fastapi import FastAPI
from langserve import add_routes
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv
load_dotenv()
# 创建应用
app = FastAPI(
title="LangChain API",
version="1.0",
description="LangChain 应用 API"
)
# 创建链
prompt = ChatPromptTemplate.from_template("你是一个助手。{input}")
llm = ChatOpenAI(model="gpt-3.5-turbo")
chain = prompt | llm | StrOutputParser()
# 添加路由
add_routes(
app,
chain,
path="/chat"
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
运行服务
python server.py
# 或使用 uvicorn
uvicorn server:app --host 0.0.0.0 --port 8000 --reload
访问端点
LangServe 自动创建多个端点:
POST /chat/invoke- 同步调用POST /chat/batch- 批量调用POST /chat/stream- 流式调用GET /chat/playground- 交互式测试界面GET /chat/input_schema- 输入模式GET /chat/output_schema- 输出模式
客户端调用
from langserve import RemoteRunnable
# 连接远程服务
chain = RemoteRunnable("http://localhost:8000/chat")
# 调用
result = chain.invoke({"input": "你好"})
print(result)
# 流式调用
for chunk in chain.stream({"input": "讲个故事"}):
print(chunk, end="", flush=True)
使用 FastAPI 自定义部署
更灵活的部署方式:
# app.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from fastapi.responses import StreamingResponse
import asyncio
from dotenv import load_dotenv
load_dotenv()
app = FastAPI(title="AI Chat API")
# CORS 配置
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 请求模型
class ChatRequest(BaseModel):
message: str
temperature: float = 0.7
class ChatResponse(BaseModel):
response: str
tokens_used: int = None
# 创建链
def create_chain(temperature: float = 0.7):
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=temperature)
prompt = ChatPromptTemplate.from_template("{message}")
return prompt | llm | StrOutputParser()
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""同步聊天端点"""
try:
chain = create_chain(request.temperature)
response = await chain.ainvoke({"message": request.message})
return ChatResponse(response=response)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
"""流式聊天端点"""
async def generate():
chain = create_chain(request.temperature)
async for chunk in chain.astream({"message": request.message}):
yield f"data: {chunk}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
@app.get("/health")
async def health_check():
"""健康检查"""
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Docker 部署
Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制代码
COPY . .
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
docker-compose.yml
version: '3.8'
services:
app:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- LANGCHAIN_TRACING_V2=true
- LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY}
restart: unless-stopped
redis:
image: redis:alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
volumes:
redis_data:
构建和运行
# 构建镜像
docker build -t langchain-app .
# 运行容器
docker-compose up -d
性能优化
1. 缓存策略
from langchain_core.globals import set_llm_cache
from langchain_community.cache import RedisCache
import redis
# Redis 缓存
redis_client = redis.Redis(host="localhost", port=6379)
set_llm_cache(RedisCache(redis_client))
# 语义缓存(相似问题返回缓存结果)
from langchain_community.cache import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings
set_llm_cache(RedisSemanticCache(
redis_url="redis://localhost:6379",
embedding=OpenAIEmbeddings()
))
2. 批量处理
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-3.5-turbo")
# 批量调用(并行处理)
questions = ["问题1", "问题2", "问题3"]
responses = llm.batch(questions, config={"max_concurrency": 5})
3. 异步处理
import asyncio
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-3.5-turbo")
async def process_messages(messages: list):
tasks = [llm.ainvoke(msg) for msg in messages]
return await asyncio.gather(*tasks)
# 使用
messages = ["问题1", "问题2", "问题3"]
results = asyncio.run(process_messages(messages))
4. 连接池
from langchain_openai import ChatOpenAI
import httpx
# 使用自定义 HTTP 客户端
http_client = httpx.Client(
limits=httpx.Limits(
max_connections=100,
max_keepalive_connections=20
),
timeout=httpx.Timeout(30.0)
)
llm = ChatOpenAI(
model="gpt-3.5-turbo",
http_client=http_client
)
成本优化
1. 模型选择策略
class ModelRouter:
"""智能模型路由"""
def __init__(self):
self.cheap_model = ChatOpenAI(model="gpt-3.5-turbo")
self.powerful_model = ChatOpenAI(model="gpt-4")
def route(self, query: str):
# 简单查询用便宜模型
if len(query) < 100 and "简单" in query:
return self.cheap_model
# 复杂查询用强大模型
return self.powerful_model
2. Token 限制
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
model="gpt-3.5-turbo",
max_tokens=500, # 限制输出长度
)
3. 提示词优化
# 精简提示词减少 Token
concise_prompt = ChatPromptTemplate.from_template(
"简短回答:{question}"
)
# 而不是
verbose_prompt = ChatPromptTemplate.from_template(
"""你是一个非常专业的助手,请根据你的知识,
详细、全面、准确地回答以下问题:{question}
请确保回答完整且有条理。"""
)
4. 监控成本
from langchain_core.callbacks import get_openai_callback
with get_openai_callback() as cb:
result = chain.invoke({"input": "问题"})
print(f"输入 Token: {cb.prompt_tokens}")
print(f"输出 Token: {cb.completion_tokens}")
print(f"总成本: ${cb.total_cost:.4f}")
可靠性优化
1. 重试机制
from langchain_openai import ChatOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
llm = ChatOpenAI(
model="gpt-3.5-turbo",
max_retries=3,
request_timeout=30
)
# 链级别重试
chain_with_retry = chain.with_retry(
stop_after_attempt=3,
wait_exponential_jitter=True
)
2. 降级策略
class ResilientLLM:
"""具有降级能力的 LLM"""
def __init__(self):
self.primary = ChatOpenAI(model="gpt-4")
self.fallback = ChatOpenAI(model="gpt-3.5-turbo")
def invoke(self, message: str):
try:
return self.primary.invoke(message)
except Exception as e:
print(f"主模型失败,使用降级模型: {e}")
return self.fallback.invoke(message)
3. 超时处理
import asyncio
async def invoke_with_timeout(chain, input_data, timeout=30):
"""带超时的调用"""
try:
return await asyncio.wait_for(
chain.ainvoke(input_data),
timeout=timeout
)
except asyncio.TimeoutError:
return {"error": "请求超时"}
4. 限流
from asyncio import Semaphore
class RateLimiter:
def __init__(self, max_concurrent: int = 10):
self.semaphore = Semaphore(max_concurrent)
async def __aenter__(self):
await self.semaphore.acquire()
async def __aexit__(self, *args):
self.semaphore.release()
rate_limiter = RateLimiter(max_concurrent=10)
async def rate_limited_call(chain, input_data):
async with rate_limiter:
return await chain.ainvoke(input_data)
安全最佳实践
1. 输入验证
from pydantic import BaseModel, validator
class UserInput(BaseModel):
message: str
@validator("message")
def validate_message(cls, v):
if len(v) > 10000:
raise ValueError("消息过长")
# 过滤危险内容
dangerous_patterns = ["system:", "ignore previous"]
for pattern in dangerous_patterns:
if pattern.lower() in v.lower():
raise ValueError("包含不允许的内容")
return v
2. 输出过滤
def filter_output(response: str) -> str:
"""过滤敏感输出"""
# 移除可能的敏感信息
import re
# 移除邮箱
response = re.sub(r'\b[\w.-]+@[\w.-]+\.\w+\b', '[EMAIL]', response)
# 移除电话
response = re.sub(r'\b\d{11}\b', '[PHONE]', response)
return response
3. API Key 管理
import os
from functools import lru_cache
@lru_cache
def get_api_key(key_name: str) -> str:
"""安全获取 API Key"""
key = os.getenv(key_name)
if not key:
raise ValueError(f"Missing {key_name}")
return key
# 使用
llm = ChatOpenAI(api_key=get_api_key("OPENAI_API_KEY"))
完整部署示例
"""
生产级 LangChain 应用
"""
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, validator
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.globals import set_llm_cache
from langchain_community.cache import RedisCache
from contextlib import asynccontextmanager
import redis
import os
from dotenv import load_dotenv
load_dotenv()
# 配置
class Settings:
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
MAX_MESSAGE_LENGTH = 5000
DEFAULT_MODEL = "gpt-3.5-turbo"
settings = Settings()
# 生命周期管理
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时:设置缓存
redis_client = redis.from_url(settings.REDIS_URL)
set_llm_cache(RedisCache(redis_client))
print("缓存已初始化")
yield
# 关闭时:清理资源
print("应用关闭")
app = FastAPI(
title="Production LangChain API",
version="1.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# 请求模型
class ChatRequest(BaseModel):
message: str
model: str = "gpt-3.5-turbo"
temperature: float = 0.7
@validator("message")
def validate_message(cls, v):
if len(v) > settings.MAX_MESSAGE_LENGTH:
raise ValueError(f"消息长度不能超过 {settings.MAX_MESSAGE_LENGTH}")
return v.strip()
@validator("temperature")
def validate_temperature(cls, v):
if not 0 <= v <= 2:
raise ValueError("temperature 必须在 0-2 之间")
return v
# 依赖注入
def get_chain(request: ChatRequest):
llm = ChatOpenAI(
model=request.model,
temperature=request.temperature,
max_retries=3
)
prompt = ChatPromptTemplate.from_template(
"你是一个有帮助的助手。\n\n用户:{message}\n\n助手:"
)
return prompt | llm | StrOutputParser()
# 端点
@app.post("/chat")
async def chat(request: ChatRequest, chain=Depends(get_chain)):
try:
response = await chain.ainvoke({"message": request.message})
return {"response": response, "status": "success"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health():
return {"status": "healthy"}
@app.get("/models")
async def list_models():
return {
"models": ["gpt-3.5-turbo", "gpt-4", "gpt-4-turbo"]
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
小结
本章介绍了:
✅ 典型部署架构
✅ 使用 LangServe 快速部署
✅ FastAPI 自定义部署
✅ Docker 容器化
✅ 性能优化策略
✅ 成本控制方法
✅ 可靠性优化
✅ 安全最佳实践
✅ 完整的生产级示例
恭喜完成!
🎉 恭喜你完成了 LangChain 中文教程的全部内容!
现在你已经掌握了:
- 基础篇:环境搭建、模型调用、提示词模板
- 进阶篇:链式调用、记忆功能、工具使用、RAG
- 高级篇:Agent 智能体、LangGraph、LangSmith、生产部署
继续学习
- 阅读 LangChain 官方文档
- 参与 LangChain 社区
- 关注最新的 AI 技术动态
- 动手实践,构建你自己的 AI 应用!
祝你在 AI 应用开发的道路上越走越远!🚀