Module 2: RAG
Chapter 4: Advanced Techniques

Chapter 4: Advanced RAG Techniques

This chapter explores advanced techniques and optimizations for RAG systems: embeddings, vector storage, retrieval strategies, and evaluation methods. The examples build on your own course materials.

Embedding Systems

Advanced Embedding Implementation

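Building on your model I/O specifications, here's a comprehensive embedding system. The listing assumes a `Document` class exposing `page_content` and `metadata` attributes is already in scope: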

import numpy as np
import os
import hashlib
import pickle
import time
from typing import List, Dict, Any, Optional, Union, Tuple
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import logging
 
logger = logging.getLogger(__name__)
 
@dataclass
class EmbeddingConfig:
    """Settings bundle describing an embedding model.

    Nothing in this listing reads the config, so the per-field notes below
    are inferred from the field names and should be confirmed against the
    code that consumes it.
    """
    model_name: str  # identifier of the embedding model
    dimension: int  # presumably the length of each embedding vector
    max_sequence_length: int  # presumably the longest accepted input — TODO confirm units (tokens vs. characters)
    batch_size: int = 32  # presumably the number of texts per embedding call
    device: Optional[str] = None  # presumably a device string; meaning of None is up to the consumer — TODO confirm
    normalize_embeddings: bool = True  # presumably whether vectors are L2-normalized
    cache_embeddings: bool = True  # presumably whether computed vectors are cached
 
class BaseEmbedding(ABC):
    """Interface that every embedding backend must implement.

    Concrete subclasses supply batch document embedding, single-query
    embedding, and the size of the vectors they produce.
    """

    @property
    @abstractmethod
    def dimension(self) -> int:
        """Number of components in each embedding vector."""

    @abstractmethod
    def embed_query(self, text: str) -> List[float]:
        """Return the embedding vector for a single query string."""

    @abstractmethod
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Return one embedding vector for each text in ``texts``."""
 
class OpenAIEmbedding(BaseEmbedding):
    """OpenAI Embeddings API wrapper with batching, retries and a cache.

    Vectors are cached in memory under a SHA-256 key built from the model
    name, the requested ``dimensions`` and the text. When ``cache_dir`` is
    set the cache is also pickled to disk and reloaded on construction.
    """

    def __init__(
        self,
        model: str = "text-embedding-3-small",
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        batch_size: int = 100,
        max_retries: int = 3,
        request_timeout: int = 60,
        dimensions: Optional[int] = None,
        embedding_ctx_length: int = 8192,
        cache_dir: Optional[str] = None
    ):
        """Create the API client and load any cache found on disk.

        Args:
            model: Embedding model name sent with each request.
            api_key: API key; falls back to the ``OPENAI_API_KEY``
                environment variable when omitted.
            api_base: Optional base URL passed to the client.
            batch_size: Maximum number of texts per ``_embed_batch`` call.
            max_retries: Number of attempts per API call (at least one
                attempt is always made).
            request_timeout: Passed as ``timeout`` on each request.
            dimensions: Optional output size forwarded to the API.
            embedding_ctx_length: Stored on the instance; not otherwise
                used by this class.
            cache_dir: Directory for the pickled cache; ``None`` keeps the
                cache in memory only.

        Raises:
            ImportError: If the ``openai`` package is not installed.
        """
        try:
            import openai
        except ImportError as exc:
            raise ImportError("Please install openai: pip install openai") from exc

        self.client = openai.OpenAI(
            api_key=api_key or os.getenv("OPENAI_API_KEY"),
            base_url=api_base
        )

        self.model = model
        self.batch_size = batch_size
        self.max_retries = max_retries
        self.request_timeout = request_timeout
        self.dimensions = dimensions
        self.embedding_ctx_length = embedding_ctx_length
        self.cache_dir = cache_dir

        # Fallback vector sizes used by `dimension` when `dimensions` is unset
        self._model_dimensions = {
            "text-embedding-3-small": 1536,
            "text-embedding-3-large": 3072,
            "text-embedding-ada-002": 1536
        }

        self._cache = {}
        if self.cache_dir:
            self._load_cache()

    @property
    def dimension(self) -> int:
        """Return the configured ``dimensions`` or the model's mapped size.

        Models missing from the internal mapping fall back to 1536.
        """
        if self.dimensions:
            return self.dimensions
        return self._model_dimensions.get(self.model, 1536)

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed ``texts`` in slices of ``batch_size`` and return the vectors.

        Returns:
            One vector per input text, in input order.
        """
        all_embeddings = []
        entries_before = len(self._cache)

        try:
            for start in range(0, len(texts), self.batch_size):
                batch = texts[start:start + self.batch_size]
                all_embeddings.extend(self._embed_batch(batch, persist=False))
        finally:
            # Persist once per call instead of re-pickling the whole cache
            # after every batch; `finally` still saves the batches that
            # succeeded before a later one raised.
            if self.cache_dir and len(self._cache) != entries_before:
                self._save_cache()

        return all_embeddings

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string (cached like any other text)."""
        return self._embed_batch([text])[0]

    def _embed_batch(self, texts: List[str], persist: bool = True) -> List[List[float]]:
        """Embed one batch, serving cache hits locally.

        Args:
            texts: Texts to embed.
            persist: When true and ``cache_dir`` is set, write the cache to
                disk if this call added entries.

        Returns:
            One vector per input text, in input order.
        """
        result: List[Optional[List[float]]] = [None] * len(texts)
        # cache key -> (text, positions in `texts` waiting for that vector);
        # grouping by key sends each distinct text to the API only once.
        pending: Dict[str, Tuple[str, List[int]]] = {}

        for position, text in enumerate(texts):
            cache_key = self._get_cache_key(text)
            if cache_key in self._cache:
                result[position] = self._cache[cache_key]
            elif cache_key in pending:
                pending[cache_key][1].append(position)
            else:
                pending[cache_key] = (text, [position])

        if pending:
            unique_texts = [text for text, _ in pending.values()]
            new_embeddings = self._call_api(unique_texts)

            # dicts preserve insertion order, so vectors line up with keys
            for (cache_key, (_, positions)), embedding in zip(pending.items(), new_embeddings):
                self._cache[cache_key] = embedding
                for position in positions:
                    result[position] = embedding

            if persist and self.cache_dir:
                self._save_cache()

        return result

    def _call_api(self, texts: List[str]) -> List[List[float]]:
        """Request embeddings, retrying failures with exponential backoff.

        Every exception type is retried. The last failure is logged and
        re-raised.
        """
        kwargs = {
            "model": self.model,
            "input": texts,
            "timeout": self.request_timeout
        }
        if self.dimensions:
            kwargs["dimensions"] = self.dimensions

        # Guarantee at least one attempt so the method can never fall
        # through and return None when max_retries is 0 or negative.
        attempts = max(1, self.max_retries)

        for attempt in range(attempts):
            try:
                response = self.client.embeddings.create(**kwargs)
                return [item.embedding for item in response.data]

            except Exception as e:
                if attempt == attempts - 1:
                    logger.error(f"Failed to get embeddings after {attempts} attempts: {e}")
                    raise

                # Exponential backoff plus up to one second of random jitter
                wait_time = (2 ** attempt) + np.random.uniform(0, 1)
                logger.warning(f"API call failed (attempt {attempt + 1}), retrying in {wait_time:.2f}s: {e}")
                time.sleep(wait_time)

    def _get_cache_key(self, text: str) -> str:
        """Return the SHA-256 hex digest of ``model:dimensions:text``."""
        key_string = f"{self.model}:{self.dimensions}:{text}"
        return hashlib.sha256(key_string.encode()).hexdigest()

    def _cache_file(self) -> str:
        """Return the cache pickle path for this model inside ``cache_dir``."""
        # Path separators in a model name would otherwise point into a
        # sub-directory that is never created.
        safe_model = self.model.replace("/", "_").replace(os.sep, "_")
        return os.path.join(self.cache_dir, f"openai_embedding_cache_{safe_model}.pkl")

    def _load_cache(self):
        """Load the pickled cache from ``cache_dir`` if it exists.

        Failures are logged and leave an empty cache.
        """
        if not self.cache_dir:
            return

        cache_file = self._cache_file()
        if os.path.exists(cache_file):
            try:
                # NOTE(security): pickle.load executes code embedded in the
                # file; only point cache_dir at a trusted location.
                with open(cache_file, "rb") as f:
                    self._cache = pickle.load(f)
                logger.info(f"Loaded {len(self._cache)} cached embeddings")
            except Exception as e:
                logger.warning(f"Failed to load cache: {e}")
                self._cache = {}

    def _save_cache(self):
        """Pickle the cache into ``cache_dir``; failures are only logged."""
        if not self.cache_dir:
            return

        os.makedirs(self.cache_dir, exist_ok=True)
        cache_file = self._cache_file()

        try:
            with open(cache_file, "wb") as f:
                pickle.dump(self._cache, f)
        except Exception as e:
            logger.warning(f"Failed to save cache: {e}")
 
class HuggingFaceEmbedding(BaseEmbedding):
    """SentenceTransformers embedding backend with a cache.

    Vectors are cached in memory under a SHA-256 key built from the model
    name, the normalization flag and the text. When ``cache_dir`` is set the
    cache is also pickled to disk and reloaded on construction.
    """

    def __init__(
        self,
        model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
        device: Optional[str] = None,
        model_kwargs: Optional[Dict] = None,
        encode_kwargs: Optional[Dict] = None,
        multi_process: bool = False,
        normalize_embeddings: bool = True,
        cache_dir: Optional[str] = None
    ):
        """Load the model and any cache found on disk.

        Args:
            model_name: Model identifier passed to ``SentenceTransformer``.
            device: Device passed to ``SentenceTransformer``.
            model_kwargs: Extra keyword arguments for ``SentenceTransformer``.
            encode_kwargs: Extra keyword arguments for ``model.encode``. A
                ``normalize_embeddings`` entry here overrides the
                ``normalize_embeddings`` argument.
            multi_process: Stored on the instance; not otherwise used by
                this class.
            normalize_embeddings: Forwarded to ``model.encode``.
            cache_dir: Used both as the model's ``cache_folder`` and as the
                directory for the pickled embedding cache.

        Raises:
            ImportError: If ``sentence-transformers`` is not installed.
        """
        try:
            from sentence_transformers import SentenceTransformer
        except ImportError as exc:
            raise ImportError("Please install sentence-transformers: pip install sentence-transformers") from exc

        self.model_name = model_name
        self.model_kwargs = model_kwargs or {}
        # Copy so the caller's dict is not mutated by the pop below
        self.encode_kwargs = dict(encode_kwargs or {})
        self.multi_process = multi_process
        self.normalize_embeddings = normalize_embeddings
        self.cache_dir = cache_dir

        # encode() is always called with normalize_embeddings=...; leaving
        # the same key in encode_kwargs would raise a duplicate-keyword
        # TypeError, so fold it into the attribute (which also feeds the
        # cache key).
        if "normalize_embeddings" in self.encode_kwargs:
            self.normalize_embeddings = bool(self.encode_kwargs.pop("normalize_embeddings"))

        # Initialize model
        self.model = SentenceTransformer(
            model_name,
            device=device,
            cache_folder=cache_dir,
            **self.model_kwargs
        )

        self._cache = {}
        if self.cache_dir:
            self._load_cache()

    @property
    def dimension(self) -> int:
        """Return the model's sentence embedding dimension."""
        return self.model.get_sentence_embedding_dimension()

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed ``texts``, encoding only those missing from the cache.

        Returns:
            One vector per input text, in input order. NumPy output from the
            model is converted to nested lists; other output types are
            returned as the model produced them.
        """
        result = [None] * len(texts)
        # cache key -> positions in `texts` waiting for that vector;
        # `unique_texts` holds each distinct uncached text once, same order.
        pending: Dict[str, List[int]] = {}
        unique_texts = []

        for position, text in enumerate(texts):
            cache_key = self._get_cache_key(text)
            if cache_key in self._cache:
                result[position] = self._cache[cache_key]
            elif cache_key in pending:
                pending[cache_key].append(position)
            else:
                pending[cache_key] = [position]
                unique_texts.append(text)

        if unique_texts:
            embeddings = self.model.encode(
                unique_texts,
                normalize_embeddings=self.normalize_embeddings,
                **self.encode_kwargs
            )
            if isinstance(embeddings, np.ndarray):
                embeddings = embeddings.tolist()

            # dicts preserve insertion order, so vectors line up with keys
            for (cache_key, positions), embedding in zip(pending.items(), embeddings):
                self._cache[cache_key] = embedding
                for position in positions:
                    result[position] = embedding

            if self.cache_dir:
                self._save_cache()

        return result

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string via the cached document path."""
        return self.embed_documents([text])[0]

    def _get_cache_key(self, text: str) -> str:
        """Return the SHA-256 hex digest of ``model_name:normalize:text``."""
        key_string = f"{self.model_name}:{self.normalize_embeddings}:{text}"
        return hashlib.sha256(key_string.encode()).hexdigest()

    def _cache_file(self) -> str:
        """Return the cache pickle path for this model inside ``cache_dir``."""
        return os.path.join(self.cache_dir, f"hf_embedding_cache_{self.model_name.replace('/', '_')}.pkl")

    def _load_cache(self):
        """Load the pickled cache from ``cache_dir`` if it exists.

        Failures are logged and leave an empty cache.
        """
        if not self.cache_dir:
            return

        cache_file = self._cache_file()
        if os.path.exists(cache_file):
            try:
                # NOTE(security): pickle.load executes code embedded in the
                # file; only point cache_dir at a trusted location.
                with open(cache_file, "rb") as f:
                    self._cache = pickle.load(f)
                logger.info(f"Loaded {len(self._cache)} cached embeddings")
            except Exception as e:
                logger.warning(f"Failed to load cache: {e}")
                self._cache = {}

    def _save_cache(self):
        """Pickle the cache into ``cache_dir``; failures are only logged."""
        if not self.cache_dir:
            return

        os.makedirs(self.cache_dir, exist_ok=True)
        cache_file = self._cache_file()

        try:
            with open(cache_file, "wb") as f:
                pickle.dump(self._cache, f)
        except Exception as e:
            logger.warning(f"Failed to save cache: {e}")
 
class EmbeddingPipeline:
    """Wrap an embedding model with timing and error statistics.

    ``cache_hits`` and ``api_calls`` exist in the statistics dict but are
    never incremented by this class, so ``cache_hit_rate`` from
    ``get_stats`` is 0.0 unless a caller updates ``stats`` itself.
    """

    def __init__(
        self,
        embedding_model: BaseEmbedding,
        batch_size: int = 100,
        max_workers: int = 4,
        cache_dir: Optional[str] = None,
        enable_monitoring: bool = True
    ):
        """Store the model and settings and reset the statistics.

        Args:
            embedding_model: Backend used for all embedding calls.
            batch_size: Stored on the instance; not otherwise used by this
                class.
            max_workers: Stored on the instance; not otherwise used by this
                class.
            cache_dir: Stored on the instance; not otherwise used by this
                class.
            enable_monitoring: When true, counts and timings are recorded
                after each successful call. Errors are always counted.
        """
        self.embedding_model = embedding_model
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.cache_dir = cache_dir
        self.enable_monitoring = enable_monitoring

        self.stats = self._new_stats()

    @staticmethod
    def _new_stats() -> Dict[str, Any]:
        """Return a zeroed statistics dict."""
        return {
            "total_embeddings": 0,
            "cache_hits": 0,
            "api_calls": 0,
            "total_time": 0.0,
            "errors": 0
        }

    def embed_documents(self, documents: List[Document]) -> List[Document]:
        """Return copies of ``documents`` with an ``"embedding"`` metadata key.

        The input documents are not modified; each result is a new
        ``Document`` with the same content and a shallow copy of the
        metadata plus the vector.

        Raises:
            ValueError: If the model returns a different number of vectors
                than documents.
            Exception: Any model failure is counted, logged and re-raised.
        """
        # perf_counter is monotonic, so elapsed time cannot go negative when
        # the system clock is adjusted.
        start_time = time.perf_counter()

        try:
            texts = [doc.page_content for doc in documents]
            embeddings = self.embedding_model.embed_documents(texts)

            # zip() would silently drop the unmatched documents
            if len(embeddings) != len(documents):
                raise ValueError(
                    f"Embedding model returned {len(embeddings)} embeddings "
                    f"for {len(documents)} documents"
                )

            embedded_docs = [
                Document(
                    page_content=doc.page_content,
                    metadata={**doc.metadata, "embedding": embedding}
                )
                for doc, embedding in zip(documents, embeddings)
            ]

            if self.enable_monitoring:
                elapsed = time.perf_counter() - start_time
                self.stats["total_embeddings"] += len(embeddings)
                self.stats["total_time"] += elapsed

                logger.info(f"Generated {len(embeddings)} embeddings in {elapsed:.2f}s")

            return embedded_docs

        except Exception as e:
            self.stats["errors"] += 1
            logger.error(f"Error embedding documents: {e}")
            raise

    def embed_queries(self, queries: List[str]) -> List[List[float]]:
        """Embed each query with the model's ``embed_query``, in order.

        Queries are embedded one at a time because the model interface only
        offers a single-query call.

        Raises:
            Exception: Any model failure is counted, logged and re-raised.
        """
        start_time = time.perf_counter()

        try:
            embeddings = [
                self.embedding_model.embed_query(query) for query in queries
            ]

            if self.enable_monitoring:
                elapsed = time.perf_counter() - start_time
                self.stats["total_embeddings"] += len(embeddings)
                self.stats["total_time"] += elapsed

            return embeddings

        except Exception as e:
            self.stats["errors"] += 1
            logger.error(f"Error embedding queries: {e}")
            raise

    def get_stats(self) -> Dict[str, Any]:
        """Return a copy of the statistics plus derived averages.

        ``avg_time_per_embedding`` and ``cache_hit_rate`` are only present
        once at least one embedding has been recorded.
        """
        stats = self.stats.copy()

        if stats["total_embeddings"] > 0:
            stats["avg_time_per_embedding"] = stats["total_time"] / stats["total_embeddings"]
            stats["cache_hit_rate"] = stats["cache_hits"] / stats["total_embeddings"]

        return stats

    def clear_stats(self):
        """Reset every statistic to zero."""
        self.stats = self._new_stats()
 
# Usage example based on your model_io.ipynb
def create_embedding_system():
    """Build the embedding setup used in model_io.ipynb.

    Returns:
        A tuple ``(pipeline, openai_embeddings, hf_embeddings)``. The
        pipeline wraps the OpenAI model; the HuggingFace model is returned
        as an alternative backend. All three share one cache directory.
    """
    shared_cache_dir = "./embedding_cache"

    # Primary backend: OpenAI embeddings, as in the notebook
    openai_model = OpenAIEmbedding(
        model="text-embedding-3-small",
        api_key=os.getenv("OPENAI_API_KEY"),
        batch_size=100,
        cache_dir=shared_cache_dir,
    )

    # Alternative backend: local SentenceTransformers model
    hf_model = HuggingFaceEmbedding(
        model_name="sentence-transformers/all-MiniLM-L6-v2",
        cache_dir=shared_cache_dir,
    )

    return (
        EmbeddingPipeline(
            embedding_model=openai_model,
            batch_size=50,
            cache_dir=shared_cache_dir,
        ),
        openai_model,
        hf_model,
    )

Vector Storage Systems

Comprehensive Vector Database Implementation

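Based on your retrieval patterns, here's a complete vector storage system. It reuses `logger` and `BaseEmbedding` from the embedding listing above and, like that listing, assumes a `Document` class with `page_content` and `metadata` attributes is in scope: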

from typing import List, Dict, Any, Optional, Tuple, Union
from abc import ABC, abstractmethod
import numpy as np
import json
import os
from dataclasses import dataclass
 
@dataclass
class VectorSearchResult:
    """A single hit returned by a vector store search."""
    document: Document  # the matched document
    score: float  # ranking value reported by the store; scale and direction depend on the store and its metric
    metadata: Dict[str, Any]  # store-specific details about the hit (the stores in this listing include the document "id")
 
class BaseVectorStore(ABC):
    """Interface that every vector store backend must implement."""

    @abstractmethod
    def add_documents(
        self,
        documents: List[Document],
        embeddings: Optional[List[List[float]]] = None,
        ids: Optional[List[str]] = None
    ) -> List[str]:
        """Store ``documents`` (optionally with vectors and IDs); return their IDs."""

    @abstractmethod
    def similarity_search(
        self,
        query_embedding: List[float],
        k: int = 5,
        filter: Optional[Dict[str, Any]] = None
    ) -> List[VectorSearchResult]:
        """Return up to ``k`` hits for ``query_embedding``, optionally filtered by metadata."""

    @abstractmethod
    def similarity_search_with_score(
        self,
        query_embedding: List[float],
        k: int = 5,
        filter: Optional[Dict[str, Any]] = None
    ) -> List[Tuple[Document, float]]:
        """Same search as ``similarity_search``, returned as ``(document, score)`` tuples."""

    @abstractmethod
    def delete(self, ids: List[str]) -> bool:
        """Remove the documents with the given IDs; return whether it succeeded."""

    @abstractmethod
    def update_documents(
        self,
        ids: List[str],
        documents: List[Document]
    ) -> bool:
        """Replace the stored documents for ``ids``; return whether it succeeded."""
 
class ChromaVectorStore(BaseVectorStore):
    """Vector store backed by a Chroma collection."""

    def __init__(
        self,
        collection_name: str,
        persist_directory: Optional[str] = None,
        embedding_function: Optional[Any] = None,
        collection_metadata: Optional[Dict] = None,
        client_settings: Optional[Dict] = None
    ):
        """Create the client and get or create the collection.

        Args:
            collection_name: Name of the Chroma collection.
            persist_directory: Directory for on-disk storage; when omitted
                an in-memory client is created.
            embedding_function: Passed to the collection; used by Chroma
                when documents are added without explicit embeddings.
            collection_metadata: Metadata passed to the collection.
            client_settings: Keyword arguments for Chroma ``Settings``;
                only used together with ``persist_directory``.

        Raises:
            ImportError: If ``chromadb`` is not installed.
        """
        try:
            import chromadb
            from chromadb.config import Settings
        except ImportError as exc:
            raise ImportError("Please install chromadb: pip install chromadb") from exc

        if persist_directory:
            settings = Settings(**client_settings) if client_settings else None
            # PersistentClient overwrites settings.persist_directory with
            # its `path` argument (default "./chroma"), so the directory
            # must be passed as `path` to take effect.
            self.client = chromadb.PersistentClient(
                path=persist_directory,
                settings=settings
            )
        else:
            self.client = chromadb.Client()

        self.collection_name = collection_name
        self.embedding_function = embedding_function

        # Get or create collection
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            embedding_function=embedding_function,
            metadata=collection_metadata
        )

    def add_documents(
        self,
        documents: List[Document],
        embeddings: Optional[List[List[float]]] = None,
        ids: Optional[List[str]] = None
    ) -> List[str]:
        """Add documents to the collection and return their IDs.

        Args:
            documents: Documents to store; an empty list is a no-op.
            embeddings: Precomputed vectors; when ``None`` Chroma generates
                them itself.
            ids: Document IDs; random UUIDs are generated when omitted.

        Raises:
            Exception: Errors from Chroma are logged and re-raised.
        """
        if not documents:
            return []

        if ids is None:
            import uuid
            ids = [str(uuid.uuid4()) for _ in documents]

        payload = {
            "documents": [doc.page_content for doc in documents],
            "metadatas": [self._prepare_metadata(doc) for doc in documents],
            "ids": ids
        }
        # Leaving "embeddings" out lets Chroma generate the vectors
        if embeddings is not None:
            payload["embeddings"] = embeddings

        try:
            self.collection.add(**payload)
        except Exception as e:
            logger.error(f"Error adding documents to Chroma: {e}")
            raise

        return ids

    def similarity_search(
        self,
        query_embedding: List[float],
        k: int = 5,
        filter: Optional[Dict[str, Any]] = None
    ) -> List[VectorSearchResult]:
        """Query the collection and return up to ``k`` hits.

        Each hit's ``score`` is ``1 - distance`` and its metadata holds the
        Chroma ``id`` and raw ``distance``.
        NOTE(review): ``1 - distance`` is only a bounded similarity when the
        collection uses a cosine-style distance; confirm the collection's
        configured metric before comparing scores.

        Raises:
            Exception: Errors from Chroma are logged and re-raised.
        """
        try:
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=k,
                # An empty dict is not a usable filter; treat it as "none"
                where=filter or None,
                include=["documents", "metadatas", "distances"]
            )

            search_results = []

            if results["documents"] and results["documents"][0]:
                # Results are nested per query; index 0 is our single query
                for i, text in enumerate(results["documents"][0]):
                    doc = Document(
                        page_content=text,
                        metadata=results["metadatas"][0][i] or {}
                    )

                    distance = results["distances"][0][i] if results["distances"] else 0
                    score = 1 - distance

                    search_results.append(VectorSearchResult(
                        document=doc,
                        score=score,
                        metadata={"id": results["ids"][0][i], "distance": distance}
                    ))

            return search_results

        except Exception as e:
            logger.error(f"Error searching in Chroma: {e}")
            raise

    def similarity_search_with_score(
        self,
        query_embedding: List[float],
        k: int = 5,
        filter: Optional[Dict[str, Any]] = None
    ) -> List[Tuple[Document, float]]:
        """Run ``similarity_search`` and return ``(document, score)`` tuples."""
        results = self.similarity_search(query_embedding, k, filter)
        return [(result.document, result.score) for result in results]

    def delete(self, ids: List[str]) -> bool:
        """Delete documents by ID; return ``False`` (after logging) on error."""
        try:
            self.collection.delete(ids=ids)
            return True
        except Exception as e:
            logger.error(f"Error deleting from Chroma: {e}")
            return False

    def update_documents(
        self,
        ids: List[str],
        documents: List[Document]
    ) -> bool:
        """Replace text and metadata for ``ids``; return ``False`` on error.

        Metadata is prepared exactly as in ``add_documents``, so an
        ``"embedding"`` entry is not written back as metadata.
        """
        try:
            self.collection.update(
                ids=ids,
                documents=[doc.page_content for doc in documents],
                metadatas=[self._prepare_metadata(doc) for doc in documents]
            )
            return True
        except Exception as e:
            logger.error(f"Error updating Chroma documents: {e}")
            return False

    def _prepare_metadata(self, doc: Document) -> Dict[str, Any]:
        """Return the document's metadata without its vector, cleaned for Chroma."""
        metadata = doc.metadata.copy()
        # The vector is stored as the embedding itself, not as metadata
        metadata.pop('embedding', None)
        return self._clean_metadata(metadata)

    def _clean_metadata(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Convert metadata values into types Chroma can store.

        Scalars pass through unchanged. Lists and dicts are stored as JSON
        strings because Chroma metadata values are scalars. Anything else,
        or a container that is not JSON serializable, is stored as
        ``str(value)``.
        NOTE(review): ``None`` is passed through unchanged; confirm the
        installed Chroma version accepts ``None`` metadata values.
        """
        clean_metadata = {}

        for key, value in metadata.items():
            if isinstance(value, (str, int, float, bool, type(None))):
                clean_metadata[key] = value
            elif isinstance(value, (list, dict)):
                try:
                    clean_metadata[key] = json.dumps(value)
                except (TypeError, ValueError):
                    clean_metadata[key] = str(value)
            else:
                clean_metadata[key] = str(value)

        return clean_metadata

    def get_collection_info(self) -> Dict[str, Any]:
        """Return name, count and metadata; ``{}`` (after logging) on error."""
        try:
            count = self.collection.count()
            return {
                "name": self.collection_name,
                "count": count,
                "metadata": self.collection.metadata
            }
        except Exception as e:
            logger.error(f"Error getting collection info: {e}")
            return {}
 
class FAISSVectorStore(BaseVectorStore):
    """Vector store backed by a FAISS index plus Python-side document lists.

    FAISS only stores vectors. Documents, metadata and ID mappings are kept
    in lists/dicts indexed by the vector's position in the index.
    """

    def __init__(
        self,
        embedding_dimension: int,
        index_type: str = "IndexFlatIP",
        normalize_L2: bool = True,
        save_path: Optional[str] = None
    ):
        """Create the index and load previously saved state if present.

        Args:
            embedding_dimension: Size of the vectors stored in the index.
            index_type: ``"IndexFlatIP"``, ``"IndexFlatL2"`` or
                ``"IndexHNSW"``.
            normalize_L2: L2-normalize vectors and queries before use.
            save_path: Path prefix for persistence; state is written to
                ``<save_path>.index`` and ``<save_path>.pkl``.

        Raises:
            ImportError: If ``faiss`` is not installed.
            ValueError: If ``index_type`` is not supported.
        """
        try:
            import faiss
        except ImportError as exc:
            raise ImportError("Please install faiss: pip install faiss-cpu or faiss-gpu") from exc

        # faiss is imported lazily here, so there is no module-level `faiss`
        # name; keep a handle for the other methods.
        self._faiss = faiss

        self.dimension = embedding_dimension
        self.normalize_L2 = normalize_L2
        self.save_path = save_path

        # Create FAISS index
        if index_type == "IndexFlatIP":
            self.index = faiss.IndexFlatIP(embedding_dimension)
        elif index_type == "IndexFlatL2":
            self.index = faiss.IndexFlatL2(embedding_dimension)
        elif index_type == "IndexHNSW":
            self.index = faiss.IndexHNSWFlat(embedding_dimension, 32)
        else:
            raise ValueError(f"Unsupported index type: {index_type}")

        # Storage for documents and metadata, indexed by index position
        self.documents = []
        self.metadatas = []
        self.id_to_index = {}
        self.index_to_id = {}

        # save_index writes "<save_path>.index", so that is the file whose
        # existence signals saved state (save_path itself is only a prefix).
        if save_path and os.path.exists(f"{save_path}.index"):
            self.load_index()

    def add_documents(
        self,
        documents: List[Document],
        embeddings: Optional[List[List[float]]] = None,
        ids: Optional[List[str]] = None
    ) -> List[str]:
        """Add documents and their vectors to the index; return their IDs.

        Args:
            documents: Documents to store; an empty list is a no-op.
            embeddings: One vector per document (required).
            ids: Document IDs; random UUIDs are generated when omitted.

        Raises:
            ValueError: If ``embeddings`` is missing, the argument lengths
                differ, or the vectors do not match the index dimension.
        """
        if embeddings is None:
            raise ValueError("FAISS requires embeddings to be provided")

        if not documents:
            return []

        if ids is None:
            import uuid
            ids = [str(uuid.uuid4()) for _ in documents]

        # A mismatch would shift every later position-based lookup
        if not (len(documents) == len(embeddings) == len(ids)):
            raise ValueError(
                f"documents ({len(documents)}), embeddings ({len(embeddings)}) "
                f"and ids ({len(ids)}) must have the same length"
            )

        embeddings_array = np.array(embeddings).astype('float32')
        if embeddings_array.ndim != 2 or embeddings_array.shape[1] != self.dimension:
            raise ValueError(
                f"Expected embeddings of shape (n, {self.dimension}), "
                f"got {embeddings_array.shape}"
            )

        if self.normalize_L2:
            self._faiss.normalize_L2(embeddings_array)

        # Add to FAISS index
        start_index = len(self.documents)
        self.index.add(embeddings_array)

        # Store documents and metadata at the same positions as the vectors
        for offset, (doc, doc_id) in enumerate(zip(documents, ids)):
            index_pos = start_index + offset
            self.documents.append(doc)
            self.metadatas.append(doc.metadata)
            self.id_to_index[doc_id] = index_pos
            self.index_to_id[index_pos] = doc_id

        if self.save_path:
            self.save_index()

        return ids

    def similarity_search(
        self,
        query_embedding: List[float],
        k: int = 5,
        filter: Optional[Dict[str, Any]] = None
    ) -> List[VectorSearchResult]:
        """Return up to ``k`` hits for ``query_embedding``.

        ``score`` is the raw value FAISS reports for the index. For
        ``IndexFlatIP`` that is an inner product (higher is better).
        NOTE(review): for the L2-based index types the value is presumably
        a distance where lower is better — confirm before comparing scores
        across index types.

        With a ``filter``, the search widens until ``k`` matching hits are
        found or the whole index has been examined.
        """
        total = self.index.ntotal
        if total == 0 or k <= 0:
            return []

        # Prepare query embedding
        query_array = np.array([query_embedding]).astype('float32')
        if self.normalize_L2:
            self._faiss.normalize_L2(query_array)

        fetch_k = min(k, total)
        while True:
            scores, indices = self.index.search(query_array, fetch_k)

            results = []
            for score, raw_idx in zip(scores[0], indices[0]):
                if raw_idx == -1:  # FAISS returns -1 for invalid indices
                    continue

                idx = int(raw_idx)
                if filter and not self._matches_filter(self.metadatas[idx], filter):
                    continue

                results.append(VectorSearchResult(
                    document=self.documents[idx],
                    score=float(score),
                    metadata={"id": self.index_to_id.get(idx, str(idx)), "index": idx}
                ))
                if len(results) == k:
                    break

            # Without a filter one pass is exact; with one, widen until
            # enough hits survive filtering or nothing is left to examine.
            if not filter or len(results) >= k or fetch_k >= total:
                return results
            fetch_k = min(total, fetch_k * 2)

    def similarity_search_with_score(
        self,
        query_embedding: List[float],
        k: int = 5,
        filter: Optional[Dict[str, Any]] = None
    ) -> List[Tuple[Document, float]]:
        """Run ``similarity_search`` and return ``(document, score)`` tuples."""
        results = self.similarity_search(query_embedding, k, filter)
        return [(result.document, result.score) for result in results]

    def delete(self, ids: List[str]) -> bool:
        """Not supported: log a warning and return ``False`` without changes."""
        logger.warning("FAISS doesn't support deletion. Consider rebuilding the index.")
        return False

    def update_documents(self, ids: List[str], documents: List[Document]) -> bool:
        """Not supported: log a warning and return ``False`` without changes."""
        logger.warning("FAISS doesn't support updates. Consider rebuilding the index.")
        return False

    def _matches_filter(self, metadata: Dict[str, Any], filter: Dict[str, Any]) -> bool:
        """Return whether every filter key is present in ``metadata`` with an equal value."""
        return all(
            key in metadata and metadata[key] == value
            for key, value in filter.items()
        )

    def save_index(self):
        """Write the index to ``<save_path>.index`` and the rest to ``<save_path>.pkl``."""
        if not self.save_path:
            return

        import pickle

        # Create the parent directory so the first save does not fail
        parent_dir = os.path.dirname(self.save_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # Save FAISS index
        self._faiss.write_index(self.index, f"{self.save_path}.index")

        # Save documents and metadata
        with open(f"{self.save_path}.pkl", "wb") as f:
            pickle.dump({
                "documents": self.documents,
                "metadatas": self.metadatas,
                "id_to_index": self.id_to_index,
                "index_to_id": self.index_to_id
            }, f)

    def load_index(self):
        """Load the index and document state saved by ``save_index``.

        The loaded index replaces the one created in ``__init__``. On any
        error the failure is logged and the current state is left untouched.
        """
        if not self.save_path:
            return

        import pickle

        try:
            index = self._faiss.read_index(f"{self.save_path}.index")

            # NOTE(security): pickle.load executes code embedded in the
            # file; only load from a trusted save_path.
            with open(f"{self.save_path}.pkl", "rb") as f:
                data = pickle.load(f)

            documents = data["documents"]
            metadatas = data["metadatas"]
            id_to_index = data["id_to_index"]
            index_to_id = data["index_to_id"]

        except Exception as e:
            logger.error(f"Error loading FAISS index: {e}")
            return

        # Commit only after everything loaded, so vectors and documents can
        # never come from different sources.
        self.index = index
        self.documents = documents
        self.metadatas = metadatas
        self.id_to_index = id_to_index
        self.index_to_id = index_to_id
 
class VectorStoreManager:
    """Pairs a vector store with an embedding model so callers can work with text."""

    def __init__(self, vector_store: BaseVectorStore, embedding_model: BaseEmbedding):
        """Remember the store to write to/search in and the model used to embed text."""
        self.embedding_model = embedding_model
        self.vector_store = vector_store

    def add_documents(self, documents: List[Document]) -> List[str]:
        """Store documents, embedding those that carry no vector yet.

        A document whose metadata has an ``'embedding'`` key keeps that
        vector; every other document's ``page_content`` is embedded in one
        batch call.

        Args:
            documents: Documents to add.

        Returns:
            Whatever the underlying store's ``add_documents`` returns.
        """
        vectors = []
        pending_texts = []
        for document in documents:
            has_vector = 'embedding' in document.metadata
            if not has_vector:
                pending_texts.append(document.page_content)
            vectors.append(document.metadata['embedding'] if has_vector else None)

        if pending_texts:
            generated = self.embedding_model.embed_documents(pending_texts)
            # Fill every empty slot, in order, with the next generated vector.
            empty_slots = [slot for slot, vector in enumerate(vectors) if vector is None]
            for cursor, slot in enumerate(empty_slots):
                vectors[slot] = generated[cursor]

        return self.vector_store.add_documents(documents, vectors)

    def similarity_search(
        self,
        query: str,
        k: int = 5,
        filter: Optional[Dict[str, Any]] = None
    ) -> List[VectorSearchResult]:
        """Embed ``query`` and run a similarity search in the vector store.

        Args:
            query: Text to search for.
            k: Number of results requested from the store.
            filter: Optional metadata filter forwarded to the store.
        """
        query_vector = self.embedding_model.embed_query(query)
        return self.vector_store.similarity_search(query_vector, k, filter)

    def hybrid_search(
        self,
        query: str,
        k: int = 5,
        alpha: float = 0.7,
        keyword_search_func: Optional[callable] = None
    ) -> List[VectorSearchResult]:
        """Blend semantic search with an optional keyword search.

        Args:
            query: Text to search for.
            k: Number of results to return.
            alpha: Weight of the semantic score; the keyword score gets
                ``1 - alpha``.
            keyword_search_func: Callable invoked as ``func(query, k * 2)``.
                When it is falsy only the semantic results are used.

        Returns:
            At most ``k`` results.
        """
        # Both searches over-fetch so the merge has more candidates to rank.
        pool_size = k * 2
        semantic_hits = self.similarity_search(query, pool_size)

        if not keyword_search_func:
            return semantic_hits[:k]

        keyword_hits = keyword_search_func(query, pool_size)
        merged = self._combine_search_results(semantic_hits, keyword_hits, alpha)
        return merged[:k]

    def _combine_search_results(
        self,
        semantic_results: List[VectorSearchResult],
        keyword_results: List[VectorSearchResult],
        alpha: float
    ) -> List[VectorSearchResult]:
        """Merge two result lists into one list ordered by weighted score.

        Results are matched on ``document.page_content``. A document found by
        only one search scores 0.0 for the other. When both searches found a
        document, the semantic result supplies the document and metadata.
        """
        by_semantic = {hit.document.page_content: hit for hit in semantic_results}
        by_keyword = {hit.document.page_content: hit for hit in keyword_results}

        # Stand-in whose score is used for documents missing from one side.
        absent = VectorSearchResult(None, 0.0, {})

        merged = []
        for content in set(by_semantic) | set(by_keyword):
            semantic_part = by_semantic.get(content, absent).score
            keyword_part = by_keyword.get(content, absent).score
            source = by_semantic[content] if content in by_semantic else by_keyword[content]
            merged.append(VectorSearchResult(
                document=source.document,
                score=alpha * semantic_part + (1 - alpha) * keyword_part,
                metadata=source.metadata
            ))

        return sorted(merged, key=lambda hit: hit.score, reverse=True)
 
# Usage example based on your materials
def create_vector_store_system():
    """Assemble the example stack: OpenAI embeddings, a Chroma store and a manager.

    Returns:
        A ``(manager, vector_store, embedding_model)`` tuple.
    """
    # Embedding model with an on-disk cache directory.
    embedder = OpenAIEmbedding(
        model="text-embedding-3-small",
        cache_dir="./embedding_cache"
    )

    # Chroma collection persisted under ./chroma_db.
    store = ChromaVectorStore(
        collection_name="rag_documents",
        persist_directory="./chroma_db"
    )

    return VectorStoreManager(store, embedder), store, embedder

Retrieval Strategies

Advanced Retrieval Implementation

from typing import List, Dict, Any, Optional, Tuple, Union, Callable
from dataclasses import dataclass
from enum import Enum
import numpy as np
 
class RetrievalMode(Enum):
    """Strategies that ``AdvancedRetriever.retrieve`` can dispatch on.

    Each member's value is a lowercase string label.
    """
    # Embedding-based similarity search through the vector store manager.
    DENSE = "dense"
    # Keyword search through the configured sparse retriever.
    SPARSE = "sparse"
    # Weighted merge of dense and sparse results.
    HYBRID = "hybrid"
    # Dense search repeated over several variations of the query, then merged.
    MULTI_QUERY = "multi_query"
    # Dense search followed by a filter that drops low-overlap documents.
    CONTEXTUAL_COMPRESSION = "contextual_compression"
 
@dataclass
class RetrievalConfig:
    """Options controlling one ``AdvancedRetriever.retrieve`` call."""
    # Strategy that ``AdvancedRetriever.retrieve`` dispatches on.
    mode: RetrievalMode = RetrievalMode.HYBRID
    # Number of results handed back to the caller.
    k: int = 5
    # Candidate pool size: dense/sparse search request this many instead of
    # ``k`` when ``rerank`` is on; hybrid and contextual-compression retrieval
    # gather this many candidates; multi-query splits it across its variations.
    fetch_k: int = 20  # Fetch more for reranking
    # Combined score is alpha * dense + (1 - alpha) * sparse.
    alpha: float = 0.7  # Weight for dense vs sparse in hybrid mode
    # Rerank candidates when True and the retriever was given a reranker.
    rerank: bool = True
    # NOTE(review): not read by the retriever code in this section; presumably
    # names the reranker model to load -- confirm where it is consumed.
    rerank_model: Optional[str] = None
    # Forwarded to the vector store as ``filter`` and to the sparse retriever
    # as ``filters``.
    filters: Optional[Dict[str, Any]] = None
    # Dense results scoring below this value are discarded.
    similarity_threshold: float = 0.0
    # MMR trade-off: weight of query relevance; (1 - value) weighs the
    # similarity penalty against already selected documents.
    diversity_lambda: float = 0.5  # For MMR diversity
    # Apply MMR re-selection in dense and hybrid retrieval when True.
    enable_mmr: bool = False
 
class AdvancedRetriever:
    """Retrieval front end with several interchangeable strategies.

    Wraps a ``VectorStoreManager`` for dense (embedding) search and,
    optionally, a sparse keyword retriever and a reranker. ``retrieve``
    runs the strategy named by ``RetrievalConfig.mode``.
    """

    def __init__(
        self,
        vector_store_manager: VectorStoreManager,
        sparse_retriever: Optional[Any] = None,
        reranker: Optional[Any] = None
    ):
        """Store the collaborators used by the retrieval strategies.

        Args:
            vector_store_manager: Performs embedding-based similarity search.
            sparse_retriever: Optional keyword retriever; it is called as
                ``search(query=..., k=..., filters=...)``.
            reranker: Optional reranker; it is called as
                ``rerank(query, results, top_k)``.
        """
        self.vector_manager = vector_store_manager
        self.sparse_retriever = sparse_retriever
        self.reranker = reranker

    def retrieve(
        self,
        query: str,
        config: RetrievalConfig
    ) -> List[VectorSearchResult]:
        """Run the retrieval strategy selected by ``config.mode``.

        Args:
            query: User query text.
            config: Retrieval options.

        Returns:
            At most ``config.k`` results.

        Raises:
            ValueError: If ``config.mode`` is not a supported mode, or if
                sparse mode is requested without a sparse retriever.
        """

        if config.mode == RetrievalMode.DENSE:
            return self._dense_retrieve(query, config)
        elif config.mode == RetrievalMode.SPARSE:
            return self._sparse_retrieve(query, config)
        elif config.mode == RetrievalMode.HYBRID:
            return self._hybrid_retrieve(query, config)
        elif config.mode == RetrievalMode.MULTI_QUERY:
            return self._multi_query_retrieve(query, config)
        elif config.mode == RetrievalMode.CONTEXTUAL_COMPRESSION:
            return self._contextual_compression_retrieve(query, config)
        else:
            raise ValueError(f"Unsupported retrieval mode: {config.mode}")

    def _dense_retrieve(
        self,
        query: str,
        config: RetrievalConfig
    ) -> List[VectorSearchResult]:
        """Dense (semantic) retrieval.

        Fetches ``fetch_k`` candidates when reranking is enabled (``k``
        otherwise), drops results below ``similarity_threshold``, then
        optionally reranks and applies MMR.
        """

        k = config.fetch_k if config.rerank else config.k

        results = self.vector_manager.similarity_search(
            query=query,
            k=k,
            filter=config.filters
        )

        # Apply similarity threshold
        results = [r for r in results if r.score >= config.similarity_threshold]

        # Rerank only when there are more candidates than requested
        if config.rerank and self.reranker and len(results) > config.k:
            results = self.reranker.rerank(query, results, config.k)

        # Apply MMR for diversity
        if config.enable_mmr:
            results = self._apply_mmr(query, results, config.k, config.diversity_lambda)

        return results[:config.k]

    def _sparse_retrieve(
        self,
        query: str,
        config: RetrievalConfig
    ) -> List[VectorSearchResult]:
        """Sparse (keyword) retrieval.

        Results that are not already ``VectorSearchResult`` objects are
        converted: tuples are read as ``(document, score)``, anything else is
        expected to expose ``.document`` and ``.score``.

        Raises:
            ValueError: If no sparse retriever was configured.
        """

        if not self.sparse_retriever:
            raise ValueError("Sparse retriever not configured")

        k = config.fetch_k if config.rerank else config.k

        results = self.sparse_retriever.search(
            query=query,
            k=k,
            filters=config.filters
        )

        # Convert to VectorSearchResult format if needed (decided by the
        # type of the first element)
        if results and not isinstance(results[0], VectorSearchResult):
            results = [
                VectorSearchResult(
                    document=r[0] if isinstance(r, tuple) else r.document,
                    score=r[1] if isinstance(r, tuple) else r.score,
                    metadata=getattr(r, 'metadata', {})
                )
                for r in results
            ]

        # Apply reranking
        if config.rerank and self.reranker and len(results) > config.k:
            results = self.reranker.rerank(query, results, config.k)

        return results[:config.k]

    def _hybrid_retrieve(
        self,
        query: str,
        config: RetrievalConfig
    ) -> List[VectorSearchResult]:
        """Hybrid retrieval combining dense and sparse methods.

        Both sides fetch ``fetch_k`` candidates without reranking; the merged
        list is then optionally reranked and diversified with MMR. Without a
        sparse retriever this degrades to dense-only candidates.
        """

        # Get dense results
        dense_results = self._dense_retrieve(
            query,
            RetrievalConfig(
                mode=RetrievalMode.DENSE,
                k=config.fetch_k,
                rerank=False,
                filters=config.filters,
                similarity_threshold=config.similarity_threshold
            )
        )

        # Get sparse results if available
        sparse_results = []
        if self.sparse_retriever:
            sparse_results = self._sparse_retrieve(
                query,
                RetrievalConfig(
                    mode=RetrievalMode.SPARSE,
                    k=config.fetch_k,
                    rerank=False,
                    filters=config.filters
                )
            )

        # Combine results
        combined_results = self._combine_results(
            dense_results,
            sparse_results,
            config.alpha
        )

        # Apply reranking
        if config.rerank and self.reranker and len(combined_results) > config.k:
            combined_results = self.reranker.rerank(query, combined_results, config.k)

        # Apply MMR for diversity
        if config.enable_mmr:
            combined_results = self._apply_mmr(
                query,
                combined_results,
                config.k,
                config.diversity_lambda
            )

        return combined_results[:config.k]

    def _multi_query_retrieve(
        self,
        query: str,
        config: RetrievalConfig
    ) -> List[VectorSearchResult]:
        """Multi-query retrieval using query expansion.

        Runs a dense search per query variation, keeps the best-scoring copy
        of each duplicated document, and optionally reranks against the
        original query.
        """

        # Generate query variations; fall back to the query itself so the
        # per-variation budget below never divides by zero
        query_variations = self._generate_query_variations(query) or [query]

        # Split the candidate budget across variations, but always ask for at
        # least one result (fetch_k may be smaller than the variation count)
        per_query_k = max(1, config.fetch_k // len(query_variations))
        sub_config = RetrievalConfig(
            mode=RetrievalMode.DENSE,
            k=per_query_k,
            rerank=False,
            filters=config.filters,
            # Honor the caller's threshold, as hybrid retrieval does
            similarity_threshold=config.similarity_threshold
        )

        all_results = []
        for q in query_variations:
            all_results.extend(self._dense_retrieve(q, sub_config))

        # Remove duplicates, keeping the highest score per document
        merged_results = self._merge_duplicate_results(all_results)

        # Sort by score so truncation keeps the best candidates
        merged_results.sort(key=lambda x: x.score, reverse=True)

        # Apply reranking
        if config.rerank and self.reranker and len(merged_results) > config.k:
            merged_results = self.reranker.rerank(query, merged_results, config.k)

        return merged_results[:config.k]

    def _contextual_compression_retrieve(
        self,
        query: str,
        config: RetrievalConfig
    ) -> List[VectorSearchResult]:
        """Contextual compression retrieval.

        Fetches ``fetch_k`` dense candidates, drops those with too little
        keyword overlap with the query, then optionally reranks.
        """

        # First, get more results than needed
        initial_results = self._dense_retrieve(
            query,
            RetrievalConfig(
                mode=RetrievalMode.DENSE,
                k=config.fetch_k,
                rerank=False,
                filters=config.filters,
                # Honor the caller's threshold, as hybrid retrieval does
                similarity_threshold=config.similarity_threshold
            )
        )

        # Apply contextual compression (filter irrelevant content)
        compressed_results = self._apply_contextual_compression(query, initial_results)

        # Apply reranking
        if config.rerank and self.reranker:
            compressed_results = self.reranker.rerank(query, compressed_results, config.k)

        return compressed_results[:config.k]

    def _combine_results(
        self,
        dense_results: List[VectorSearchResult],
        sparse_results: List[VectorSearchResult],
        alpha: float
    ) -> List[VectorSearchResult]:
        """Combine dense and sparse results with weighted scoring.

        Results are matched on ``document.page_content``. Each side's score
        is clamped to [0, 1] (a missing side counts as 0) and mixed as
        ``alpha * dense + (1 - alpha) * sparse``. When ``sparse_results`` is
        empty the dense results are returned untouched.

        Returns:
            Results sorted by combined score, highest first. Ties keep
            first-seen order (dense results before sparse-only ones).
        """

        if not sparse_results:
            return dense_results

        # Create document content to result mapping
        dense_map = {r.document.page_content: r for r in dense_results}
        sparse_map = {r.document.page_content: r for r in sparse_results}

        # Ordered union (dict keys keep insertion order) so that equal scores
        # are ranked the same way on every run
        all_docs = list(dict.fromkeys([*dense_map, *sparse_map]))

        combined_results = []
        for doc_content in all_docs:
            dense_hit = dense_map.get(doc_content)
            sparse_hit = sparse_map.get(doc_content)

            # Clamp scores to the [0, 1] range; a missing side contributes 0
            dense_score = max(0.0, min(1.0, dense_hit.score)) if dense_hit is not None else 0.0
            sparse_score = max(0.0, min(1.0, sparse_hit.score)) if sparse_hit is not None else 0.0

            # Weighted combination
            combined_score = alpha * dense_score + (1 - alpha) * sparse_score

            # Use document from dense results preferentially
            result = dense_hit if dense_hit is not None else sparse_hit

            combined_results.append(VectorSearchResult(
                document=result.document,
                score=combined_score,
                metadata=result.metadata
            ))

        # Sort by combined score (stable, so ties keep first-seen order)
        combined_results.sort(key=lambda x: x.score, reverse=True)
        return combined_results

    def _apply_mmr(
        self,
        query: str,
        results: List[VectorSearchResult],
        k: int,
        lambda_param: float
    ) -> List[VectorSearchResult]:
        """Apply Maximal Marginal Relevance for diversity.

        Args:
            query: Query text; embedded to measure relevance.
            results: Candidates, assumed to be sorted by relevance already
                (the first one is always selected).
            k: Number of results to select.
            lambda_param: Weight of relevance; ``1 - lambda_param`` weighs
                the penalty for similarity to already selected documents.

        Returns:
            ``results`` unchanged when it holds at most ``k`` items, an empty
            list when ``k`` is not positive, otherwise ``k`` selected items.
        """

        if k <= 0:
            return []

        if len(results) <= k:
            return results

        # Get query embedding for relevance calculation
        query_embedding = self.vector_manager.embedding_model.embed_query(query)

        # Get embeddings for all documents, reusing stored vectors if present
        doc_embeddings = []
        for result in results:
            if 'embedding' in result.document.metadata:
                doc_embeddings.append(result.document.metadata['embedding'])
            else:
                # Generate embedding if not available
                embedding = self.vector_manager.embedding_model.embed_query(
                    result.document.page_content
                )
                doc_embeddings.append(embedding)

        # Relevance to the query never changes between rounds: compute once
        relevance = [
            self._cosine_similarity(query_embedding, embedding)
            for embedding in doc_embeddings
        ]

        # Seed with the first (most relevant) document
        selected_indices = [0]
        remaining_indices = list(range(1, len(results)))

        # Select remaining documents using MMR
        while len(selected_indices) < k and remaining_indices:
            best_idx = None
            best_score = None

            for idx in remaining_indices:
                # Diversity penalty: closest already-selected document
                max_similarity = max(
                    self._cosine_similarity(doc_embeddings[idx], doc_embeddings[sel_idx])
                    for sel_idx in selected_indices
                )

                mmr_score = lambda_param * relevance[idx] - (1 - lambda_param) * max_similarity

                # Strict comparison keeps the earliest candidate on ties
                if best_score is None or mmr_score > best_score:
                    best_idx, best_score = idx, mmr_score

            selected_indices.append(best_idx)
            remaining_indices.remove(best_idx)

        return [results[i] for i in selected_indices]

    def _generate_query_variations(self, query: str, num_variations: int = 3) -> List[str]:
        """Generate query variations for multi-query retrieval.

        Simplified, rule-based implementation (an LLM would normally produce
        the variations): the original query, its words in reverse order, and
        copies with a question word replaced by a longer phrasing.

        Returns:
            Up to ``num_variations`` distinct strings, original query first.
        """
        variations = [query]

        words = query.split()
        if len(words) > 1:
            # Reorder words
            variations.append(" ".join(reversed(words)))

            # Add synonyms or related terms (simplified)
            synonym_map = {
                "what": "which",
                "how": "in what way",
                "where": "at what location",
                "when": "at what time"
            }

            for i, word in enumerate(words):
                replacement = synonym_map.get(word.lower())
                if replacement is not None:
                    new_words = words.copy()
                    new_words[i] = replacement
                    variations.append(" ".join(new_words))

        # Drop repeats (e.g. a query that reads the same reversed) so no
        # search is run twice; dict keys preserve first-seen order
        unique_variations = list(dict.fromkeys(variations))
        return unique_variations[:num_variations]

    def _merge_duplicate_results(
        self,
        results: List[VectorSearchResult]
    ) -> List[VectorSearchResult]:
        """Collapse results sharing the same page content.

        For each distinct ``document.page_content`` the highest-scoring
        result is kept; output follows first-seen order.
        """

        best_by_content = {}

        for result in results:
            doc_content = result.document.page_content
            current = best_by_content.get(doc_content)
            if current is None or result.score > current.score:
                best_by_content[doc_content] = result

        return list(best_by_content.values())

    def _apply_contextual_compression(
        self,
        query: str,
        results: List[VectorSearchResult],
        min_overlap_ratio: float = 0.1
    ) -> List[VectorSearchResult]:
        """Filter out results that share too few words with the query.

        Simplified keyword-overlap check (a compression model would normally
        do this). A result is kept when the fraction of distinct query words
        that also occur in its page content exceeds ``min_overlap_ratio``.

        Args:
            query: Query text.
            results: Candidates to filter.
            min_overlap_ratio: Exclusive lower bound on the overlap fraction.

        Returns:
            The kept results in their original order; an empty list when the
            query contains no words (nothing can overlap with it).
        """
        # The query's word set is the same for every result: build it once
        query_words = set(query.lower().split())

        # Guard against dividing by zero on an empty or whitespace-only query
        if not query_words:
            return []

        compressed_results = []

        for result in results:
            doc_words = set(result.document.page_content.lower().split())
            relevance_ratio = len(query_words & doc_words) / len(query_words)

            # Keep result if it has sufficient relevance
            if relevance_ratio > min_overlap_ratio:
                compressed_results.append(result)

        return compressed_results

    def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """Calculate cosine similarity between two vectors.

        Returns:
            The cosine of the angle between the vectors as a plain ``float``,
            or ``0.0`` when either vector has zero length.
        """
        a = np.asarray(vec1)
        b = np.asarray(vec2)

        norm_a = np.linalg.norm(a)
        norm_b = np.linalg.norm(b)

        if norm_a == 0 or norm_b == 0:
            return 0.0

        # Convert the numpy scalar so callers get the annotated type
        return float(np.dot(a, b) / (norm_a * norm_b))
 
class CrossEncoderReranker:
    """Rescores retrieval results with a sentence-transformers cross-encoder."""

    def __init__(
        self,
        model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
        device: Optional[str] = None
    ):
        """Load the cross-encoder model.

        Args:
            model_name: Model identifier handed to ``CrossEncoder``.
            device: Device argument handed to ``CrossEncoder``.

        Raises:
            ImportError: If ``sentence_transformers`` cannot be imported.
        """
        try:
            from sentence_transformers import CrossEncoder
        except ImportError:
            raise ImportError("Please install sentence-transformers for reranking")

        self.model = CrossEncoder(model_name, device=device)

    def rerank(
        self,
        query: str,
        results: List[VectorSearchResult],
        top_k: int
    ) -> List[VectorSearchResult]:
        """Score each result against the query and keep the best ``top_k``.

        When there are no more than ``top_k`` results they are returned
        as-is, without being scored.

        Returns:
            New result objects carrying the cross-encoder score, sorted from
            highest to lowest.
        """
        if len(results) <= top_k:
            return results

        # One (query, passage) pair per candidate, scored in a single call.
        query_passage_pairs = [(query, hit.document.page_content) for hit in results]
        model_scores = self.model.predict(query_passage_pairs)

        rescored = [
            VectorSearchResult(
                document=hit.document,
                score=float(model_scores[position]),
                metadata=hit.metadata
            )
            for position, hit in enumerate(results)
        ]

        ranked = sorted(rescored, key=lambda hit: hit.score, reverse=True)
        return ranked[:top_k]
 
# Usage example
def create_advanced_retrieval_system():
    """Wire the example vector store stack to a reranking retriever.

    Returns:
        A ``(retriever, manager)`` tuple.
    """
    # Only the manager is needed here; the store and model stay inside it.
    manager, _store, _embedder = create_vector_store_system()

    # Default cross-encoder reranker, no sparse retriever.
    cross_encoder = CrossEncoderReranker()
    advanced = AdvancedRetriever(
        vector_store_manager=manager,
        reranker=cross_encoder
    )

    return advanced, manager

This comprehensive implementation provides:

  1. Multiple embedding models with caching and optimization
  2. Vector storage systems supporting Chroma and FAISS
  3. Advanced retrieval strategies including hybrid, multi-query, and MMR
  4. Reranking capabilities using cross-encoder models
  5. Performance optimizations including batching and caching
  6. Comprehensive error handling and logging

In Chapter 5: Production Deployment, we'll cover deployment strategies, monitoring, and optimization techniques for production RAG systems.