自定义内存占用,给吃灰小鸡擦擦灰吧
#!/usr/bin/env python3# -*- coding: utf-8 -*-"""可控制的常驻内存占用工具 - 纯标准库版本适用于Debian/Ubuntu等Linux发行版,无需安装额外依赖"""import sysimport timeimport osimport platformimport gcimport subprocessfrom typing import Optional# --- 配置常量 ---MIN_MEMORY_MB = 1 # 最小允许分配的内存(MB)DEFAULT_MAX_MEMORY_MB = 8192 # 默认最大内存(当无法检测系统内存时)class MemoryManager: """内存管理类,仅使用标准库""" def __init__(self): self.memory_chunk: Optional[bytearray] = None self.is_allocated: bool = False self.target_mb: int = 0 self.actual_allocated_mb: int = 0 self._max_memory_mb = self._calculate_max_memory() def _calculate_max_memory(self) -> int: """根据系统内存动态计算最大允许分配的内存""" try: available_mb = self._get_available_memory_mb() if available_mb is not None: # 设置为系统可用内存的90%,但不超过默认最大值 calculated_max = min(int(available_mb * 0.9), DEFAULT_MAX_MEMORY_MB) return max(calculated_max, MIN_MEMORY_MB) # 确保至少为最小值 else: return DEFAULT_MAX_MEMORY_MB except Exception: return DEFAULT_MAX_MEMORY_MB def get_max_memory_mb(self) -> int: """获取最大允许分配的内存大小""" return self._max_memory_mb def set_target(self, target_mb: int) -> bool: """设置目标内存大小""" if self.is_allocated: return False if not (MIN_MEMORY_MB <= target_mb <= self._max_memory_mb): raise ValueError(f"内存大小必须在 {MIN_MEMORY_MB}-{self._max_memory_mb} MB 之间") self.target_mb = target_mb return True def allocate(self) -> tuple[bool, str]: """分配内存""" if self.target_mb <= 0: return False, "请先设定内存大小" if self.is_allocated: return False, "内存已在占用中" # 检查系统可用内存(更宽松的限制) try: available_mb = self._get_available_memory_mb() if available_mb is not None and self.target_mb > available_mb * 0.95: # 放宽到95% return False, f"目标内存过大,系统可用内存约为 {available_mb:.0f} MB,建议不超过 {int(available_mb * 0.95)} MB" except Exception: pass # 如果获取失败,继续尝试分配 bytes_to_allocate = self.target_mb * 1024 * 1024 try: print(f"正在分配 {self.target_mb} MB 内存...") self.memory_chunk = bytearray(bytes_to_allocate) # 写入数据确保内存真正被使用 self._fill_memory_pattern() self.is_allocated = True self.actual_allocated_mb = self.target_mb return True, f"成功分配 {self.target_mb} MB 内存" except MemoryError: return False, "内存分配失败:系统内存不足" except Exception as e: return False, f"内存分配失败:{str(e)}" def release(self) -> tuple[bool, str]: """释放内存""" if not self.is_allocated: return False, "当前没有占用内存" try: released_mb = self.actual_allocated_mb self.memory_chunk = None # 强制垃圾回收 gc.collect() self.is_allocated = False self.actual_allocated_mb = 0 return True, f"成功释放 {released_mb} MB 内存" except Exception as e: return False, f"内存释放失败:{str(e)}" def _fill_memory_pattern(self): """填充内存模式,确保内存真正被使用""" if self.memory_chunk is None: return # 分块填充,避免一次性操作大内存块 pattern = b'MEMTEST!' pattern_len = len(pattern) chunk_size = min(1024 * 1024, len(self.memory_chunk)) # 1MB块 print("正在填充内存数据...") for i in range(0, len(self.memory_chunk), chunk_size): end_pos = min(i + pattern_len, len(self.memory_chunk)) self.memory_chunk[i:end_pos] = pattern[:end_pos-i] # 每填充100MB显示一次进度 if (i // (1024 * 1024)) % 100 == 0 and i > 0: progress_mb = i // (1024 * 1024) print(f"已填充 {progress_mb} MB...") def _get_available_memory_mb(self) -> Optional[float]: """获取系统可用内存,仅使用标准库方法""" try: system = platform.system().lower() if system == "linux": return self._get_linux_memory() elif system == "darwin": # macOS return self._get_macos_memory() elif system == "windows": return self._get_windows_memory() else: return None except Exception: return None def _get_linux_memory(self) -> Optional[float]: """获取Linux系统内存信息""" try: # 读取 /proc/meminfo if os.path.exists("/proc/meminfo"): with open("/proc/meminfo", 'r', encoding='utf-8') as f: meminfo = f.read() # 解析内存信息 mem_available = None mem_free = None buffers = 0 cached = 0 for line in meminfo.split('\n'): if line.startswith("MemAvailable:"): mem_available = int(line.split()[1]) / 1024 # KB to MB break elif line.startswith("MemFree:"): mem_free = int(line.split()[1]) / 1024 elif line.startswith("Buffers:"): buffers = int(line.split()[1]) / 1024 elif line.startswith("Cached:"): cached = int(line.split()[1]) / 1024 # 优先使用 MemAvailable,否则估算 if mem_available is not None: return mem_available elif mem_free is not None: return mem_free + buffers + cached return None except Exception: return None def _get_macos_memory(self) -> Optional[float]: """获取macOS系统内存信息""" try: # 使用 vm_stat 命令 result = subprocess.run(['vm_stat'], capture_output=True, text=True, timeout=5) if result.returncode == 0: lines = result.stdout.split('\n') page_size = 4096 # macOS默认页面大小 free_pages = 0 for line in lines: if 'Pages free:' in line: free_pages = int(line.split(':')[1].strip().rstrip('.')) return (free_pages * page_size) / (1024 * 1024) return None except Exception: return None def _get_windows_memory(self) -> Optional[float]: """获取Windows系统内存信息""" try: # 使用 wmic 命令 result = subprocess.run( ['wmic', 'OS', 'get', 'FreePhysicalMemory', '/value'], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: for line in result.stdout.split('\n'): if 'FreePhysicalMemory=' in line: kb = int(line.split('=')[1].strip()) return kb / 1024 # KB to MB return None except Exception: return Noneclass MemoryToolUI: """用户界面类 - 纯标准库版本""" def __init__(self): self.memory_manager = MemoryManager() self.colors = self._detect_colors() def _detect_colors(self) -> dict: """检测并返回颜色代码字典""" # 检查是否支持颜色输出 if (hasattr(sys.stdout, "isatty") and sys.stdout.isatty() and os.environ.get("TERM") not in [None, "dumb"] and platform.system() != "Windows"): return { 'red': '\033[91m', 'green': '\033[92m', 'yellow': '\033[93m', 'blue': '\033[94m', 'purple': '\033[95m', 'cyan': '\033[96m', 'bold': '\033[1m', 'reset': '\033[0m' } else: # 不支持颜色时返回空字符串 return {key: '' for key in ['red', 'green', 'yellow', 'blue', 'purple', 'cyan', 'bold', 'reset']} def clear_screen(self): """清屏(跨平台)""" try: if platform.system() == "Windows": os.system('cls') else: os.system('clear') except Exception: # 如果清屏失败,打印空行 print('\n' * 50) def display_menu(self): """显示主菜单""" print("\n" + "=" * 55) print(f"{self.colors['bold']}🔧 可控制的常驻内存占用工具 v2.0{self.colors['reset']}") print(f"{self.colors['cyan']} 适用于 Debian/Ubuntu 等 Linux 发行版{self.colors['reset']}") print("=" * 55) # 状态显示 self._display_status() print("-" * 55) print(" 1. 📏 设定要占用的内存大小 (MB)") print(f" 2. {self.colors['green']}🚀 开启{self.colors['reset']} 内存占用") print(f" 3. {self.colors['red']}🛑 关闭{self.colors['reset']} 内存占用") print(" 4. 📊 显示系统内存信息") print(" 5. 🔄 清屏") print(" 6. 🚪 退出程序") print("=" * 55) def _display_status(self): """显示当前状态""" target_text = f"{self.memory_manager.target_mb} MB" if self.memory_manager.target_mb > 0 else "未设定" if self.memory_manager.is_allocated: status_text = f"{self.colors['green']}✅ 已占用 ({self.memory_manager.actual_allocated_mb} MB){self.colors['reset']}" else: status_text = f"{self.colors['red']}⭕ 未占用{self.colors['reset']}" print(f" {self.colors['blue']}📋 目标大小: {self.colors['yellow']}{target_text}{self.colors['reset']}") print(f" {self.colors['blue']}📊 当前状态: {status_text}") def set_target_size(self): """设置目标内存大小""" if self.memory_manager.is_allocated: print(f"\n{self.colors['red']}❌ 错误:请先关闭当前的内存占用,才能设定新的大小。{self.colors['reset']}") self._wait() return print(f"\n{self.colors['yellow']}💡 提示:{self.colors['reset']}") max_memory = self.memory_manager.get_max_memory_mb() print(f" • 内存范围: {MIN_MEMORY_MB} - {max_memory} MB (根据您的系统内存自动调整)") print(f" • 输入 0 可返回主菜单") # 显示当前系统可用内存 available_mb = self.memory_manager._get_available_memory_mb() if available_mb: print(f" • 当前系统可用内存: {available_mb:.0f} MB") recommended_max = int(available_mb * 0.95) print(f" • 推荐最大分配: {recommended_max} MB") while True: try: size_input = input(f"\n{self.colors['cyan']}请输入内存大小 (MB): {self.colors['reset']}").strip() if size_input == '0': print(f"{self.colors['yellow']}返回主菜单...{self.colors['reset']}") return new_size = int(size_input) try: if self.memory_manager.set_target(new_size): print(f"\n{self.colors['green']}✅ 目标内存大小已设定为 {new_size} MB{self.colors['reset']}") break except ValueError as e: print(f"{self.colors['red']}❌ {str(e)}{self.colors['reset']}") continue except ValueError: print(f"{self.colors['red']}❌ 请输入有效的数字{self.colors['reset']}") except KeyboardInterrupt: print(f"\n{self.colors['yellow']}⚠️ 操作已取消{self.colors['reset']}") return self._wait(1.5) def allocate_memory(self): """分配内存""" print(f"\n{self.colors['blue']}🔄 开始分配内存...{self.colors['reset']}") # 显示警告信息 if self.memory_manager.target_mb > 1024: print(f"{self.colors['yellow']}⚠️ 注意:将要分配 {self.memory_manager.target_mb} MB 内存,请确保系统有足够空间{self.colors['reset']}") confirm = input("是否继续?(y/N): ").strip().lower() if confirm not in ['y', 'yes', '是']: print(f"{self.colors['yellow']}操作已取消{self.colors['reset']}") self._wait() return success, message = self.memory_manager.allocate() if success: print(f"\n{self.colors['green']}✅ {message}{self.colors['reset']}") print(f"{self.colors['cyan']}💡 提示:{self.colors['reset']}") print(" • 现在可以通过以下命令观察内存变化:") print(" - htop 或 top (实时监控)") print(" - free -h (查看内存使用)") print(f" - ps -o pid,vsz,rss,comm -p {os.getpid()} (查看本进程)") else: print(f"{self.colors['red']}❌ {message}{self.colors['reset']}") self._wait(3) def release_memory(self): """释放内存""" print(f"\n{self.colors['blue']}🔄 正在释放内存...{self.colors['reset']}") success, message = self.memory_manager.release() if success: print(f"{self.colors['green']}✅ {message}{self.colors['reset']}") else: print(f"{self.colors['yellow']}ℹ️ {message}{self.colors['reset']}") self._wait(2) def show_system_info(self): """显示系统内存信息""" print(f"\n{self.colors['purple']}=" * 50) print(f"{self.colors['bold']}📊 系统信息{self.colors['reset']}") print(f"{self.colors['purple']}=" * 50) # 系统基础信息 print(f"{self.colors['blue']}🖥️ 操作系统:{self.colors['reset']} {platform.system()} {platform.release()}") print(f"{self.colors['blue']}🐍 Python 版本:{self.colors['reset']} {sys.version.split()[0]}") print(f"{self.colors['blue']}💾 系统架构:{self.colors['reset']} {platform.machine()}") # 内存信息 print(f"\n{self.colors['yellow']}💾 内存信息:{self.colors['reset']}") try: available_mb = self.memory_manager._get_available_memory_mb() if available_mb is not None: print(f" 可用内存: {available_mb:.1f} MB ({available_mb/1024:.2f} GB)") else: print(" 无法获取系统内存信息") except Exception: print(" 获取内存信息时出错") # 当前进程信息 print(f"\n{self.colors['yellow']}📝 当前进程信息:{self.colors['reset']}") print(f" 进程ID: {os.getpid()}") try: # 尝试获取进程内存使用情况(Linux) if platform.system().lower() == "linux": status_file = f"/proc/{os.getpid()}/status" if os.path.exists(status_file): with open(status_file, 'r') as f: for line in f: if line.startswith("VmRSS:"): rss_kb = int(line.split()[1]) print(f" 物理内存使用: {rss_kb/1024:.1f} MB") elif line.startswith("VmSize:"): vsize_kb = int(line.split()[1]) print(f" 虚拟内存使用: {vsize_kb/1024:.1f} MB") except Exception: print(" 无法获取进程详细信息") # 垃圾回收信息 print(f"\n{self.colors['yellow']}🗑️ 垃圾回收信息:{self.colors['reset']}") gc_counts = gc.get_count() print(f" 各代对象数量: {gc_counts}") print(f"\n{self.colors['cyan']}💡 建议的监控命令:{self.colors['reset']}") print(" • free -h # 查看系统内存使用") print(" • htop # 实时系统监控") print(f" • cat /proc/{os.getpid()}/status | grep Vm # 查看本进程内存") print(f"{self.colors['purple']}=" * 50) self._wait(4) def _wait(self, seconds: float = 2.0): """等待指定时间,显示倒计时""" if seconds > 2: for i in range(int(seconds), 0, -1): print(f"\r{self.colors['yellow']}⏱️ {i} 秒后继续...{self.colors['reset']}", end='', flush=True) time.sleep(1) print(f"\r{' ' * 20}\r", end='') # 清除倒计时 else: time.sleep(seconds) def get_user_choice(self) -> str: """获取用户选择""" try: return input(f"{self.colors['cyan']}请选择操作 [1-6]: {self.colors['reset']}").strip() except KeyboardInterrupt: print(f"\n{self.colors['yellow']}⚠️ 程序被用户中断{self.colors['reset']}") return '6' # 退出程序 except EOFError: return '6' def run(self): """主程序循环""" print(f"{self.colors['green']}🎉 欢迎使用内存占用测试工具!{self.colors['reset']}") print(f"{self.colors['cyan']} 纯Python标准库实现,无需额外依赖{self.colors['reset']}") try: while True: self.display_menu() choice = self.get_user_choice() if choice == '1': self.set_target_size() elif choice == '2': self.allocate_memory() elif choice == '3': self.release_memory() elif choice == '4': self.show_system_info() elif choice == '5': self.clear_screen() elif choice == '6': self._safe_exit() break else: print(f"\n{self.colors['red']}❌ 请输入 1-6 之间的数字{self.colors['reset']}") self._wait(1.5) except KeyboardInterrupt: print(f"\n{self.colors['yellow']}⚠️ 程序被中断{self.colors['reset']}") self._safe_exit() except Exception as e: print(f"\n{self.colors['red']}💥 程序出现错误: {e}{self.colors['reset']}") self._safe_exit() def _safe_exit(self): """安全退出程序""" if self.memory_manager.is_allocated: print(f"\n{self.colors['yellow']}🔄 退出前自动释放内存...{self.colors['reset']}") success, message = self.memory_manager.release() if success: print(f"{self.colors['green']}✅ {message}{self.colors['reset']}") print(f"\n{self.colors['green']}👋 感谢使用,程序已安全退出!{self.colors['reset']}") print(f"{self.colors['cyan']}如有问题,请检查系统日志或使用 dmesg 命令{self.colors['reset']}")def main(): """程序入口""" try: app = MemoryToolUI() app.run() except Exception as e: print(f"💥 程序启动失败: {e}") print("请确保使用 Python 3.6 或更高版本") sys.exit(1)if __name__ == "__main__": main()*bash# 在Debian 12上直接运行
python3 xxx.py
或者给予执行权限后直接运行
chmod +x xxx.py
./xxx.py*
评论 (0)