【AI】十一.Milvus向量数据库多案例实战 小滴课堂讲师 2025年09月18日 ai大模型, aigc 预计阅读 14 分钟 #### Python整合向量数据库Milvus案例实战 ##### Python操作Milvus实战 * 安装 Milvus Python SDK, 支持 Python、Node.js、GO 和 Java SDK。 * 建议安装与所安装 Milvus 服务器版本相匹配的 PyMilvus 版本 * 安装 ``` pip install pymilvus==2.5.5 ``` * 验证安装 如果 PyMilvus 安装正确,运行以下命令时不会出现异常 ``` python -c "from pymilvus import Collection" ``` * 接口可分为以下几类: * **DDL / DCL:**createCollection / createPartition / dropCollection / dropPartition / hasCollection / hasPartition * **DML / Produce:**插入 / 删除 / 上移 * **DQL:**搜索/查询 ##### 操作Milvus数据库 * 使用connect()连接 Milvus 服务器,进行相关操作 ```python #也可以使用MilvusClient #from pymilvus import MilvusClient #client = MilvusClient("http://47.119.128.20:19530") from pymilvus import connections, db conn = connections.connect(host="47.119.128.20", port=19530) # 创建数据库 #db.create_database("my_database") # 使用数据库 db.using_database("my_database") # 列出数据库 dbs = db.list_database() print(dbs) #['default', 'my_database'] # 删除数据库 db.drop_database("my_database") ``` ##### Collection与Schema的创建和管理 * Collection 是一个二维表,具有固定的列和变化的行,每列代表一个字段,每行代表一个实体。 * 要实现这样的结构化数据管理,需要一个 Schema定义 Collections 的表结构 * 每个Schema由多个`FieldSchema`组成: ```python from pymilvus import FieldSchema, DataType # 字段定义示例 fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128), FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=50) ] ``` * 字段类型详解 | 数据类型 | 说明 | 示例 | | :-------------: | :----------: | :---------------: | | `INT8/16/32/64` | 整型 | `DataType.INT64` | | `FLOAT` | 单精度浮点数 | `DataType.FLOAT` | | `DOUBLE` | 双精度浮点数 | `DataType.DOUBLE` | | `VARCHAR` | 变长字符串 | `max_length=255` | | `FLOAT_VECTOR` | 浮点向量 | `dim=768` | * 创建collection实战 ```python from pymilvus import connections from pymilvus import FieldSchema, DataType from pymilvus import CollectionSchema, Collection conn = connections.connect(host="47.119.128.20", port=19530) # 步骤1:定义字段 fields = [ FieldSchema("id", DataType.INT64, is_primary=True), FieldSchema("vector", DataType.FLOAT_VECTOR, dim=128), FieldSchema("tag", DataType.VARCHAR, max_length=50) ] # 步骤2:创建Schema schema = CollectionSchema(fields, description="示例集合") # 步骤3:实例化Collection collection = Collection( name="demo_collection", schema=schema, shards_num=2 # 分片数(分布式扩展关键) ) ``` * 关键参数解析 | 参数 | 说明 | 推荐值 | | :-----------: | :------------------------: | :--------------: | | `shards_num` | 分片数量(创建后不可修改) | 集群节点数×2 | | `description` | 集合描述信息 | 建议填写业务用途 | * 动态字段Schema * 在集合中启用动态字段后,所有未在 Schema 中定义的字段及其值都将作为键值对存储在动态字段中 ```python # 启用动态字段(Milvus 2.3+) schema = CollectionSchema( fields, enable_dynamic_field=True ) ``` * 案例讲解 * 假设 Collections Schema 只定义两个字段,名为`id` 和`vector` ,启用了动态字段,在 Collections 中插入以下数据集 * 数据集包含 多个实体,每个实体都包括字段`id`,`vector`, 和`color` ,Schema 中没有定义`color` 字段。 * 由于 Collections 启用了动态字段,因此字段`color` 将作为键值对存储在动态字段中。 ```python [ {id: 0, vector: [0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, 0.9029438446296592], color: "pink_8682"}, {id: 7, vector: [-0.33445148015177995, -0.2567135004164067, 0.8987539745369246, 0.9402995886420709, 0.5378064918413052], color: "grey_8510"}, {id: 8, vector: [0.39524717779832685, 0.4000257286739164, -0.5890507376891594, -0.8650502298996872, -0.6140360785406336], color: "white_9381"}, {id: 9, vector: [0.5718280481994695, 0.24070317428066512, -0.3737913482606834, -0.06726932177492717, -0.6980531615588608], color: "purple_4976"} ] ``` | 类型 | 特点 | 适用场景 | | :------------: | :---------------------------: | :----------------------------: | | **静态Schema** | 严格字段定义 | 数据结构固定的业务(用户画像) | | **动态Schema** | 允许灵活字段(需Milvus 2.3+) | 日志类多变数据 | #### Milvus索引操作和最佳实践避坑指南 ##### 为什么需要索引 * 加速查询:避免暴力比对,快速定位相似向量, 平衡召回率与查询速度 * 节省资源:减少内存占用和计算开销, 建议为经常访问的向量和标量创建索引 ##### 常见的索引类型 | 索引类型 | 适用场景 | 内存占用 | 精度 | 构建速度 | | :------: | :----------------------: | :------: | :-----: | :------: | | FLAT | 小数据精确搜索(<100万) | 高 | 100% | 快 | | IVF_FLAT | 大数据平衡场景(千万级) | 中 | 95%-98% | 较快 | | HNSW | 高召回率需求 | 高 | 98%-99% | 慢 | | DISKANN | 超大规模(10亿+) | 低 | 90%-95% | 最慢 | ##### Milvus索引操作 * 创建索引 ```python # 导入MilvusClient和DataType模块,用于连接Milvus服务器并操作数据类型 from pymilvus import MilvusClient, DataType # 实例化MilvusClient以连接到指定的Milvus服务器 client = MilvusClient( uri="http://47.119.128.20:19530" ) # 创建schema对象,设置自动ID生成和动态字段特性 schema = MilvusClient.create_schema( auto_id=False, enable_dynamic_field=True, ) # 向schema中添加字段"id",数据类型为INT64,作为主键 schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True) # 向schema中添加字段"vector",数据类型为FLOAT_VECTOR,维度为5 schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=5) # 使用create_collection方法根据schema创建集合"customized_setup" client.create_collection( collection_name="customized_setup", schema=schema, ) # 准备索引参数,为"vector"字段创建索引 index_params = MilvusClient.prepare_index_params() # 添加索引配置,指定字段名、度量类型、索引类型、索引名和参数 index_params.add_index( field_name="vector", metric_type="COSINE", # 距离计算方式 (L2/IP/COSINE) index_type="IVF_FLAT", index_name="vector_index", params={ "nlist": 128 } #聚类中心数 (建议值:sqrt(数据量)) ) # 创建索引,不等待索引创建完成即返回 client.create_index( collection_name="customized_setup", index_params=index_params, sync=False # 是否等待索引创建完成后再返回。默认为True。 ) ``` * 参数说明 | 参数 | 参数 | | ----------------- | ------------------------------------------------------------ | | `field_name` | 指定字段名称 | | `metric_type` | 用于衡量向量间相似性的算法。值有**IP**、**L2**、**COSINE**、**JACCARD**、**HAMMING**。只有当指定字段是向量字段时才可用。 | | `index_type` | 索引类型 | | `index_name` | 索引名称 | | `params` | 指定索引类型的微调参数 | | `collection_name` | Collections 的名称。 | | `sync` | 控制与客户端请求相关的索引构建方式。有效值: `True` (默认):客户端等待索引完全建立后才返回。在该过程完成之前不会收到响应。`False`:客户端收到请求后立即返回,索引在后台建立 | * 查看索引信息 ```python #列出索引名称 res = client.list_indexes( collection_name="customized_setup" ) print(res) #获取索引详细信息 res = client.describe_index( collection_name="customized_setup", index_name="vector_index" ) print(res) ``` * 删除索引 * 删除前需确保无查询正在使用该索引 * 删除后需重新创建索引才能进行有效查询 ```python #如果不再需要索引,可以直接将其删除。 client.drop_index( collection_name="customized_setup", index_name="vector_index" ) print("索引已删除") ``` ##### 最佳实践与避坑指南 * **Schema设计原则** - 主键选择 - 推荐自增ID避免冲突 - 禁止使用向量字段作为主键 - **字段数量**:单个集合不超过32个字段 - **向量维度**:创建后不可修改,需提前规划 * **索引选择策略**: - 百万级以下 → FLAT - 百万到亿级 → IVF/HNSW - 十亿级以上 → DISKANN * **操作规范**: - 数据插入完成后再建索引 - 定期重建索引(数据变更超过30%) - 为高频查询字段建立独立索引 * **常见错误处理** | 错误场景 | 解决方案 | | :--------------: | :----------------------------------: | | "字段类型不匹配" | 检查插入数据与Schema定义的一致性 | | "主键冲突" | 插入前检查ID唯一性,或使用自动生成ID | | "向量维度错误" | 校验dim参数与数据实际维度 | #### Milvus向量数据库的DML操作实战 ##### 核心DML操作实战 * 创建集合(Collection),集合是Milvus中数据存储的基本单位,需定义字段和索引 * `auto_id=True`时无需手动指定主键 * 动态字段(`enable_dynamic_field=True`)允许灵活扩展非预定义字段 ```python # 导入MilvusClient和DataType模块,用于连接Milvus服务器并操作数据类型 from pymilvus import MilvusClient, DataType # 实例化MilvusClient以连接到指定的Milvus服务器 client = MilvusClient( uri="http://47.119.128.20:19530",user="root", password="xxx", db_name="my_database" ) # 定义Schema schema = client.create_schema(auto_id=False, enable_dynamic_field=True) schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True) schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=128) schema.verify() # 验证Schema # 定义索引参数 index_params = client.prepare_index_params() index_params.add_index( field_name="vector", index_type="IVF_FLAT", # 量化索引,平衡速度与精度 metric_type="L2", # 相似性度量标准(欧式距离) params={"nlist": 1024} # 聚类中心数 ) # 创建集合 client.create_collection( collection_name="my_collection", schema=schema, index_params=index_params ) ``` * 插入数据(Insert)支持单条或批量插入【可视化工具那边需要加载,包括查询等都是需要加载状态才可以操作】 ```python data = [ {"id": 1, "vector": [0.1]*128, "text": "Sample text 1"}, {"id": 2, "vector": [0.2]*128, "text": "Sample text 2"} ] # 插入数据 insert_result = client.insert( collection_name="my_collection", data=data ) print("插入ID列表:", insert_result["ids"]) # 返回主键ID ``` * 删除数据(Delete)通过主键或条件表达式删除 ```python # 按主键删除 client.delete( collection_name="my_collection", ids=[1, 2] # 主键列表 ) # 按条件删除(如删除text字段为空的记录) client.delete( collection_name="my_collection", filter="text == ''" ) ``` * 更新数据(Update)Milvus不支持直接更新,需通过“删除+插入”实现: ```python # 删除旧数据 client.delete(collection_name="my_collection", ids=[3]) # 插入新数据 client.insert( collection_name="my_collection", data=[{"id": 3, "vector": [0.3]*128, "text": "Updated text"}] ) ``` #### Milvus向量Search查询综合案例实战 ##### 需求说明 * 创建包含混合数据类型(标量+向量)的集合 * 批量插入结构化和非结构化数据 * 实现带过滤条件的混合查询 * 验证端到端的向量搜索流程 ##### Search语法深度解析 * 核心参数说明 ```python results = client 或 collection.search( data=[[0.12, 0.23, ..., 0.88]], # 查询向量(必须) anns_field="vector", # 要搜索的向量字段名(必须) param={"metric_type": "L2", "params": {"nprobe": 10}}, # 搜索参数 limit=10, # 返回结果数量 expr="price > 50", # 过滤表达式(可选) output_fields=["product_id", "price"], # 返回的字段 ) ``` | 参数 | 类型 | 说明 | 常用值 | | :-----------: | :--: | :----------: | :--------------------------------------: | | data | list | 查询向量列表 | 二维数组 | | anns_field | str | 向量字段名 | 创建时定义的字段 | | param | dict | 搜索参数 | 包含metric_type和params | | limit | int | 返回结果数 | 5-100 | | expr | str | 过滤条件 | price > 50 AND category == 'electronics' | | output_fields | list | 返回字段 | ["field1", "field2"] | ##### 搜索案例实战(MilvusClient方式) * 准备数据 ```python from pymilvus import ( connections,MilvusClient, FieldSchema, CollectionSchema, DataType, Collection, utility ) import random # # 创建Milvus客户端 client = MilvusClient( uri="http://47.119.128.20:19530", ) #删除已存在的同名集合 if client.has_collection("book"): client.drop_collection("book") # 定义字段 fields = [ FieldSchema(name="book_id", dtype=DataType.INT64, is_primary=True, auto_id=True), FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=200), FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=50), FieldSchema(name="price", dtype=DataType.DOUBLE), FieldSchema(name="book_intro", dtype=DataType.FLOAT_VECTOR, dim=4) ] # 创建集合Schema schema = CollectionSchema( fields=fields, description="Book search collection" ) #创建集合 client.create_collection(collection_name="book", schema=schema) # 生成测试数据 num_books = 1000 categories = ["科幻", "科技", "文学", "历史"] titles = ["量子世界", "AI简史", "时光之轮", "文明起源", "未来简史", "数据科学"] data = [] for i in range(num_books): data.append({ "title": f"{random.choice(titles)}_{i}", "category": random.choice(categories), "price": round(random.uniform(10, 100), 2), "book_intro": [random.random() for _ in range(4)] # 4维向量 }) # 批量插入 insert_result = client.insert( collection_name="book", data=data ) print(f"插入数据量:{len(insert_result['ids'])}") ``` * 创建索引 ```python # 准备索引参数,为"vector"字段创建索引 index_params = MilvusClient.prepare_index_params() # 添加索引配置,指定字段名、度量类型、索引类型、索引名和参数 index_params.add_index( field_name="book_intro", metric_type="L2", # 距离计算方式 (L2/IP/COSINE) index_type="IVF_FLAT", index_name="vector_index", params={ "nlist": 128 } #聚类中心数 (建议值:sqrt(数据量)) ) # 创建索引,不等待索引创建完成即返回 client.create_index( collection_name="book", index_params=index_params ) print("索引创建完成") ``` * 执行查询【执行查询前需要加载才可以使用】 ```python client.load_collection(collection_name="book") # 加载集合到内存 # 生成查询向量 query_vector = [random.random() for _ in range(4)] # 执行带过滤条件的向量搜索 results = client.search( collection_name="book", data=[query_vector], # 支持批量查询 filter="category == '科幻' and price < 50", output_fields=["title", "category", "price"], limit=3, search_params={"nprobe": 10} ) # 解析结果 print("\n科幻类且价格<50的搜索结果:") for result in results[0]: # 第一个查询结果集 print(f"ID: {result['book_id']}") print(f"距离: {result['distance']:.4f}") print(f"标题: {result['entity']['title']}") print(f"价格: {result['entity']['price']:.2f}") print("-" * 30) ``` * 向量数据库完整工作流程示意图 ``` 1. 创建集合Schema ↓ 2. 插入测试数据 ↓ 3. 创建向量索引 ↓ 4. 加载集合到内存 ↓ 5. 执行混合查询(向量+标量过滤) ``` * 全量查询案例演示 * 测试是否有 output_fields 字段,返回结果的差异 ```python # 案例1:基础向量查询 basic_res = client.search( collection_name="book", data=[query_vector], limit=5 ) # 案例2:分页查询 page_res = client.search( collection_name="book", data=[query_vector], offset=2, limit=3 ) # 案例3:批量查询 batch_res = client.search( collection_name="book", data=[query_vector, [0.5]*4], # 同时查询两个向量,每个向量都会返回2条 limit=2 ) ``` * 集合状态 ```python # 验证集合状态 print(client.describe_collection("book")) # 索引状态检查 print(client.list_indexes("book")) ``` ##### 新旧版本对比表 | 功能 | PyMilvus旧版 | MilvusClient新版 | | :----------- | :---------------------- | :---------------------------- | | 连接管理 | 需要手动管理connections | 客户端自动管理 | | 数据插入格式 | 多列表结构 | 字典列表 | | 字段定义 | 使用FieldSchema | 在create_collection中直接定义 | | 返回结果格式 | 对象属性访问 | 标准化字典格式 | | 错误处理 | 异常类捕获 | 统一错误码系统 | | 动态字段支持 | 需要额外配置 | 参数开启即可 |
评论区