File size: 14,107 Bytes
135f0d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159faf0
135f0d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159faf0
 
135f0d6
 
 
 
 
 
 
 
 
 
 
 
 
159faf0
135f0d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159faf0
135f0d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159faf0
135f0d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159faf0
135f0d6
 
 
 
 
 
 
 
 
 
159faf0
135f0d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159faf0
135f0d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159faf0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
"""
Source Attribution - Citation and source tracking system

This module manages citation generation, source ranking, and quote extraction
for RAG responses with proper source attribution.
"""

import logging
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class Citation:
    """Structured citation for source attribution.

    Within this module, instances are built by
    ``SourceAttributor._create_citation`` from a ``RankedSource``.

    Attributes:
        document: Name of the cited document (copied from
            ``RankedSource.document``).
        section: Value of the ``"section"`` metadata key, if any.
        confidence: Set to the ranked source's ``relevance_score``.
        excerpt: Excerpt text copied from the ranked source.
        page: Value of the ``"page"`` metadata key, if any.
        url: Value of the ``"url"`` metadata key, if any.
    """

    document: str
    section: Optional[str] = None
    confidence: float = 0.0
    excerpt: str = ""
    page: Optional[int] = None
    url: Optional[str] = None


@dataclass
class Quote:
    """Extracted quote from source document.

    Within this module, instances are built by
    ``SourceAttributor.extract_quotes``.

    Attributes:
        text: The document sentence that matched a response sentence.
        source_document: ``"filename"`` from the document metadata, or
            ``"unknown"`` when that key is absent.
        relevance_score: Word-overlap similarity between the quote and the
            whole response text.
        context_before: Text preceding the quote. Not populated by any code
            visible in this module; defaults to an empty string.
        context_after: Text following the quote. Not populated by any code
            visible in this module; defaults to an empty string.
        section: Value of the ``"section"`` metadata key, if any.
    """

    text: str
    source_document: str
    relevance_score: float
    context_before: str = ""
    context_after: str = ""
    section: Optional[str] = None


@dataclass
class RankedSource:
    """Source document with ranking and metadata.

    Within this module, instances are built by
    ``SourceAttributor.rank_sources``.

    Attributes:
        document: ``"filename"`` from the source metadata, or ``"unknown"``
            when that key is absent.
        relevance_score: Caller-supplied score, or the source's own
            ``"relevance_score"`` entry (0.5 when missing).
        reliability_score: Heuristic score produced by
            ``SourceAttributor._calculate_reliability``.
        excerpt: Shortened form of the source content.
        metadata: The source's metadata dictionary.
        rank: 1-based position after sorting by the mean of relevance and
            reliability; stays 0 until ``rank_sources`` assigns it.
    """

    document: str
    relevance_score: float
    reliability_score: float
    excerpt: str
    metadata: Dict[str, Any]
    rank: int = 0


class SourceAttributor:
    """
    Manages citation generation and source tracking for RAG responses.

    Provides:
    - Structured citation formatting
    - Source ranking by relevance and reliability
    - Quote extraction from source documents
    - Citation validation and verification
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize SourceAttributor with configuration.

        Args:
            config: Configuration dictionary for attribution settings
        """
        self.config = config or self._get_default_config()
        logger.info("SourceAttributor initialized")

    def _get_default_config(self) -> Dict[str, Any]:
        """Get default attribution configuration."""
        return {
            "max_citations": 5,
            "min_confidence_for_citation": 0.3,
            "citation_format": "numbered",  # "numbered", "parenthetical", "footnote"
            "include_excerpts": True,
            "max_excerpt_length": 150,
            "require_document_names": True,
            "prefer_specific_sections": True,
        }

    def generate_citations(self, response: str, sources: List[Dict[str, Any]]) -> List[Citation]:
        """
        Generate proper citations for response based on sources.

        Args:
            response: Generated response text
            sources: Source documents with metadata

        Returns:
            List of Citation objects for the response
        """
        try:
            citations = []

            # Rank sources by relevance and reliability
            ranked_sources = self.rank_sources(sources, [])

            # Generate citations for top sources
            for i, ranked_source in enumerate(ranked_sources[: self.config["max_citations"]]):
                if ranked_source.relevance_score >= self.config["min_confidence_for_citation"]:
                    citation = self._create_citation(ranked_source, i + 1)
                    citations.append(citation)

            # Ensure citations are properly embedded in response
            self._validate_citation_presence(response, citations)

            logger.debug(f"Generated {len(citations)} citations")
            return citations

        except Exception as e:
            logger.error(f"Citation generation error: {e}")
            return []

    def extract_quotes(self, response: str, documents: List[Dict[str, Any]]) -> List[Quote]:
        """
        Extract relevant quotes from source documents.

        Args:
            response: Generated response text
            documents: Source documents to extract quotes from

        Returns:
            List of Quote objects with extracted text
        """
        try:
            quotes = []

            for doc in documents:
                content = doc.get("content", "")
                document_name = doc.get("metadata", {}).get("filename", "unknown")

                # Find quotes that appear in both response and document
                extracted_quotes = self._find_matching_quotes(response, content)

                for quote_text in extracted_quotes:
                    relevance = self._calculate_quote_relevance(quote_text, response)

                    quote = Quote(
                        text=quote_text,
                        source_document=document_name,
                        relevance_score=relevance,
                        section=doc.get("metadata", {}).get("section"),
                    )
                    quotes.append(quote)

            # Sort by relevance
            quotes.sort(key=lambda q: q.relevance_score, reverse=True)

            logger.debug(f"Extracted {len(quotes)} quotes")
            return quotes

        except Exception as e:
            logger.error(f"Quote extraction error: {e}")
            return []

    def rank_sources(self, sources: List[Dict[str, Any]], relevance_scores: List[float]) -> List[RankedSource]:
        """
        Rank sources by relevance and reliability.

        Args:
            sources: Source documents with metadata
            relevance_scores: Pre-calculated relevance scores (optional)

        Returns:
            List of RankedSource objects sorted by ranking
        """
        try:
            ranked_sources = []

            for i, source in enumerate(sources):
                # Use provided relevance or calculate
                if i < len(relevance_scores):
                    relevance = relevance_scores[i]
                else:
                    relevance = source.get("relevance_score", 0.5)

                # Calculate reliability score
                reliability = self._calculate_reliability(source)

                # Create ranked source
                ranked_source = RankedSource(
                    document=source.get("metadata", {}).get("filename", "unknown"),
                    relevance_score=relevance,
                    reliability_score=reliability,
                    excerpt=self._create_excerpt(source),
                    metadata=source.get("metadata", {}),
                )

                ranked_sources.append(ranked_source)

            # Sort by combined score (relevance + reliability)
            ranked_sources.sort(
                key=lambda rs: (rs.relevance_score + rs.reliability_score) / 2,
                reverse=True,
            )

            # Assign ranks
            for i, ranked_source in enumerate(ranked_sources):
                ranked_source.rank = i + 1

            logger.debug(f"Ranked {len(ranked_sources)} sources")
            return ranked_sources

        except Exception as e:
            logger.error(f"Source ranking error: {e}")
            return []

    def format_citation_text(self, citations: List[Citation]) -> str:
        """
        Format citations as text for inclusion in response.

        Args:
            citations: List of Citation objects

        Returns:
            Formatted citation text
        """
        if not citations:
            return ""

        citation_format = self.config["citation_format"]

        if citation_format == "numbered":
            return self._format_numbered_citations(citations)
        elif citation_format == "parenthetical":
            return self._format_parenthetical_citations(citations)
        elif citation_format == "footnote":
            return self._format_footnote_citations(citations)
        else:
            return self._format_numbered_citations(citations)

    def validate_citations(self, response: str, citations: List[Citation]) -> Dict[str, bool]:
        """
        Validate that citations are properly referenced in response.

        Args:
            response: Response text to validate
            citations: Citations that should be referenced

        Returns:
            Dictionary mapping citation to validation status
        """
        validation_results = {}

        for citation in citations:
            is_valid = self._is_citation_referenced(response, citation)
            validation_results[citation.document] = is_valid

        return validation_results

    def _create_citation(self, ranked_source: RankedSource, number: int) -> Citation:
        """Build a Citation from a ranked source.

        The relevance score becomes the citation's confidence; section, page
        and url are read from the source metadata. ``number`` is accepted
        but not used.
        """
        meta = ranked_source.metadata
        details = {
            "section": meta.get("section"),
            "page": meta.get("page"),
            "url": meta.get("url"),
        }
        return Citation(
            document=ranked_source.document,
            confidence=ranked_source.relevance_score,
            excerpt=ranked_source.excerpt,
            **details,
        )

    def _calculate_reliability(self, source: Dict[str, Any]) -> float:
        """Calculate reliability score for source document."""
        # Base reliability
        reliability = 0.7

        # Boost for official documents
        filename = source.get("metadata", {}).get("filename", "").lower()
        if any(term in filename for term in ["policy", "handbook", "guideline", "procedure", "manual"]):
            reliability += 0.2

        # Boost for recent documents (if timestamp available)
        # This would need timestamp metadata
        # if 'last_modified' in source.get('metadata', {}):
        #     # Add recency bonus
        #     pass

        # Boost for documents with clear structure
        content = source.get("content", "")
        if any(marker in content.lower() for marker in ["section", "article", "paragraph", "clause"]):
            reliability += 0.1

        return min(reliability, 1.0)

    def _create_excerpt(self, source: Dict[str, Any]) -> str:
        """Create excerpt from source document."""
        content = source.get("content", "")
        max_length = self.config["max_excerpt_length"]

        if len(content) <= max_length:
            return content

        # Try to find a good breaking point
        excerpt = content[:max_length]
        last_sentence = excerpt.rfind(".")
        last_space = excerpt.rfind(" ")

        if last_sentence > max_length * 0.7:
            return excerpt[: last_sentence + 1]
        elif last_space > max_length * 0.8:
            return excerpt[:last_space] + "..."
        else:
            return excerpt + "..."

    def _find_matching_quotes(self, response: str, document_content: str) -> List[str]:
        """Find quotes that appear in both response and document."""
        quotes = []

        # Look for phrases that appear in both
        response_sentences = [s.strip() for s in response.split(".") if s.strip()]
        doc_sentences = [s.strip() for s in document_content.split(".") if s.strip()]

        for resp_sent in response_sentences:
            for doc_sent in doc_sentences:
                # Check for substantial overlap
                if len(resp_sent) > 20 and len(doc_sent) > 20:
                    if self._calculate_sentence_similarity(resp_sent, doc_sent) > 0.7:
                        quotes.append(doc_sent)

        return list(set(quotes))  # Remove duplicates

    def _calculate_sentence_similarity(self, sent1: str, sent2: str) -> float:
        """Calculate similarity between two sentences."""
        words1 = set(sent1.lower().split())
        words2 = set(sent2.lower().split())

        intersection = words1.intersection(words2)
        union = words1.union(words2)

        if not union:
            return 0.0

        return len(intersection) / len(union)

    def _calculate_quote_relevance(self, quote: str, response: str) -> float:
        """Calculate relevance of quote to response."""
        return self._calculate_sentence_similarity(quote, response)

    def _validate_citation_presence(self, response: str, citations: List[Citation]) -> None:
        """Validate that citations are present in response."""
        if not self.config["require_document_names"]:
            return

        for citation in citations:
            if citation.document.lower() not in response.lower():
                logger.warning(f"Citation {citation.document} not found in response")

    def _format_numbered_citations(self, citations: List[Citation]) -> str:
        """Format citations in numbered format."""
        if not citations:
            return ""

        formatted = "\n\n**Sources:**\n"
        for i, citation in enumerate(citations, 1):
            formatted += f"{i}. {citation.document}"
            if citation.section:
                formatted += f" ({citation.section})"
            if self.config["include_excerpts"] and citation.excerpt:
                formatted += f'\n   "{citation.excerpt}"'
            formatted += "\n"

        return formatted

    def _format_parenthetical_citations(self, citations: List[Citation]) -> str:
        """Format citations in parenthetical format."""
        if not citations:
            return ""

        # Simple format: (Document1, Document2)
        doc_names = [citation.document for citation in citations]
        return f" ({', '.join(doc_names)})"

    def _format_footnote_citations(self, citations: List[Citation]) -> str:
        """Format citations as footnotes."""
        if not citations:
            return ""

        formatted = "\n\n**References:**\n"
        for i, citation in enumerate(citations, 1):
            formatted += f"[{i}] {citation.document}"
            if citation.section:
                formatted += f", {citation.section}"
            formatted += "\n"

        return formatted

    def _is_citation_referenced(self, response: str, citation: Citation) -> bool:
        """Check if citation is properly referenced in response."""
        response_lower = response.lower()
        doc_name_lower = citation.document.lower()

        # Look for document name mentions
        if doc_name_lower in response_lower:
            return True

        # Look for citation patterns
        citation_patterns = [
            rf"\[.*{re.escape(citation.document)}.*\]",
            rf"\(.*{re.escape(citation.document)}.*\)",
        ]

        return any(re.search(pattern, response, re.IGNORECASE) for pattern in citation_patterns)