小红书数据采集实战:如何用Python高效获取电商社交平台公开数据

张开发
2026/4/16 12:01:03 15 分钟阅读

分享文章

小红书数据采集实战:如何用Python高效获取电商社交平台公开数据
小红书数据采集实战如何用Python高效获取电商社交平台公开数据【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs在电商社交平台日益重要的今天小红书作为中国领先的生活方式分享社区汇聚了海量的用户生成内容和商业价值数据。对于数据分析师、市场研究人员和开发者来说如何合规、高效地获取这些公开数据成为了一个关键技术挑战。本文将深入探讨如何利用xhs库实现小红书数据的自动化采集无需深入了解复杂的反爬机制即可快速上手。 为什么选择xhs库进行小红书数据采集小红书数据采集的三大技术难点小红书平台为了保护数据安全设置了多重技术壁垒动态签名验证每次请求都需要生成唯一的x-s和x-t签名环境检测机制通过浏览器指纹识别自动化工具请求频率限制严格的IP和账号访问控制xhs库的核心优势签名自动化内置智能签名生成无需手动计算加密参数反检测技术集成stealth.min.js脚本有效绕过环境检测模块化设计清晰的API接口设计降低学习成本企业级稳定性完善的错误处理和重试机制 十分钟快速入门指南环境准备与安装# 安装xhs库 pip install xhs # 安装Playwright依赖 pip install playwright playwright install # 克隆项目仓库包含完整示例 git clone https://gitcode.com/gh_mirrors/xh/xhs cd xhs获取Cookie的三种方法Cookie是xhs库正常运行的关键以下是三种获取方式方法一浏览器手动获取# 访问小红书网站按F12打开开发者工具 # 在Application Cookies中找到以下三个字段 # 1. a1: 用户身份标识 # 2. web_session: 会话标识 # 3. webId: 设备标识方法二使用示例代码自动登录# 参考 example/login_qrcode.py # 通过二维码登录自动获取Cookie from example.login_qrcode import get_cookie_by_qrcode cookie get_cookie_by_qrcode()方法三Docker签名服务# 使用Docker运行签名服务 docker run -it -d -p 5005:5005 reajason/xhs-api:latest第一个采集程序from xhs import XhsClient, FeedType # 初始化客户端 cookie your_cookie_string_here client XhsClient(cookie) # 获取推荐内容 recommend_notes client.get_home_feed(FeedType.RECOMMEND) print(f获取到 {len(recommend_notes)} 条推荐笔记) # 获取第一条笔记详情 if recommend_notes: first_note recommend_notes[0] print(f标题: {first_note.get(title, 无标题)}) print(f作者: {first_note.get(user, {}).get(nickname, 匿名)}) print(f点赞数: {first_note.get(liked_count, 0)})️ 核心架构深度解析签名机制的技术实现xhs库的核心技术突破在于动态签名的自动化生成。让我们深入分析xhs/help.py中的签名函数def sign(uri, dataNone, a1, web_session): 签名函数核心实现 for _ in range(10): # 最多重试10次 try: with sync_playwright() as playwright: browser playwright.chromium.launch(headlessTrue) browser_context browser.new_context() # 加载反检测脚本 browser_context.add_init_script(pathstealth_js_path) context_page browser_context.new_page() context_page.goto(https://www.xiaohongshu.com) # 设置cookie并重载页面 browser_context.add_cookies([ {name: a1, value: a1, domain: .xiaohongshu.com, path: /} ]) context_page.reload() sleep(1) # 关键等待时间 # 调用JavaScript加密函数生成签名 encrypt_params context_page.evaluate( ([url, data]) window._webmsxyw(url, data), [uri, data] ) return { x-s: encrypt_params[X-s], x-t: str(encrypt_params[X-t]) } except Exception: pass # 失败重试 raise Exception(重试多次仍无法签名成功)错误处理体系设计xhs/exception.py定义了完整的异常处理机制异常类型触发条件处理建议DataFetchError数据获取失败检查网络连接重试请求IPBlockErrorIP被限制访问更换代理IP降低请求频率SignError签名生成失败检查Cookie有效性重新登录NeedVerifyError需要验证码验证处理验证码逻辑 多维度数据采集实战1. 用户数据采集# 获取用户基本信息 user_info client.get_user_info(user_id_here) print(f用户昵称: {user_info.get(nickname)}) print(f粉丝数量: {user_info.get(fans)}) print(f获赞总数: {user_info.get(likes)}) # 获取用户发布的笔记 user_notes client.get_user_notes(user_id_here, limit50) print(f用户共发布 {len(user_notes)} 条笔记)2. 内容搜索与分析from xhs import SearchSortType, SearchNoteType # 按关键词搜索 search_results client.search( keyword美妆教程, sort_typeSearchSortType.MOST_POPULAR, # 按热度排序 note_typeSearchNoteType.VIDEO, # 只搜索视频笔记 page1, page_size20 ) # 分析搜索结果 for note in search_results: engagement_rate ( int(note.get(liked_count, 0)) int(note.get(comment_count, 0)) * 2 ) / max(int(note.get(collect_count, 1)), 1) print(f标题: {note.get(title)}) print(f互动率: {engagement_rate:.2f})3. 分类内容采集from xhs import FeedType # 获取不同分类的内容 categories { 穿搭: FeedType.FASION, 美食: FeedType.FOOD, 彩妆: FeedType.COSMETICS, 旅行: FeedType.TRAVEL, 健身: FeedType.FITNESS } for category_name, feed_type in categories.items(): notes client.get_home_feed(feed_type, limit30) print(f{category_name}分类获取到 {len(notes)} 条内容) # 计算平均互动数据 avg_likes sum(int(n.get(liked_count, 0)) for n in notes) / len(notes) avg_comments sum(int(n.get(comment_count, 0)) for n in notes) / len(notes) print(f平均点赞: {avg_likes:.1f}, 平均评论: {avg_comments:.1f}) 高级配置与性能优化并发采集策略对于大规模数据采集单线程效率低下。以下是优化的并发方案import concurrent.futures import time from typing import List class ConcurrentCollector: def __init__(self, max_workers5, delay_between_requests1.0): self.max_workers max_workers self.delay delay_between_requests def collect_user_notes_batch(self, user_ids: List[str]) - dict: 批量采集多个用户的笔记 results {} with concurrent.futures.ThreadPoolExecutor(max_workersself.max_workers) as executor: future_to_user { executor.submit(self._collect_single_user, user_id): user_id for user_id in user_ids } for future in concurrent.futures.as_completed(future_to_user): user_id future_to_user[future] try: notes future.result() results[user_id] notes print(f用户 {user_id} 采集完成共 {len(notes)} 条笔记) except Exception as e: print(f用户 {user_id} 采集失败: {e}) results[user_id] [] time.sleep(self.delay) # 控制请求频率 return results def _collect_single_user(self, user_id: str): 采集单个用户的笔记 # 这里可以添加具体的采集逻辑 return [] # 返回采集结果数据持久化方案import json import csv import sqlite3 from datetime import datetime from pathlib import Path class DataStorage: def __init__(self, base_path./data): self.base_path Path(base_path) self._setup_directories() def _setup_directories(self): 创建存储目录结构 directories [raw, processed, analysis, logs] for dir_name in directories: (self.base_path / dir_name).mkdir(parentsTrue, exist_okTrue) def save_as_json(self, data, filename_prefixnote): 保存为JSON格式 timestamp datetime.now().strftime(%Y%m%d_%H%M%S) filename f{filename_prefix}_{timestamp}.json filepath self.base_path / raw / filename with open(filepath, w, encodingutf-8) as f: json.dump(data, f, ensure_asciiFalse, indent2) print(f数据已保存至: {filepath}) return filepath def save_as_csv(self, data_list, filename_prefixnotes): 保存为CSV格式 if not data_list: return None timestamp datetime.now().strftime(%Y%m%d) filename f{filename_prefix}_{timestamp}.csv filepath self.base_path / processed / filename # 提取所有可能的字段 all_fields set() for item in data_list: all_fields.update(item.keys()) with open(filepath, w, newline, encodingutf-8-sig) as f: writer csv.DictWriter(f, fieldnamessorted(all_fields)) writer.writeheader() writer.writerows(data_list) print(fCSV文件已保存至: {filepath}) return filepath def save_to_sqlite(self, data_list, table_namenotes): 保存到SQLite数据库 db_path self.base_path / database / xhs_data.db db_path.parent.mkdir(parentsTrue, exist_okTrue) conn sqlite3.connect(db_path) cursor conn.cursor() # 创建表简化示例 cursor.execute(f CREATE TABLE IF NOT EXISTS {table_name} ( note_id TEXT PRIMARY KEY, title TEXT, content TEXT, likes INTEGER, comments INTEGER, collects INTEGER, publish_time TIMESTAMP, user_id TEXT ) ) # 插入数据 for note in data_list: cursor.execute(f INSERT OR REPLACE INTO {table_name} VALUES (?, ?, ?, ?, ?, ?, ?, ?) , ( note.get(note_id), note.get(title, ), note.get(desc, ), note.get(liked_count, 0), note.get(comment_count, 0), note.get(collected_count, 0), datetime.fromtimestamp(note.get(time, 0)), note.get(user, {}).get(user_id, ) )) conn.commit() conn.close() print(f数据已保存到SQLite数据库: {db_path}) 实际业务场景应用场景一竞品分析def competitor_analysis(competitor_ids, days7): 竞品账号分析 analysis_results {} for user_id in competitor_ids: # 获取用户近期笔记 user_notes client.get_user_notes(user_id, limit100) # 计算关键指标 total_notes len(user_notes) total_likes sum(int(n.get(liked_count, 0)) for n in user_notes) total_comments sum(int(n.get(comment_count, 0)) for n in user_notes) avg_engagement (total_likes total_comments * 2) / total_notes # 分析内容类型分布 content_types {} for note in user_notes: note_type note.get(type, unknown) content_types[note_type] content_types.get(note_type, 0) 1 analysis_results[user_id] { total_notes: total_notes, total_likes: total_likes, total_comments: total_comments, avg_engagement: avg_engagement, content_distribution: content_types, top_tags: self._extract_top_tags(user_notes, top_n10) } return analysis_results场景二热点话题追踪class TrendTracker: def __init__(self, keywords): self.keywords keywords self.trend_data {} def track_daily_trends(self): 追踪每日趋势 daily_results {} for keyword in self.keywords: # 搜索相关笔记 search_results client.search( keywordkeyword, sort_typeSearchSortType.GENERAL, page_size50 ) # 计算趋势指标 total_notes len(search_results) total_interactions sum( int(n.get(liked_count, 0)) int(n.get(comment_count, 0)) * 2 int(n.get(collect_count, 0)) * 3 for n in search_results ) # 分析发布时间分布 time_distribution self._analyze_time_distribution(search_results) daily_results[keyword] { total_notes: total_notes, total_interactions: total_interactions, avg_interaction_per_note: total_interactions / max(total_notes, 1), time_distribution: time_distribution, top_influencers: self._extract_top_users(search_results, top_n5) } return daily_results⚠️ 常见陷阱与规避方法问题1签名频繁失败现象频繁出现SignError异常解决方案检查Cookie有效性确保a1、web_session、webId三个字段完整适当增加签名函数中的sleep时间参考example/basic_usage.py第29行尝试在签名时设置headlessFalse查看浏览器状态更新stealth.min.js反检测脚本问题2IP被限制访问现象返回错误代码300012规避策略降低请求频率建议单次请求间隔≥3秒使用代理IP池实现IP轮换实现指数退避重试机制监控请求成功率动态调整频率class RateLimiter: def __init__(self, base_delay3.0, max_delay60.0): self.base_delay base_delay self.max_delay max_delay self.failure_count 0 def should_retry(self, response): 判断是否需要重试 if response.status_code 429: # 请求过多 self.failure_count 1 delay min( self.base_delay * (2 ** self.failure_count), self.max_delay ) print(f请求被限制等待 {delay} 秒后重试) time.sleep(delay) return True elif response.status_code 500: # 服务器错误 time.sleep(self.base_delay) return True else: self.failure_count 0 # 重置失败计数 return False问题3数据解析异常现象获取的数据格式不符合预期调试方法使用调试模式查看原始响应数据检查xhs/help.py中的数据解析函数验证API参数是否正确传递确认目标内容是否为公开可访问 生态集成方案与数据分析工具集成import pandas as pd import matplotlib.pyplot as plt import seaborn as sns class DataAnalyzer: def __init__(self, data_storage): self.storage data_storage def create_engagement_report(self, notes_data): 创建互动数据报告 df pd.DataFrame(notes_data) # 基础统计 stats { 笔记总数: len(df), 总点赞数: df[liked_count].sum(), 总评论数: df[comment_count].sum(), 平均点赞数: df[liked_count].mean(), 平均评论数: df[comment_count].mean(), 互动率最高笔记: df.loc[df[liked_count].idxmax()][title] if len(df) 0 else None } # 可视化 plt.figure(figsize(12, 8)) # 点赞分布直方图 plt.subplot(2, 2, 1) df[liked_count].hist(bins20, edgecolorblack) plt.title(点赞数分布) plt.xlabel(点赞数) plt.ylabel(频次) # 评论分布直方图 plt.subplot(2, 2, 2) df[comment_count].hist(bins20, edgecolorblack, colororange) plt.title(评论数分布) plt.xlabel(评论数) plt.ylabel(频次) # 点赞vs评论散点图 plt.subplot(2, 2, 3) plt.scatter(df[liked_count], df[comment_count], alpha0.6) plt.title(点赞与评论关系) plt.xlabel(点赞数) plt.ylabel(评论数) # 保存报告 plt.tight_layout() report_path self.storage.base_path / analysis / engagement_report.png plt.savefig(report_path) plt.close() return stats, report_path与数据库系统集成from sqlalchemy import create_engine, Column, String, Integer, DateTime, Text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker Base declarative_base() class NoteModel(Base): 笔记数据模型 __tablename__ notes id Column(String(64), primary_keyTrue) title Column(String(500)) content Column(Text) likes Column(Integer, default0) comments Column(Integer, default0) collects Column(Integer, default0) publish_time Column(DateTime) user_id Column(String(64)) user_name Column(String(100)) tags Column(Text) # JSON格式存储标签 created_at Column(DateTime, defaultdatetime.now) updated_at Column(DateTime, defaultdatetime.now, onupdatedatetime.now) class DatabaseManager: def __init__(self, db_urlsqlite:///xhs_data.db): self.engine create_engine(db_url) Base.metadata.create_all(self.engine) self.Session sessionmaker(bindself.engine) def save_notes(self, notes_data): 保存笔记数据到数据库 session self.Session() for note in notes_data: note_model NoteModel( idnote.get(note_id), titlenote.get(title, )[:500], contentnote.get(desc, ), likesint(note.get(liked_count, 0)), commentsint(note.get(comment_count, 0)), collectsint(note.get(collected_count, 0)), publish_timedatetime.fromtimestamp(note.get(time, 0)), user_idnote.get(user, {}).get(user_id, ), user_namenote.get(user, {}).get(nickname, ), tagsjson.dumps(note.get(tag_list, []), ensure_asciiFalse) ) session.merge(note_model) # 使用merge避免重复 session.commit() session.close() print(f成功保存 {len(notes_data)} 条笔记到数据库) 性能优化最佳实践请求优化策略class OptimizedCollector: def __init__(self, client, cache_enabledTrue): self.client client self.cache_enabled cache_enabled self.request_cache {} def get_note_with_cache(self, note_id, force_refreshFalse): 带缓存的笔记获取 if self.cache_enabled and not force_refresh and note_id in self.request_cache: cached_data, timestamp self.request_cache[note_id] # 检查缓存是否过期5分钟 if time.time() - timestamp 300: print(f从缓存获取笔记 {note_id}) return cached_data # 实际请求 note_data self.client.get_note_by_id(note_id) if self.cache_enabled: self.request_cache[note_id] (note_data, time.time()) return note_data def batch_collect_optimized(self, note_ids, batch_size10, delay2.0): 优化批量采集 results [] for i in range(0, len(note_ids), batch_size): batch note_ids[i:ibatch_size] print(f处理批次 {i//batch_size 1}: {len(batch)} 条笔记) batch_results [] for note_id in batch: try: note self.get_note_with_cache(note_id) batch_results.append(note) except Exception as e: print(f获取笔记 {note_id} 失败: {e}) batch_results.append(None) time.sleep(delay / len(batch)) # 均匀分布延迟 results.extend(batch_results) # 批次间延迟 if i batch_size len(note_ids): time.sleep(delay) return results内存管理优化import gc from typing import Generator class MemoryEfficientCollector: def __init__(self, client, chunk_size100): self.client client self.chunk_size chunk_size def stream_user_notes(self, user_id, limit1000) - Generator: 流式获取用户笔记减少内存占用 page 1 collected 0 while collected limit: try: notes self.client.get_user_notes(user_id, pagepage, page_sizeself.chunk_size) if not notes: break for note in notes: if collected limit: break yield note collected 1 page 1 # 定期清理内存 if page % 5 0: gc.collect() except Exception as e: print(f获取第 {page} 页失败: {e}) break def process_large_dataset(self, user_ids, output_file): 处理大型数据集 with open(output_file, w, encodingutf-8) as f: f.write(note_id,title,likes,comments,user_id\n) for user_id in user_ids: print(f处理用户: {user_id}) note_count 0 for note in self.stream_user_notes(user_id): # 逐行写入避免内存积累 f.write(f{note.get(note_id)}, f{note.get(title, ).replace(,, )}, f{note.get(liked_count, 0)}, f{note.get(comment_count, 0)}, f{note.get(user, {}).get(user_id, )}\n) note_count 1 if note_count % 100 0: f.flush() # 定期刷新缓冲区 print(f用户 {user_id} 处理完成共 {note_count} 条笔记) 进阶学习路径第一阶段基础掌握学习xhs/core.py中的核心类和方法运行example/目录中的所有示例代码理解签名机制和反检测原理第二阶段应用开发开发自定义数据处理器实现数据持久化方案构建监控和告警系统学习并发和异步编程优化第三阶段系统集成与现有数据分析平台集成开发RESTful API服务构建分布式采集系统实现自动化部署和运维推荐学习资源官方文档docs/目录测试用例tests/目录社区讨论关注项目更新和Issue讨论相关技术Playwright、Requests、SQLAlchemy、Pandas 总结与展望xhs库为小红书数据采集提供了一个强大而灵活的解决方案。通过本文的介绍您应该已经掌握了快速部署十分钟内完成环境搭建和基础采集核心技术理解签名机制和反检测原理实战应用多种业务场景的数据采集方案性能优化大规模数据采集的性能调优技巧系统集成与现有技术栈的无缝集成随着小红书平台的不断演进数据采集技术也需要持续更新。建议关注项目的GitHub仓库及时获取最新版本和功能更新。同时请始终遵循合规使用原则尊重平台规则和用户隐私。开始您的数据采集之旅挖掘小红书平台的商业价值吧【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考

更多文章