""" Quality Metrics - Response quality scoring algorithms This module provides comprehensive quality assessment for RAG responses including relevance, completeness, coherence, and source fidelity scoring. """ import logging import re from dataclasses import dataclass from typing import Any, Dict, List, Optional, Set, Tuple logger = logging.getLogger(__name__) @dataclass class QualityScore: """Comprehensive quality score for RAG response.""" overall_score: float relevance_score: float completeness_score: float coherence_score: float source_fidelity_score: float professionalism_score: float # Additional metrics response_length: int citation_count: int source_count: int confidence_level: str # "high", "medium", "low" # Quality indicators meets_threshold: bool strengths: List[str] weaknesses: List[str] recommendations: List[str] class QualityMetrics: """ Comprehensive quality assessment system for RAG responses. Provides detailed scoring across multiple dimensions: - Relevance: How well response addresses the query - Completeness: Adequacy of information provided - Coherence: Logical structure and flow - Source Fidelity: Alignment with source documents - Professionalism: Appropriate business tone """ def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize QualityMetrics with configuration. Args: config: Configuration dictionary for quality thresholds """ self.config = config or self._get_default_config() logger.info("QualityMetrics initialized") def _get_default_config(self) -> Dict[str, Any]: """Get default quality assessment configuration.""" return { "quality_threshold": 0.7, "relevance_weight": 0.3, "completeness_weight": 0.25, "coherence_weight": 0.2, "source_fidelity_weight": 0.25, "min_response_length": 50, "target_response_length": 300, "max_response_length": 1000, "min_citation_count": 1, "preferred_source_count": 3, "enable_detailed_analysis": True, } def calculate_quality_score( self, response: str, query: str, sources: List[Dict[str, Any]], context: Optional[str] = None, ) -> QualityScore: """ Calculate comprehensive quality score for response. Args: response: Generated response text query: Original user query sources: Source documents used context: Optional additional context Returns: QualityScore with detailed metrics and recommendations """ try: # Calculate individual dimension scores relevance = self._calculate_relevance_score(response, query) completeness = self._calculate_completeness_score(response, query) coherence = self._calculate_coherence_score(response) source_fidelity = self._calculate_source_fidelity_score(response, sources) professionalism = self._calculate_professionalism_score(response) # Calculate weighted overall score overall = self._calculate_overall_score( relevance, completeness, coherence, source_fidelity, professionalism ) # Analyze response characteristics response_analysis = self._analyze_response_characteristics(response, sources) # Determine confidence level confidence_level = self._determine_confidence_level(overall, response_analysis) # Generate insights strengths, weaknesses, recommendations = self._generate_quality_insights( relevance, completeness, coherence, source_fidelity, professionalism, response_analysis, ) return QualityScore( overall_score=overall, relevance_score=relevance, completeness_score=completeness, coherence_score=coherence, source_fidelity_score=source_fidelity, professionalism_score=professionalism, response_length=response_analysis["length"], citation_count=response_analysis["citation_count"], source_count=response_analysis["source_count"], confidence_level=confidence_level, meets_threshold=overall >= self.config["quality_threshold"], strengths=strengths, weaknesses=weaknesses, recommendations=recommendations, ) except Exception as e: logger.error(f"Quality scoring error: {e}") return QualityScore( overall_score=0.0, relevance_score=0.0, completeness_score=0.0, coherence_score=0.0, source_fidelity_score=0.0, professionalism_score=0.0, response_length=len(response), citation_count=0, source_count=len(sources), confidence_level="low", meets_threshold=False, strengths=[], weaknesses=["Error in quality assessment"], recommendations=["Retry quality assessment"], ) def _calculate_relevance_score(self, response: str, query: str) -> float: """Calculate how well response addresses the query.""" if not query.strip(): return 1.0 # No query to compare against # Extract key terms from query query_terms = self._extract_key_terms(query) response_terms = self._extract_key_terms(response) if not query_terms: return 1.0 # Calculate term overlap overlap = len(query_terms.intersection(response_terms)) term_coverage = overlap / len(query_terms) # Check for semantic relevance patterns semantic_relevance = self._check_semantic_relevance(response, query) # Combine scores relevance = (term_coverage * 0.6) + (semantic_relevance * 0.4) return min(relevance, 1.0) def _calculate_completeness_score(self, response: str, query: str) -> float: """Calculate how completely the response addresses the query.""" response_length = len(response) target_length = self.config["target_response_length"] min_length = self.config["min_response_length"] # Length-based completeness if response_length < min_length: length_score = response_length / min_length * 0.5 elif response_length <= target_length: length_score = 0.5 + (response_length - min_length) / (target_length - min_length) * 0.5 else: # Diminishing returns for very long responses excess = response_length - target_length penalty = min(excess / target_length * 0.2, 0.3) length_score = 1.0 - penalty # Structure-based completeness structure_score = self._assess_response_structure(response) # Information density density_score = self._assess_information_density(response, query) # Combine scores completeness = (length_score * 0.4) + (structure_score * 0.3) + (density_score * 0.3) return min(max(completeness, 0.0), 1.0) def _calculate_coherence_score(self, response: str) -> float: """Calculate logical structure and coherence of response.""" sentences = [s.strip() for s in response.split(".") if s.strip()] if len(sentences) < 2: return 0.8 # Short responses are typically coherent # Check for logical flow indicators flow_indicators = [ "however", "therefore", "additionally", "furthermore", "consequently", "moreover", "nevertheless", "in addition", "as a result", "for example", ] response_lower = response.lower() flow_score = sum(1 for indicator in flow_indicators if indicator in response_lower) flow_score = min(flow_score / 3, 1.0) # Normalize # Check for repetition (negative indicator) unique_sentences = len(set(s.lower() for s in sentences)) repetition_score = unique_sentences / len(sentences) # Check for topic consistency consistency_score = self._assess_topic_consistency(sentences) # Check for clear conclusion/summary conclusion_score = self._has_clear_conclusion(response) # Combine scores coherence = flow_score * 0.3 + repetition_score * 0.3 + consistency_score * 0.2 + conclusion_score * 0.2 return min(coherence, 1.0) def _calculate_source_fidelity_score(self, response: str, sources: List[Dict[str, Any]]) -> float: """Calculate alignment between response and source documents.""" if not sources: return 0.5 # Neutral score if no sources # Citation presence and quality citation_score = self._assess_citation_quality(response, sources) # Content alignment with sources alignment_score = self._assess_content_alignment(response, sources) # Source coverage (how many sources are referenced) coverage_score = self._assess_source_coverage(response, sources) # Factual consistency check consistency_score = self._check_factual_consistency(response, sources) # Combine scores fidelity = citation_score * 0.3 + alignment_score * 0.4 + coverage_score * 0.15 + consistency_score * 0.15 return min(fidelity, 1.0) def _calculate_professionalism_score(self, response: str) -> float: """Calculate professional tone and appropriateness.""" # Check for professional language patterns professional_indicators = [ r"\b(?:please|thank you|according to|based on|our policy|guidelines)\b", r"\b(?:recommend|suggest|advise|ensure|confirm)\b", r"\b(?:appropriate|professional|compliance|requirements)\b", ] professional_count = sum( len(re.findall(pattern, response, re.IGNORECASE)) for pattern in professional_indicators ) professional_score = min(professional_count / 3, 1.0) # Check for unprofessional patterns unprofessional_patterns = [ r"\b(?:yo|hey|wassup|gonna|wanna)\b", r"\b(?:lol|omg|wtf|tbh|idk)\b", r"[!]{2,}|[?]{2,}", r"\b(?:stupid|dumb|crazy|insane)\b", ] unprofessional_count = sum( len(re.findall(pattern, response, re.IGNORECASE)) for pattern in unprofessional_patterns ) unprofessional_penalty = min(unprofessional_count * 0.3, 0.8) # Check tone appropriateness tone_score = self._assess_tone_appropriateness(response) # Combine scores professionalism = professional_score + tone_score - unprofessional_penalty return min(max(professionalism, 0.0), 1.0) def _calculate_overall_score( self, relevance: float, completeness: float, coherence: float, source_fidelity: float, professionalism: float, ) -> float: """Calculate weighted overall quality score.""" weights = self.config overall = ( relevance * weights["relevance_weight"] + completeness * weights["completeness_weight"] + coherence * weights["coherence_weight"] + source_fidelity * weights["source_fidelity_weight"] + professionalism * 0.0 # Not weighted in overall for now ) return min(max(overall, 0.0), 1.0) def _extract_key_terms(self, text: str) -> Set[str]: """Extract key terms from text for relevance analysis.""" # Simple keyword extraction (can be enhanced with NLP) words = re.findall(r"\b\w+\b", text.lower()) # Filter out common stop words stop_words = { "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by", "from", "up", "about", "into", "through", "during", "before", "after", "above", "below", "between", "among", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "do", "does", "did", "will", "would", "could", "should", "may", "might", "can", "what", "where", "when", "why", "how", "this", "that", "these", "those", } return {word for word in words if len(word) > 2 and word not in stop_words} def _check_semantic_relevance(self, response: str, query: str) -> float: """Check semantic relevance between response and query.""" # Look for question-answer patterns query_lower = query.lower() response_lower = response.lower() relevance_patterns = [ (r"\bwhat\b", r"\b(?:is|are|include|involves)\b"), (r"\bhow\b", r"\b(?:by|through|via|process|step)\b"), (r"\bwhen\b", r"\b(?:during|after|before|time|date)\b"), (r"\bwhere\b", r"\b(?:at|in|location|place)\b"), (r"\bwhy\b", r"\b(?:because|due to|reason|purpose)\b"), (r"\bpolicy\b", r"\b(?:policy|guideline|rule|procedure)\b"), ] relevance_score = 0.0 for query_pattern, response_pattern in relevance_patterns: if re.search(query_pattern, query_lower) and re.search(response_pattern, response_lower): relevance_score += 0.2 return min(relevance_score, 1.0) def _assess_response_structure(self, response: str) -> float: """Assess structural completeness of response.""" structure_score = 0.0 # Check for introduction/context intro_patterns = [r"according to", r"based on", r"our policy", r"the guideline"] if any(re.search(pattern, response, re.IGNORECASE) for pattern in intro_patterns): structure_score += 0.3 # Check for main content/explanation if len(response.split(".")) >= 2: structure_score += 0.4 # Check for conclusion/summary conclusion_patterns = [ r"in summary", r"therefore", r"as a result", r"please contact", ] if any(re.search(pattern, response, re.IGNORECASE) for pattern in conclusion_patterns): structure_score += 0.3 return min(structure_score, 1.0) def _assess_information_density(self, response: str, query: str) -> float: """Assess information density relative to query complexity.""" # Simple heuristic based on content richness words = len(response.split()) sentences = len([s for s in response.split(".") if s.strip()]) if sentences == 0: return 0.0 avg_sentence_length = words / sentences # Optimal range: 15-25 words per sentence for policy content if 15 <= avg_sentence_length <= 25: density_score = 1.0 elif avg_sentence_length < 15: density_score = avg_sentence_length / 15 else: density_score = max(0.5, 1.0 - (avg_sentence_length - 25) / 25) return min(density_score, 1.0) def _assess_topic_consistency(self, sentences: List[str]) -> float: """Assess topic consistency across sentences.""" if len(sentences) < 2: return 1.0 # Extract key terms from each sentence sentence_terms = [self._extract_key_terms(sentence) for sentence in sentences] # Calculate overlap between consecutive sentences consistency_scores = [] for i in range(len(sentence_terms) - 1): current_terms = sentence_terms[i] next_terms = sentence_terms[i + 1] if current_terms and next_terms: overlap = len(current_terms.intersection(next_terms)) total = len(current_terms.union(next_terms)) consistency = overlap / total if total > 0 else 0 consistency_scores.append(consistency) return sum(consistency_scores) / len(consistency_scores) if consistency_scores else 0.5 def _has_clear_conclusion(self, response: str) -> float: """Check if response has a clear conclusion.""" conclusion_indicators = [ r"in summary", r"in conclusion", r"therefore", r"as a result", r"please contact", r"for more information", r"if you have questions", ] response_lower = response.lower() has_conclusion = any(re.search(pattern, response_lower) for pattern in conclusion_indicators) return 1.0 if has_conclusion else 0.5 def _assess_citation_quality(self, response: str, sources: List[Dict[str, Any]]) -> float: """Assess quality and presence of citations.""" if not sources: return 0.5 citation_patterns = [ r"\[.*?\]", # [source] r"\(.*?\)", # (source) r"according to.*?", # according to X r"based on.*?", # based on X r"as stated in.*?", # as stated in X ] citations_found = sum(len(re.findall(pattern, response, re.IGNORECASE)) for pattern in citation_patterns) # Score based on citation density min_citations = self.config["min_citation_count"] citation_score = min(citations_found / min_citations, 1.0) return citation_score def _assess_content_alignment(self, response: str, sources: List[Dict[str, Any]]) -> float: """Assess how well response content aligns with sources.""" if not sources: return 0.5 # Extract content from sources source_content = " ".join(source.get("content", "") for source in sources).lower() response_terms = self._extract_key_terms(response) source_terms = self._extract_key_terms(source_content) if not source_terms: return 0.5 # Calculate alignment alignment = len(response_terms.intersection(source_terms)) / len(response_terms) return min(alignment, 1.0) def _assess_source_coverage(self, response: str, sources: List[Dict[str, Any]]) -> float: """Assess how many sources are referenced in response.""" response_lower = response.lower() referenced_sources = 0 for source in sources: doc_name = source.get("metadata", {}).get("filename", "").lower() if doc_name and doc_name in response_lower: referenced_sources += 1 preferred_count = min(self.config["preferred_source_count"], len(sources)) if preferred_count == 0: return 1.0 coverage = referenced_sources / preferred_count return min(coverage, 1.0) def _check_factual_consistency(self, response: str, sources: List[Dict[str, Any]]) -> float: """Check factual consistency between response and sources.""" # Simple consistency check (can be enhanced with fact-checking models) # For now, assume consistency if no obvious contradictions # Look for absolute statements that might contradict sources absolute_patterns = [ r"\b(?:never|always|all|none|every|no)\b", r"\b(?:definitely|certainly|absolutely)\b", ] absolute_count = sum(len(re.findall(pattern, response, re.IGNORECASE)) for pattern in absolute_patterns) # Penalize excessive absolute statements consistency_penalty = min(absolute_count * 0.1, 0.3) consistency_score = 1.0 - consistency_penalty return max(consistency_score, 0.0) def _assess_tone_appropriateness(self, response: str) -> float: """Assess appropriateness of tone for corporate communication.""" # Check for appropriate corporate tone indicators corporate_tone_indicators = [ r"\b(?:recommend|advise|suggest|ensure|comply)\b", r"\b(?:policy|procedure|guideline|requirement)\b", r"\b(?:appropriate|professional|please|thank you)\b", ] tone_score = 0.0 for pattern in corporate_tone_indicators: matches = len(re.findall(pattern, response, re.IGNORECASE)) tone_score += min(matches * 0.1, 0.3) return min(tone_score, 1.0) def _analyze_response_characteristics(self, response: str, sources: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze basic characteristics of the response.""" # Count citations citation_patterns = [r"\[.*?\]", r"\(.*?\)", r"according to", r"based on"] citation_count = sum(len(re.findall(pattern, response, re.IGNORECASE)) for pattern in citation_patterns) return { "length": len(response), "word_count": len(response.split()), "sentence_count": len([s for s in response.split(".") if s.strip()]), "citation_count": citation_count, "source_count": len(sources), } def _determine_confidence_level(self, overall_score: float, characteristics: Dict[str, Any]) -> str: """Determine confidence level based on score and characteristics.""" if overall_score >= 0.8 and characteristics["citation_count"] >= 1: return "high" elif overall_score >= 0.6: return "medium" else: return "low" def _generate_quality_insights( self, relevance: float, completeness: float, coherence: float, source_fidelity: float, professionalism: float, characteristics: Dict[str, Any], ) -> Tuple[List[str], List[str], List[str]]: """Generate strengths, weaknesses, and recommendations.""" strengths = [] weaknesses = [] recommendations = [] # Analyze strengths if relevance >= 0.8: strengths.append("Highly relevant to user query") if completeness >= 0.8: strengths.append("Comprehensive and complete response") if coherence >= 0.8: strengths.append("Well-structured and coherent") if source_fidelity >= 0.8: strengths.append("Strong alignment with source documents") if professionalism >= 0.8: strengths.append("Professional and appropriate tone") # Analyze weaknesses if relevance < 0.6: weaknesses.append("Limited relevance to user query") recommendations.append("Ensure response directly addresses the question") if completeness < 0.6: weaknesses.append("Incomplete or insufficient information") recommendations.append("Provide more comprehensive information") if coherence < 0.6: weaknesses.append("Poor logical structure or flow") recommendations.append("Improve logical organization and flow") if source_fidelity < 0.6: weaknesses.append("Weak alignment with source documents") recommendations.append("Include proper citations and source references") if professionalism < 0.6: weaknesses.append("Unprofessional tone or language") recommendations.append("Use more professional and appropriate language") # Length-based recommendations if characteristics["length"] < self.config["min_response_length"]: recommendations.append("Provide more detailed information") elif characteristics["length"] > self.config["max_response_length"]: recommendations.append("Consider condensing the response") return strengths, weaknesses, recommendations