"""
robots.txt 支持模块
"""
import asyncio
import aiohttp
import logging
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser
from typing import Dict, Optional, Tuple
import time
logger = logging.getLogger(__name__)


class RobotsChecker:
    """Asynchronous robots.txt checker."""

    def __init__(self, user_agent: str = '*', timeout: int = 5):
        """
        Initialize the robots.txt checker.

        Args:
            user_agent: User-Agent string ('*' matches all agents by default)
            timeout: Request timeout in seconds
        """
        self.user_agent = user_agent
        self.timeout = timeout
        self._cache: Dict[str, Tuple[RobotFileParser, float]] = {}
        self._cache_ttl = 3600  # cache entries for one hour

    async def can_fetch(self, url: str) -> bool:
        """
        Check whether the given URL may be crawled.

        Args:
            url: URL to check

        Returns:
            True if crawling is allowed, False otherwise
        """
        parsed = urlparse(url)
        robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"

        # Check the cache first
        if robots_url in self._cache:
            rp, cached_time = self._cache[robots_url]
            if time.time() - cached_time < self._cache_ttl:
                return rp.can_fetch(self.user_agent, url)

        # Fetch and parse robots.txt
        rp = await self._fetch_robots(robots_url)
        if rp is None:
            # If robots.txt cannot be retrieved, allow crawling by default
            return True

        # Cache the parsed result
        self._cache[robots_url] = (rp, time.time())
        return rp.can_fetch(self.user_agent, url)

    async def _fetch_robots(self, robots_url: str) -> Optional[RobotFileParser]:
        """
        Fetch and parse robots.txt.

        Args:
            robots_url: URL of the robots.txt file

        Returns:
            A RobotFileParser instance, or None if the file could not be fetched
        """
        try:
            timeout = aiohttp.ClientTimeout(total=self.timeout)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(robots_url) as response:
                    if response.status == 200:
                        content = await response.text()
                        # Parse the full content in one call: RobotFileParser.parse()
                        # expects the complete list of lines and already skips
                        # blank lines and comments, so no manual filtering is needed.
                        rp = RobotFileParser()
                        rp.set_url(robots_url)
                        rp.parse(content.splitlines())
                        return rp
                    else:
                        # No robots.txt (e.g. 404): treat as "allow everything"
                        return None
        except Exception as e:
            # On any error, default to allowing the crawl (do not block)
            logger.debug(f"Failed to fetch robots.txt from {robots_url}: {e}")
            return None

    def clear_cache(self):
        """Clear the robots.txt cache."""
        self._cache.clear()
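

# Minimal usage sketch (illustrative, not part of the module itself): it assumes
# the script is run directly and uses a hypothetical user agent and example URL
# to show how can_fetch() and clear_cache() are typically driven with asyncio.
if __name__ == "__main__":
    async def _demo():
        checker = RobotsChecker(user_agent='MyCrawler/1.0', timeout=5)
        allowed = await checker.can_fetch('https://example.com/some/page')
        print(f"Allowed: {allowed}")
        checker.clear_cache()

    asyncio.run(_demo())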