智能化数据抓取与结构化解析指南

摘要

本文详细介绍了如何使用 Python 的 requests 库结合自动解析工具与生成式 AI 技术,实现智能化的数据抓取与结构化解析。首先,我们讲解基本的 HTTP 请求流程与静态页面解析方法;随后,介绍 BeautifulSouplxml 等解析库的使用;接着,引入 AutoScraper、Prompt Engineering 与大型语言模型(LLM)等 AI 驱动方案,处理更复杂或动态加载的网页内容;最后,通过示例代码演示完整的从请求、解析到智能化处理的端到端流程,特别关注如何解析不规则网站结构并实现自适应抓取。

1. 引言

Web 抓取(Web scraping)是从网站提取数据的过程,通常通过发送 HTTP 请求并解析响应的 HTML 内容来实现。Web 抓取可以替代手动复制粘贴,提高数据收集效率和准确性。

随着互联网技术的发展,网站结构日益复杂化和多样化,传统的定向抓取方法往往需要为每个网站编写特定的解析规则,维护成本高且适应性差。同时,许多网站采用了动态加载、反爬虫技术和不规则布局,进一步增加了数据抓取的难度。

随着 AI 技术特别是生成式 AI(Generative AI)和大模型(LLM)的发展,研究者开始探索将 Prompt Engineering 与机器学习技术结合,实现对任意网页结构的自适应解析和抓取,从而迈向"智能化"抓取。这种方法能够自动识别数据模式,适应网页结构变化,大幅降低人工干预需求,提高抓取系统的通用性和鲁棒性。

2. 工具与库

2.1 HTTP 请求:requests

  • requests 是 Python 中最流行的 HTTP 客户端库,能够简洁地发送 GET/POST 等请求,并自动处理 cookies、编码等细节。
  • 安装:
pip install requests
  • 基本使用:
import requests

# 基本 GET 请求
response = requests.get("https://example.com")

# 带参数的 GET 请求
params = {"page": 1, "count": 10}
response = requests.get("https://api.example.com/items", params=params)

# POST 请求
data = {"username": "test", "password": "password"}
response = requests.post("https://api.example.com/login", data=data)

# 设置请求头
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Referer": "https://example.com"
}
response = requests.get("https://example.com", headers=headers)

# 处理 Cookie
session = requests.Session()
session.get("https://example.com/login")  # 获取初始 Cookie
session.post("https://example.com/login", data=login_data)  # 登录并更新 Cookie
authenticated_response = session.get("https://example.com/protected")  # 使用同一会话访问

2.2 静态解析:BeautifulSouplxml

  • BeautifulSoup:通过 DOM 树对 HTML 进行遍历与查找,API 简洁。
  • lxml:基于 C 语言库,解析速度更快,支持 XPath 查询。安装:
pip install beautifulsoup4 lxml
  • BeautifulSoup 基本用法:
from bs4 import BeautifulSoup

# 创建 BeautifulSoup 对象
soup = BeautifulSoup(html_content, "lxml")  # 使用 lxml 解析器

# 1. 按标签查找
title = soup.title
paragraphs = soup.find_all("p")

# 2. 按类名查找
content_div = soup.find("div", class_="content")
items = soup.select(".item")  # CSS 选择器

# 3. 按 ID 查找
header = soup.find(id="header")
footer = soup.select_one("#footer")  # 只返回第一个匹配元素

# 4. 综合查询
links = soup.find_all("a", href=lambda x: x and "http" in x)
  • XPath 与 lxml 用法:
from lxml import etree

# 创建 lxml 对象
html = etree.HTML(html_content)

# 使用 XPath 查询
# 1. 查找所有链接
links = html.xpath("//a/@href")

# 2. 查找特定类的元素
products = html.xpath('//div[@class="product"]')

# 3. 复杂条件查询
items = html.xpath('//div[contains(@class, "item") and ./span[@class="price"]]')

# 4. 获取文本内容
titles = html.xpath('//h2[@class="title"]/text()')

2.3 AI 驱动抓取:AutoScraper

  • AutoScraper 是一个自动化学习抓取规则的工具,只需提供示例 URL 和目标数据样本,便可自动生成可复用的抓取器。
  • 安装与使用示例:
pip install autoscraper
from autoscraper import AutoScraper

# 初始化并学习抓取规则
url = "https://example.com"
wanted_list = ["示例数据1", "示例数据2"]
scraper = AutoScraper()
scraper.build(url, wanted_list)

# 应用到相似页面
results = scraper.get_result_similar("https://example.com/page2")

# 保存模型以便复用
scraper.save("my_scraper_model")

# 加载已有模型
loaded_scraper = AutoScraper().load("my_scraper_model")

2.4 动态内容抓取:Selenium 与 undetected_chromedriver

  • 当网页依赖 JavaScript 渲染时,静态请求无法获取完整内容,此时可使用 Selenium 驱动真实浏览器或无头浏览器。
  • 对抗反爬虫:可结合 undetected_chromedriver 和伪装 User-Agent 等手段提高稳健性。
pip install selenium undetected-chromedriver webdriver-manager
import undetected_chromedriver as uc
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

# 使用 undetected_chromedriver 创建浏览器实例
driver = uc.Chrome()

# 或使用标准 Selenium 配置
options = webdriver.ChromeOptions()
options.add_argument("--headless")  # 无头模式
options.add_argument("--disable-gpu")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64)")

driver = webdriver.Chrome(
    service=Service(ChromeDriverManager().install()),
    options=options
)

# 访问页面
driver.get("https://example.com")

# 等待元素加载
WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.CSS_SELECTOR, ".content"))
)

# 获取动态加载内容
content = driver.find_element(By.CSS_SELECTOR, ".content").text

# 执行 JavaScript
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

# 关闭浏览器
driver.quit()

3. 核心实现

以下我们通过实际案例,展示从基础请求到智能化解析的完整过程。

3.1 基础静态网页抓取

import requests
from bs4 import BeautifulSoup
import pandas as pd

def basic_scraper(url):
    """
    # 基础静态网页抓取函数
    """
    # 模拟浏览器请求头
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"
    }
    
    # 发送请求并获取响应
    response = requests.get(url, headers=headers)
    response.encoding = response.apparent_encoding  # 自动检测编码
    
    # 检查请求是否成功
    if response.status_code != 200:
        print(f"请求失败: {response.status_code}")
        return None
    
    # 解析HTML内容
    soup = BeautifulSoup(response.text, "lxml")
    
    # 示例:获取名人名言网站的数据
    quotes = []
    for quote in soup.select(".quote"):
        text = quote.select_one(".text").get_text(strip=True)
        author = quote.select_one(".author").get_text(strip=True)
        tags = [tag.get_text(strip=True) for tag in quote.select(".tags .tag")]
        
        quotes.append({
            "text": text,
            "author": author,
            "tags": tags
        })
    
    # 转换为DataFrame便于后续处理
    df = pd.DataFrame(quotes)
    return df

# 使用示例
data = basic_scraper("https://quotes.toscrape.com/page/1/")
print(data.head())

3.2 自适应抓取器实现

from autoscraper import AutoScraper
import requests
from bs4 import BeautifulSoup
import pandas as pd

class AdaptiveScraper:
    """
    # 自适应网页抓取器类
    """
    def __init__(self):
        self.scraper = AutoScraper()
        self.trained = False
    
    def train(self, url, sample_data, field_names=None):
        """
        # 基于样本数据训练抓取规则
        """
        # 扁平化样本数据
        flat_samples = []
        for item in sample_data:
            if isinstance(item, dict):
                flat_samples.extend(item.values())
            elif isinstance(item, (list, tuple)):
                flat_samples.extend(item)
            else:
                flat_samples.append(item)
        
        # 训练抓取规则
        result_rules = self.scraper.build(url, flat_samples)
        self.trained = True
        
        # 设置字段名称
        self.field_names = field_names
        
        print(f"训练完成,获取到 {len(result_rules)} 条规则")
        return result_rules
    
    def scrape(self, url):
        """
        # 应用训练好的规则抓取数据
        """
        if not self.trained:
            print("请先训练抓取规则")
            return None
        
        # 获取所有抓取结果
        results = self.scraper.get_result_similar(url)
        
        # 如果有字段名称,则构建结构化数据
        if self.field_names and len(results) % len(self.field_names) == 0:
            structured_data = []
            items_count = len(results) // len(self.field_names)
            
            for i in range(items_count):
                item = {}
                for j, field in enumerate(self.field_names):
                    item[field] = results[i * len(self.field_names) + j]
                structured_data.append(item)
            
            return pd.DataFrame(structured_data)
        
        return results
    
    def save_model(self, filename):
        """
        # 保存训练好的模型
        """
        self.scraper.save(filename)
    
    def load_model(self, filename):
        """
        # 加载已有模型
        """
        self.scraper.load(filename)
        self.trained = True

# 使用示例
def demo_adaptive_scraper():
    # 1. 初始化抓取器
    adaptive_scraper = AdaptiveScraper()
    
    # 2. 准备训练样本
    url = "https://quotes.toscrape.com/page/1/"
    sample_data = [
        {"text": "The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking.", 
         "author": "Albert Einstein"},
        {"text": "It is our choices, Harry, that show what we truly are, far more than our abilities.", 
         "author": "J.K. Rowling"}
    ]
    field_names = ["text", "author"]
    
    # 3. 训练抓取规则
    adaptive_scraper.train(url, sample_data, field_names)
    
    # 4. 应用到新页面
    results = adaptive_scraper.scrape("https://quotes.toscrape.com/page/2/")
    print(results.head())
    
    # 5. 保存模型以便复用
    adaptive_scraper.save_model("quotes_scraper_model")

# 运行示例
demo_adaptive_scraper()

3.3 处理动态加载内容

import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
import pandas as pd

class DynamicContentScraper:
    """
    # 动态内容抓取器
    """
    def __init__(self, headless=True):
        """
        # 初始化浏览器实例
        """
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument("--headless")
        options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64)")
        
        self.driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=options
        )
    
    def __del__(self):
        """
        # 确保浏览器实例被正确关闭
        """
        try:
            self.driver.quit()
        except:
            pass
    
    def scrape(self, url, scroll_times=0, wait_time=2, infinite_scroll=False):
        """
        # 抓取动态加载内容
        # - scroll_times: 滚动次数
        # - wait_time: 每次滚动后等待时间
        # - infinite_scroll: 是否持续滚动直到页面不再变化
        """
        self.driver.get(url)
        
        # 等待页面初始加载
        time.sleep(wait_time)
        
        # 处理无限滚动页面
        if infinite_scroll:
            last_height = self.driver.execute_script("return document.body.scrollHeight")
            while True:
                # 滚动到页面底部
                self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(wait_time)
                
                # 计算新的滚动高度并与上一个滚动高度进行比较
                new_height = self.driver.execute_script("return document.body.scrollHeight")
                if new_height == last_height:
                    # 如果高度没有变化,则页面已加载完毕
                    break
                last_height = new_height
        else:
            # 执行固定次数的滚动
            for _ in range(scroll_times):
                self.driver.execute_script("window.scrollBy(0, window.innerHeight);")
                time.sleep(wait_time)
        
        # 获取完整的HTML内容
        html_content = self.driver.page_source
        
        return html_content
    
    def scrape_with_interaction(self, url, interaction_script, interaction_count=1, wait_time=2):
        """
        # 带交互的抓取(点击、输入等)
        # - interaction_script: 交互脚本函数,接收driver参数
        # - interaction_count: 交互次数
        # - wait_time: 每次交互后等待时间
        """
        self.driver.get(url)
        time.sleep(wait_time)
        
        results = []
        
        # 执行交互脚本指定次数
        for i in range(interaction_count):
            # 执行交互
            interaction_script(self.driver, i)
            time.sleep(wait_time)
            
            # 保存当前页面内容
            results.append(self.driver.page_source)
        
        return results

# 使用示例:抓取需要无限滚动加载的产品列表
def demo_dynamic_scraper():
    # 初始化抓取器
    scraper = DynamicContentScraper(headless=True)
    
    # 抓取无限滚动页面
    html_content = scraper.scrape(
        "https://www.example.com/products",  # 替换为实际的无限滚动页面
        infinite_scroll=True,
        wait_time=3
    )
    
    # 解析HTML内容
    soup = BeautifulSoup(html_content, "lxml")
    
    # 提取产品信息
    products = []
    for product_elem in soup.select(".product-item"):  # 替换为实际的产品项选择器
        name = product_elem.select_one(".product-name").text.strip()
        price = product_elem.select_one(".product-price").text.strip()
        
        products.append({
            "name": name,
            "price": price
        })
    
    # 转换为DataFrame
    df = pd.DataFrame(products)
    print(f"共抓取到 {len(df)} 个产品")
    print(df.head())

# 使用示例:带交互的抓取(点击分页按钮)
def demo_interactive_scraper():
    # 定义交互脚本:点击下一页按钮
    def click_next_page(driver, page_index):
        # 等待下一页按钮出现
        next_btn = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, ".pagination .next"))
        )
        print(f"点击第 {page_index + 1} 页")
        next_btn.click()
    
    # 初始化抓取器
    scraper = DynamicContentScraper(headless=True)
    
    # 抓取5页内容
    page_contents = scraper.scrape_with_interaction(
        "https://quotes.toscrape.com/",  # 使用实际有分页的网站
        click_next_page,
        interaction_count=5,
        wait_time=2
    )
    
    # 解析所有页面内容
    all_quotes = []
    for page_html in page_contents:
        soup = BeautifulSoup(page_html, "lxml")
        for quote in soup.select(".quote"):
            text = quote.select_one(".text").get_text(strip=True)
            author = quote.select_one(".author").get_text(strip=True)
            
            all_quotes.append({
                "text": text,
                "author": author
            })
    
    # 转换为DataFrame
    df = pd.DataFrame(all_quotes)
    print(f"共抓取到 {len(df)} 条名言")
    print(df.head())

4. 智能化解析不规则网站

不规则网站指那些结构变化频繁、缺乏一致性命名规则、使用复杂嵌套布局或依赖JavaScript动态生成DOM的网站。针对这类网站,我们需要更智能的解析策略。

4.1 模式识别与自适应解析

模式识别是处理不规则网站的关键技术,通过学习数据的内在模式,而非依赖固定的DOM结构。

import requests
from bs4 import BeautifulSoup
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

class PatternRecognitionScraper:
    """
    # 基于模式识别的智能抓取器
    """
    def __init__(self):
        self.vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(3, 5))
        self.samples = []
        self.sample_texts = []
    
    def add_samples(self, sample_texts, labels):
        """
        # 添加样本数据
        # - sample_texts: 文本样本列表
        # - labels: 对应的标签列表
        """
        if len(sample_texts) != len(labels):
            raise ValueError("样本文本和标签数量不匹配")
        
        for text, label in zip(sample_texts, labels):
            self.samples.append((text, label))
            self.sample_texts.append(text)
    
    def train(self):
        """
        # 训练TF-IDF向量化器
        """
        if not self.sample_texts:
            raise ValueError("请先添加样本数据")
        
        self.vectorizer.fit(self.sample_texts)
        self.sample_vectors = self.vectorizer.transform(self.sample_texts)
    
    def extract_similar_elements(self, html_content, similarity_threshold=0.8):
        """
        # 从HTML中提取与样本相似的元素
        """
        soup = BeautifulSoup(html_content, "lxml")
        
        # 获取所有可能的文本块
        text_elements = []
        for element in soup.find_all(text=True):
            if element.parent.name not in ['script', 'style', 'meta', 'noscript']:
                text = element.strip()
                if text and len(text) > 5:  # 忽略过短的文本
                    text_elements.append((text, element))
        
        # 如果没有样本数据,直接返回所有文本元素
        if not self.samples:
            return [{"text": text, "element": element, "label": None} 
                    for text, element in text_elements]
        
        # 向量化所有文本元素
        element_texts = [text for text, _ in text_elements]
        element_vectors = self.vectorizer.transform(element_texts)
        
        # 计算相似度并提取结果
        results = []
        for i, (text, element) in enumerate(text_elements):
            # 计算与所有样本的相似度
            similarities = cosine_similarity(
                element_vectors[i:i+1], 
                self.sample_vectors
            )[0]
            
            # 找到最相似的样本
            max_sim_idx = np.argmax(similarities)
            max_similarity = similarities[max_sim_idx]
            
            # 如果相似度超过阈值,添加到结果中
            if max_similarity >= similarity_threshold:
                _, label = self.samples[max_sim_idx]
                results.append({
                    "text": text,
                    "element": element,
                    "label": label,
                    "similarity": float(max_similarity)
                })
        
        return results
    
    def extract_structured_data(self, html_content, similarity_threshold=0.8):
        """
        # 提取结构化数据并按标签组织
        """
        elements = self.extract_similar_elements(
            html_content, 
            similarity_threshold
        )
        
        # 按标签组织数据
        structured_data = {}
        for element in elements:
            label = element["label"]
            if label not in structured_data:
                structured_data[label] = []
            
            structured_data[label].append(element["text"])
        
        return structured_data

# 使用示例
def demo_pattern_recognition():
    # 初始化抓取器
    scraper = PatternRecognitionScraper()
    
    # 添加样本数据
    sample_texts = [
        "产品价格: ¥199.99",
        "价格: ¥299.50",
        "¥399.00",
        "产品名称: 高级耳机",
        "名称: 蓝牙音箱",
        "无线键盘鼠标套装"
    ]
    
    labels = [
        "price",
        "price",
        "price",
        "product_name",
        "product_name",
        "product_name"
    ]
    
    scraper.add_samples(sample_texts, labels)
    
    # 训练模型
    scraper.train()
    
    # 抓取网页内容
    url = "https://example.com/products"  # 替换为实际URL
    response = requests.get(url)
    
    # 提取结构化数据
    structured_data = scraper.extract_structured_data(
        response.text,
        similarity_threshold=0.75
    )
    
    print("提取的结构化数据:")
    for label, texts in structured_data.items():
        print(f"{label}: {texts[:3]}...")  # 只显示前3个

4.2 基于规则学习的智能解析

结合启发式规则与机器学习,自动发现页面中的数据模式。

import requests
from bs4 import BeautifulSoup
import re
import pandas as pd
from collections import Counter

class RuleLearningScraper:
    """
    # 基于规则学习的智能抓取器
    """
    def __init__(self):
        self.rules = {}
        self.context_patterns = {}
    
    def learn_from_examples(self, html_content, examples):
        """
        # 从示例中学习抽取规则
        # - html_content: 页面HTML内容
        # - examples: 示例数据字典 {字段名: [示例值]}
        """
        soup = BeautifulSoup(html_content, "lxml")
        
        for field, sample_values in examples.items():
            # 为每个字段学习规则
            field_rules = []
            context_patterns = []
            
            for sample in sample_values:
                # 1. 尝试直接文本匹配
                elements = soup.find_all(string=re.compile(re.escape(sample)))
                
                # 2. 如果找不到完全匹配,尝试模糊匹配
                if not elements:
                    # 对于数字,可能格式不完全一致
                    if re.search(r'\d', sample):
                        numeric_part = re.sub(r'[^\d.]', '', sample)
                        elements = soup.find_all(string=re.compile(numeric_part))
                
                # 3. 分析每个匹配元素的上下文
                for element in elements:
                    # 提取父元素和祖先元素的特征
                    parent = element.parent
                    if parent:
                        # 分析标签
                        tag_name = parent.name
                        
                        # 分析类名
                        class_names = parent.get('class', [])
                        class_str = ' '.join(class_names) if class_names else ''
                        
                        # 分析ID
                        id_attr = parent.get('id', '')
                        
                        # 构建特征规则
                        rule = {
                            'tag': tag_name,
                            'class': class_str,
                            'id': id_attr
                        }
                        field_rules.append(rule)
                        
                        # 分析上下文模式(前后文本)
                        prev_text = element.previous_sibling.string if element.previous_sibling else ''
                        next_text = element.next_sibling.string if element.next_sibling else ''
                        
                        if prev_text:
                            prev_pattern = prev_text.strip()
                            if prev_pattern:
                                context_patterns.append(('prev', prev_pattern))
                        
                        if next_text:
                            next_pattern = next_text.strip()
                            if next_pattern:
                                context_patterns.append(('next', next_pattern))
            
            # 统计最常见的规则和上下文模式
            if field_rules:
                # 找出最频繁出现的标签、类和ID组合
                rule_counter = Counter([
                    (r['tag'], r['class'], r['id']) for r in field_rules
                ])
                most_common_rule = rule_counter.most_common(1)[0][0]
                
                self.rules[field] = {
                    'tag': most_common_rule[0],
                    'class': most_common_rule[1],
                    'id': most_common_rule[2]
                }
            
            # 保存上下文模式
            if context_patterns:
                pattern_counter = Counter(context_patterns)
                most_common_patterns = [p[0] for p in pattern_counter.most_common(3)]
                self.context_patterns[field] = most_common_patterns
            
            print(f"字段 '{field}' 的规则学习完成")
    
    def extract_data(self, html_content):
        """
        # 使用学习到的规则提取数据
        """
        soup = BeautifulSoup(html_content, "lxml")
        result = {}
        
        for field, rule in self.rules.items():
            # 基于规则查找元素
            candidates = soup.find_all(rule['tag'], class_=rule['class'] if rule['class'] else None)
            
            # 进一步使用上下文模式过滤
            field_data = []
            for element in candidates:
                # 检查元素是否符合上下文模式
                if field in self.context_patterns:
                    for pattern_type, pattern in self.context_patterns[field]:
                        if pattern_type == 'prev' and element.previous_sibling:
                            prev_text = element.previous_sibling.string
                            if prev_text and pattern in prev_text:
                                field_data.append(element.get_text(strip=True))
                                break
                        elif pattern_type == 'next' and element.next_sibling:
                            next_text = element.next_sibling.string
                            if next_text and pattern in next_text:
                                field_data.append(element.get_text(strip=True))
                                break
                else:
                    # 如果没有上下文模式,直接添加文本
                    field_data.append(element.get_text(strip=True))
            
            result[field] = field_data
        
        return result

# 使用示例
def demo_rule_learning_scraper():
    # 1. 获取HTML内容
    url = "https://example.com/products-page"  # 替换为实际URL
    response = requests.get(url)
    html_content = response.text
    
    # 2. 准备示例数据
    examples = {
        "product_name": [
            "超薄笔记本电脑",
            "无线蓝牙耳机",
            "智能手表"
        ],
        "price": [
            "¥5999.00",
            "¥399.00",
            "¥1299.00"
        ]
    }
    
    # 3. 初始化抓取器并学习规则
    scraper = RuleLearningScraper()
    scraper.learn_from_examples(html_content, examples)
    
    # 4. 应用到新页面
    new_url = "https://example.com/products-page?page=2"  # 替换为实际URL
    new_response = requests.get(new_url)
    extracted_data = scraper.extract_data(new_response.text)
    
    # 5. 转换为DataFrame
    df = pd.DataFrame({
        field: values[:min(len(values), len(extracted_data["product_name"]))]
        for field, values in extracted_data.items()
    })
    
    print(df.head())

4.3 结合生成式 AI 的智能解析系统

利用大型语言模型(LLM)的理解能力,直接从网页提取结构化数据。

import requests
import json
import os
from bs4 import BeautifulSoup
import pandas as pd
import openai
import re

class LLMPoweredScraper:
    """
    # 基于大型语言模型的智能抓取器
    """
    def __init__(self, api_key=None, model="gpt-3.5-turbo"):
        """
        # 初始化LLM抓取器
        # - api_key: OpenAI API密钥(可选,默认从环境变量读取)
        # - model: 使用的模型名称
        """
        if api_key:
            openai.api_key = api_key
        else:
            openai.api_key = os.environ.get("OPENAI_API_KEY")
        
        if not openai.api_key:
            raise ValueError("请提供OpenAI API密钥")
        
        self.model = model
    
    def extract_data_with_llm(self, html_content, extraction_prompt, max_tokens=4000):
        """
        # 使用语言模型从HTML中提取数据
        # - html_content: HTML内容
        # - extraction_prompt: 数据提取指令
        # - max_tokens: 发送给模型的最大令牌数
        """
        # 清理HTML并提取文本
        soup = BeautifulSoup(html_content, "lxml")
        
        # 移除脚本和样式标签
        for script in soup(["script", "style"]):
            script.extract()
        
        # 获取可读文本
        text = soup.get_text(separator="\n", strip=True)
        
        # 截断文本以适应令牌限制
        # 这是一个简化处理,实际应用中可能需要更复杂的处理
        if len(text) > max_tokens * 4:  # 粗略估计字符数与令牌数的比例
            text = text[:max_tokens * 4]
        
        # 构建完整提示
        full_prompt = f"""我需要从以下网页内容中提取结构化数据。

提取要求:{extraction_prompt}

网页内容:
{text}

请以JSON格式返回结果,确保格式正确且可解析。"""
        
        # 调用语言模型API
        response = openai.ChatCompletion.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "你是一个专业的数据提取助手。你的任务是从网页内容中精确提取结构化数据。"},
                {"role": "user", "content": full_prompt}
            ],
            temperature=0.2,  # 低温度以获得更确定性的输出
            max_tokens=1000,  # 限制响应长度
        )
        
        # 获取模型响应
        result_text = response.choices[0].message.content
        
        # 提取JSON部分
        try:
            # 尝试直接解析整个响应
            extracted_data = json.loads(result_text)
        except json.JSONDecodeError:
            # 如果失败,尝试从文本中提取JSON部分
            json_match = re.search(r'```json\s*([\s\S]*?)\s*```', result_text)
            if json_match:
                try:
                    extracted_data = json.loads(json_match.group(1))
                except:
                    # 如果仍然失败,使用更宽松的方法
                    json_str = re.search(r'{[\s\S]*}', result_text)
                    if json_str:
                        try:
                            extracted_data = json.loads(json_str.group(0))
                        except:
                            raise ValueError("无法从模型响应中提取有效的JSON数据")
                    else:
                        raise ValueError("模型响应中没有找到JSON数据")
            else:
                raise ValueError("模型响应中没有找到JSON数据")
        
        return extracted_data
    
    def scrape_website(self, url, extraction_prompt):
        """
        # 抓取网站并使用LLM提取数据
        """
        # 发送HTTP请求
        response = requests.get(url)
        
        if response.status_code != 200:
            raise Exception(f"请求失败: HTTP {response.status_code}")
        
        # 使用LLM提取数据
        data = self.extract_data_with_llm(
            response.text,
            extraction_prompt
        )
        
        return data

# 使用示例
def demo_llm_powered_scraper():
    # 确保设置了API密钥
    api_key = "your_openai_api_key"  # 替换为你的API密钥
    
    # 初始化抓取器
    scraper = LLMPoweredScraper(api_key=api_key)
    
    # 目标网站
    url = "https://example.com/complex-product-page"  # 替换为实际URL
    
    # 提取指令
    extraction_prompt = """
    请从这个产品页面中提取以下信息:
    1. 产品名称
    2. 价格(包括任何折扣价)
    3. 产品描述
    4. 产品规格(如尺寸、重量、颜色等)
    5. 评分(如果有)
    6. 评论数量(如果有)
    
    对于不规则或难以提取的数据,请使用你的理解能力提取最接近的信息。
    """
    
    # 执行抓取
    try:
        data = scraper.scrape_website(url, extraction_prompt)
        print("提取的数据:")
        print(json.dumps(data, indent=2, ensure_ascii=False))
    except Exception as e:
        print(f"抓取失败: {e}")

4.4 WebAgent:LLM驱动的自主抓取代理

WebAgent是一种先进的抓取方法,它使用LLM来理解网页结构并自主生成抓取代码。

import requests
import json
import time
import os
from bs4 import BeautifulSoup
import openai
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

class WebAgent:
    """
    # LLM驱动的网页抓取代理
    """
    def __init__(self, api_key=None, model="gpt-4"):
        """
        # 初始化WebAgent
        """
        if api_key:
            openai.api_key = api_key
        else:
            openai.api_key = os.environ.get("OPENAI_API_KEY")
        
        if not openai.api_key:
            raise ValueError("请提供OpenAI API密钥")
        
        self.model = model
        self.browser = None
    
    def __del__(self):
        """
        # 确保浏览器实例被正确关闭
        """
        if self.browser:
            self.browser.quit()
    
    def get_webpage_content(self, url, use_selenium=False):
        """
        # 获取网页内容
        # - use_selenium: 是否使用Selenium获取动态内容
        """
        if use_selenium:
            if not self.browser:
                options = webdriver.ChromeOptions()
                options.add_argument("--headless")
                options.add_argument("--disable-gpu")
                options.add_argument("--no-sandbox")
                
                self.browser = webdriver.Chrome(
                    service=Service(ChromeDriverManager().install()),
                    options=options
                )
            
            self.browser.get(url)
            time.sleep(3)  # 等待页面加载
            return self.browser.page_source
        else:
            response = requests.get(url)
            return response.text
    
    def analyze_webpage(self, html_content, task_description):
        """
        # 使用LLM分析网页并生成抓取计划
        """
        # 清理HTML
        soup = BeautifulSoup(html_content, "lxml")
        
        # 提取可能包含关键数据的重要HTML部分
        # 移除不必要的标签
        for tag in soup(['script', 'style', 'meta', 'noscript', 'svg']):
            tag.extract()
        
        # 获取清理后的HTML
        cleaned_html = str(soup)
        
        # 如果HTML太长,提取主要部分
        if len(cleaned_html) > 12000:
            # 简单方法:尝试提取主要内容区域
            main_content = soup.find('main') or soup.find(id='main') or soup.find(class_='main')
            if main_content:
                cleaned_html = str(main_content)
                
            # 如果仍然太长,截断
            if len(cleaned_html) > 12000:
                cleaned_html = cleaned_html[:12000] + "..."
        
        # 构建分析提示
        analysis_prompt = f"""作为一个Web抓取专家,请分析以下网页并制定抓取计划。

任务描述:{task_description}

网页HTML结构(部分):
```html
{cleaned_html}

请提供以下内容:

  1. 关键数据字段及其在网页中的位置(提供选择器或路径)
  2. 抓取策略(是否需要处理分页、点击按钮等交互)
  3. 可能的挑战和解决方案(动态加载、反爬虫等)
  4. 推荐的Python代码框架来完成这个抓取任务

以JSON格式返回分析结果。“”"

    # 调用LLM API
    response = openai.ChatCompletion.create(
        model=self.model,
        messages=[
            {"role": "system", "content": "你是一个网页分析和抓取专家。你擅长理解网页结构并设计有效的抓取策略。"},
            {"role": "user", "content": analysis_prompt}
        ],
        temperature=0.3,
        max_tokens=1500
    )
    
    # 获取分析结果
    result_text = response.choices[0].message.content
    
    # 提取JSON部分
    try:
        # 尝试从文本中提取JSON
        json_match = re.search(r'```json\s*([\s\S]*?)\s*```', result_text)
        if json_match:
            analysis = json.loads(json_match.group(1))
        else:
            # 尝试直接解析
            analysis = json.loads(result_text)
    except:
        # 如果解析失败,直接返回文本结果
        analysis = {"raw_analysis": result_text}
    
    return analysis

def generate_scraper_code(self, analysis, url):
    """
    # 基于分析生成抓取代码
    """
    code_prompt = f"""请基于以下网页分析结果,生成一个完整的Python抓取脚本:

分析结果:{json.dumps(analysis, ensure_ascii=False)}

目标URL:{url}

请生成一个包含必要导入、异常处理和数据清洗的完整Python脚本。
脚本应该能够从URL抓取数据并将结果保存为JSON或CSV文件。
添加详细注释以解释代码的每个部分。
“”"

    # 调用LLM API
    response = openai.ChatCompletion.create(
        model=self.model,
        messages=[
            {"role": "system", "content": "你是一个Python抓取专家。你擅长编写高效、可靠的网页抓取代码。"},
            {"role": "user", "content": code_prompt}
        ],
        temperature=0.2,
        max_tokens=2000
    )
    
    # 获取代码
    result_text = response.choices[0].message.content
    
    # 提取代码部分
    code_match = re.search(r'```python\s*([\s\S]*?)\s*```', result_text)
    if code_match:
        code = code_match.group(1)
    else:
        code = result_text
    
    return code

def execute_task(self, url, task_description, use_selenium=False):
    """
    # 执行完整的抓取任务
    """
    # 1. 获取网页内容
    print("正在获取网页内容...")
    html_content = self.get_webpage_content(url, use_selenium)
    
    # 2. 分析网页
    print("正在分析网页结构...")
    analysis = self.analyze_webpage(html_content, task_description)
    
    # 3. 生成代码
    print("正在生成抓取代码...")
    code = self.generate_scraper_code(analysis, url)
    
    # 4. 返回结果
    return {
        "analysis": analysis,
        "code": code
    }

使用示例

def demo_web_agent():
# 初始化WebAgent
agent = WebAgent(api_key=“your_openai_api_key”) # 替换为你的API密钥

# 目标网站和任务
url = "https://example.com/products/category/electronics"
task = "抓取所有电子产品的名称、价格、评分和库存状态,并处理多页结果"

# 执行任务
result = agent.execute_task(url, task, use_selenium=True)

# 输出结果
print("\n分析结果:")
print(json.dumps(result["analysis"], indent=2, ensure_ascii=False))

print("\n生成的代码:")
print(result["code"])

# 保存代码到文件
with open("generated_scraper.py", "w", encoding="utf-8") as f:
    f.write(result["code"])

print("\n代码已保存到 generated_scraper.py")

## 5. 高级技巧与最佳实践

### 5.1 处理反爬虫机制

现代网站通常会实施各种反爬虫措施,以下是一些有效的对抗策略:

```python
import requests
import random
import time
from fake_useragent import UserAgent
import urllib3
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class RobustScraper:
    """
    # 具有反反爬虫能力的抓取器
    """
    def __init__(self, use_proxies=False):
        # 禁用SSL警告
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        
        # 创建会话
        self.session = requests.Session()
        
        # 重试策略
        retry_strategy = Retry(
            total=3,  # 总共重试次数
            backoff_factor=0.3,  # 重试间隔系数
            status_forcelist=[429, 500, 502, 503, 504],  # 需要重试的HTTP状态码
            allowed_methods=["GET", "POST"]  # 允许重试的请求方法
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        
        # User-Agent池
        self.ua = UserAgent()
        
        # 代理池(示例)
        self.proxies = []
        if use_proxies:
            self.proxies = [
                "http://proxy1.example.com:8080",
                "http://proxy2.example.com:8080",
                "http://proxy3.example.com:8080"
            ]
    
    def get(self, url, delay=True, verify_ssl=False):
        """
        # 发送健壮的GET请求
        # - delay: 是否添加随机延迟
        # - verify_ssl: 是否验证SSL证书
        """
        # 添加随机延迟
        if delay:
            time.sleep(random.uniform(1, 3))
        
        # 准备请求头
        headers = {
            "User-Agent": self.ua.random,
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Cache-Control": "no-cache",
            "Pragma": "no-cache",
            "Referer": "https://www.google.com/",  # 伪造来源
            "DNT": "1",  # Do Not Track
            "Upgrade-Insecure-Requests": "1"
        }
        
        # 选择代理(如果有)
        proxy = None
        if self.proxies:
            proxy = {"http": random.choice(self.proxies), "https": random.choice(self.proxies)}
        
        # 发送请求
        try:
            response = self.session.get(
                url, 
                headers=headers,
                proxies=proxy,
                verify=verify_ssl,
                timeout=10
            )
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {e}")
            return None

# 使用示例
def demo_robust_scraping():
    scraper = RobustScraper(use_proxies=False)  # 设置为True将使用代理
    
    # 抓取多个网页
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3"
    ]
    
    for url in urls:
        response = scraper.get(url)
        if response and response.status_code == 200:
            print(f"成功抓取 {url}")
            # 处理响应...
        else:
            print(f"抓取 {url} 失败")

5.2 分布式抓取与数据管道

对于大规模抓取,可以使用分布式框架如Scrapy和数据管道:

# Scrapy框架示例(需要在单独的项目中使用)
"""
# 安装Scrapy:pip install scrapy

# 项目结构:
- myspider/
  - scrapy.cfg
  - myspider/
    - __init__.py
    - items.py
    - middlewares.py
    - pipelines.py
    - settings.py
    - spiders/
      - product_spider.py
"""

# spiders/product_spider.py示例
import scrapy
from myspider.items import ProductItem

class ProductSpider(scrapy.Spider):
    name = "products"
    allowed_domains = ["example.com"]
    start_urls = ["https://example.com/products"]
    
    def parse(self, response):
        # 解析产品列表页
        for product in response.css(".product-item"):
            item = ProductItem()
            item["name"] = product.css(".product-name::text").get().strip()
            item["price"] = product.css(".product-price::text").get().strip()
            item["url"] = product.css("a::attr(href)").get()
            
            # 如果有详情页,进一步抓取
            if item["url"]:
                yield response.follow(
                    item["url"], 
                    self.parse_product_detail,
                    meta={"item": item}
                )
            else:
                yield item
        
        # 处理分页
        next_page = response.css(".pagination .next::attr(href)").get()
        if next_page:
            yield response.follow(next_page, self.parse)
    
    def parse_product_detail(self, response):
        # 从详情页获取更多信息
        item = response.meta["item"]
        item["description"] = response.css(".product-description::text").get().strip()
        item["specs"] = response.css(".product-specs li::text").getall()
        
        yield item

# items.py示例
import scrapy

class ProductItem(scrapy.Item):
    name = scrapy.Field()
    price = scrapy.Field()
    url = scrapy.Field()
    description = scrapy.Field()
    specs = scrapy.Field()

# pipelines.py示例
import json
from itemadapter import ItemAdapter

class JsonPipeline:
    def open_spider(self, spider):
        self.file = open("products.json", "w")
        self.file.write("[")
        self.first_item = True
    
    def close_spider(self, spider):
        self.file.write("]")
        self.file.close()
    
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        line = json.dumps(adapter.asdict(), ensure_ascii=False)
        
        if self.first_item:
            self.first_item = False
        else:
            line = "," + line
        
        self.file.write(line)
        return item

5.3 智能化解析多样化网站

对于多样化网站结构的智能解析,可以结合多种技术:

import openai
from bs4 import BeautifulSoup
import pandas as pd
import re
from autoscraper import AutoScraper
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans

class HybridIntelligentScraper:
    """
    # 混合智能抓取系统:结合多种技术
    """
    def __init__(self, openai_key=None):
        # 初始化组件
        self.llm_available = False
        if openai_key:
            openai.api_key = openai_key
            self.llm_available = True
        
        self.autoscraper = AutoScraper()
        self.vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(3, 5))
    
    def analyze_website_structure(self, html_content):
        """
        # 分析网站结构并确定最佳抓取策略
        """
        soup = BeautifulSoup(html_content, "lxml")
        
        # 1. 识别主要内容区域
        main_content = soup.find('main') or soup.find(id='main') or soup.find(class_='main')
        content_area = main_content if main_content else soup
        
        # 2. 提取所有可能的项目容器
        containers = []
        
        # 查找可能包含多个项目的容器
        for tag in ['div', 'ul', 'ol', 'table']:
            for element in content_area.find_all(tag):
                # 计算直接子元素数量
                children = [c for c in element.children if c.name is not None]
                if len(children) >= 3:  # 至少有3个子元素
                    # 检查子元素是否具有相似结构
                    if self._elements_have_similar_structure(children):
                        containers.append({
                            'element': element,
                            'children_count': len(children),
                            'tag': tag,
                            'id': element.get('id', ''),
                            'classes': ' '.join(element.get('class', []))
                        })
        
        # 3. 按子元素数量排序
        containers.sort(key=lambda x: x['children_count'], reverse=True)
        
        # 4. 返回最可能的内容容器
        if containers:
            return {
                'main_container': containers[0],
                'item_containers': containers,
                'structure_type': self._determine_structure_type(soup)
            }
        
        return {
            'structure_type': 'unknown',
            'message': '无法识别规律的内容容器'
        }
    
    def _elements_have_similar_structure(self, elements, similarity_threshold=0.7):
        """
        # 检查元素是否具有相似结构
        """
        if len(elements) < 2:
            return False
        
        # 获取每个元素的HTML结构特征
        structures = []
        for element in elements:
            # 提取标签名称序列
            tags = [tag.name for tag in element.find_all()]
            structures.append(' '.join(tags))
        
        # 对结构进行向量化
        if not structures:
            return False
            
        vect = TfidfVectorizer(analyzer='char', ngram_range=(2, 3))
        try:
            X = vect.fit_transform(structures)
            
            # 使用K-means聚类
            kmeans = KMeans(n_clusters=2, random_state=0).fit(X)
            
            # 计算主要簇的比例
            from collections import Counter
            labels = kmeans.labels_
            main_cluster = Counter(labels).most_common(1)[0][0]
            main_cluster_ratio = sum(1 for l in labels if l == main_cluster) / len(labels)
            
            return main_cluster_ratio >= similarity_threshold
        except:
            return False
    
    def _determine_structure_type(self, soup):
        """
        # 确定网站结构类型
        """
        # 检查是否为电子商务网站
        if soup.find_all(string=re.compile(r'(购买|加入购物车|立即购买|¥|\$|€|£|price)', re.I)):
            return 'ecommerce'
        
        # 检查是否为新闻/博客网站
        if soup.find_all(['article', 'section']) or soup.find_all(string=re.compile(r'(发布时间|发表于|published|posted on)', re.I)):
            return 'article'
        
        # 检查是否为论坛/评论
        if soup.find_all(['forum', 'comment', 'discussion']) or soup.find_all(string=re.compile(r'(评论|回复|回帖|comment|reply)', re.I)):
            return 'forum'
        
        return 'general'
    
    def extract_with_best_method(self, html_content, samples=None, extraction_prompt=None):
        """
        # 使用最佳方法提取数据
        """
        # 1. 分析网站结构
        structure_analysis = self.analyze_website_structure(html_content)
        
        # 2. 根据结构类型选择最佳方法
        structure_type = structure_analysis.get('structure_type', 'unknown')
        
        if structure_type == 'unknown' or structure_type == 'general':
            # 对于未知或复杂结构,使用LLM(如果可用)
            if self.llm_available and extraction_prompt:
                return self._extract_with_llm(html_content, extraction_prompt)
            elif samples:
                # 退回到AutoScraper
                return self._extract_with_autoscraper(html_content, samples)
            else:
                raise ValueError("对于未知结构网站,需要提供样本数据或LLM提取提示")
        elif structure_type in ['ecommerce', 'article', 'forum']:
            # 对于已知结构类型,优先使用AutoScraper
            if samples:
                return self._extract_with_autoscraper(html_content, samples)
            elif self.llm_available and extraction_prompt:
                return self._extract_with_llm(html_content, extraction_prompt)
            else:
                # 使用特定结构的启发式规则
                return self._extract_with_heuristics(html_content, structure_type, structure_analysis)
    
    def _extract_with_autoscraper(self, html_content, samples):
        """
        # 使用AutoScraper提取数据
        """
        # 重置AutoScraper
        self.autoscraper = AutoScraper()
        
        # 训练规则
        self.autoscraper.build(html_content, samples)
        
        # 从同一HTML抓取
        results = self.autoscraper.get_result_similar(html_content)
        
        return results
    
    def _extract_with_llm(self, html_content, extraction_prompt):
        """
        # 使用LLM提取数据
        """
        if not self.llm_available:
            raise ValueError("LLM未配置")
        
        soup = BeautifulSoup(html_content, "lxml")
        
        # 清理HTML
        for script in soup(["script", "style"]):
            script.extract()
        
        text = soup.get_text(separator="\n", strip=True)
        
        # 截断文本
        if len(text) > 16000:
            text = text[:16000] + "..."
        
        # 构建提示
        prompt = f"""请从以下网页内容中提取结构化数据:

提取要求:{extraction_prompt}

网页内容:
{text}

请以JSON格式返回结果。"""
        
        # 调用API
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "你是一个数据提取专家。"},
                {"role": "user", "content": prompt}
            ],
            temperature=0.2
        )
        
        result_text = response.choices[0].message.content
        
        # 提取JSON
        import json
        try:
            # 尝试直接解析
            return json.loads(result_text)
        except:
            # 寻找JSON块
            json_match = re.search(r'```json\s*([\s\S]*?)\s*```', result_text)
            if json_match:
                try:
                    return json.loads(json_match.group(1))
                except:
                    pass
            
            # 寻找花括号包围的内容
            json_match = re.search(r'{[\s\S]*}', result_text)
            if json_match:
                try:
                    return json.loads(json_match.group(0))
                except:
                    pass
        
        # 如果无法解析JSON,返回原始文本
        return {"error": "无法提取结构化数据", "raw_result": result_text}
    
    def _extract_with_heuristics(self, html_content, structure_type, structure_analysis):
        """
        # 使用特定结构类型的启发式规则提取
        """
        soup = BeautifulSoup(html_content, "lxml")
        
        if structure_type == 'ecommerce':
            # 电子商务网站启发式提取
            products = []
            
            # 获取主容器
            main_container = structure_analysis.get('main_container', {}).get('element')
            container = main_container if main_container else soup
            
            # 查找产品项
            product_items = container.find_all(['div', 'li'], class_=re.compile(r'(product|item|goods)', re.I))
            
            for item in product_items:
                product = {}
                
                # 提取名称
                name_elem = item.find(['h2', 'h3', 'h4', 'a'], class_=re.compile(r'(name|title)', re.I))
                if name_elem:
                    product['name'] = name_elem.get_text(strip=True)
                
                # 提取价格
                price_elem = item.find(string=re.compile(r'(¥|\$|€|£|\d+\.\d{2})', re.I))
                if price_elem:
                    product['price'] = price_elem.strip()
                
                # 提取图片
                img_elem = item.find('img')
                if img_elem and img_elem.get('src'):
                    product['image'] = img_elem['src']
                
                if product:  # 只有找到有效数据才添加
                    products.append(product)
            
            return products
            
        elif structure_type == 'article':
            # 文章/博客网站启发式提取
            articles = []
            
            # 获取主容器
            main_container = structure_analysis.get('main_container', {}).get('element')
            container = main_container if main_container else soup
            
            # 查找文章项
            article_items = container.find_all(['article', 'div', 'section'], class_=re.compile(r'(post|article|entry)', re.I))
            
            for item in article_items:
                article = {}
                
                # 提取标题
                title_elem = item.find(['h1', 'h2', 'h3'], class_=re.compile(r'(title|heading)', re.I))
                if title_elem:
                    article['title'] = title_elem.get_text(strip=True)
                
                # 提取日期
                date_elem = item.find(string=re.compile(r'\d{4}[-/]\d{1,2}[-/]\d{1,2}'))
                if date_elem:
                    article['date'] = date_elem.strip()
                
                # 提取摘要
                summary_elem = item.find(['p', 'div'], class_=re.compile(r'(summary|excerpt|description)', re.I))
                if summary_elem:
                    article['summary'] = summary_elem.get_text(strip=True)
                
                if article:  # 只有找到有效数据才添加
                    articles.append(article)
            
            return articles
            
        elif structure_type == 'forum':
            # 论坛/评论网站启发式提取
            comments = []
            
            # 获取主容器
            main_container = structure_analysis.get('main_container', {}).get('element')
            container = main_container if main_container else soup
            
            # 查找评论项
            comment_items = container.find_all(['div', 'li'], class_=re.compile(r'(comment|reply|post|message)', re.I))
            
            for item in comment_items:
                comment = {}
                
                # 提取用户名
                user_elem = item.find(class_=re.compile(r'(user|author|username)', re.I))
                if user_elem:
                    comment['user'] = user_elem.get_text(strip=True)
                
                # 提取评论内容
                content_elem = item.find(class_=re.compile(r'(content|text|message|body)', re.I))
                if content_elem:
                    comment['content'] = content_elem.get_text(strip=True)
                
                # 提取时间
                time_elem = item.find(class_=re.compile(r'(time|date)', re.I))
                if time_elem:
                    comment['time'] = time_elem.get_text(strip=True)
                
                if comment:  # 只有找到有效数据才添加
                    comments.append(comment)
            
            return comments
        
        # 默认返回空结果
        return {"error": f"没有针对 {structure_type} 结构类型的启发式规则"}

# 使用示例
def demo_hybrid_intelligent_scraper():
    # 初始化抓取器
    scraper = HybridIntelligentScraper(openai_key="your_openai_api_key")  # 可选,如果不提供则不使用LLM
    
    # 获取页面内容
    import requests
    url = "https://example.com/products"  # 替换为实际URL
    response = requests.get(url)
    html_content = response.text
    
    # 方法1:使用样本数据
    samples = [
        "Product Name 1",
        "Product Name 2",
        "$19.99",
        "$29.99"
    ]
    
    # 方法2:使用LLM提取提示
    extraction_prompt = """
    提取所有产品信息,包括:
    - 产品名称
    - 价格
    - 描述
    - 评分(如果有)
    """
    
    # 智能选择最佳提取方法
    results = scraper.extract_with_best_method(
        html_content,
        samples=samples,
        extraction_prompt=extraction_prompt
    )
    
    print("提取结果:")
    print(results)

6. 总结

本文详细介绍了从基础的HTTP请求与HTML解析,到使用AutoScraper和规则学习实现自适应抓取,再到基于LLM和WebAgent的智能化解析系统的完整技术栈。以下是核心要点:

  1. 基础抓取技术:掌握requestsBeautifulSouplxml等基础库是网络抓取的基石。

  2. 处理动态内容:对于JavaScript渲染的网页,Seleniumundetected_chromedriver是处理动态内容的有效工具。

  3. 自适应抓取:通过AutoScraper等工具实现基于样例的自适应抓取,无需手动编写解析规则。

  4. 智能化解析:利用模式识别、规则学习和大型语言模型来处理不规则网站,实现真正意义上的智能抓取。

  5. 稳健性策略:通过代理池、随机延迟、会话管理等技术提高抓取系统的稳健性,有效应对反爬虫措施。

  6. 分布式与数据管道:对于大规模抓取任务,Scrapy等分布式框架能够显著提高效率和可管理性。

  7. 混合智能方法:最先进的抓取系统结合多种技术,能够自动分析网站结构并选择最合适的抓取策略。

更多精彩内容查看

在这里插入图片描述

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐