GitHub监控脚本配置全攻略:实时获取最新漏洞情报并推送到微信

张开发
2026/4/17 17:35:54 15 分钟阅读

分享文章

GitHub监控脚本配置全攻略:实时获取最新漏洞情报并推送到微信
GitHub监控脚本配置全攻略实时获取最新漏洞情报并推送到微信在网络安全领域情报的时效性往往决定了防御的成败。想象一下当一个新的高危漏洞被披露时如果你能比攻击者更早获取相关信息就能为系统打上补丁或部署缓解措施。GitHub作为全球最大的代码托管平台每天都有大量安全研究人员发布最新的漏洞利用代码PoC、漏洞分析报告和安全工具。本文将带你构建一个自动化监控系统实时抓取GitHub上的安全情报并通过微信即时推送到你的手机。1. 环境准备与基础配置在开始之前确保你的系统已安装Python 3.7或更高版本。推荐使用Linux或macOS系统Windows用户建议使用WSL2以获得最佳兼容性。以下是需要预先安装的依赖项pip install requests beautifulsoup4 schedule python-dotenv创建一个新的项目目录并初始化配置文件mkdir github-monitor cd github-monitor touch config.env monitor.py wechat.py在config.env中配置基础参数# GitHub API配置 GITHUB_TOKENyour_personal_access_token SEARCH_KEYWORDSCVE,exploit,vulnerability # 微信推送配置 WECHAT_APP_IDwx1234567890 WECHAT_APP_SECRETyour_app_secret WECHAT_USER_IDuser_openid提示GitHub个人访问令牌需要在账号设置中生成至少授予repo和read:user权限2. 微信推送服务集成要实现消息推送我们需要接入微信公众平台的服务号功能。以下是具体操作步骤注册企业微信访问企业微信官网完成注册在「应用管理」中创建自建应用记录下AgentId、CorpId和Secret三项关键信息配置消息接收# wechat.py import requests class WeChatNotifier: def __init__(self, corp_id, corp_secret, agent_id): self.base_url https://qyapi.weixin.qq.com/cgi-bin self.corp_id corp_id self.corp_secret corp_secret self.agent_id agent_id self.access_token self._get_token() def _get_token(self): url f{self.base_url}/gettoken?corpid{self.corp_id}corpsecret{self.corp_secret} resp requests.get(url).json() return resp[access_token] def send_message(self, content, user_idall): url f{self.base_url}/message/send?access_token{self.access_token} payload { touser: user_id, msgtype: text, agentid: self.agent_id, text: {content: content}, safe: 0 } return requests.post(url, jsonpayload).json()测试推送功能notifier WeChatNotifier( corp_idyour_corp_id, corp_secretyour_corp_secret, agent_id1000002 ) notifier.send_message(测试消息监控系统已启动)3. GitHub监控脚本开发核心监控脚本需要实现以下功能定期搜索GitHub最新安全相关仓库过滤高质量内容格式化推送消息# monitor.py import os import time import requests from bs4 import BeautifulSoup from dotenv import load_dotenv load_dotenv(config.env) class GitHubMonitor: def __init__(self): self.headers { Authorization: ftoken {os.getenv(GITHUB_TOKEN)}, Accept: application/vnd.github.v3json } self.keywords os.getenv(SEARCH_KEYWORDS).split(,) self.last_check int(time.time()) - 3600 # 默认检查过去1小时的内容 def search_repositories(self): results [] for keyword in self.keywords: url fhttps://api.github.com/search/repositories?q{keyword}created:{self._time_to_iso()}sortupdatedorderdesc try: resp requests.get(url, headersself.headers) if resp.status_code 200: results.extend(resp.json()[items]) except Exception as e: print(f搜索出错: {str(e)}) return self._filter_results(results) def _filter_results(self, items): # 按星标数、更新时间、描述完整性进行过滤 return sorted( [item for item in items if item[stargazers_count] 10], keylambda x: x[stargazers_count], reverseTrue )[:10] # 取前10个结果 def _time_to_iso(self): return time.strftime(%Y-%m-%dT%H:%M:%SZ, time.gmtime(self.last_check)) def format_message(self, repo): return f 新安全资源警报 仓库: {repo[name]} 作者: {repo[owner][login]} 描述: {repo[description] or 无描述} 星标: {repo[stargazers_count]} 语言: {repo[language] or 未知} 链接: {repo[html_url]} 更新时间: {repo[updated_at]} 4. 系统整合与优化将各个模块组合成完整系统并添加定时任务和异常处理# main.py import schedule import time from monitor import GitHubMonitor from wechat import WeChatNotifier def job(): monitor GitHubMonitor() notifier WeChatNotifier( corp_idos.getenv(WECHAT_CORP_ID), corp_secretos.getenv(WECHAT_CORP_SECRET), agent_idint(os.getenv(WECHAT_AGENT_ID)) ) try: repos monitor.search_repositories() for repo in repos: message monitor.format_message(repo) notifier.send_message(message) time.sleep(1) # 避免触发速率限制 monitor.last_check int(time.time()) except Exception as e: error_msg f监控任务执行失败: {str(e)} notifier.send_message(error_msg) if __name__ __main__: # 每30分钟运行一次 schedule.every(30).minutes.do(job) while True: schedule.run_pending() time.sleep(60)高级优化技巧关键词动态调整def update_keywords(self, new_keywords): self.keywords list(set(self.keywords new_keywords)) os.environ[SEARCH_KEYWORDS] ,.join(self.keywords) with open(config.env, w) as f: for key, value in os.environ.items(): if key.startswith((GITHUB_, WECHAT_, SEARCH_)): f.write(f{key}{value}\n)结果去重存储import sqlite3 class ResultDatabase: def __init__(self): self.conn sqlite3.connect(results.db) self._init_db() def _init_db(self): self.conn.execute(CREATE TABLE IF NOT EXISTS repos (id INTEGER PRIMARY KEY AUTOINCREMENT, repo_id INT UNIQUE, name TEXT, url TEXT, processed BOOLEAN DEFAULT 0)) def is_new(self, repo_id): cursor self.conn.execute(SELECT 1 FROM repos WHERE repo_id?, (repo_id,)) return cursor.fetchone() is None def add_repo(self, repo): try: self.conn.execute(INSERT INTO repos (repo_id, name, url) VALUES (?, ?, ?), (repo[id], repo[name], repo[html_url])) self.conn.commit() return True except sqlite3.IntegrityError: return False性能监控看板可选from prometheus_client import start_http_server, Counter MONITORED_REPOS Counter(github_monitor_repositories, Total monitored repositories, [keyword]) ALERTS_SENT Counter(github_monitor_alerts, Total alerts sent to WeChat) # 在搜索方法中增加指标记录 def search_repositories(self): for keyword in self.keywords: MONITORED_REPOS.labels(keywordkeyword).inc()5. 实际应用场景扩展基础监控系统搭建完成后可以考虑以下扩展方向漏洞情报深度分析def analyze_cve(content): # 使用正则提取CVE编号 import re cve_pattern rCVE-\d{4}-\d{4,7} cves list(set(re.findall(cve_pattern, content))) # 查询NVD数据库获取漏洞详情 results [] for cve in cves: resp requests.get(fhttps://services.nvd.nist.gov/rest/json/cves/2.0?cveId{cve}) if resp.status_code 200: results.append(resp.json()) return results多平台集成方案平台集成方式优势SlackIncoming Webhook适合团队协作TelegramBot API高送达率支持富媒体企业微信应用消息国内访问稳定钉钉自定义机器人与企业OA系统集成方便历史数据回溯功能def get_historical_data(days7): 获取过去N天的热门安全仓库 since (datetime.now() - timedelta(daysdays)).isoformat() url fhttps://api.github.com/search/repositories?qsecuritycreated:{since}sortstarsorderdesc response requests.get(url, headersself.headers) return response.json().get(items, [])在部署到生产环境时建议使用Docker容器化方案# Dockerfile FROM python:3.9-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . CMD [python, main.py]构建并运行容器docker build -t github-monitor . docker run -d --restart always --name monitor github-monitor对于需要监控特定项目更新的场景可以扩展仓库级别的监控def watch_repository(owner, repo): 监控特定仓库的更新 url fhttps://api.github.com/repos/{owner}/{repo}/events response requests.get(url, headersself.headers) events response.json() important_events [ e for e in events if e[type] in (PushEvent, ReleaseEvent, IssuesEvent) ] return important_events性能优化建议使用GitHub GraphQL API替代REST API减少请求次数实现增量更新机制只处理新产生的内容对推送消息进行优先级分级处理添加请求缓存避免重复处理相同内容from cachetools import TTLCache # 在类初始化中添加 self.cache TTLCache(maxsize1000, ttl3600) def _make_request(self, url): if url in self.cache: return self.cache[url] response requests.get(url, headersself.headers) self.cache[url] response return response随着监控系统的持续运行你会积累大量安全情报数据。这时可以考虑引入Elasticsearch进行存储和分析from elasticsearch import Elasticsearch es Elasticsearch([localhost:9200]) def index_repository(repo): doc { name: repo[name], owner: repo[owner][login], description: repo[description], stars: repo[stargazers_count], language: repo[language], url: repo[html_url], topics: repo.get(topics, []), timestamp: datetime.now() } es.index(indexgithub-security, documentdoc)最后别忘了设置适当的监控频率。GitHub API有严格的速率限制认证用户每小时5000次请求建议根据实际需求调整监控级别检查频率适用场景高每10分钟关键基础设施漏洞监控中每小时常规安全情报收集低每天趋势分析和历史数据回溯

更多文章