msse-ai-engineering / src /guardrails /quality_metrics.py
sethmcknight
Refactor test cases for improved readability and consistency
159faf0
"""
Quality Metrics - Response quality scoring algorithms
This module provides comprehensive quality assessment for RAG responses
including relevance, completeness, coherence, and source fidelity scoring.
"""
import logging
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set, Tuple
logger = logging.getLogger(__name__)
@dataclass
class QualityScore:
"""Comprehensive quality score for RAG response."""
overall_score: float
relevance_score: float
completeness_score: float
coherence_score: float
source_fidelity_score: float
professionalism_score: float
# Additional metrics
response_length: int
citation_count: int
source_count: int
confidence_level: str # "high", "medium", "low"
# Quality indicators
meets_threshold: bool
strengths: List[str]
weaknesses: List[str]
recommendations: List[str]
class QualityMetrics:
"""
Comprehensive quality assessment system for RAG responses.
Provides detailed scoring across multiple dimensions:
- Relevance: How well response addresses the query
- Completeness: Adequacy of information provided
- Coherence: Logical structure and flow
- Source Fidelity: Alignment with source documents
- Professionalism: Appropriate business tone
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
Initialize QualityMetrics with configuration.
Args:
config: Configuration dictionary for quality thresholds
"""
self.config = config or self._get_default_config()
logger.info("QualityMetrics initialized")
def _get_default_config(self) -> Dict[str, Any]:
"""Get default quality assessment configuration."""
return {
"quality_threshold": 0.7,
"relevance_weight": 0.3,
"completeness_weight": 0.25,
"coherence_weight": 0.2,
"source_fidelity_weight": 0.25,
"min_response_length": 50,
"target_response_length": 300,
"max_response_length": 1000,
"min_citation_count": 1,
"preferred_source_count": 3,
"enable_detailed_analysis": True,
}
def calculate_quality_score(
self,
response: str,
query: str,
sources: List[Dict[str, Any]],
context: Optional[str] = None,
) -> QualityScore:
"""
Calculate comprehensive quality score for response.
Args:
response: Generated response text
query: Original user query
sources: Source documents used
context: Optional additional context
Returns:
QualityScore with detailed metrics and recommendations
"""
try:
# Calculate individual dimension scores
relevance = self._calculate_relevance_score(response, query)
completeness = self._calculate_completeness_score(response, query)
coherence = self._calculate_coherence_score(response)
source_fidelity = self._calculate_source_fidelity_score(response, sources)
professionalism = self._calculate_professionalism_score(response)
# Calculate weighted overall score
overall = self._calculate_overall_score(
relevance, completeness, coherence, source_fidelity, professionalism
)
# Analyze response characteristics
response_analysis = self._analyze_response_characteristics(response, sources)
# Determine confidence level
confidence_level = self._determine_confidence_level(overall, response_analysis)
# Generate insights
strengths, weaknesses, recommendations = self._generate_quality_insights(
relevance,
completeness,
coherence,
source_fidelity,
professionalism,
response_analysis,
)
return QualityScore(
overall_score=overall,
relevance_score=relevance,
completeness_score=completeness,
coherence_score=coherence,
source_fidelity_score=source_fidelity,
professionalism_score=professionalism,
response_length=response_analysis["length"],
citation_count=response_analysis["citation_count"],
source_count=response_analysis["source_count"],
confidence_level=confidence_level,
meets_threshold=overall >= self.config["quality_threshold"],
strengths=strengths,
weaknesses=weaknesses,
recommendations=recommendations,
)
except Exception as e:
logger.error(f"Quality scoring error: {e}")
return QualityScore(
overall_score=0.0,
relevance_score=0.0,
completeness_score=0.0,
coherence_score=0.0,
source_fidelity_score=0.0,
professionalism_score=0.0,
response_length=len(response),
citation_count=0,
source_count=len(sources),
confidence_level="low",
meets_threshold=False,
strengths=[],
weaknesses=["Error in quality assessment"],
recommendations=["Retry quality assessment"],
)
def _calculate_relevance_score(self, response: str, query: str) -> float:
"""Calculate how well response addresses the query."""
if not query.strip():
return 1.0 # No query to compare against
# Extract key terms from query
query_terms = self._extract_key_terms(query)
response_terms = self._extract_key_terms(response)
if not query_terms:
return 1.0
# Calculate term overlap
overlap = len(query_terms.intersection(response_terms))
term_coverage = overlap / len(query_terms)
# Check for semantic relevance patterns
semantic_relevance = self._check_semantic_relevance(response, query)
# Combine scores
relevance = (term_coverage * 0.6) + (semantic_relevance * 0.4)
return min(relevance, 1.0)
def _calculate_completeness_score(self, response: str, query: str) -> float:
"""Calculate how completely the response addresses the query."""
response_length = len(response)
target_length = self.config["target_response_length"]
min_length = self.config["min_response_length"]
# Length-based completeness
if response_length < min_length:
length_score = response_length / min_length * 0.5
elif response_length <= target_length:
length_score = 0.5 + (response_length - min_length) / (target_length - min_length) * 0.5
else:
# Diminishing returns for very long responses
excess = response_length - target_length
penalty = min(excess / target_length * 0.2, 0.3)
length_score = 1.0 - penalty
# Structure-based completeness
structure_score = self._assess_response_structure(response)
# Information density
density_score = self._assess_information_density(response, query)
# Combine scores
completeness = (length_score * 0.4) + (structure_score * 0.3) + (density_score * 0.3)
return min(max(completeness, 0.0), 1.0)
def _calculate_coherence_score(self, response: str) -> float:
"""Calculate logical structure and coherence of response."""
sentences = [s.strip() for s in response.split(".") if s.strip()]
if len(sentences) < 2:
return 0.8 # Short responses are typically coherent
# Check for logical flow indicators
flow_indicators = [
"however",
"therefore",
"additionally",
"furthermore",
"consequently",
"moreover",
"nevertheless",
"in addition",
"as a result",
"for example",
]
response_lower = response.lower()
flow_score = sum(1 for indicator in flow_indicators if indicator in response_lower)
flow_score = min(flow_score / 3, 1.0) # Normalize
# Check for repetition (negative indicator)
unique_sentences = len(set(s.lower() for s in sentences))
repetition_score = unique_sentences / len(sentences)
# Check for topic consistency
consistency_score = self._assess_topic_consistency(sentences)
# Check for clear conclusion/summary
conclusion_score = self._has_clear_conclusion(response)
# Combine scores
coherence = flow_score * 0.3 + repetition_score * 0.3 + consistency_score * 0.2 + conclusion_score * 0.2
return min(coherence, 1.0)
def _calculate_source_fidelity_score(self, response: str, sources: List[Dict[str, Any]]) -> float:
"""Calculate alignment between response and source documents."""
if not sources:
return 0.5 # Neutral score if no sources
# Citation presence and quality
citation_score = self._assess_citation_quality(response, sources)
# Content alignment with sources
alignment_score = self._assess_content_alignment(response, sources)
# Source coverage (how many sources are referenced)
coverage_score = self._assess_source_coverage(response, sources)
# Factual consistency check
consistency_score = self._check_factual_consistency(response, sources)
# Combine scores
fidelity = citation_score * 0.3 + alignment_score * 0.4 + coverage_score * 0.15 + consistency_score * 0.15
return min(fidelity, 1.0)
def _calculate_professionalism_score(self, response: str) -> float:
"""Calculate professional tone and appropriateness."""
# Check for professional language patterns
professional_indicators = [
r"\b(?:please|thank you|according to|based on|our policy|guidelines)\b",
r"\b(?:recommend|suggest|advise|ensure|confirm)\b",
r"\b(?:appropriate|professional|compliance|requirements)\b",
]
professional_count = sum(
len(re.findall(pattern, response, re.IGNORECASE)) for pattern in professional_indicators
)
professional_score = min(professional_count / 3, 1.0)
# Check for unprofessional patterns
unprofessional_patterns = [
r"\b(?:yo|hey|wassup|gonna|wanna)\b",
r"\b(?:lol|omg|wtf|tbh|idk)\b",
r"[!]{2,}|[?]{2,}",
r"\b(?:stupid|dumb|crazy|insane)\b",
]
unprofessional_count = sum(
len(re.findall(pattern, response, re.IGNORECASE)) for pattern in unprofessional_patterns
)
unprofessional_penalty = min(unprofessional_count * 0.3, 0.8)
# Check tone appropriateness
tone_score = self._assess_tone_appropriateness(response)
# Combine scores
professionalism = professional_score + tone_score - unprofessional_penalty
return min(max(professionalism, 0.0), 1.0)
def _calculate_overall_score(
self,
relevance: float,
completeness: float,
coherence: float,
source_fidelity: float,
professionalism: float,
) -> float:
"""Calculate weighted overall quality score."""
weights = self.config
overall = (
relevance * weights["relevance_weight"]
+ completeness * weights["completeness_weight"]
+ coherence * weights["coherence_weight"]
+ source_fidelity * weights["source_fidelity_weight"]
+ professionalism * 0.0 # Not weighted in overall for now
)
return min(max(overall, 0.0), 1.0)
def _extract_key_terms(self, text: str) -> Set[str]:
"""Extract key terms from text for relevance analysis."""
# Simple keyword extraction (can be enhanced with NLP)
words = re.findall(r"\b\w+\b", text.lower())
# Filter out common stop words
stop_words = {
"the",
"a",
"an",
"and",
"or",
"but",
"in",
"on",
"at",
"to",
"for",
"of",
"with",
"by",
"from",
"up",
"about",
"into",
"through",
"during",
"before",
"after",
"above",
"below",
"between",
"among",
"is",
"are",
"was",
"were",
"be",
"been",
"being",
"have",
"has",
"had",
"do",
"does",
"did",
"will",
"would",
"could",
"should",
"may",
"might",
"can",
"what",
"where",
"when",
"why",
"how",
"this",
"that",
"these",
"those",
}
return {word for word in words if len(word) > 2 and word not in stop_words}
def _check_semantic_relevance(self, response: str, query: str) -> float:
"""Check semantic relevance between response and query."""
# Look for question-answer patterns
query_lower = query.lower()
response_lower = response.lower()
relevance_patterns = [
(r"\bwhat\b", r"\b(?:is|are|include|involves)\b"),
(r"\bhow\b", r"\b(?:by|through|via|process|step)\b"),
(r"\bwhen\b", r"\b(?:during|after|before|time|date)\b"),
(r"\bwhere\b", r"\b(?:at|in|location|place)\b"),
(r"\bwhy\b", r"\b(?:because|due to|reason|purpose)\b"),
(r"\bpolicy\b", r"\b(?:policy|guideline|rule|procedure)\b"),
]
relevance_score = 0.0
for query_pattern, response_pattern in relevance_patterns:
if re.search(query_pattern, query_lower) and re.search(response_pattern, response_lower):
relevance_score += 0.2
return min(relevance_score, 1.0)
def _assess_response_structure(self, response: str) -> float:
"""Assess structural completeness of response."""
structure_score = 0.0
# Check for introduction/context
intro_patterns = [r"according to", r"based on", r"our policy", r"the guideline"]
if any(re.search(pattern, response, re.IGNORECASE) for pattern in intro_patterns):
structure_score += 0.3
# Check for main content/explanation
if len(response.split(".")) >= 2:
structure_score += 0.4
# Check for conclusion/summary
conclusion_patterns = [
r"in summary",
r"therefore",
r"as a result",
r"please contact",
]
if any(re.search(pattern, response, re.IGNORECASE) for pattern in conclusion_patterns):
structure_score += 0.3
return min(structure_score, 1.0)
def _assess_information_density(self, response: str, query: str) -> float:
"""Assess information density relative to query complexity."""
# Simple heuristic based on content richness
words = len(response.split())
sentences = len([s for s in response.split(".") if s.strip()])
if sentences == 0:
return 0.0
avg_sentence_length = words / sentences
# Optimal range: 15-25 words per sentence for policy content
if 15 <= avg_sentence_length <= 25:
density_score = 1.0
elif avg_sentence_length < 15:
density_score = avg_sentence_length / 15
else:
density_score = max(0.5, 1.0 - (avg_sentence_length - 25) / 25)
return min(density_score, 1.0)
def _assess_topic_consistency(self, sentences: List[str]) -> float:
"""Assess topic consistency across sentences."""
if len(sentences) < 2:
return 1.0
# Extract key terms from each sentence
sentence_terms = [self._extract_key_terms(sentence) for sentence in sentences]
# Calculate overlap between consecutive sentences
consistency_scores = []
for i in range(len(sentence_terms) - 1):
current_terms = sentence_terms[i]
next_terms = sentence_terms[i + 1]
if current_terms and next_terms:
overlap = len(current_terms.intersection(next_terms))
total = len(current_terms.union(next_terms))
consistency = overlap / total if total > 0 else 0
consistency_scores.append(consistency)
return sum(consistency_scores) / len(consistency_scores) if consistency_scores else 0.5
def _has_clear_conclusion(self, response: str) -> float:
"""Check if response has a clear conclusion."""
conclusion_indicators = [
r"in summary",
r"in conclusion",
r"therefore",
r"as a result",
r"please contact",
r"for more information",
r"if you have questions",
]
response_lower = response.lower()
has_conclusion = any(re.search(pattern, response_lower) for pattern in conclusion_indicators)
return 1.0 if has_conclusion else 0.5
def _assess_citation_quality(self, response: str, sources: List[Dict[str, Any]]) -> float:
"""Assess quality and presence of citations."""
if not sources:
return 0.5
citation_patterns = [
r"\[.*?\]", # [source]
r"\(.*?\)", # (source)
r"according to.*?", # according to X
r"based on.*?", # based on X
r"as stated in.*?", # as stated in X
]
citations_found = sum(len(re.findall(pattern, response, re.IGNORECASE)) for pattern in citation_patterns)
# Score based on citation density
min_citations = self.config["min_citation_count"]
citation_score = min(citations_found / min_citations, 1.0)
return citation_score
def _assess_content_alignment(self, response: str, sources: List[Dict[str, Any]]) -> float:
"""Assess how well response content aligns with sources."""
if not sources:
return 0.5
# Extract content from sources
source_content = " ".join(source.get("content", "") for source in sources).lower()
response_terms = self._extract_key_terms(response)
source_terms = self._extract_key_terms(source_content)
if not source_terms:
return 0.5
# Calculate alignment
alignment = len(response_terms.intersection(source_terms)) / len(response_terms)
return min(alignment, 1.0)
def _assess_source_coverage(self, response: str, sources: List[Dict[str, Any]]) -> float:
"""Assess how many sources are referenced in response."""
response_lower = response.lower()
referenced_sources = 0
for source in sources:
doc_name = source.get("metadata", {}).get("filename", "").lower()
if doc_name and doc_name in response_lower:
referenced_sources += 1
preferred_count = min(self.config["preferred_source_count"], len(sources))
if preferred_count == 0:
return 1.0
coverage = referenced_sources / preferred_count
return min(coverage, 1.0)
def _check_factual_consistency(self, response: str, sources: List[Dict[str, Any]]) -> float:
"""Check factual consistency between response and sources."""
# Simple consistency check (can be enhanced with fact-checking models)
# For now, assume consistency if no obvious contradictions
# Look for absolute statements that might contradict sources
absolute_patterns = [
r"\b(?:never|always|all|none|every|no)\b",
r"\b(?:definitely|certainly|absolutely)\b",
]
absolute_count = sum(len(re.findall(pattern, response, re.IGNORECASE)) for pattern in absolute_patterns)
# Penalize excessive absolute statements
consistency_penalty = min(absolute_count * 0.1, 0.3)
consistency_score = 1.0 - consistency_penalty
return max(consistency_score, 0.0)
def _assess_tone_appropriateness(self, response: str) -> float:
"""Assess appropriateness of tone for corporate communication."""
# Check for appropriate corporate tone indicators
corporate_tone_indicators = [
r"\b(?:recommend|advise|suggest|ensure|comply)\b",
r"\b(?:policy|procedure|guideline|requirement)\b",
r"\b(?:appropriate|professional|please|thank you)\b",
]
tone_score = 0.0
for pattern in corporate_tone_indicators:
matches = len(re.findall(pattern, response, re.IGNORECASE))
tone_score += min(matches * 0.1, 0.3)
return min(tone_score, 1.0)
def _analyze_response_characteristics(self, response: str, sources: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze basic characteristics of the response."""
# Count citations
citation_patterns = [r"\[.*?\]", r"\(.*?\)", r"according to", r"based on"]
citation_count = sum(len(re.findall(pattern, response, re.IGNORECASE)) for pattern in citation_patterns)
return {
"length": len(response),
"word_count": len(response.split()),
"sentence_count": len([s for s in response.split(".") if s.strip()]),
"citation_count": citation_count,
"source_count": len(sources),
}
def _determine_confidence_level(self, overall_score: float, characteristics: Dict[str, Any]) -> str:
"""Determine confidence level based on score and characteristics."""
if overall_score >= 0.8 and characteristics["citation_count"] >= 1:
return "high"
elif overall_score >= 0.6:
return "medium"
else:
return "low"
def _generate_quality_insights(
self,
relevance: float,
completeness: float,
coherence: float,
source_fidelity: float,
professionalism: float,
characteristics: Dict[str, Any],
) -> Tuple[List[str], List[str], List[str]]:
"""Generate strengths, weaknesses, and recommendations."""
strengths = []
weaknesses = []
recommendations = []
# Analyze strengths
if relevance >= 0.8:
strengths.append("Highly relevant to user query")
if completeness >= 0.8:
strengths.append("Comprehensive and complete response")
if coherence >= 0.8:
strengths.append("Well-structured and coherent")
if source_fidelity >= 0.8:
strengths.append("Strong alignment with source documents")
if professionalism >= 0.8:
strengths.append("Professional and appropriate tone")
# Analyze weaknesses
if relevance < 0.6:
weaknesses.append("Limited relevance to user query")
recommendations.append("Ensure response directly addresses the question")
if completeness < 0.6:
weaknesses.append("Incomplete or insufficient information")
recommendations.append("Provide more comprehensive information")
if coherence < 0.6:
weaknesses.append("Poor logical structure or flow")
recommendations.append("Improve logical organization and flow")
if source_fidelity < 0.6:
weaknesses.append("Weak alignment with source documents")
recommendations.append("Include proper citations and source references")
if professionalism < 0.6:
weaknesses.append("Unprofessional tone or language")
recommendations.append("Use more professional and appropriate language")
# Length-based recommendations
if characteristics["length"] < self.config["min_response_length"]:
recommendations.append("Provide more detailed information")
elif characteristics["length"] > self.config["max_response_length"]:
recommendations.append("Consider condensing the response")
return strengths, weaknesses, recommendations