| """ | |
| Quality Metrics - Response quality scoring algorithms | |
| This module provides comprehensive quality assessment for RAG responses | |
| including relevance, completeness, coherence, and source fidelity scoring. | |
| """ | |
| import logging | |
| import re | |
| from dataclasses import dataclass | |
| from typing import Any, Dict, List, Optional, Set, Tuple | |
| logger = logging.getLogger(__name__) | |


@dataclass
class QualityScore:
    """Comprehensive quality score for a RAG response."""

    overall_score: float
    relevance_score: float
    completeness_score: float
    coherence_score: float
    source_fidelity_score: float
    professionalism_score: float

    # Additional metrics
    response_length: int
    citation_count: int
    source_count: int
    confidence_level: str  # "high", "medium", "low"

    # Quality indicators
    meets_threshold: bool
    strengths: List[str]
    weaknesses: List[str]
    recommendations: List[str]


class QualityMetrics:
    """
    Comprehensive quality assessment system for RAG responses.

    Provides detailed scoring across multiple dimensions:
    - Relevance: How well the response addresses the query
    - Completeness: Adequacy of the information provided
    - Coherence: Logical structure and flow
    - Source Fidelity: Alignment with source documents
    - Professionalism: Appropriate business tone
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize QualityMetrics with configuration.

        Args:
            config: Optional configuration dictionary for quality thresholds
                and weights; missing keys fall back to the defaults.
        """
        # Merge user-supplied values over the defaults so a partial config
        # cannot leave required keys undefined.
        self.config = {**self._get_default_config(), **(config or {})}
        logger.info("QualityMetrics initialized")

    def _get_default_config(self) -> Dict[str, Any]:
        """Get default quality assessment configuration."""
        return {
            "quality_threshold": 0.7,
            "relevance_weight": 0.3,
            "completeness_weight": 0.25,
            "coherence_weight": 0.2,
            "source_fidelity_weight": 0.25,
            "min_response_length": 50,
            "target_response_length": 300,
            "max_response_length": 1000,
            "min_citation_count": 1,
            "preferred_source_count": 3,
            "enable_detailed_analysis": True,
        }
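        # Illustrative caller usage (hypothetical value): QualityMetrics(config={"quality_threshold": 0.8})
        # overrides only that key; the remaining defaults above are kept because
        # __init__ merges the supplied dict over these values.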

    def calculate_quality_score(
        self,
        response: str,
        query: str,
        sources: List[Dict[str, Any]],
        context: Optional[str] = None,
    ) -> QualityScore:
        """
        Calculate comprehensive quality score for response.

        Args:
            response: Generated response text
            query: Original user query
            sources: Source documents used
            context: Optional additional context

        Returns:
            QualityScore with detailed metrics and recommendations
        """
        try:
            # Calculate individual dimension scores
            relevance = self._calculate_relevance_score(response, query)
            completeness = self._calculate_completeness_score(response, query)
            coherence = self._calculate_coherence_score(response)
            source_fidelity = self._calculate_source_fidelity_score(response, sources)
            professionalism = self._calculate_professionalism_score(response)

            # Calculate weighted overall score
            overall = self._calculate_overall_score(
                relevance, completeness, coherence, source_fidelity, professionalism
            )

            # Analyze response characteristics
            response_analysis = self._analyze_response_characteristics(response, sources)

            # Determine confidence level
            confidence_level = self._determine_confidence_level(overall, response_analysis)

            # Generate insights
            strengths, weaknesses, recommendations = self._generate_quality_insights(
                relevance,
                completeness,
                coherence,
                source_fidelity,
                professionalism,
                response_analysis,
            )

            return QualityScore(
                overall_score=overall,
                relevance_score=relevance,
                completeness_score=completeness,
                coherence_score=coherence,
                source_fidelity_score=source_fidelity,
                professionalism_score=professionalism,
                response_length=response_analysis["length"],
                citation_count=response_analysis["citation_count"],
                source_count=response_analysis["source_count"],
                confidence_level=confidence_level,
                meets_threshold=overall >= self.config["quality_threshold"],
                strengths=strengths,
                weaknesses=weaknesses,
                recommendations=recommendations,
            )
        except Exception as e:
            logger.error(f"Quality scoring error: {e}")
            return QualityScore(
                overall_score=0.0,
                relevance_score=0.0,
                completeness_score=0.0,
                coherence_score=0.0,
                source_fidelity_score=0.0,
                professionalism_score=0.0,
                response_length=len(response),
                citation_count=0,
                source_count=len(sources),
                confidence_level="low",
                meets_threshold=False,
                strengths=[],
                weaknesses=["Error in quality assessment"],
                recommendations=["Retry quality assessment"],
            )

    def _calculate_relevance_score(self, response: str, query: str) -> float:
        """Calculate how well response addresses the query."""
        if not query.strip():
            return 1.0  # No query to compare against

        # Extract key terms from query
        query_terms = self._extract_key_terms(query)
        response_terms = self._extract_key_terms(response)

        if not query_terms:
            return 1.0

        # Calculate term overlap
        overlap = len(query_terms.intersection(response_terms))
        term_coverage = overlap / len(query_terms)

        # Check for semantic relevance patterns
        semantic_relevance = self._check_semantic_relevance(response, query)

        # Combine scores
        relevance = (term_coverage * 0.6) + (semantic_relevance * 0.4)
        return min(relevance, 1.0)

    def _calculate_completeness_score(self, response: str, query: str) -> float:
        """Calculate how completely the response addresses the query."""
        response_length = len(response)
        target_length = self.config["target_response_length"]
        min_length = self.config["min_response_length"]

        # Length-based completeness
        if response_length < min_length:
            length_score = response_length / min_length * 0.5
        elif response_length <= target_length:
            length_score = 0.5 + (response_length - min_length) / (target_length - min_length) * 0.5
        else:
            # Diminishing returns for very long responses
            excess = response_length - target_length
            penalty = min(excess / target_length * 0.2, 0.3)
            length_score = 1.0 - penalty

        # Structure-based completeness
        structure_score = self._assess_response_structure(response)

        # Information density
        density_score = self._assess_information_density(response, query)

        # Combine scores
        completeness = (length_score * 0.4) + (structure_score * 0.3) + (density_score * 0.3)
        return min(max(completeness, 0.0), 1.0)

    def _calculate_coherence_score(self, response: str) -> float:
        """Calculate logical structure and coherence of response."""
        sentences = [s.strip() for s in response.split(".") if s.strip()]
        if len(sentences) < 2:
            return 0.8  # Short responses are typically coherent

        # Check for logical flow indicators
        flow_indicators = [
            "however",
            "therefore",
            "additionally",
            "furthermore",
            "consequently",
            "moreover",
            "nevertheless",
            "in addition",
            "as a result",
            "for example",
        ]
        response_lower = response.lower()
        flow_score = sum(1 for indicator in flow_indicators if indicator in response_lower)
        flow_score = min(flow_score / 3, 1.0)  # Normalize

        # Check for repetition (negative indicator)
        unique_sentences = len(set(s.lower() for s in sentences))
        repetition_score = unique_sentences / len(sentences)

        # Check for topic consistency
        consistency_score = self._assess_topic_consistency(sentences)

        # Check for clear conclusion/summary
        conclusion_score = self._has_clear_conclusion(response)

        # Combine scores
        coherence = flow_score * 0.3 + repetition_score * 0.3 + consistency_score * 0.2 + conclusion_score * 0.2
        return min(coherence, 1.0)

    def _calculate_source_fidelity_score(self, response: str, sources: List[Dict[str, Any]]) -> float:
        """Calculate alignment between response and source documents."""
        if not sources:
            return 0.5  # Neutral score if no sources

        # Citation presence and quality
        citation_score = self._assess_citation_quality(response, sources)

        # Content alignment with sources
        alignment_score = self._assess_content_alignment(response, sources)

        # Source coverage (how many sources are referenced)
        coverage_score = self._assess_source_coverage(response, sources)

        # Factual consistency check
        consistency_score = self._check_factual_consistency(response, sources)

        # Combine scores
        fidelity = citation_score * 0.3 + alignment_score * 0.4 + coverage_score * 0.15 + consistency_score * 0.15
        return min(fidelity, 1.0)

    def _calculate_professionalism_score(self, response: str) -> float:
        """Calculate professional tone and appropriateness."""
        # Check for professional language patterns
        professional_indicators = [
            r"\b(?:please|thank you|according to|based on|our policy|guidelines)\b",
            r"\b(?:recommend|suggest|advise|ensure|confirm)\b",
            r"\b(?:appropriate|professional|compliance|requirements)\b",
        ]
        professional_count = sum(
            len(re.findall(pattern, response, re.IGNORECASE)) for pattern in professional_indicators
        )
        professional_score = min(professional_count / 3, 1.0)

        # Check for unprofessional patterns
        unprofessional_patterns = [
            r"\b(?:yo|hey|wassup|gonna|wanna)\b",
            r"\b(?:lol|omg|wtf|tbh|idk)\b",
            r"[!]{2,}|[?]{2,}",
            r"\b(?:stupid|dumb|crazy|insane)\b",
        ]
        unprofessional_count = sum(
            len(re.findall(pattern, response, re.IGNORECASE)) for pattern in unprofessional_patterns
        )
        unprofessional_penalty = min(unprofessional_count * 0.3, 0.8)

        # Check tone appropriateness
        tone_score = self._assess_tone_appropriateness(response)

        # Combine scores
        professionalism = professional_score + tone_score - unprofessional_penalty
        return min(max(professionalism, 0.0), 1.0)

    def _calculate_overall_score(
        self,
        relevance: float,
        completeness: float,
        coherence: float,
        source_fidelity: float,
        professionalism: float,
    ) -> float:
        """Calculate weighted overall quality score."""
        weights = self.config
        overall = (
            relevance * weights["relevance_weight"]
            + completeness * weights["completeness_weight"]
            + coherence * weights["coherence_weight"]
            + source_fidelity * weights["source_fidelity_weight"]
            + professionalism * 0.0  # Not weighted in overall for now
        )
        return min(max(overall, 0.0), 1.0)
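        # With the default config this works out to
        #   0.30 * relevance + 0.25 * completeness + 0.20 * coherence + 0.25 * source_fidelity
        # (the four weights sum to 1.0; professionalism is reported separately but
        # intentionally excluded from the overall score for now).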

    def _extract_key_terms(self, text: str) -> Set[str]:
        """Extract key terms from text for relevance analysis."""
        # Simple keyword extraction (can be enhanced with NLP)
        words = re.findall(r"\b\w+\b", text.lower())

        # Filter out common stop words
        stop_words = {
            "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
            "of", "with", "by", "from", "up", "about", "into", "through", "during",
            "before", "after", "above", "below", "between", "among", "is", "are",
            "was", "were", "be", "been", "being", "have", "has", "had", "do",
            "does", "did", "will", "would", "could", "should", "may", "might",
            "can", "what", "where", "when", "why", "how", "this", "that", "these",
            "those",
        }
        return {word for word in words if len(word) > 2 and word not in stop_words}
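        # Illustrative example: _extract_key_terms("What is the remote work policy?")
        # returns {"remote", "work", "policy"} -- stop words and tokens of two
        # characters or fewer are dropped.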

    def _check_semantic_relevance(self, response: str, query: str) -> float:
        """Check semantic relevance between response and query."""
        # Look for question-answer patterns
        query_lower = query.lower()
        response_lower = response.lower()

        relevance_patterns = [
            (r"\bwhat\b", r"\b(?:is|are|include|involves)\b"),
            (r"\bhow\b", r"\b(?:by|through|via|process|step)\b"),
            (r"\bwhen\b", r"\b(?:during|after|before|time|date)\b"),
            (r"\bwhere\b", r"\b(?:at|in|location|place)\b"),
            (r"\bwhy\b", r"\b(?:because|due to|reason|purpose)\b"),
            (r"\bpolicy\b", r"\b(?:policy|guideline|rule|procedure)\b"),
        ]

        relevance_score = 0.0
        for query_pattern, response_pattern in relevance_patterns:
            if re.search(query_pattern, query_lower) and re.search(response_pattern, response_lower):
                relevance_score += 0.2
        return min(relevance_score, 1.0)

    def _assess_response_structure(self, response: str) -> float:
        """Assess structural completeness of response."""
        structure_score = 0.0

        # Check for introduction/context
        intro_patterns = [r"according to", r"based on", r"our policy", r"the guideline"]
        if any(re.search(pattern, response, re.IGNORECASE) for pattern in intro_patterns):
            structure_score += 0.3

        # Check for main content/explanation
        if len(response.split(".")) >= 2:
            structure_score += 0.4

        # Check for conclusion/summary
        conclusion_patterns = [
            r"in summary",
            r"therefore",
            r"as a result",
            r"please contact",
        ]
        if any(re.search(pattern, response, re.IGNORECASE) for pattern in conclusion_patterns):
            structure_score += 0.3

        return min(structure_score, 1.0)

    def _assess_information_density(self, response: str, query: str) -> float:
        """Assess information density relative to query complexity."""
        # Simple heuristic based on content richness
        words = len(response.split())
        sentences = len([s for s in response.split(".") if s.strip()])
        if sentences == 0:
            return 0.0

        avg_sentence_length = words / sentences

        # Optimal range: 15-25 words per sentence for policy content
        if 15 <= avg_sentence_length <= 25:
            density_score = 1.0
        elif avg_sentence_length < 15:
            density_score = avg_sentence_length / 15
        else:
            density_score = max(0.5, 1.0 - (avg_sentence_length - 25) / 25)
        return min(density_score, 1.0)

    def _assess_topic_consistency(self, sentences: List[str]) -> float:
        """Assess topic consistency across sentences."""
        if len(sentences) < 2:
            return 1.0

        # Extract key terms from each sentence
        sentence_terms = [self._extract_key_terms(sentence) for sentence in sentences]

        # Calculate overlap between consecutive sentences
        consistency_scores = []
        for i in range(len(sentence_terms) - 1):
            current_terms = sentence_terms[i]
            next_terms = sentence_terms[i + 1]
            if current_terms and next_terms:
                overlap = len(current_terms.intersection(next_terms))
                total = len(current_terms.union(next_terms))
                consistency = overlap / total if total > 0 else 0
                consistency_scores.append(consistency)

        return sum(consistency_scores) / len(consistency_scores) if consistency_scores else 0.5

    def _has_clear_conclusion(self, response: str) -> float:
        """Check if response has a clear conclusion."""
        conclusion_indicators = [
            r"in summary",
            r"in conclusion",
            r"therefore",
            r"as a result",
            r"please contact",
            r"for more information",
            r"if you have questions",
        ]
        response_lower = response.lower()
        has_conclusion = any(re.search(pattern, response_lower) for pattern in conclusion_indicators)
        return 1.0 if has_conclusion else 0.5

    def _assess_citation_quality(self, response: str, sources: List[Dict[str, Any]]) -> float:
        """Assess quality and presence of citations."""
        if not sources:
            return 0.5

        citation_patterns = [
            r"\[.*?\]",  # [source]
            r"\(.*?\)",  # (source)
            r"according to.*?",  # according to X
            r"based on.*?",  # based on X
            r"as stated in.*?",  # as stated in X
        ]
        citations_found = sum(len(re.findall(pattern, response, re.IGNORECASE)) for pattern in citation_patterns)

        # Score based on citation density
        min_citations = self.config["min_citation_count"]
        citation_score = min(citations_found / min_citations, 1.0)
        return citation_score

    def _assess_content_alignment(self, response: str, sources: List[Dict[str, Any]]) -> float:
        """Assess how well response content aligns with sources."""
        if not sources:
            return 0.5

        # Extract content from sources
        source_content = " ".join(source.get("content", "") for source in sources).lower()
        response_terms = self._extract_key_terms(response)
        source_terms = self._extract_key_terms(source_content)

        if not response_terms or not source_terms:
            # Guard against division by zero when either side yields no key terms
            return 0.5

        # Calculate alignment
        alignment = len(response_terms.intersection(source_terms)) / len(response_terms)
        return min(alignment, 1.0)

    def _assess_source_coverage(self, response: str, sources: List[Dict[str, Any]]) -> float:
        """Assess how many sources are referenced in response."""
        response_lower = response.lower()
        referenced_sources = 0
        for source in sources:
            doc_name = source.get("metadata", {}).get("filename", "").lower()
            if doc_name and doc_name in response_lower:
                referenced_sources += 1

        preferred_count = min(self.config["preferred_source_count"], len(sources))
        if preferred_count == 0:
            return 1.0

        coverage = referenced_sources / preferred_count
        return min(coverage, 1.0)

    def _check_factual_consistency(self, response: str, sources: List[Dict[str, Any]]) -> float:
        """Check factual consistency between response and sources."""
        # Simple consistency check (can be enhanced with fact-checking models)
        # For now, assume consistency if no obvious contradictions

        # Look for absolute statements that might contradict sources
        absolute_patterns = [
            r"\b(?:never|always|all|none|every|no)\b",
            r"\b(?:definitely|certainly|absolutely)\b",
        ]
        absolute_count = sum(len(re.findall(pattern, response, re.IGNORECASE)) for pattern in absolute_patterns)

        # Penalize excessive absolute statements
        consistency_penalty = min(absolute_count * 0.1, 0.3)
        consistency_score = 1.0 - consistency_penalty
        return max(consistency_score, 0.0)

    def _assess_tone_appropriateness(self, response: str) -> float:
        """Assess appropriateness of tone for corporate communication."""
        # Check for appropriate corporate tone indicators
        corporate_tone_indicators = [
            r"\b(?:recommend|advise|suggest|ensure|comply)\b",
            r"\b(?:policy|procedure|guideline|requirement)\b",
            r"\b(?:appropriate|professional|please|thank you)\b",
        ]
        tone_score = 0.0
        for pattern in corporate_tone_indicators:
            matches = len(re.findall(pattern, response, re.IGNORECASE))
            tone_score += min(matches * 0.1, 0.3)
        return min(tone_score, 1.0)

    def _analyze_response_characteristics(self, response: str, sources: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze basic characteristics of the response."""
        # Count citations
        citation_patterns = [r"\[.*?\]", r"\(.*?\)", r"according to", r"based on"]
        citation_count = sum(len(re.findall(pattern, response, re.IGNORECASE)) for pattern in citation_patterns)

        return {
            "length": len(response),
            "word_count": len(response.split()),
            "sentence_count": len([s for s in response.split(".") if s.strip()]),
            "citation_count": citation_count,
            "source_count": len(sources),
        }

    def _determine_confidence_level(self, overall_score: float, characteristics: Dict[str, Any]) -> str:
        """Determine confidence level based on score and characteristics."""
        if overall_score >= 0.8 and characteristics["citation_count"] >= 1:
            return "high"
        elif overall_score >= 0.6:
            return "medium"
        else:
            return "low"

    def _generate_quality_insights(
        self,
        relevance: float,
        completeness: float,
        coherence: float,
        source_fidelity: float,
        professionalism: float,
        characteristics: Dict[str, Any],
    ) -> Tuple[List[str], List[str], List[str]]:
        """Generate strengths, weaknesses, and recommendations."""
        strengths = []
        weaknesses = []
        recommendations = []

        # Analyze strengths
        if relevance >= 0.8:
            strengths.append("Highly relevant to user query")
        if completeness >= 0.8:
            strengths.append("Comprehensive and complete response")
        if coherence >= 0.8:
            strengths.append("Well-structured and coherent")
        if source_fidelity >= 0.8:
            strengths.append("Strong alignment with source documents")
        if professionalism >= 0.8:
            strengths.append("Professional and appropriate tone")

        # Analyze weaknesses
        if relevance < 0.6:
            weaknesses.append("Limited relevance to user query")
            recommendations.append("Ensure response directly addresses the question")
        if completeness < 0.6:
            weaknesses.append("Incomplete or insufficient information")
            recommendations.append("Provide more comprehensive information")
        if coherence < 0.6:
            weaknesses.append("Poor logical structure or flow")
            recommendations.append("Improve logical organization and flow")
        if source_fidelity < 0.6:
            weaknesses.append("Weak alignment with source documents")
            recommendations.append("Include proper citations and source references")
        if professionalism < 0.6:
            weaknesses.append("Unprofessional tone or language")
            recommendations.append("Use more professional and appropriate language")

        # Length-based recommendations
        if characteristics["length"] < self.config["min_response_length"]:
            recommendations.append("Provide more detailed information")
        elif characteristics["length"] > self.config["max_response_length"]:
            recommendations.append("Consider condensing the response")

        return strengths, weaknesses, recommendations
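

# Minimal usage sketch (illustrative only): the query, response text, and source
# payload below are hypothetical sample data, not part of the module's real inputs.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    metrics = QualityMetrics()
    demo_sources = [
        {
            "content": (
                "Employees may work remotely up to three days per week, subject to "
                "manager approval as described in the remote work policy."
            ),
            "metadata": {"filename": "remote_work_policy.pdf"},
        }
    ]
    score = metrics.calculate_quality_score(
        response=(
            "According to the remote work policy, employees may work remotely up to "
            "three days per week with manager approval. Please contact HR if you "
            "have questions about eligibility."
        ),
        query="How many days per week can employees work remotely?",
        sources=demo_sources,
    )
    print(f"overall={score.overall_score:.2f} confidence={score.confidence_level}")
    print("strengths:", score.strengths)
    print("recommendations:", score.recommendations)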