问天问地没人理,找哈基米写了个脚本
import requestsimport jsonimport concurrent.futuresimport time# ================= 配置区 =================# 1. 面板直连地址 (IP形式)DASHBOARD_URL = "http://127.0.0.1:8008"# 2. 账号密码 (用于自动登录获取 Token)USERNAME = "admin"PASSWORD = "your_password_here" # 🔴 请在这里填入你的真实密码 🔴# 3. 线程数 (并发检测数量)MAX_WORKERS = 10# =========================================class NezhaCleaner: def __init__(self, base_url, username, password): self.base_url = base_url self.username = username self.password = password self.headers = { "Content-Type": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/126.0.0.0 Safari/537.36", "Origin": base_url, "Referer": f"{base_url}/dashboard" } self.token = None def login(self): """执行登录并更新 Token""" login_url = f"{self.base_url}/api/v1/login" payload = { "username": self.username, "password": self.password } print(f"🔑 正在登录: {self.username} ...") try: resp = requests.post(login_url, json=payload, headers=self.headers, timeout=10) if resp.status_code == 200: data = resp.json() if data.get('success'): self.token = data['data']['token'] # 更新全局 Header,后续请求自动带上 Token self.headers["Authorization"] = f"Bearer {self.token}" print(f"✅ 登录成功! Token 已自动获取。") return True else: print(f"❌ 登录失败: {data}") else: print(f"❌ 登录请求失败: {resp.status_code} - {resp.text}") except Exception as e: print(f"❌ 登录过程出错: {e}") return False def delete_server(self, server_id, server_name): """单删接口""" url = f"{self.base_url}/api/v1/batch-delete/server" try: # 虽然是 batch 接口,但我们只传一个 ID 实现单删 resp = requests.post(url, headers=self.headers, json=[server_id], timeout=10) if resp.status_code == 200: print(f"🗑️ [已删除] {server_name} (ID: {server_id})") else: print(f"❌ [删除失败] {server_name}: {resp.text}") except Exception as e: print(f"❌ [删除请求错] {server_name}: {e}") def process_server(self, server): """ 核心工作线程:检测 -> 判断 -> 删除 """ sid = server.get('id') name = server.get('name') # 构造 RPC 检测请求 check_url = f"{self.base_url}/api/v1/terminal" payload = {"server_id": sid} try: # 发送 Terminal 请求检测 RPC 状态 resp = requests.post(check_url, headers=self.headers, json=payload, timeout=10) is_offline = False # === 核心判定逻辑 (基于你的抓包) === # 即使状态码是 200,如果内容里有 rpc error 也是离线 if resp.status_code == 200: text = resp.text if "rpc error" in text and "transport is closing" in text: is_offline = True # 实锤离线 elif "Unavailable" in text: is_offline = True # 另一种离线报错 if is_offline: print(f"💀 [发现死机] {name} (ID: {sid}) -> RPC通道已关闭") self.delete_server(sid, name) else: # print(f"🟢 [在线] {name}") # 想要日志清静点就注释掉 pass except Exception as e: print(f"⚠️ [检测出错] {name}: {e}") def run(self): # 1. 先登录 if not self.login(): return # 2. 获取列表 list_url = f"{self.base_url}/api/v1/server" print("📡 正在获取服务器列表...") try: resp = requests.get(list_url, headers=self.headers, timeout=15) if resp.status_code != 200: print(f"❌ 获取列表失败: {resp.status_code}") return json_data = resp.json() servers = json_data.get('data') or json_data.get('result', []) if not servers: print("✨ 列表为空,没有机器。") return print(f"✅ 获取成功,共有 {len(servers)} 台机器。") print(f"🔥 启动 {MAX_WORKERS} 线程并发清理...") print("-" * 50) # 3. 线程池并发处理 with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: executor.map(self.process_server, servers) print("-" * 50) print("🎉 清理任务完成。") except Exception as e: print(f"❌ 运行出错: {e}")if __name__ == "__main__": # 实例化并运行 cleaner = NezhaCleaner(DASHBOARD_URL, USERNAME, PASSWORD) cleaner.run()
评论 (0)