acdream/tools/pdb-extract/pdb_extract.py
Erik 69d884a3d6 tools(pdb-extract): #8 PDB -> symbols.json + types.json sidecar
Pure-Python MSF 7.00 PDB extractor (no deps, stdlib only). Reads
refs/acclient.pdb directly:
  - DBI stream (3) -> symbol record stream index + section header
    stream index
  - Section headers stream (9) -> per-segment image VA bases
  - Symbol record stream (8) -> S_PUB32 records with image VAs
  - TPI stream (2) -> LF_CLASS / LF_STRUCTURE named records (not
    forward-declared), with size leaf + name

Includes a best-effort MSVC C++ demangler so symbols.json is
grep-friendly:
  ?EnchantAttribute@CEnchantmentRegistry@@QBEHKAAK@Z
  -> CEnchantmentRegistry::EnchantAttribute

Both demangled `name` + raw `mangled` emitted per entry so callers
can choose. Operator overloads, vtables, and other special forms
where a partial demangle would be misleading are kept mangled.

Outputs committed to docs/research/named-retail/:
  - symbols.json (2.9 MB) — 18,366 named public function symbols
  - types.json (506 KB) — 5,371 unique named class/struct records

Spot check (matches discovery agent's earlier finding):
  CEnchantmentRegistry::EnchantAttribute -> 0x00594570 ✓

Updated docs/research/acclient_function_map.md header preamble to
direct readers at the new symbols.json as the authoritative name
source; the hand-curated table stays as the cross-port (ACE/ACME)
index. Several addresses there are wrong vs the PDB and will be
swept in the issue #9 close (Phase E).

Closes #8 (filed in Phase D's commit). Foundation for the address
sweep + name-driven workflows from here on.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 17:31:52 +02:00

458 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Pure-Python MSF 7.00 PDB extractor for acclient.pdb.
Reads the Microsoft Program Database, extracts public function symbols
(S_PUB32, kind 0x110E) and named struct/class type records (LF_CLASS,
LF_STRUCTURE) from the TPI stream, and writes two JSON sidecars to
docs/research/named-retail/:
symbols.json — every named public function with its image VA
types.json — every named struct/class with size
This is a foundation for the named-retail workflow. After running, every
future session can grep the JSON for instant address↔name lookups.
Run with:
py tools/pdb-extract/pdb_extract.py refs/acclient.pdb
References:
https://llvm.org/docs/PDB/MsfFile.html — MSF container layout
https://llvm.org/docs/PDB/PublicStream.html — symbol record format
https://llvm.org/docs/PDB/TpiStream.html — type-info stream layout
https://llvm.org/docs/PDB/DbiStream.html — DBI header + sections
No external dependencies — uses stdlib `struct` + `json` only.
Wire constants below follow the PDB7 / MSF 7.00 on-disk format.
"""
import json
import os
import struct
import sys
from pathlib import Path
# ── MSF / PDB7 constants ───────────────────────────────────────────────────
MSF_MAGIC = b"Microsoft C/C++ MSF 7.00\r\n\x1aDS\0\0\0"
SUPERBLOCK_SIZE = 56  # u32 ×6 + magic; MSF 7.00 layout
# Stream indices (fixed by spec)
STREAM_PDB_INFO = 1
STREAM_TPI = 2  # type-info stream (source of types.json)
STREAM_DBI = 3  # debug info; locates the sym-record + section-header streams
STREAM_IPI = 4
# Symbol record kinds (subset)
S_PUB32 = 0x110E  # Public symbol (32-bit)
# Type record kinds (subset; only class/struct are extracted below)
LF_CLASS = 0x1504
LF_STRUCTURE = 0x1505
LF_UNION = 0x1506
LF_ENUM = 0x1507
# Image base used to turn section RVAs into image VAs (acclient.exe default)
SECTION_HEADERS_DEFAULT_BASE = 0x00400000  # acclient.exe image base
# CV public-symbol flag bits
PUBSYM_FLAG_CODE = 0x00000002  # symbol lives in a code section (function)
# ── MSF reader ─────────────────────────────────────────────────────────────
class Msf:
    """In-memory wrapper over a PDB file (MSF 7.00 container).

    Loads the superblock + stream directory on construction, then exposes
    per-stream byte buffers reconstructed from each stream's page chain.
    """

    def __init__(self, path):
        """Parse the superblock and stream directory of the PDB at `path`.

        Raises ValueError when the file is not an MSF 7.00 container, the
        superblock's block size is implausible, or the stream-directory
        block map is larger than this reader supports.
        """
        with open(path, "rb") as f:
            self._data = f.read()
        if not self._data.startswith(MSF_MAGIC):
            raise ValueError(f"not an MSF 7.00 file: {path}")
        # Superblock fields (after the 32-byte magic).
        (self.block_size,
         self.free_block_map,
         self.num_blocks,
         self.num_dir_bytes,
         _reserved,
         self.block_map_addr) = struct.unpack_from("<6I", self._data, 32)
        if self.block_size not in (512, 1024, 2048, 4096):
            raise ValueError(f"unexpected block size: {self.block_size}")
        # Stream directory: block_map_addr is the page index of a page
        # full of u32 page indices that themselves spell out the
        # directory's page list.
        dir_pages_needed = _ceil_div(self.num_dir_bytes, self.block_size)
        block_map_pages_needed = _ceil_div(dir_pages_needed * 4, self.block_size)
        if block_map_pages_needed > 1:
            # The u32 directory-page list would spill past a single page;
            # the single-page read below would silently truncate it, so
            # fail loudly instead of mis-parsing. (acclient.pdb: 119 KB
            # directory at 4 KB pages = 30 u32s = 120 bytes — one page is
            # plenty, so this only trips on far larger PDBs.)
            raise ValueError(
                "stream directory block map spans "
                f"{block_map_pages_needed} pages; only 1 supported")
        block_map = self._read_page(self.block_map_addr)
        dir_page_indices = struct.unpack_from(
            f"<{dir_pages_needed}I", block_map, 0)
        dir_data = bytearray()
        for pi in dir_page_indices:
            dir_data.extend(self._read_page(pi))
        dir_data = bytes(dir_data[:self.num_dir_bytes])
        # Directory layout:
        #   u32 NumStreams
        #   u32 StreamSizes[NumStreams]
        #   for i in 0..NumStreams:
        #       u32 StreamPageIndices[ceil(size_i / block_size)]
        num_streams = struct.unpack_from("<I", dir_data, 0)[0]
        sizes = struct.unpack_from(f"<{num_streams}I", dir_data, 4)
        # 0xFFFFFFFF is the "deleted stream" sentinel — treat as size 0.
        sizes = tuple(0 if s == 0xFFFFFFFF else s for s in sizes)
        offset = 4 + 4 * num_streams
        streams = []
        for size in sizes:
            pages_needed = _ceil_div(size, self.block_size)
            indices = struct.unpack_from(
                f"<{pages_needed}I", dir_data, offset)
            offset += 4 * pages_needed
            streams.append((size, indices))
        # streams[i] = (declared_size, tuple_of_page_indices)
        self.streams = streams

    def _read_page(self, page_index):
        """Return the raw bytes of page `page_index` (one block_size slice)."""
        start = page_index * self.block_size
        return self._data[start:start + self.block_size]

    def stream(self, idx):
        """Return the raw bytes of stream `idx` (concatenated pages,
        truncated to declared size)."""
        size, pages = self.streams[idx]
        buf = bytearray()
        for p in pages:
            buf.extend(self._read_page(p))
        return bytes(buf[:size])
def _ceil_div(a, b):
return (a + b - 1) // b
# ── DBI parser (just enough for: section headers + symbol stream index) ────
def _parse_dbi(dbi_bytes):
"""Pull the section-header-stream index + symbol-record-stream index
out of the DBI header. Returns (sym_stream_idx, section_hdr_stream_idx).
"""
# DBI header (first 64 bytes)
if len(dbi_bytes) < 64:
raise ValueError("DBI stream too short")
(version_sig, version_hdr, age,
gsi_stream, build_no,
psgsi_stream, pdb_dll_ver,
sym_record_stream, pdb_dll_rbld,
mod_info_size, section_contrib_size, section_map_size,
source_info_size, type_server_map_size, mfc_type_server_idx,
opt_dbg_hdr_size, ec_substream_size,
flags, machine, padding) = struct.unpack_from(
"<iIIHHHHHHiiiiiIiiHHI", dbi_bytes, 0)
# The optional debug header sub-stream is at the end of the DBI.
# Layout offset = 64 + sum of all preceding sub-stream sizes.
base = 64
offsets = {
"mod_info": base,
"section_contrib": base + mod_info_size,
"section_map": base + mod_info_size + section_contrib_size,
"source_info": base + mod_info_size + section_contrib_size + section_map_size,
"type_server_map": base + mod_info_size + section_contrib_size + section_map_size + source_info_size,
"ec_substream": base + mod_info_size + section_contrib_size + section_map_size + source_info_size + type_server_map_size,
"opt_dbg_header": base + mod_info_size + section_contrib_size + section_map_size + source_info_size + type_server_map_size + ec_substream_size,
}
# The optional debug header is an array of u16 stream indices; the
# one at index 5 is the section-headers stream (per LLVM docs).
opt_off = offsets["opt_dbg_header"]
opt_count = opt_dbg_hdr_size // 2
opt_streams = struct.unpack_from(
f"<{opt_count}H", dbi_bytes, opt_off)
# Index 5 = original section headers stream.
section_hdr_stream = opt_streams[5] if opt_count > 5 else 0xFFFF
return sym_record_stream, section_hdr_stream
# ── Public symbols extractor (S_PUB32 from sym-record stream) ──────────────
def _demangle(mangled):
"""Best-effort demangle of MSVC C++ symbol names to Class::Method form.
PDB public symbols use the MSVC ABI mangling. A full demangler would
parse arg types + calling conventions; we only need the readable
qualified name for grep workflows.
Examples:
"?EnchantAttribute@CACQualities@@IBEHKAAK@Z" -> "CACQualities::EnchantAttribute"
"??0CEnchantmentRegistry@@QAE@XZ" -> "CEnchantmentRegistry::CEnchantmentRegistry"
"??1Foo@@QAE@XZ" -> "Foo::~Foo"
"_someThing" -> "_someThing" (C-style, kept as-is)
"?GlobalFunc@@..." -> "GlobalFunc" (no class)
"""
if not mangled or not mangled.startswith("?"):
return mangled # C-linkage symbol, return as-is
# Strip the leading '?' (or '??' for ctors/dtors)
if mangled.startswith("??0"): # constructor
rest = mangled[3:]
ctor_dtor = "ctor"
elif mangled.startswith("??1"): # destructor
rest = mangled[3:]
ctor_dtor = "dtor"
elif mangled.startswith("??_"): # vtable / vbtable / etc — leave as-is
return mangled
else:
rest = mangled[1:]
ctor_dtor = None
# `Name@Class@Outer@@<sig>` -> split on '@@' to drop the signature suffix
sep = rest.find("@@")
if sep < 0:
return mangled # not a recognised pattern
qualified = rest[:sep]
parts = qualified.split("@")
parts = [p for p in parts if p]
if not parts:
return mangled
if ctor_dtor:
# parts[0] is the class name (the only part); ctor name = class name.
cls = parts[0]
outer = "::".join(reversed(parts[1:]))
full = f"{outer}::{cls}" if outer else cls
if ctor_dtor == "dtor":
return f"{full}::~{cls}"
return f"{full}::{cls}"
# Method: parts[0] is the function name, parts[1:] are nested classes
# (innermost first -> reverse for outer::inner::method order).
method = parts[0]
classes = list(reversed(parts[1:]))
if classes:
return "::".join(classes) + "::" + method
return method
def _extract_pub32(sym_bytes, section_bases):
    """Collect S_PUB32 records from the symbol-record stream.

    sym_bytes: raw sym-record-stream bytes.
    section_bases: per-section image VA bases; a record's `seg` field is a
        1-based index into this list.
    Returns a list of dicts (address, demangled name, mangled name, flags)
    for code symbols whose section index is in range.
    """
    results = []
    cursor, limit = 0, len(sym_bytes)
    while cursor + 4 <= limit:
        rec_len, kind = struct.unpack_from("<HH", sym_bytes, cursor)
        if not rec_len:
            break
        # rec_len counts everything after its own u16 (kind included).
        next_rec = cursor + 2 + rec_len
        if kind == S_PUB32:
            # Body: u32 flags, u32 offset, u16 seg, NUL-terminated name.
            flags, sec_off, seg = struct.unpack_from(
                "<IIH", sym_bytes, cursor + 4)
            name_at = cursor + 14
            nul = sym_bytes.index(b"\0", name_at, next_rec)
            raw_name = sym_bytes[name_at:nul].decode("ascii", errors="replace")
            seg_ok = 1 <= seg <= len(section_bases)
            if (flags & PUBSYM_FLAG_CODE) and seg_ok:
                results.append({
                    "address": f"0x{section_bases[seg - 1] + sec_off:08X}",
                    "name": _demangle(raw_name),
                    "mangled": raw_name,
                    "flags": flags,
                })
        # Advance, keeping records 4-byte aligned.
        cursor = next_rec + (-next_rec % 4)
    return results
# ── Section-header stream parser (40 bytes per IMAGE_SECTION_HEADER) ───────
def _parse_section_headers(sec_bytes, image_base=SECTION_HEADERS_DEFAULT_BASE):
    """Turn the section-header stream into a list of section base VAs.

    Each entry is a 40-byte IMAGE_SECTION_HEADER; the u32 at entry offset
    12 is the section RVA. Returns [image_base + rva, ...] so that
    bases[seg - 1] + offset yields an image VA. Stops at the first
    zero-RVA entry past the start (trailing padding / empty entries).
    """
    HEADER_LEN = 40
    bases = []
    for pos in range(0, len(sec_bytes) - HEADER_LEN + 1, HEADER_LEN):
        # Layout: char Name[8], u32 VirtualSize, u32 VirtualAddress, ...
        _virt_size, rva = struct.unpack_from("<II", sec_bytes, pos + 8)
        if rva == 0 and pos > 0:
            break  # padding / empty trailing entry — stop
        bases.append(image_base + rva)
    return bases
# ── TPI parser — minimal pass to extract LF_CLASS/STRUCTURE names + sizes ──
def _extract_named_types(tpi_bytes):
"""Walk the TPI stream's type record array and yield named class /
struct / union / enum records with their declared size. Skips
forward-declared (incomplete) records."""
if len(tpi_bytes) < 56:
return []
# TPI header (56 bytes)
(version, header_size,
ti_min, ti_max,
gpi_size,
gpi_substream_offset,
hash_aux_idx,
hash_key_size, num_hash_buckets,
hash_value_off, hash_value_len,
ti_off_off, ti_off_len,
hash_adj_off, hash_adj_len) = struct.unpack_from(
"<IIIIIIHHIIIIIII", tpi_bytes, 0)
pos = header_size # records start right after the header
end = pos + gpi_size
if end > len(tpi_bytes):
end = len(tpi_bytes)
out = []
while pos + 4 <= end:
rec_len, kind = struct.unpack_from("<HH", tpi_bytes, pos)
if rec_len == 0:
break
rec_end = pos + 2 + rec_len
if kind in (LF_CLASS, LF_STRUCTURE):
# body layout (LLVM type-records.h):
# u16 count, u16 props, u32 fieldList, u32 derived,
# u32 vshape, then varint16 size, then null-term string.
try:
count, props, field_list, derived, vshape = struct.unpack_from(
"<HHIII", tpi_bytes, pos + 4)
# varint16 size: <0x8000 = u16 inline; ≥0x8000 = follow-on u16/u32.
size_pos = pos + 4 + 16
size_word = tpi_bytes[size_pos] | (tpi_bytes[size_pos + 1] << 8)
if size_word < 0x8000:
size_val = size_word
name_start = size_pos + 2
else:
# Numeric leaves. 0x8002=u16, 0x8003=u32, etc. Skip.
if size_word == 0x8002:
size_val = struct.unpack_from("<H", tpi_bytes, size_pos + 2)[0]
name_start = size_pos + 4
elif size_word == 0x8003:
size_val = struct.unpack_from("<I", tpi_bytes, size_pos + 2)[0]
name_start = size_pos + 6
else:
# Unknown numeric leaf — skip.
pos = rec_end
if pos % 4: pos += 4 - (pos % 4)
continue
# Name is null-terminated ASCII.
if name_start < rec_end:
zero = tpi_bytes.find(b"\0", name_start, rec_end)
if zero > name_start:
name = tpi_bytes[name_start:zero].decode("ascii", errors="replace")
# Skip forward-decls: bit 7 of `props` (0x80).
is_fwdref = (props & 0x80) != 0
if not is_fwdref and name and not name.startswith("<unnamed"):
out.append({
"name": name,
"size": size_val,
"kind": "class" if kind == LF_CLASS else "struct",
})
except (struct.error, IndexError, ValueError):
pass
pos = rec_end
if pos % 4:
pos += 4 - (pos % 4)
return out
# ── Driver ─────────────────────────────────────────────────────────────────
def _resolve_repo_root():
"""tools/pdb-extract/ -> repo root is two directories up."""
return Path(__file__).resolve().parent.parent.parent
def _main():
    """CLI driver: extract symbols + types from the PDB named in argv[1]
    and write symbols.json / types.json under docs/research/named-retail/.

    Exits with status 2 on a usage error or a missing input file.
    """
    if len(sys.argv) != 2:
        print("usage: py pdb_extract.py <path-to-acclient.pdb>", file=sys.stderr)
        sys.exit(2)
    pdb_path = Path(sys.argv[1])
    if not pdb_path.exists():
        print(f"not found: {pdb_path}", file=sys.stderr)
        sys.exit(2)
    print(f"loading {pdb_path} ({pdb_path.stat().st_size / 1024 / 1024:.1f} MB)...")
    msf = Msf(str(pdb_path))
    print(f" block size: {msf.block_size}, streams: {len(msf.streams)}")
    # 1. DBI -> find sym-record + section-header stream indices
    dbi_bytes = msf.stream(STREAM_DBI)
    sym_stream_idx, sec_hdr_stream_idx = _parse_dbi(dbi_bytes)
    print(f" sym stream: {sym_stream_idx}, section-hdr stream: {sec_hdr_stream_idx}")
    # 2. Section headers -> segment bases (seg field -> image VA base)
    sec_bytes = msf.stream(sec_hdr_stream_idx)
    section_bases = _parse_section_headers(sec_bytes)
    print(f" sections: {len(section_bases)} (text base = 0x{section_bases[0]:08X})")
    # 3. Symbol records -> S_PUB32 entries with image VAs
    sym_bytes = msf.stream(sym_stream_idx)
    print(f" sym record stream: {len(sym_bytes) / 1024:.1f} KB")
    symbols = _extract_pub32(sym_bytes, section_bases)
    print(f" extracted {len(symbols)} public function symbols")
    # 4. TPI -> named types
    tpi_bytes = msf.stream(STREAM_TPI)
    print(f" TPI stream: {len(tpi_bytes) / 1024:.1f} KB")
    types = _extract_named_types(tpi_bytes)
    # Dedup by name (templates/forward-decl spam can produce duplicates);
    # first occurrence wins.
    seen = set()
    unique_types = []
    for t in types:
        if t["name"] not in seen:
            seen.add(t["name"])
            unique_types.append(t)
    print(f" extracted {len(unique_types)} unique named types ({len(types)} total records)")
    # 5. Write outputs
    repo = _resolve_repo_root()
    out_dir = repo / "docs" / "research" / "named-retail"
    out_dir.mkdir(parents=True, exist_ok=True)
    sym_out = out_dir / "symbols.json"
    type_out = out_dir / "types.json"
    with open(sym_out, "w", encoding="utf-8") as f:
        # Keep address + demangled name + raw mangled name (for callers
        # that need the C++ ABI form). Strip flags as not useful for grep.
        compact = [
            {"address": s["address"], "name": s["name"], "mangled": s["mangled"]}
            for s in symbols
        ]
        json.dump(compact, f, indent=2)
    with open(type_out, "w", encoding="utf-8") as f:
        json.dump(unique_types, f, indent=2)
    print(f"\nwrote {sym_out} ({sym_out.stat().st_size / 1024:.1f} KB)")
    print(f"wrote {type_out} ({type_out.stat().st_size / 1024:.1f} KB)")
    # Spot check: CEnchantmentRegistry::EnchantAttribute should be at
    # 0x594570 per the discovery agent's earlier finding.
    target = "CEnchantmentRegistry::EnchantAttribute"
    for s in symbols:
        if s["name"] == target:
            print(f"\nspot check: {target} -> {s['address']} (expected 0x00594570)")
            break
    else:
        # for/else: the loop ran to completion without a break.
        print(f"\nspot check: {target} NOT FOUND in symbols (PDB lookup mismatch?)")
if __name__ == "__main__":
    # Guarded entry point so the module can be imported (e.g. to reuse the
    # MSF reader or the demangler) without running the extraction.
    _main()