在另一个大佬的基础上改的,发往邮件可能适合更多人
如果有用的话请投喂鸡腿~谢谢
import timeimport requestsfrom bs4 import BeautifulSoupimport jsonimport hashlibimport smtplibfrom email.mime.text import MIMETextfrom email.header import Headerimport tracebackimport socketURL = "https://www.netcup.com/en/deals/black-friday"CHECK_INTERVAL = 30 # secondsSNAPSHOT_FILE = "netcup_snapshot.json"# ===============================# QQ 邮箱 SMTP 配置(必填)# ===============================SMTP_HOST = "smtp.qq.com"SMTP_PORT = 465 # SSL 端口,QQ 邮箱推荐 465;587 也可用(TLS) [oai_citation:1‡CSDN博客](https://blog.csdn.net/Coin_Collecter/article/details/129596488?utm_source=chatgpt.com)SMTP_USER = "" # 发件人邮箱SMTP_PASS = "" # 在 QQ 邮箱设置里生成的 16 位授权码,不是QQ密码 MAIL_TO = [""] # 可以多个收件人MAIL_SUBJECT = "Netcup Black Friday 页面变化通知"# ===============================# 发邮件(带重试机制)# ===============================def send_email(html_message: str, max_retries=3): msg = MIMEText(html_message, "html", "utf-8") msg["From"] = SMTP_USER msg["To"] = ", ".join(MAIL_TO) msg["Subject"] = str(Header(MAIL_SUBJECT, "utf-8")) for attempt in range(max_retries): send_success = False server = None try: print(f" [邮件] 尝试发送邮件 (第 {attempt + 1}/{max_retries} 次)...") server = smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=30) server.login(SMTP_USER, SMTP_PASS) server.sendmail(SMTP_USER, MAIL_TO, msg.as_string()) send_success = True # 标记邮件已成功发送 print(f" [邮件] ✅ 邮件发送成功!") except smtplib.SMTPAuthenticationError as e: print(f" [邮件] ❌ SMTP认证失败: {e}") print(" [提示] 请检查QQ邮箱授权码是否正确或已过期") raise # 认证错误不重试 except socket.timeout: print(f" [邮件] ⚠️ 连接超时 (尝试 {attempt + 1}/{max_retries})") if attempt < max_retries - 1: print(f" [邮件] 等待 5 秒后重试...") time.sleep(5) except socket.error as e: # 如果邮件已发送,这可能是连接关闭时的正常错误 if send_success: print(f" [邮件] ⚠️ 连接关闭时出现Socket错误(已忽略): {e}") else: print(f" [邮件] ⚠️ Socket错误: {e} (尝试 {attempt + 1}/{max_retries})") if attempt < max_retries - 1: print(f" [邮件] 等待 5 秒后重试...") time.sleep(5) except Exception as e: # 如果邮件已发送,忽略关闭连接时的其他错误 if send_success: print(f" [邮件] ⚠️ 连接关闭时出现错误(已忽略): {type(e).__name__}") else: print(f" [邮件] ❌ 发送失败: {type(e).__name__}: {e}") if attempt < max_retries - 1: print(f" [邮件] 等待 5 秒后重试...") time.sleep(5) else: raise finally: # 安全关闭连接,忽略关闭时的错误 if server: try: server.quit() except: pass # 如果邮件已成功发送,立即返回 if send_success: return True print(f" [邮件] ❌ 所有 {max_retries} 次尝试均失败") return False# ===============================# 抓取 & 解析产品# ===============================def fetch_products(): print(f" [抓取] 正在访问: {URL}") try: resp = requests.get( URL, timeout=15, headers={ "User-Agent": "Mozilla/5.0 (NetcupWatcher/1.0)" } ) resp.raise_for_status() print(f" [抓取] ✅ HTTP {resp.status_code} - 页面获取成功") except requests.exceptions.Timeout: print(f" [抓取] ❌ 请求超时") raise except requests.exceptions.ConnectionError as e: print(f" [抓取] ❌ 连接错误: {e}") raise except requests.exceptions.HTTPError as e: print(f" [抓取] ❌ HTTP错误: {e}") raise soup = BeautifulSoup(resp.text, "html.parser") products = [] product_cards = soup.find_all("div", class_="deal-card-container") for card in product_cards: # 标题:有些卡片可能是 h2/h3,做个兼容 title_tag = card.find(["h3", "h2"]) title = title_tag.get_text(strip=True) if title_tag else "Unknown Title" # 配置 configs = "\n".join("- " + li.get_text(" ", strip=True) for li in card.find_all("li")) # 购买链接 / 售罄判断 link_tag = card.find("a", href=True) link = "https://www.netcup.com" sold_out = False if link_tag and link_tag.get("href"): link += link_tag["href"] else: sold_out = True # 价格(容错) price = "Unknown Price" price_div = card.find("div", class_="text-price text-green") if price_div: span = price_div.find("span") if span: price = span.get_text(strip=True) # 指纹 fingerprint = hashlib.md5( (title + price + link + ("1" if sold_out else "0")).encode("utf-8") ).hexdigest() products.append({ "title": title, "price": price, "sold_out": sold_out, "link": link, "config": configs, "fingerprint": fingerprint, }) return products# ===============================# 加载上一次快照# ===============================def load_snapshot(): try: with open(SNAPSHOT_FILE, "r", encoding="utf-8") as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return []# ===============================# 保存快照# ===============================def save_snapshot(data): with open(SNAPSHOT_FILE, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False)# ===============================# 对比产品变化# ===============================def detect_changes(old, new): old_fps = {p["fingerprint"] for p in old} new_fps = {p["fingerprint"] for p in new} added = [p for p in new if p["fingerprint"] not in old_fps] removed = [p for p in old if p["fingerprint"] not in new_fps] return added, removed# ===============================# 产品格式化成邮件 HTML# ===============================def format_product(product): return ( f"<b>{product['title']}</b><br>" f"Price: {product['price']}<br>" f"Sold Out: {'YES' if product['sold_out'] else 'NO'}<br>" f"Config:<br><pre>{product['config']}</pre>" f"<a href='{product['link']}'>Product Link</a><br><br>" )# ===============================# 主循环# ===============================def main(): print("="*60) print("🚀 Netcup Black Friday 监控已启动") print(f"📧 邮件通知: {', '.join(MAIL_TO)}") print(f"⏱️ 检查间隔: {CHECK_INTERVAL} 秒") print(f"🔗 监控地址: {URL}") print("="*60) last_snapshot = load_snapshot() if last_snapshot: print(f"📂 已加载上次快照,共 {len(last_snapshot)} 个产品\n") else: print("📂 无历史快照,首次运行\n") error_count = 0 max_consecutive_errors = 5 while True: try: print(f"\n{'='*60}") print(f"🔍 检查时间: {time.strftime('%Y-%m-%d %H:%M:%S')}") products = fetch_products() print(f" [解析] 找到 {len(products)} 个产品") added, removed = detect_changes(last_snapshot, products) if added or removed: print(f" [变化] 🎯 检测到页面变化!") print(f" - 新增: {len(added)} 个产品") print(f" - 移除: {len(removed)} 个产品") html = "⚡ <b>Netcup 页面发生变化!</b><br><br>" if added: html += "🟢 <b>新增产品:</b><br>" for p in added: html += format_product(p) if removed: html += "🔴 <b>消失产品:</b><br>" for p in removed: html += format_product(p) html += f"📦 <b>当前产品总数:</b> {len(products)}<br>" # 尝试发送邮件,即使报告失败也继续保存快照(邮件可能已发送) send_email(html) save_snapshot(products) last_snapshot = products print(f" [快照] 已保存新快照") else: print(f" [状态] ✅ 无变化") # 成功后重置错误计数 error_count = 0 print(f" [休眠] 等待 {CHECK_INTERVAL} 秒后继续...") time.sleep(CHECK_INTERVAL) except KeyboardInterrupt: print("\n\n⚠️ 收到中断信号,正在退出...") break except Exception as e: error_count += 1 error_type = type(e).__name__ error_msg = str(e) print(f"\n❌ 监控出现错误 (第 {error_count} 次):") print(f" 类型: {error_type}") print(f" 信息: {error_msg}") print(f"\n详细堆栈:") print(traceback.format_exc()) # 尝试发送错误通知邮件(但不要让邮件发送失败影响主循环) try: error_html = ( f"❗ <b>监控出现错误</b><br><br>" f"<b>错误类型:</b> {error_type}<br>" f"<b>错误信息:</b> {error_msg}<br><br>" f"<b>详细堆栈:</b><br>" f"<pre>{traceback.format_exc()}</pre><br>" f"<b>时间:</b> {time.strftime('%Y-%m-%d %H:%M:%S')}<br>" f"<b>连续错误次数:</b> {error_count}/{max_consecutive_errors}" ) send_email(error_html) except Exception as mail_error: print(f"⚠️ 无法发送错误通知邮件: {mail_error}") # 如果连续错误过多,退出程序 if error_count >= max_consecutive_errors: print(f"\n🛑 连续错误达到 {max_consecutive_errors} 次,程序退出") break print(f" [休眠] 等待 60 秒后重试...") time.sleep(60)if __name__ == "__main__": main()
评论 (0)