本文档介绍如何将 xb 与 Python LangChain 框架集成,构建强大的 RAG 应用。
┌──────────────────────────────────────────┐
│ Python LangChain 应用 │
│ (Chains, Agents, Memory) │
└──────────────────────────────────────────┘
↓ HTTP/gRPC
┌──────────────────────────────────────────┐
│ Go Backend (xb) │
│ • VectorSearch API │
│ • Hybrid Search API │
│ • Document Management API │
└──────────────────────────────────────────┘
↓
┌──────────────────────────────────────────┐
│ Qdrant / PostgreSQL+pgvector │
└──────────────────────────────────────────┘
package main
import (
"encoding/json"
"net/http"
"github.com/fndome/xb"
)
type SearchRequest struct {
Query string `json:"query"`
Embedding []float32 `json:"embedding"`
Filters map[string]interface{} `json:"filters"`
TopK int `json:"top_k"`
ScoreThreshold float64 `json:"score_threshold"`
}
type SearchResponse struct {
Results []SearchResult `json:"results"`
}
type SearchResult struct {
ID int64 `json:"id"`
Content string `json:"content"`
Metadata map[string]interface{} `json:"metadata"`
Score float64 `json:"score"`
}
// handleVectorSearch decodes a SearchRequest from the request body, builds a
// Qdrant query with xb, runs it through qdrantClient and writes the hits back
// as a JSON SearchResponse.
//
// It answers 400 when the body is not valid JSON or carries no embedding, and
// 500 when building the query, executing it, or encoding the response fails.
func handleVectorSearch(w http.ResponseWriter, r *http.Request) {
	// Used when the client omits top_k or sends a non-positive value; matches
	// the default k of the Python client's similarity_search.
	const defaultTopK = 4

	var req SearchRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// A vector search without a query vector cannot succeed; reject it early
	// instead of forwarding a malformed query to the vector database.
	if len(req.Embedding) == 0 {
		http.Error(w, "embedding is required", http.StatusBadRequest)
		return
	}
	topK := req.TopK
	if topK <= 0 {
		topK = defaultTopK
	}

	// Build the query; the requested result count is part of the vector search.
	builder := xb.Of(&DocumentChunk{}).
		VectorSearch("embedding", req.Embedding, topK)

	// Only string-valued doc_type / language filters are recognised; any
	// other key or value type in Filters is ignored.
	if docType, ok := req.Filters["doc_type"].(string); ok {
		builder.Eq("doc_type", docType)
	}
	if lang, ok := req.Filters["language"].(string); ok {
		builder.Eq("language", lang)
	}

	// Render the Qdrant query.
	built := builder.
		QdrantX(func(qx *xb.QdrantBuilderX) {
			qx.ScoreThreshold(float32(req.ScoreThreshold))
		}).
		Build()
	qdrantJSON, err := built.ToQdrantJSON()
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Execute the generated query (qdrantClient is assumed to be set up
	// elsewhere in the program).
	results, err := qdrantClient.Search(r.Context(), qdrantJSON)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Marshal before touching the response so an encoding failure is still
	// reported as a 500 rather than a truncated 200.
	payload, err := json.Marshal(SearchResponse{Results: convertResults(results)})
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	// A write error here means the client went away; nothing left to report.
	_, _ = w.Write(payload)
}
// main registers the HTTP routes and serves them on port 8080.
func main() {
	http.HandleFunc("/api/vector-search", handleVectorSearch)
	http.HandleFunc("/api/documents", handleDocumentCRUD)

	// ListenAndServe only returns on failure (for example when the port is
	// already taken); surface that instead of exiting silently with status 0.
	if err := http.ListenAndServe(":8080", nil); err != nil {
		panic(err)
	}
}
from typing import List, Tuple, Optional, Dict, Any

import requests
from langchain.embeddings.base import Embeddings
from langchain.schema import BaseRetriever, Document
from langchain.vectorstores.base import VectorStore
class XbVectorStore(VectorStore):
    """LangChain ``VectorStore`` adapter backed by the xb HTTP service.

    Texts are embedded locally with the supplied ``Embeddings`` object and
    posted to ``{backend_url}/api/documents``; searches embed the query
    locally and post it to ``{backend_url}/api/vector-search``.
    """

    # Seconds allowed for each backend HTTP call so an unresponsive backend
    # cannot block the caller forever. Override on the class or an instance.
    request_timeout: float = 30.0

    def __init__(
        self,
        backend_url: str,
        embedding: Embeddings,
        collection_name: str = "default",
    ):
        """Remember the backend URL, the embedding model and the collection."""
        self.backend_url = backend_url
        self.embedding = embedding
        self.collection_name = collection_name

    def add_texts(
        self,
        texts: List[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Embed ``texts`` and store them in the backend.

        Returns the ids reported by the backend, as strings. An empty input
        returns ``[]`` without embedding or calling the backend.
        """
        texts = list(texts)
        if not texts:
            return []
        vectors = self.embedding.embed_documents(texts)
        return self.add_texts_with_embeddings(texts, vectors, metadatas)

    def add_texts_with_embeddings(
        self,
        texts: List[str],
        embeddings: List[List[float]],
        metadatas: Optional[List[dict]] = None,
    ) -> List[str]:
        """Store ``texts`` together with vectors that were computed already.

        Raises:
            ValueError: if ``embeddings`` does not match ``texts`` in length,
                or ``metadatas`` is given but shorter than ``texts``.
            requests.HTTPError: if the backend answers with an error status.
        """
        texts = list(texts)
        embeddings = list(embeddings)
        if len(embeddings) != len(texts):
            raise ValueError(
                f"got {len(embeddings)} embeddings for {len(texts)} texts"
            )
        if metadatas and len(metadatas) < len(texts):
            raise ValueError(
                f"got {len(metadatas)} metadatas for {len(texts)} texts"
            )
        if not texts:
            return []

        documents = [
            {
                "content": text,
                "embedding": vector,
                "metadata": metadatas[i] if metadatas else {},
            }
            for i, (text, vector) in enumerate(zip(texts, embeddings))
        ]
        response = requests.post(
            f"{self.backend_url}/api/documents",
            json={"documents": documents, "collection": self.collection_name},
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return [str(doc["id"]) for doc in response.json()["created"]]

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return up to ``k`` ``(Document, score)`` pairs for ``query``.

        ``filter`` is sent to the backend as ``filters``; an optional
        ``score_threshold`` keyword (default ``0.0``) is forwarded as well.

        Raises:
            requests.HTTPError: if the backend answers with an error status.
        """
        query_embedding = self.embedding.embed_query(query)
        response = requests.post(
            f"{self.backend_url}/api/vector-search",
            json={
                "query": query,
                "embedding": query_embedding,
                "filters": filter or {},
                "top_k": k,
                "score_threshold": kwargs.get("score_threshold", 0.0),
            },
            timeout=self.request_timeout,
        )
        response.raise_for_status()

        # The Go backend encodes a nil slice / nil map as JSON null, so both
        # "results" and a hit's "metadata" may arrive as None.
        hits = response.json().get("results") or []
        return [
            (
                Document(
                    page_content=hit["content"],
                    metadata=hit.get("metadata") or {},
                ),
                hit["score"],
            )
            for hit in hits
        ]

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        **kwargs: Any,
    ) -> List[Document]:
        """Return up to ``k`` documents for ``query``, without their scores."""
        scored = self.similarity_search_with_score(query, k, **kwargs)
        return [doc for doc, _ in scored]

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        backend_url: str = "http://localhost:8080",
        **kwargs: Any,
    ) -> "XbVectorStore":
        """Create a store and index ``texts`` into it.

        A ``collection_name`` keyword, when given, selects the collection
        (default ``"default"``); remaining keywords go to ``add_texts``.
        """
        collection_name = kwargs.pop("collection_name", "default")
        store = cls(backend_url, embedding, collection_name)
        store.add_texts(texts, metadatas, **kwargs)
        return store
from langchain.embeddings import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 1. Set up the embedding model and the chat LLM.
embeddings = OpenAIEmbeddings()
llm = ChatOpenAI(temperature=0, model="gpt-4")

# 2. Point the vector store at the xb backend.
vector_store = XbVectorStore(
    collection_name="my_docs",
    embedding=embeddings,
    backend_url="http://localhost:8080",
)

# 3. Load the source file, cut it into overlapping chunks and index them.
loader = TextLoader("docs/knowledge.txt")
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_overlap=50, chunk_size=500)
texts = text_splitter.split_documents(documents)

# Each chunk keeps its source and is tagged as a Chinese tutorial.
metadatas = [
    dict(
        source=doc.metadata.get("source", ""),
        doc_type="tutorial",
        language="zh",
    )
    for doc in texts
]
vector_store.add_texts(
    metadatas=metadatas,
    texts=[doc.page_content for doc in texts],
)

# 4. Build the retrieval QA chain, restricted to Chinese tutorials.
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    return_source_documents=True,
    retriever=vector_store.as_retriever(
        search_kwargs=dict(
            k=5,
            filter=dict(language="zh", doc_type="tutorial"),
        )
    ),
)

# 5. Ask a question and list the sources behind the answer.
result = qa_chain({"query": "如何使用 xb 构建向量查询?"})
print(f"回答: {result['result']}")
print("\n来源文档:")
for doc in result["source_documents"]:
    print(f" - {doc.metadata['source']}")
class SqlxbHybridRetriever(BaseRetriever):
    """Retriever that combines vector search with scalar metadata filters.

    Filters are guessed from the query text and then overlaid with the fixed
    ``base_filters``. A wide candidate set is fetched, hits below
    ``score_threshold`` are dropped, and the first ``top_k`` are returned.

    NOTE(review): newer LangChain releases may treat ``BaseRetriever`` as a
    pydantic model, where plain attribute assignment in ``__init__`` is
    rejected — confirm against the LangChain version in use.
    """

    def __init__(
        self,
        vector_store: XbVectorStore,
        base_filters: Optional[Dict[str, Any]] = None,
        score_threshold: float = 0.7,
        fetch_k: int = 20,
        top_k: int = 5,
    ):
        """Store the configuration.

        Args:
            vector_store: store used for the similarity search.
            base_filters: filters applied to every query; they win over
                filters extracted from the query text.
            score_threshold: minimum score a hit needs to be returned.
            fetch_k: how many candidates to request from the store.
            top_k: maximum number of documents to return.
        """
        self.vector_store = vector_store
        # Copy so later changes to the caller's dict do not leak in here.
        self.base_filters = dict(base_filters or {})
        self.score_threshold = score_threshold
        self.fetch_k = fetch_k
        self.top_k = top_k

    def get_relevant_documents(self, query: str) -> List[Document]:
        """Return at most ``top_k`` documents scoring at least the threshold."""
        filters = self._extract_filters(query)
        filters.update(self.base_filters)

        docs_and_scores = self.vector_store.similarity_search_with_score(
            query=query,
            k=self.fetch_k,  # coarse recall
            filter=filters,
            score_threshold=self.score_threshold,
        )
        # Re-check the threshold locally rather than relying on the backend.
        kept = [
            doc for doc, score in docs_and_scores
            if score >= self.score_threshold
        ]
        return kept[:self.top_k]

    def _extract_filters(self, query: str) -> Dict[str, Any]:
        """Derive ``language`` and, when recognisable, ``doc_type`` (simplified)."""
        filters: Dict[str, Any] = {
            "language": "zh" if self._contains_chinese(query) else "en",
        }
        if "教程" in query or "tutorial" in query.lower():
            filters["doc_type"] = "tutorial"
        elif "API" in query.upper():
            filters["doc_type"] = "api"
        return filters

    @staticmethod
    def _contains_chinese(text: str) -> bool:
        """Return True if ``text`` has a CJK Unified Ideograph (U+4E00-U+9FFF)."""
        return any("\u4e00" <= ch <= "\u9fff" for ch in text)
# Usage example: only published documents, with a stricter score cut-off.
hybrid_retriever = SqlxbHybridRetriever(
    score_threshold=0.75,
    base_filters=dict(status="published"),
    vector_store=vector_store,
)
qa_chain = RetrievalQA.from_chain_type(retriever=hybrid_retriever, llm=llm)
result = qa_chain({"query": "最近更新的 API 文档"})
from langchain.retrievers.multi_query import MultiQueryRetriever
# Let the LLM generate several variants of each incoming query.
multi_query_retriever = MultiQueryRetriever.from_llm(
    llm=llm,
    retriever=vector_store.as_retriever(),
)

# A single call generates the variants and merges their results.
docs = multi_query_retriever.get_relevant_documents("xb 如何处理向量查询?")

# Variants that may be generated internally:
# - "xb vector search usage"
# - "how to use xb for vector queries"
# - "xb vector query examples"
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
# Build the LLM-based compressor.
compressor = LLMChainExtractor.from_llm(llm)

# Wrap a retriever that fetches 10 candidates per query.
compression_retriever = ContextualCompressionRetriever(
    base_retriever=vector_store.as_retriever(search_kwargs=dict(k=10)),
    base_compressor=compressor,
)

# Retrieved documents are compressed so only the relevant parts remain.
compressed_docs = compression_retriever.get_relevant_documents("xb 的核心特性是什么?")
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever
# Describe the metadata fields the LLM may filter on (all string-typed).
metadata_field_info = [
    AttributeInfo(name=field_name, description=field_description, type="string")
    for field_name, field_description in (
        ("doc_type", "文档类型: tutorial, api, blog, faq"),
        ("language", "文档语言: zh, en"),
        ("created_at", "创建时间,格式为 YYYY-MM-DD"),
        ("author", "作者名称"),
    )
]
document_content_description = "xb 库的技术文档和教程"

# Build the self-query retriever.
self_query_retriever = SelfQueryRetriever.from_llm(
    llm=llm,
    vectorstore=vector_store,
    metadata_field_info=metadata_field_info,
    document_contents=document_content_description,
    verbose=True,
)

# Filter conditions are extracted from the natural-language query.
docs = self_query_retriever.get_relevant_documents("查找2024年写的关于 API 的中文教程")

# Filters extracted for the query above:
# {
#     "doc_type": "tutorial",
#     "language": "zh",
#     "created_at": {"$gte": "2024-01-01"}
# }
from langchain.agents import Tool, AgentType, initialize_agent
from langchain.memory import ConversationBufferMemory
# Expose the knowledge base to the agent as a single search tool.
search_tool = Tool(
    name="KnowledgeBaseSearch",
    description="""
用于搜索 xb 技术文档和教程。
输入应该是一个清晰的问题或关键词。
返回最相关的文档片段。
""",
    func=lambda q: vector_store.similarity_search(q, k=3),
)

# Conversational agent whose memory holds the chat history.
memory = ConversationBufferMemory(return_messages=True, memory_key="chat_history")
agent = initialize_agent(
    llm=llm,
    tools=[search_tool],
    memory=memory,
    agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
    verbose=True,
)

# First question of the conversation.
response = agent.run("xb 支持哪些数据库?")
print(response)

# Follow-up question that builds on the conversation history.
response = agent.run("那 Qdrant 的集成怎么用?")
print(response)
from langchain.tools import StructuredTool
# Tool 1: search documents of a caller-chosen type.
search_docs_tool = StructuredTool.from_function(
    name="SearchDocs",
    description="搜索特定类型的文档。参数: query (str), doc_type (str: tutorial|api|blog|faq)",
    func=lambda query, doc_type: vector_store.similarity_search(
        query, k=5, filter=dict(doc_type=doc_type)
    ),
)

# Tool 2: search code examples only.
search_code_tool = StructuredTool.from_function(
    name="SearchCodeExamples",
    description="搜索代码示例。参数: query (str)",
    func=lambda query: vector_store.similarity_search(
        query, k=3, filter=dict(doc_type="code_example")
    ),
)

# Agent that can use both tools.
agent = initialize_agent(
    llm=llm,
    tools=[search_docs_tool, search_code_tool],
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
)
result = agent.run("我想看看如何使用 xb 进行向量检索的代码示例")
import os
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
class DocQASystem:
    """Conversational document Q&A on top of an xb-backed vector store.

    Bundles the embedding model, chat LLM, vector store, conversation memory
    and a ``ConversationalRetrievalChain`` behind two operations: indexing a
    directory of Markdown files and answering questions.
    """

    # Marks "no filter was set before this call" in ``query``.
    _NO_FILTER = object()
    # Maximum number of characters of each source shown in ``query`` results.
    _PREVIEW_CHARS = 200

    def __init__(self, backend_url: str, openai_api_key: str):
        """Wire the components together for the backend at ``backend_url``."""
        self.embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
        self.llm = ChatOpenAI(model="gpt-4", temperature=0, openai_api_key=openai_api_key)
        self.vector_store = XbVectorStore(
            backend_url=backend_url,
            embedding=self.embeddings,
        )
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key="answer",
        )
        self.qa_chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=self.vector_store.as_retriever(
                search_kwargs={"k": 5, "score_threshold": 0.7}
            ),
            memory=self.memory,
            return_source_documents=True,
            verbose=True,
        )

    def index_directory(self, directory: str) -> int:
        """Index every ``*.md`` file under ``directory``.

        Files are split into 500-character chunks with 50 characters of
        overlap; each chunk is tagged with ``language``, ``doc_type`` and
        ``indexed_at`` before being added to the vector store.

        Returns:
            The number of chunks that were indexed.
        """
        from datetime import datetime

        from langchain.document_loaders import DirectoryLoader, TextLoader

        loader = DirectoryLoader(
            directory,
            glob="**/*.md",
            loader_cls=TextLoader,
        )
        documents = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            separators=["\n\n", "\n", " ", ""],
        )
        splits = text_splitter.split_documents(documents)
        if not splits:
            # Nothing to index; skip the backend round-trip.
            return 0

        for doc in splits:
            doc.metadata.update({
                "language": "zh" if self._contains_chinese(doc.page_content) else "en",
                "doc_type": self._infer_doc_type(doc),
                "indexed_at": datetime.now().isoformat(),
            })
        self.vector_store.add_documents(splits)
        return len(splits)

    def query(self, question: str, filters: Optional[Dict] = None) -> Dict:
        """Answer ``question``, optionally restricting retrieval by ``filters``.

        ``filters`` applies to this call only: the retriever's previous
        filter state is restored afterwards, even if the chain raises.

        Returns:
            A dict with ``answer`` and ``sources``; each source holds a
            content preview and the document metadata.
        """
        search_kwargs = self.qa_chain.retriever.search_kwargs
        previous = search_kwargs.get("filter", self._NO_FILTER)
        if filters:
            search_kwargs["filter"] = filters
        try:
            result = self.qa_chain({"question": question})
        finally:
            if filters:
                # Undo the temporary override so it cannot leak into later calls.
                if previous is self._NO_FILTER:
                    search_kwargs.pop("filter", None)
                else:
                    search_kwargs["filter"] = previous

        return {
            "answer": result["answer"],
            "sources": [
                {
                    "content": self._preview(doc.page_content),
                    "metadata": doc.metadata,
                }
                for doc in result["source_documents"]
            ],
        }

    def _preview(self, text: str) -> str:
        """Return ``text`` cut to the preview length, with "..." if truncated."""
        if len(text) <= self._PREVIEW_CHARS:
            return text
        return text[:self._PREVIEW_CHARS] + "..."

    @staticmethod
    def _contains_chinese(text: str) -> bool:
        """Return True if ``text`` has a CJK Unified Ideograph (U+4E00-U+9FFF)."""
        return any("\u4e00" <= ch <= "\u9fff" for ch in text)

    def _infer_doc_type(self, doc) -> str:
        """Guess the document type from keywords in the ``source`` metadata."""
        filename = doc.metadata.get("source", "").lower()
        if "tutorial" in filename or "guide" in filename:
            return "tutorial"
        # First matching keyword wins, in this order.
        for keyword in ("api", "blog", "faq"):
            if keyword in filename:
                return keyword
        return "general"
# Usage example: index ./docs, then answer questions interactively.
if __name__ == "__main__":
    qa_system = DocQASystem(
        openai_api_key=os.getenv("OPENAI_API_KEY"),
        backend_url="http://localhost:8080",
    )

    # Index the documents.
    print("正在索引文档...")
    num_chunks = qa_system.index_directory("./docs")
    print(f"已索引 {num_chunks} 个文档块")

    # Interactive Q&A loop; "quit" or "exit" (any letter case) ends it.
    print("\n文档问答系统已就绪。输入 'quit' 退出。\n")
    while True:
        question = input("问题: ")
        if question.lower() in ("quit", "exit"):
            break

        # Search tutorials only.
        result = qa_system.query(question, filters=dict(doc_type="tutorial"))
        print(f"\n回答: {result['answer']}\n")
        print("来源:")
        for i, source in enumerate(result["sources"], start=1):
            print(f" [{i}] {source['metadata']['source']}")
        print()
# Generate embeddings in batches to cut down on round-trips.
texts = [doc.page_content for doc in documents]

# Process 100 texts per embedding call.
batch_size = 100
all_embeddings = []
for i in range(0, len(texts), batch_size):
    batch = texts[i:i + batch_size]
    # Use a dedicated name for the batch result: rebinding ``embeddings``
    # here would overwrite an embedding model stored under that name.
    # NOTE(review): ``embeddings_model`` is expected to be an Embeddings
    # instance defined by the caller — confirm.
    batch_embeddings = embeddings_model.embed_documents(batch)
    all_embeddings.extend(batch_embeddings)

# Insert everything in one call, reusing the precomputed vectors.
vector_store.add_texts_with_embeddings(texts, all_embeddings, metadatas)
import asyncio
from langchain.embeddings import OpenAIEmbeddings
class AsyncXbVectorStore(XbVectorStore):
    """``XbVectorStore`` variant with a non-blocking ``aadd_texts``."""

    async def aadd_texts(
        self,
        texts: List[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs
    ) -> List[str]:
        """Embed ``texts`` asynchronously and store them in the backend.

        Builds the same payload as the synchronous ``add_texts`` and posts it
        to ``{backend_url}/api/documents`` with aiohttp.

        Returns:
            The ids reported by the backend, as strings; ``[]`` for an empty
            input.

        Raises:
            aiohttp.ClientResponseError: if the backend answers with an
                error status.
        """
        # Imported here because the surrounding module never imports aiohttp.
        import aiohttp

        texts = list(texts)
        if not texts:
            return []

        vectors = await self.embedding.aembed_documents(texts)
        documents = [
            {
                "content": text,
                "embedding": vector,
                "metadata": metadatas[i] if metadatas else {},
            }
            for i, (text, vector) in enumerate(zip(texts, vectors))
        ]
        payload = {"documents": documents, "collection": self.collection_name}

        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.backend_url}/api/documents",
                json=payload,
            ) as response:
                # Fail loudly on 4xx/5xx instead of parsing an error body.
                response.raise_for_status()
                result = await response.json()
        return [str(doc["id"]) for doc in result["created"]]
# Index through the async API.
async def index_documents_async(docs):
    """Start one ``aadd_texts`` call per document and await them together."""
    pending = []
    for doc in docs:
        pending.append(vector_store.aadd_texts([doc.page_content], [doc.metadata]))
    await asyncio.gather(*pending)


asyncio.run(index_documents_async(documents))
查看 examples/langchain-rag-app/ 目录获取完整的项目模板。
下一步: 查看 LLAMAINDEX_INTEGRATION.md 了解 LlamaIndex 集成。