服务层、中间件、上下文管理及更多内容
现代智能体AI系统,无论运行于开发、预发布还是生产环境,均采用分层架构 构建,而非单一服务。每一层各司其职,分别负责智能体编排、内存管理、安全控制、可扩展性与故障处理等核心功能。企业级智能体系统需整合这些层级,确保智能体在真实业务负载下保持高可靠性、可观测性与安全性。
企业级智能体系统架构(法里德·汗 制作)
智能体系统需持续监控两大核心维度:
1. 智能体行为 :包括推理准确性、工具调用正确性、内存一致性、安全边界合规性,以及多轮对话和多智能体协作中的上下文处理能力。
2. 系统可靠性与性能 :覆盖全架构的延迟、可用性、吞吐量、成本效益、故障恢复能力,以及依赖组件的健康状态。
这两大维度对于大规模多智能体系统的稳定运行至关重要。
本文将详细讲解构建一套可投入生产的智能体系统所需的全部核心架构层,帮助团队在自有基础设施或面向客户的场景中,自信地部署AI智能体。
完整代码库已开源至GitHub仓库: GitHub - FareedKhan-dev/production-grade-Agentic-system:企业级智能体系统的七层核心架构
一、构建模块化代码库
Python项目往往从小规模起步,随着功能迭代逐渐变得混乱。构建企业级系统时,开发者通常采用模块化架构 ,将应用的不同组件拆分为独立模块。这种方式便于维护、测试和更新单个模块,且不会影响整个系统。
以下是为AI系统设计的结构化目录结构:
├── app/ # 主应用源码目录 │ ├── api/ # API路由处理器 │ │ └── v1/ # 版本化API(v1接口) │ ├── core/ # 应用核心配置与逻辑 │ │ ├── langgraph/ # AI智能体/LangGraph逻辑 │ │ │ └── tools/ # 智能体工具(搜索、操作等) │ │ └── prompts/ # AI系统与智能体提示词 │ ├── models/ # 数据库模型(基于SQLModel) │ ├── schemas/ # 数据校验模式(基于Pydantic) │ ├── services/ # 业务逻辑层 │ └── utils/ # 通用工具函数 ├── evals/ # AI评估框架 │ └── metrics/ # 评估指标与标准 │ └── prompts/ # 大语言模型评估提示词定义 ├── grafana/ # Grafana可观测性配置 │ └── dashboards/ # Grafana仪表盘 │ └── json/ # 仪表盘JSON配置文件 ├── prometheus/ # Prometheus监控配置 ├── scripts/ # 运维与本地自动化脚本 │ └── rules/ # Cursor项目规则 └── .github/ # GitHub配置 └── workflows/ # GitHub Actions CI/CD工作流
这个目录结构初看可能复杂,但遵循了智能体系统乃至通用软件工程的最佳实践,每个文件夹都有明确分工:
• app/:包含主应用代码,涵盖API路由、核心逻辑、数据库模型与工具函数
• evals/:存放用于评估AI性能的指标与提示词框架
• grafana/和prometheus/:存储监控与可观测性工具的配置文件
许多组件还设有子文件夹(如langgraph/和tools/),进一步实现关注点分离。下文将逐步拆解每个模块的构建过程,并阐明其核心价值。
1.1 依赖管理
构建企业级AI系统的第一步,是制定依赖管理策略 。小型项目通常使用简单的requirements.txt文件,但复杂项目需采用pyproject.toml,它支持依赖解析、版本控制与构建系统规范等高级功能。
以下是为本项目编写的pyproject.toml文件,包含依赖项与基础配置:
# ========================== # 项目元数据 # ========================== # 遵循PEP 621标准定义的Python项目基础信息 [project] name = "My Agentic AI System" # 项目包名 version = "0.1.0" # 当前版本(推荐语义化版本) description = "Deploying it as a SASS" # 简短描述 readme = "README.md" # 长描述文件 requires-python = ">=3.13" # 最低Python版本要求
第一部分定义项目元数据,包括名称、版本、描述和Python版本要求,这些信息在发布到PyPI等包索引时会被使用。
接下来是核心依赖项部分,列出项目运行所需的所有库。 由于我们构建的是面向1万以下活跃用户 的智能体AI系统,需要涵盖Web框架、数据库、认证、AI编排、可观测性等多类库:
# ========================== # 核心运行时依赖 # ========================== # 项目安装时会自动安装这些包 # 构成应用的核心功能 dependencies = [ # --- Web框架与服务器 --- "fastapi>=0.121.0", # 高性能异步Web框架 "uvicorn>=0.34.0", # 运行FastAPI的ASGI服务器 "asgiref>=3.8.1", # ASGI工具库(同步/异步桥接) "uvloop>=0.22.1", # 更快的asyncio事件循环 # --- LangChain / LangGraph生态 --- "langchain>=1.0.5", # 高级LLM编排框架 "langchain-core>=1.0.4", # LangChain核心抽象层 "langchain-openai>=1.0.2", # LangChain的OpenAI集成 "langchain-community>=0.4.1", # 社区维护的LangChain工具 "langgraph>=1.0.2", # 基于图的智能体/状态工作流 "langgraph-checkpoint-postgres>=3.0.1",# 基于PostgreSQL的LangGraph检查点 # --- 可观测性与追踪 --- "langfuse==3.9.1", # LLM调用追踪、监控与评估工具 "structlog>=25.2.0", # 结构化日志库 # --- 认证与安全 --- "passlib[bcrypt]>=1.7.4", # 密码哈希工具 "bcrypt>=4.3.0", # 底层bcrypt哈希算法 "python-jose[cryptography]>=3.4.0", # JWT处理与加密 "email-validator>=2.2.0", # 认证流程中的邮箱校验 # --- 数据库与持久化 --- "psycopg2-binary>=2.9.10", # PostgreSQL驱动 "sqlmodel>=0.0.24", # SQLAlchemy + Pydantic ORM框架 "supabase>=2.15.0", # Supabase客户端SDK # --- 配置与环境 --- "pydantic[email]>=2.11.1", # 带邮箱支持的数据校验库 "pydantic-settings>=2.8.1", # 基于环境变量的配置管理 "python-dotenv>=1.1.0", # 从.env文件加载环境变量 # --- API工具 --- "python-multipart>=0.0.20", # 支持multipart/form-data(文件上传) "slowapi>=0.1.9", # FastAPI限流工具 # --- 指标与监控 --- "prometheus-client>=0.19.0", # Prometheus指标导出器 "starlette-prometheus>=0.7.0",# Starlette/FastAPI的Prometheus中间件 # --- 搜索与外部工具 --- "duckduckgo-search>=3.9.0", # DuckDuckGo搜索集成 "ddgs>=9.6.0", # DuckDuckGo搜索客户端(替代方案) # --- 可靠性与工具 --- "tenacity>=9.1.2", # 不稳定操作的重试逻辑 "tqdm>=4.67.1", # 进度条工具 "colorama>=0.4.6", # 终端彩色输出 # --- 内存/智能体工具 --- "mem0ai>=1.0.0", # AI内存管理库 ]
值得注意的是,我们为每个依赖项指定了具体版本(使用>=运算符)。这在生产环境中至关重要,能避免因不同库依赖同一包的不兼容版本而引发的依赖地狱 问题。
随后是开发依赖项部分。多人协作的开发阶段,需要代码格式化、代码检查、类型校验等工具保障代码质量与一致性:
# ========================== # 可选依赖 # ========================== # 可通过以下命令安装: # pip install .[dev] [project.optional-dependencies] dev = [ "black", # 代码格式化工具 "isort", # 导入语句排序工具 "flake8", # 代码风格检查工具 "ruff", # 快速Python代码检查工具(flake8的现代化替代) "djlint==1.36.4", # HTML与模板的检查/格式化工具 ]
我们还可以按功能对依赖项分组(遵循PEP 735标准),例如将所有测试相关库归到test组:
# ========================== # 依赖分组(PEP 735风格) # ========================== # 便于现代工具按逻辑管理依赖 [dependency-groups] test = [ "httpx>=0.28.1", # 用于API测试的异步HTTP客户端 "pytest>=8.3.5", # 测试框架 ] # ========================== # Pytest配置 # ========================== [tool.pytest.ini_options] markers = [ "slow: marks tests as slow (deselect with '-m \\"not slow\\"')", ] python_files = [ "test_*.py", "*_test.py", "tests.py", ] # ========================== # Black(代码格式化工具)配置 # ========================== [tool.black] line-length = 119 # 最大行长度 exclude = "venv|migrations" # 忽略的文件/目录 # ========================== # Flake8(代码检查工具)配置 # ========================== [tool.flake8] docstring-convention = "all" # 强制文档字符串规范 ignore = [ "D107", "D212", "E501", "W503", "W605", "D203", "D100", ] exclude = "venv|migrations" max-line-length = 119 # ========================== # Radon(圈复杂度分析)配置 # ========================== # 允许的最大圈复杂度 radon-max-cc = 10 # ========================== # isort(导入排序工具)配置 # ========================== [tool.isort] profile = "black" # 与Black兼容 multi_line_output = "VERTICAL_HANGING_INDENT" force_grid_wrap = 2 line_length = 119 skip = ["migrations", "venv"] # ========================== # Pylint配置 # ========================== [tool.pylint."messages control"] disable = [ "line-too-long", "trailing-whitespace", "missing-function-docstring", "consider-using-f-string", "import-error", "too-few-public-methods", "redefined-outer-name", ] [tool.pylint.master] ignore = "migrations" # ========================== # Ruff(快速代码检查工具)配置 # ========================== [tool.ruff] line-length = 119 exclude = ["migrations", "*.ipynb", "venv"] [tool.ruff.lint] # 按文件忽略规则 [tool.ruff.lint.per-file-ignores] "__init__.py" = ["E402"] # 允许__init__.py中的导入语句不在顶部
我们来逐一解读剩余配置项:
• 依赖分组 :将功能相关的依赖归类,例如测试依赖统一放在test组
• Pytest配置 :自定义测试用例的发现规则与标记
• isort :自动整理Python文件中的导入语句
这些开发工具属于可选依赖,但在生产级项目中强烈推荐使用——随着代码库扩张,没有它们的约束,代码会逐渐变得难以维护。
1.2 环境配置
接下来需要配置项目的通用参数,在开发领域通常称为配置管理 。
小型项目中,开发者常使用简单的.env文件存储环境变量,但更规范的做法是将模板文件命名为.env.example并提交到版本控制,同时为不同环境创建独立配置文件:
# 不同环境的配置文件 .env.[development|staging|production] # 例如:.env.development
为什么不直接使用.env? 这种方式能为开发、预发布、生产等不同环境维护独立的隔离配置(例如开发环境启用调试模式,生产环境禁用),无需频繁修改单个文件切换环境。
我们创建.env.example文件,填入所有必要的环境变量并使用占位符赋值:
# ================================================== # 应用配置 # ================================================== APP_ENV=development # 应用环境(development | staging | production) PROJECT_NAME="Project Name" # 项目可读名称 VERSION=1.0.0 # 应用版本 DEBUG=true # 启用调试模式(生产环境需禁用)
第一部分定义应用基础配置,包括环境类型、项目名、版本和调试模式。
随后是API配置,定义API版本的基础路径:
# ================================================== # API配置 # ================================================== API_V1_STR=/api/v1 # API版本化基础路径 # ================================================== # 跨域资源共享(CORS)配置 # ================================================== # 允许访问的前端源列表(逗号分隔) ALLOWED_ORIGINS="<http://localhost:3000>,<http://localhost:8000>" # ================================================== # Langfuse可观测性配置 # ================================================== # 用于LLM调用追踪、监控与分析 LANGFUSE_PUBLIC_KEY="your-langfuse-public-key" # Langfuse公钥 LANGFUSE_SECRET_KEY="your-langfuse-secret-key" # Langfuse密钥 LANGFUSE_HOST=https://cloud.langfuse.com # Langfuse云端地址
API_V1_STR让我们可以轻松实现API版本控制,这是OpenAI、Cohere等主流AI服务商的标准做法。
CORS配置对Web应用至关重要,它控制哪些前端域名可以访问后端API(进而集成AI智能体)。
我们使用行业标准工具Langfuse实现LLM交互的可观测性与监控,因此需要配置对应的API密钥与服务地址。
# ================================================== # 大语言模型(LLM)配置 # ================================================== OPENAI_API_KEY="your-llm-api-key" # LLM服务商API密钥(如OpenAI) DEFAULT_LLM_MODEL=gpt-4o-mini # 默认对话模型 DEFAULT_LLM_TEMPERATURE=0.2 # 随机性控制(0=确定性,1=创造性) # ================================================== # JWT认证配置 # ================================================== JWT_SECRET_KEY="your-jwt-secret-key" # 签名JWT令牌的密钥 JWT_ALGORITHM=HS256 # JWT签名算法 JWT_ACCESS_TOKEN_EXPIRE_DAYS=30 # 令牌过期时间(天) # ================================================== # 数据库(PostgreSQL)配置 # ================================================== POSTGRES_HOST=db # 数据库地址(Docker服务名或主机名) POSTGRES_DB=mydb # 数据库名 POSTGRES_USER=myuser # 数据库用户名 POSTGRES_PORT=5432 # 数据库端口 POSTGRES_PASSWORD=mypassword # 数据库密码 # 连接池配置 POSTGRES_POOL_SIZE=5 # 持久数据库连接的基础数量 POSTGRES_MAX_OVERFLOW=10 # 连接池允许的最大额外连接数 # ================================================== # 限流配置(SlowAPI) # ================================================== # 所有路由的默认限流规则 RATE_LIMIT_DEFAULT="1000 per day,200 per hour" # 接口专属限流规则 RATE_LIMIT_CHAT="100 per minute" # 对话接口 RATE_LIMIT_CHAT_STREAM="100 per minute" # 流式对话接口 RATE_LIMIT_MESSAGES="200 per minute" # 消息创建接口 RATE_LIMIT_LOGIN="100 per minute" # 登录/认证接口 # ================================================== # 日志配置 # ================================================== LOG_LEVEL=DEBUG # 日志级别(DEBUG, INFO, WARNING, ERROR) LOG_FORMAT=console # 日志输出格式(console | json)
本项目选用OpenAI作为主要LLM服务商,因此需配置API密钥、默认模型与温度参数。
JWT配置在认证与会话管理中扮演关键角色,需要设置令牌签名密钥、编解码算法与过期时间。
数据库采用工业级关系型数据库PostgreSQL。当智能体系统规模扩张时,合理的连接池配置能避免因连接数过多压垮数据库,此处设置基础池大小为5,最大额外连接数为10。
最后是限流与日志配置,既能防止API被滥用,也能为调试和监控提供完整的日志记录。
完成依赖管理与配置管理策略后,就可以开始开发AI系统的核心逻辑了。第一步是在应用代码中加载这些配置,我们创建app/core/config.py文件,基于Pydantic实现配置加载:
首先导入所需模块:
# 导入配置管理所需的模块 import json # 处理JSON数据 import os # 与操作系统交互 from enum import Enum # 创建枚举类型 from pathlib import Path # 处理文件路径 from typing import ( # 类型注解 Any, # 任意类型 Dict, # 字典类型 List, # 列表类型 Optional, # 可选类型 Union, # 联合类型 ) from dotenv import load_dotenv # 从.env文件加载环境变量
这些是文件处理、类型注解和环境变量加载的基础依赖。
接下来,用枚举类型定义环境类型:
# 定义环境类型 class Environment(str, Enum): """应用环境类型 定义应用可运行的环境:开发、预发布、生产、测试 """ DEVELOPMENT = "development" STAGING = "staging" PRODUCTION = "production" TEST = "test"
任何项目通常都包含开发、预发布、生产、测试等多个环境,各自承担不同功能。
定义完环境类型后,编写函数根据环境变量判断当前环境:
# 判断当前环境 def get_environment() -> Environment: """获取当前环境 返回值: Environment: 当前环境(开发/预发布/生产/测试) """ match os.getenv("APP_ENV", "development").lower(): case "production" | "prod": return Environment.PRODUCTION case "staging" | "stage": return Environment.STAGING case "test": return Environment.TEST case _: return Environment.DEVELOPMENT
该函数通过APP_ENV环境变量判断当前环境,未设置时默认使用开发环境。
最后,根据当前环境加载对应的.env文件:
# 根据当前环境加载对应的.env文件 def load_env_file(): """加载特定环境的.env文件""" env = get_environment() print(f"Loading environment: {env}") base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # 按优先级定义环境文件列表 env_files = [ os.path.join(base_dir, f".env.{env.value}.local"), os.path.join(base_dir, f".env.{env.value}"), os.path.join(base_dir, ".env.local"), os.path.join(base_dir, ".env"), ] # 加载第一个存在的环境文件 for env_file in env_files: if os.path.isfile(env_file): load_dotenv(dotenv_path=env_file) print(f"Loaded environment from {env_file}") return env_file # 未找到环境文件时使用默认配置 return None
应用启动时需立即调用此函数加载环境变量。
实际开发中,部分环境变量的值是列表或字典类型,因此需要编写工具函数解析这些值:
# 从环境变量解析列表 def parse_list_from_env(env_key, default=None): """从环境变量解析逗号分隔的列表""" value = os.getenv(env_key) if not value: return default or [] # 移除可能存在的引号 value = value.strip("\\"'") # 处理单个值的情况 if "," not in value: return [value] # 分割逗号分隔的值 return [item.strip() for item in value.split(",") if item.strip()] # 从带前缀的环境变量解析字典(值为列表) def parse_dict_of_lists_from_env(prefix, default_dict=None): """从带公共前缀的环境变量解析字典(值为列表)""" result = default_dict or {} # 遍历所有带指定前缀的环境变量 for key, value in os.environ.items(): if key.startswith(prefix): endpoint = key[len(prefix) :].lower() # 提取接口名称 # 解析该接口对应的配置值 if value: value = value.strip("\\"'") if "," in value: result[endpoint] = [item.strip() for item in value.split(",") if item.strip()] else: result[endpoint] = [value] return result
这两个函数分别用于解析逗号分隔的列表和带前缀的字典类型环境变量,方便在代码中直接使用。
现在可以定义主配置类Settings,集中管理应用的所有配置项。该类从环境变量读取配置,并在未设置时应用默认值:
class Settings: """ 应用集中式配置 从环境变量加载配置,并应用默认值 """ def __init__(self): # 设置当前环境 self.ENVIRONMENT = get_environment() # ========================== # 应用基础配置 # ========================== self.PROJECT_NAME = os.getenv("PROJECT_NAME", "FastAPI LangGraph Agent") self.VERSION = os.getenv("VERSION", "1.0.0") self.API_V1_STR = os.getenv("API_V1_STR", "/api/v1") self.DEBUG = os.getenv("DEBUG", "false").lower() in ("true", "1", "t", "yes") # 使用工具函数解析CORS允许的源列表 self.ALLOWED_ORIGINS = parse_list_from_env("ALLOWED_ORIGINS", ["*"]) # ========================== # LLM & LangGraph配置 # ========================== self.OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") self.DEFAULT_LLM_MODEL = os.getenv("DEFAULT_LLM_MODEL", "gpt-4o-mini") self.DEFAULT_LLM_TEMPERATURE = float(os.getenv("DEFAULT_LLM_TEMPERATURE", "0.2")) # 智能体专属配置 self.MAX_TOKENS = int(os.getenv("MAX_TOKENS", "2000")) self.MAX_LLM_CALL_RETRIES = int(os.getenv("MAX_LLM_CALL_RETRIES", "3")) # ========================== # 可观测性配置(Langfuse) # ========================== self.LANGFUSE_PUBLIC_KEY = os.getenv("LANGFUSE_PUBLIC_KEY", "") self.LANGFUSE_SECRET_KEY = os.getenv("LANGFUSE_SECRET_KEY", "") self.LANGFUSE_HOST = os.getenv("LANGFUSE_HOST", "<https://cloud.langfuse.com>") # ========================== # 数据库配置(PostgreSQL) # ========================== self.POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost") self.POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", "5432")) self.POSTGRES_DB = os.getenv("POSTGRES_DB", "postgres") self.POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres") self.POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "postgres") # 连接池配置对高并发智能体至关重要 self.POSTGRES_POOL_SIZE = int(os.getenv("POSTGRES_POOL_SIZE", "20")) self.POSTGRES_MAX_OVERFLOW = int(os.getenv("POSTGRES_MAX_OVERFLOW", "10")) # LangGraph持久化所需的表名 self.CHECKPOINT_TABLES = ["checkpoint_blobs", "checkpoint_writes", "checkpoints"] # ========================== # 安全配置(JWT) # ========================== self.JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "unsafe-secret-for-dev") self.JWT_ALGORITHM = os.getenv("JWT_ALGORITHM", "HS256") self.JWT_ACCESS_TOKEN_EXPIRE_DAYS = int(os.getenv("JWT_ACCESS_TOKEN_EXPIRE_DAYS", "30")) # ========================== # 限流配置 # ========================== self.RATE_LIMIT_DEFAULT = parse_list_from_env("RATE_LIMIT_DEFAULT", ["200 per day", "50 per hour"]) # 定义接口专属限流规则 self.RATE_LIMIT_ENDPOINTS = { "chat": parse_list_from_env("RATE_LIMIT_CHAT", ["30 per minute"]), "chat_stream": parse_list_from_env("RATE_LIMIT_CHAT_STREAM", ["20 per minute"]), "auth": parse_list_from_env("RATE_LIMIT_LOGIN", ["20 per minute"]), "root": parse_list_from_env("RATE_LIMIT_ROOT", ["10 per minute"]), "health": parse_list_from_env("RATE_LIMIT_HEALTH", ["20 per minute"]), } # 根据环境应用配置覆盖规则 self.apply_environment_settings() def apply_environment_settings(self): """ 根据当前环境应用严格的配置覆盖规则 确保即使.env配置错误,生产环境的安全性也不受影响 """ if self.ENVIRONMENT == Environment.DEVELOPMENT: self.DEBUG = True self.LOG_LEVEL = "DEBUG" self.LOG_FORMAT = "console" # 开发环境放宽限流规则 self.RATE_LIMIT_DEFAULT = ["1000 per day", "200 per hour"] elif self.ENVIRONMENT == Environment.PRODUCTION: self.DEBUG = False self.LOG_LEVEL = "WARNING" self.LOG_FORMAT = "json" # 生产环境收紧限流规则 self.RATE_LIMIT_DEFAULT = ["200 per day", "50 per hour"]
Settings类从环境变量读取各类配置,同时提供合理的默认值。apply_environment_settings方法会根据当前环境调整配置——例如开发环境启用调试模式并放宽限流,生产环境则禁用调试并收紧限流,确保安全性。
其中checkpoint_tables定义了LangGraph在PostgreSQL中实现持久化所需的表名。
最后,初始化全局配置对象,供整个应用导入使用:
# 初始化全局配置对象 settings = Settings()
至此,我们完成了企业级AI系统的依赖管理与配置管理策略。
1.3 容器化策略
接下来需要创建docker-compose.yml文件,定义应用运行所需的所有服务。
选择容器化的原因在于:生产级系统中,数据库、监控工具、API服务等组件并非孤立运行,它们需要相互通信,而Docker Compose是编排多容器Docker应用的标准工具。
首先定义数据库服务。由于我们构建的AI智能体需要长期记忆 能力,标准PostgreSQL数据库无法满足需求,需要支持向量相似度搜索功能:
version: '3.8' # ================================================== # Docker Compose配置 # ================================================== # 该文件定义了本地或单节点环境下运行应用所需的所有服务 services: # ================================================== # PostgreSQL + pgvector数据库 # ================================================== db: image: pgvector/pgvector:pg16 # 带pgvector扩展的PostgreSQL 16 environment: - POSTGRES_DB= # 数据库名 - POSTGRES_USER= # 数据库用户名 - POSTGRES_PASSWORD= # 数据库密码 ports: - "5432:5432" # 暴露端口供主机访问(仅开发环境) volumes: - postgres-data:/var/lib/postgresql/data # 数据库数据持久化 healthcheck: test: ["CMD-SHELL", "pg_isready -U -d "] interval: 10s timeout: 5s retries: 5 restart: always networks: - monitoring
我们明确使用pgvector/pgvector:pg16镜像而非标准PostgreSQL镜像,该镜像内置向量扩展,是mem0ai和LangGraph检查点功能的必需依赖。
配置中的健康检查至关重要:部署时,API服务需要等待数据库完全就绪后再启动,避免连接失败。
接下来定义主应用服务,即FastAPI代码的运行容器:
# ================================================== # FastAPI应用服务 # ================================================== app: build: context: . # 从项目根目录构建镜像 args: APP_ENV: # 构建时环境变量 ports: - "8000:8000" # 暴露FastAPI服务端口 volumes: - ./app:/app/app # 挂载代码目录实现热重载 - ./logs:/app/logs # 持久化应用日志 env_file: - .env. # 加载特定环境的变量 environment: - APP_ENV= - JWT_SECRET_KEY= depends_on: db: condition: service_healthy # 等待数据库健康检查通过 healthcheck: test: ["CMD", "curl", "-f", "<http://localhost:8000/health>"] interval: 30s timeout: 10s retries: 3 start_period: 10s restart: on-failure networks: - monitoring
注意volumes配置:将本地./app目录挂载到容器内的/app目录,实现热重载 ——修改代码后无需重启容器,服务会自动更新,极大提升开发效率。
生产系统离不开可观测性工具,开发团队需要实时掌握API响应速度、错误率等指标。我们采用Prometheus + Grafana组合实现监控:
# ================================================== # Prometheus(指标收集) # ================================================== prometheus: image: prom/prometheus:latest ports: - "9090:9090" # Prometheus UI端口 volumes: - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml command: - '--config.file=/etc/prometheus/prometheus.yml' networks: - monitoring restart: always
Prometheus是指标收集器 ,每隔几秒从FastAPI应用拉取指标(如请求延迟、错误率)。通过挂载配置文件,我们可以指定指标采集的目标。
然后添加Grafana,作为指标可视化工具 :
# ================================================== # Grafana(指标可视化) # ================================================== grafana: image: grafana/grafana:latest ports: - "3000:3000" # Grafana UI端口 volumes: - grafana-storage:/var/lib/grafana - ./grafana/dashboards:/etc/grafana/provisioning/dashboards - ./grafana/dashboards/dashboards.yml:/etc/grafana/provisioning/dashboards/dashboards.yml environment: - GF_SECURITY_ADMIN_PASSWORD=admin - GF_USERS_ALLOW_SIGN_UP=false networks: - monitoring restart: always
Grafana将Prometheus采集的原始数据转化为直观的图表。通过挂载./grafana/dashboards目录,我们实现仪表盘即代码 ——启动容器时,监控面板会自动加载,无需手动配置。
最后,需要监控容器自身的健康状态(CPU使用率、内存泄漏等)。我们使用Google开发的轻量级监控工具cAdvisor,它能提供容器资源使用的实时数据:
# ================================================== # cAdvisor(容器指标监控) # ================================================== cadvisor: image: gcr.io/cadvisor/cadvisor:latest ports: - "8080:8080" # cAdvisor UI端口 volumes: - /:/rootfs:ro - /var/run:/var/run:rw - /sys:/sys:ro - /var/lib/docker/:/var/lib/docker:ro networks: - monitoring restart: always # ================================================== # 网络与数据卷 # ================================================== networks: monitoring: driver: bridge # 所有服务共享的网络 volumes: grafana-storage: # 持久化Grafana仪表盘与数据 postgres-data: # 持久化PostgreSQL数据
最后,我们定义共享网络monitoring,确保所有服务能安全通信;同时通过命名数据卷,保证数据库和仪表盘配置在容器重启后不会丢失。
二、构建数据持久层
我们已经搭建好数据库服务,但此时数据库还是空的。AI系统高度依赖结构化数据 ——我们不能简单地将JSON数据存入NoSQL数据库,而是需要在用户、对话会话、AI状态之间建立严格的关系。
(点击查看完整尺寸图片)
数据持久层(法里德·汗 制作)
为实现这一目标,我们选用SQLModel库。它整合了SQLAlchemy(数据库交互)和Pydantic(数据校验)的功能,是Python生态中现代化的ORM框架。
2.1 结构化建模
软件工程中有一条核心原则:不要重复自己(DRY) 。数据库中的几乎所有表都需要时间戳字段记录创建时间,我们无需在每个模型中重复编写这一逻辑,而是创建一个基础模型类。
(点击查看完整尺寸图片)
结构化建模(法里德·汗 制作)
创建app/models/base.py文件,定义抽象基础模型:
from datetime import datetime, UTC from typing import List, Optional from sqlmodel import Field, SQLModel, Relationship # ================================================== # 数据库基础模型 # ================================================== class BaseModel(SQLModel): """ 为所有表添加通用字段的抽象基础模型 使用抽象类确保数据库模式的一致性 """ # 生产环境中务必使用UTC时间,避免时区问题 created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
这个类非常简洁,为所有继承它的模型添加了created_at时间戳字段。
有了基础模型,就可以构建核心业务实体。任何面向用户的系统,最基础的需求就是认证 ,因此我们需要一个安全的用户模型来管理凭证。
2.2 实体定义
借鉴主流AI服务商的用户数据管理方式,我们创建包含邮箱和哈希密码字段的用户模型:
(点击查看完整尺寸图片)
实体定义(法里德·汗 制作)
创建app/models/user.py文件,定义用户模型:
from typing import TYPE_CHECKING, List import bcrypt from sqlmodel import Field, Relationship from app.models.base import BaseModel # 避免类型注解导致的循环导入 if TYPE_CHECKING: from app.models.session import Session # ================================================== # 用户模型 # ================================================== class User(BaseModel, table=True): """ 系统中的注册用户 """ # 主键 id: int = Field(default=None, primary_key=True) # 邮箱必须唯一,且建立索引以加速登录查询 email: str = Field(unique=True, index=True) # 绝对不要存储明文密码,只存储Bcrypt哈希值 hashed_password: str # 关系:一个用户拥有多个对话会话 sessions: List["Session"] = Relationship(back_populates="user") def verify_password(self, password: str) -> bool: """ 验证明文密码与存储的哈希值是否匹配 """ return bcrypt.checkpw(password.encode("utf-8"), self.hashed_password.encode("utf-8")) @staticmethod def hash_password(password: str) -> str: """ 为新密码生成安全的Bcrypt哈希值(带盐) """ salt = bcrypt.gensalt() return bcrypt.hashpw(password.encode("utf-8"), salt).decode("utf-8")
我们将密码哈希逻辑直接嵌入模型中,这是封装 思想的体现——用户数据的处理逻辑与数据本身放在一起,避免在应用其他地方出现安全漏洞。
接下来需要组织AI交互数据。用户不会只有一个无限长的对话,而是有多个独立的会话(Session) 。我们创建app/models/session.py文件定义会话模型:
from typing import TYPE_CHECKING, List from sqlmodel import Field, Relationship from app.models.base import BaseModel if TYPE_CHECKING: from app.models.user import User # ================================================== # 会话模型 # ================================================== class Session(BaseModel, table=True): """ 代表一次具体的对话/线程 将AI的内存与特定上下文关联 """ # 会话ID使用字符串(UUID),防止被轻易猜测 id: str = Field(primary_key=True) # 外键:关联到具体用户 user_id: int = Field(foreign_key="user.id") # 可选的对话友好名称(例如"食谱推荐") name: str = Field(default="") # 反向关联到用户 user: "User" = Relationship(back_populates="sessions")
该模型通过外键与用户模型关联,每个会话对应一个用户,代表一次独立的对话上下文。
2.3 数据传输对象(DTO)
最后,我们需要为LangGraph持久化功能定义模型。LangGraph是有状态的——如果服务器重启,我们不希望AI忘记当前的对话进度。
(点击查看完整尺寸图片)
数据传输对象(DTO)(法里德·汗 制作)
我们需要一个Thread模型作为检查点的锚点。创建app/models/thread.py文件:
from datetime import UTC, datetime from sqlmodel import Field, SQLModel # ================================================== # 会话线程模型(LangGraph状态) # ================================================== class Thread(SQLModel, table=True): """ 作为LangGraph检查点的轻量级锚点 实际的状态数据由AsyncPostgresSaver存储, 本模型仅用于验证线程是否存在 """ id: str = Field(primary_key=True) created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
为了简化应用中的导入操作,我们将所有模型汇总到app/models/database.py文件:
""" 数据库模型导出文件 支持简洁的导入方式:`from app.models.database import User, Thread` """ from app.models.thread import Thread # 显式定义导出内容 __all__ = ["Thread"]
完成数据库模型定义后,还需要处理数据传输 问题。
初级API开发的常见误区是将数据库模型直接暴露给用户——这既危险(可能泄露hashed_password等内部字段)又僵化。生产系统中,我们使用模式(Schema) ,也称为数据传输对象(DTO),定义API与外部的交互契约。
首先定义认证相关的模式。这部分需要严格的校验:密码必须满足复杂度要求,邮箱格式必须合法。创建app/schemas/auth.py文件:
import re from datetime import datetime from pydantic import BaseModel, EmailStr, Field, SecretStr, field_validator # ================================================== # 认证模式 # ================================================== class UserCreate(BaseModel): """ 用户注册输入的模式 """ email: EmailStr = Field(..., description="用户邮箱地址") # SecretStr避免密码在异常回溯中被记录 password: SecretStr = Field(..., description="用户密码", min_length=8, max_length=64) @field_validator("password") @classmethod def validate_password(cls, v: SecretStr) -> SecretStr: """ 强制执行强密码策略 """ password = v.get_secret_value() if len(password) < 8: raise ValueError("密码长度至少为8位") if not re.search(r"[A-Z]", password): raise ValueError("密码必须包含至少一个大写字母") if not re.search(r"[0-9]", password): raise ValueError("密码必须包含至少一个数字") if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password): raise ValueError("密码必须包含至少一个特殊字符") return v class Token(BaseModel): """ JWT访问令牌响应模式 """ access_token: str = Field(..., description="JWT访问令牌") token_type: str = Field(default="bearer", description="令牌类型") expires_at: datetime = Field(..., description="令牌过期时间戳") class UserResponse(BaseModel): """ 公开的用户信息模式(可安全返回给前端) 注意:此处排除了密码字段 """ id: int email: str token: Token
接着定义对话接口的模式,处理用户输入消息和AI的流式响应。创建app/schemas/chat.py文件:
import re from typing import List, Literal from pydantic import BaseModel, Field, field_validator # ================================================== # 对话模式 # ================================================== class Message(BaseModel): """ 对话历史中的单条消息 """ role: Literal["user", "assistant", "system"] = Field(..., description="消息发送方") content: str = Field(..., description="消息内容", min_length=1, max_length=3000) @field_validator("content") @classmethod def validate_content(cls, v: str) -> str: """ 输入清洗:防止基础的XSS或注入攻击 """ if re.search(r"<script.*?>.*?</script>", v, re.IGNORECASE | re.DOTALL): raise ValueError("内容包含潜在危险的脚本标签") return v class ChatRequest(BaseModel): """ /chat接口的请求体 """ messages: List[Message] = Field(..., min_length=1) class ChatResponse(BaseModel): """ /chat接口的标准响应 """ messages: List[Message] class StreamResponse(BaseModel): """ 服务器发送事件(SSE)流式响应的格式 """ content: str = Field(default="") done: bool = Field(default=False)
最后,为LangGraph状态定义模式。LangGraph的工作原理是在节点(智能体、工具、内存)之间传递状态对象,因此需要明确状态的结构。创建app/schemas/graph.py文件:
from typing import Annotated from langgraph.graph.message import add_messages from pydantic import BaseModel, Field # ================================================== # LangGraph状态模式 # ================================================== class GraphState(BaseModel): """ 在图节点之间传递的核心状态对象 """ # 'add_messages'是一个归约函数,告诉LangGraph: # "新消息到来时,追加到列表中,而非覆盖原有内容" messages: Annotated[list, add_messages] = Field( default_factory=list, description="对话历史" ) # 从长期记忆(mem0ai)中检索的上下文 long_term_memory: str = Field( default="", description="从向量数据库提取的相关上下文" )
通过严格定义数据库模型(数据层)和API模式(接口层),我们为应用搭建了类型安全的基础。这能确保非法数据不会污染数据库,敏感数据不会泄露给用户。
三、安全与防护层
生产环境中,绝对不能信任用户输入,也不能允许无限制地访问系统资源。
你可能注意到,together.ai等API服务商都会限制每分钟的请求次数,这既是为了防止滥用,也是为了保护基础设施、控制成本。
(点击查看完整尺寸图片)
安全层(法里德·汗 制作)
如果部署没有防护措施的AI智能体,会出现两个严重问题:
1. 滥用 :机器人程序会疯狂请求API,导致OpenAI等服务商的账单激增
3.1 限流功能
在编写业务逻辑之前,我们需要先实现限流和输入清洗工具。
(点击查看完整尺寸图片)
限流测试(法里德·汗 制作)
首先实现限流功能,我们选用与FastAPI无缝集成的SlowAPI库。需要定义如何识别唯一用户(通常使用IP地址),并应用配置中定义的默认限流规则。创建app/core/limiter.py文件:
from slowapi import Limiter from slowapi.util import get_remote_address from app.core.config import settings # ================================================== # 限流配置 # ================================================== # 使用客户端IP地址作为限流标识 # 生产环境中,可能需要根据X-Forwarded-For头调整`key_func` limiter = Limiter( key_func=get_remote_address, default_limits=settings.RATE_LIMIT_DEFAULT )
后续只需为特定API路由添加@limiter.limit(...)装饰器,即可实现精细化限流控制。
3.2 输入清洗校验逻辑
其次是输入清洗。尽管现代前端框架已经提供了大量XSS(跨站脚本)防护,但后端API绝不能盲目信任传入的字符串。
(点击查看完整尺寸图片)
输入清洗校验(法里德·汗 制作)
我们编写工具函数实现字符串清洗,创建app/utils/sanitization.py文件:
import html import re from typing import Any, Dict, List # ================================================== # 输入清洗工具 # ================================================== def sanitize_string(value: str) -> str: """ 清洗字符串,防止XSS和其他注入攻击 """ if not isinstance(value, str): value = str(value) # 1. HTML转义:将<script>转换为<script> value = html.escape(value) # 2. 深度清理:彻底移除可能遗漏的脚本标签 # (这是一种纵深防御措施) value = re.sub(r"<script.*?>.*?</script>", "", value, flags=re.DOTALL) # 3. 移除空字节:防止底层二进制攻击 value = value.replace("\\0", "") return value def sanitize_email(email: str) -> str: """ 清洗并验证邮箱地址格式 """ # 基础清洗 email = sanitize_string(email) # 正则验证标准邮箱格式 if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$", email): raise ValueError("邮箱格式无效") return email.lower()
前面我们已经定义了令牌相关的模式,现在需要实现生成和验证令牌 的逻辑。我们使用JSON Web Token(JWT),它是无状态的——验证令牌时无需查询数据库,只需校验加密签名即可。创建app/utils/auth.py文件:
import re from datetime import UTC, datetime, timedelta from typing import Optional from jose import JWTError, jwt from app.core.config import settings from app.schemas.auth import Token from app.utils.sanitization import sanitize_string # ================================================== # JWT认证工具 # ================================================== def create_access_token(subject: str, expires_delta: Optional[timedelta] = None) -> Token: """ 生成新的JWT访问令牌 参数: subject: 唯一标识(用户ID或会话ID) expires_delta: 可选的自定义过期时间 """ if expires_delta: expire = datetime.now(UTC) + expires_delta else: expire = datetime.now(UTC) + timedelta(days=settings.JWT_ACCESS_TOKEN_EXPIRE_DAYS) # 载荷:编码到令牌中的数据 to_encode = { "sub": subject, # 主题(标准声明) "exp": expire, # 过期时间(标准声明) "iat": datetime.now(UTC), # 签发时间(标准声明) # JTI(JWT ID):当前令牌的唯一标识 # 便于后续实现令牌黑名单功能 "jti": sanitize_string(f"{subject}-{datetime.now(UTC).timestamp()}"), } encoded_jwt = jwt.encode(to_encode,