Pure-Python MSF 7.00 PDB extractor (no deps, stdlib only). Reads
refs/acclient.pdb directly:
- DBI stream (3) -> symbol record stream index + section header
stream index
- Section headers stream (9) -> per-segment image VA bases
- Symbol record stream (8) -> S_PUB32 records with image VAs
- TPI stream (2) -> LF_CLASS / LF_STRUCTURE named records (not
forward-declared), with size leaf + name
Includes a best-effort MSVC C++ demangler so symbols.json is
grep-friendly:
?EnchantAttribute@CEnchantmentRegistry@@QBEHKAAK@Z
-> CEnchantmentRegistry::EnchantAttribute
Both demangled `name` + raw `mangled` emitted per entry so callers
can choose. Operator overloads, vtables, and other special forms
where a partial demangle would be misleading are kept mangled.
Outputs committed to docs/research/named-retail/:
- symbols.json (2.9 MB) — 18,366 named public function symbols
- types.json (506 KB) — 5,371 unique named class/struct records
Spot check (matches discovery agent's earlier finding):
CEnchantmentRegistry::EnchantAttribute -> 0x00594570 ✓
Updated docs/research/acclient_function_map.md header preamble to
direct readers at the new symbols.json as the authoritative name
source; the hand-curated table stays as the cross-port (ACE/ACME)
index. Several addresses there are wrong vs the PDB and will be
swept in the issue #9 close (Phase E).
Closes #8 (filed in Phase D's commit). Foundation for the address
sweep + name-driven workflows from here on.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
458 lines
18 KiB
Python
458 lines
18 KiB
Python
"""Pure-Python MSF 7.00 PDB extractor for acclient.pdb.
|
||
|
||
Reads the Microsoft Program Database, extracts public function symbols
|
||
(S_PUB32, kind 0x110E) and named struct/class type records (LF_CLASS,
|
||
LF_STRUCTURE) from the TPI stream, and writes two JSON sidecars to
|
||
docs/research/named-retail/:
|
||
|
||
symbols.json — every named public function with its image VA
|
||
types.json — every named struct/class with size
|
||
|
||
This is a foundation for the named-retail workflow. After running, every
|
||
future session can grep the JSON for instant address↔name lookups.
|
||
|
||
Run with:
|
||
py tools/pdb-extract/pdb_extract.py refs/acclient.pdb
|
||
|
||
References:
|
||
https://llvm.org/docs/PDB/MsfFile.html — MSF container layout
|
||
https://llvm.org/docs/PDB/PublicStream.html — symbol record format
|
||
https://llvm.org/docs/PDB/TpiStream.html — type-info stream layout
|
||
https://llvm.org/docs/PDB/DbiStream.html — DBI header + sections
|
||
|
||
No external dependencies — uses stdlib `struct` + `json` only.
|
||
|
||
Wire constants (PDB7 / MSF 7.00):
|
||
"""
|
||
import json
|
||
import os
|
||
import struct
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
# ── MSF / PDB7 constants ───────────────────────────────────────────────────
|
||
|
||
MSF_MAGIC = b"Microsoft C/C++ MSF 7.00\r\n\x1aDS\0\0\0"
|
||
SUPERBLOCK_SIZE = 56 # u32 ×6 + magic; MSF 7.00 layout
|
||
|
||
# Stream indices (fixed by spec)
|
||
STREAM_PDB_INFO = 1
|
||
STREAM_TPI = 2
|
||
STREAM_DBI = 3
|
||
STREAM_IPI = 4
|
||
|
||
# Symbol record kinds (subset)
|
||
S_PUB32 = 0x110E # Public symbol (32-bit)
|
||
|
||
# Type record kinds (subset)
|
||
LF_CLASS = 0x1504
|
||
LF_STRUCTURE = 0x1505
|
||
LF_UNION = 0x1506
|
||
LF_ENUM = 0x1507
|
||
|
||
# DBI machine type (image-base + section table source)
|
||
SECTION_HEADERS_DEFAULT_BASE = 0x00400000 # acclient.exe image base
|
||
|
||
# CV public-symbol flag bits
|
||
PUBSYM_FLAG_CODE = 0x00000002
|
||
|
||
|
||
# ── MSF reader ─────────────────────────────────────────────────────────────
|
||
|
||
class Msf:
|
||
"""In-memory wrapper over a PDB file. Loads the superblock + stream
|
||
directory, exposes per-stream byte buffers reconstructed from page
|
||
chains."""
|
||
|
||
def __init__(self, path):
|
||
with open(path, "rb") as f:
|
||
self._data = f.read()
|
||
|
||
if not self._data.startswith(MSF_MAGIC):
|
||
raise ValueError(f"not an MSF 7.00 file: {path}")
|
||
|
||
# Superblock fields (after the 32-byte magic).
|
||
(self.block_size,
|
||
self.free_block_map,
|
||
self.num_blocks,
|
||
self.num_dir_bytes,
|
||
_reserved,
|
||
self.block_map_addr) = struct.unpack_from("<6I", self._data, 32)
|
||
|
||
if self.block_size not in (512, 1024, 2048, 4096):
|
||
raise ValueError(f"unexpected block size: {self.block_size}")
|
||
|
||
# Stream directory: read the directory-block-map first (a list
|
||
# of page indices that themselves spell out the directory's
|
||
# page list). Then read the directory pages.
|
||
dir_pages_needed = _ceil_div(self.num_dir_bytes, self.block_size)
|
||
block_map_pages_needed = _ceil_div(dir_pages_needed * 4, self.block_size)
|
||
# The block_map_addr is the page index of the FIRST page in the
|
||
# block-map. The block-map's pages are stored sequentially.
|
||
# Wait, actually it's a single page index pointing to one page
|
||
# full of u32 page indices. If there are >block_size/4 pages in
|
||
# the directory, this overflows; for typical small PDBs it
|
||
# doesn't. acclient.pdb's 119 KB directory at 4 KB pages = 30
|
||
# pages -> 30 u32s = 120 bytes, fits in one page. OK.
|
||
block_map = self._read_page(self.block_map_addr)
|
||
dir_page_indices = struct.unpack_from(
|
||
f"<{dir_pages_needed}I", block_map, 0)
|
||
|
||
dir_data = bytearray()
|
||
for pi in dir_page_indices:
|
||
dir_data.extend(self._read_page(pi))
|
||
dir_data = bytes(dir_data[:self.num_dir_bytes])
|
||
|
||
# Directory layout:
|
||
# u32 NumStreams
|
||
# u32 StreamSizes[NumStreams]
|
||
# for i in 0..NumStreams: u32 StreamPageIndices[ceil(size_i / block_size)]
|
||
num_streams = struct.unpack_from("<I", dir_data, 0)[0]
|
||
sizes = struct.unpack_from(f"<{num_streams}I", dir_data, 4)
|
||
# 0xFFFFFFFF is the "deleted stream" sentinel — treat as size 0.
|
||
sizes = tuple(0 if s == 0xFFFFFFFF else s for s in sizes)
|
||
|
||
offset = 4 + 4 * num_streams
|
||
streams = []
|
||
for size in sizes:
|
||
pages_needed = _ceil_div(size, self.block_size)
|
||
indices = struct.unpack_from(
|
||
f"<{pages_needed}I", dir_data, offset)
|
||
offset += 4 * pages_needed
|
||
streams.append((size, indices))
|
||
self.streams = streams
|
||
|
||
def _read_page(self, page_index):
|
||
start = page_index * self.block_size
|
||
return self._data[start:start + self.block_size]
|
||
|
||
def stream(self, idx):
|
||
"""Return the raw bytes of stream `idx` (concatenated pages,
|
||
truncated to declared size)."""
|
||
size, pages = self.streams[idx]
|
||
buf = bytearray()
|
||
for p in pages:
|
||
buf.extend(self._read_page(p))
|
||
return bytes(buf[:size])
|
||
|
||
|
||
def _ceil_div(a, b):
|
||
return (a + b - 1) // b
|
||
|
||
|
||
# ── DBI parser (just enough for: section headers + symbol stream index) ────
|
||
|
||
def _parse_dbi(dbi_bytes):
|
||
"""Pull the section-header-stream index + symbol-record-stream index
|
||
out of the DBI header. Returns (sym_stream_idx, section_hdr_stream_idx).
|
||
"""
|
||
# DBI header (first 64 bytes)
|
||
if len(dbi_bytes) < 64:
|
||
raise ValueError("DBI stream too short")
|
||
(version_sig, version_hdr, age,
|
||
gsi_stream, build_no,
|
||
psgsi_stream, pdb_dll_ver,
|
||
sym_record_stream, pdb_dll_rbld,
|
||
mod_info_size, section_contrib_size, section_map_size,
|
||
source_info_size, type_server_map_size, mfc_type_server_idx,
|
||
opt_dbg_hdr_size, ec_substream_size,
|
||
flags, machine, padding) = struct.unpack_from(
|
||
"<iIIHHHHHHiiiiiIiiHHI", dbi_bytes, 0)
|
||
|
||
# The optional debug header sub-stream is at the end of the DBI.
|
||
# Layout offset = 64 + sum of all preceding sub-stream sizes.
|
||
base = 64
|
||
offsets = {
|
||
"mod_info": base,
|
||
"section_contrib": base + mod_info_size,
|
||
"section_map": base + mod_info_size + section_contrib_size,
|
||
"source_info": base + mod_info_size + section_contrib_size + section_map_size,
|
||
"type_server_map": base + mod_info_size + section_contrib_size + section_map_size + source_info_size,
|
||
"ec_substream": base + mod_info_size + section_contrib_size + section_map_size + source_info_size + type_server_map_size,
|
||
"opt_dbg_header": base + mod_info_size + section_contrib_size + section_map_size + source_info_size + type_server_map_size + ec_substream_size,
|
||
}
|
||
|
||
# The optional debug header is an array of u16 stream indices; the
|
||
# one at index 5 is the section-headers stream (per LLVM docs).
|
||
opt_off = offsets["opt_dbg_header"]
|
||
opt_count = opt_dbg_hdr_size // 2
|
||
opt_streams = struct.unpack_from(
|
||
f"<{opt_count}H", dbi_bytes, opt_off)
|
||
# Index 5 = original section headers stream.
|
||
section_hdr_stream = opt_streams[5] if opt_count > 5 else 0xFFFF
|
||
|
||
return sym_record_stream, section_hdr_stream
|
||
|
||
|
||
# ── Public symbols extractor (S_PUB32 from sym-record stream) ──────────────
|
||
|
||
def _demangle(mangled):
|
||
"""Best-effort demangle of MSVC C++ symbol names to Class::Method form.
|
||
|
||
PDB public symbols use the MSVC ABI mangling. A full demangler would
|
||
parse arg types + calling conventions; we only need the readable
|
||
qualified name for grep workflows.
|
||
|
||
Examples:
|
||
"?EnchantAttribute@CACQualities@@IBEHKAAK@Z" -> "CACQualities::EnchantAttribute"
|
||
"??0CEnchantmentRegistry@@QAE@XZ" -> "CEnchantmentRegistry::CEnchantmentRegistry"
|
||
"??1Foo@@QAE@XZ" -> "Foo::~Foo"
|
||
"_someThing" -> "_someThing" (C-style, kept as-is)
|
||
"?GlobalFunc@@..." -> "GlobalFunc" (no class)
|
||
"""
|
||
if not mangled or not mangled.startswith("?"):
|
||
return mangled # C-linkage symbol, return as-is
|
||
# Strip the leading '?' (or '??' for ctors/dtors)
|
||
if mangled.startswith("??0"): # constructor
|
||
rest = mangled[3:]
|
||
ctor_dtor = "ctor"
|
||
elif mangled.startswith("??1"): # destructor
|
||
rest = mangled[3:]
|
||
ctor_dtor = "dtor"
|
||
elif mangled.startswith("??_"): # vtable / vbtable / etc — leave as-is
|
||
return mangled
|
||
else:
|
||
rest = mangled[1:]
|
||
ctor_dtor = None
|
||
|
||
# `Name@Class@Outer@@<sig>` -> split on '@@' to drop the signature suffix
|
||
sep = rest.find("@@")
|
||
if sep < 0:
|
||
return mangled # not a recognised pattern
|
||
qualified = rest[:sep]
|
||
parts = qualified.split("@")
|
||
parts = [p for p in parts if p]
|
||
if not parts:
|
||
return mangled
|
||
|
||
if ctor_dtor:
|
||
# parts[0] is the class name (the only part); ctor name = class name.
|
||
cls = parts[0]
|
||
outer = "::".join(reversed(parts[1:]))
|
||
full = f"{outer}::{cls}" if outer else cls
|
||
if ctor_dtor == "dtor":
|
||
return f"{full}::~{cls}"
|
||
return f"{full}::{cls}"
|
||
|
||
# Method: parts[0] is the function name, parts[1:] are nested classes
|
||
# (innermost first -> reverse for outer::inner::method order).
|
||
method = parts[0]
|
||
classes = list(reversed(parts[1:]))
|
||
if classes:
|
||
return "::".join(classes) + "::" + method
|
||
return method
|
||
|
||
|
||
def _extract_pub32(sym_bytes, section_bases):
|
||
"""Iterate S_PUB32 records, compute image VA, return list of dicts.
|
||
|
||
sym_bytes: raw sym-record-stream bytes.
|
||
section_bases: list of section base VAs (1-indexed via segment field).
|
||
"""
|
||
pos = 0
|
||
end = len(sym_bytes)
|
||
out = []
|
||
while pos + 4 <= end:
|
||
rec_len, kind = struct.unpack_from("<HH", sym_bytes, pos)
|
||
if rec_len == 0:
|
||
break
|
||
rec_end = pos + 2 + rec_len # rec_len excludes its own u16
|
||
if kind == S_PUB32:
|
||
# body: u32 flags, u32 offset, u16 seg, char name[]
|
||
flags, offset, seg = struct.unpack_from("<IIH", sym_bytes, pos + 4)
|
||
name_start = pos + 14
|
||
zero = sym_bytes.index(b"\0", name_start, rec_end)
|
||
mangled = sym_bytes[name_start:zero].decode("ascii", errors="replace")
|
||
if (flags & PUBSYM_FLAG_CODE) and seg >= 1 and (seg - 1) < len(section_bases):
|
||
va = section_bases[seg - 1] + offset
|
||
out.append({
|
||
"address": f"0x{va:08X}",
|
||
"name": _demangle(mangled),
|
||
"mangled": mangled,
|
||
"flags": flags,
|
||
})
|
||
pos = rec_end
|
||
# Records align to 4 bytes
|
||
if pos % 4:
|
||
pos += 4 - (pos % 4)
|
||
return out
|
||
|
||
|
||
# ── Section-header stream parser (40 bytes per IMAGE_SECTION_HEADER) ───────
|
||
|
||
def _parse_section_headers(sec_bytes, image_base=SECTION_HEADERS_DEFAULT_BASE):
|
||
"""Each entry is a 40-byte IMAGE_SECTION_HEADER. Returns list of
|
||
section-VA bases (so symbol[seg-1] + offset = VA)."""
|
||
bases = []
|
||
SECTION_SIZE = 40
|
||
offset = 0
|
||
while offset + SECTION_SIZE <= len(sec_bytes):
|
||
# Layout: char Name[8], u32 VirtSize, u32 VirtAddress, ...
|
||
virt_size, virt_addr = struct.unpack_from("<II", sec_bytes, offset + 8)
|
||
if virt_addr == 0 and offset > 0:
|
||
# Padding / empty trailing entry — stop.
|
||
break
|
||
bases.append(image_base + virt_addr)
|
||
offset += SECTION_SIZE
|
||
return bases
|
||
|
||
|
||
# ── TPI parser — minimal pass to extract LF_CLASS/STRUCTURE names + sizes ──
|
||
|
||
def _extract_named_types(tpi_bytes):
|
||
"""Walk the TPI stream's type record array and yield named class /
|
||
struct / union / enum records with their declared size. Skips
|
||
forward-declared (incomplete) records."""
|
||
if len(tpi_bytes) < 56:
|
||
return []
|
||
# TPI header (56 bytes)
|
||
(version, header_size,
|
||
ti_min, ti_max,
|
||
gpi_size,
|
||
gpi_substream_offset,
|
||
hash_aux_idx,
|
||
hash_key_size, num_hash_buckets,
|
||
hash_value_off, hash_value_len,
|
||
ti_off_off, ti_off_len,
|
||
hash_adj_off, hash_adj_len) = struct.unpack_from(
|
||
"<IIIIIIHHIIIIIII", tpi_bytes, 0)
|
||
pos = header_size # records start right after the header
|
||
end = pos + gpi_size
|
||
if end > len(tpi_bytes):
|
||
end = len(tpi_bytes)
|
||
|
||
out = []
|
||
while pos + 4 <= end:
|
||
rec_len, kind = struct.unpack_from("<HH", tpi_bytes, pos)
|
||
if rec_len == 0:
|
||
break
|
||
rec_end = pos + 2 + rec_len
|
||
if kind in (LF_CLASS, LF_STRUCTURE):
|
||
# body layout (LLVM type-records.h):
|
||
# u16 count, u16 props, u32 fieldList, u32 derived,
|
||
# u32 vshape, then varint16 size, then null-term string.
|
||
try:
|
||
count, props, field_list, derived, vshape = struct.unpack_from(
|
||
"<HHIII", tpi_bytes, pos + 4)
|
||
# varint16 size: <0x8000 = u16 inline; ≥0x8000 = follow-on u16/u32.
|
||
size_pos = pos + 4 + 16
|
||
size_word = tpi_bytes[size_pos] | (tpi_bytes[size_pos + 1] << 8)
|
||
if size_word < 0x8000:
|
||
size_val = size_word
|
||
name_start = size_pos + 2
|
||
else:
|
||
# Numeric leaves. 0x8002=u16, 0x8003=u32, etc. Skip.
|
||
if size_word == 0x8002:
|
||
size_val = struct.unpack_from("<H", tpi_bytes, size_pos + 2)[0]
|
||
name_start = size_pos + 4
|
||
elif size_word == 0x8003:
|
||
size_val = struct.unpack_from("<I", tpi_bytes, size_pos + 2)[0]
|
||
name_start = size_pos + 6
|
||
else:
|
||
# Unknown numeric leaf — skip.
|
||
pos = rec_end
|
||
if pos % 4: pos += 4 - (pos % 4)
|
||
continue
|
||
# Name is null-terminated ASCII.
|
||
if name_start < rec_end:
|
||
zero = tpi_bytes.find(b"\0", name_start, rec_end)
|
||
if zero > name_start:
|
||
name = tpi_bytes[name_start:zero].decode("ascii", errors="replace")
|
||
# Skip forward-decls: bit 7 of `props` (0x80).
|
||
is_fwdref = (props & 0x80) != 0
|
||
if not is_fwdref and name and not name.startswith("<unnamed"):
|
||
out.append({
|
||
"name": name,
|
||
"size": size_val,
|
||
"kind": "class" if kind == LF_CLASS else "struct",
|
||
})
|
||
except (struct.error, IndexError, ValueError):
|
||
pass
|
||
|
||
pos = rec_end
|
||
if pos % 4:
|
||
pos += 4 - (pos % 4)
|
||
return out
|
||
|
||
|
||
# ── Driver ─────────────────────────────────────────────────────────────────
|
||
|
||
def _resolve_repo_root():
|
||
"""tools/pdb-extract/ -> repo root is two directories up."""
|
||
return Path(__file__).resolve().parent.parent.parent
|
||
|
||
|
||
def _main():
|
||
if len(sys.argv) != 2:
|
||
print("usage: py pdb_extract.py <path-to-acclient.pdb>", file=sys.stderr)
|
||
sys.exit(2)
|
||
pdb_path = Path(sys.argv[1])
|
||
if not pdb_path.exists():
|
||
print(f"not found: {pdb_path}", file=sys.stderr)
|
||
sys.exit(2)
|
||
|
||
print(f"loading {pdb_path} ({pdb_path.stat().st_size / 1024 / 1024:.1f} MB)...")
|
||
msf = Msf(str(pdb_path))
|
||
print(f" block size: {msf.block_size}, streams: {len(msf.streams)}")
|
||
|
||
# 1. DBI -> find sym-record + section-header stream indices
|
||
dbi_bytes = msf.stream(STREAM_DBI)
|
||
sym_stream_idx, sec_hdr_stream_idx = _parse_dbi(dbi_bytes)
|
||
print(f" sym stream: {sym_stream_idx}, section-hdr stream: {sec_hdr_stream_idx}")
|
||
|
||
# 2. Section headers -> segment bases
|
||
sec_bytes = msf.stream(sec_hdr_stream_idx)
|
||
section_bases = _parse_section_headers(sec_bytes)
|
||
print(f" sections: {len(section_bases)} (text base = 0x{section_bases[0]:08X})")
|
||
|
||
# 3. Symbol records -> S_PUB32 entries with image VAs
|
||
sym_bytes = msf.stream(sym_stream_idx)
|
||
print(f" sym record stream: {len(sym_bytes) / 1024:.1f} KB")
|
||
symbols = _extract_pub32(sym_bytes, section_bases)
|
||
print(f" extracted {len(symbols)} public function symbols")
|
||
|
||
# 4. TPI -> named types
|
||
tpi_bytes = msf.stream(STREAM_TPI)
|
||
print(f" TPI stream: {len(tpi_bytes) / 1024:.1f} KB")
|
||
types = _extract_named_types(tpi_bytes)
|
||
# Dedup by name (templates/forward-decl spam can produce duplicates)
|
||
seen = set()
|
||
unique_types = []
|
||
for t in types:
|
||
if t["name"] not in seen:
|
||
seen.add(t["name"])
|
||
unique_types.append(t)
|
||
print(f" extracted {len(unique_types)} unique named types ({len(types)} total records)")
|
||
|
||
# 5. Write outputs
|
||
repo = _resolve_repo_root()
|
||
out_dir = repo / "docs" / "research" / "named-retail"
|
||
out_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
sym_out = out_dir / "symbols.json"
|
||
type_out = out_dir / "types.json"
|
||
|
||
with open(sym_out, "w", encoding="utf-8") as f:
|
||
# Keep address + demangled name + raw mangled name (for callers
|
||
# that need the C++ ABI form). Strip flags as not useful for grep.
|
||
compact = [
|
||
{"address": s["address"], "name": s["name"], "mangled": s["mangled"]}
|
||
for s in symbols
|
||
]
|
||
json.dump(compact, f, indent=2)
|
||
with open(type_out, "w", encoding="utf-8") as f:
|
||
json.dump(unique_types, f, indent=2)
|
||
|
||
print(f"\nwrote {sym_out} ({sym_out.stat().st_size / 1024:.1f} KB)")
|
||
print(f"wrote {type_out} ({type_out.stat().st_size / 1024:.1f} KB)")
|
||
# Spot check: CEnchantmentRegistry::EnchantAttribute should be at 0x594570 per discovery agent.
|
||
target = "CEnchantmentRegistry::EnchantAttribute"
|
||
for s in symbols:
|
||
if s["name"] == target:
|
||
print(f"\nspot check: {target} -> {s['address']} (expected 0x00594570)")
|
||
break
|
||
else:
|
||
print(f"\nspot check: {target} NOT FOUND in symbols (PDB lookup mismatch?)")
|
||
|
||
|
||
_main()
|