在另一个大佬的基础上改的,发往邮件可能适合更多人
如果有用的话请投喂鸡腿~谢谢

import timeimport requestsfrom bs4 import BeautifulSoupimport jsonimport hashlibimport smtplibfrom email.mime.text import MIMETextfrom email.header import Headerimport tracebackimport socketURL = "https://www.netcup.com/en/deals/black-friday"CHECK_INTERVAL = 30  # secondsSNAPSHOT_FILE = "netcup_snapshot.json"# ===============================# QQ 邮箱 SMTP 配置(必填)# ===============================SMTP_HOST = "smtp.qq.com"SMTP_PORT = 465  # SSL 端口,QQ 邮箱推荐 465;587 也可用(TLS)  [oai_citation:1‡CSDN博客](https://blog.csdn.net/Coin_Collecter/article/details/129596488?utm_source=chatgpt.com)SMTP_USER = ""   # 发件人邮箱SMTP_PASS = ""       # 在 QQ 邮箱设置里生成的 16 位授权码,不是QQ密码 MAIL_TO = [""]  # 可以多个收件人MAIL_SUBJECT = "Netcup Black Friday 页面变化通知"# ===============================# 发邮件(带重试机制)# ===============================def send_email(html_message: str, max_retries=3):    msg = MIMEText(html_message, "html", "utf-8")    msg["From"] = SMTP_USER    msg["To"] = ", ".join(MAIL_TO)    msg["Subject"] = str(Header(MAIL_SUBJECT, "utf-8"))    for attempt in range(max_retries):        send_success = False        server = None        try:            print(f"  [邮件] 尝试发送邮件 (第 {attempt + 1}/{max_retries} 次)...")            server = smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=30)            server.login(SMTP_USER, SMTP_PASS)            server.sendmail(SMTP_USER, MAIL_TO, msg.as_string())            send_success = True  # 标记邮件已成功发送            print(f"  [邮件] ✅ 邮件发送成功!")        except smtplib.SMTPAuthenticationError as e:            print(f"  [邮件] ❌ SMTP认证失败: {e}")            print("  [提示] 请检查QQ邮箱授权码是否正确或已过期")            raise  # 认证错误不重试        except socket.timeout:            print(f"  [邮件] ⚠️  连接超时 (尝试 {attempt + 1}/{max_retries})")            if attempt < max_retries - 1:                print(f"  [邮件] 等待 5 秒后重试...")                time.sleep(5)        except socket.error as e:            # 如果邮件已发送,这可能是连接关闭时的正常错误            if send_success:                print(f"  [邮件] ⚠️  连接关闭时出现Socket错误(已忽略): {e}")            else:                print(f"  [邮件] ⚠️  Socket错误: {e} (尝试 {attempt + 1}/{max_retries})")                if attempt < max_retries - 1:                    print(f"  [邮件] 等待 5 秒后重试...")                    time.sleep(5)        except Exception as e:            # 如果邮件已发送,忽略关闭连接时的其他错误            if send_success:                print(f"  [邮件] ⚠️  连接关闭时出现错误(已忽略): {type(e).__name__}")            else:                print(f"  [邮件] ❌ 发送失败: {type(e).__name__}: {e}")                if attempt < max_retries - 1:                    print(f"  [邮件] 等待 5 秒后重试...")                    time.sleep(5)                else:                    raise        finally:            # 安全关闭连接,忽略关闭时的错误            if server:                try:                    server.quit()                except:                    pass        # 如果邮件已成功发送,立即返回        if send_success:            return True    print(f"  [邮件] ❌ 所有 {max_retries} 次尝试均失败")    return False# ===============================# 抓取 & 解析产品# ===============================def fetch_products():    print(f"  [抓取] 正在访问: {URL}")    try:        resp = requests.get(            URL,            timeout=15,            headers={                "User-Agent": "Mozilla/5.0 (NetcupWatcher/1.0)"            }        )        resp.raise_for_status()        print(f"  [抓取] ✅ HTTP {resp.status_code} - 页面获取成功")    except requests.exceptions.Timeout:        print(f"  [抓取] ❌ 请求超时")        raise    except requests.exceptions.ConnectionError as e:        print(f"  [抓取] ❌ 连接错误: {e}")        raise    except requests.exceptions.HTTPError as e:        print(f"  [抓取] ❌ HTTP错误: {e}")        raise    soup = BeautifulSoup(resp.text, "html.parser")    products = []    product_cards = soup.find_all("div", class_="deal-card-container")    for card in product_cards:        # 标题:有些卡片可能是 h2/h3,做个兼容        title_tag = card.find(["h3", "h2"])        title = title_tag.get_text(strip=True) if title_tag else "Unknown Title"        # 配置        configs = "\n".join("- " + li.get_text(" ", strip=True) for li in card.find_all("li"))        # 购买链接 / 售罄判断        link_tag = card.find("a", href=True)        link = "https://www.netcup.com"        sold_out = False        if link_tag and link_tag.get("href"):            link += link_tag["href"]        else:            sold_out = True        # 价格(容错)        price = "Unknown Price"        price_div = card.find("div", class_="text-price text-green")        if price_div:            span = price_div.find("span")            if span:                price = span.get_text(strip=True)        # 指纹        fingerprint = hashlib.md5(            (title + price + link + ("1" if sold_out else "0")).encode("utf-8")        ).hexdigest()        products.append({            "title": title,            "price": price,            "sold_out": sold_out,            "link": link,            "config": configs,            "fingerprint": fingerprint,        })    return products# ===============================# 加载上一次快照# ===============================def load_snapshot():    try:        with open(SNAPSHOT_FILE, "r", encoding="utf-8") as f:            return json.load(f)    except (FileNotFoundError, json.JSONDecodeError):        return []# ===============================# 保存快照# ===============================def save_snapshot(data):    with open(SNAPSHOT_FILE, "w", encoding="utf-8") as f:        json.dump(data, f, indent=2, ensure_ascii=False)# ===============================# 对比产品变化# ===============================def detect_changes(old, new):    old_fps = {p["fingerprint"] for p in old}    new_fps = {p["fingerprint"] for p in new}    added = [p for p in new if p["fingerprint"] not in old_fps]    removed = [p for p in old if p["fingerprint"] not in new_fps]    return added, removed# ===============================# 产品格式化成邮件 HTML# ===============================def format_product(product):    return (        f"<b>{product['title']}</b><br>"        f"Price: {product['price']}<br>"        f"Sold Out: {'YES' if product['sold_out'] else 'NO'}<br>"        f"Config:<br><pre>{product['config']}</pre>"        f"<a href='{product['link']}'>Product Link</a><br><br>"    )# ===============================# 主循环# ===============================def main():    print("="*60)    print("🚀 Netcup Black Friday 监控已启动")    print(f"📧 邮件通知: {', '.join(MAIL_TO)}")    print(f"⏱️  检查间隔: {CHECK_INTERVAL} 秒")    print(f"🔗 监控地址: {URL}")    print("="*60)    last_snapshot = load_snapshot()    if last_snapshot:        print(f"📂 已加载上次快照,共 {len(last_snapshot)} 个产品\n")    else:        print("📂 无历史快照,首次运行\n")    error_count = 0    max_consecutive_errors = 5    while True:        try:            print(f"\n{'='*60}")            print(f"🔍 检查时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")            products = fetch_products()            print(f"  [解析] 找到 {len(products)} 个产品")            added, removed = detect_changes(last_snapshot, products)            if added or removed:                print(f"  [变化] 🎯 检测到页面变化!")                print(f"         - 新增: {len(added)} 个产品")                print(f"         - 移除: {len(removed)} 个产品")                html = "⚡ <b>Netcup 页面发生变化!</b><br><br>"                if added:                    html += "🟢 <b>新增产品:</b><br>"                    for p in added:                        html += format_product(p)                if removed:                    html += "🔴 <b>消失产品:</b><br>"                    for p in removed:                        html += format_product(p)                html += f"📦 <b>当前产品总数:</b> {len(products)}<br>"                # 尝试发送邮件,即使报告失败也继续保存快照(邮件可能已发送)                send_email(html)                save_snapshot(products)                last_snapshot = products                print(f"  [快照] 已保存新快照")            else:                print(f"  [状态] ✅ 无变化")            # 成功后重置错误计数            error_count = 0            print(f"  [休眠] 等待 {CHECK_INTERVAL} 秒后继续...")            time.sleep(CHECK_INTERVAL)        except KeyboardInterrupt:            print("\n\n⚠️  收到中断信号,正在退出...")            break        except Exception as e:            error_count += 1            error_type = type(e).__name__            error_msg = str(e)            print(f"\n❌ 监控出现错误 (第 {error_count} 次):")            print(f"   类型: {error_type}")            print(f"   信息: {error_msg}")            print(f"\n详细堆栈:")            print(traceback.format_exc())            # 尝试发送错误通知邮件(但不要让邮件发送失败影响主循环)            try:                error_html = (                    f"❗ <b>监控出现错误</b><br><br>"                    f"<b>错误类型:</b> {error_type}<br>"                    f"<b>错误信息:</b> {error_msg}<br><br>"                    f"<b>详细堆栈:</b><br>"                    f"<pre>{traceback.format_exc()}</pre><br>"                    f"<b>时间:</b> {time.strftime('%Y-%m-%d %H:%M:%S')}<br>"                    f"<b>连续错误次数:</b> {error_count}/{max_consecutive_errors}"                )                send_email(error_html)            except Exception as mail_error:                print(f"⚠️  无法发送错误通知邮件: {mail_error}")            # 如果连续错误过多,退出程序            if error_count >= max_consecutive_errors:                print(f"\n🛑 连续错误达到 {max_consecutive_errors} 次,程序退出")                break            print(f"  [休眠] 等待 60 秒后重试...")            time.sleep(60)if __name__ == "__main__":    main()