8、安全测试指南
安全测试概述
安全测试是智能合约开发中最关键的环节之一。本指南将介绍如何使用 Foundry 进行全面的安全测试。
常见漏洞测试
重入攻击测试
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.19;
import {Test, console} from "forge-std/Test.sol";
import {Vault} from "../src/Vault.sol";
import {Token} from "../src/Token.sol";
/// @title ReentrancyTest
/// @notice Checks that `Vault` cannot be drained through a re-entrant `withdraw`.
/// @dev `setUp` has the victim deposit 500 tokens and funds the attacker contract
///      with 100 tokens. The test fails when the attacker ends with more tokens
///      than it started with, or when the vault ends with fewer.
contract ReentrancyTest is Test {
    Vault public vault;
    Token public token;
    ReentrancyAttacker public attacker;

    address public constant VICTIM = address(0x1);
    uint256 public constant INITIAL_BALANCE = 1000 ether;

    function setUp() public {
        token = new Token();
        vault = new Vault(address(token));
        attacker = new ReentrancyAttacker(vault, token);

        // Fund the victim and let the vault pull its tokens.
        token.mint(VICTIM, INITIAL_BALANCE);
        vm.prank(VICTIM);
        token.approve(address(vault), INITIAL_BALANCE);

        // Victim deposit: these are the funds the attacker tries to take.
        vm.prank(VICTIM);
        vault.deposit(500 ether);

        // Seed capital for the attacker contract.
        token.mint(address(attacker), 100 ether);
    }

    /// @notice Runs the attack and asserts that it was not profitable.
    function test_ReentrancyAttack() public {
        uint256 vaultBalanceBefore = token.balanceOf(address(vault));
        uint256 attackerBalanceBefore = token.balanceOf(address(attacker));
        console.log("Vault balance before attack:", vaultBalanceBefore);
        console.log("Attacker balance before attack:", attackerBalanceBefore);

        // Run the attack. If the vault rejects the nested withdraw, that revert
        // can bubble up through the attacker and revert the whole call; a
        // reverted attack is a valid "prevented" outcome, so it is caught here
        // and judged by the balances below.
        try attacker.attack() {
            console.log("Attack call completed without reverting");
        } catch {
            console.log("Attack call reverted");
        }

        uint256 vaultBalanceAfter = token.balanceOf(address(vault));
        uint256 attackerBalanceAfter = token.balanceOf(address(attacker));
        console.log("Vault balance after attack:", vaultBalanceAfter);
        console.log("Attacker balance after attack:", attackerBalanceAfter);

        // Any gain over the starting balance means the attacker withdrew more
        // than it deposited. Non-ASCII text requires a unicode"..." literal.
        if (attackerBalanceAfter > attackerBalanceBefore) {
            console.log(unicode"❌ Reentrancy attack succeeded!");
        } else {
            console.log(unicode"✅ Reentrancy attack was prevented!");
        }

        // Fail the test when the attack worked, so an insecure vault cannot pass.
        assertLe(attackerBalanceAfter, attackerBalanceBefore, "attacker profited from reentrancy");
        assertGe(vaultBalanceAfter, vaultBalanceBefore, "vault lost funds to the attacker");
    }
}
/// @title ReentrancyAttacker
/// @notice Test helper that deposits into a vault and then tries to re-enter
///         `withdraw` from its receive/transfer callbacks.
contract ReentrancyAttacker {
    /// @dev Amount used for the deposit and for every withdraw attempt.
    uint256 private constant ATTACK_AMOUNT = 100 ether;

    Vault public vault;
    Token public token;
    bool public attacking;

    constructor(Vault _vault, Token _token) {
        vault = _vault;
        token = _token;
    }

    /// @notice Deposits `ATTACK_AMOUNT`, then withdraws it with re-entry armed.
    function attack() external {
        token.approve(address(vault), ATTACK_AMOUNT);
        vault.deposit(ATTACK_AMOUNT);

        attacking = true;
        vault.withdraw(ATTACK_AMOUNT);
        // Disarm the callbacks so later, unrelated transfers do not re-enter.
        attacking = false;
    }

    /// @notice Malicious receive hook: re-enters `withdraw` when ETH arrives.
    /// @dev Only re-enters while the vault still holds a full `ATTACK_AMOUNT`
    ///      of ETH, so the nested call does not ask for more than is there.
    receive() external payable {
        if (attacking && address(vault).balance >= ATTACK_AMOUNT) {
            vault.withdraw(ATTACK_AMOUNT);
        }
    }

    /// @notice Malicious token callback for ERC20 variants that notify the receiver.
    /// @dev Re-enters while the vault holds more than `ATTACK_AMOUNT` tokens.
    function onTokenTransfer(address, uint256) external {
        if (attacking && token.balanceOf(address(vault)) > ATTACK_AMOUNT) {
            vault.withdraw(ATTACK_AMOUNT);
        }
    }
}
整数溢出/下溢测试
/// @title OverflowTest
/// @notice Exercises `Calculator.add` / `Calculator.subtract` at the uint256 limits.
contract OverflowTest is Test {
    Calculator public calculator;

    function setUp() public {
        calculator = new Calculator();
    }

    /// @notice Adding 1 to the largest uint256 is expected to revert.
    function test_IntegerOverflow() public {
        vm.expectRevert();
        calculator.add(type(uint256).max, 1);
    }

    /// @notice Subtracting 1 from 0 is expected to revert.
    function test_IntegerUnderflow() public {
        vm.expectRevert();
        calculator.subtract(0, 1);
    }

    /// @notice Fuzzes `add`: representable sums must match `a + b`, others must revert.
    function testFuzz_SafeMath(uint256 a, uint256 b) public {
        // The sum fits in uint256 exactly when a <= max - b.
        bool sumFits = a <= type(uint256).max - b;

        if (sumFits) {
            uint256 sum = calculator.add(a, b);
            assertEq(sum, a + b);
            return;
        }

        vm.expectRevert();
        calculator.add(a, b);
    }
}
访问控制测试
/// @title AccessControlTest
/// @notice Checks owner-only and role-gated entry points of `SecureVault`.
/// @dev `vm.prank` and `vm.expectRevert` apply to the NEXT external call. Role
///      ids are therefore read into locals first; writing
///      `vault.grantRole(vault.SOME_ROLE(), ...)` after a prank would spend the
///      prank (and the expected revert) on the `SOME_ROLE()` getter.
contract AccessControlTest is Test {
    SecureVault public vault;

    address public constant OWNER = address(0x1);
    address public constant USER = address(0x2);
    address public constant ATTACKER = address(0x3);

    function setUp() public {
        // Deploy as OWNER so the vault is created with OWNER as msg.sender.
        vm.prank(OWNER);
        vault = new SecureVault();
    }

    /// @notice Only the owner may call `emergencyStop`.
    function test_OnlyOwnerFunctions() public {
        // The owner can call it.
        vm.prank(OWNER);
        vault.emergencyStop();

        // A non-owner cannot.
        vm.prank(ATTACKER);
        vm.expectRevert("Ownable: caller is not the owner");
        vault.emergencyStop();
    }

    /// @notice Only accounts holding DEPOSITOR_ROLE may deposit.
    function test_RoleBasedAccess() public {
        bytes32 depositorRole = vault.DEPOSITOR_ROLE();

        // Grant the role to the user.
        vm.prank(OWNER);
        vault.grantRole(depositorRole, USER);

        // The user can deposit.
        vm.prank(USER);
        vault.deposit(100 ether);

        // The attacker cannot.
        // NOTE(review): expectRevert with a string matches the full revert
        // reason; confirm this is exactly what SecureVault emits.
        vm.prank(ATTACKER);
        vm.expectRevert("AccessControl: account is missing role");
        vault.deposit(100 ether);
    }

    /// @notice A depositor must not be able to grant itself ADMIN_ROLE.
    function test_PrivilegeEscalation() public {
        bytes32 depositorRole = vault.DEPOSITOR_ROLE();
        bytes32 adminRole = vault.ADMIN_ROLE();

        vm.prank(OWNER);
        vault.grantRole(depositorRole, USER);

        // The user cannot grant itself admin rights.
        vm.prank(USER);
        vm.expectRevert("AccessControl: account is missing role");
        vault.grantRole(adminRole, USER);
    }
}
闪电贷攻击测试
/// @title FlashLoanAttackTest
/// @notice Runs a flash-loan funded price manipulation against `DeFiProtocol`
///         and fails if the attacker ends with a profit.
contract FlashLoanAttackTest is Test {
    DeFiProtocol public protocol;
    FlashLoanProvider public flashLoanProvider;
    FlashLoanAttacker public attacker;

    function setUp() public {
        protocol = new DeFiProtocol();
        flashLoanProvider = new FlashLoanProvider();
        attacker = new FlashLoanAttacker(protocol, flashLoanProvider);

        // Initial liquidity for the protocol and for the lender.
        deal(address(protocol.token()), address(protocol), 1000000 ether);
        deal(address(protocol.token()), address(flashLoanProvider), 1000000 ether);
    }

    /// @notice Executes the attack and asserts that the attacker did not profit.
    function test_FlashLoanAttack() public {
        uint256 protocolBalanceBefore = protocol.token().balanceOf(address(protocol));
        uint256 attackerBalanceBefore = protocol.token().balanceOf(address(attacker));
        console.log("Protocol balance before:", protocolBalanceBefore);
        console.log("Attacker balance before:", attackerBalanceBefore);

        // Run the attack. The attacker starts with no tokens, so an
        // unprofitable attempt cannot repay loan + fee and reverts; that is a
        // "prevented" outcome and must not abort the test.
        try attacker.executeAttack(500000 ether) {
            console.log("Attack call completed without reverting");
        } catch {
            console.log("Attack call reverted");
        }

        uint256 protocolBalanceAfter = protocol.token().balanceOf(address(protocol));
        uint256 attackerBalanceAfter = protocol.token().balanceOf(address(attacker));
        console.log("Protocol balance after:", protocolBalanceAfter);
        console.log("Attacker balance after:", attackerBalanceAfter);

        // Report the result; non-ASCII text requires a unicode"..." literal.
        if (attackerBalanceAfter > attackerBalanceBefore) {
            console.log(unicode"❌ Flash loan attack succeeded!");
            console.log("Profit:", attackerBalanceAfter - attackerBalanceBefore);
        }

        // Fail the test when the attack was profitable.
        assertLe(attackerBalanceAfter, attackerBalanceBefore, "flash loan attack was profitable");
    }
}
/// @title FlashLoanAttacker
/// @notice Test helper: borrows via flash loan, manipulates the protocol price,
///         arbitrages, and repays the loan plus fee.
contract FlashLoanAttacker {
    DeFiProtocol public protocol;
    FlashLoanProvider public flashLoanProvider;

    constructor(DeFiProtocol _protocol, FlashLoanProvider _flashLoanProvider) {
        protocol = _protocol;
        flashLoanProvider = _flashLoanProvider;
    }

    /// @notice Starts the attack by requesting a flash loan of `amount`.
    function executeAttack(uint256 amount) external {
        // 1. Take the flash loan; the lender calls back into onFlashLoan.
        flashLoanProvider.flashLoan(amount, address(this));
    }

    /// @notice Flash-loan callback holding the borrowed `amount`.
    function onFlashLoan(uint256 amount) external {
        // 2. Use the borrowed funds to move the price.
        protocol.manipulatePrice(amount);

        // 3. Trade at the manipulated price. The returned value is not needed
        //    here: whatever is left after repayment stays in this contract.
        protocol.arbitrage();

        // 4. Repay principal + fee to the caller. The token is used exactly as
        //    the test uses it (`protocol.token().balanceOf(...)`), which avoids
        //    an extra interface cast.
        protocol.token().transfer(msg.sender, amount + flashLoanProvider.fee());

        // 5. Any profit remains in this contract.
    }
}
经济攻击测试
通胀攻击测试
/// @title InflationAttackTest
/// @notice First-depositor share inflation scenario: the attacker deposits 1
///         wei, donates tokens directly to the vault, and the victim then
///         deposits. The test fails if the victim receives zero shares.
contract InflationAttackTest is Test {
    /// @dev Tokens the attacker sends straight to the vault.
    uint256 internal constant DONATION = 1000 ether;
    /// @dev Attacker funding: the donation plus the 1 wei initial deposit.
    ///      Funding only `DONATION` would make the donation transfer revert,
    ///      because 1 wei has already been deposited by then.
    uint256 internal constant ATTACKER_FUNDS = DONATION + 1;
    uint256 internal constant VICTIM_DEPOSIT = 1000 ether;

    Vault public vault;
    Token public token;

    address public constant ATTACKER = address(0x1);
    address public constant VICTIM = address(0x2);

    function setUp() public {
        token = new Token();
        vault = new Vault(address(token));

        // Distribute tokens.
        token.mint(ATTACKER, ATTACKER_FUNDS);
        token.mint(VICTIM, VICTIM_DEPOSIT);

        // Approvals.
        vm.prank(ATTACKER);
        token.approve(address(vault), type(uint256).max);
        vm.prank(VICTIM);
        token.approve(address(vault), type(uint256).max);
    }

    /// @notice Plays the attack step by step and asserts the victim got shares.
    function test_InflationAttack() public {
        console.log("=== Inflation Attack Test ===");

        // 1. Attacker deposits the minimum amount.
        vm.prank(ATTACKER);
        vault.deposit(1); // 1 wei
        console.log("Step 1 - Attacker deposits 1 wei");
        console.log("Attacker shares:", vault.balanceOf(ATTACKER));
        console.log("Total supply:", vault.totalSupply());

        // 2. Attacker sends tokens straight to the vault, bypassing deposit().
        vm.prank(ATTACKER);
        token.transfer(address(vault), DONATION);
        console.log("Step 2 - Attacker transfers 1000 tokens to vault");
        console.log("Vault token balance:", token.balanceOf(address(vault)));

        // 3. Victim deposits.
        vm.prank(VICTIM);
        vault.deposit(VICTIM_DEPOSIT);
        console.log("Step 3 - Victim deposits 1000 tokens");
        console.log("Victim shares:", vault.balanceOf(VICTIM));
        console.log("Total supply:", vault.totalSupply());

        // 4. Inspect the outcome. Non-ASCII text requires a unicode"..." literal.
        uint256 attackerShares = vault.balanceOf(ATTACKER);
        uint256 victimShares = vault.balanceOf(VICTIM);
        if (victimShares == 0) {
            console.log(unicode"❌ Inflation attack succeeded!");
            console.log("Victim received 0 shares for 1000 tokens");
        } else {
            console.log(unicode"✅ Inflation attack prevented!");
        }

        // 5. Attacker redeems.
        if (attackerShares > 0) {
            vm.prank(ATTACKER);
            vault.withdraw(attackerShares);
            uint256 attackerFinalBalance = token.balanceOf(ATTACKER);
            console.log("Attacker final balance:", attackerFinalBalance);
            if (attackerFinalBalance > ATTACKER_FUNDS) {
                console.log("Attacker profit:", attackerFinalBalance - ATTACKER_FUNDS);
            }
        }

        // Fail the test when the victim was diluted to nothing.
        assertGt(victimShares, 0, "victim received zero shares");
    }
}
MEV 攻击测试
/// @title MEVAttackTest
/// @notice Sandwich-attack scenario against `DEX`: a bot buys before a victim's
///         swap and sells after it. The test fails if the bot ends with a profit.
contract MEVAttackTest is Test {
    DEX public dex;
    Token public tokenA;
    Token public tokenB;

    address public constant VICTIM = address(0x1);
    address public constant MEV_BOT = address(0x2);

    function setUp() public {
        tokenA = new Token("Token A", "TKA");
        tokenB = new Token("Token B", "TKB");
        dex = new DEX(address(tokenA), address(tokenB));

        // Pool liquidity.
        tokenA.mint(address(dex), 10000 ether);
        tokenB.mint(address(dex), 10000 ether);

        // User balances.
        tokenA.mint(VICTIM, 1000 ether);
        tokenA.mint(MEV_BOT, 1000 ether);
        tokenB.mint(MEV_BOT, 1000 ether);

        // Allow the DEX to pull tokens during swaps, matching the
        // approve-before-use pattern of the other tests in this guide.
        vm.prank(VICTIM);
        tokenA.approve(address(dex), type(uint256).max);
        vm.startPrank(MEV_BOT);
        tokenA.approve(address(dex), type(uint256).max);
        tokenB.approve(address(dex), type(uint256).max);
        vm.stopPrank();
    }

    /// @notice Front-run, victim swap, back-run; asserts the bot made no profit.
    function test_SandwichAttack() public {
        console.log("=== Sandwich Attack Test ===");

        // The victim is about to make a large trade.
        uint256 victimSwapAmount = 500 ether;
        uint256 frontRunInput = 200 ether;

        // The bot sees the pending transaction and trades first.
        console.log("Step 1: MEV Bot front-runs victim");
        vm.prank(MEV_BOT);
        uint256 frontRunAmount = dex.swapAForB(frontRunInput);
        console.log("MEV Bot receives:", frontRunAmount);

        // The victim's trade executes after the bot has moved the price.
        console.log("Step 2: Victim's transaction executes");
        vm.prank(VICTIM);
        uint256 victimReceived = dex.swapAForB(victimSwapAmount);
        console.log("Victim receives:", victimReceived);

        // The bot immediately sells what it bought.
        console.log("Step 3: MEV Bot back-runs for profit");
        vm.prank(MEV_BOT);
        uint256 backRunAmount = dex.swapBForA(frontRunAmount);
        console.log("MEV Bot gets back:", backRunAmount);

        // Bot profit in token A, floored at zero.
        uint256 mevProfit = backRunAmount > frontRunInput ? backRunAmount - frontRunInput : 0;
        console.log("MEV Bot profit:", mevProfit);

        // Non-ASCII text requires a unicode"..." literal.
        if (mevProfit > 0) {
            console.log(unicode"❌ Sandwich attack succeeded!");
        }

        // Fail the test when the sandwich was profitable.
        assertEq(mevProfit, 0, "sandwich attack was profitable");
    }
}
治理攻击测试
治理代币攻击
/// @title GovernanceAttackTest
/// @notice Checks that a proposal cannot be passed with flash-loaned governance
///         tokens. The test fails if the malicious proposal passes.
contract GovernanceAttackTest is Test {
    GovernanceToken public govToken;
    DAO public dao;

    address public constant ATTACKER = address(0x1);
    address public constant HONEST_USER = address(0x2);

    function setUp() public {
        govToken = new GovernanceToken();
        dao = new DAO(address(govToken));

        // Distribute governance tokens.
        govToken.mint(HONEST_USER, 1000 ether);
        govToken.mint(ATTACKER, 600 ether); // attacker holds a minority stake
    }

    /// @notice Creates a malicious proposal and votes on it with borrowed tokens.
    function test_FlashLoanGovernanceAttack() public {
        console.log("=== Flash Loan Governance Attack ===");

        // 1. Attacker creates the malicious proposal.
        vm.prank(ATTACKER);
        uint256 proposalId = dao.propose("Malicious proposal to drain treasury");

        // 2. A lender holding a large governance-token balance.
        FlashLoanProvider flashLoanProvider = new FlashLoanProvider();
        govToken.mint(address(flashLoanProvider), 10000 ether);

        // 3. Run the flash-loan vote.
        GovernanceAttacker attacker = new GovernanceAttacker(dao, govToken, flashLoanProvider);
        govToken.mint(address(attacker), 100 ether); // covers the loan fee
        attacker.executeGovernanceAttack(proposalId, 2000 ether);

        // 4. Check whether the proposal passed.
        //    Non-ASCII text requires a unicode"..." literal.
        bool proposalPassed = dao.proposalPassed(proposalId);
        if (proposalPassed) {
            console.log(unicode"❌ Governance attack succeeded!");
            console.log("Malicious proposal passed with flash-loaned tokens");
        } else {
            console.log(unicode"✅ Governance attack prevented!");
        }

        // Fail the test when borrowed voting power decided the proposal.
        assertFalse(proposalPassed, "proposal passed with flash-loaned tokens");
    }
}
/// @title GovernanceAttacker
/// @notice Test helper that borrows governance tokens via flash loan, votes on
///         a proposal while holding them, and repays within the same call.
contract GovernanceAttacker {
    DAO public dao;
    GovernanceToken public govToken;
    FlashLoanProvider public flashLoanProvider;

    /// @dev Proposal targeted by the attack in progress. Stored because the
    ///      flash-loan callback only receives the loan amount.
    uint256 private targetProposalId;

    constructor(DAO _dao, GovernanceToken _govToken, FlashLoanProvider _flashLoanProvider) {
        dao = _dao;
        govToken = _govToken;
        flashLoanProvider = _flashLoanProvider;
    }

    /// @notice Borrows `loanAmount` tokens and votes for `proposalId` with them.
    function executeGovernanceAttack(uint256 proposalId, uint256 loanAmount) external {
        targetProposalId = proposalId;
        flashLoanProvider.flashLoan(loanAmount, address(this));
    }

    /// @notice Flash-loan callback holding the borrowed `amount`.
    function onFlashLoan(uint256 amount) external {
        // 1. Vote with the borrowed tokens.
        dao.vote(targetProposalId, true);

        // 2. Repay principal + fee to the caller.
        govToken.transfer(msg.sender, amount + flashLoanProvider.fee());
    }
}
预言机攻击测试
价格操纵攻击
/// @title OracleAttackTest
/// @notice Price-manipulation scenario: the attacker moves the DEX spot price
///         that `PriceOracle` reads, then borrows from `LendingProtocol`.
contract OracleAttackTest is Test {
    PriceOracle public oracle;
    LendingProtocol public lendingProtocol;
    DEX public dex;

    address public constant ATTACKER = address(0x1);

    function setUp() public {
        dex = new DEX();
        oracle = new PriceOracle(address(dex)); // the DEX is the price source
        lendingProtocol = new LendingProtocol(address(oracle));

        // Initial liquidity.
        deal(address(dex.tokenA()), address(dex), 10000 ether);
        deal(address(dex.tokenB()), address(dex), 10000 ether);

        // Give the attacker a large balance.
        deal(address(dex.tokenA()), ATTACKER, 50000 ether);
    }

    /// @notice Manipulates the price, borrows against it, and reports the profit.
    function test_OraclePriceManipulation() public {
        console.log("=== Oracle Price Manipulation Attack ===");

        // 1. Record the initial price.
        uint256 initialPrice = oracle.getPrice();
        console.log("Initial price:", initialPrice);

        // 2. Attacker moves the DEX price with a large trade.
        //    The token address is read BEFORE vm.prank: the prank applies to
        //    the next external call, and an inline `dex.tokenA()` argument
        //    would consume it, leaving the swap to run as the test contract.
        //    NOTE(review): if DEX.swap pulls tokens with transferFrom, the
        //    attacker must approve the DEX first - confirm against DEX.
        address tokenIn = address(dex.tokenA());
        vm.prank(ATTACKER);
        dex.swap(tokenIn, 40000 ether);

        uint256 manipulatedPrice = oracle.getPrice();
        console.log("Manipulated price:", manipulatedPrice);

        // 3. Attacker borrows at the manipulated price.
        vm.prank(ATTACKER);
        uint256 borrowAmount = lendingProtocol.calculateMaxBorrow(1000 ether);
        vm.prank(ATTACKER);
        lendingProtocol.borrow(borrowAmount);
        console.log("Borrowed amount:", borrowAmount);

        // 4. Restoring the price is optional; a real attacker might do it
        //    within the same transaction.

        // 5. Check whether the attack paid off.
        //    Non-ASCII text requires a unicode"..." literal.
        uint256 attackerProfit = calculateProfit();
        if (attackerProfit > 0) {
            console.log(unicode"❌ Oracle manipulation attack succeeded!");
            console.log("Attacker profit:", attackerProfit);
        }
    }

    /// @notice Net profit of the attacker.
    /// @dev Placeholder that always returns 0, so the success branch above
    ///      never runs until this is implemented.
    function calculateProfit() internal view returns (uint256) {
        return 0; // simplified implementation
    }
}
模糊测试安全性
安全属性模糊测试
/// @title SecurityFuzzTest
/// @notice Property-based tests for `Vault` share accounting and `Token` transfers.
contract SecurityFuzzTest is Test {
    Vault public vault;
    Token public token;

    function setUp() public {
        token = new Token();
        vault = new Vault(address(token));
    }

    /// @notice Invariant: a user cannot withdraw more shares than they own.
    function testFuzz_CannotWithdrawMoreThanOwned(
        address user,
        uint256 depositAmount,
        uint256 withdrawAmount
    ) public {
        // The contracts under test are excluded as depositors.
        vm.assume(user != address(0) && user != address(vault) && user != address(token));
        // bound() maps every input into range; vm.assume on a narrow range
        // would reject almost all uint256 inputs.
        depositAmount = bound(depositAmount, 1, 1000000 ether);

        // Fund the user and deposit.
        deal(address(token), user, depositAmount);
        vm.prank(user);
        token.approve(address(vault), depositAmount);
        vm.prank(user);
        vault.deposit(depositAmount);

        uint256 userShares = vault.balanceOf(user);

        if (withdrawAmount > userShares) {
            // Over-withdrawal must revert.
            vm.prank(user);
            vm.expectRevert();
            vault.withdraw(withdrawAmount);
        } else {
            // Withdrawal within the balance must succeed and burn the shares.
            vm.prank(user);
            vault.withdraw(withdrawAmount);
            assertEq(vault.balanceOf(user), userShares - withdrawAmount);
        }
    }

    /// @notice Invariant: total supply equals the sum of all user shares.
    function testFuzz_TotalSupplyInvariant(
        address[5] memory users,
        uint256[5] memory amounts
    ) public {
        // Validate all inputs before touching state. Users must be distinct:
        // a repeated address would be counted twice in the sum below and make
        // the invariant fail on a correct vault.
        for (uint256 i = 0; i < users.length; i++) {
            vm.assume(users[i] != address(0) && users[i] != address(vault));
            for (uint256 j = 0; j < i; j++) {
                vm.assume(users[i] != users[j]);
            }
        }

        for (uint256 i = 0; i < users.length; i++) {
            amounts[i] = bound(amounts[i], 0, 1000 ether);
            if (amounts[i] > 0) {
                deal(address(token), users[i], amounts[i]);
                vm.prank(users[i]);
                token.approve(address(vault), amounts[i]);
                vm.prank(users[i]);
                vault.deposit(amounts[i]);
            }
        }

        // Check the invariant.
        uint256 totalShares = 0;
        for (uint256 i = 0; i < users.length; i++) {
            totalShares += vault.balanceOf(users[i]);
        }
        assertEq(vault.totalSupply(), totalShares);
    }

    /// @notice Only the holder (or an approved spender) can move tokens.
    function testFuzz_OnlyTokenHolderCanTransfer(
        address from,
        address to,
        uint256 amount
    ) public {
        address unauthorized = address(0x999);

        vm.assume(from != address(0) && to != address(0));
        vm.assume(from != to);
        vm.assume(unauthorized != from && unauthorized != to);
        amount = bound(amount, 1, 1000 ether);

        // Fund `from` with one token unit more than it will send, so a balance
        // remains for the unauthorized attempt below. With a zero balance that
        // attempt would revert for lack of funds and prove nothing about
        // allowances.
        deal(address(token), from, amount + 1);

        // `from` can transfer its own tokens.
        vm.prank(from);
        bool success = token.transfer(to, amount);
        assertTrue(success);
        assertEq(token.balanceOf(to), amount);
        assertEq(token.balanceOf(from), 1);

        // Nobody else can move `from`'s remaining balance without an allowance.
        vm.prank(unauthorized);
        vm.expectRevert();
        token.transferFrom(from, to, 1);
    }
}
静态分析集成
Slither 集成
#!/bin/bash
# scripts/security-analysis.sh
#
# Runs Slither, the Slither report gate, Mythril (when installed) and the
# Foundry security tests. Every stage runs even if an earlier one fails; the
# script exits non-zero when any gate failed.

set -uo pipefail

status=0

echo "🔒 Running security analysis..."

# 1. Run Slither.
#    A non-zero exit is not treated as failure here, since Slither can signal
#    findings that way; step 2 decides pass/fail from the report. A stale report
#    is removed first so step 2 never reads results from an earlier run.
echo "Running Slither analysis..."
rm -f slither-report.json
slither . \
    --exclude-dependencies \
    --exclude-informational \
    --exclude-low \
    --json slither-report.json \
    || echo "Slither exited with a non-zero status; evaluating the report."

# 2. Evaluate the report.
if [ -f slither-report.json ]; then
    python3 scripts/analyze-slither.py slither-report.json || status=1
else
    echo "❌ slither-report.json was not produced" >&2
    status=1
fi

# 3. Run Mythril (if installed).
#    Path matches the `../src/Vault.sol` import used by the tests.
if command -v myth &> /dev/null; then
    echo "Running Mythril analysis..."
    myth analyze src/Vault.sol --solv 0.8.19 || status=1
fi

# 4. Run the custom security tests.
echo "Running custom security checks..."
forge test --match-contract "SecurityTest" -vv || status=1

if [ "$status" -ne 0 ]; then
    echo "❌ Security analysis failed!"
    exit "$status"
fi

echo "✅ Security analysis completed!"
自动化安全检查
# scripts/analyze-slither.py
import json
import sys
def analyze_slither_report(report_path):
    """Summarise a Slither JSON report and decide whether it passes the gate.

    Findings are grouped by their ``impact`` field. Impacts other than the four
    standard buckets (for example ``Optimization``) get their own bucket
    instead of raising ``KeyError``; they are listed but never fail the gate.

    Args:
        report_path: Path to the JSON file written by ``slither --json``.

    Returns:
        bool: ``False`` when there is at least one High finding or more than
        three Medium findings, ``True`` otherwise.

    Raises:
        OSError: If the report cannot be opened.
        json.JSONDecodeError: If the report is not valid JSON.
    """
    with open(report_path, 'r', encoding='utf-8') as f:
        report = json.load(f)

    # "results" or "detectors" may be absent or null when nothing was reported.
    detectors = (report.get('results') or {}).get('detectors') or []

    # Group by severity; the four standard buckets come first so they are
    # printed in a stable order.
    issues = {
        'High': [],
        'Medium': [],
        'Low': [],
        'Informational': []
    }

    for detector in detectors:
        impact = detector.get('impact', 'Informational')
        issues.setdefault(impact, []).append(detector)

    # Print the summary.
    print("🔍 Slither Analysis Summary")
    print("=" * 50)

    total_issues = sum(len(found) for found in issues.values())

    if total_issues == 0:
        print("✅ No security issues found!")
        return True

    for severity, found in issues.items():
        if found:
            print(f"{severity}: {len(found)} issues")
            for issue in found:
                print(f"  - {issue.get('check', 'Unknown')}: {issue.get('description', 'No description')}")

    # Gate: any High fails; a small number of Medium findings is tolerated.
    if len(issues['High']) > 0:
        print("\n❌ High severity issues found!")
        return False
    elif len(issues['Medium']) > 3:
        print("\n⚠️  Too many medium severity issues!")
        return False

    print("\n✅ Security analysis passed!")
    return True
if __name__ == "__main__":
    # CLI entry point: takes exactly one argument, the Slither JSON report path.
    # Exit code is 0 when the report passes the gate and 1 otherwise,
    # including when the arguments are wrong.
    if len(sys.argv) == 2:
        passed = analyze_slither_report(sys.argv[1])
        sys.exit(0 if passed else 1)

    print("Usage: python analyze-slither.py <report.json>")
    sys.exit(1)
安全测试最佳实践
1. 全面的测试覆盖
/// @title ComprehensiveSecurityTest
/// @notice Skeleton that lists every attack vector a security suite should cover.
/// @dev Each helper is an empty placeholder to be filled in for the contract
///      under test. All helpers called by the entry point are declared so the
///      skeleton compiles as written.
contract ComprehensiveSecurityTest is Test {
    /// @notice Runs every attack-vector check in sequence.
    function test_AllAttackVectors() public {
        _testReentrancy();
        _testIntegerOverflow();
        _testAccessControl();
        _testFrontRunning();
        _testFlashLoanAttacks();
        _testGovernanceAttacks();
        _testOracleManipulation();
    }

    function _testReentrancy() internal {
        // Reentrancy attack checks
    }

    function _testIntegerOverflow() internal {
        // Integer overflow / underflow checks
    }

    function _testAccessControl() internal {
        // Access control checks
    }

    function _testFrontRunning() internal {
        // Front-running / sandwich checks
    }

    function _testFlashLoanAttacks() internal {
        // Flash loan attack checks
    }

    function _testGovernanceAttacks() internal {
        // Governance attack checks
    }

    function _testOracleManipulation() internal {
        // Oracle manipulation checks
    }
}
2. 边界条件测试
/// @title BoundarySecurityTest
/// @notice Skeleton for boundary-condition checks: extreme values and edge states.
/// @dev Each helper is an empty placeholder. All helpers called by the entry
///      point are declared so the skeleton compiles as written.
contract BoundarySecurityTest is Test {
    /// @notice Runs every boundary check in sequence.
    function test_EdgeCases() public {
        // Extreme values
        _testWithMaxUint();
        _testWithZeroValues();
        _testWithMinimalValues();

        // Boundary states
        _testEmptyContract();
        _testFullContract();
    }

    function _testWithMaxUint() internal {
        uint256 maxVal = type(uint256).max;
        // Exercise the contract under test with maxVal
    }

    function _testWithZeroValues() internal {
        // Exercise the contract under test with zero values
    }

    function _testWithMinimalValues() internal {
        // Exercise the contract under test with the smallest non-zero values (e.g. 1 wei)
    }

    function _testEmptyContract() internal {
        // Exercise the contract under test before any state has been added
    }

    function _testFullContract() internal {
        // Exercise the contract under test at its capacity limits
    }
}
3. 状态机安全测试
/// @notice Minimal view of the state machine under test: only the call this
///         test makes. The parameter uses the test's own `State` enum.
interface IStateMachine {
    function setState(StateMachineSecurityTest.State newState) external;
}

/// @title StateMachineSecurityTest
/// @notice Checks that illegal state transitions are rejected.
contract StateMachineSecurityTest is Test {
    enum State { Inactive, Active, Paused, Terminated }

    /// @dev Contract under test. `contract` is a reserved keyword and cannot
    ///      name a variable, so the instance is called `machine`.
    ///      TODO: deploy the real state machine in a setUp() and assign it here.
    IStateMachine internal machine;

    /// @notice Inactive -> Terminated must be rejected.
    function test_IllegalStateTransitions() public {
        // Jumping straight from Inactive to Terminated should not be allowed.
        vm.expectRevert("Invalid state transition");
        machine.setState(State.Terminated);

        // Walk every possible transition.
        _testAllStateTransitions();
    }

    /// @dev Visits every ordered pair of distinct states.
    function _testAllStateTransitions() internal {
        State[4] memory states = [State.Inactive, State.Active, State.Paused, State.Terminated];

        for (uint i = 0; i < states.length; i++) {
            for (uint j = 0; j < states.length; j++) {
                if (i != j) {
                    _testStateTransition(states[i], states[j]);
                }
            }
        }
    }

    function _testStateTransition(State from, State to) internal {
        // Check one specific transition (placeholder)
    }
}
持续安全监控
自动化安全检查
# .github/workflows/security.yml
#
# Runs the Foundry security tests and Slither on pushes, pull requests and a
# nightly schedule, then uploads the reports as an artifact.
name: Security Checks

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]
  schedule:
    - cron: '0 2 * * *' # every day at 02:00 (cron schedules are evaluated in UTC)

jobs:
  security-analysis:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4
        with:
          submodules: recursive

      - name: Install Foundry
        uses: foundry-rs/foundry-toolchain@v1

      - name: Install Slither
        run: |
          pip3 install slither-analyzer
          pip3 install solc-select
          solc-select install 0.8.19
          solc-select use 0.8.19

      - name: Run security tests
        # An explicit `shell: bash` enables pipefail, so a failing `forge test`
        # still fails the step even though its output is piped into tee.
        shell: bash
        run: |
          forge test --match-contract "SecurityTest" --gas-report | tee gas-report.txt

      - name: Run Slither analysis
        # A non-zero exit is tolerated here in case Slither signals findings
        # that way; the next step decides pass/fail from the JSON report and
        # fails if no report was written.
        run: |
          slither . --exclude-dependencies --json slither-report.json || true

      - name: Analyze results
        run: |
          python3 scripts/analyze-slither.py slither-report.json

      - name: Upload security report
        # Upload even when an earlier step failed: that is when the reports
        # are most needed.
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: security-report
          path: |
            slither-report.json
            gas-report.txt
总结
安全测试是智能合约开发的关键环节,需要:
- 全面覆盖: 测试所有已知的攻击向量
- 自动化: 集成到 CI/CD 流程中
- 持续监控: 定期运行安全检查
- 多层防护: 结合静态分析、动态测试和人工审计
- 及时更新: 跟上最新的安全威胁和防护措施
通过系统性的安全测试,可以大大提高智能合约的安全性和可靠性。
下一步
完成安全测试学习后,继续学习: