Chapter 4: Advanced RAG Techniques
This chapter explores advanced techniques and optimizations for RAG systems, including embedding models, vector storage, retrieval strategies, and reranking.
Embedding Systems
Advanced Embedding Implementation
Building on the model I/O components introduced earlier, here is a comprehensive embedding system with caching, batching, and retry logic:
import numpy as np
import os
import hashlib
import pickle
import time
from typing import List, Dict, Any, Optional, Union, Tuple
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import logging
from langchain_core.documents import Document  # page_content/metadata container used throughout this chapter (assumes LangChain is installed)
logger = logging.getLogger(__name__)
@dataclass
class EmbeddingConfig:
"""Configuration for embedding models"""
model_name: str
dimension: int
max_sequence_length: int
batch_size: int = 32
device: Optional[str] = None
normalize_embeddings: bool = True
cache_embeddings: bool = True
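# Note: EmbeddingConfig is a convenience container for custom embedding backends;
# the concrete classes below accept the same options directly as constructor arguments.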
class BaseEmbedding(ABC):
"""Abstract base class for embedding models"""
@abstractmethod
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed a list of documents"""
pass
@abstractmethod
def embed_query(self, text: str) -> List[float]:
"""Embed a single query"""
pass
@property
@abstractmethod
def dimension(self) -> int:
"""Get embedding dimension"""
pass
class OpenAIEmbedding(BaseEmbedding):
"""OpenAI Embeddings API wrapper with advanced features"""
def __init__(
self,
model: str = "text-embedding-3-small",
api_key: Optional[str] = None,
api_base: Optional[str] = None,
batch_size: int = 100,
max_retries: int = 3,
request_timeout: int = 60,
dimensions: Optional[int] = None,
embedding_ctx_length: int = 8192,
cache_dir: Optional[str] = None
):
try:
import openai
except ImportError:
raise ImportError("Please install openai: pip install openai")
self.client = openai.OpenAI(
api_key=api_key or os.getenv("OPENAI_API_KEY"),
base_url=api_base
)
self.model = model
self.batch_size = batch_size
self.max_retries = max_retries
self.request_timeout = request_timeout
self.dimensions = dimensions
self.embedding_ctx_length = embedding_ctx_length
self.cache_dir = cache_dir
# Model dimensions mapping
self._model_dimensions = {
"text-embedding-3-small": 1536,
"text-embedding-3-large": 3072,
"text-embedding-ada-002": 1536
}
self._cache = {}
if self.cache_dir:
self._load_cache()
@property
def dimension(self) -> int:
"""Get embedding dimension"""
if self.dimensions:
return self.dimensions
return self._model_dimensions.get(self.model, 1536)
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed multiple documents with batching and caching"""
all_embeddings = []
# Process in batches
for i in range(0, len(texts), self.batch_size):
batch = texts[i:i + self.batch_size]
batch_embeddings = self._embed_batch(batch)
all_embeddings.extend(batch_embeddings)
return all_embeddings
def embed_query(self, text: str) -> List[float]:
"""Embed single query"""
return self._embed_batch([text])[0]
def _embed_batch(self, texts: List[str]) -> List[List[float]]:
"""Embed a batch of texts with caching and retry logic"""
# Check cache first
cache_keys = [self._get_cache_key(text) for text in texts]
cached_embeddings = []
texts_to_embed = []
cache_indices = []
for i, (text, cache_key) in enumerate(zip(texts, cache_keys)):
if cache_key in self._cache:
cached_embeddings.append((i, self._cache[cache_key]))
else:
texts_to_embed.append(text)
cache_indices.append(i)
# Generate embeddings for uncached texts
new_embeddings = []
if texts_to_embed:
new_embeddings = self._call_api(texts_to_embed)
# Update cache
for text, embedding in zip(texts_to_embed, new_embeddings):
cache_key = self._get_cache_key(text)
self._cache[cache_key] = embedding
# Combine cached and new embeddings in correct order
result = [None] * len(texts)
# Add cached embeddings
for idx, embedding in cached_embeddings:
result[idx] = embedding
# Add new embeddings
for cache_idx, embedding in zip(cache_indices, new_embeddings):
result[cache_idx] = embedding
# Save cache if updated
if new_embeddings and self.cache_dir:
self._save_cache()
return result
def _call_api(self, texts: List[str]) -> List[List[float]]:
"""Call OpenAI API with retry logic"""
for attempt in range(self.max_retries):
try:
# Prepare API call parameters
kwargs = {
"model": self.model,
"input": texts,
"timeout": self.request_timeout
}
if self.dimensions:
kwargs["dimensions"] = self.dimensions
response = self.client.embeddings.create(**kwargs)
return [item.embedding for item in response.data]
except Exception as e:
if attempt == self.max_retries - 1:
logger.error(f"Failed to get embeddings after {self.max_retries} attempts: {e}")
raise
# Exponential backoff
wait_time = (2 ** attempt) + np.random.uniform(0, 1)
logger.warning(f"API call failed (attempt {attempt + 1}), retrying in {wait_time:.2f}s: {e}")
time.sleep(wait_time)
def _get_cache_key(self, text: str) -> str:
"""Generate cache key for text"""
key_string = f"{self.model}:{self.dimensions}:{text}"
return hashlib.sha256(key_string.encode()).hexdigest()
def _load_cache(self):
"""Load embedding cache from disk"""
if not self.cache_dir:
return
cache_file = os.path.join(self.cache_dir, f"openai_embedding_cache_{self.model}.pkl")
if os.path.exists(cache_file):
try:
with open(cache_file, "rb") as f:
self._cache = pickle.load(f)
logger.info(f"Loaded {len(self._cache)} cached embeddings")
except Exception as e:
logger.warning(f"Failed to load cache: {e}")
self._cache = {}
def _save_cache(self):
"""Save embedding cache to disk"""
if not self.cache_dir:
return
os.makedirs(self.cache_dir, exist_ok=True)
cache_file = os.path.join(self.cache_dir, f"openai_embedding_cache_{self.model}.pkl")
try:
with open(cache_file, "wb") as f:
pickle.dump(self._cache, f)
except Exception as e:
logger.warning(f"Failed to save cache: {e}")
class HuggingFaceEmbedding(BaseEmbedding):
"""HuggingFace SentenceTransformers embedding with advanced features"""
def __init__(
self,
model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
device: Optional[str] = None,
model_kwargs: Optional[Dict] = None,
encode_kwargs: Optional[Dict] = None,
multi_process: bool = False,
normalize_embeddings: bool = True,
cache_dir: Optional[str] = None
):
try:
from sentence_transformers import SentenceTransformer
except ImportError:
raise ImportError("Please install sentence-transformers: pip install sentence-transformers")
self.model_name = model_name
self.model_kwargs = model_kwargs or {}
self.encode_kwargs = encode_kwargs or {}
self.multi_process = multi_process
self.normalize_embeddings = normalize_embeddings
self.cache_dir = cache_dir
# Initialize model
self.model = SentenceTransformer(
model_name,
device=device,
cache_folder=cache_dir,
**self.model_kwargs
)
self._cache = {}
if self.cache_dir:
self._load_cache()
@property
def dimension(self) -> int:
"""Get embedding dimension"""
return self.model.get_sentence_embedding_dimension()
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed multiple documents"""
# Check cache
cached_results = []
texts_to_embed = []
text_indices = []
for i, text in enumerate(texts):
cache_key = self._get_cache_key(text)
if cache_key in self._cache:
cached_results.append((i, self._cache[cache_key]))
else:
texts_to_embed.append(text)
text_indices.append(i)
# Generate new embeddings
new_embeddings = []
if texts_to_embed:
embeddings = self.model.encode(
texts_to_embed,
normalize_embeddings=self.normalize_embeddings,
**self.encode_kwargs
)
if isinstance(embeddings, np.ndarray):
new_embeddings = embeddings.tolist()
else:
new_embeddings = embeddings
# Update cache
for text, embedding in zip(texts_to_embed, new_embeddings):
cache_key = self._get_cache_key(text)
self._cache[cache_key] = embedding
# Combine results
result = [None] * len(texts)
# Add cached results
for idx, embedding in cached_results:
result[idx] = embedding
# Add new results
for text_idx, embedding in zip(text_indices, new_embeddings):
result[text_idx] = embedding
# Save cache
if new_embeddings and self.cache_dir:
self._save_cache()
return result
def embed_query(self, text: str) -> List[float]:
"""Embed single query"""
cache_key = self._get_cache_key(text)
if cache_key in self._cache:
return self._cache[cache_key]
embedding = self.model.encode(
[text],
normalize_embeddings=self.normalize_embeddings,
**self.encode_kwargs
)[0]
if isinstance(embedding, np.ndarray):
embedding = embedding.tolist()
# Cache result
self._cache[cache_key] = embedding
if self.cache_dir:
self._save_cache()
return embedding
def _get_cache_key(self, text: str) -> str:
"""Generate cache key for text"""
key_string = f"{self.model_name}:{self.normalize_embeddings}:{text}"
return hashlib.sha256(key_string.encode()).hexdigest()
def _load_cache(self):
"""Load embedding cache"""
cache_file = os.path.join(self.cache_dir, f"hf_embedding_cache_{self.model_name.replace('/', '_')}.pkl")
if os.path.exists(cache_file):
try:
with open(cache_file, "rb") as f:
self._cache = pickle.load(f)
logger.info(f"Loaded {len(self._cache)} cached embeddings")
except Exception as e:
logger.warning(f"Failed to load cache: {e}")
self._cache = {}
def _save_cache(self):
"""Save embedding cache"""
if not self.cache_dir:
return
os.makedirs(self.cache_dir, exist_ok=True)
cache_file = os.path.join(self.cache_dir, f"hf_embedding_cache_{self.model_name.replace('/', '_')}.pkl")
try:
with open(cache_file, "wb") as f:
pickle.dump(self._cache, f)
except Exception as e:
logger.warning(f"Failed to save cache: {e}")
class EmbeddingPipeline:
"""Complete embedding pipeline with optimization and monitoring"""
def __init__(
self,
embedding_model: BaseEmbedding,
batch_size: int = 100,
max_workers: int = 4,
cache_dir: Optional[str] = None,
enable_monitoring: bool = True
):
self.embedding_model = embedding_model
self.batch_size = batch_size
self.max_workers = max_workers
self.cache_dir = cache_dir
self.enable_monitoring = enable_monitoring
# Statistics
self.stats = {
"total_embeddings": 0,
"cache_hits": 0,
"api_calls": 0,
"total_time": 0.0,
"errors": 0
}
def embed_documents(self, documents: List[Document]) -> List[Document]:
"""Embed documents and add embeddings to metadata"""
start_time = time.time()
try:
# Extract texts
texts = [doc.page_content for doc in documents]
# Generate embeddings
embeddings = self.embedding_model.embed_documents(texts)
# Add embeddings to documents
embedded_docs = []
for doc, embedding in zip(documents, embeddings):
new_doc = Document(
page_content=doc.page_content,
metadata={**doc.metadata, "embedding": embedding}
)
embedded_docs.append(new_doc)
# Update statistics
if self.enable_monitoring:
elapsed = time.time() - start_time
self.stats["total_embeddings"] += len(embeddings)
self.stats["total_time"] += elapsed
logger.info(f"Generated {len(embeddings)} embeddings in {elapsed:.2f}s")
return embedded_docs
except Exception as e:
self.stats["errors"] += 1
logger.error(f"Error embedding documents: {e}")
raise
def embed_queries(self, queries: List[str]) -> List[List[float]]:
"""Embed multiple queries efficiently"""
start_time = time.time()
try:
embeddings = []
# Process in batches
for i in range(0, len(queries), self.batch_size):
batch = queries[i:i + self.batch_size]
batch_embeddings = []
for query in batch:
embedding = self.embedding_model.embed_query(query)
batch_embeddings.append(embedding)
embeddings.extend(batch_embeddings)
# Update statistics
if self.enable_monitoring:
elapsed = time.time() - start_time
self.stats["total_embeddings"] += len(embeddings)
self.stats["total_time"] += elapsed
return embeddings
except Exception as e:
self.stats["errors"] += 1
logger.error(f"Error embedding queries: {e}")
raise
def get_stats(self) -> Dict[str, Any]:
"""Get embedding statistics"""
stats = self.stats.copy()
if stats["total_embeddings"] > 0:
stats["avg_time_per_embedding"] = stats["total_time"] / stats["total_embeddings"]
stats["cache_hit_rate"] = stats["cache_hits"] / stats["total_embeddings"]
return stats
def clear_stats(self):
"""Clear statistics"""
self.stats = {
"total_embeddings": 0,
"cache_hits": 0,
"api_calls": 0,
"total_time": 0.0,
"errors": 0
}
# Usage example
def create_embedding_system():
"""Create an embedding pipeline with OpenAI and HuggingFace backends"""
# OpenAI embeddings (primary backend)
openai_embeddings = OpenAIEmbedding(
model="text-embedding-3-small",
api_key=os.getenv("OPENAI_API_KEY"),
batch_size=100,
cache_dir="./embedding_cache"
)
# HuggingFace embeddings (alternative)
hf_embeddings = HuggingFaceEmbedding(
model_name="sentence-transformers/all-MiniLM-L6-v2",
cache_dir="./embedding_cache"
)
# Create pipeline
pipeline = EmbeddingPipeline(
embedding_model=openai_embeddings,  # OpenAI backend as the default
batch_size=50,
cache_dir="./embedding_cache"
)
return pipeline, openai_embeddings, hf_embeddings
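A minimal sketch of the pipeline in use, assuming OPENAI_API_KEY is set and using two placeholder documents:
# Embed a couple of sample documents and a query, then inspect the statistics
pipeline, openai_embeddings, hf_embeddings = create_embedding_system()
docs = [
    Document(page_content="RAG combines retrieval with generation.", metadata={"source": "intro"}),
    Document(page_content="Embeddings map text to dense vectors.", metadata={"source": "intro"}),
]
embedded_docs = pipeline.embed_documents(docs)
query_vector = openai_embeddings.embed_query("What is RAG?")
print(len(embedded_docs), len(query_vector), pipeline.get_stats())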
Vector Storage Systems
Comprehensive Vector Database Implementation
A RAG pipeline needs a vector database to store and search embeddings. Here is a complete vector storage layer with Chroma and FAISS backends:
from typing import List, Dict, Any, Optional, Tuple, Union
from abc import ABC, abstractmethod
import numpy as np
import json
import os
from dataclasses import dataclass
@dataclass
class VectorSearchResult:
"""Result from vector search"""
document: Document
score: float
metadata: Dict[str, Any]
class BaseVectorStore(ABC):
"""Abstract base class for vector stores"""
@abstractmethod
def add_documents(
self,
documents: List[Document],
embeddings: Optional[List[List[float]]] = None,
ids: Optional[List[str]] = None
) -> List[str]:
"""Add documents to the vector store"""
pass
@abstractmethod
def similarity_search(
self,
query_embedding: List[float],
k: int = 5,
filter: Optional[Dict[str, Any]] = None
) -> List[VectorSearchResult]:
"""Search for similar documents"""
pass
@abstractmethod
def similarity_search_with_score(
self,
query_embedding: List[float],
k: int = 5,
filter: Optional[Dict[str, Any]] = None
) -> List[Tuple[Document, float]]:
"""Search with scores"""
pass
@abstractmethod
def delete(self, ids: List[str]) -> bool:
"""Delete documents by ID"""
pass
@abstractmethod
def update_documents(
self,
ids: List[str],
documents: List[Document]
) -> bool:
"""Update existing documents"""
pass
class ChromaVectorStore(BaseVectorStore):
"""Chroma vector store implementation"""
def __init__(
self,
collection_name: str,
persist_directory: Optional[str] = None,
embedding_function: Optional[Any] = None,
collection_metadata: Optional[Dict] = None,
client_settings: Optional[Dict] = None
):
try:
import chromadb
from chromadb.config import Settings
except ImportError:
raise ImportError("Please install chromadb: pip install chromadb")
# Initialize client
if persist_directory:
settings = Settings(**client_settings) if client_settings else Settings()
# Recent Chroma versions take the storage location via the `path` argument
self.client = chromadb.PersistentClient(path=persist_directory, settings=settings)
else:
self.client = chromadb.Client()
self.collection_name = collection_name
self.embedding_function = embedding_function
# Get or create collection
self.collection = self.client.get_or_create_collection(
name=collection_name,
embedding_function=embedding_function,
metadata=collection_metadata
)
def add_documents(
self,
documents: List[Document],
embeddings: Optional[List[List[float]]] = None,
ids: Optional[List[str]] = None
) -> List[str]:
"""Add documents to Chroma collection"""
if not documents:
return []
# Generate IDs if not provided
if ids is None:
import uuid
ids = [str(uuid.uuid4()) for _ in documents]
# Prepare data
texts = [doc.page_content for doc in documents]
metadatas = []
for doc in documents:
metadata = doc.metadata.copy()
# Remove embedding from metadata to avoid duplication
metadata.pop('embedding', None)
# Ensure all metadata values are JSON serializable
clean_metadata = self._clean_metadata(metadata)
metadatas.append(clean_metadata)
# Add to collection
try:
if embeddings is not None:
self.collection.add(
documents=texts,
embeddings=embeddings,
metadatas=metadatas,
ids=ids
)
else:
# Let Chroma generate embeddings
self.collection.add(
documents=texts,
metadatas=metadatas,
ids=ids
)
except Exception as e:
logger.error(f"Error adding documents to Chroma: {e}")
raise
return ids
def similarity_search(
self,
query_embedding: List[float],
k: int = 5,
filter: Optional[Dict[str, Any]] = None
) -> List[VectorSearchResult]:
"""Search for similar documents"""
try:
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=k,
where=filter,
include=["documents", "metadatas", "distances"]
)
search_results = []
if results["documents"] and results["documents"][0]:
for i in range(len(results["documents"][0])):
doc = Document(
page_content=results["documents"][0][i],
metadata=results["metadatas"][0][i] or {}
)
# Convert distance to a similarity score; 1 - distance is exact for cosine
# distance and only a rough heuristic for Chroma's default L2 metric
distance = results["distances"][0][i] if results["distances"] else 0
score = 1 - distance
search_results.append(VectorSearchResult(
document=doc,
score=score,
metadata={"id": results["ids"][0][i], "distance": distance}
))
return search_results
except Exception as e:
logger.error(f"Error searching in Chroma: {e}")
raise
def similarity_search_with_score(
self,
query_embedding: List[float],
k: int = 5,
filter: Optional[Dict[str, Any]] = None
) -> List[Tuple[Document, float]]:
"""Search with scores in tuple format"""
results = self.similarity_search(query_embedding, k, filter)
return [(result.document, result.score) for result in results]
def delete(self, ids: List[str]) -> bool:
"""Delete documents by ID"""
try:
self.collection.delete(ids=ids)
return True
except Exception as e:
logger.error(f"Error deleting from Chroma: {e}")
return False
def update_documents(
self,
ids: List[str],
documents: List[Document]
) -> bool:
"""Update existing documents"""
try:
texts = [doc.page_content for doc in documents]
metadatas = [self._clean_metadata(doc.metadata) for doc in documents]
self.collection.update(
ids=ids,
documents=texts,
metadatas=metadatas
)
return True
except Exception as e:
logger.error(f"Error updating Chroma documents: {e}")
return False
def _clean_metadata(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
"""Clean metadata for Chroma compatibility"""
clean_metadata = {}
for key, value in metadata.items():
# Convert non-JSON serializable types
if isinstance(value, (str, int, float, bool, type(None))):
clean_metadata[key] = value
elif isinstance(value, (list, dict)):
try:
# Test if it's JSON serializable
json.dumps(value)
clean_metadata[key] = value
except (TypeError, ValueError):
clean_metadata[key] = str(value)
else:
clean_metadata[key] = str(value)
return clean_metadata
def get_collection_info(self) -> Dict[str, Any]:
"""Get collection information"""
try:
count = self.collection.count()
return {
"name": self.collection_name,
"count": count,
"metadata": self.collection.metadata
}
except Exception as e:
logger.error(f"Error getting collection info: {e}")
return {}
class FAISSVectorStore(BaseVectorStore):
"""FAISS vector store implementation"""
def __init__(
self,
embedding_dimension: int,
index_type: str = "IndexFlatIP",
normalize_L2: bool = True,
save_path: Optional[str] = None
):
try:
import faiss
except ImportError:
raise ImportError("Please install faiss: pip install faiss-cpu or faiss-gpu")
self.dimension = embedding_dimension
self.normalize_L2 = normalize_L2
self.save_path = save_path
# Create FAISS index
if index_type == "IndexFlatIP":
self.index = faiss.IndexFlatIP(embedding_dimension)
elif index_type == "IndexFlatL2":
self.index = faiss.IndexFlatL2(embedding_dimension)
elif index_type == "IndexHNSW":
self.index = faiss.IndexHNSWFlat(embedding_dimension, 32)
else:
raise ValueError(f"Unsupported index type: {index_type}")
# Storage for documents and metadata
self.documents = []
self.metadatas = []
self.id_to_index = {}
self.index_to_id = {}
# Load existing index if path provided
if save_path and os.path.exists(save_path):
self.load_index()
def add_documents(
self,
documents: List[Document],
embeddings: Optional[List[List[float]]] = None,
ids: Optional[List[str]] = None
) -> List[str]:
"""Add documents to FAISS index"""
if embeddings is None:
raise ValueError("FAISS requires embeddings to be provided")
# Generate IDs if not provided
if ids is None:
import uuid
ids = [str(uuid.uuid4()) for _ in documents]
# Convert embeddings to numpy array
embeddings_array = np.array(embeddings).astype('float32')
if self.normalize_L2:
import faiss  # local import, mirroring save_index/load_index
faiss.normalize_L2(embeddings_array)
# Add to FAISS index
start_index = len(self.documents)
self.index.add(embeddings_array)
# Store documents and metadata
for i, (doc, doc_id) in enumerate(zip(documents, ids)):
index_pos = start_index + i
self.documents.append(doc)
self.metadatas.append(doc.metadata)
self.id_to_index[doc_id] = index_pos
self.index_to_id[index_pos] = doc_id
# Save index if path provided
if self.save_path:
self.save_index()
return ids
def similarity_search(
self,
query_embedding: List[float],
k: int = 5,
filter: Optional[Dict[str, Any]] = None
) -> List[VectorSearchResult]:
"""Search for similar documents"""
if self.index.ntotal == 0:
return []
# Prepare query embedding
query_array = np.array([query_embedding]).astype('float32')
if self.normalize_L2:
import faiss  # local import, mirroring save_index/load_index
faiss.normalize_L2(query_array)
# Search
scores, indices = self.index.search(query_array, min(k, self.index.ntotal))
results = []
for score, idx in zip(scores[0], indices[0]):
if idx == -1: # FAISS returns -1 for invalid indices
continue
doc = self.documents[idx]
metadata = self.metadatas[idx].copy()
doc_id = self.index_to_id.get(idx, str(idx))
# Apply filter if provided
if filter:
if not self._matches_filter(metadata, filter):
continue
results.append(VectorSearchResult(
document=doc,
score=float(score),
metadata={"id": doc_id, "index": int(idx)}
))
return results
def similarity_search_with_score(
self,
query_embedding: List[float],
k: int = 5,
filter: Optional[Dict[str, Any]] = None
) -> List[Tuple[Document, float]]:
"""Search with scores in tuple format"""
results = self.similarity_search(query_embedding, k, filter)
return [(result.document, result.score) for result in results]
def delete(self, ids: List[str]) -> bool:
"""Delete documents by ID (not supported by this simple wrapper)"""
logger.warning(
"Deletion is not supported by this wrapper. Rebuild the index, or wrap the index in "
"faiss.IndexIDMap and use remove_ids where the index type allows removal."
)
return False
def update_documents(self, ids: List[str], documents: List[Document]) -> bool:
"""Update documents (FAISS doesn't support updates)"""
logger.warning("FAISS doesn't support updates. Consider rebuilding the index.")
return False
def _matches_filter(self, metadata: Dict[str, Any], filter: Dict[str, Any]) -> bool:
"""Check if metadata matches filter"""
for key, value in filter.items():
if key not in metadata or metadata[key] != value:
return False
return True
def save_index(self):
"""Save FAISS index to disk"""
if not self.save_path:
return
import faiss
import pickle
# Save FAISS index
faiss.write_index(self.index, f"{self.save_path}.index")
# Save documents and metadata
with open(f"{self.save_path}.pkl", "wb") as f:
pickle.dump({
"documents": self.documents,
"metadatas": self.metadatas,
"id_to_index": self.id_to_index,
"index_to_id": self.index_to_id
}, f)
def load_index(self):
"""Load FAISS index from disk"""
if not self.save_path:
return
import faiss
import pickle
try:
# Load FAISS index
self.index = faiss.read_index(f"{self.save_path}.index")
# Load documents and metadata
with open(f"{self.save_path}.pkl", "rb") as f:
data = pickle.load(f)
self.documents = data["documents"]
self.metadatas = data["metadatas"]
self.id_to_index = data["id_to_index"]
self.index_to_id = data["index_to_id"]
except Exception as e:
logger.error(f"Error loading FAISS index: {e}")
class VectorStoreManager:
"""Manager for vector store operations"""
def __init__(self, vector_store: BaseVectorStore, embedding_model: BaseEmbedding):
self.vector_store = vector_store
self.embedding_model = embedding_model
def add_documents(self, documents: List[Document]) -> List[str]:
"""Add documents with automatic embedding generation"""
# Check if documents already have embeddings
texts_to_embed = []
embeddings = []
for doc in documents:
if 'embedding' in doc.metadata:
embeddings.append(doc.metadata['embedding'])
else:
texts_to_embed.append(doc.page_content)
embeddings.append(None)
# Generate missing embeddings
if texts_to_embed:
new_embeddings = self.embedding_model.embed_documents(texts_to_embed)
embed_idx = 0
for i, embedding in enumerate(embeddings):
if embedding is None:
embeddings[i] = new_embeddings[embed_idx]
embed_idx += 1
return self.vector_store.add_documents(documents, embeddings)
def similarity_search(
self,
query: str,
k: int = 5,
filter: Optional[Dict[str, Any]] = None
) -> List[VectorSearchResult]:
"""Search using text query"""
query_embedding = self.embedding_model.embed_query(query)
return self.vector_store.similarity_search(query_embedding, k, filter)
def hybrid_search(
self,
query: str,
k: int = 5,
alpha: float = 0.7,
keyword_search_func: Optional[callable] = None
) -> List[VectorSearchResult]:
"""Hybrid search combining semantic and keyword search"""
# Semantic search
semantic_results = self.similarity_search(query, k * 2) # Get more results for reranking
if keyword_search_func:
# Keyword search
keyword_results = keyword_search_func(query, k * 2)
# Combine results with weighted scores
combined_results = self._combine_search_results(
semantic_results,
keyword_results,
alpha
)
return combined_results[:k]
else:
return semantic_results[:k]
def _combine_search_results(
self,
semantic_results: List[VectorSearchResult],
keyword_results: List[VectorSearchResult],
alpha: float
) -> List[VectorSearchResult]:
"""Combine semantic and keyword search results"""
# Create mapping of document content to results
semantic_map = {result.document.page_content: result for result in semantic_results}
keyword_map = {result.document.page_content: result for result in keyword_results}
# Get all unique documents
all_docs = set(semantic_map.keys()) | set(keyword_map.keys())
combined_results = []
for doc_content in all_docs:
semantic_score = semantic_map.get(doc_content, VectorSearchResult(None, 0.0, {})).score
keyword_score = keyword_map.get(doc_content, VectorSearchResult(None, 0.0, {})).score
# Weighted combination
combined_score = alpha * semantic_score + (1 - alpha) * keyword_score
# Use document from semantic results preferentially
if doc_content in semantic_map:
result = semantic_map[doc_content]
else:
result = keyword_map[doc_content]
combined_results.append(VectorSearchResult(
document=result.document,
score=combined_score,
metadata=result.metadata
))
# Sort by combined score
combined_results.sort(key=lambda x: x.score, reverse=True)
return combined_results
# Usage example
def create_vector_store_system():
"""Create a vector store system with Chroma storage and OpenAI embeddings"""
# Create embedding model
embedding_model = OpenAIEmbedding(
model="text-embedding-3-small",
cache_dir="./embedding_cache"
)
# Create vector store (Chroma backend)
vector_store = ChromaVectorStore(
collection_name="rag_documents",
persist_directory="./chroma_db"
)
# Create manager
manager = VectorStoreManager(vector_store, embedding_model)
return manager, vector_store, embedding_model
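A short sketch of the manager in use, assuming OPENAI_API_KEY is set; the documents and query are placeholders:
# Index two documents through the manager and run a text query against them
manager, vector_store, embedding_model = create_vector_store_system()
manager.add_documents([
    Document(page_content="Hybrid search mixes dense and keyword scores.", metadata={"chapter": 4}),
    Document(page_content="Reranking reorders candidates with a cross-encoder.", metadata={"chapter": 4}),
])
for result in manager.similarity_search("How does hybrid search work?", k=2):
    print(f"{result.score:.3f}  {result.document.page_content}")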
Retrieval Strategies
Advanced Retrieval Implementation
The retriever below builds on the vector store manager and supports dense, sparse, hybrid, multi-query, and contextual-compression retrieval, with optional cross-encoder reranking and MMR diversification:
from typing import List, Dict, Any, Optional, Tuple, Union, Callable
from dataclasses import dataclass
from enum import Enum
import numpy as np
class RetrievalMode(Enum):
"""Retrieval modes"""
DENSE = "dense"
SPARSE = "sparse"
HYBRID = "hybrid"
MULTI_QUERY = "multi_query"
CONTEXTUAL_COMPRESSION = "contextual_compression"
@dataclass
class RetrievalConfig:
"""Configuration for retrieval operations"""
mode: RetrievalMode = RetrievalMode.HYBRID
k: int = 5
fetch_k: int = 20 # Fetch more for reranking
alpha: float = 0.7 # Weight for dense vs sparse in hybrid mode
rerank: bool = True
rerank_model: Optional[str] = None
filters: Optional[Dict[str, Any]] = None
similarity_threshold: float = 0.0
diversity_lambda: float = 0.5 # For MMR diversity
enable_mmr: bool = False
class AdvancedRetriever:
"""Advanced retrieval system with multiple strategies"""
def __init__(
self,
vector_store_manager: VectorStoreManager,
sparse_retriever: Optional[Any] = None,
reranker: Optional[Any] = None
):
self.vector_manager = vector_store_manager
self.sparse_retriever = sparse_retriever
self.reranker = reranker
def retrieve(
self,
query: str,
config: RetrievalConfig
) -> List[VectorSearchResult]:
"""Main retrieval method with multiple strategies"""
if config.mode == RetrievalMode.DENSE:
return self._dense_retrieve(query, config)
elif config.mode == RetrievalMode.SPARSE:
return self._sparse_retrieve(query, config)
elif config.mode == RetrievalMode.HYBRID:
return self._hybrid_retrieve(query, config)
elif config.mode == RetrievalMode.MULTI_QUERY:
return self._multi_query_retrieve(query, config)
elif config.mode == RetrievalMode.CONTEXTUAL_COMPRESSION:
return self._contextual_compression_retrieve(query, config)
else:
raise ValueError(f"Unsupported retrieval mode: {config.mode}")
def _dense_retrieve(
self,
query: str,
config: RetrievalConfig
) -> List[VectorSearchResult]:
"""Dense (semantic) retrieval"""
k = config.fetch_k if config.rerank else config.k
results = self.vector_manager.similarity_search(
query=query,
k=k,
filter=config.filters
)
# Apply similarity threshold
results = [r for r in results if r.score >= config.similarity_threshold]
# Apply reranking
if config.rerank and self.reranker and len(results) > config.k:
results = self.reranker.rerank(query, results, config.k)
# Apply MMR for diversity
if config.enable_mmr:
results = self._apply_mmr(query, results, config.k, config.diversity_lambda)
return results[:config.k]
def _sparse_retrieve(
self,
query: str,
config: RetrievalConfig
) -> List[VectorSearchResult]:
"""Sparse (keyword) retrieval"""
if not self.sparse_retriever:
raise ValueError("Sparse retriever not configured")
k = config.fetch_k if config.rerank else config.k
results = self.sparse_retriever.search(
query=query,
k=k,
filters=config.filters
)
# Convert to VectorSearchResult format if needed
if results and not isinstance(results[0], VectorSearchResult):
results = [
VectorSearchResult(
document=r[0] if isinstance(r, tuple) else r.document,
score=r[1] if isinstance(r, tuple) else r.score,
metadata=getattr(r, 'metadata', {})
)
for r in results
]
# Apply reranking
if config.rerank and self.reranker and len(results) > config.k:
results = self.reranker.rerank(query, results, config.k)
return results[:config.k]
def _hybrid_retrieve(
self,
query: str,
config: RetrievalConfig
) -> List[VectorSearchResult]:
"""Hybrid retrieval combining dense and sparse methods"""
# Get dense results
dense_results = self._dense_retrieve(
query,
RetrievalConfig(
mode=RetrievalMode.DENSE,
k=config.fetch_k,
rerank=False,
filters=config.filters,
similarity_threshold=config.similarity_threshold
)
)
# Get sparse results if available
sparse_results = []
if self.sparse_retriever:
sparse_results = self._sparse_retrieve(
query,
RetrievalConfig(
mode=RetrievalMode.SPARSE,
k=config.fetch_k,
rerank=False,
filters=config.filters
)
)
# Combine results
combined_results = self._combine_results(
dense_results,
sparse_results,
config.alpha
)
# Apply reranking
if config.rerank and self.reranker and len(combined_results) > config.k:
combined_results = self.reranker.rerank(query, combined_results, config.k)
# Apply MMR for diversity
if config.enable_mmr:
combined_results = self._apply_mmr(
query,
combined_results,
config.k,
config.diversity_lambda
)
return combined_results[:config.k]
def _multi_query_retrieve(
self,
query: str,
config: RetrievalConfig
) -> List[VectorSearchResult]:
"""Multi-query retrieval using query expansion"""
# Generate query variations
query_variations = self._generate_query_variations(query)
all_results = []
# Retrieve for each query variation
for q in query_variations:
results = self._dense_retrieve(
q,
RetrievalConfig(
mode=RetrievalMode.DENSE,
k=max(1, config.fetch_k // len(query_variations)),  # fetch at least one result per variation
rerank=False,
filters=config.filters
)
)
all_results.extend(results)
# Remove duplicates and merge scores
merged_results = self._merge_duplicate_results(all_results)
# Sort by score and take top k
merged_results.sort(key=lambda x: x.score, reverse=True)
# Apply reranking
if config.rerank and self.reranker and len(merged_results) > config.k:
merged_results = self.reranker.rerank(query, merged_results, config.k)
return merged_results[:config.k]
def _contextual_compression_retrieve(
self,
query: str,
config: RetrievalConfig
) -> List[VectorSearchResult]:
"""Contextual compression retrieval"""
# First, get more results than needed
initial_results = self._dense_retrieve(
query,
RetrievalConfig(
mode=RetrievalMode.DENSE,
k=config.fetch_k,
rerank=False,
filters=config.filters
)
)
# Apply contextual compression (filter irrelevant content)
compressed_results = self._apply_contextual_compression(query, initial_results)
# Apply reranking
if config.rerank and self.reranker:
compressed_results = self.reranker.rerank(query, compressed_results, config.k)
return compressed_results[:config.k]
def _combine_results(
self,
dense_results: List[VectorSearchResult],
sparse_results: List[VectorSearchResult],
alpha: float
) -> List[VectorSearchResult]:
"""Combine dense and sparse results with weighted scoring"""
if not sparse_results:
return dense_results
# Create document content to result mapping
dense_map = {r.document.page_content: r for r in dense_results}
sparse_map = {r.document.page_content: r for r in sparse_results}
# Get all unique documents
all_docs = set(dense_map.keys()) | set(sparse_map.keys())
combined_results = []
for doc_content in all_docs:
dense_score = dense_map.get(doc_content, VectorSearchResult(None, 0.0, {})).score
sparse_score = sparse_map.get(doc_content, VectorSearchResult(None, 0.0, {})).score
# Normalize scores to [0, 1] range
dense_score = max(0, min(1, dense_score))
sparse_score = max(0, min(1, sparse_score))
# Weighted combination
combined_score = alpha * dense_score + (1 - alpha) * sparse_score
# Use document from dense results preferentially
if doc_content in dense_map:
result = dense_map[doc_content]
else:
result = sparse_map[doc_content]
combined_results.append(VectorSearchResult(
document=result.document,
score=combined_score,
metadata=result.metadata
))
# Sort by combined score
combined_results.sort(key=lambda x: x.score, reverse=True)
return combined_results
def _apply_mmr(
self,
query: str,
results: List[VectorSearchResult],
k: int,
lambda_param: float
) -> List[VectorSearchResult]:
"""Apply Maximal Marginal Relevance for diversity"""
if len(results) <= k:
return results
# Get query embedding for relevance calculation
query_embedding = self.vector_manager.embedding_model.embed_query(query)
# Get embeddings for all documents
doc_embeddings = []
for result in results:
if 'embedding' in result.document.metadata:
doc_embeddings.append(result.document.metadata['embedding'])
else:
# Generate embedding if not available
embedding = self.vector_manager.embedding_model.embed_query(
result.document.page_content
)
doc_embeddings.append(embedding)
# MMR algorithm
selected_indices = []
remaining_indices = list(range(len(results)))
# Select first document (highest relevance)
if remaining_indices:
first_idx = 0 # Assuming results are already sorted by relevance
selected_indices.append(first_idx)
remaining_indices.remove(first_idx)
# Select remaining documents using MMR
while len(selected_indices) < k and remaining_indices:
mmr_scores = []
for idx in remaining_indices:
# Relevance score (similarity to query)
relevance = self._cosine_similarity(query_embedding, doc_embeddings[idx])
# Diversity score (max similarity to selected documents)
if selected_indices:
max_similarity = max(
self._cosine_similarity(doc_embeddings[idx], doc_embeddings[sel_idx])
for sel_idx in selected_indices
)
else:
max_similarity = 0
# MMR score
mmr_score = lambda_param * relevance - (1 - lambda_param) * max_similarity
mmr_scores.append((idx, mmr_score))
# Select document with highest MMR score
best_idx, _ = max(mmr_scores, key=lambda x: x[1])
selected_indices.append(best_idx)
remaining_indices.remove(best_idx)
return [results[i] for i in selected_indices]
def _generate_query_variations(self, query: str, num_variations: int = 3) -> List[str]:
"""Generate query variations for multi-query retrieval"""
# This is a simplified implementation
# In practice, you'd use an LLM to generate variations
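# For example (not implemented here), one could prompt a chat model with
# "Rewrite this question in three different ways: {query}" and take one
# paraphrase per response line as an additional query.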
variations = [query]
# Simple variations (you can enhance this with LLM-based generation)
words = query.split()
if len(words) > 1:
# Reorder words
variations.append(" ".join(reversed(words)))
# Add synonyms or related terms (simplified)
synonym_map = {
"what": "which",
"how": "in what way",
"where": "at what location",
"when": "at what time"
}
for i, word in enumerate(words):
if word.lower() in synonym_map:
new_words = words.copy()
new_words[i] = synonym_map[word.lower()]
variations.append(" ".join(new_words))
return variations[:num_variations]
def _merge_duplicate_results(
self,
results: List[VectorSearchResult]
) -> List[VectorSearchResult]:
"""Merge duplicate results by combining scores"""
doc_to_result = {}
for result in results:
doc_content = result.document.page_content
if doc_content in doc_to_result:
# Combine scores (take maximum)
existing_result = doc_to_result[doc_content]
if result.score > existing_result.score:
doc_to_result[doc_content] = result
else:
doc_to_result[doc_content] = result
return list(doc_to_result.values())
def _apply_contextual_compression(
self,
query: str,
results: List[VectorSearchResult]
) -> List[VectorSearchResult]:
"""Apply contextual compression to filter irrelevant content"""
# This is a simplified implementation
# In practice, you'd use a compression model to filter content
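# For example (not implemented here), an LLM or an extractive model could keep only
# the sentences of each document that address the query, shrinking the context
# that is eventually passed to the generator.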
compressed_results = []
for result in results:
# Simple relevance check based on keyword overlap
query_words = set(query.lower().split())
doc_words = set(result.document.page_content.lower().split())
# Calculate the fraction of query terms that appear in the document
overlap = len(query_words & doc_words)
if query_words:
relevance_ratio = overlap / len(query_words)
# Keep result if it has sufficient relevance
if relevance_ratio > 0.1:  # threshold can be adjusted
compressed_results.append(result)
return compressed_results
def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
"""Calculate cosine similarity between two vectors"""
vec1 = np.array(vec1)
vec2 = np.array(vec2)
dot_product = np.dot(vec1, vec2)
norm_a = np.linalg.norm(vec1)
norm_b = np.linalg.norm(vec2)
if norm_a == 0 or norm_b == 0:
return 0.0
return dot_product / (norm_a * norm_b)
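The sparse and hybrid modes above expect a sparse retriever exposing search(query, k, filters). Here is a minimal BM25-backed sketch built on the rank_bm25 package; the class name and whitespace tokenization are assumptions, not part of the original implementation:
class BM25SparseRetriever:
    """Minimal keyword retriever compatible with AdvancedRetriever's sparse mode"""
    def __init__(self, documents: List[Document]):
        try:
            from rank_bm25 import BM25Okapi
        except ImportError:
            raise ImportError("Please install rank_bm25: pip install rank-bm25")
        self.documents = documents
        # Very simple whitespace tokenization; swap in a proper tokenizer as needed
        self._corpus_tokens = [doc.page_content.lower().split() for doc in documents]
        self._bm25 = BM25Okapi(self._corpus_tokens)
    def search(
        self,
        query: str,
        k: int = 5,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[VectorSearchResult]:
        """Return the top-k documents by BM25 score, optionally filtered by metadata"""
        scores = self._bm25.get_scores(query.lower().split())
        ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
        results = []
        for idx in ranked:
            doc = self.documents[idx]
            if filters and any(doc.metadata.get(key) != value for key, value in filters.items()):
                continue
            results.append(VectorSearchResult(document=doc, score=float(scores[idx]), metadata={"index": idx}))
            if len(results) >= k:
                break
        return results
Note that raw BM25 scores are unbounded, so for hybrid scoring they should be normalized (for example min-max scaled) before being mixed with dense scores; the clamp in _combine_results is only a crude stand-in. Passing sparse_retriever=BM25SparseRetriever(docs) when constructing AdvancedRetriever enables the SPARSE and HYBRID modes.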
class CrossEncoderReranker:
"""Cross-encoder reranker for improving retrieval results"""
def __init__(
self,
model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
device: Optional[str] = None
):
try:
from sentence_transformers import CrossEncoder
except ImportError:
raise ImportError("Please install sentence-transformers for reranking")
self.model = CrossEncoder(model_name, device=device)
def rerank(
self,
query: str,
results: List[VectorSearchResult],
top_k: int
) -> List[VectorSearchResult]:
"""Rerank results using cross-encoder"""
if len(results) <= top_k:
return results
# Prepare query-document pairs
pairs = [
(query, result.document.page_content)
for result in results
]
# Get cross-encoder scores
scores = self.model.predict(pairs)
# Update results with new scores
reranked_results = []
for i, result in enumerate(results):
new_result = VectorSearchResult(
document=result.document,
score=float(scores[i]),
metadata=result.metadata
)
reranked_results.append(new_result)
# Sort by new scores
reranked_results.sort(key=lambda x: x.score, reverse=True)
return reranked_results[:top_k]
# Usage example
def create_advanced_retrieval_system():
"""Create advanced retrieval system"""
# Create vector store manager
manager, vector_store, embedding_model = create_vector_store_system()
# Create reranker
reranker = CrossEncoderReranker()
# Create advanced retriever
retriever = AdvancedRetriever(
vector_store_manager=manager,
reranker=reranker
)
return retriever, manager
This comprehensive implementation provides the following (an end-to-end sketch follows the list):
- Multiple embedding models with caching and optimization
- Vector storage systems supporting Chroma and FAISS
- Advanced retrieval strategies including hybrid, multi-query, and MMR
- Reranking capabilities using cross-encoder models
- Performance optimizations including batching and caching
- Comprehensive error handling and logging
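Putting the pieces together, here is a hedged end-to-end sketch. It assumes OPENAI_API_KEY is set, downloads the cross-encoder model on first use, and indexes two placeholder documents:
# Index a few documents, then retrieve with hybrid mode and MMR diversification
retriever, manager = create_advanced_retrieval_system()
manager.add_documents([
    Document(page_content="Chunk documents before embedding them.", metadata={"section": "preprocessing"}),
    Document(page_content="Use MMR to diversify retrieved passages.", metadata={"section": "retrieval"}),
])
config = RetrievalConfig(
    mode=RetrievalMode.HYBRID,
    k=2,
    enable_mmr=True,
    similarity_threshold=-1.0  # accept all candidates; Chroma's L2-based scores can be negative
)
for result in retriever.retrieve("How can I make retrieval results less redundant?", config):
    print(f"{result.score:.3f}  {result.document.page_content}")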
In Chapter 5: Production Deployment, we'll cover deployment strategies, monitoring, and optimization techniques for production RAG systems.