# msse-ai-engineering/src/guardrails/source_attribution.py
# sethmcknight
# Refactor test cases for improved readability and consistency
# 159faf0
"""
Source Attribution - Citation and source tracking system
This module manages citation generation, source ranking, and quote extraction
for RAG responses with proper source attribution.
"""
import logging
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class Citation:
    """Structured citation for source attribution.

    Only ``document`` is required; every other field has a default so a
    citation can be built from a bare document name.

    Attributes:
        document: Name of the cited document.
        section: Section of the document being cited, if known.
        confidence: Confidence score attached to the citation (defaults to 0.0).
        excerpt: Short excerpt of the cited content (defaults to empty).
        page: Page number within the document, if known.
        url: Link to the document, if known.
    """

    document: str
    section: Optional[str] = None
    confidence: float = 0.0
    excerpt: str = ""
    page: Optional[int] = None
    url: Optional[str] = None
@dataclass
class Quote:
    """Extracted quote from source document.

    Attributes:
        text: The quoted text.
        source_document: Name of the document the quote was taken from.
        relevance_score: Score used to order quotes (higher sorts first).
        context_before: Text preceding the quote (defaults to empty).
        context_after: Text following the quote (defaults to empty).
        section: Section of the source document, if known.
    """

    text: str
    source_document: str
    relevance_score: float
    context_before: str = ""
    context_after: str = ""
    section: Optional[str] = None
@dataclass
class RankedSource:
    """Source document with ranking and metadata.

    Attributes:
        document: Name of the source document.
        relevance_score: Relevance score of the source.
        reliability_score: Reliability score of the source.
        excerpt: Short excerpt of the source content.
        metadata: Metadata dictionary carried over from the source.
        rank: Position in the ranking; defaults to 0 until a rank is assigned.
    """

    document: str
    relevance_score: float
    reliability_score: float
    excerpt: str
    metadata: Dict[str, Any]
    rank: int = 0
class SourceAttributor:
    """
    Manages citation generation and source tracking for RAG responses.

    Provides:
    - Structured citation formatting
    - Source ranking by relevance and reliability
    - Quote extraction from source documents
    - Citation validation and verification

    Sources and documents are plain dictionaries. The keys read by this class
    are ``"content"``, ``"relevance_score"`` and ``"metadata"`` (itself a
    dictionary read for ``"filename"``, ``"section"``, ``"page"`` and
    ``"url"``). A missing or ``None`` ``metadata``/``content`` is treated as
    empty.
    """

    # Filename fragments that earn the "official document" reliability boost.
    _OFFICIAL_FILENAME_TERMS = ("policy", "handbook", "guideline", "procedure", "manual")
    # Content markers that earn the "clear structure" reliability boost.
    _STRUCTURE_MARKERS = ("section", "article", "paragraph", "clause")
    # Sentences must be longer than this many characters to be quote candidates.
    _MIN_QUOTE_SENTENCE_LENGTH = 20
    # Word-overlap (Jaccard) score a sentence pair must exceed to count as a match.
    _QUOTE_SIMILARITY_THRESHOLD = 0.7

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize SourceAttributor with configuration.

        Args:
            config: Optional overrides for the attribution settings. Keys that
                are not supplied fall back to the values returned by
                ``_get_default_config``.
        """
        # Merge over the defaults instead of replacing them, so a partial
        # config cannot cause a KeyError when another setting is read later.
        # The merged result is a new dict; the caller's dict is not mutated.
        self.config = {**self._get_default_config(), **(config or {})}
        logger.info("SourceAttributor initialized")

    def _get_default_config(self) -> Dict[str, Any]:
        """Get default attribution configuration."""
        return {
            "max_citations": 5,
            "min_confidence_for_citation": 0.3,
            "citation_format": "numbered",  # "numbered", "parenthetical", "footnote"
            "include_excerpts": True,
            "max_excerpt_length": 150,
            "require_document_names": True,
            "prefer_specific_sections": True,
        }

    def generate_citations(self, response: str, sources: List[Dict[str, Any]]) -> List["Citation"]:
        """
        Generate proper citations for response based on sources.

        The sources are ranked, the top ``max_citations`` are kept, and of
        those only the ones whose relevance reaches
        ``min_confidence_for_citation`` become citations.

        Args:
            response: Generated response text
            sources: Source documents with metadata

        Returns:
            List of Citation objects for the response. Any error is logged and
            results in an empty list (best effort).
        """
        try:
            # Rank sources by relevance and reliability
            ranked_sources = self.rank_sources(sources, [])
            top_sources = ranked_sources[: self.config["max_citations"]]
            min_confidence = self.config["min_confidence_for_citation"]

            # Generate citations for top sources that are confident enough
            citations = [
                self._create_citation(ranked_source, position)
                for position, ranked_source in enumerate(top_sources, 1)
                if ranked_source.relevance_score >= min_confidence
            ]

            # Warn about citations whose document is not mentioned in the response
            self._validate_citation_presence(response, citations)
            logger.debug("Generated %d citations", len(citations))
            return citations
        except Exception as e:
            logger.error("Citation generation error: %s", e)
            return []

    def extract_quotes(self, response: str, documents: List[Dict[str, Any]]) -> List["Quote"]:
        """
        Extract relevant quotes from source documents.

        Args:
            response: Generated response text
            documents: Source documents to extract quotes from

        Returns:
            List of Quote objects sorted by descending relevance. Any error is
            logged and results in an empty list (best effort).
        """
        try:
            quotes = []
            for doc in documents:
                # A present-but-None metadata/content is treated like a missing one.
                metadata = doc.get("metadata") or {}
                content = doc.get("content") or ""
                document_name = metadata.get("filename", "unknown")
                section = metadata.get("section")

                # Find quotes that appear in both response and document
                for quote_text in self._find_matching_quotes(response, content):
                    quotes.append(
                        Quote(
                            text=quote_text,
                            source_document=document_name,
                            relevance_score=self._calculate_quote_relevance(quote_text, response),
                            section=section,
                        )
                    )

            # Sort by relevance (stable, so ties keep extraction order)
            quotes.sort(key=lambda q: q.relevance_score, reverse=True)
            logger.debug("Extracted %d quotes", len(quotes))
            return quotes
        except Exception as e:
            logger.error("Quote extraction error: %s", e)
            return []

    def rank_sources(
        self,
        sources: List[Dict[str, Any]],
        relevance_scores: Optional[List[float]] = None,
    ) -> List["RankedSource"]:
        """
        Rank sources by relevance and reliability.

        Args:
            sources: Source documents with metadata
            relevance_scores: Pre-calculated relevance scores, positionally
                aligned with ``sources`` (optional). A source without a
                corresponding entry uses its own ``"relevance_score"`` key,
                or 0.5 when that is absent.

        Returns:
            List of RankedSource objects sorted by the mean of relevance and
            reliability, highest first, with ``rank`` set to the 1-based
            position. Any error is logged and results in an empty list.
        """
        try:
            provided_scores = relevance_scores or []
            ranked_sources = []
            for i, source in enumerate(sources):
                # Use provided relevance or fall back to the source's own score
                if i < len(provided_scores):
                    relevance = provided_scores[i]
                else:
                    relevance = source.get("relevance_score", 0.5)

                metadata = source.get("metadata") or {}
                ranked_sources.append(
                    RankedSource(
                        document=metadata.get("filename", "unknown"),
                        relevance_score=relevance,
                        reliability_score=self._calculate_reliability(source),
                        excerpt=self._create_excerpt(source),
                        metadata=metadata,
                    )
                )

            # Sort by combined score (relevance + reliability)
            ranked_sources.sort(
                key=lambda rs: (rs.relevance_score + rs.reliability_score) / 2,
                reverse=True,
            )

            # Assign ranks
            for position, ranked_source in enumerate(ranked_sources, 1):
                ranked_source.rank = position

            logger.debug("Ranked %d sources", len(ranked_sources))
            return ranked_sources
        except Exception as e:
            logger.error("Source ranking error: %s", e)
            return []

    def format_citation_text(self, citations: List["Citation"]) -> str:
        """
        Format citations as text for inclusion in response.

        Args:
            citations: List of Citation objects

        Returns:
            Formatted citation text in the configured ``citation_format``;
            unrecognised formats fall back to the numbered format. Returns an
            empty string when there are no citations.
        """
        if not citations:
            return ""

        citation_format = self.config["citation_format"]
        if citation_format == "parenthetical":
            return self._format_parenthetical_citations(citations)
        if citation_format == "footnote":
            return self._format_footnote_citations(citations)
        # "numbered" and anything unrecognised
        return self._format_numbered_citations(citations)

    def validate_citations(self, response: str, citations: List["Citation"]) -> Dict[str, bool]:
        """
        Validate that citations are properly referenced in response.

        Args:
            response: Response text to validate
            citations: Citations that should be referenced

        Returns:
            Dictionary mapping each citation's document name to whether it is
            referenced. Citations sharing a document name share one entry
            (the last one processed wins).
        """
        return {
            citation.document: self._is_citation_referenced(response, citation)
            for citation in citations
        }

    def _create_citation(self, ranked_source: "RankedSource", number: int) -> "Citation":
        """Create Citation object from ranked source.

        ``number`` is accepted for call compatibility but is not stored on the
        citation.
        """
        metadata = ranked_source.metadata
        return Citation(
            document=ranked_source.document,
            section=metadata.get("section"),
            confidence=ranked_source.relevance_score,
            excerpt=ranked_source.excerpt,
            page=metadata.get("page"),
            url=metadata.get("url"),
        )

    def _calculate_reliability(self, source: Dict[str, Any]) -> float:
        """Calculate reliability score for source document.

        Starts from 0.7, adds 0.2 when the filename contains an
        official-document term and 0.1 when the content contains a structure
        marker; the result is capped at 1.0.
        """
        # Base reliability
        reliability = 0.7

        # Boost for official documents
        metadata = source.get("metadata") or {}
        filename = (metadata.get("filename") or "").lower()
        if any(term in filename for term in self._OFFICIAL_FILENAME_TERMS):
            reliability += 0.2

        # Boost for documents with clear structure
        content = (source.get("content") or "").lower()
        if any(marker in content for marker in self._STRUCTURE_MARKERS):
            reliability += 0.1

        return min(reliability, 1.0)

    def _create_excerpt(self, source: Dict[str, Any]) -> str:
        """Create excerpt from source document.

        Content no longer than ``max_excerpt_length`` is returned unchanged.
        Longer content is cut at the last full stop when that lies in the
        final 30% of the window, otherwise at the last space when that lies in
        the final 20% (with "..." appended), otherwise hard-truncated with
        "..." appended.
        """
        content = source.get("content") or ""
        max_length = self.config["max_excerpt_length"]
        if len(content) <= max_length:
            return content

        # Try to find a good breaking point
        excerpt = content[:max_length]
        last_sentence = excerpt.rfind(".")
        last_space = excerpt.rfind(" ")
        if last_sentence > max_length * 0.7:
            return excerpt[: last_sentence + 1]
        if last_space > max_length * 0.8:
            return excerpt[:last_space] + "..."
        return excerpt + "..."

    def _find_matching_quotes(self, response: str, document_content: str) -> List[str]:
        """Find quotes that appear in both response and document.

        Both texts are split on "." into sentences. A document sentence is
        returned when it and some response sentence are each longer than 20
        characters and their word overlap exceeds 0.7.

        Returns:
            Matching document sentences without duplicates, in first-match
            order (response sentence order, then document sentence order), so
            the result is deterministic.
        """
        min_length = self._MIN_QUOTE_SENTENCE_LENGTH
        threshold = self._QUOTE_SIMILARITY_THRESHOLD

        # Filter by length once, instead of re-checking inside the nested loop.
        response_sentences = [
            s for s in (part.strip() for part in response.split(".")) if len(s) > min_length
        ]
        doc_sentences = [
            s for s in (part.strip() for part in document_content.split(".")) if len(s) > min_length
        ]

        matches = [
            doc_sent
            for resp_sent in response_sentences
            for doc_sent in doc_sentences
            if self._calculate_sentence_similarity(resp_sent, doc_sent) > threshold
        ]
        # dict.fromkeys removes duplicates while keeping insertion order;
        # a set would make the order depend on string hashing.
        return list(dict.fromkeys(matches))

    def _calculate_sentence_similarity(self, sent1: str, sent2: str) -> float:
        """Calculate similarity between two sentences.

        Returns the Jaccard index of the two lower-cased, whitespace-split
        word sets, or 0.0 when both are empty.
        """
        words1 = set(sent1.lower().split())
        words2 = set(sent2.lower().split())
        union = words1 | words2
        if not union:
            return 0.0
        return len(words1 & words2) / len(union)

    def _calculate_quote_relevance(self, quote: str, response: str) -> float:
        """Calculate relevance of quote to response (word overlap with the whole response)."""
        return self._calculate_sentence_similarity(quote, response)

    def _validate_citation_presence(self, response: str, citations: List["Citation"]) -> None:
        """Log a warning for each citation whose document name is absent from response.

        Does nothing when ``require_document_names`` is disabled.
        """
        if not self.config["require_document_names"]:
            return

        response_lower = response.lower()  # lower-case once, not per citation
        for citation in citations:
            if citation.document.lower() not in response_lower:
                logger.warning("Citation %s not found in response", citation.document)

    def _format_numbered_citations(self, citations: List["Citation"]) -> str:
        """Format citations in numbered format under a "Sources" heading."""
        if not citations:
            return ""

        include_excerpts = self.config["include_excerpts"]
        entries = []
        for i, citation in enumerate(citations, 1):
            entry = f"{i}. {citation.document}"
            if citation.section:
                entry += f" ({citation.section})"
            if include_excerpts and citation.excerpt:
                entry += f'\n "{citation.excerpt}"'
            entries.append(entry + "\n")
        return "\n\n**Sources:**\n" + "".join(entries)

    def _format_parenthetical_citations(self, citations: List["Citation"]) -> str:
        """Format citations in parenthetical format."""
        if not citations:
            return ""

        # Simple format: (Document1, Document2)
        doc_names = [citation.document for citation in citations]
        return f" ({', '.join(doc_names)})"

    def _format_footnote_citations(self, citations: List["Citation"]) -> str:
        """Format citations as footnotes under a "References" heading."""
        if not citations:
            return ""

        entries = []
        for i, citation in enumerate(citations, 1):
            entry = f"[{i}] {citation.document}"
            if citation.section:
                entry += f", {citation.section}"
            entries.append(entry + "\n")
        return "\n\n**References:**\n" + "".join(entries)

    def _is_citation_referenced(self, response: str, citation: "Citation") -> bool:
        """Check if citation is properly referenced in response.

        A citation counts as referenced when its document name occurs in the
        response (case-insensitively), either bare or inside square brackets
        or parentheses. A citation with an empty document name is never
        considered referenced.
        """
        document = citation.document
        if not document:
            # "" is a substring of every string, so without this guard a
            # nameless citation would always be reported as referenced.
            return False

        # Look for document name mentions
        if document.lower() in response.lower():
            return True

        # Look for citation patterns
        # NOTE(review): these look largely subsumed by the substring check
        # above; kept so regex case-insensitive matching still applies.
        escaped = re.escape(document)
        citation_patterns = (
            rf"\[.*{escaped}.*\]",
            rf"\(.*{escaped}.*\)",
        )
        return any(re.search(pattern, response, re.IGNORECASE) for pattern in citation_patterns)