项目
博客
文档
归档
资源链接
关于我
项目
博客
文档
归档
资源链接
关于我
Python异步与多线程
2025-03-09
·
·
原创
·
Python
FastAPI
·
本文共 1,364个字,预计阅读需要 5分钟。
https://www.bilibili.com/video/BV1HDwBegE1z #### Python异步与多线程 #### Asyncio vs. Multithread ##### Async 异步是一种执行任务的顺序,可以通过多种方式实现 - 多线程异步(多线程可以实现异步-将每个任务放到不同的线程中执行就是实现了异步) ##### Python线程 import threading - 操作系统线程: pthread/Windows,在操作系统中是看得到的一个应用启动的 - Global Interpreter Lock (GIL) - **任一时刻,一个Python进程内,只有一个线程能够运行python解释器**; Python Thread1: TaskA()Python Thread 2: TaskB()释放GIL获得GIL获得GIL释放GIL释放GIL获得GIL #### Python Asyncio: 协程异步 Asyncio库通过单线程实现异步lO执行 - 等待IO时会释放GIL - - http://www.dabeaz.com/GlL/ Asyncio vs Multithread ``` if io_bound: if io_very_slow: print("Use Asyncio") else: print("Use Threads") else: print("Multi Processing") ``` - CPU Bound => Multi Processing多进程 - I/O Bound, Fast I/O, Limited Number of Connections => Multi Threading - I/0 Bound, Slow I/O, Many connections => Asyncio https://stackoverflow.com/questions/27435284/multiprocessing-vs-multithreading-vs-asyncio ``` def task(): s=0.0 for x in range(10000): s= s+ random() #Send the GET request response = requests.get(url) ``` ``` async def atask(): s=0.0 for x in range(10000): s = s+ random() # Send the GET request async with aiohttp.ClientSession()as session: async with session.get(url) as response: ... ``` | x20 串行 | 17.816s | | ------------ | ------- | | x20 多线程行 | 0.912 | | x20 Asyncio | 1.009s | Asyncio vs Multithread code执行code执行等待I0等待I0 主要考量 1. 是否有async的库函数 2. 任务的数量 3. 尽量使用已有的库(numpy,pytorch等)进行复杂的数学计算 #### Python Coroutine and Task 协程与任务 Python Coroutine Coroutines and Tasks - Python 3.12.6 documentation https://docs.python.org/3/library/asyncio-task.htmlo 中文 : https://docs.python.org/zh-cn/3/library/asyncio-task.html - Coroutines - Awaitables - Creating Tasks - Task Object Non-goal yield &生成器(generator)语义 事件循环(Event loop)实现 ```python async def say_after(delay,what): print(f"start: {what}, number of tasks: {count_task()}") await asyncio.sleep(delay) print(f"end: {what}") async def main(): task1 = asyncio.create_task(say_after(1,'hello')) task2 = asyncio.create_task(say_after(2,'world')) print(f"started at {time.strftime('%X')}") #等待直到两个任务都完成 #(会花费约2秒钟。) await task1 await task2 print(f"finished at {time.strftime('%x')}") asyncio.run(main()) # startedat 17:40:42 # start:hello,number of tasks: 3 # start: world, number of tasks: 3 # end:hello # end: world # finished at 17:40:44 async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2,'world') print(f"finished at {time.strftime('%x')}") # 没有效果跟串行一样,用了跟没用一样 # asyncio.run(main()) # started at 19:21:36 # start: hello, number of tasks: 1 # end:hello # start: world, number of tasks: 1 # end:world # finished at 19:21:39 ``` ##### 可等待对象 ##### Task对象 #### Coroutine与FastAPI ##### Asynchronous Server Gateway Interface(ASGI main.py from fastapi import FastAPI app = FastAPI() uvicorn main:app https://asgi.readthedocs.io/en/latest/implementations.html > FastAPI中关于普通函数和asyn函数重点: > > - asyn函数 async def app(scope, receive, send):是运行在Main thread中的,普通函数def handle()是运行在 Worker thread > - asyn函数可支持高并发的,普通函数支持的并发量是设定的线程池数量(默认40个线程),当超前过40个请求之后,其他请求进来就要排队,而asyn函数是没有限制的 > - 如果在运行函数时,存在**长时间运行而不释放**事件循环,**asyn函数**是在主线程中运行,第一个请求之后,其他请求都会**被阻塞**,而**普通函数**因为是在另一个线程中执行,**不会阻塞**。 #### BackgroundTasks BackgroundTasks用于长时间执行的任务 虽然是普通函数,但是使用了BackgroundTasks,而它的函数task_compute是async,它也会被放到主线程中去执行。 ```python async def task_compute(num: int): logger.info("start task computing..") time.sleep(10) return {"num": num,"q": str(num**2)} @app.get("/task/{num}") def task(num: int, background_tasks: BackgroundTasks) : logger.info("task added") background_tasks.add_task(task_compute,num) ``` 协程的诅咒 - 慢操作造成的任务阻塞 - 缺少类似线程调度的公平性 - IO-bound应用 # fastApi https://www.bilibili.com/video/BV1vYS1YQEvh ### 0.开发说明 在学习开发本项目之前,必须保证有以下知识储备和环境工具。 | 技术栈 | 说明 | | ---------------------------------------- | ---------------------------------------- | | python>=3.9、pydantic>=2.7.1 | python基础,http协议,异步编程、类型标注 | | fastapi>=0.111.0 | web异步框架,有web开发基础 | | mysql>=8.0、Tortoise-ORM>=0.20.1 | mysql数据库相关 | | redis>=6.xredis | 数据库相关 | | 微信开发者工具、uni-app、HbuilderX编辑器 | 开发小程序项目的UI框架,有小程序开发基础 | | vue>=3.x、vite | 前端web开发框架 | | git | 代码版本管理工具 | | docker、docker-compose | 镜像与容器基本操作 | ### 1.项目构建 #### 1.1服务端构建 手动创建工程目录路径不要使用中文或者特殊符号。 fastchat 创建虚拟环境,终端下执行命令如下: conda create -n fastchat python=3.10 安装完成以后,需要激活当前虚拟环境[切换虚拟],终端执行如下命令: conda activate fastchat ##### 1.1.1依赖安装 ```shell pip install -U python-dotenv -i https://pypi.tuna.tsinghua.edu.cn/simple pip install -U fastapi -i https://pypi.tuna.tsinghua.edu.cn/simple pip install -U uvicorn[standard] -i https://pypi.tuna.tsinghua.edu.cn/simple pip install -U tortoise-orm[aiomysql]-i https://pypi.tuna.tsinghua.edu.cn/simple pip install -U cryptography -i https://pypi.tuna.tsinghua.edu.cn/simple ``` ##### 1.1.2 项目目录 工程目录尽量和虚拟环境的名称保持一致,pycharm一般都可以自动识别。如果pycharm不能自动识别,则点击编辑器右下角选择自定义解释器即可。 ```py fastchat/ #工程目录 |—api/ #api服务端-基于fastAPI框架 | |—application #项目代码存储目录 | | __init__.py #项目初始化文件【创建App应用对象的函数,各种模块初始化】 | - main.py # 服务端程序入口 ``` man.py ```python import uvicorn from applicaton import create_app,FastAPI app: FastAPI = create_app() @app.get(/api') async def api(): return {'title':'fastchat api'} if._name__=='__main__': uvicorn.run(app:'main:app', host='0.0.0.0',port=8000, reload=True) ``` `application/__init__.py` ```python from fastapi import FastAPI def create_app()-> FastAPI: """工厂函数:创建App对象""" app = FastAPI() return app ``` ##### 1.1.3项目配置 在原目录结构基础上增加settings.py与.env、·gitignore,效果如下: ```py fastchat/ #工程目录 |—api/ #api服务端-基于fastAPI框架 | |—application #项目代码存储目录 | | | -settings.py #项目配置文件 | | | -__init__.py #项目初始化文件【创建App应用对象的函数,各种模块初始化】 | - env #环境配置[被settings.py加载],不会被git记录 | - main.py # 服务端程序入口 - .gitignore#git #忽略文件配置 ``` .env ```properties # 常规配置 APP_ENV=dev #当前开发环境 APP_NAME=fastchat #应用默认名称 APP_P0RT=8000 #web服务器监听端口 APP_H0ST=0.0.0.0 #web服务器监听地址,0.0.0.0表示监听任意指向当前服务器的地址 APP_VERSION=v0.0.1 #项目版本 APP_DEBUG=true #调试模式,true表示开启 APP_TIMEZoNE=Asia/Shanghai #时区 #数据库配置 DB_H0ST=127.0.0.1 #数据库地址 DB_P0RT=3306 #数据库端口 DB_USER=fastchat #用户名 DB_PASSWORD=fastchat #密码 DB_DATABASE=fastchat #数据库名 DB_CHARSET=Utf8mb4 #连接编码 DB_P00L_MINSIZE=10 #连接池中的最小连接数 DB_P00L_MAXSIZE=30 #连接池中的最大连接数 ``` 加载环境配置文件 ```python from fastapi import FastAPI from dotenv import load_dotenv def create_app()-> FastAPI: """工厂函数:创建App对象""" app = FastAPI() #读取环境配置文件的信息,加载到环境配置 load_dotenv() print(os.environ.get('APP_TIMEZoNE')) return app ``` .gitignore代码: ```python .env .idea __pycache__ ``` 修改启动 ```python if __name__ == '__main__': uvicorn.run( app:'main:app', host=os.environ.get('APP_HOST','0.0.0.0'), port=int(os.environ.get('APP_PORT',8000)), reload=bool(os.environ.get('APP_DEBUG'))) ``` 手动创建配置文件api/application/settings.py,编写tortoise-orm的配置信息,代码: ```python import os """tortoise-orm数据库配置""" TORTOISE_ORM={ "connections": { "default": { 'engine': 'tortoise.backends.mysql', # #MysQL or Mariadb 'credentials': { #连接参数 "host': os.environ.get('DB_HOST','127.0.0.1'), # 数据库IP/域名地址 'port': int(os.environ.get('DB_PORT', 3306)), #端口 'user': os.environ.get('DB_USER','root'), #连接账户 'password': os.environ.get('DB_PASSWORD','123'), #连接密=码 'database': os.environ.get('DBDATABASE','fastchat'), #数据库 'charset': os.environ.get('DB_CHARSET','utf8mb4'),#编码 'minsize': int(os.environ.get('DB_POOL_MINSIZE',1)),#连接池中的最小连接数 'maxsize':int(os.environ.get('DB_POOL_MAXSIZE',5)),#连接池中的最大连接数 "echo": bool(os.environ.get('DEBUG',True)) #执行数据库操作时,是否打印SQL语句 } } }, 'apps':{ #默认所在的应用目录 'models':{ #数据模型的分组名 'modeLs':[],#模型所在目录文件的导包路径[字符串格式] 'default_connection':'default',#上一行配置中的模型列表的默认连接配置 } }, # 时区设置 # 当use_tz=True,当前tortoise-orm会默认使用当前程序所在操作系统的时区, #当use_tz=False时,当前tortoise-orm会默认使用timezone配置项中的时区 'use_tz': False, 'timezone': os.environ.get('APP_TIMEZoNE','Asia/Shanghai') } ``` 注册 ```python from fastapi import FastAPI from dotenv import load_dotenv from tortoise.contrib.fastapi import register_tortoise from . import settings def create_app()-> FastAPI: """工厂函数:创建App对象""" app = FastAPI() #读取环境配置文件的信息,加载到环境配置 load_dotenv() print(os.environ.get('APP_TIMEZoNE')) #把Tortoise-orm注册到App应用对象中 register_tortoise( app, config=settings.TORTOISE_ORM, generate_schemas=False, # 是否自动生成表结构 add_exception_handlers=True,#是否启用自动异常处理 ) return app ``` 完成上面的配置以后,因为tortoise-orm默认并没有连接数据库,,因此我们需要编写一个数据表模型进行数据库连接操作以测试连接配置是否正确,不过这块我们先放一放,因为项目开发过程中有可能数据库需要保存很多数据,自然也就需要创建对应很多的数据表模型,而不同的数据对应的功能业务是不同的,因此我们需要分开写在不同的文件或者目录下,所以我们得先配置分组应用,不同的功能分属于不同的应用下,每一个应用都属于自己的数据表模型、api视图接口、路由数据等。 ##### 1.1.3.2 应用分组 首先创建分组应用存储目录apps,并在apps目录下先创建2个应用分组目录,分别是common公共数据应用分组与users用户数据应用分组,目录结构如下: ```py application #项目代码存储目录 — apps/ #分组应用存储目录 - _init_.py - common/ #公共数据的应用分组 - models.py #表模型文件 - views.py #api视图接口文件 - users/ #用户数据的应用分组 — models.py #表模型文件 - views.py #api视图接口文件 - scheams.py #请求与响应数据模型文件 ``` ```python from apps.common.views import app as common_app #注册各个分组应用中的视图接口代码到App应用对象中 app.include_router(common_app) return app ```