Skip to content

向量数据库详解

掌握向量数据库的核心技术,选择合适的存储与检索方案

🎯 核心概念

什么是向量数据库?

向量数据库是专门用于存储、索引和检索高维向量数据的数据库系统,是RAG系统的核心基础设施。

传统数据库 vs 向量数据库

python
# 传统数据库
SELECT * FROM articles WHERE title LIKE '%RAG%'  # 精确匹配

# 向量数据库  
SELECT * FROM embeddings ORDER BY cosine_distance(vector, query_vector) LIMIT 5  # 语义相似度

为什么需要向量数据库?

核心价值

高维检索:支持数百到数千维向量的高效相似性搜索
语义理解:基于向量距离进行语义匹配,不再依赖关键词
规模化处理:支持百万到亿级向量的存储与毫秒级检索

向量数据库在RAG中的关键作用

向量数据库是RAG系统的核心基础设施,直接影响检索质量和系统性能:

┌─────────────────────────────────────────────────────────────┐
│                    RAG 系统数据流                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  文档 → [Embedding模型] → 向量 → [向量数据库] ← 查询向量     │
│                                       ↓                     │
│                               检索Top-K文档                 │
│                                       ↓                     │
│                               [LLM生成答案]                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

检索质量直接决定生成质量

  • 若检索器未找到相关信息 → "垃圾进,垃圾出"
  • 向量数据库的索引策略、距离度量选择直接影响Recall@K
  • 检索层失败会在生成层被放大(错误冒泡效应)

🏗️ 向量检索算法原理

基于《RAGFlow的检索神器-HNSW:高维向量空间中的高效近似最近邻搜索算法》

检索算法演进

算法类型代表算法时间复杂度优势劣势
暴力搜索Linear ScanO(n)精度100%速度慢
树结构KD-TreeO(log n)低维效果好高维性能差
哈希方法LSHO(1)速度快精度不稳定
图方法HNSWO(log n)高精度+高速度内存占用大

HNSW算法深度解析

核心思想

HNSW(Hierarchical Navigable Small World)结合了跳表小世界网络的优势:

  1. 跳表启发:构建多层索引,快速缩小搜索范围
  2. 小世界网络:通过"捷径"连接实现快速导航
  3. 层次结构:从粗粒度到细粒度的渐进式搜索

算法架构

python
# HNSW的层次化结构
Layer 2: [入口节点] ----长距离连接----> [节点A]
         ↓                               ↓
Layer 1: [节点群1] ----中距离连接----> [节点群2] 
         ↓                               ↓
Layer 0: [所有节点] --短距离连接--> [目标区域]

搜索流程

  1. 顶层导航:从入口点开始,快速定位大致区域
  2. 逐层下降:在每层找到局部最优点,作为下层起点
  3. 底层精搜:在最密集的底层进行精确检索

性能表现

实验数据(10,000条768维向量)

平均查询时间:5.58毫秒
检索精度:接近100%(相比暴力搜索)
内存开销:约为原始数据的1.5-2倍


📊 主流向量数据库对比

开源解决方案

数据库算法支持语言特点适用场景
ChromaHNSWPython轻量、易用原型开发、小规模
WeaviateHNSWGo功能丰富、GraphQL中型项目
QdrantHNSWRust高性能、云原生高并发场景
MilvusHNSW/IVF/FAISSC++企业级、分布式大规模生产

云服务方案

服务提供商优势定价模式
PineconePinecone托管式、易扩展按向量数+查询量
Zilliz CloudZilliz基于Milvus按资源使用量
Supabase VectorSupabase集成PostgreSQL按存储+计算
Elastic CloudElastic与ES生态整合按节点资源

选型指南

按场景选择

🚀 快速原型:Chroma - 5分钟上手,本地开发友好
📈 中型项目:Qdrant - 性能好,部署简单
🏢 企业生产:Milvus - 功能全面,支持集群
☁️ 托管服务:Pinecone - 免运维,按需付费

选型决策矩阵

考量因素ChromaQdrantMilvusPinecone
数据规模<100万<1000万亿级亿级
部署复杂度⭐⭐⭐⭐⭐⭐
运维成本无(托管)
混合检索
元数据过滤基础强大强大强大
GPU加速
价格免费免费/商业免费/商业按量付费

与RAG评估体系的关联

向量数据库的检索质量直接影响RAG评估中的检索层指标

评估指标向量数据库影响因素
Recall@K索引算法(HNSW参数)、距离度量选择
Context Precision检索策略、过滤条件设置
MRR排序算法、相似度计算精度
检索延迟索引类型、硬件配置、数据规模

检索失败的代价

根据RAG三元组理论,低上下文相关性意味着RAG管道从源头偏离方向。即使生成器能力再强,检索器未找到相关信息时,也无法生成正确答案。


💻 实战代码示例

Chroma 快速上手

python
import chromadb
from chromadb.config import Settings

# 1. 初始化客户端
client = chromadb.Client(Settings(
    persist_directory="./chroma_db",  # 数据持久化目录
    anonymized_telemetry=False
))

# 2. 创建集合
collection = client.create_collection(
    name="rag_documents",
    metadata={"description": "RAG系统文档向量"}
)

# 3. 添加文档
documents = [
    "检索增强生成(RAG)是一种结合检索和生成的AI技术",
    "向量数据库用于存储和检索高维向量数据",
    "HNSW算法提供了高效的近似最近邻搜索"
]

# 自动向量化并存储
collection.add(
    documents=documents,
    ids=[f"doc_{i}" for i in range(len(documents))],
    metadatas=[{"source": "manual", "index": i} for i in range(len(documents))]
)

# 4. 语义搜索
results = collection.query(
    query_texts=["什么是RAG技术?"],
    n_results=2,
    include=["documents", "distances", "metadatas"]
)

print("检索结果:")
for i, (doc, distance) in enumerate(zip(results['documents'][0], results['distances'][0])):
    print(f"{i+1}. 相似度: {1-distance:.3f}")
    print(f"   内容: {doc[:50]}...")

Qdrant 高性能部署

python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

# 1. 连接Qdrant服务
client = QdrantClient(
    url="http://localhost:6333",  # 本地部署
    # url="https://your-cluster.qdrant.tech",  # 云服务
    # api_key="your-api-key"
)

# 2. 创建集合
collection_name = "document_vectors"
client.create_collection(
    collection_name=collection_name,
    vectors_config=VectorParams(
        size=1536,  # OpenAI embedding维度
        distance=Distance.COSINE
    )
)

# 3. 批量插入向量
from openai import OpenAI
openai_client = OpenAI()

def get_embeddings(texts):
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [item.embedding for item in response.data]

# 准备数据
documents = [
    "RAG系统的核心是检索和生成的结合",
    "向量数据库支持高维向量的相似性搜索",
    "HNSW算法在精度和速度间达到最佳平衡"
]

embeddings = get_embeddings(documents)

# 构造点数据
points = []
for i, (doc, vector) in enumerate(zip(documents, embeddings)):
    points.append(PointStruct(
        id=i,
        vector=vector,
        payload={
            "text": doc,
            "timestamp": "2024-01-01",
            "category": "rag_tech"
        }
    ))

# 批量上传
client.upsert(
    collection_name=collection_name,
    points=points
)

# 4. 高级检索
query_text = "向量搜索算法"
query_vector = get_embeddings([query_text])[0]

search_results = client.search(
    collection_name=collection_name,
    query_vector=query_vector,
    limit=5,
    score_threshold=0.7,  # 相似度阈值
    with_payload=True,
    with_vectors=False
)

print("检索结果:")
for result in search_results:
    print(f"ID: {result.id}, 得分: {result.score:.3f}")
    print(f"内容: {result.payload['text']}")
    print("---")

Milvus 企业级方案

python
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility

# 1. 连接Milvus
connections.connect(
    alias="default",
    host="localhost",  # Milvus服务地址
    port="19530"
)

# 2. 定义数据结构
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1536),
    FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=100)
]

schema = CollectionSchema(
    fields=fields,
    description="RAG文档向量集合",
    enable_dynamic_field=True
)

# 3. 创建集合
collection_name = "rag_collection"
if utility.has_collection(collection_name):
    utility.drop_collection(collection_name)

collection = Collection(name=collection_name, schema=schema)

# 4. 创建索引(HNSW)
index_params = {
    "metric_type": "COSINE",
    "index_type": "HNSW",
    "params": {
        "M": 16,        # 每个节点的最大连接数
        "efConstruction": 200  # 构建时的搜索深度
    }
}

collection.create_index(
    field_name="vector",
    index_params=index_params,
    timeout=300
)

# 5. 插入数据
import random

def generate_test_data(n=1000):
    texts = [f"这是第{i}条RAG相关文档内容" for i in range(n)]
    vectors = [[random.random() for _ in range(1536)] for _ in range(n)]
    categories = ["tech", "business", "research"] * (n // 3 + 1)
    
    return {
        "text": texts,
        "vector": vectors,
        "category": categories[:n]
    }

data = generate_test_data(10000)
collection.insert(data)
collection.flush()  # 确保数据写入

# 6. 加载集合到内存
collection.load()

# 7. 执行搜索
search_params = {
    "metric_type": "COSINE",
    "params": {"ef": 64}  # 搜索时的候选数量
}

query_vector = [[random.random() for _ in range(1536)]]
results = collection.search(
    data=query_vector,
    anns_field="vector",
    param=search_params,
    limit=10,
    expr='category == "tech"',  # 过滤条件
    output_fields=["text", "category"],
    timeout=30
)

print("检索结果:")
for hits in results:
    for hit in hits:
        print(f"ID: {hit.id}, 距离: {hit.distance:.3f}")
        print(f"文本: {hit.entity.get('text')}")
        print(f"分类: {hit.entity.get('category')}")
        print("---")

⚡ 性能优化策略

1. 索引参数调优

HNSW参数详解

python
# Qdrant HNSW配置
hnsw_config = {
    "m": 16,           # 连接数:越大精度越高,内存越大
    "ef_construct": 200, # 构建参数:影响构建质量
    "max_indexing_threads": 4,  # 构建线程数
}

# 搜索参数
search_params = {
    "ef": 128,         # 搜索候选数:越大精度越高,速度越慢
    "exact": False     # 是否精确搜索
}

参数选择建议

场景Mef_constructef内存占用速度精度
高精度64400200极高
平衡1620064
高速度810032中等

2. 数据预处理优化

python
def optimize_vectors(embeddings):
    """向量预处理优化"""
    import numpy as np
    from sklearn.preprocessing import normalize
    
    # 1. 标准化处理
    normalized = normalize(embeddings, norm='l2')
    
    # 2. 降维(可选)
    from sklearn.decomposition import PCA
    pca = PCA(n_components=512)  # 从1536降到512
    reduced = pca.fit_transform(normalized)
    
    # 3. 量化压缩
    quantized = (reduced * 127).astype(np.int8)  # 8位量化
    
    return quantized

3. 批量操作优化

python
class BatchProcessor:
    def __init__(self, collection, batch_size=1000):
        self.collection = collection
        self.batch_size = batch_size
        self.buffer = []
    
    def add(self, document, vector, metadata=None):
        """添加到缓冲区"""
        self.buffer.append({
            'doc': document,
            'vector': vector,
            'metadata': metadata or {}
        })
        
        # 达到批次大小时自动提交
        if len(self.buffer) >= self.batch_size:
            self.flush()
    
    def flush(self):
        """批量提交"""
        if not self.buffer:
            return
        
        documents = [item['doc'] for item in self.buffer]
        vectors = [item['vector'] for item in self.buffer]
        metadatas = [item['metadata'] for item in self.buffer]
        
        self.collection.add(
            documents=documents,
            embeddings=vectors,
            metadatas=metadatas,
            ids=[f"batch_{i}" for i in range(len(documents))]
        )
        
        self.buffer.clear()
        print(f"已提交 {len(documents)} 条记录")

# 使用示例
processor = BatchProcessor(collection)

for doc, vector in document_stream:
    processor.add(doc, vector, {"timestamp": time.time()})

processor.flush()  # 处理剩余数据

🔧 生产部署实践

Docker 容器化部署

yaml
# docker-compose.yml
version: '3.8'

services:
  qdrant:
    image: qdrant/qdrant:v1.7.0
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - ./qdrant_storage:/qdrant/storage
    environment:
      - QDRANT__LOG_LEVEL=INFO
    deploy:
      resources:
        limits:
          memory: 4G
        reservations:
          memory: 2G

  milvus-etcd:
    image: quay.io/coreos/etcd:v3.5.0
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
    volumes:
      - ./etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd

  milvus-minio:
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    ports:
      - "9001:9001"
      - "9000:9000"
    volumes:
      - ./minio:/minio_data
    command: minio server /minio_data --console-address ":9001"

  milvus-standalone:
    image: milvusdb/milvus:v2.3.0
    command: ["milvus", "run", "standalone"]
    environment:
      ETCD_ENDPOINTS: milvus-etcd:2379
      MINIO_ADDRESS: milvus-minio:9000
    volumes:
      - ./milvus.yaml:/milvus/configs/milvus.yaml
    ports:
      - "19530:19530"
      - "9091:9091"
    depends_on:
      - "milvus-etcd"
      - "milvus-minio"

监控与运维

python
class VectorDBMonitor:
    def __init__(self, client):
        self.client = client
    
    def health_check(self):
        """健康检查"""
        try:
            # 执行简单查询
            start_time = time.time()
            result = self.client.search(...)
            latency = time.time() - start_time
            
            return {
                "status": "healthy",
                "latency": latency,
                "timestamp": time.time()
            }
        except Exception as e:
            return {
                "status": "error", 
                "error": str(e),
                "timestamp": time.time()
            }
    
    def performance_metrics(self):
        """性能指标"""
        return {
            "collection_size": self.client.count(),
            "memory_usage": self.get_memory_usage(),
            "qps": self.calculate_qps(),
            "avg_latency": self.get_avg_latency()
        }

� 动态知识与向量更新

知识时效性挑战

RAG系统的核心价值之一是接入动态知识源,但这给向量数据库带来了独特挑战:

挑战描述解决方案
知识更新延迟新文档从入库到可检索的时间差增量索引、实时同步
新旧知识冲突同一主题存在过时和最新信息时间戳过滤、版本管理
索引重建成本大规模数据变更时的重建开销分区索引、增量更新

增量更新策略

python
class IncrementalVectorUpdater:
    """增量向量更新管理"""
    
    def __init__(self, vector_db):
        self.db = vector_db
        self.update_queue = []
    
    def add_with_versioning(self, doc_id, vector, metadata):
        """带版本控制的向量添加"""
        metadata['version'] = self.get_next_version(doc_id)
        metadata['timestamp'] = time.time()
        metadata['is_latest'] = True
        
        # 将旧版本标记为非最新
        self.mark_old_versions(doc_id)
        
        # 添加新向量
        self.db.upsert(
            ids=[f"{doc_id}_v{metadata['version']}"],
            embeddings=[vector],
            metadatas=[metadata]
        )
    
    def search_latest_only(self, query_vector, top_k=10):
        """仅检索最新版本"""
        return self.db.query(
            query_embeddings=[query_vector],
            n_results=top_k,
            where={"is_latest": True}  # 过滤条件
        )
    
    def cleanup_old_versions(self, retention_days=30):
        """清理过期版本"""
        cutoff = time.time() - (retention_days * 86400)
        self.db.delete(where={
            "$and": [
                {"is_latest": False},
                {"timestamp": {"$lt": cutoff}}
            ]
        })

知识更新性能指标

评估向量数据库动态知识能力的关键指标:

  • 知识更新延迟:从"新增文档"到"可检索到该文档"的时间差
  • 新信息优先率:新旧知识冲突时选择新信息的比例
  • 旧信息过滤率:能否识别并过滤"已过时的旧信息"

�🔗 相关阅读

相关文章

外部资源

基于 VitePress 构建