【AI】十九.FastAPI框架高级技能和综合项目实战 小滴课堂讲师 2025年09月21日 flask python, ai大模型, aigc 预计阅读 11 分钟 #### FastAPI路由管理APIRouter案例实战 ##### 需求背景 * 代码混乱问题, 未使用路由管理(所有接口堆在main.py中) * 所有接口混杂在一个文件中,随着功能增加,文件会变得臃肿难维护。 ```python # main.py from fastapi import FastAPI app = FastAPI() # 用户相关接口 @app.get("/users") def get_users(): ... # 商品相关接口 @app.get("/items") def get_items(): ... # 订单相关接口 @app.get("/orders") def get_orders(): ... ``` * 重复配置问题, 未使用路由管理(重复写前缀和标签) * 每个接口都要重复写`/api/v1`前缀和`tags`,容易出错且难以统一修改 ```python # 用户接口 @app.get("/api/v1/users", tags=["用户管理"]) def get_users_v1(): ... # 商品接口 @app.get("/api/v1/items", tags=["商品管理"]) def get_items_v1(): ... ``` * 权限控制问题, 未使用路由管理(每个接口单独添加认证) * 需要在每个管理员接口重复添加认证依赖。 ```python @app.get("/admin/stats", dependencies=[Depends(admin_auth)]) def get_stats(): ... @app.post("/admin/users", dependencies=[Depends(admin_auth)]) def create_user(): ... ``` ##### 使用路由管理(模块化拆分) * 按业务模块拆分,每个文件职责单一。 ```python # 文件结构 routers/ ├── users.py # 用户路由 ├── items.py # 商品路由 └── orders.py # 订单路由 # main.py from routers import users, items, orders app.include_router(users.router) app.include_router(items.router) app.include_router(orders.router) ``` * 使用路由管理 ```python # 创建管理员专属路由组 admin_router = APIRouter( prefix="/admin", dependencies=[Depends(admin_auth)], # 👈 统一认证 tags=["管理员"] ) @admin_router.get("/stats") def get_stats(): ... @admin_router.post("/users") def create_user(): ... ``` ##### APIRouter核心概念 * 基础使用模板 ```python from fastapi import APIRouter router = APIRouter( prefix="/users", # 路由前缀 tags=["用户管理"], # OpenAPI分组 responses={404: {"description": "资源未找到"}} # 默认响应 ) @router.get("/", summary="获取用户列表") async def list_users(): return [{"id": 1, "name": "张三"}] ``` * 核心配置参数 | 参数 | 作用 | 示例值 | | :----------: | :--------------: | :---------------------: | | prefix | 路由统一前缀 | "/api/v1" | | tags | OpenAPI文档分组 | ["认证相关"] | | dependencies | 路由组公共依赖项 | [Depends(verify_token)] | | responses | 统一响应定义 | {400: {"model": Error}} | ##### 案例实战 * 创建文件` app/users.py` ```python from fastapi import APIRouter, HTTPException from pydantic import BaseModel router = APIRouter( prefix="/users", # 路由前缀 tags=["用户管理"], # OpenAPI文档分组 dependencies=[] # 模块级依赖 ) @router.get("/", summary="获取用户列表") async def list_users(): return [{"id": 1, "name": "Alice"}] @router.post("/", summary="创建新用户") async def create_user(): return {"id": 2, "name": "Bob"} class UserCreate(BaseModel): username: str password: str @router.post("/register", status_code=201) async def register(user: UserCreate): """用户注册接口""" # 实际应保存到数据库 return {"message": "用户创建成功", "username": user.username} @router.get("/{user_id}") async def get_user(user_id: int): if user_id > 100: raise HTTPException(404, "用户不存在") return {"user_id": user_id, "name": "虚拟用户"} ``` * 创建文件 `app/products.py` ```python # app/api/v1/products.py from fastapi import APIRouter router = APIRouter( prefix="/products", tags=["商品管理"], dependencies=[] ) @router.get("/search", summary="商品搜索") async def search_products( q: str, min_price: float = None, max_price: float = None ): # 实现搜索逻辑 return {"message": "搜索成功"} @router.get("/{product_id}", summary="获取商品详情") async def get_product_details(product_id: int): return {"id": product_id} ``` * 创建入口文件 `main.py` ```python from fastapi import FastAPI from app import users, products import uvicorn # 创建FastAPI应用实例 app = FastAPI() # 注册用户模块的路由 app.include_router(users.router) # 注册产品模块的路由 app.include_router(products.router) # 访问路径: # GET /users/ → 用户列表 # POST /users/ → 创建用户 #... # 打印所有注册的路由信息,便于开发者查看和调试 for route in app.routes: print(f"{route.path} → {route.methods}") # 定义根路径的GET请求处理函数 """ 处理根路径的GET请求。 返回值: dict: 返回一个JSON对象,包含欢迎消息。 """ @app.get("/") async def root(): return {"message": "Hello 小滴课堂"} # 主程序入口,用于启动FastAPI应用 if __name__ == '__main__': # 使用uvicorn运行FastAPI应用,默认监听本地地址和端口 uvicorn.run(app,port=8001) ``` * 模块化结构参考 ```python #整体项目结构 project/ ├── main.py └── routers/ ├── __init__.py ├── users.py ├── items.py ├── admin/ │ ├── dashboard.py │ └── audit.py └── v2/ └── users.py ``` * 总结 | 实践要点 | 说明 | | :----------: | :------------------------------------: | | 模块化组织 | 按业务功能拆分路由模块 | | 统一前缀管理 | 使用`prefix`参数避免路径重复 | | 文档友好 | 合理使用`tags`和`summary`优化API文档 | | 依赖分层 | 模块级依赖处理认证,路由级处理业务逻辑 | #### FastAPI依赖注入和常见项目结构设计 ##### 什么是依赖注入 * 用于将重复逻辑(如鉴权、数据库连接、参数校验)抽象为可复用的组件, * 通过依赖注入(Dependency Injection)自动注入到路由处理函数中,让代码更简洁、模块化且易于测试 * 核心 * **代码复用**:避免在多个路由中重复相同逻辑(如权限检查)。 * **解耦**:将业务逻辑与基础设施(如数据库、认证)分离。 * **层级化**:支持嵌套依赖,构建多层逻辑(如先验证用户,再验证权限) * 与 Java Spring 对比 | **功能** | **FastAPI** | **Spring (Java)** | | :----------- | :-------------------- | :----------------------------- | | **依赖注入** | 函数/类 + `Depends()` | `@Autowired` + 容器管理 | | **作用域** | 默认每次请求 | Singleton/Prototype/Request 等 | * 语法案例 * **定义依赖项函数** * 依赖项可以是任何可调用对象(如函数、类),通过 `Depends()` 声明依赖关系 ```python from fastapi import Depends, FastAPI app = FastAPI() # 定义一个依赖项(函数) def common_params(query: str = None, page: int = 1): return {"query": query, "page": page} # 在路由中使用依赖项 @app.get("/read_items") async def read_items(params: dict = Depends(common_params)): return params #说明:common_params 会被自动调用,结果注入到 params 参数。 #访问结果:read_items 中可直接使用 params["query"] 和 params["page"]。 ``` * 类作为依赖项 * 依赖项也可以是类,适合需要初始化或状态管理的场景, 通过依赖注入机制,将复杂逻辑解耦为可复用的模块 ```python #案例一 class DatabaseSession: def __init__(self): self.session = "模拟数据库连接" def close(self): print("关闭数据库连接") def get_db(): db = DatabaseSession() try: yield db finally: db.close() @app.get("/users/") async def get_users(db: DatabaseSession = Depends(get_db)): return {"db_session": db.session} #案例二 class Pagination: def __init__(self, page: int = 1, size: int = 10): self.page = page self.size = size @app.get("/articles/") async def get_articles(pagination: Pagination = Depends()): return {"page": pagination.page, "size": pagination.size} ``` * 全局依赖项 * 为所有路由添加公共依赖项(如统一认证) ```python app = FastAPI(dependencies=[Depends(verify_token)]) # 或针对特定路由组: router = APIRouter(dependencies=[Depends(log_request)]) #在部分管理员接口添加认证依赖。 @app.get("/admin/stats", dependencies=[Depends(admin_auth)]) def get_stats(): ... @app.post("/admin/users", dependencies=[Depends(admin_auth)]) def create_user(): ... ``` ##### FastAPI 项目结构设计原则 * 基础分层结构(适合小型项目) ```python myproject/ ├── main.py ├── routers/ │ ├── users.py │ └── items.py ├── models/ │ └── schemas.py └── dependencies.py ``` * 模块化拆分结构一(推荐中型项目) ```python src/ ├── app/ │ ├── core/ # 核心配置 │ │ ├── config.py │ │ └── security.py │ ├── api/ # 路由入口 │ │ ├── v1/ # 版本控制 │ │ │ ├── users/ │ │ │ │ ├── endpoints.py │ │ │ │ └── schemas.py │ │ │ └── items/ │ ├── models/ # 数据模型 │ ├── services/ # 业务逻辑 │ └── utils/ # 工具类 ├── tests/ # 测试目录 └── requirements.txt ``` * 模块化拆分结构二(推荐中型项目) ```python myproject/ ├── app/ # 应用核心目录 │ ├── core/ # 全局配置和工具 │ │ ├── config.py # 配置管理 │ │ └── security.py # 安全相关工具 │ ├── api/ # 路由端点 │ │ ├── v1/ # API版本目录 │ │ │ ├── users.py │ │ │ ├── items.py │ │ │ └── ai.py │ │ └── deps.py # 公共依赖项 │ ├── models/ # Pydantic模型 │ │ ├── user.py │ │ └── item.py │ ├── services/ # 业务逻辑层 │ │ ├── user_service.py │ │ └── ai_service.py │ ├── db/ # 数据库相关 │ │ ├── session.py # 数据库会话 │ │ └── models.py # SQLAlchemy模型 │ └── utils/ # 工具函数 │ └── logger.py ├── tests/ # 测试目录 │ ├── test_users.py │ └── conftest.py ├── static/ # 静态文件 ├── main.py # 应用入口 ├── requirements.txt └── .env # 环境变量 ``` * 推荐原则【遵循团队规范即可】 | 原则 | 实施方法 | | :------: | :--------------------------------: | | 单一职责 | 每个文件/类只做一件事 | | 依赖倒置 | 通过依赖注入解耦组件 | | 分层清晰 | 严格区分路由层、服务层、数据访问层 | | 版本控制 | 通过URL路径实现API版本管理 | | 文档友好 | 为每个路由添加summary和description | #### FastAPI+大模型流式AI问答助手实战 ##### 需求 * 开发一个基于AI的问答工具,能够根据用户提供的知识点或主题生成简洁的介绍或解释。 * 使用了大语言模型(LLM)来实现流式生成文本的功能,适用于教育、内容创作等场景 * FastAPI框架整合LLM大模型,提供HTTP服务 ##### StreamingResponse介绍 * FastAPI 的提供了 `StreamingResponse` 是一个用于处理流式传输数据的工具,适用于需要逐步发送大量数据或实时内容的场景。 * 允许通过生成器逐块发送数据,避免一次性加载全部内容到内存,提升性能和资源利用率。 ``` from fastapi.responses import StreamingResponse ``` * 核心功能 * 流式传输:逐步发送数据块,适用于大文件(如视频、日志)、大模型实时生成内容(如LLM响应、服务器推送事件)或长时间运行的任务。 * 内存高效:无需将完整数据加载到内存,减少服务器负载。 * 异步支持:兼容同步和异步生成器,灵活适配不同场景。 * 基本用法 * 在路由中返回 `StreamingResponse` 实例,并传入生成器作为数据源 * 参数说明 * `content`: 生成器函数,产生字节或字符串数据块。 * `media_type`: 指定 MIME 类型(如 `"text/event-stream"`、`"application/json"`)。 * `headers`: 自定义响应头(如 `{"Content-Disposition": "attachment; filename=data.csv"}`)。 * `status_code`: 设置 HTTP 状态码(默认为 `200`)。 * 案例实操 ```python async def ai_qa_stream_generator(query: str): """生成A回答的流式响应""" try: async for chunk in ai_writer.run_stream(query): json_data = json.dumps({"text": chunk}) yield f"data: {json_data}\n\n" except Exception as e: error_msg = json.dumps({"error": str(e)}) yield f"data: {error_msg}\n\n" @app.get("/ai_write") async def ai_writer_endpoint(query: str): """AI写作接口,返回流式响应""" return StreamingResponse( ai_qa_stream_generator(query), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive" } ) ``` ##### 编码实战 `app/ai_writer.py` ```python from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from typing import AsyncGenerator # 封装AI问答的类 class AIWriter: def __init__(self): # 初始化语言模型 self.llm = self.llm_model() # 定义一个返回自定义语言模型的方法 def llm_model(self): #创建模型 model = ChatOpenAI( model_name = "qwen-plus", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", api_key="sk-005c3c25f6d042848b29d75f2f020f08", temperature=0.7, streaming=True ) return model async def run_stream(self, query: str) -> AsyncGenerator[str, None]: """运行AI问答并返回流式响应""" try: # 定义提示模板,要求对文档进行摘要总结 prompt_template = "用100个字解释下面的知识点或者介绍:{concept}" # 使用提示模板类创建模板 prompt = ChatPromptTemplate.from_template(prompt_template) # 定义LLM链,将自定义语言模型和提示模板结合 chain = prompt | self.llm | StrOutputParser() # 使用流式输出 async for chunk in chain.astream({"concept": query}): if isinstance(chunk, str): yield chunk elif isinstance(chunk, dict) and "content" in chunk: yield chunk["content"] else: yield str(chunk) except Exception as e: yield f"发生错误: {str(e)}" async def chat(self, query: str): """处理用户消息并返回流式响应""" try: async for chunk in self.run_stream(query): yield chunk except Exception as e: yield f"发生错误: {str(e)}" ``` * 编码实战 `writer_app.py` ```python import uvicorn from fastapi import FastAPI from fastapi.responses import StreamingResponse from app.ai_writer import AIWriter import json app = FastAPI() # 初始化智能体 ai_writer = AIWriter() async def ai_qa_stream_generator(query: str): """生成A回答的流式响应""" try: async for chunk in ai_writer.run_stream(query): json_data = json.dumps({"text": chunk}, ensure_ascii=False) yield f"data: {json_data}\n\n" except Exception as e: error_msg = json.dumps({"error": str(e)}) yield f"data: {error_msg}\n\n" @app.get("/ai_write") async def ai_writer_endpoint(query: str): """AI写作接口,返回流式响应""" return StreamingResponse( ai_qa_stream_generator(query), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive" } ) # 启动服务器的命令 if __name__ == "__main__": uvicorn.run(app, port=8003) ``` * 如何调试 * Apifox 提供了专门的 SSE(Server-Sent Events)调试功能,适合处理 AI 大模型的流式响应场景 * 配置 SSE,选择接口的请求方法,在请求头中添加 `Accept: text/event-stream` 来启用 SSE。
评论区