""" Source Attribution - Citation and source tracking system This module manages citation generation, source ranking, and quote extraction for RAG responses with proper source attribution. """ import logging import re from dataclasses import dataclass from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) @dataclass class Citation: """Structured citation for source attribution.""" document: str section: Optional[str] = None confidence: float = 0.0 excerpt: str = "" page: Optional[int] = None url: Optional[str] = None @dataclass class Quote: """Extracted quote from source document.""" text: str source_document: str relevance_score: float context_before: str = "" context_after: str = "" section: Optional[str] = None @dataclass class RankedSource: """Source document with ranking and metadata.""" document: str relevance_score: float reliability_score: float excerpt: str metadata: Dict[str, Any] rank: int = 0 class SourceAttributor: """ Manages citation generation and source tracking for RAG responses. Provides: - Structured citation formatting - Source ranking by relevance and reliability - Quote extraction from source documents - Citation validation and verification """ def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize SourceAttributor with configuration. Args: config: Configuration dictionary for attribution settings """ self.config = config or self._get_default_config() logger.info("SourceAttributor initialized") def _get_default_config(self) -> Dict[str, Any]: """Get default attribution configuration.""" return { "max_citations": 5, "min_confidence_for_citation": 0.3, "citation_format": "numbered", # "numbered", "parenthetical", "footnote" "include_excerpts": True, "max_excerpt_length": 150, "require_document_names": True, "prefer_specific_sections": True, } def generate_citations(self, response: str, sources: List[Dict[str, Any]]) -> List[Citation]: """ Generate proper citations for response based on sources. Args: response: Generated response text sources: Source documents with metadata Returns: List of Citation objects for the response """ try: citations = [] # Rank sources by relevance and reliability ranked_sources = self.rank_sources(sources, []) # Generate citations for top sources for i, ranked_source in enumerate(ranked_sources[: self.config["max_citations"]]): if ranked_source.relevance_score >= self.config["min_confidence_for_citation"]: citation = self._create_citation(ranked_source, i + 1) citations.append(citation) # Ensure citations are properly embedded in response self._validate_citation_presence(response, citations) logger.debug(f"Generated {len(citations)} citations") return citations except Exception as e: logger.error(f"Citation generation error: {e}") return [] def extract_quotes(self, response: str, documents: List[Dict[str, Any]]) -> List[Quote]: """ Extract relevant quotes from source documents. Args: response: Generated response text documents: Source documents to extract quotes from Returns: List of Quote objects with extracted text """ try: quotes = [] for doc in documents: content = doc.get("content", "") document_name = doc.get("metadata", {}).get("filename", "unknown") # Find quotes that appear in both response and document extracted_quotes = self._find_matching_quotes(response, content) for quote_text in extracted_quotes: relevance = self._calculate_quote_relevance(quote_text, response) quote = Quote( text=quote_text, source_document=document_name, relevance_score=relevance, section=doc.get("metadata", {}).get("section"), ) quotes.append(quote) # Sort by relevance quotes.sort(key=lambda q: q.relevance_score, reverse=True) logger.debug(f"Extracted {len(quotes)} quotes") return quotes except Exception as e: logger.error(f"Quote extraction error: {e}") return [] def rank_sources(self, sources: List[Dict[str, Any]], relevance_scores: List[float]) -> List[RankedSource]: """ Rank sources by relevance and reliability. Args: sources: Source documents with metadata relevance_scores: Pre-calculated relevance scores (optional) Returns: List of RankedSource objects sorted by ranking """ try: ranked_sources = [] for i, source in enumerate(sources): # Use provided relevance or calculate if i < len(relevance_scores): relevance = relevance_scores[i] else: relevance = source.get("relevance_score", 0.5) # Calculate reliability score reliability = self._calculate_reliability(source) # Create ranked source ranked_source = RankedSource( document=source.get("metadata", {}).get("filename", "unknown"), relevance_score=relevance, reliability_score=reliability, excerpt=self._create_excerpt(source), metadata=source.get("metadata", {}), ) ranked_sources.append(ranked_source) # Sort by combined score (relevance + reliability) ranked_sources.sort( key=lambda rs: (rs.relevance_score + rs.reliability_score) / 2, reverse=True, ) # Assign ranks for i, ranked_source in enumerate(ranked_sources): ranked_source.rank = i + 1 logger.debug(f"Ranked {len(ranked_sources)} sources") return ranked_sources except Exception as e: logger.error(f"Source ranking error: {e}") return [] def format_citation_text(self, citations: List[Citation]) -> str: """ Format citations as text for inclusion in response. Args: citations: List of Citation objects Returns: Formatted citation text """ if not citations: return "" citation_format = self.config["citation_format"] if citation_format == "numbered": return self._format_numbered_citations(citations) elif citation_format == "parenthetical": return self._format_parenthetical_citations(citations) elif citation_format == "footnote": return self._format_footnote_citations(citations) else: return self._format_numbered_citations(citations) def validate_citations(self, response: str, citations: List[Citation]) -> Dict[str, bool]: """ Validate that citations are properly referenced in response. Args: response: Response text to validate citations: Citations that should be referenced Returns: Dictionary mapping citation to validation status """ validation_results = {} for citation in citations: is_valid = self._is_citation_referenced(response, citation) validation_results[citation.document] = is_valid return validation_results def _create_citation(self, ranked_source: RankedSource, number: int) -> Citation: """Create Citation object from ranked source.""" return Citation( document=ranked_source.document, section=ranked_source.metadata.get("section"), confidence=ranked_source.relevance_score, excerpt=ranked_source.excerpt, page=ranked_source.metadata.get("page"), url=ranked_source.metadata.get("url"), ) def _calculate_reliability(self, source: Dict[str, Any]) -> float: """Calculate reliability score for source document.""" # Base reliability reliability = 0.7 # Boost for official documents filename = source.get("metadata", {}).get("filename", "").lower() if any(term in filename for term in ["policy", "handbook", "guideline", "procedure", "manual"]): reliability += 0.2 # Boost for recent documents (if timestamp available) # This would need timestamp metadata # if 'last_modified' in source.get('metadata', {}): # # Add recency bonus # pass # Boost for documents with clear structure content = source.get("content", "") if any(marker in content.lower() for marker in ["section", "article", "paragraph", "clause"]): reliability += 0.1 return min(reliability, 1.0) def _create_excerpt(self, source: Dict[str, Any]) -> str: """Create excerpt from source document.""" content = source.get("content", "") max_length = self.config["max_excerpt_length"] if len(content) <= max_length: return content # Try to find a good breaking point excerpt = content[:max_length] last_sentence = excerpt.rfind(".") last_space = excerpt.rfind(" ") if last_sentence > max_length * 0.7: return excerpt[: last_sentence + 1] elif last_space > max_length * 0.8: return excerpt[:last_space] + "..." else: return excerpt + "..." def _find_matching_quotes(self, response: str, document_content: str) -> List[str]: """Find quotes that appear in both response and document.""" quotes = [] # Look for phrases that appear in both response_sentences = [s.strip() for s in response.split(".") if s.strip()] doc_sentences = [s.strip() for s in document_content.split(".") if s.strip()] for resp_sent in response_sentences: for doc_sent in doc_sentences: # Check for substantial overlap if len(resp_sent) > 20 and len(doc_sent) > 20: if self._calculate_sentence_similarity(resp_sent, doc_sent) > 0.7: quotes.append(doc_sent) return list(set(quotes)) # Remove duplicates def _calculate_sentence_similarity(self, sent1: str, sent2: str) -> float: """Calculate similarity between two sentences.""" words1 = set(sent1.lower().split()) words2 = set(sent2.lower().split()) intersection = words1.intersection(words2) union = words1.union(words2) if not union: return 0.0 return len(intersection) / len(union) def _calculate_quote_relevance(self, quote: str, response: str) -> float: """Calculate relevance of quote to response.""" return self._calculate_sentence_similarity(quote, response) def _validate_citation_presence(self, response: str, citations: List[Citation]) -> None: """Validate that citations are present in response.""" if not self.config["require_document_names"]: return for citation in citations: if citation.document.lower() not in response.lower(): logger.warning(f"Citation {citation.document} not found in response") def _format_numbered_citations(self, citations: List[Citation]) -> str: """Format citations in numbered format.""" if not citations: return "" formatted = "\n\n**Sources:**\n" for i, citation in enumerate(citations, 1): formatted += f"{i}. {citation.document}" if citation.section: formatted += f" ({citation.section})" if self.config["include_excerpts"] and citation.excerpt: formatted += f'\n "{citation.excerpt}"' formatted += "\n" return formatted def _format_parenthetical_citations(self, citations: List[Citation]) -> str: """Format citations in parenthetical format.""" if not citations: return "" # Simple format: (Document1, Document2) doc_names = [citation.document for citation in citations] return f" ({', '.join(doc_names)})" def _format_footnote_citations(self, citations: List[Citation]) -> str: """Format citations as footnotes.""" if not citations: return "" formatted = "\n\n**References:**\n" for i, citation in enumerate(citations, 1): formatted += f"[{i}] {citation.document}" if citation.section: formatted += f", {citation.section}" formatted += "\n" return formatted def _is_citation_referenced(self, response: str, citation: Citation) -> bool: """Check if citation is properly referenced in response.""" response_lower = response.lower() doc_name_lower = citation.document.lower() # Look for document name mentions if doc_name_lower in response_lower: return True # Look for citation patterns citation_patterns = [ rf"\[.*{re.escape(citation.document)}.*\]", rf"\(.*{re.escape(citation.document)}.*\)", ] return any(re.search(pattern, response, re.IGNORECASE) for pattern in citation_patterns)