实习的时候刚好接触到这个工具,想着学一下静态分析,网上也没搜到相关源码分析…只能自己看了orz,还特地学了一周 rust 哈哈哈
前置知识
Pcode
IR (Intermediate Representation)是中间表示的缩写,指在编译器或二进制分析工具中,将原始的机器代码或汇编代码转换为一种中间抽象形式,以便于进行分析和优化
Pcode(Program Code)是 Ghidra 反汇编器使用的中间表示(IR)。它是 Ghidra 将二进制代码分析后的低级表示形式,提供了一种独立于特定 CPU 架构的二进制代码表示,每条汇编指令被分解为一系列 Pcode 操作(一条汇编指令对应多个 Pcode 操作),常见 Pcode 操作码类型:
- 算术运算:
INT_ADD,INT_SUB,INT_MULT,INT_DIV,INT_REM - 逻辑运算:
INT_AND,INT_OR,INT_XOR,INT_NEGATE - 移位操作:
INT_LEFT,INT_RIGHT,INT_SRIGHT - 内存操作:
LOAD,STORE - 数据转换:
INT_ZEXT,INT_SEXT,INT_CARRY,INT_SCARRY - 控制流:
BRANCH,CBRANCH,CALL,CALLIND,RETURN - 复制:
COPY
Varnode 是 Pcode 中的基本数据单位,用于在 Pcode Raw 的 inputs / output 字段描述 输入 / 输出变量信息,包含:
- **
size**:数据大小(字节数) - **
address_space**:地址空间类型(const / register / ram 内存地址 / unique 临时变量 / stack) - **
address_space_offset**:在地址空间中的偏移(寄存器名 / 数值) - **
pointer_size**:指针大小(用于地址计算)
Pcode Raw 是 Ghidra 的 Pcode Extractor 插件输出的 json 格式文件,包含程序结构(函数、基本块)、Pcode 操作、寄存器信息、调用约定(函数调用和返回)、数据类型属性(数据类型的尺寸信息)
Pcode Raw 文件结构:
- **
program.functions**:程序中的所有函数列表 - **
register_properties**:CPU 寄存器的层次结构和属性 - **
calling_conventions**:函数调用约定(参数传递、返回值等) - **
datatype_properties**:C 数据类型的尺寸信息 - **
image_base**:程序在内存中的基址
{
"program": {
"functions": [
{
"name": "函数名",
"address": "函数地址",
"blocks": [
{
"address": "基本块地址",
"instructions": [
{
"mnemonic": "汇编指令助记符",
"address": "指令地址",
"size": 指令字节大小,
"terms": [
{
"address": "操作地址",
"index": 操作索引,
"operation": {
"pcode_mnemonic": "Pcode操作码",
"output": 输出varnode,
"inputs": [输入varnode列表]
}
}
"potential_targets": "间接控制流转移的潜在目标地址列表(存在 jmp / call)",
"fall_through": "顺序执行的下一条指令地址"
]
}
]
}
]
}
]
},
"register_properties": [...],
"cpu_arch": "CPU架构名称",
"external_functions": {...},
"entry_points": ["入口点地址列表"],
"stack_pointer_register": {...},
"calling_conventions": {...},
"datatype_properties": {...},
"image_base": "程序基址"
}
汇编转 Pcode 示例:
汇编:
0x1000: push %rbp ; 保存基指针
0x1001: mov %rsp,%rbp ; 设置新的基指针
0x1004: add %esi,%edi ; EDI = EDI + ESI
0x1007: pop %rbp ; 恢复基指针
0x1008: ret ; 返回
Pcode:
INT_SUB RSP - 8 → RSP
STORE RSP ← RBP // push %rbp
COPY RSP → RBP // mov %rsp,%rbp
INT_ADD EDI, ESI → EAX // add %esi,%edi
LOAD RSP → RBP
INT_ADD RSP + 8 → RSP // pop %rbp
RETURN // ret
Pcode 在 Pcode Raw 的 instructions 字段中的表示:以 push 为例
{
"mnemonic": "push",
"address": "0x1000",
"size": 1,
"terms": [
// Pcode 指令1:INT_SUB RSP - 8 → RSP
{
"address": "0x1000",
"index": 0,
"operation": {
"pcode_mnemonic": "INT_SUB", // Pcode 操作码
"input0": { // 输入1:RSP
"size": 8,
"address_space": "register",
"address_space_offset": "RSP",
"pointer_size": 8,
"register_name": "RSP",
"register_size": 8
},
"input1": { // 输入2:8
"size": 8,
"address_space": "const",
"address_space_offset": "8",
"pointer_size": 8,
"register_name": null,
"register_size": null
},
"input2": null, // 无第三个输入
"output": { // 输出:RSP
"size": 8,
"address_space": "register",
"address_space_offset": "RSP",
"pointer_size": 8,
"register_name": "RSP",
"register_size": 8
}
}
},
// Pcode 指令2:STORE RSP ← RBP
{
"address": "0x1000",
"index": 1,
"operation": {
"pcode_mnemonic": "STORE", // Pcode 操作码
"input0": { // 输入1:RSP
"size": 8,
"address_space": "register",
"address_space_offset": "RSP",
"pointer_size": 8,
"register_name": "RSP",
"register_size": 8
},
"input1": { // 输入2:RBP
"size": 8,
"address_space": "register",
"address_space_offset": "RBP",
"pointer_size": 8,
"register_name": "RBP",
"register_size": 8
},
"input2": null,
"output": null
}
}
],
"potential_targets": null, // 没有可能跳转的地址
"fall_through": "0x1001" // 顺序执行的下一条指令地址
},
cwe_checker 源码分析
caller/src/main.rs
功能:CWE 检查器的命令行界面,主程序
import module 导入模块
从 cwe_checker_lib 库导入的模块
use cwe_checker_lib::analysis::graph;
use cwe_checker_lib::checkers::CweModule;
use cwe_checker_lib::pipeline::{disassemble_binary, AnalysisResults};
use cwe_checker_lib::utils::binary::BareMetalConfig;
use cwe_checker_lib::utils::debug;
use cwe_checker_lib::utils::log::{print_all_messages, CweWarning, LogLevel, LogMessage};
use cwe_checker_lib::utils::read_config_file;
使用本地的 caller/src/cfg_stats.rs 模块(控制流图统计)
mod cfg_stats;
main 入口点
fn main() -> Result<(), Error> {
/// 主函数入口
let cmdline_args = CmdlineArgs::parse(); // clap::Parser 自动生成的 parse()
run_with_ghidra(&cmdline_args)
}
CmdlineArgs 参数解析
参数示例: cwe_checker binary --partial CWE476,CWE416 --json -o results.json
binary:指定二进制文件路径,除非使用 module_versions 参数否则必填,并且会调用 check_file_existence 检查文件是否存在,返回文件路径字符串
/// 仅当 file_path 指向现有文件时才返回 Ok(file_path) fn check_file_existence(file_path: &str) -> Result<String, String> { if std::fs::metadata(file_path) .map_err(|err| format!("{err}"))? .is_file() { Ok(file_path.to_string()) } else { Err(format!("{file_path} is not a file.")) } }module_versions:打印所有已知模块的版本号,不需要参数
config:指定自定义配置文件路径,可以用缩写 -c,会调用 check_file_existence 检查文件是否存在,返回文件路径字符串
out:将结果写入指定文件,不输出到不准输出,可以使用缩写 -o
partial:指定要运行的特定检查集, 使用
,分割,可以使用缩写-p,例如-p CWE332,CWE476,CWE782json:生成 json 输出,不需要参数,可以使用缩写 -j
quiet:静默输出,不打印日志消息,不需要参数,可以使用缩写 -q
verbose:打印额外的调试日志消息,不需要参数,可以使用缩写 -v,与 quiet 不能同时使用
statistics:在日志消息中包含各种统计信息,不需要参数,与 quiet 不能同时使用
bare_metal_config:用于裸机二进制分析的配置文件路径,输入二进制文件都将被视为裸机二进制文件,会调用 check_file_existence 检查文件是否存在,返回文件路径字符串
debug:用于调试目的的输出,参数为 CliDebugMode 枚举类型成员
#[derive(ValueEnum, Clone, Debug, Copy)] /// 选择要显示的调试输出类型 pub enum CliDebugMode { // Ghidra插件的原始输出 PcodeRaw, // Ghidra插件的输出反序列化到 Rust 类型 PcodeParsed, // 程序的第一个IR表示 IrEarly, // 在函数内基本块被正常排序之后 IrFnBlksSorted, // 在非返回外部函数被标记之后 IrNonRetExtFunctionsMarked, // 在外部函数存根调用被替换为外部函数调用之后 IrExtCallsReplaced, // 在现有引用块被内联到函数中之后 IrInlined, // 在子寄存器替换过程之后 IrSubregistersSubstituted, // 在所有控制流转换都有有效目标之后 IrCfPatched, // 在空函数被移除之后 IrEmptyFnRemoved, // 未优化的IR IrRaw, // 在函数的不可达基本块被移除之后 IrIntraproceduralDeadBlocksElimed, // 在平凡表达式被替换为其结果之后 IrTrivialExpressionsSubstituted, // 在输入表达式沿着变量赋值传播之后 IrInputExpressionsPropagated, // 在死变量赋值被移除之后 IrDeadVariablesElimed, // 在具有相同条件的条件语句的控制流被简化之后 IrControlFlowPropagated, // 在栈指针通过逻辑AND对齐被替换为减法操作之后 IrStackPointerAlignmentSubstituted, // 最终的IR IrOptimized, // 整个程序的调用图 Cg, // 整个程序的控制流图 Cfg, // 指针推理计算结果 Pi, }pcode_raw:从文件读取保存的 Pcode 提取器插件输出,而不是调用 Ghidra
cfg_stats:打印 IR 程序CFG的统计信息和指标并退出,不需要参数(如函数内圈复杂度、基本块数、指令数、外部函数被调用次数等)
run_with_ghidra 主要函数
功能:以 Ghidra 作为后端运行 cwe_checker
调用
impl From<&CmdlineArgs> for debug::Settings将 args 程序参数转换为目标类型let debug_settings = args.into();处理 debug:获取指定的 debug 模式(全部模式见 CliDebugMode 类型),未指定则使用默认值,即不调试
处理 verbose:指定日志详细程度,包括指定 –verbose 详细模式;–quiet 静默模式;未指定 默认模式
构建 Settings 对象,设置默认值
处理 –pcode-raw:选择保存 Pcode 的文件路径
构建最终的 Settings 对象,返回 debug::Settings 结构体
impl From<&CmdlineArgs> for debug::Settings { /// 将命令行参数转换为调试设置 fn from(args: &CmdlineArgs) -> Self { // 处理 debug 参数 let stage = match &args.debug { // debug mode 值见 CliDebugMode None => debug::Stage::default(), // 没有指定 --debug 参数则使用默认值(不调试) // 如果用户指定了 --debug 参数,则将 CliDebugMode 转换为 debug::Stage // mode.into() 会调用 impl From<&CliDebugMode> for debug::Stage 的 from 方法 Some(mode) => mode.into(), }; // 处理 verbose 参数:确定日志详细程度,检查用户是否指定了 --verbose 或 --quiet 参数 let verbosity = if args.verbose { debug::Verbosity::Verbose // 如果指定了 --verbose,使用详细模式(打印所有调试信息) } else if args.quiet { debug::Verbosity::Quiet // 如果指定了 --quiet,使用静默模式(不打印日志) } else { debug::Verbosity::default() // 如果两个都没指定,使用默认模式(正常日志) }; // 构建 Settings 对象:调用 SettingsBuilder::default() 创建建造者,设置默认值 let mut builder = debug::SettingsBuilder::default() .set_stage(stage) // 设置调试阶段(调用图、IR 等) .set_verbosity(verbosity) // 设置日志详细程度(verbose/quiet/normal) .set_termination_policy(debug::TerminationPolicy::EarlyExit); // 设置调试后退出 // 处理 --pcode-raw 参数:可选地设置保存的 Pcode 文件路径 if let Some(pcode_raw) = &args.pcode_raw { // 将字符串路径转换为 PathBuf,并设置到建造者中 // 这样可以从保存的文件读取 Pcode,而不需要调用 Ghidra builder = builder.set_saved_pcode_raw(PathBuf::from(pcode_raw.clone())); } // 调用 build() 方法构建最终的 Settings 对象,返回一个完整的 debug::Settings 结构体 builder.build() } }示例:
转换前:
CmdlineArgs { binary: Some("binary"), config: None, out: None, partial: None, json: false, quiet: false, verbose: false, statistics: false, bare_metal_config: None, module_versions: false, debug: None, // ← 没有指定 --debug pcode_raw: None, cfg_stats: false, }转换后:
debug::Settings { stage: Stage::No, // 不调试 verbose: Verbosity::Normal, // 正常日志 terminate: TerminationPolicy::EarlyExit, // 调试后退出 saved_pcode_raw: None, // 不使用保存的文件 }debug 相关函数位于 src\cwe_checker_lib\src\utils\debug.rs
Stage 类型:
#[derive(PartialEq, Eq, Copy, Clone, Debug, Default)] #[non_exhaustive] pub enum Stage { #[default] No, // 默认值,无阶段 All, // 全部阶段 CallGraph, // 构建调用图 ControlFlowGraph, // 构建控制流图 Pi, // 指针推理 Ir(IrForm), // 生成中间表示 Pcode(PcodeForm), // 解析 Pcode Cwe, // CWE 检查阶段 }其中 IrForm 类型(生成中间表示的子阶段):
#[derive(PartialEq, Eq, Copy, Clone, Debug)] #[non_exhaustive] pub enum IrForm { Early, // 程序的第一个 IR 表示 SingleTargetIndirectCallsReplaced, // 单目标间接调用已被替换为直接调用 FnBlksSorted, // 函数内基本块已正常排序后 NonRetExtFunctionsMarked, // 非返回外部函数已标记后 ExtCallsReplaced, // 外部函数存根调用已替换为外部函数调用后 Inlined, // 现有引用块已内联到函数中后 SubregistersSubstituted, // 子寄存器替换过程后 CfPatched, // 所有控制流转换都有有效目标后 EmptyFnRemoved, // 空函数已被移除后 EntryPointsExist, // 不存在的入口点已被移除后 Raw, // 未优化的 IR IntraproceduralDeadBlocksElimed, // 函数中不可达基本块已被移除后 TrivialExpressionsSubstituted, // 平凡表达式已替换为其结果后 InputExpressionsPropagated, // 输入表达式已沿变量赋值传播后 DeadVariablesElimed, // 死变量赋值已被移除后 ControlFlowPropagated, // 具有相同条件的条件语句的控制流已简化后 StackPointerAlignmentSubstituted, // 栈指针通过逻辑 AND 对齐已替换为减法操作后 Optimized, // 最终优化的 IR }PcodeForm:pcode 解析子阶段
#[derive(PartialEq, Eq, Copy, Clone, Debug)] #[non_exhaustive] pub enum PcodeForm { Raw, // 从 ghidra 获取的初始数据 Parsed, // 转化成 rust 类型的数据 }Verbosity 类型:
#[derive(PartialEq, Eq, Copy, Clone, Debug, Default)] #[non_exhaustive] pub enum Verbosity { Quiet, // 静默模式 #[default] Normal, // 默认的正常模式 Verbose, // 打印额外日志信息 }TerminationPolicy 类型:调试完成后的操作
#[derive(PartialEq, Eq, Copy, Clone, Debug, Default)] #[non_exhaustive] pub enum TerminationPolicy { KeepRunning, // 继续运行 #[default] EarlyExit, // 默认提前退出 Panic, // 异常终止 }
获取每个模块的 CweModule 结构体
let mut modules = cwe_checker_lib::checkers::get_modules();get_modules 位于 src\cwe_checker_lib\src\checkers.rs:
pub fn get_modules() -> Vec<&'static CweModule> { vec![ &crate::checkers::cwe_78::CWE_MODULE, &crate::checkers::cwe_119::CWE_MODULE, &crate::checkers::cwe_134::CWE_MODULE, &crate::checkers::cwe_190::CWE_MODULE, &crate::checkers::cwe_215::CWE_MODULE, &crate::checkers::cwe_243::CWE_MODULE, &crate::checkers::cwe_252::CWE_MODULE, &crate::checkers::cwe_332::CWE_MODULE, &crate::checkers::cwe_337::CWE_MODULE, &crate::checkers::cwe_367::CWE_MODULE, &crate::checkers::cwe_416::CWE_MODULE, &crate::checkers::cwe_426::CWE_MODULE, &crate::checkers::cwe_467::CWE_MODULE, &crate::checkers::cwe_476::CWE_MODULE, &crate::checkers::cwe_560::CWE_MODULE, &crate::checkers::cwe_676::CWE_MODULE, &crate::checkers::cwe_782::CWE_MODULE, &crate::checkers::cwe_789::CWE_MODULE, &crate::analysis::pointer_inference::CWE_MODULE, ] }同文件中定义了 CweModule 结构体,包括模块名、模块版本、函数名
pub struct CweModule { pub name: &'static str, pub version: &'static str, pub run: CweModuleFn, }在每个 CWE 的 mod.rs 中都会设置该结构体,例如
cwe_module!("CWE119", "0.3", check_cwe);打印每个模块的版本信息:
if args.module_versions { // 仅打印模块版本然后退出 println!("[cwe_checker] module_versions:"); for module in modules.iter() { println!("{module}"); } return Ok(()); }将 bare_metal_config 指定路径的 json 文件转化为 BareMetalConfig 结构体
let bare_metal_config_opt: Option<BareMetalConfig> = args.bare_metal_config.as_ref().map(|config_path| { let file = std::io::BufReader::new(std::fs::File::open(config_path).unwrap()); serde_json::from_reader(file) .expect("Parsing of the bare metal configuration file failed") });BareMetalConfig 结构体位于 src\cwe_checker_lib\src\utils\binary.rs
pub struct BareMetalConfig { pub processor_id: String, // 指定 CPU 类型 pub flash_base_address: String, // Flash 起始地址(程序代码) pub ram_base_address: String, // RAM 起始地址(数据、堆栈) pub ram_size: String, // RAM 大小 }bare_metal_config 配置文件示例
{ "_comment": "The CPU architecture of the chip. Valid values are those that Ghidra accepts as processor IDs.", "processor_id": "ARM:LE:32:v8", "_comment_1": "The base address, where the contents of the binary would be mapped on the chip, as a hexadecimal number.", "flash_base_address": "0x08000000", "_comment_2": "The base address, of the RAM memory region as a hexadecimal number.", "ram_base_address": "0x20000000", "_comment_3": "The size of the RAM memory region (in bytes) as a hexadecimal number.", "ram_size": "0x00030000" }区别:
普通程序内存布局: 逻辑程序内存布局: 0x08048000 ──────┐ 0x08000000 ──────┐ │ .text (代码段) │ Flash (整个二进制) 0x0804A000 ──────┼ 0x08020000 ──────┼ │ .data (数据段) │ RAM (数据区) 0x0804B000 ──────┼ 0x08022000 ──────┘ │ .bss (BSS段) │ 0x0804C000 ──────┘获取二进制文件路径
let binary_file_path = PathBuf::from(args.binary.clone().unwrap());反编译二进制文件:将二进制文件路径、配置文件路径、debug 设置传递给 disassemble_binary 函数(位于 src\cwe_checker_lib\src\pipeline\mod.rs),返回二进制程序字节形式的文件内容(bianry)和 Project 结构体(project)
let (binary, project) = disassemble_binary(&binary_file_path, bare_metal_config_opt, &debug_settings)?;其中 Project 结构体如下(位于 cwe_checker_lib\src\intermediate_representation\project.rs),该结构体是一个用于表示二进制文件的主要数据,包含反编译后的信息、执行环境等
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] pub struct Project { pub program: Term<Program>, // 二进制文件中所有已知可执行代码(反汇编后的程序) pub cpu_architecture: String, // 二进制文件的CPU 架构 pub stack_pointer_register: Variable, // 栈指针寄存器 // 可用于调用外部函数的已知调用约定集合 pub calling_conventions: BTreeMap<String, CallingConvention>, pub register_set: BTreeSet<Variable>, // 已知基础物理寄存器 pub datatype_properties: DatatypeProperties, // 数据类型属性(例如大小) pub runtime_memory_image: RuntimeMemoryImage, // 加载到内存的内存映像 }在调试模式(指定参数 –debug cg)下生成并输出程序的调用图,构建调用图函数位于 cwe_checker_lib/src/analysis/graph/call.rs
if debug_settings.should_debug(debug::Stage::CallGraph) { // 从 project.program 构建调用图,以函数为节点、调用关系为边的有向图,用于展示函数间的调用关系 let cg = graph::call::CallGraph::new(&project.program); // 以紧凑 JSON 的格式打印调用图数据 debug_settings.print_compact_json(&cg, debug::Stage::CallGraph); }处理 cfg_stats 参数:根据反编译后的程序构建一份控制流图(CFG)的统计信息对象(如函数内圈复杂度、基本块数、指令数、外部函数被调用次数等,函数位于 src/caller/src/cfg_stats.rs )
if args.cfg_stats { let cfg_stats = cfg_stats::CfgProperties::new(&project.program); println!("{:#}", serde_json::to_value(cfg_stats)?); return Ok(()); }过滤要执行的模块:
- 存在 partial 参数就从参数获取要执行的 CWE 检查
- 如果是内核模块,则仅启用内核模块的检查
- 否则只默认禁用 CWE78
if let Some(ref partial_module_list) = args.partial { // 从 partial 参数获取检查的 cwe 编号 filter_modules_for_partial_run(&mut modules, partial_module_list); } else if project.runtime_memory_image.is_lkm { // 是内核模块则选择相应的内置 cwe 序列 modules.retain(|module| cwe_checker_lib::checkers::MODULES_LKM.contains(&module.name)); } else { // CWE78在当前标准运行中被禁用,因为它在某些二进制文件上消耗大量RAM和计算时间 modules.retain(|module| module.name != "CWE78"); }MODULES_LKM 位于 src\cwe_checker_lib\src\checkers.rs
pub const MODULES_LKM: [&str; 10] = [ "CWE134", "CWE190", "CWE215", "CWE252", "CWE416", "CWE457", "CWE467", "CWE476", "CWE676", "CWE789", ];读取配置文件:
- 指定了 config 参数,从指定的 config 路径获取配置文件
- 是内核模块,读取 src\lkm_config.json
- 否则使用默认配置文件:src\config.json
let config: serde_json::Value = if let Some(ref config_path) = args.config { // 从 config 获取 let file = std::io::BufReader::new(std::fs::File::open(config_path).unwrap()); serde_json::from_reader(file).context("Parsing of the configuration file failed")? } else if project.runtime_memory_image.is_lkm { // 内核模块 read_config_file("lkm_config.json")? } else { // 默认 read_config_file("config.json")? };配置每个 CWE 检查模块使用的符号列表和参数,例如:
"CWE78": { "system_symbols": [ "system" ] }, "CWE190": { "symbols": [ "xmalloc", "malloc", "realloc", "calloc" ] },生成程序控制流图:生成控制流图函数位于 src\cwe_checker_lib\src\analysis\graph.rs
// 生成含日志的程序级控制流图 let control_flow_graph = graph::get_program_cfg_with_logs(&project.program); // 如需调试 CFG 则以紧凑 JSON 形式输出 debug_settings.print_compact_json(control_flow_graph.deref(), debug::Stage::ControlFlowGraph); // 汇总分析结果,包括二进制文件内容、控制流图 CFG、project 指针到 AnalysisResults 结构体 let analysis_results = AnalysisResults::new(&binary, &control_flow_graph, &project);其中 AnalysisResults 结构体如下
#[derive(Clone, Copy)] pub struct AnalysisResults<'a> { pub binary: &'a [u8], // 二进制文件内容 pub control_flow_graph: &'a Graph<'a>, // 程序控制流图 pub project: &'a Project, // 指向 project 结构体的指针 // 函数签名分析的结果 pub function_signatures: Option<&'a BTreeMap<Tid, FunctionSignature>>, // 指针推理分析的结果 pub pointer_inference: Option<&'a PointerInference<'a>>, // 字符串抽象分析的结果 pub string_abstraction: Option<&'a StringAbstraction<'a, BricksDomain>>, }其关联函数如下
pub fn new( binary: &'a [u8], control_flow_graph: &'a Graph<'a>, project: &'a Project, ) -> AnalysisResults<'a> { AnalysisResults { binary, control_flow_graph, project, function_signatures: None, pointer_inference: None, string_abstraction: None, } }设置依赖字符串抽象分析、指针推理的模块集合,并且根据集合是否为空来设置 needed 标志
// 依赖字符串抽象分析的模块集合 let modules_depending_on_string_abstraction = BTreeSet::from_iter(["CWE78"]); // 依赖指针推理的模块集合 let modules_depending_on_pointer_inference = BTreeSet::from_iter([ "CWE119", "CWE134", "CWE190", "CWE252", "CWE337", "CWE416", "CWE476", "CWE789", "Memory", ]); // 是否需要运行字符串抽象分析,即字符串抽象分析的模块集合是否为空 let string_abstraction_needed = modules .iter() .any(|module| modules_depending_on_string_abstraction.contains(&module.name)); // 是否需要运行指针推理,即指针推理集合是否为空 let pi_analysis_needed = string_abstraction_needed || modules .iter() .any(|module| modules_depending_on_pointer_inference.contains(&module.name));设置指针推理标记时,计算函数签名,并且保存到 analysis_results 结构体,函数位于 src\cwe_checker_lib\src\pipeline\results.rs
let function_signatures = if pi_analysis_needed { // 若需要指针推理,先计算函数签名 // 计算函数参数/返回等签名信息 let function_signatures = analysis_results.compute_function_signatures(); Some(function_signatures) } else { None // 不需要则为空 }; // 将函数签名附加到 analysis_results 分析结果结构体中 let analysis_results = analysis_results.with_function_signatures(function_signatures.as_deref());设置指针推理标记,计算完函数签名后,运行指针推理,并且保存到 analysis_results 结构体,函数位于 src\cwe_checker_lib\src\pipeline\results.rs
let pi_analysis_results = if pi_analysis_needed { // 需要运行指针推理 // 传入内存配置与统计开关,如果启用 statistics 则在日志消息中包含各种统计信息 Some(analysis_results.compute_pointer_inference(&config["Memory"], args.statistics)) } else { None // 否则不计算 }; // 将指针推理结果附加到分析结果 let analysis_results = analysis_results.with_pointer_inference(pi_analysis_results.as_ref());设置字符串抽象分析标记,计算字符串抽象分析,函数位于 src\cwe_checker_lib\src\pipeline\results.rs
let string_abstraction_results = // 需要则运行字符串抽象分析 if string_abstraction_needed { Some(analysis_results.compute_string_abstraction( &config["StringAbstraction"], // 字符串抽象相关配置 pi_analysis_results.as_ref(), // 可能依赖指针推理结果 )) } else { None }; // 将字符串抽象结果附加到分析结果 let analysis_results = analysis_results.with_string_abstraction(string_abstraction_results.as_ref());打印调试信息
if debug_settings.should_debug(debug::Stage::Pi) { // 若仅调试指针推理阶段 cwe_checker_lib::analysis::pointer_inference::run( &analysis_results, serde_json::from_value(config["Memory"].clone()).unwrap(), true, false, ); return Ok(()); // 调试输出后提前结束 }执行所有选定的检测模块,位于 [src\cwe_checker_lib\src\checkers](###*src\cwe_checker_lib\src\checkers(cwe modules))
// 执行模块并收集它们的日志和CWE警告 let mut all_cwe_warnings = Vec::new(); // 收集所有模块的CWE警告(含日志包装) for module in modules { // 逐个运行启用的CWE模块 let cwe_warnings = (module.run)(&analysis_results, &config[&module.name], &debug_settings); // 传入分析结果、该模块配置、调试设置 all_cwe_warnings.push(cwe_warnings); // 累加结果 }打印模块结果,当参数指定了 –quiet 则抑制日志消息
// 打印模块的结果 let all_logs: Vec<&LogMessage> = if args.quiet { // 如开启安静模式则抑制日志 Vec::new() // 由于设置了`--quiet`标志,抑制所有日志消息 } else { let mut all_logs = Vec::new(); // 否则汇总所有来源的日志 // 聚合所有带有日志的对象的日志 all_logs.extend(project.logs().iter()); // 工程阶段日志 all_logs.extend(control_flow_graph.logs().iter()); // CFG 构建日志 if let Some(function_signatures) = &function_signatures { // 函数签名日志 all_logs.extend(function_signatures.logs().iter()); } for cwe_warnings in all_cwe_warnings.iter() { // 各CWE模块日志 all_logs.extend(cwe_warnings.logs().iter()); } if args.statistics { todo!() } if !args.verbose { // 非详细模式下,过滤掉 Debug 级别日志 all_logs.retain(|log_msg| log_msg.level != LogLevel::Debug); } all_logs }; // 展平成所有CWE告警列表 let all_cwes: Vec<&CweWarning> = all_cwe_warnings.iter().flat_map(|x| x.iter()).collect(); // 根据参数选择输出到文件/控制台,支持JSON print_all_messages(all_logs, all_cwes, args.out.as_deref(), args.json);
src\cwe_checker_lib\src\pipeline\mod.rs
disassemble_binary 反编译程序
功能:反编译二进制程序
在 run_with_ghidra 中被调用,传入参数为二进制文件路径、bare_metal_config、调试设置,返回二进制程序内容和 project 结构体
let (binary, project) =
disassemble_binary(&binary_file_path, bare_metal_config_opt, &debug_settings)?;
以字节形式获取二进制文件的内容到 binary 字节向量
将二进制程序、文件路径、bare_metal_config、debug_settings 传递给 ghidra 进行分析,得到 project 结构体和原始 IR,相关代码位于 src\cwe_checker_lib\src\utils\ghidra.rs
调用 [project.optimize](####project.optimize 优化 IR) 对 IR 进行优化:cwe_checker 使用 Pcode 作为输入,将其转换为自己的中间表示(IR)进行漏洞检测
生成二进制文件的运行时内存映像表示,并全局调整一次内存地址,存放到 project.runtime_memory_image
ghidra 为了分析方便可能会将整个程序移动到一个非零的地址,也就是加上了一个基址,所以在获取真实地址的时候需要减去基址,这里调整内存地址后续就无需再考虑基址问题
pub fn disassemble_binary(
binary_file_path: &Path, // 二进制文件路径
bare_metal_config_opt: Option<BareMetalConfig>, // 可选的裸机配置
debug_settings: &debug::Settings, // 调试设置
) -> Result<(Vec<u8>, WithLogs<Project>), Error> { // 返回(二进制内容,带日志的项目)或错误
let binary: Vec<u8> = // 读取二进制文件内容到字节向量,读取失败时添加上下文错误信息
std::fs::read(binary_file_path).context("Could not read from binary file path {}")?;
let mut project = get_project_from_ghidra( // 从 Ghidra 获取项目信息
binary_file_path, // 文件路径
&binary[..], // 二进制内容字节向量
bare_metal_config_opt.clone(), // 裸机配置(克隆以避免所有权转移)
debug_settings, // 调试设置
)?;
// 规范化项目并收集其生成的日志消息,打印原始IR表示
debug_settings.print(&project.program.term, debug::Stage::Ir(debug::IrForm::Raw));
project.optimize(debug_settings); // 对项目进行优化(应用各种IR变换)
debug_settings.print( // 打印优化后的IR表示
&project.program.term,
debug::Stage::Ir(debug::IrForm::Optimized),
);
// 生成二进制文件的运行时内存映像表示
let mut runtime_memory_image = if let Some(bare_metal_config) = bare_metal_config_opt.as_ref() {
// 如果提供了裸机配置,使用裸机模式生成内存映像
RuntimeMemoryImage::new_from_bare_metal(&binary, bare_metal_config)
.context("Error while generating runtime memory image.")?
} else {
// 否则使用标准ELF/PE模式生成内存映像
RuntimeMemoryImage::new(&binary).context("Error while generating runtime memory image.")?
};
if project.program.term.address_base_offset != 0 {
// 全局调整一次内存地址,这样其他分析就不需要调整它们的地址
runtime_memory_image.add_global_memory_offset(project.program.term.address_base_offset);
}
project.runtime_memory_image = runtime_memory_image; // 将内存映像赋值给项目
Ok((binary, project)) // 返回(二进制内容,项目)元组
}
project.optimize 优化 IR
位于 src\cwe_checker_lib\src\intermediate_representation\project.rs
- IntraproceduralDeadBlockElimPass:删除函数内部不可达的基本快并移除无效控制流边与语句,例如入口不可达、恒真/恒假条件折叠后的残留块、无条件跳转后的冗余分支等
- InputExpressionPropagationPass: 沿赋值链传播“输入”表达式(参数、全局读、调用返回、内存加载等),在等价且安全的场合用来源替换表达式,减少临时变量层级
- TrivialExpressionSubstitutionPass:对简单/可决表达式进行代数化简与常量折叠,即完成中间简单计算的过程,例如
t = y & 0 => t = 0t = x + 0; t1 = 3 * 4 + t => t1 = 12 + x - DeadVariableElimPass:删除对未使用变量的赋值,例如写死/死存储,或被写入但未读的值
- ControlFlowPropagationPass:简化分支条件、等价分支,消除恒真/恒假路径
- StackPointerAlignmentSubstitutionPass:利用按位与的对齐操作对齐栈指针
pub fn optimize(&mut self, debug_settings: &debug::Settings) {
let mut logs = Vec::new(); // 初始化日志收集器,用于收集优化过程中的日志信息
// 运行过程内死块消除优化:移除函数内不可达的基本块
run_ir_pass![
self.program.term, // 要优化的程序 IR
(), // 无额外参数
IntraproceduralDeadBlockElimPass, // 优化过程名称
logs, // 日志收集器
debug_settings, // 调试设置
];
// 运行输入表达式传播优化:将输入表达式沿着赋值传播
run_ir_pass![
self.program.term,
(),
InputExpressionPropagationPass,
logs,
debug_settings,
];
// 运行平凡表达式替换优化:用结果替换简单的算术表达式
run_ir_pass![
self.program.term,
(),
TrivialExpressionSubstitutionPass,
logs,
debug_settings,
];
// 运行死变量消除优化:移除对未使用变量的赋值
run_ir_pass![
self.program.term,
self.register_set, // 需要寄存器集合信息
DeadVariableElimPass,
logs,
debug_settings,
];
// 运行控制流传播优化:简化具有相同条件的条件语句
run_ir_pass![
self.program.term,
(),
ControlFlowPropagationPass,
logs,
debug_settings,
];
// 运行栈指针对齐替换优化:用减法替换逻辑AND对齐操作
run_ir_pass![
self.program.term,
self, // 需要完整的项目信息
StackPointerAlignmentSubstitutionPass,
logs,
debug_settings,
];
// 验证各项优化的后置条件(仅在调试模式下)
debug_assert_postconditions![self.program.term, (), IntraproceduralDeadBlockElimPass];
debug_assert_postconditions![self.program.term, (), InputExpressionPropagationPass];
debug_assert_postconditions![self.program.term, (), TrivialExpressionSubstitutionPass];
debug_assert_postconditions![self.program.term, self.register_set, DeadVariableElimPass];
debug_assert_postconditions![self.program.term, (), ControlFlowPropagationPass];
debug_assert_postconditions![
self.program.term,
self,
StackPointerAlignmentSubstitutionPass,
];
self.add_logs(logs) // 将收集的日志添加到项目中
}
src\cwe_checker_lib\src\utils\ghidra.rs
功能:调用 ghidra 进行反编译,返回 Project 结构体指针
get_project_from_ghidra 调用 ghidra 获取 Project 结构体信息
如果使用 –pcode-raw 参数,则获取指定文件中的 pcode,解析为 PcodeProject 结构体(与 PcodeRaw 结构一致,只是表现形式不同,将 json 转化成 rust 类型结构)
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] pub struct PcodeProject { program: Program, // 程序代码 register_properties: Vec<RegisterProperties>, // 所有 CPU 架构特定寄存器的信息 cpu_arch: String, // CPU 架构 external_functions: HashMap<String, ExternFunction>, // 导入的函数 entry_points: Vec<String>, // 导出的函数(入口点) stack_pointer_register: Varnode, // CPU 架构的栈指针寄存器 calling_conventions: HashMap<String, CallingConvention>, // 给定 CPU 架构的已知调用约定信息 datatype_properties: DatatypeProperties, // 包含 C 数据类型的属性,例如它们的大小 image_base: String, // 程序映像在内存中的基址 }获取临时文件夹和时间戳,拼接得到唯一管道名称
调用 [generate_ghidra_call_command](####generate_ghidra_call_command 生成调用 ghidra 的命令) 函数生成调用 ghidra 的命令
调用 execute_ghidra 执行ghidra 并且获取 pcode,解析为 PcodeProject 结构体,执行失败将直接终止程序
调用 [parse_pcode_project_to_ir_project](####parse_pcode_project_to_ir_project 将 Pcode 项目解析为 IR 项目) 将 Pcode 项目解析为 IR 项目
pub fn get_project_from_ghidra(
file_path: &Path, // 二进制文件路径
binary: &[u8], // 二进制文件内容
bare_metal_config_opt: Option<BareMetalConfig>, // 可选的裸机配置
debug_settings: &debug::Settings, // 调试设置
) -> Result<WithLogs<Project>, Error> {
// 如果通过 --pcode-raw 指定过 pcode 则从文件获取 pcode
let pcode_project = if let Some(saved_pcode_raw) = debug_settings.get_saved_pcode_raw() {
let mut file = std::fs::File::open(saved_pcode_raw) // 打开指定的 pcode 文件
.expect("Failed to open saved output of Pcode Extractor plugin.");
let mut saved_pcode_raw = String::new();
file.read_to_string(&mut saved_pcode_raw) // 从文件读取指定的 pcode
.expect("Failed to read saved Pcode Extractor plugin output");
debug_settings.print(&saved_pcode_raw, debug::Stage::Pcode(debug::PcodeForm::Raw));
serde_json::from_str(&saved_pcode_raw)? // 解析 JSON 为 PcodeProject
} else {
// 没有指定 pcode 则调用 ghidra 生成
let tmp_folder = get_tmp_folder()?; // 获取临时文件夹
// 为文件名添加时间戳后缀,以便多个 cwe_checker 实例并行运行同一文件时互不干扰
let timestamp_suffix = format!(
"{:?}",
std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis()
);
// 为管道创建唯一名称
let fifo_path = tmp_folder.join(format!("pcode_{timestamp_suffix}.pipe"));
let ghidra_command = generate_ghidra_call_command( // 生成调用 Ghidra 的命令
file_path,
&fifo_path,
×tamp_suffix,
&bare_metal_config_opt,
)?;
execute_ghidra(ghidra_command, &fifo_path, debug_settings)? // 执行 Ghidra 并获取结果
};
debug_settings.print( // 打印解析后的 Pcode
&pcode_project,
debug::Stage::Pcode(debug::PcodeForm::Parsed),
);
parse_pcode_project_to_ir_project( // 将 Pcode 项目解析为 IR 项目
pcode_project,
binary,
&bare_metal_config_opt,
debug_settings,
)
}
generate_ghidra_call_command 生成调用 ghidra 的命令
从 ghidra.json 配置文件读取 ghidra 路径,获取 Pcode 插件路径
获取临时文件夹,优先使用系统运行时的 cwe_checker 目录,如果不存在则创建 /tmp/cwe_checker 目录
从二进制文件路径提取二进制文件的文件名
创建 ghidra 命令对象并设置命令参数,如果提供了 bare_metal_config 则进行额外设置
无 bare_metal_config:
<ghidra_path>/support/analyzeHeadless <tmp_folder> "PcodeExtractor_<filename>_<timestamp>" -import <file_path> -postScript <ghidra_plugin_path>/PcodeExtractor.java <fifo_path> -scriptPath <ghidra_plugin_path> -deleteProject -analysisTimeoutPerFile 3600示例:
/opt/ghidra/support/analyzeHeadless /tmp/cwe_checker "PcodeExtractor_test_program_1234567890" -import /home/user/binary -postScript /usr/local/lib/cwe_checker/ghidra_plugin/PcodeExtractor.java /tmp/cwe_checker/pcode_1234567890.pipe -scriptPath /usr/local/lib/cwe_checker/ghidra_plugin -deleteProject -analysisTimeoutPerFile 3600有 bare_metal_config:其中 -loader-baseAddr 地址不包含 0x,输入值含有 0x 的时候会自动去除
<ghidra_path>/support/analyzeHeadless <tmp_folder> "PcodeExtractor_<filename>_<timestamp>" -import <file_path> -postScript <ghidra_plugin_path>/PcodeExtractor.java <fifo_path> -scriptPath <ghidra_plugin_path> -deleteProject -analysisTimeoutPerFile 3600 -loader BinaryLoader -loader-baseAddr <base_address> -processor <processor_id>示例:
/opt/ghidra/support/analyzeHeadless /tmp/cwe_checker "PcodeExtractor_firmware.bin_1234567890" -import /home/user/firmware.bin -postScript /usr/local/lib/cwe_checker/ghidra_plugin/PcodeExtractor.java /tmp/cwe_checker/pcode_1234567890.pipe -scriptPath /usr/local/lib/cwe_checker/ghidra_plugin -deleteProject -analysisTimeoutPerFile 3600 -loader BinaryLoader -loader-baseAddr 8000000 -processor ARM:LE:32:Cortex
返回 ghidra 命令对象
// 生成用于调用 Ghidra 并在其中执行 P-Code-Extractor 插件的命令
fn generate_ghidra_call_command(
file_path: &Path, // 二进制文件路径
fifo_path: &Path, // FIFO 命名管道路径
timestamp_suffix: &str, // 时间戳后缀(用于创建唯一的项目名称)
bare_metal_config_opt: &Option<BareMetalConfig>, // 可选的裸机配置
) -> Result<Command, Error> {
// 从配置文件中读取 Ghidra 路径
let ghidra_path: std::path::PathBuf =
serde_json::from_value(read_config_file("ghidra.json")?["ghidra_path"].clone())
.context("Path to Ghidra not configured.")?;
let headless_path = ghidra_path.join("support/analyzeHeadless"); // Ghidra 无头模式可执行文件路径
let tmp_folder = get_tmp_folder()?; // 获取临时文件夹
// 从文件路径中提取文件名
let filename = file_path
.file_name()
.ok_or_else(|| anyhow!("Invalid file name"))?
.to_string_lossy()
.to_string();
let ghidra_plugin_path = get_ghidra_plugin_path("p_code_extractor")?; // 获取 Pcode 提取器插件路径
// 创建 Ghidra 命令对象
let mut ghidra_command = Command::new(headless_path);
ghidra_command
.arg(&tmp_folder) // 存储临时文件的文件夹
.arg(format!("PcodeExtractor_{filename}_{timestamp_suffix}")) // 临时 Ghidra 项目的名称
.arg("-import") // 将文件导入到 Ghidra 项目中
.arg(file_path) // 文件导入路径
.arg("-postScript") // 在 Ghidra 完成标准分析后执行脚本
.arg(ghidra_plugin_path.join("PcodeExtractor.java")) // PcodeExtractor.java 的路径
.arg(fifo_path) // 命名管道(fifo)的路径
.arg("-scriptPath") // 将包含其他脚本文件的文件夹添加到 Ghidra 脚本文件搜索路径
.arg(ghidra_plugin_path) // 包含 PcodeExtractor.java 的文件夹路径(以便找到其他 Java 文件)
.arg("-deleteProject") // 脚本完成后删除临时项目
.arg("-analysisTimeoutPerFile") // 设置标准分析在终止前可以运行多长时间的超时
.arg("3600"); // 超时时间为一小时(=3600秒)// TODO: 后置脚本可以检测到超时触发并相应处理
// 如果提供了裸机配置,添加额外的命令参数
if let Some(bare_metal_config) = bare_metal_config_opt {
let mut base_address: &str = &bare_metal_config.flash_base_address; // 获取基础地址字符串
// 如果地址以 "0x" 开头,去掉前缀
if let Some(stripped_address) = base_address.strip_prefix("0x") {
base_address = stripped_address;
}
ghidra_command
.arg("-loader") // 告诉 Ghidra 使用特定的加载器
.arg("BinaryLoader") // 对裸机二进制文件使用 BinaryLoader
.arg("-loader-baseAddr") // 提供二进制文件应映射到内存中的基础地址
.arg(base_address)
.arg("-processor") // 提供处理器类型 ID,该二进制文件是为该处理器编译的
.arg(bare_metal_config.processor_id.clone());
}
Ok(ghidra_command) // 返回配置好的命令
}
/// 获取程序应存储临时文件的文件夹
fn get_tmp_folder() -> Result<PathBuf, Error> {
// 获取 cwe_checker 的项目目录路径
let project_dirs = ProjectDirs::from("", "", "cwe_checker")
.context("Could not determine path for temporary files")?;
// 优先使用运行时目录,如果不存在则使用默认的 /tmp/cwe_checker
let tmp_folder = if let Some(folder) = project_dirs.runtime_dir() {
folder // 使用系统运行时目录(如 Linux 的 /run/user/xxx/cwe_checker)
} else {
Path::new("/tmp/cwe_checker") // 回退到 /tmp/cwe_checker
};
// 如果临时文件夹不存在,创建它
if !tmp_folder.exists() {
std::fs::create_dir(tmp_folder).context("Unable to create temporary folder")?;
}
Ok(tmp_folder.to_path_buf()) // 返回临时文件夹路径
}
parse_pcode_project_to_ir_project 将 Pcode 项目解析为 IR 项目
- 指定了 bare_metal_config 则获取其中的基址
- 从二进制文件中获取到基址后调用 pcode_project.into_ir_project 转换为 project 结构体
- 获取基址失败,则尝试从 bare_metal_config 获取基址,并且设置偏移量为 0
- 找不到基址,则设置 0 为默认值
- 返回 project 结构体
pub fn parse_pcode_project_to_ir_project(
pcode_project: PcodeProject, // Ghidra 的 Pcode 项目
binary: &[u8], // 二进制文件内容
bare_metal_config_opt: &Option<BareMetalConfig>, // 可选的裸机配置
debug_settings: &debug::Settings, // 调试设置
) -> Result<WithLogs<Project>, Error> {
// 如果提供了裸机配置,解析其基础地址
let bare_metal_base_address_opt = bare_metal_config_opt
.as_ref()
.map(|config| config.parse_binary_base_address());
let project: WithLogs<Project> = match RuntimeMemoryImage::get_base_address(binary) {
Ok(binary_base_address) => {
// 成功获取二进制文件基础地址,使用该地址转换项目
pcode_project.into_ir_project(binary_base_address, debug_settings)
}
Err(_err) => {
// 无法从二进制文件获取基础地址
if let Some(binary_base_address) = bare_metal_base_address_opt {
// 如果有裸机配置提供的基础地址,使用它
let mut project =
pcode_project.into_ir_project(binary_base_address, debug_settings);
// 对于裸机二进制,将地址偏移量设为 0
project.program.term.address_base_offset = 0;
project
} else {
// 没有基础地址,使用 0 作为默认值
let mut project = pcode_project.into_ir_project(0, debug_settings);
// 添加日志消息,说明使用了回退方案
project.add_log_msg(LogMessage::new_info("Could not determine binary base address. Using base address of Ghidra output as fallback."));
project.program.term.address_base_offset = 0;
project
}
}
};
Ok(project) // 返回解析后的项目
}
src\cwe_checker_lib\src\ghidra_pcode\mod.rs
PcodeProject.into_ir_project 将 PcodeProject 转化成 IR
初始化日志向量,创建寄存器映射表,将程序转化为 IR 函数项的映射表
let mut logs = Vec::new(); // 初始化日志向量,用于收集处理过程中的日志消息 let register_map = RegisterMap::new(&self.register_properties); // 创建寄存器映射表 // 将程序转换为 IR 函数项的映射表(函数 TID -> 函数项) let ir_function_terms_map = self.program.to_ir_function_terms_map();其中 to_ir_function_terms_map 位于src\cwe_checker_lib\src\ghidra_pcode\program.rs,返回一个 BTreeMap,键是函数的 Tid(线程标识符),值是转换后的 IR 函数项,Program 结构体如下(参考 PcodeRaw 中的 Program)
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] pub struct Program { functions: Vec<Function>, }Tid 结构体位于 src\cwe_checker_lib\src\intermediate_representation\term.rs
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone, PartialOrd, Ord)] pub struct Tid { id: String, // 唯一标识符 address: TidAddress, // 地址 }to_ir_function_terms_map 函数如下:
pub fn to_ir_function_terms_map(&self) -> BTreeMap<Tid, IrTerm<IrFunction>> { // 创建一个空的 HashSet 来存储所有跳转目标(类型为 Tid) let jump_targets: HashSet<Tid> = self .blocks() // 获取程序中所有基本块的迭代器 // 收集该块中的所有跳转目标,使用 flat_map 将多个 HashSet 展平为一个 HashSet .flat_map(|block| block.collect_jmp_targets()) .collect(); // 将所有跳转目标收集到一个 HashSet 中 // 创建一个空的 BTreeMap 来存储转换结果,键是函数 Tid,值是 IR 函数项 let ret: BTreeMap<Tid, IrTerm<IrFunction>> = self .functions() // 获取程序中所有函数的引用 .iter() // 获取函数的迭代器 .map(|function| { // 对每个函数进行转换操作,转换成 IR 函数项 // 传入之前收集的跳转目标集合,用于在转换过程中识别有效的跳转目标 let ir_function_term = function.to_ir_function_term(&jump_targets); // 返回一个元组:(函数的 Tid, IR 函数项) (ir_function_term.tid.clone(), ir_function_term) }) .collect(); // 将迭代器中的所有元组收集到 BTreeMap 中 // 验证转换结果的完整性,断言原始函数数量应该等于转换后的函数数量 // 如果数量不相等,说明存在重复的函数 TID(这不应该发生) assert_eq!(self.functions.len(), ret.len(), "Duplicate function TID."); // 返回转换后的函数映射表 ret }to_ir_function_term 位于 src\cwe_checker_lib\src\ghidra_pcode\function\mod.rs
pub fn to_ir_function_term(&self, jump_targets: &HashSet<Tid>) -> IrTerm<IrFunction> { // 创建 IrFunction (Sub) 结构体 let ir_function_term = IrFunction::new::<_, &str>( &self.name, // 函数名称 self.blocks() .iter() .flat_map(|block| block.to_ir_blocks(jump_targets).into_iter()) .collect(), // 将每个基本块转换为 IR 基本块列表 None, // 调用约定设为未知 ); // 使用函数地址创建 Tid,然后包装为 IrTerm IrTerm::new(Tid::new_function(&self.address), ir_function_term) }IrTerm 结构体位于 src\cwe_checker_lib\src\ghidra_pcode\term.rs(mod.rs 中被重命名)
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] pub struct Term { address: String, index: u64, operation: PcodeOperation, }IrFunction 结构体位于 src\cwe_checker_lib\src\intermediate_representation\sub.rs(mod.rs 中被重命名)
/// 子程序可以有多个出口,这些出口由 `Jmp::Return` 指令标识。 #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] pub struct Sub { pub name: String, // 子程序的名称 pub blocks: Vec<Term<Blk>>, // 属于该子程序的基本块,第一个块也是子程序的入口点 pub calling_convention: Option<String>, // 用于调用该函数的调用约定,如果已知 non_returning: bool, // 如果函数不返回则为 true }Blk 结构体位于 src\cwe_checker_lib\src\intermediate_representation\blk.rs
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] pub struct Blk { pub defs: Vec<Term<Def>>, // 按执行顺序的 Def 指令 pub jmps: Vec<Term<Jmp>>, // 数量为 0-2 个,对应 无法确定下一个指令 / 无条件 / 有条件跳转 indirect_control_flow_targets: Option<Box<IndirectCfTargets>>, }其中 def 和 jmp 枚举类型如下:
Def:加载和存储表达式
pub enum Def { Load { // 将内存加载到由 var 指定的寄存器中 var: Variable, // 内存加载的目标寄存器,var 的大小决定了从内存中读取的字节数 address: Expression, // 计算读取地址的表达式 }, Store { // 一个内存存储操作 address: Expression, // 计算写入地址的表达式 value: Expression, // 计算写入内存的值的表达式 }, Assign { // 寄存器分配,将表达式 value 的结果分配给寄存器 var var: Variable, // 被写入的寄存器 value: Expression, // 计算分配给寄存器的值的表达式 }, }Jmp:影响程序的执行流程
pub enum Jmp { Branch(Tid), // 直接在过程内跳转到目标 Blk 术语标识符 BranchInd(Expression), // 间接在过程内跳转到给定表达式求值后的地址 CBranch { // 符合条件时跳转 target: Tid, // 跳转目标块的术语 ID condition: Expression, // 跳转条件 }, Call { // 过程间跳转,表示子程序调用 target: Tid, // 目标子程序( Sub )或调用外部符号的术语 ID return_: Option<Tid>, // 被调用函数返回的块的术语 ID,永不返回,则可能是 None }, CallInd { // 一个间接的跨过程跳转到 target 表达式求值后的地址,表示子程序调用 target: Expression, // 计算调用目标地址的表达式 return_: Option<Tid>, // 被调用函数返回的块的术语 ID,永不返回,则可能是 None }, Return(Expression), // 从子程序返回的间接跨过程跳转 CallOther { // 其他指令无法表示或不被反汇编器支持的副作用 description: String, // 对副作用的描述 return_: Option<Tid>, // 反汇编器假定在处理副作用后执行将继续的代码块的块术语标识符 }, }将 pcode 操作转化为 IR Def 示例:
// 原始 pcode 操作,参考 PcodeRaw 结构体 Term { address: "0x00401000", index: 0, operation: PcodeOperation { // rsp = rsp - 8 opcode: Expression(IntSub), // 操作:sub output: Some(Varnode::new("temp_RSP_new")), // 输出:rsp inputs: [ Varnode::new("register_RSP_64"), // 输入:rsp Varnode::new("const_8_64"), // 8 ], }, } // 转换为 IR Def Term<Def> { tid: Tid { id: "instr_0x00401000_0", // tid address: TidAddress(Some(0x00401000)), // 程序地址 }, term: Def::Assign { var: Variable { name: "temp_RSP_new", size: 8, is_temp: true }, value: Expression::BinOp { op: IntSub, // 操作:sub lhs: Expression::Var(Variable { name: "register_RSP_64", size: 8, is_temp: false }), // 左操作数,64 位寄存器 rsp,大小为 8,是临时寄存器 rhs: Expression::Const(Bitvector::from_u64(8)), // 右操作数,常数 8 }, }, }IrFunction 示例:
IrTerm<IrFunction> { tid: Tid { id: "fun_0x00401000".to_string(), // 格式: "fun_{地址}" address: TidAddress(Some(0x00401000)), }, term: Sub { name: "example_function".to_string(), blocks: vec![ Term<Blk> { tid: Tid { id: "blk_0x00401000".to_string(), address: TidAddress(Some(0x00401000)), }, term: Blk { defs: vec![ // 操作 1:计算 RSP - 8 Term<Def> { tid: Tid { id: "instr_0x00401000_0".to_string(), address: TidAddress(Some(0x00401000)), }, term: Def::Assign { var: Variable { name: "temp_RSP_new".to_string(), size: ByteSize::new(8), is_temp: true, }, value: Expression::BinOp { op: BinOpType::IntSub, lhs: Box::new(Expression::Var(Variable { name: "register_RSP_64".to_string(), size: ByteSize::new(8), is_temp: false, })), rhs: Box::new(Expression::Const(Bitvector::from_u64(8))), }, }, }, // 操作 2:更新 RSP Term<Def> { tid: Tid { id: "instr_0x00401000_1".to_string(), address: TidAddress(Some(0x00401000)), }, term: Def::Assign { var: Variable { name: "register_RSP_64".to_string(), size: ByteSize::new(8), is_temp: false, }, value: Expression::Var(Variable { name: "temp_RSP_new".to_string(), size: ByteSize::new(8), is_temp: true, }), }, }, // 操作 3:存储 RBP 到栈 Term<Def> { tid: Tid { id: "instr_0x00401000_2".to_string(), address: TidAddress(Some(0x00401000)), }, term: Def::Store { address: Expression::Var(Variable { name: "register_RSP_64".to_string(), size: ByteSize::new(8), is_temp: false, }), value: Expression::Var(Variable { name: "register_RBP_64".to_string(), size: ByteSize::new(8), is_temp: false, }), }, }, ], jmps: Vec::new(), indirect_control_flow_targets: None, }, }, ], calling_convention: None, non_returning: false, }, }计算 ghidra 中的基址和实际基址的差值得到偏移 address_base_offset(仅在成功获取基址的情况下计算,否则 project 中的 address_base_offset = 0)
// 计算地址基址偏移量(Ghidra 报告的基址与实际基址的差值) let address_base_offset = // 将 image_base 字符串(去掉 "0x" 前缀)解析为十六进制 u64 match u64::from_str_radix(self.image_base.trim_start_matches("0x"), 16) .unwrap() .checked_sub(binary_base_address) // 安全地计算差值(防止下溢) { Some(a) => a, // 如果计算成功,使用差值 None => { // 如果下溢(Ghidra 报告的基址小于实际基址) logs.push(LogMessage::new_info(format!( // 添加警告日志 "Base address reported by Ghidra is smaller than actual base address: {} vs 0x{:x}", self.image_base, binary_base_address))); // 记录 Ghidra 报告的基址和实际基址 0 // 使用 0 作为偏移量 } };创建初始的 IR 程序结构
// 创建初始的 IR 程序结构 let mut ir_program = IrProgram { subs: ir_function_terms_map, // 函数项映射表(函数 TID -> 函数项) extern_symbols: self // 外部符号(导入的函数) .external_functions // 从外部函数映射中 .values() // 获取所有值(ExternFunction) .map(|ext_fn| { // 将每个外部函数转换为 IR 外部符号 let ext_sym = ext_fn.to_ir_extern_symbol(&self); // 转换为 IR 外部符号 (ext_sym.tid.clone(), ext_sym) // 返回 (TID, 外部符号) 元组 }) .collect(), // 收集为 HashMap // 将入口点字符串转换为函数 TID 列表 entry_points: self.entry_points.iter().map(Tid::new_function).collect(), address_base_offset, // 地址基址偏移量 }; // 打印早期 IR 表示 debug_settings.print(&ir_program, debug::Stage::Ir(debug::IrForm::Early));使用的 IrProgram 结构体如下(与self.program 不是同一个结构体,位于 src\cwe_checker_lib\src\intermediate_representation\program.rs):
pub struct Program { // 二进制文件中包含的已知函数,存放程序转化的 IR 函数映射表 pub subs: BTreeMap<Tid, Term<Sub>>, // 链接器链接到二进制文件的 extern 符号,从 PcodeProject 的 external_functions 获取所有外部符号 pub extern_symbols: BTreeMap<Tid, ExternSymbol>, // 二进制文件的入口点,从 PcodeProject 的 entry_points 获取 pub entry_points: BTreeSet<Tid>, // ghidra 与程序实际基址的偏移量 pub address_base_offset: u64, }运行 IR Pass 顺序执行多个独立的处理单元,对 IR 进行转换和优化
run_ir_pass![ ir_program, // 要转换的 IR 程序(可变的) construction_input, // 构造传递所需的输入(通常是寄存器映射等上下文信息) PassType, // 传递的类型 logs, // 日志向量,用于收集处理过程中的消息 debug_settings // 调试设置,控制调试输出 ];- SingleTargetIndirectCallsPass:只有单个可能的调用目标时,将间接调用替换为直接调用
- ReorderFnBlocksPass:按执行顺序流重新将 ghidra 得到的基本块顺序进行排列
- ReplaceCallsToExtFnsPass:将对外部函数存根(stub)的调用替换为对外部符号的实际调用
- SubregisterSubstitutionPass:将子寄存器的引用替换为基寄存器
- InliningPass:将简单或简短的函数内联到调用处,替换对函数的调用
- NoreturnExtFunctionsPass:标记不会返回的外部函数,执行到这些函数会终止程序
- RemoveEmptyFunctionsPass:移除空函数
- PatchCfPass:修复控制流,移除无效的跳转,确保跳转目标都有效、控制流图完整
- EntryPointsPass:移除不存在的入口点,确保入口点列表只包含实际存在的函数
// 运行 IR 优化传递(Passes),按顺序应用各种转换和优化 run_ir_pass![ // 运行 IR 传递:单目标间接调用替换 ir_program, // 要转换的 IR 程序 (), // 传递的上下文参数(无) SingleTargetIndirectCallsPass, // 将只有单个目标的间接调用替换为直接调用 logs, // 日志向量 debug_settings // 调试设置 ]; // 运行 IR 传递:重新排序函数内的基本块(按正常顺序) run_ir_pass![ir_program, (), ReorderFnBlocksPass, logs, debug_settings]; // 运行 IR 传递:将外部函数存根调用替换为外部函数调用 run_ir_pass!(ir_program, ReplaceCallsToExtFnsPass, logs, debug_settings); run_ir_pass![ // 运行 IR 传递:子寄存器替换 ir_program, // 要转换的 IR 程序 register_map, // 传递寄存器映射作为上下文 SubregisterSubstitutionPass, // 将子寄存器引用替换为基寄存器引用 logs, // 日志向量 debug_settings // 调试设置 ]; // 运行 IR 传递:内联现有引用的块到函数中 run_ir_pass!(ir_program, InliningPass, logs, debug_settings); // 运行 IR 传递:标记非返回的外部函数 run_ir_pass!(ir_program, NoreturnExtFunctionsPass, logs, debug_settings); // 运行 IR 传递:移除空函数 run_ir_pass!(ir_program, RemoveEmptyFunctionsPass, logs, debug_settings); // 运行 IR 传递:修复控制流(确保所有控制流转换都有有效目标) run_ir_pass!(ir_program, PatchCfPass, logs, debug_settings); // 运行 IR 传递:移除不存在的入口点 run_ir_pass![ir_program, (), EntryPointsPass, logs, debug_settings];调用 debug_assert_postconditions 断言各个传递的后置条件,验证传递的正确性
// 在调试模式下断言各个传递的后置条件(验证传递的正确性) // 断言单目标间接调用替换的后置条件 debug_assert_postconditions![ir_program, (), SingleTargetIndirectCallsPass]; // 断言基本块重排序的后置条件 debug_assert_postconditions![ir_program, (), ReorderFnBlocksPass]; // 断言外部函数调用替换的后置条件 debug_assert_postconditions!(ir_program, ReplaceCallsToExtFnsPass); // 断言子寄存器替换的后置条件 debug_assert_postconditions![ir_program, register_map, SubregisterSubstitutionPass]; // 断言内联传递的后置条件 debug_assert_postconditions!(ir_program, InliningPass); // 断言非返回函数标记的后置条件 debug_assert_postconditions!(ir_program, NoreturnExtFunctionsPass); // 断言空函数移除的后置条件 debug_assert_postconditions!(ir_program, RemoveEmptyFunctionsPass); // 断言控制流修复的后置条件 debug_assert_postconditions!(ir_program, PatchCfPass); // 断言入口点处理的后置条件 debug_assert_postconditions![ir_program, (), EntryPointsPass]; // 断言 IR 程序的不变量,验证程序的完整性,确保所有基本快 TID 唯一,所有跳转目标存在,所有函数调用约定有效 ir_program.debug_assert_invariants();创建并返回 Project 结构体实例
// 创建最终的 IR Project(项目)结构
let ir_project = IrProject {
// 创建程序项,使用程序基址作为 TID
program: IrTerm::new(Tid::new_program(&self.image_base), ir_program),
cpu_architecture: self.cpu_arch, // CPU 架构名称
// 将栈指针寄存器 Varnode 转换为 IR 变量
stack_pointer_register: self.stack_pointer_register.to_ir_var(),
calling_conventions: self // 调用约定
.calling_conventions // 从调用约定映射中
.into_iter() // 转换为迭代器(消耗所有权)
.map(|(cconv_name, cconv)| { // 将每个调用约定转换为 IR 调用约定
(
cconv_name, // 调用约定名称(键)
cconv // 调用约定对象
.to_ir_calling_convention(®ister_map) // 转换为 IR 调用约定
.move_logs_to(&mut logs) // 将日志移动到 logs 向量
.into_object(), // 提取对象(从 WithLogs 中)
)
})
.collect(), // 收集为 HashMap
register_set: register_map.get_base_reg_ir_vars(), // 获取基础寄存器的 IR 变量集合
// 将数据类型属性转换为 IR 数据类型属性
datatype_properties: self.datatype_properties.into(),
// 创建空的运行时内存映像(小端序)
runtime_memory_image: IrRuntimeMemoryImage::empty(true),
};
WithLogs::new(ir_project, logs) // 返回带日志的 IR Project
}
}