平常会把新装服务器一些必须用到的比如 nginx、nezha-agent 放在一个文件夹,装了系统以后一般直接拷贝,简单修改一些配置后就可以使用 docker-compose 直接启动。然后就可以准备搭建服务,创建 nginx 配置文件,接下来就需要域名解析了。
由于域名托管在 cloudflare,每次都要登录,然后进入域名页面才能配置解析,非常麻烦。于是想着能不能写一个 python 文件,方便进行解析。马上动手让 gpt 写了如下的代码,再用 pyinstaller 编译成二进制文件,效果如图:
使用方式如下:
╭─root@hausen1012 ~╰─# cf用法: dns <action> [domain] [ip_or_target_domain]可用操作: create(c), update(u), delete(d), query(q), query_all(qa)
首先需要创建 python 文件 cf.py
,这里 CF_API_TOKEN
为了方便也可以写死再编译,由于我这里的话,文件夹拷贝过去还需要运行 shell 脚本,我就索性让 shell 脚本帮我写入环境变量了。
#!/usr/bin/env python3# -*- coding: utf-8 -*-import requestsimport sysimport re# -------- 配置区 --------CF_API_TOKEN = os.getenv("CF_API_TOKEN")CF_API_BASE = os.getenv("CF_API_BASE", "https://api.cloudflare.com/client/v4")PUBLIC_IP_URL = os.getenv("PUBLIC_IP_URL", "https://ip.3322.net/")if not CF_API_TOKEN: print("❌ 请在代码或环境变量中设置 CF_API_TOKEN") sys.exit(1)# -----------------------def get_public_ip(): """获取公网IP""" try: ip = requests.get(PUBLIC_IP_URL, timeout=5).text.strip() return ip except Exception as e: print(f"获取公网IP失败: {e}") sys.exit(1)def is_ipv4(value): return bool(re.match(r'^(\d{1,3}\.){3}\d{1,3}$', value))def is_ipv6(value): return bool(re.match(r'^[0-9a-fA-F:]+$', value)) and ":" in valuedef get_zone_id(domain): """根据域名获取Zone ID""" headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"} parts = domain.split('.') for i in range(len(parts)-1): zone_name = '.'.join(parts[i:]) resp = requests.get(f"{CF_API_BASE}/zones?name={zone_name}", headers=headers).json() if resp.get("success") and resp.get("result"): return resp["result"][0]["id"] print(f"未找到域名 {domain} 的Zone ID") sys.exit(1)def list_zones(): """列出账户下所有域名 (Zone)""" headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"} page = 1 per_page = 50 zones = [] while True: resp = requests.get(f"{CF_API_BASE}/zones?page={page}&per_page={per_page}", headers=headers).json() if not resp.get("success"): print(f"获取Zone失败: {resp}") return [] zones.extend(resp.get("result", [])) if page >= resp.get("result_info", {}).get("total_pages", 1): break page += 1 return zonesdef get_dns_record(zone_id, record_name): """获取指定DNS记录""" headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"} page = 1 per_page = 100 while True: resp = requests.get( f"{CF_API_BASE}/zones/{zone_id}/dns_records?name={record_name}&page={page}&per_page={per_page}", headers=headers).json() if not resp.get("success"): return None records = resp.get("result", []) if records: return records[0] # 返回第一个匹配的记录 if page >= resp.get("result_info", {}).get("total_pages", 1): break page += 1 return Nonedef create_or_update_dns(zone_id, record_name, target, record_type="A"): """创建或更新DNS记录""" headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"} record = get_dns_record(zone_id, record_name) data = {"type": record_type, "name": record_name, "content": target, "ttl": 120, "proxied": False} if record: record_id = record["id"] resp = requests.put(f"{CF_API_BASE}/zones/{zone_id}/dns_records/{record_id}", json=data, headers=headers).json() print(f"{'更新成功' if resp.get('success') else '更新失败'}: {record_name} -> {target} ({record_type})") else: resp = requests.post(f"{CF_API_BASE}/zones/{zone_id}/dns_records", json=data, headers=headers).json() print(f"{'创建成功' if resp.get('success') else '创建失败'}: {record_name} -> {target} ({record_type})")def delete_dns(zone_id, record_name): """删除DNS记录""" headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"} record = get_dns_record(zone_id, record_name) if not record: print(f"未找到 {record_name} 的DNS记录") return record_id = record["id"] resp = requests.delete(f"{CF_API_BASE}/zones/{zone_id}/dns_records/{record_id}", headers=headers).json() print(f"{'删除成功' if resp.get('success') else '删除失败'}: {record_name}")def query_dns(zone_id, record_name): """查询单个DNS记录""" record = get_dns_record(zone_id, record_name) if not record: print(f"{record_name} 没有记录") return print(f"查询结果: {record_name} -> {record['content']} (Type: {record['type']}, TTL: {record['ttl']}, Proxied: {record['proxied']})")def query_all_dns(zone_id): """查询Zone下所有DNS记录""" headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"} page = 1 per_page = 100 all_records = [] while True: resp = requests.get(f"{CF_API_BASE}/zones/{zone_id}/dns_records?page={page}&per_page={per_page}", headers=headers).json() if not resp.get("success"): print(f"查询失败: {resp}") return records = resp.get("result", []) if not records: break all_records.extend(records) if page >= resp.get("result_info", {}).get("total_pages", 1): break page += 1 if not all_records: print("没有找到任何DNS记录") return print(f"Zone下所有DNS记录:") for rec in all_records: print(f" {rec['name']} -> {rec['content']} (Type: {rec['type']}, TTL: {rec['ttl']}, Proxied: {rec.get('proxied', '-')})")if __name__ == "__main__": if len(sys.argv) < 2: print("用法: dns <action> [domain] [ip_or_target_domain]") print("可用操作: create(c), update(u), delete(d), query(q), query_all(qa)") sys.exit(1) action = sys.argv[1].strip().lower() domain = sys.argv[2].strip() if len(sys.argv) > 2 else None third_arg = sys.argv[3].strip() if len(sys.argv) > 3 else None action_map = { "c": "create", "u": "update", "d": "delete", "q": "query", "qa": "query_all", "create": "create", "update": "update", "delete": "delete", "query": "query", "query_all": "query_all" } action_full = action_map.get(action) if not action_full: print(f"无效操作: {action}") print("可用操作: create(c), update(u), delete(d), query(q), query_all(qa)") sys.exit(1) # 新逻辑:q / qa 如果没有 domain,就查询所有 zone if action_full in ["query", "query_all"] and not domain: zones = list_zones() if not zones: print("没有找到任何Zone") sys.exit(0) for z in zones: zid, zname = z["id"], z["name"] print(f"=== Zone: {zname} ({zid}) ===") if action_full == "query": print(f"域名: {zname}, 状态: {z['status']}, 创建时间: {z['created_on']}") elif action_full == "query_all": query_all_dns(zid) sys.exit(0) # 其他操作必须有 domain if not domain: print(f"{action_full} 操作需要指定域名") sys.exit(1) # 获取 Zone ID zone_id = get_zone_id(domain) # 执行操作 if action_full in ["create", "update"]: if third_arg: if is_ipv4(third_arg): record_type = "A" target = third_arg elif is_ipv6(third_arg): record_type = "AAAA" target = third_arg else: record_type = "CNAME" target = third_arg else: record_type = "A" target = get_public_ip() print(f"使用{record_type}记录: {domain} -> {target}") create_or_update_dns(zone_id, domain, target, record_type) elif action_full == "delete": delete_dns(zone_id, domain) elif action_full == "query": query_dns(zone_id, domain) elif action_full == "query_all": query_all_dns(zone_id)
将文件编译成二进制文件也非常简单,首先创建虚拟环境,否则可能报错 error: externally-managed-environment
。
Debian/Ubuntu 系列(尤其是 Debian 12 / Ubuntu 23.04+) 新加限制(PEP 668)。系统自带的 python3 环境是 “externally-managed”,不允许随便用 pip 往里面装包,避免破坏系统自带的软件。
python3 -m venv myenvsource myenv/bin/activate
然后安装 pyinstaller
和 pipreqs
,pipreqs
是为了生成 requirements.txt
,pyinstaller
用来生成二进制文件。
pip install pyinstaller pipreqs
安装依赖:
pipreqs ./ --encoding=utf8 --ignore myenvpip install -r requirements.txt
编译成二进制文件:
pyinstaller --onefile cf.py
然后在 ./dist/
目录下就可以看见生成的二进制文件了。