Download: https://www.pan38.com/dow/share.php?code=JCnzE   Extraction code: 1889

This scraper automatically collects all of the comments under a Xiaohongshu note and extracts each commenting user's UID. To use it, replace the placeholder Cookie with a valid one and keep the request rate low so the account does not get blocked. As always, this is for learning purposes only.
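
As a small, hedged sketch (the variable name XHS_COOKIE is my own choice, not something the tool defines): instead of hardcoding the Cookie in the headers, you can load it from an environment variable before crawling, for example:

import os
cookie = os.environ.get("XHS_COOKIE")
if not cookie:
    raise SystemExit("Set XHS_COOKIE to a valid Xiaohongshu cookie first")
spider = XHSSpider()
spider.headers["Cookie"] = cookie  # replaces the 'your_cookie_here' placeholder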

Below is the Python source code; it has been tested and works.


import requests
import re
import time
import random
from urllib.parse import urlparse
import csv
from datetime import datetime
import os

class XHSSpider:
    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': 'https://www.xiaohongshu.com/',
            'Cookie': 'your_cookie_here'  # Replace with a valid cookie
        }
        self.session = requests.Session()
        self.base_url = "https://www.xiaohongshu.com"
        self.comment_api = "https://www.xiaohongshu.com/web_api/sns/v2/note/comment/page"
        self.output_dir = "xhs_comments"
        self.uid_pattern = re.compile(r'"user_id":"([a-f0-9]{24})"')  # For pulling UIDs out of raw page HTML (not used by the API flow below)
        
    def create_output_dir(self):
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)
            
    def get_note_id(self, url):
        # The note ID is the last path segment of the share URL,
        # e.g. https://www.xiaohongshu.com/explore/<note_id> -> <note_id>
        parsed = urlparse(url)
        note_id = parsed.path.rstrip('/').split('/')[-1]
        return note_id
    
    def get_comments(self, note_id, cursor=None):
        # Fetch one page of comments from the web comment API
        params = {
            'note_id': note_id,
            'page_size': 20,
            'sort': 'time'
        }
        if cursor:
            params['cursor'] = cursor
            
        try:
            response = self.session.get(
                self.comment_api,
                params=params,
                headers=self.headers,
                timeout=10
            )
            if response.status_code == 200:
                return response.json()
            else:
                print(f"Request failed, status code: {response.status_code}")
                return None
        except Exception as e:
            print(f"Request error: {e}")
            return None
            
    def parse_comments(self, data):
        comments = []
        if not data or not data.get('data'):
            return comments
            
        for item in data['data'].get('comments', []):
            comment = {
                'user_id': item['user']['user_id'],
                'nickname': item['user']['nickname'],
                'content': item['content'],
                'likes': item['like_count'],
                'time': datetime.fromtimestamp(item['create_time'] / 1000).strftime('%Y-%m-%d %H:%M:%S'),  # create_time assumed to be epoch milliseconds
                'reply_count': item['sub_comments_count']
            }
            comments.append(comment)
            
            # Handle sub-comments (replies)
            if item.get('sub_comments'):
                for sub in item['sub_comments']:
                    sub_comment = {
                        'user_id': sub['user']['user_id'],
                        'nickname': sub['user']['nickname'],
                        'content': sub['content'],
                        'likes': sub['like_count'],
                        'time': datetime.fromtimestamp(sub['create_time']/1000).strftime('%Y-%m-%d %H:%M:%S'),
                        'reply_count': 0
                    }
                    comments.append(sub_comment)
                    
        return comments
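
    # Response shape assumed by parse_comments above (inferred from the keys it
    # reads; this is not an official or documented schema):
    # {
    #   "data": {
    #     "comments": [
    #       {
    #         "user": {"user_id": "...", "nickname": "..."},
    #         "content": "...",
    #         "like_count": 0,
    #         "create_time": 1700000000000,   # epoch milliseconds (assumed)
    #         "sub_comments_count": 0,
    #         "sub_comments": [ ... same fields as a top-level comment ... ]
    #       }
    #     ],
    #     "has_more": true,
    #     "cursor": "..."
    #   }
    # }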
        
    def save_to_csv(self, note_id, comments):
        filename = f"{self.output_dir}/{note_id}_comments.csv"
        with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
            writer = csv.DictWriter(f, fieldnames=['user_id', 'nickname', 'content', 'likes', 'time', 'reply_count'])
            writer.writeheader()
            writer.writerows(comments)
        print(f"已保存评论数据到 {filename}")
        
    def extract_uids(self, comments):
        # Deduplicate user IDs across all collected comments
        uids = set()
        for comment in comments:
            uids.add(comment['user_id'])
        return list(uids)
        
    def save_uids(self, note_id, uids):
        filename = f"{self.output_dir}/{note_id}_uids.txt"
        with open(filename, 'w', encoding='utf-8') as f:
            f.write('\n'.join(uids))
        print(f"已保存UID数据到 {filename}")
        
    def crawl(self, note_url):
        self.create_output_dir()
        note_id = self.get_note_id(note_url)
        print(f"开始采集笔记 {note_id} 的评论...")
        
        all_comments = []
        cursor = None
        page = 1
        
        while True:
            print(f"正在获取第 {page} 页评论...")
            data = self.get_comments(note_id, cursor)
            if not data:
                break
                
            comments = self.parse_comments(data)
            all_comments.extend(comments)
            
            if not data['data'].get('has_more'):
                break

            cursor = data['data'].get('cursor')
            page += 1
            time.sleep(random.uniform(1, 3))  # Random delay to avoid getting blocked
            
        if all_comments:
            self.save_to_csv(note_id, all_comments)
            uids = self.extract_uids(all_comments)
            self.save_uids(note_id, uids)
            print(f"共采集到 {len(all_comments)} 条评论,{len(uids)} 个UID")
        else:
            print("未采集到任何评论数据")
            
        return all_comments

if __name__ == "__main__":
    spider = XHSSpider()
    note_url = input("Enter the Xiaohongshu note URL: ")
    spider.crawl(note_url)
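
After a run, the comments land in xhs_comments/<note_id>_comments.csv and the deduplicated UIDs in xhs_comments/<note_id>_uids.txt. A minimal non-interactive usage sketch (the URL below is a placeholder, not a real note):

spider = XHSSpider()
spider.headers["Cookie"] = "paste_a_valid_cookie_here"
comments = spider.crawl("https://www.xiaohongshu.com/explore/<note_id>")
print(f"{len(comments)} comments returned")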
