第一章 revm浅析
REVM核心是一个解释器,负责evm tx的解释执行,本次目标是弄清一笔tx是如何执行的
1. 本地运算与栈
EVM是一个stack machine,stack最多1024个item,每个item 是256 bit.
#![allow(unused)] fn main() { // crates/interpreter/src/interpreter/stack.rs /// EVM interpreter stack limit. pub const STACK_LIMIT: usize = 1024; /// EVM stack with [STACK_LIMIT] capacity of words. #[derive(Debug, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(serde::Serialize))] pub struct Stack { /// The underlying data of the stack. data: Vec<U256>, } }
执行 ADD 操作
#![allow(unused)] fn main() { // crates/interpreter/src/instructions/arithmetic.rs pub fn add<WIRE: InterpreterTypes, H: Host + ?Sized>( interpreter: &mut Interpreter<WIRE>, _host: &mut H, ) { gas!(interpreter, gas::VERYLOW); // 检查gas 是否足够 popn_top!([op1], op2, interpreter); // 弹出op1, op2是top的可变引用,stack就保存在interpreter中 *op2 = op1.wrapping_add(*op2); // 相加 } }
2. 访问链上storage
定义storage接口
#![allow(unused)] fn main() { // crates/database/interface/src/lib.rs /// EVM database interface. #[auto_impl(&mut, Box)] pub trait Database { /// The database error type. type Error: DBErrorMarker + Error; /// Gets basic account information. fn basic(&mut self, address: Address) -> Result<Option<AccountInfo>, Self::Error>; /// Gets account code by its hash. fn code_by_hash(&mut self, code_hash: B256) -> Result<Bytecode, Self::Error>; /// Gets storage value of address at index. fn storage(&mut self, address: Address, index: U256) -> Result<U256, Self::Error>; /// Gets block hash by block number. fn block_hash(&mut self, number: u64) -> Result<B256, Self::Error>; } }
以执行 SLOAD为例
#![allow(unused)] fn main() { // crates/interpreter/src/instructions/host.rs pub fn sload<WIRE: InterpreterTypes, H: Host + ?Sized>( interpreter: &mut Interpreter<WIRE>, host: &mut H, // host结构体包含一个database的实例 ) { popn_top!([], index, interpreter); // index就是要查询的key let Some(value) = host.sload(interpreter.input.target_address(), *index) else { // 调用host的 sload() 函数 interpreter .control .set_instruction_result(InstructionResult::FatalExternalError); // 调用失败就报错 return; }; gas!( interpreter, gas::sload_cost(interpreter.runtime_flag.spec_id(), value.is_cold) ); *index = value.data; } }
Host.sload() 会先从HashMap缓存中尝试读取,如果缓存未命中就从 Database 的实例中调用storage(address, key)
函数中获取并缓存。
Host.sstore() 先判断新值和旧值是否一样,如果一样直接返回,否则修改旧值,并记录一个修改事件 ENTRY::storage_changed(address, key, present.data)
3. 访问Memory
定义 Memory 接口,操作看上去和u8数组非常类似
#![allow(unused)] fn main() { // crates/interpreter/src/interpreter_types.rs /// Trait for Interpreter memory operations. pub trait MemoryTr { /// Sets memory data at given offset from data with a given data_offset and len. /// /// # Panics /// /// Panics if range is out of scope of allocated memory. fn set_data(&mut self, memory_offset: usize, data_offset: usize, len: usize, data: &[u8]); /// Sets memory data at given offset. /// /// # Panics /// /// Panics if range is out of scope of allocated memory. fn set(&mut self, memory_offset: usize, data: &[u8]); /// Returns memory size. fn size(&self) -> usize; /// Copies memory data from source to destination. /// /// # Panics /// Panics if range is out of scope of allocated memory. fn copy(&mut self, destination: usize, source: usize, len: usize); /// Memory slice with range /// /// # Panics /// /// Panics if range is out of scope of allocated memory. fn slice(&self, range: Range<usize>) -> impl Deref<Target = [u8]> + '_; /// Memory slice len /// /// Uses [`slice`][MemoryTr::slice] internally. fn slice_len(&self, offset: usize, len: usize) -> impl Deref<Target = [u8]> + '_ { self.slice(offset..offset + len) } /// Resizes memory to new size /// /// # Note /// /// It checks memory limits. fn resize(&mut self, new_size: usize) -> bool; } }
Memory实例
#![allow(unused)] fn main() { // crates/interpreter/src/interpreter/shared_memory.rs /// A sequential memory shared between calls, which uses /// a `Vec` for internal representation. /// A [SharedMemory] instance should always be obtained using /// the `new` static method to ensure memory safety. #[derive(Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct SharedMemory { /// The underlying buffer. buffer: Vec<u8>, /// Memory checkpoints for each depth. /// Invariant: these are always in bounds of `data`. checkpoints: Vec<usize>, /// Invariant: equals `self.checkpoints.last()` last_checkpoint: usize, /// Memory limit. See [`Cfg`](context_interface::Cfg). #[cfg(feature = "memory_limit")] memory_limit: u64, } }
mload实现
#![allow(unused)] fn main() { pub fn mload<WIRE: InterpreterTypes, H: Host + ?Sized>( interpreter: &mut Interpreter<WIRE>, _host: &mut H, ) { gas!(interpreter, gas::VERYLOW); popn_top!([], top, interpreter); let offset = as_usize_or_fail!(interpreter, top); resize_memory!(interpreter, offset, 32); *top = U256::try_from_be_slice(interpreter.memory.slice_len(offset, 32).as_ref()).unwrap() // 从interpreter中读memory } }
4. 访问transient storage
什么是transient storage? 类似storage,但是生命周期比storage短,类似memory,但是生命周期比memory长
A -> B
以tload举例
#![allow(unused)] fn main() { /// EIP-1153: Transient storage opcodes /// Load value from transient storage pub fn tload<WIRE: InterpreterTypes, H: Host + ?Sized>( interpreter: &mut Interpreter<WIRE>, host: &mut H, ) { check!(interpreter, CANCUN); gas!(interpreter, gas::WARM_STORAGE_READ_COST); popn_top!([], index, interpreter); *index = host.tload(interpreter.input.target_address(), *index); // 从host.tload中得到值 } }
实际类型如下
#![allow(unused)] fn main() { pub type TransientStorage = HashMap<(Address, U256), U256>; }
5. Call 与 Return
#![allow(unused)] fn main() { pub fn call<WIRE: InterpreterTypes, H: Host + ?Sized>( interpreter: &mut Interpreter<WIRE>, host: &mut H, ) { popn!([local_gas_limit, to, value], interpreter); //... //省略一些参数读取,校验 // Call host to interact with target contract interpreter.control.set_next_action( InterpreterAction::NewFrame(FrameInput::Call(Box::new(CallInputs { input, gas_limit, target_address: to, caller: interpreter.input.target_address(), bytecode_address: to, value: CallValue::Transfer(value), scheme: CallScheme::Call, is_static: interpreter.runtime_flag.is_static(), is_eof: false, return_memory_offset, }))), InstructionResult::CallOrCreate, ); } }
核心是InterpreterAction::NewFrame,设置了call的各种参数。
#![allow(unused)] fn main() { fn return_inner( interpreter: &mut Interpreter<impl InterpreterTypes>, instruction_result: InstructionResult, ) { // Zero gas cost // gas!(interpreter, gas::ZERO) popn!([offset, len], interpreter); // ... // 省略一些步骤 interpreter.control.set_next_action( InterpreterAction::Return { result: InterpreterResult { output, gas, result: instruction_result, }, }, instruction_result, ); } }
核心是设置InterpreterAction::Return,包含了返回值。
一个Frame代表一次call或者create,例如合约A调用合约B的某个函数,总共就有2个frame。 如果合约A只是内部调用合约A的另一个函数,那么只有一个frame
contract A {
function foo() {
// B.bar(); 会创建一个新frame,bar结束后会执行return
// bar(); 不会创建新frame,以jump的形式返回到foo函数中
// this.bar(); 会创建一个新frame,bar结束后会执行return
}
function bar() {
}
}
contract B {
function bar() {}
}
#![allow(unused)] fn main() { pub struct EthFrame<EVM, ERROR, IW: InterpreterTypes> { phantom: core::marker::PhantomData<(EVM, ERROR)>, /// Data of the frame. data: FrameData, /// Input data for the frame. 包含calldata,gas_limit等数据 pub input: FrameInput, /// Depth of the call frame. depth: usize, /// Journal checkpoint. pub checkpoint: JournalCheckpoint, /// Interpreter. pub interpreter: Interpreter<IW>, // This is worth making as a generic type FrameSharedContext. pub memory: Rc<RefCell<SharedMemory>>, } }
每个frame有自己的calldata,msg.sender, 独立的栈和memory,但共享 storage 和 transient storage
(所以transient storage非常适合用来做重入锁)
6. tx执行入口
需要准备好Context,这个Context包含database实例,tx的相关metadata,链的配置信息,block信息等
#![allow(unused)] fn main() { /// EVM context contains data that EVM needs for execution. #[derive_where(Clone, Debug; BLOCK, CFG, CHAIN, TX, DB, JOURNAL, <DB as Database>::Error)] pub struct Context< BLOCK = BlockEnv, TX = TxEnv, CFG = CfgEnv, DB: Database = EmptyDB, JOURNAL: JournalTr<Database = DB> = Journal<DB>, CHAIN = (), > { /// Block information. pub block: BLOCK, /// Transaction information. pub tx: TX, /// Configurations. pub cfg: CFG, /// EVM State with journaling support and database. pub journaled_state: JOURNAL, /// Inner context. pub chain: CHAIN, /// Error that happened during execution. pub error: Result<(), ContextError<DB::Error>>, } }
调用run()
函数开始执行
#![allow(unused)] fn main() { // crates/handler/src/handler.rs /// The main entry point for transaction execution. /// /// This method calls [`Handler::run_without_catch_error`] and if it returns an error, /// calls [`Handler::catch_error`] to handle the error and cleanup. /// /// The [`Handler::catch_error`] method ensures intermediate state is properly cleared. #[inline] fn run( &mut self, evm: &mut Self::Evm, ) -> Result<ResultAndState<Self::HaltReason>, Self::Error> { // Run inner handler and catch all errors to handle cleanup. match self.run_without_catch_error(evm) { Ok(output) => Ok(output), Err(e) => self.catch_error(evm, e), } } }
在执行前后做一些验证或收尾(refund)工作
#![allow(unused)] fn main() { fn run_without_catch_error( &mut self, evm: &mut Self::Evm, ) -> Result<ResultAndState<Self::HaltReason>, Self::Error> { let init_and_floor_gas = self.validate(evm)?; let eip7702_refund = self.pre_execution(evm)? as i64; let exec_result = self.execution(evm, &init_and_floor_gas)?; // 执行入口 self.post_execution(evm, exec_result, init_and_floor_gas, eip7702_refund) } }
初始化frame,并开始执行第一个frame
#![allow(unused)] fn main() { fn execution( &mut self, evm: &mut Self::Evm, init_and_floor_gas: &InitialAndFloorGas, ) -> Result<FrameResult, Self::Error> { let gas_limit = evm.ctx().tx().gas_limit() - init_and_floor_gas.initial_gas; // Create first frame action let first_frame_input = self.first_frame_input(evm, gas_limit)?; let first_frame = self.first_frame_init(evm, first_frame_input)?; let mut frame_result = match first_frame { ItemOrResult::Item(frame) => self.run_exec_loop(evm, frame)?, // 开始执行第一个frame ItemOrResult::Result(result) => result, }; self.last_frame_result(evm, &mut frame_result)?; Ok(frame_result) } }
不断执行存在的frame
#![allow(unused)] fn main() { fn run_exec_loop( &mut self, evm: &mut Self::Evm, frame: Self::Frame, ) -> Result<FrameResult, Self::Error> { let mut frame_stack: Vec<Self::Frame> = vec![frame]; loop { let frame = frame_stack.last_mut().unwrap(); let call_or_result = self.frame_call(frame, evm)?; // 执行frame let result = match call_or_result { ItemOrResult::Item(init) => { match self.frame_init(frame, evm, init)? { ItemOrResult::Item(new_frame) => { frame_stack.push(new_frame); continue; } // Do not pop the frame since no new frame was created ItemOrResult::Result(result) => result, } } ItemOrResult::Result(result) => { // Remove the frame that returned the result frame_stack.pop(); result } }; let Some(frame) = frame_stack.last_mut() else { return Ok(result); }; self.frame_return_result(frame, evm, result)?; } } }
#![allow(unused)] fn main() { pub fn run_plain<H: Host + ?Sized>( &mut self, instruction_table: &InstructionTable<IW, H>, host: &mut H, ) -> InterpreterAction { self.reset_control(); // 设置为continue // Main loop while self.control.instruction_result().is_continue() { self.step(instruction_table, host); // 不断执行opcode } self.take_next_action() } }
step函数
#![allow(unused)] fn main() { pub(crate) fn step<H: Host + ?Sized>( &mut self, instruction_table: &[Instruction<IW, H>; 256], host: &mut H, ) { // Get current opcode. let opcode = self.bytecode.opcode(); // SAFETY: In analysis we are doing padding of bytecode so that we are sure that last // byte instruction is STOP so we are safe to just increment program_counter bcs on last instruction // it will do noop and just stop execution of this contract self.bytecode.relative_jump(1); // Execute instruction. instruction_table[opcode as usize](self, host) } }
instruction table
#![allow(unused)] fn main() { pub const fn instruction_table<WIRE: InterpreterTypes, H: Host + ?Sized>( ) -> [Instruction<WIRE, H>; 256] { use bytecode::opcode::*; let mut table = [control::unknown as Instruction<WIRE, H>; 256]; table[STOP as usize] = control::stop; table[ADD as usize] = arithmetic::add; table[MUL as usize] = arithmetic::mul; table[SUB as usize] = arithmetic::sub; table[DIV as usize] = arithmetic::div; table[SDIV as usize] = arithmetic::sdiv; table[MOD as usize] = arithmetic::rem; table[SMOD as usize] = arithmetic::smod; table[ADDMOD as usize] = arithmetic::addmod; table[MULMOD as usize] = arithmetic::mulmod; table[EXP as usize] = arithmetic::exp; table[SIGNEXTEND as usize] = arithmetic::signextend; ... }
7. 合约部署
evm部署合约都是执行init code,然后将runtime code部署到链上storage
- 整个tx就是部署合约,构造的第一个frame类型为create
#![allow(unused)] fn main() { pub enum FrameData { Call(CallFrame), Create(CreateFrame), EOFCreate(EOFCreateFrame), } }
- 在链上通过create 指令,也是构造一个新的frame,frame类型为create,这样可以继续执行合约的构造函数并部署合约。
Reth Storage 存储层设计
1. 概述
Reth 存储层是一个高性能、模块化的以太坊数据存储解决方案。它采用分层架构设计,实现了数据访问的抽象和具体存储的分离,同时提供了丰富的功能和优秀的性能。
在接下来的内容中,我们将依次介绍各个核心层次的设计理念、主要接口和它们之间的协作关系,帮助你系统理解 Reth 存储系统的整体架构。
1.1 核心特点
- 分层架构:清晰的层次划分,职责分明
- 高性能:优化的数据访问和存储机制
- 可扩展:模块化设计,易于扩展
- 可靠性:完善的事务和并发控制
- 可维护:清晰的代码组织和文档
1.2 整体架构
flowchart LR App[应用层] --> API[Storage API] API --> DB[Database API] DB --> Store[(存储层)] style App fill:#f9f9f9,stroke:#333,stroke-width:2px style API fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px style DB fill:#fff3e0,stroke:#e65100,stroke-width:2px style Store fill:#e1f5fe,stroke:#01579b,stroke-width:2px
2. 核心组件
下面结合模块架构图,依次介绍各核心层次的设计与接口。
在整体架构的基础上,Reth 存储系统将核心功能分为四大层次。下面我们将依次展开介绍。
2.1 存储 API 层 (storage-api)
flowchart LR API[Storage API] subgraph State[State Provider] direction TB S1[account_balance] S2[account_code] S3[storage] S4[bytecode_by_hash] end subgraph Block[Block Provider] direction TB B1[block_by_hash] B2[block_by_number] B3[block_transactions] end subgraph Tx[Transaction Provider] direction TB T1[transaction_by_hash] T2[receipt_by_hash] T3[transaction_by_block] end subgraph Chain[Chain Provider] direction TB C1[chain_info] C2[block_number] C3[block_hash] end API --> State API --> Block API --> Tx API --> Chain style API fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px style State fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px style Block fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px style Tx fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px style Chain fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
首先,最上层的 Storage API 层为上层模块(如 RPC、同步、执行等)提供了统一、类型安全的数据访问接口。它屏蔽了底层实现细节,使开发者可以专注于业务逻辑。
-
状态管理
StateProvider
: 状态数据访问StateWriter
: 状态数据写入StateReader
: 状态数据读取
-
区块管理
BlockProvider
: 区块数据访问BlockWriter
: 区块数据写入HeaderProvider
: 区块头管理
-
交易管理
TransactionsProvider
: 交易数据访问ReceiptsProvider
: 收据数据访问WithdrawalsProvider
: 提款数据访问
-
链管理
ChainProvider
: 链数据访问ChainInfo
: 链信息管理HistoryProvider
: 历史数据访问
2.1.1 storage-api 主要 trait 与接口
Reth 的 storage-api
层定义了以太坊节点存储访问的核心 trait 和类型,是上层与底层存储实现解耦的关键。其主要特性和接口如下:
1. 统一抽象与分层设计
- 通过 trait 抽象区块、状态、交易、链、收据、历史、哈希、trie 等多种数据访问与写入接口。
- 支持只读/读写、历史/最新/裁剪等多种访问模式。
- 便于 provider 层、db-api 层和具体存储实现的解耦与组合。
2. 主要 trait 及功能
StateProvider
/StateProviderFactory
:统一的状态访问与工厂接口,支持最新、历史、pending 状态的获取。BlockReader
/BlockWriter
:区块数据的读取与写入,支持多种来源(数据库、pending、canonical)。HeaderProvider
:区块头访问,支持 hash/number/tag 多种定位方式。TransactionsProvider
/ReceiptProvider
:交易与收据的高效访问。AccountReader
/StorageReader
:账户与存储槽的访问,支持批量与变更集。TrieWriter
/StateRootProvider
/StorageRootProvider
:trie 相关操作与状态根计算。HistoryWriter
/HashingWriter
:历史索引与哈希表的维护。FullRpcProvider
:组合所有核心 trait,便于 RPC 层依赖。
3. 类型安全与可扩展性
- 所有 trait 强类型约束,接口清晰,便于静态检查和 IDE 自动补全。
- 支持 auto_impl,便于 Arc/Box/引用等多种包装。
- trait 组合灵活,便于扩展和 mock 测试。
4. 典型用法
#![allow(unused)] fn main() { // 获取最新状态 provider let state_provider = StateProviderFactory::latest()?; // 查询账户余额 let balance = state_provider.account_balance(&address)?; // 查询区块 let block = block_reader.block_by_number(1000)?; // 写入区块 block_writer.insert_block(block, StorageLocation::Database)?; // 计算状态根 let state_root = state_provider.state_root(hashed_state)?; }
5. 设计亮点
- 统一 trait 抽象,便于多后端实现和切换
- 支持历史/裁剪/最新等多种状态视图
- 类型安全、易于测试和扩展
- 与 provider/db-api 层协作紧密,支撑高性能和灵活的以太坊节点存储
结论:storage-api 层是 Reth 存储系统的"接口规范层",通过 trait 组合和类型系统,极大提升了系统的灵活性、可维护性和性能,是理解 Reth 存储架构的基础。
2.2 Provider 层核心接口与功能
flowchart LR State[State Provider] subgraph Reader[State Reader] direction TB R1[account_balance] R2[account_code] R3[storage] R4[bytecode_by_hash] end subgraph Writer[State Writer] direction TB W1[insert_account] W2[insert_storage] W3[insert_code] W4[delete_account] end subgraph Cache[State Cache] direction TB C1[get_account] C2[get_storage] C3[get_code] end State --> Reader State --> Writer State --> Cache style State fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px style Reader fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px style Writer fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px style Cache fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
紧接着,Provider 层作为"服务接口层",在 storage-api trait 的基础上,聚合了多种数据源(数据库、静态文件、内存 overlay、裁剪/历史等),为上层提供一致的数据视图。
1. ProviderFactory
- 管理数据库、静态文件、链配置等,生成只读/读写 provider。
- 支持多后端、裁剪、metrics、可插拔。
2. DatabaseProvider
- 封装事务,统一实现区块、交易、账户、状态等多种数据访问和写入接口。
- 支持只读/读写、历史/最新、裁剪/非裁剪等多种模式。
3. BlockchainProvider
- 区块链全局视图入口,聚合数据库、内存 overlay、forkchoice 状态等。
- 实现所有核心数据访问 trait,支持一致性视图和多种状态访问。
4. StateProvider/LatestStateProviderRef/HistoricalStateProviderRef
- 状态访问统一 trait,支持"最新状态"和"历史状态"两种模式。
- 历史 provider 支持裁剪感知,能优雅处理被裁剪的历史区块。
5. FullProvider trait
- 组合所有 provider trait,便于上层依赖"全功能 provider"。
6. 典型用法
#![allow(unused)] fn main() { // 获取 provider let provider = provider_factory.provider()?; // 区块头、区块、交易、账户访问 let header = provider.header_by_number(1000)?; let block = provider.block(BlockHashOrNumber::Number(1000))?; let tx = provider.transaction_by_hash(tx_hash)?; let account = provider.basic_account(&address)?; // 状态访问 let state_provider = provider_factory.latest()?; let historical_provider = provider_factory.history_by_block_number(9000000)?; // 写入与回滚 let mut provider_rw = provider_factory.provider_rw()?; provider_rw.insert_block(block, StorageLocation::Database)?; provider_rw.remove_state_above(block_number, StorageLocation::Database)?; }
7. 设计优势
- 统一接口、多后端支持、裁剪与历史感知、类型安全、可扩展性、高性能。
结论:Provider 层是 Reth 存储系统的"服务接口层",通过 trait 组合和泛型,极大提升了系统的灵活性、可维护性和性能,是理解 Reth 存储架构的关键一环。
2.2 数据库抽象层 (db-api)
flowchart LR DB[Database API] subgraph Table[Table] direction TB T1[NAME: &str] T2[DUPSORT: bool] T3[Key: Key] T4[Value: Value] end subgraph Tx[Transaction] direction TB X1[tx] X2[tx_mut] X3[commit] X4[abort] end subgraph Lock[Lock Manager] direction TB L1[lock] L2[unlock] L3[try_lock] end DB --> Table DB --> Tx DB --> Lock style DB fill:#fff3e0,stroke:#e65100,stroke-width:2px style Table fill:#fff3e0,stroke:#e65100,stroke-width:2px style Tx fill:#fff3e0,stroke:#e65100,stroke-width:2px style Lock fill:#fff3e0,stroke:#e65100,stroke-width:2px
在 Provider 层之下,db-api 层承担着"数据库抽象层"的角色。它屏蔽了具体数据库实现(如 MDBX),为上层提供类型安全、事务化的表操作接口。
2.2.1 db-api 主要 trait 与接口
Reth 的 db-api
层定义了底层数据库的抽象接口,是存储实现与上层 provider/storage-api 解耦的关键。其主要特性和接口如下:
1. 事务与表抽象
Database
trait:数据库主接口,支持只读/读写事务(tx
/tx_mut
),并提供事务生命周期管理(view
/update
)。DbTx
/DbTxMut
:只读/读写事务抽象,支持表的读写、游标遍历、提交/回滚等。Table
trait:表结构抽象,定义表名、是否 DUPSORT、Key/Value 类型。DupSort
trait:支持一对多(重复 key)表。
2. 游标与遍历
DbCursorRO
/DbCursorRW
:表的只读/读写游标,支持 seek/next/prev/first/last 等操作。Walker
/RangeWalker
/ReverseWalker
/DupWalker
:基于游标的高效遍历与批量操作。
3. 序列化与压缩
Encode
/Decode
trait:Key 的序列化与反序列化。Compress
/Decompress
trait:Value 的压缩与解压缩,支持多种编码格式(如 Compact、Scale、RoaringBitmap)。- 支持自定义表模型(如 ShardedKey、StorageShardedKey、IntegerList 等)。
4. 类型安全与可扩展性
- 所有 trait 强类型约束,接口清晰,便于静态检查和 IDE 自动补全。
- 支持 mock 数据库(
DatabaseMock
),便于测试。 - trait 组合灵活,便于扩展和多后端实现。
5. 典型用法
#![allow(unused)] fn main() { // 打开数据库并开启只读事务 let db: Arc<dyn Database> = ...; let tx = db.tx()?; // 读取表数据 let value = tx.get::<MyTable>(key)?; // 使用游标遍历表 let mut cursor = tx.cursor_read::<MyTable>()?; while let Some((k, v)) = cursor.next()? { // 处理 k, v } // 写入事务 let mut tx_mut = db.tx_mut()?; tx_mut.put::<MyTable>(key, value)?; tx_mut.commit()?; }
6. 设计亮点
- 事务安全、强一致性,所有操作都在事务内完成
- 支持高效批量遍历与范围操作
- 多种表模型和压缩格式,兼容以太坊多样数据结构
- 类型安全、易于测试和扩展
- 与 storage-api/provider 层协作紧密,支撑高性能和灵活的以太坊节点存储
结论:db-api 层是 Reth 存储系统的"数据库抽象层",通过事务、表、游标、序列化等 trait 组合,极大提升了系统的灵活性、可维护性和性能,是理解 Reth 存储架构的核心基础。
2.3 存储实现层 (db)
flowchart LR Store[存储层] subgraph MDBX[MDBX] direction TB M1[open] M2[create_table] M3[drop_table] end subgraph Cache[Cache] direction TB C1[get] C2[insert] C3[remove] C4[clear] end subgraph Static[Static File] direction TB S1[read] S2[write] S3[append] end Store --> MDBX Store --> Cache Store --> Static style Store fill:#e1f5fe,stroke:#01579b,stroke-width:2px style MDBX fill:#e1f5fe,stroke:#01579b,stroke-width:2px style Cache fill:#e1f5fe,stroke:#01579b,stroke-width:2px style Static fill:#e1f5fe,stroke:#01579b,stroke-width:2px
最后,存储实现层提供了具体的存储后端实现(如 MDBX、静态文件、缓存等),支撑了高性能和多样化的存储需求。它实现了 db-api trait,供上层调用。
3. 关键特性
在上述分层架构的基础上,Reth 存储系统还具备如下关键特性:
3.1 性能优化
-
数据压缩
- ZSTD 压缩支持
- 自定义压缩算法
- 压缩级别可配置
-
缓存机制
- 多级缓存
- LRU 缓存策略
- 预加载机制
-
批量操作
- 批量读取
- 批量写入
- 事务批处理
3.2 可靠性保证
-
事务支持
- ACID 特性
- 事务隔离
- 原子操作
-
并发控制
- 文件锁机制
- 读写锁
- 死锁预防
-
错误处理
- 统一错误类型
- 错误传播链
- 错误恢复机制
3.3 可扩展性
-
模块化设计
- 接口抽象
- 实现分离
- 插件化架构
-
存储引擎
- MDBX 支持
- 静态文件支持
- 可扩展接口
-
数据模型
- 灵活的表结构
- 自定义编码
- 版本兼容
4. 使用示例
通过前面的分层讲解,我们可以看到各层 trait 和接口如何协作。下面给出一些典型的使用场景代码片段,帮助理解实际开发中的调用方式。
4.1 基本操作
#![allow(unused)] fn main() { // 创建状态提供者 let state_provider = StateProviderFactory::latest()?; // 查询账户信息 let balance = state_provider.account_balance(&address)?; let code = state_provider.account_code(&address)?; // 查询存储数据 let storage = state_provider.storage(address, storage_key)?; }
4.2 历史数据访问
#![allow(unused)] fn main() { // 获取历史状态 let historical_state = state_provider.history_by_block_number(block_number)?; // 获取待处理状态 let pending_state = state_provider.pending()?; }
5. 最佳实践
结合实际开发经验,推荐如下最佳实践以充分发挥 Reth 存储系统的性能和可靠性:
5.1 状态访问
- 优先使用批量操作
- 合理使用缓存
- 注意错误处理
5.2 数据写入
- 使用事务保证原子性
- 注意并发安全
- 合理使用批量写入
5.3 性能优化
- 使用适当的索引
- 避免重复查询
- 合理使用缓存
6. 总结
综上所述,Reth 存储层设计展示了以下特点:
- 高度模块化:清晰的层次结构和职责划分
- 类型安全:强类型系统和编译时检查
- 性能优化:多级缓存和压缩机制
- 可扩展性:灵活的接口和实现分离
- 可靠性:完善的事务和并发控制
- 可维护性:清晰的代码组织和文档
这些设计使得 Reth 能够高效地处理以太坊节点所需的各种数据存储需求,同时保证了系统的可靠性和可维护性。存储 API 的设计特别注重模块化和可扩展性,使得系统能够轻松适应不同的存储需求和性能要求。
Reth 存储层源码走读
Reth 存储层源码走读
Reth 是什么?
Reth is an Execution Layer (EL) and is compatible with all Ethereum Consensus Layer (CL) implementations that support the Engine API.
在以太坊 The Merge 之后,节点分成了两个部分,其中部分负责共识,另部分负责状态维护。
graph LR subgraph 共识层 CL cl1[Slot / Epoch 驱动] cl2[区块提议 / 投票] cl3[Finality 决定] cl4[Beacon 链同步] end subgraph 执行层 EL el1[交易执行(EVM)] el2[状态更新(State)] el3[交易池管理] el4[账户 / 区块存储] end cl1 -.-> API[Engine API] cl2 -.-> API cl3 -.-> API cl4 -.-> API API -.-> el1 API -.-> el2 API -.-> el3 API -.-> el4
以太坊的状态
-
区块
- 区块头
- 区块体
-
链
- 父叔
- 高度
-
账户状态 [MPT]
- balance
共识层通过 EngineAPI 调用执行层,那么 Engine API 有哪些语义?
方法名 | 用途 | 场景 |
engine_newPayloadVx | 提交一个新区块给执行层 | 共识层接收到区块头(比如来自其他节点) |
engine_forkchoiceUpdatedVx | 告诉执行层哪个区块是链头 | 切换链头、出块前、同步时 |
engine_getPayloadVx | 要求执行层构造一个新区块 | 共识层要提议新区块(出块)时 |
engine_exchangeTransitionConfiguration | 启动时交换合并配置 | 共识客户端刚启动时 |
engine_getPayloadBodies - ByHash - ByRange | 请求区块体(交易列表) | 共识节点快速同步时 |
为了实现 Engine API 的语义,执行层要有哪些能力?
方法名 | 数据 | 能力 |
engine_newPayloadVx | - 链头 - 账户状态 | - 获取当前链头 - 通过账户ID获取账户状态 - 记录临时状态 |
engine_forkchoiceUpdatedVx | - 历史某一个区块 | - 更新链的头部状态 [最高的链] - 快速切换/回滚到不同的链 - 维护 finalized/safe block |
engine_getPayloadVx | - 链头 - 账户状态 | - 构建新区块 - 获取当前链头 - 快速生成 block header/body |
engine_exchangeTransitionConfiguration | - 配置项 | |
engine_getPayloadBodies - ByHash - ByRange | - 链头 - 链高度 - 链哈希 | - 通过哈希定位一个块 - 通过高度范围定位一系列块 |
Reth [执行层] = 逻辑层 + 存储层,存储层给逻辑层暴露了哪些能力?怎么样暴露的?
怎么样暴露的 ?
- Reth 中 BlockchainProvider 作为整个存储层的门户。
暴露了哪些能力 ?
- 实现了 数据结构的 Trait + 链状态的 Trait 【门面模式】,将存储的能力暴露给逻辑层。
- Header
- Block
- Receipts
- State
- Latest
- by_block_num
- by_block_hash
#![allow(unused)] fn main() { crates/storage/provider/src/providers/blockchain_provider.rs /// The main type for interacting with the blockchain. /// /// This type serves as the main entry point for interacting with the blockchain and provides data /// from database storage and from the blockchain tree (pending state etc.) It is a simple wrapper /// type that holds an instance of the database and the blockchain tree. #[derive(Debug)] pub struct BlockchainProvider<N: NodeTypesWithDB> { /// Provider factory used to access the database. pub(crate) database: ProviderFactory<N>, /// Tracks the chain info wrt forkchoice updates and in memory canonical /// state. pub(crate) canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>, } }
字段 | 类型 | 作用 | 服务对象 |
database | ProviderFactory | 持久化层的访问接口 - mdbx - static_file | 历史数据访问 |
canonical_in_memory_state | CanonicalInMemoryState | 管理和缓存当前主链的内存状态,包括: - 当前的 pending block - 当前的 head block - 当前的 finalized block - 当前的 safe block | - 验证区块 - 更新链头 - 产生区块 |
内存中管理的 Block
名称 | 定义 | 特性 |
Head Block | 当前 forkchoice 算法选出的主链 tip 区块,是执行引擎所“跟随”的最新区块 | - 最前沿/高的合法块 - 可能被重组 - 实时 |
Safe Block | 被所有 honest validator 知晓、投票支持的区块,虽然未 final,但几乎不会 reorg | - 大概率不会被重组 - 尚未最终确定 |
Finalized Block | 被 >2/3 超级多数 validator 通过 Casper FFG 投票最终确定的区块 | - 永不可重组 - 状态可裁剪 - 滞后几个 epoch |
Pending Block | 还未被正式打包并广播到区块链上的下一个候选区块,由以太坊节点本地构建,包含已知合法但尚未上链的交易。 | - 未上链 - 可能被丢弃 - 模拟 |
通过 canonical_in_memory_state 访问了哪些接口
BlockNumReader
#![allow(unused)] fn main() { impl<N: ProviderNodeTypes> BlockNumReader for BlockchainProvider<N> { fn chain_info(&self) -> ProviderResult<ChainInfo> { Ok(self.canonical_in_memory_state.chain_info()) } fn best_block_number(&self) -> ProviderResult<BlockNumber> { Ok(self.canonical_in_memory_state.get_canonical_block_number()) } fn last_block_number(&self) -> ProviderResult<BlockNumber> { self.database.last_block_number() } fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> { self.consistent_provider()?.block_number(hash) } } }
方法名 | 数据源 | 原因 |
`chain_info()` | `canonical_in_memory_state` | 主链状态缓存,可快速返回 |
`best_block_number()` | `canonical_in_memory_state` | 最新同步高度,在 forkchoice 更新时写入 |
`last_block_number()` | `database` | 实际持久化层中最后一个区块号 |
`block_number(hash)` | `consistent_provider`(数据库) | 反查,没有维护在内存里,只能查表 |
BlockIdReader
#![allow(unused)] fn main() { impl<N: ProviderNodeTypes> BlockIdReader for BlockchainProvider<N> { fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> { Ok(self.canonical_in_memory_state.pending_block_num_hash()) } fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> { Ok(self.canonical_in_memory_state.get_safe_num_hash()) } fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> { Ok(self.canonical_in_memory_state.get_finalized_num_hash()) } } }
状态类型 | 含义 | 更新频率 | 使用场景 |
`pending` | 正在构建但未打包的区块(如打包交易) | 非常频繁 | 出块者构造区块时使用 |
`safe` | 被大多数节点认为“短期内不会被重组”的区块 | 同步期间频繁更新 | 提供相对稳定视图 |
`finalized` | 绝大多数网络认为“永不回滚”的区块 | 较稳定,偶尔更新 | 对外暴露、存档数据 |
BlockReader
#![allow(unused)] fn main() { impl<N: ProviderNodeTypes> BlockReader for BlockchainProvider<N> { type Block = BlockTy<N>; fn find_block_by_hash( &self, hash: B256, source: BlockSource, ) -> ProviderResult<Option<Self::Block>> { self.consistent_provider()?.find_block_by_hash(hash, source) } fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> { self.consistent_provider()?.block(id) } fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> { Ok(self.canonical_in_memory_state.pending_block()) } fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> { Ok(self.canonical_in_memory_state.pending_recovered_block()) } fn pending_block_and_receipts( &self, ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> { Ok(self.canonical_in_memory_state.pending_block_and_receipts()) } }
方法名 | 数据来源 | 用途/功能 | 典型使用场景 |
`find_block_by_hash(hash, src)` | database | 查找任意区块 | 同步校验 历史查询 |
`block(id)` | database | 获取指定高度或哈希的区块 | JSON-RPC 接口 执行层读取历史状态 |
`pending_block()` | canonical_in_memory_state | 获取当前内存中的 pending 区块 | 构造新区块、 RPC 返回“pending”数据 |
`pending_block_with_senders()` | canonical_in_memory_state | 获取 pending 区块,并附带交易的 sender 信息 | 区块分析 前端展示发送方地址 |
`pending_block_and_receipts()` | canonical_in_memory_state | 获取 pending 区块及其 receipts(模拟执行) | 提前分析 Gas 消耗 出块准备 调试用 |
StateProviderFactory
#![allow(unused)] fn main() { impl<N: ProviderNodeTypes> StateProviderFactory for BlockchainProvider<N> { /// Storage provider for latest block fn latest(&self) -> ProviderResult<StateProviderBox> { trace!(target: "providers::blockchain", "Getting latest block state provider"); // use latest state provider if the head state exists if let Some(state) = self.canonical_in_memory_state.head_state() { trace!(target: "providers::blockchain", "Using head state for latest state provider"); Ok(self.block_state_provider(&state)?.boxed()) } else { trace!(target: "providers::blockchain", "Using database state for latest state provider"); self.database.latest() } /// Returns the state provider for pending state. /// /// If there's no pending block available then the latest state provider is returned: /// [`Self::latest`] fn pending(&self) -> ProviderResult<StateProviderBox> { trace!(target: "providers::blockchain", "Getting provider for pending state"); if let Some(pending) = self.canonical_in_memory_state.pending_state() { // we have a pending block return Ok(Box::new(self.block_state_provider(&pending)?)); } // fallback to latest state if the pending block is not available self.latest() } fn pending_state_by_hash(&self, block_hash: B256) -> ProviderResult<Option<StateProviderBox>> { if let Some(pending) = self.canonical_in_memory_state.pending_state() { if pending.hash() == block_hash { return Ok(Some(Box::new(self.block_state_provider(&pending)?))); } } Ok(None) } } }
方法名 | 数据来源 | 用途/功能 | 典型使用场景 |
`latest()` | 优先内存`head_state()` 否则从数据库 | 返回当前最新链头区块对应的状态提供器(用于访问该区块的账户/存储状态) | 查询 `head` 区块的状态(比如 RPC、执行层模拟执行) |
`pending()` | 优先内存 `pending_state()` 否则 `latest()` | 返回当前构建中(未打包)区块的状态提供器,用于模拟交易、出块构建等 | 出块构建器在构造 `pending block` 时,访问预执行状态 |
`pending_state_by_hash(h)` | 内存中 `pending_state()` 匹配 hash | 根据区块 hash 判断是否为当前 pending 状态,如果是,则返回对应状态提供器 | 共识层校验传入的 block 是否与本地 `pending` 一致,用于状态一致性检测等用途 |
CanonChainTracker
#![allow(unused)] fn main() { impl<N: ProviderNodeTypes> CanonChainTracker for BlockchainProvider<N> { type Header = HeaderTy<N>; fn on_forkchoice_update_received(&self, _update: &ForkchoiceState) { // update timestamp self.canonical_in_memory_state.on_forkchoice_update_received(); } fn last_received_update_timestamp(&self) -> Option<Instant> { self.canonical_in_memory_state.last_received_update_timestamp() } fn set_canonical_head(&self, header: SealedHeader<Self::Header>) { self.canonical_in_memory_state.set_canonical_head(header); } fn set_safe(&self, header: SealedHeader<Self::Header>) { self.canonical_in_memory_state.set_safe(header); } fn set_finalized(&self, header: SealedHeader<Self::Header>) { self.canonical_in_memory_state.set_finalized(header); } } }
方法名 | 触发来源 | 作用描述 | 更新目标 | 场景示例 |
`on_forkchoice_update_received(&ForkchoiceState)` | 来自共识层的 forkchoiceUpdated 请求 | 记录本地最近一次接收到 forkchoiceUpdated 的时间戳 | `last_forkchoice_update_timestamp`(在内存中) | 收到共识客户端的链状态更新通知时 |
`last_received_update_timestamp()` | 查询方法 | 获取上一次 forkchoiceUpdated 的时间戳 | — | 提供给监控系统或判断节点是否活跃 |
`set_canonical_head(header)` | 共识层通知、执行成功后调用 | 设置新的链头(head block) | `head` 区块(包括 hash、高度、timestamp) | 出块成功、同步完成、切换链分支时 |
`set_safe(header)` | 共识层 forkchoice 更新 | 设置“安全区块” safe block,不能轻易被回滚 | `safe` 区块信息 | 执行层配合共识做状态保存、避免不必要的回滚 |
`set_finalized(header)` | 共识层 forkchoice 更新 | 设置“最终确定”的 finalized block,表示不可再被回滚 | `finalized` 区块信息 | 数据归档触发、状态持久化、RPC 提供稳定数据源 |
持久化层如何工作?
ProviderFactory
对外提供了哪些方法?
Reth 的架构鼓励通过 trait 来抽象每一种数据访问行为。ProviderFactory
实现了如下 trait 接口:
Trait | 方法示例 | 用途 |
`HeaderProvider` | `header_by_number()` | 读取区块头 |
`BlockReader` | `block(id)` | 查询区块体 |
`TransactionsProvider` | `transaction_by_hash()` | 查询交易内容 |
`ReceiptProvider` | `receipts_by_block()` | 获取交易回执 |
`BlockNumReader` | `best_block_number()` | 获取链上高度信息 |
`PruneCheckpointReader` | `get_prune_checkpoint()` | 查询数据修剪进度 |
`StaticFileProviderFactory` | `static_file_provider()` | 获取静态文件提供器 |
所有 trait 的背后实际调用的是 DatabaseProvider
对象,该对象则由 ProviderFactory
管理并创建。
典型工作场景
数据库层如何工作:数据分层
#![allow(unused)] fn main() { /// A provider struct that fetches data from the database. /// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`] #[derive(Debug)] pub struct DatabaseProvider<TX, N: NodeTypes> { /// Database transaction. tx: TX, /// Chain spec chain_spec: Arc<N::ChainSpec>, /// Static File provider static_file_provider: StaticFileProvider<N::Primitives>, /// Pruning configuration prune_modes: PruneModes, /// Node storage handler. storage: Arc<N::Storage>, } }
- tx 提供数据库访问
- static_file_provider 提供静态文件访问
#![allow(unused)] fn main() { impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> { fn transactions_by_tx_range_with_cursor<C>( &self, range: impl RangeBounds<TxNumber>, cursor: &mut C, ) -> ProviderResult<Vec<TxTy<N>>> where C: DbCursorRO<tables::Transactions<TxTy<N>>>, { self.static_file_provider.get_range_with_static_file_or_database( StaticFileSegment::Transactions, to_range(range), |static_file, range, _| static_file.transactions_by_tx_range(range), |range, _| self.cursor_collect(cursor, range), |_| true, ) } }
- get_range_with_static_file_or_database
#![allow(unused)] fn main() { /// Gets data within a specified range, potentially spanning different `static_files` and_ /// database._ ///_ /// # Arguments_ /// * `segment` - The segment of the static file to query._ /// * `block_range` - The range of data to fetch._ /// * `fetch_from_static_file` - A function to fetch data from the `static_file`._ /// * `fetch_from_database` - A function to fetch data from the database._ /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is_ /// terminated when this function returns false, thereby filtering the data based on the_ /// provided condition._ pub fn get_range_with_static_file_or_database<T, P, FS, FD>( &self, segment: StaticFileSegment, mut block_or_tx_range: Range<u64>, fetch_from_static_file: FS, mut fetch_from_database: FD, mut predicate: P, ) -> ProviderResult<Vec<T>> where FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>, FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>, P: FnMut(&T) -> bool, { let mut data = Vec::_new_(); // If there is, check the maximum block or transaction number of the segment. if let _Some_(static_file_upper_bound) = if segment.is_block_based() { self.get_highest_static_file_block(segment) } else { self.get_highest_static_file_tx(segment) } { if block_or_tx_range.start <= static_file_upper_bound { let end = block_or_tx_range.end.min(static_file_upper_bound + 1); data.extend(fetch_from_static_file( self, block_or_tx_range.start..end, &mut predicate, )?); block_or_tx_range.start = end; } } if block_or_tx_range.end > block_or_tx_range.start { data.extend(fetch_from_database(block_or_tx_range, predicate)?) } _Ok_(data) } }
核心业务逻辑
- 获取当前段的最大静态文件范围
#![allow(unused)] fn main() { let static_file_upper_bound = if segment.is_block_based() { self.get_highest_static_file_block(segment) } else { self.get_highest_static_file_tx(segment) }; }
segment
可以基于区块或交易,函数会调用相应的方法来获取当前静态文件的最大已保存边界(例如最大区块号或最大交易号)。
- 如果目标范围的一部分在静态文件中,就优先从静态文件读取
#![allow(unused)] fn main() { if block_or_tx_range.start <= static_file_upper_bound { let end = block_or_tx_range.end.min(static_file_upper_bound + 1); data.extend(fetch_from_static_file( self, block_or_tx_range.start..end, &mut predicate, )?); block_or_tx_range.start = end; } }
- 如果目标范围的起始值小于等于静态文件边界,说明这部分数据可以从静态文件中读取。
- 读取的实际区间是
[start, min(end, static_file_upper_bound + 1))
- 调用
fetch_from_static_file
,并传入可变谓词&mut predicate
用于过滤。 - 然后更新
block_or_tx_range.start = end
,只保留未处理的区间。
- 对于剩下的区间(即静态文件中没有的),从数据库读取
#![allow(unused)] fn main() { if block_or_tx_range.end > block_or_tx_range.start { data.extend(fetch_from_database(block_or_tx_range, predicate)?) } }
- 如果仍有未处理的区间(即
end > start
),则调用fetch_from_database
获取数据。
- 返回所有结果
#![allow(unused)] fn main() { Ok(data) }