"""snapshot_compare.py [out_file] Take a structured snapshot of key clients for time-series comparison. Output: TSV row with timestamp and per-client metrics. Tracks: - Memory (private bytes) - UIElement_UIItem total / cleared / active counts - Palette instance count - CObjCell_subvt instance count - CPhysicsObj instance count """ import ctypes, ctypes.wintypes as wt, struct, sys, time, subprocess def discover_clients(): """Find all running acclient.exe processes by window title. Returns list of (char_name, pid, label) tuples sorted by char_name. Char name extracted from title: 'sessionkey-Coldeve-CharName' -> CharName. """ try: out = subprocess.check_output( ["powershell.exe", "-NoProfile", "-Command", "Get-Process acclient -EA SilentlyContinue | " "ForEach-Object { \"$($_.Id)|$($_.MainWindowTitle)\" }"], text=True, stderr=subprocess.DEVNULL).strip() except Exception: return [] out_list = [] for line in out.splitlines(): line = line.strip() if "|" not in line: continue pid_str, title = line.split("|", 1) try: pid = int(pid_str) except ValueError: continue # extract char name from "session-Coldeve-CharName" parts = title.split("-Coldeve-", 1) char = parts[1].strip() if len(parts) == 2 else title # short tag tag = char.split()[0].lower() if char else f"pid{pid}" label = "auto-discovered" if "Jerry" in title: label = "UNPATCHED control" out_list.append((tag, pid, label)) return sorted(out_list, key=lambda x: x[0]) CLIENTS = discover_clients() VTABLES = { "uiitem": 0x007c0498, "palette": 0x007caa08, "cphysicsobj": 0x007c78ec, "renderSurf": 0x0079a67c, "renderSurfD3D": 0x00801a94, "renderTexD3D": 0x00801a18, "csurface": 0x007ca4dc, "imgtex": 0x007cab04, "cgfxobj": 0x007ca418, "d3dxmesh": 0x007ed3b0, } PROCESS_VM_READ = 0x10 PROCESS_QUERY_INFORMATION = 0x400 MEM_COMMIT = 0x1000 MEM_PRIVATE = 0x20000 class MBI(ctypes.Structure): _fields_ = [('BaseAddress', ctypes.c_void_p), ('AllocationBase', ctypes.c_void_p), ('AllocationProtect', wt.DWORD), ('PartitionId', wt.WORD), ('RegionSize', ctypes.c_size_t), ('State', wt.DWORD), ('Protect', wt.DWORD), ('Type', wt.DWORD)] k = ctypes.windll.kernel32 k.OpenProcess.argtypes = [wt.DWORD, wt.BOOL, wt.DWORD]; k.OpenProcess.restype = wt.HANDLE k.CloseHandle.argtypes = [wt.HANDLE]; k.CloseHandle.restype = wt.BOOL k.ReadProcessMemory.argtypes = [wt.HANDLE, wt.LPCVOID, wt.LPVOID, ctypes.c_size_t, ctypes.POINTER(ctypes.c_size_t)] k.ReadProcessMemory.restype = wt.BOOL k.VirtualQueryEx.argtypes = [wt.HANDLE, ctypes.c_void_p, ctypes.POINTER(MBI), ctypes.c_size_t] k.VirtualQueryEx.restype = ctypes.c_size_t _mb_cache = None def _populate_mb_cache(): """One PowerShell call returns all acclient WS in one shot to avoid spawning a PS process per PID (which was failing 80% of the time due to concurrent-spawn resource pressure).""" global _mb_cache _mb_cache = {} try: out = subprocess.check_output( ["powershell.exe", "-NoProfile", "-Command", "Get-Process acclient -EA SilentlyContinue | " "ForEach-Object { \"$($_.Id) $([int]($_.PrivateMemorySize64/1MB))\" }"], text=True, stderr=subprocess.DEVNULL, timeout=30).strip() except Exception: return for line in out.splitlines(): parts = line.strip().split() if len(parts) == 2: try: _mb_cache[int(parts[0])] = int(parts[1]) except ValueError: pass def get_mb(pid): if _mb_cache is None: _populate_mb_cache() return _mb_cache.get(pid, 0) def scan_process(pid): """Return dict of counts per VTABLE name, plus uiitem_cleared (count where +0x5fc==0).""" h = k.OpenProcess(PROCESS_VM_READ | PROCESS_QUERY_INFORMATION, False, pid) if not h: return None counts = {name: 0 for name in VTABLES} uiitem_addrs = [] vt_to_name = {vt: name for name, vt in VTABLES.items()} mbi = MBI() addr = 0 while k.VirtualQueryEx(h, addr, ctypes.byref(mbi), ctypes.sizeof(mbi)): pr = mbi.Protect & 0xff if (mbi.State == MEM_COMMIT and mbi.Type == MEM_PRIVATE and pr in (0x04, 0x40)): buf = (ctypes.c_ubyte * mbi.RegionSize)() sz = ctypes.c_size_t(0) if k.ReadProcessMemory(h, mbi.BaseAddress, buf, mbi.RegionSize, ctypes.byref(sz)): data = bytes(buf[:sz.value]) end = (len(data) // 4) * 4 for off in range(0, end, 4): v = struct.unpack_from("= 0x80000000: break # For each uiitem, read +0x5fc to determine cleared vs active uiitem_cleared = 0 for inst in uiitem_addrs: buf4 = (ctypes.c_ubyte * 4)() sz4 = ctypes.c_size_t(0) if k.ReadProcessMemory(h, inst + 0x5fc, buf4, 4, ctypes.byref(sz4)): if struct.unpack(" 1 else None ts = time.strftime("%Y-%m-%d %H:%M:%S") METRICS = ["uiitem", "uiitem_cleared", "uiitem_active", "palette", "cphysicsobj", "renderSurf", "renderSurfD3D", "renderTexD3D", "csurface", "imgtex", "cgfxobj", "d3dxmesh"] header_cols = ["timestamp", "client", "label", "pid", "mb"] + METRICS rows = [] for name, pid, label in CLIENTS: mb = get_mb(pid) if mb == 0: row = [ts, name, label, pid, 0, "DEAD"] + [""] * (len(METRICS) - 1) else: c = scan_process(pid) if c is None: row = [ts, name, label, pid, mb, "NOACCESS"] + [""] * (len(METRICS) - 1) else: row = [ts, name, label, pid, mb] + [c.get(m, 0) for m in METRICS] rows.append(row) # Print table print(f"\n=== Snapshot @ {ts} ===") short = {"uiitem": "UIIt", "uiitem_cleared": "clrd", "uiitem_active": "actv", "palette": "Pal", "cphysicsobj": "CPhy", "renderSurf": "RSurf", "renderSurfD3D": "RSD3D", "renderTexD3D": "RTD3D", "csurface": "CSurf", "imgtex": "ImgT", "cgfxobj": "CGfx", "d3dxmesh": "Mesh"} hdr = f"{'client':<10} {'label':<22} {'pid':>5} {'MB':>5}" for m in METRICS: hdr += f" {short[m]:>6}" print(hdr) for r in rows: if str(r[5]) in ("DEAD", "NOACCESS"): print(f"{r[1]:<10} {r[2]:<22} {r[3]:>5} {r[4]:>5} {r[5]}") else: line = f"{r[1]:<10} {r[2]:<22} {r[3]:>5} {r[4]:>5}" for i in range(len(METRICS)): line += f" {r[5+i]:>6}" print(line) # Append to TSV if out_file: write_header = False try: with open(out_file, "r"): pass except FileNotFoundError: write_header = True with open(out_file, "a") as f: if write_header: f.write("\t".join(header_cols) + "\n") for r in rows: f.write("\t".join(str(x) for x in r) + "\n") print(f"\nappended to {out_file}")