Reth 存储层源码走读
Reth 是什么?
Reth is an Execution Layer (EL) and is compatible with all Ethereum Consensus Layer (CL) implementations that support the Engine API.
在以太坊 The Merge 之后,节点分成了两个部分,其中部分负责共识,另部分负责状态维护。
graph LR subgraph 共识层 CL cl1[Slot / Epoch 驱动] cl2[区块提议 / 投票] cl3[Finality 决定] cl4[Beacon 链同步] end subgraph 执行层 EL el1[交易执行(EVM)] el2[状态更新(State)] el3[交易池管理] el4[账户 / 区块存储] end cl1 -.-> API[Engine API] cl2 -.-> API cl3 -.-> API cl4 -.-> API API -.-> el1 API -.-> el2 API -.-> el3 API -.-> el4
以太坊的状态
-
区块
- 区块头
- 区块体
-
链
- 父叔
- 高度
-
账户状态 [MPT]
- balance
共识层通过 EngineAPI 调用执行层,那么 Engine API 有哪些语义?
方法名 | 用途 | 场景 |
engine_newPayloadVx | 提交一个新区块给执行层 | 共识层接收到区块头(比如来自其他节点) |
engine_forkchoiceUpdatedVx | 告诉执行层哪个区块是链头 | 切换链头、出块前、同步时 |
engine_getPayloadVx | 要求执行层构造一个新区块 | 共识层要提议新区块(出块)时 |
engine_exchangeTransitionConfiguration | 启动时交换合并配置 | 共识客户端刚启动时 |
engine_getPayloadBodies - ByHash - ByRange | 请求区块体(交易列表) | 共识节点快速同步时 |
为了实现 Engine API 的语义,执行层要有哪些能力?
方法名 | 数据 | 能力 |
engine_newPayloadVx | - 链头 - 账户状态 | - 获取当前链头 - 通过账户ID获取账户状态 - 记录临时状态 |
engine_forkchoiceUpdatedVx | - 历史某一个区块 | - 更新链的头部状态 [最高的链] - 快速切换/回滚到不同的链 - 维护 finalized/safe block |
engine_getPayloadVx | - 链头 - 账户状态 | - 构建新区块 - 获取当前链头 - 快速生成 block header/body |
engine_exchangeTransitionConfiguration | - 配置项 | |
engine_getPayloadBodies - ByHash - ByRange | - 链头 - 链高度 - 链哈希 | - 通过哈希定位一个块 - 通过高度范围定位一系列块 |
Reth [执行层] = 逻辑层 + 存储层,存储层给逻辑层暴露了哪些能力?怎么样暴露的?
怎么样暴露的 ?
- Reth 中 BlockchainProvider 作为整个存储层的门户。
暴露了哪些能力 ?
- 实现了 数据结构的 Trait + 链状态的 Trait 【门面模式】,将存储的能力暴露给逻辑层。
- Header
- Block
- Receipts
- State
- Latest
- by_block_num
- by_block_hash
#![allow(unused)] fn main() { crates/storage/provider/src/providers/blockchain_provider.rs /// The main type for interacting with the blockchain. /// /// This type serves as the main entry point for interacting with the blockchain and provides data /// from database storage and from the blockchain tree (pending state etc.) It is a simple wrapper /// type that holds an instance of the database and the blockchain tree. #[derive(Debug)] pub struct BlockchainProvider<N: NodeTypesWithDB> { /// Provider factory used to access the database. pub(crate) database: ProviderFactory<N>, /// Tracks the chain info wrt forkchoice updates and in memory canonical /// state. pub(crate) canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>, } }
字段 | 类型 | 作用 | 服务对象 |
database | ProviderFactory | 持久化层的访问接口 - mdbx - static_file | 历史数据访问 |
canonical_in_memory_state | CanonicalInMemoryState | 管理和缓存当前主链的内存状态,包括: - 当前的 pending block - 当前的 head block - 当前的 finalized block - 当前的 safe block | - 验证区块 - 更新链头 - 产生区块 |
内存中管理的 Block
名称 | 定义 | 特性 |
Head Block | 当前 forkchoice 算法选出的主链 tip 区块,是执行引擎所“跟随”的最新区块 | - 最前沿/高的合法块 - 可能被重组 - 实时 |
Safe Block | 被所有 honest validator 知晓、投票支持的区块,虽然未 final,但几乎不会 reorg | - 大概率不会被重组 - 尚未最终确定 |
Finalized Block | 被 >2/3 超级多数 validator 通过 Casper FFG 投票最终确定的区块 | - 永不可重组 - 状态可裁剪 - 滞后几个 epoch |
Pending Block | 还未被正式打包并广播到区块链上的下一个候选区块,由以太坊节点本地构建,包含已知合法但尚未上链的交易。 | - 未上链 - 可能被丢弃 - 模拟 |
通过 canonical_in_memory_state 访问了哪些接口
BlockNumReader
#![allow(unused)] fn main() { impl<N: ProviderNodeTypes> BlockNumReader for BlockchainProvider<N> { fn chain_info(&self) -> ProviderResult<ChainInfo> { Ok(self.canonical_in_memory_state.chain_info()) } fn best_block_number(&self) -> ProviderResult<BlockNumber> { Ok(self.canonical_in_memory_state.get_canonical_block_number()) } fn last_block_number(&self) -> ProviderResult<BlockNumber> { self.database.last_block_number() } fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> { self.consistent_provider()?.block_number(hash) } } }
方法名 | 数据源 | 原因 |
`chain_info()` | `canonical_in_memory_state` | 主链状态缓存,可快速返回 |
`best_block_number()` | `canonical_in_memory_state` | 最新同步高度,在 forkchoice 更新时写入 |
`last_block_number()` | `database` | 实际持久化层中最后一个区块号 |
`block_number(hash)` | `consistent_provider`(数据库) | 反查,没有维护在内存里,只能查表 |
BlockIdReader
#![allow(unused)] fn main() { impl<N: ProviderNodeTypes> BlockIdReader for BlockchainProvider<N> { fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> { Ok(self.canonical_in_memory_state.pending_block_num_hash()) } fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> { Ok(self.canonical_in_memory_state.get_safe_num_hash()) } fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> { Ok(self.canonical_in_memory_state.get_finalized_num_hash()) } } }
状态类型 | 含义 | 更新频率 | 使用场景 |
`pending` | 正在构建但未打包的区块(如打包交易) | 非常频繁 | 出块者构造区块时使用 |
`safe` | 被大多数节点认为“短期内不会被重组”的区块 | 同步期间频繁更新 | 提供相对稳定视图 |
`finalized` | 绝大多数网络认为“永不回滚”的区块 | 较稳定,偶尔更新 | 对外暴露、存档数据 |
BlockReader
#![allow(unused)] fn main() { impl<N: ProviderNodeTypes> BlockReader for BlockchainProvider<N> { type Block = BlockTy<N>; fn find_block_by_hash( &self, hash: B256, source: BlockSource, ) -> ProviderResult<Option<Self::Block>> { self.consistent_provider()?.find_block_by_hash(hash, source) } fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> { self.consistent_provider()?.block(id) } fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> { Ok(self.canonical_in_memory_state.pending_block()) } fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> { Ok(self.canonical_in_memory_state.pending_recovered_block()) } fn pending_block_and_receipts( &self, ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> { Ok(self.canonical_in_memory_state.pending_block_and_receipts()) } }
方法名 | 数据来源 | 用途/功能 | 典型使用场景 |
`find_block_by_hash(hash, src)` | database | 查找任意区块 | 同步校验 历史查询 |
`block(id)` | database | 获取指定高度或哈希的区块 | JSON-RPC 接口 执行层读取历史状态 |
`pending_block()` | canonical_in_memory_state | 获取当前内存中的 pending 区块 | 构造新区块、 RPC 返回“pending”数据 |
`pending_block_with_senders()` | canonical_in_memory_state | 获取 pending 区块,并附带交易的 sender 信息 | 区块分析 前端展示发送方地址 |
`pending_block_and_receipts()` | canonical_in_memory_state | 获取 pending 区块及其 receipts(模拟执行) | 提前分析 Gas 消耗 出块准备 调试用 |
StateProviderFactory
#![allow(unused)] fn main() { impl<N: ProviderNodeTypes> StateProviderFactory for BlockchainProvider<N> { /// Storage provider for latest block fn latest(&self) -> ProviderResult<StateProviderBox> { trace!(target: "providers::blockchain", "Getting latest block state provider"); // use latest state provider if the head state exists if let Some(state) = self.canonical_in_memory_state.head_state() { trace!(target: "providers::blockchain", "Using head state for latest state provider"); Ok(self.block_state_provider(&state)?.boxed()) } else { trace!(target: "providers::blockchain", "Using database state for latest state provider"); self.database.latest() } /// Returns the state provider for pending state. /// /// If there's no pending block available then the latest state provider is returned: /// [`Self::latest`] fn pending(&self) -> ProviderResult<StateProviderBox> { trace!(target: "providers::blockchain", "Getting provider for pending state"); if let Some(pending) = self.canonical_in_memory_state.pending_state() { // we have a pending block return Ok(Box::new(self.block_state_provider(&pending)?)); } // fallback to latest state if the pending block is not available self.latest() } fn pending_state_by_hash(&self, block_hash: B256) -> ProviderResult<Option<StateProviderBox>> { if let Some(pending) = self.canonical_in_memory_state.pending_state() { if pending.hash() == block_hash { return Ok(Some(Box::new(self.block_state_provider(&pending)?))); } } Ok(None) } } }
方法名 | 数据来源 | 用途/功能 | 典型使用场景 |
`latest()` | 优先内存`head_state()` 否则从数据库 | 返回当前最新链头区块对应的状态提供器(用于访问该区块的账户/存储状态) | 查询 `head` 区块的状态(比如 RPC、执行层模拟执行) |
`pending()` | 优先内存 `pending_state()` 否则 `latest()` | 返回当前构建中(未打包)区块的状态提供器,用于模拟交易、出块构建等 | 出块构建器在构造 `pending block` 时,访问预执行状态 |
`pending_state_by_hash(h)` | 内存中 `pending_state()` 匹配 hash | 根据区块 hash 判断是否为当前 pending 状态,如果是,则返回对应状态提供器 | 共识层校验传入的 block 是否与本地 `pending` 一致,用于状态一致性检测等用途 |
CanonChainTracker
#![allow(unused)] fn main() { impl<N: ProviderNodeTypes> CanonChainTracker for BlockchainProvider<N> { type Header = HeaderTy<N>; fn on_forkchoice_update_received(&self, _update: &ForkchoiceState) { // update timestamp self.canonical_in_memory_state.on_forkchoice_update_received(); } fn last_received_update_timestamp(&self) -> Option<Instant> { self.canonical_in_memory_state.last_received_update_timestamp() } fn set_canonical_head(&self, header: SealedHeader<Self::Header>) { self.canonical_in_memory_state.set_canonical_head(header); } fn set_safe(&self, header: SealedHeader<Self::Header>) { self.canonical_in_memory_state.set_safe(header); } fn set_finalized(&self, header: SealedHeader<Self::Header>) { self.canonical_in_memory_state.set_finalized(header); } } }
方法名 | 触发来源 | 作用描述 | 更新目标 | 场景示例 |
`on_forkchoice_update_received(&ForkchoiceState)` | 来自共识层的 forkchoiceUpdated 请求 | 记录本地最近一次接收到 forkchoiceUpdated 的时间戳 | `last_forkchoice_update_timestamp`(在内存中) | 收到共识客户端的链状态更新通知时 |
`last_received_update_timestamp()` | 查询方法 | 获取上一次 forkchoiceUpdated 的时间戳 | — | 提供给监控系统或判断节点是否活跃 |
`set_canonical_head(header)` | 共识层通知、执行成功后调用 | 设置新的链头(head block) | `head` 区块(包括 hash、高度、timestamp) | 出块成功、同步完成、切换链分支时 |
`set_safe(header)` | 共识层 forkchoice 更新 | 设置“安全区块” safe block,不能轻易被回滚 | `safe` 区块信息 | 执行层配合共识做状态保存、避免不必要的回滚 |
`set_finalized(header)` | 共识层 forkchoice 更新 | 设置“最终确定”的 finalized block,表示不可再被回滚 | `finalized` 区块信息 | 数据归档触发、状态持久化、RPC 提供稳定数据源 |
持久化层如何工作?
ProviderFactory
对外提供了哪些方法?
Reth 的架构鼓励通过 trait 来抽象每一种数据访问行为。ProviderFactory
实现了如下 trait 接口:
Trait | 方法示例 | 用途 |
`HeaderProvider` | `header_by_number()` | 读取区块头 |
`BlockReader` | `block(id)` | 查询区块体 |
`TransactionsProvider` | `transaction_by_hash()` | 查询交易内容 |
`ReceiptProvider` | `receipts_by_block()` | 获取交易回执 |
`BlockNumReader` | `best_block_number()` | 获取链上高度信息 |
`PruneCheckpointReader` | `get_prune_checkpoint()` | 查询数据修剪进度 |
`StaticFileProviderFactory` | `static_file_provider()` | 获取静态文件提供器 |
所有 trait 的背后实际调用的是 DatabaseProvider
对象,该对象则由 ProviderFactory
管理并创建。
典型工作场景
数据库层如何工作:数据分层
#![allow(unused)] fn main() { /// A provider struct that fetches data from the database. /// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`] #[derive(Debug)] pub struct DatabaseProvider<TX, N: NodeTypes> { /// Database transaction. tx: TX, /// Chain spec chain_spec: Arc<N::ChainSpec>, /// Static File provider static_file_provider: StaticFileProvider<N::Primitives>, /// Pruning configuration prune_modes: PruneModes, /// Node storage handler. storage: Arc<N::Storage>, } }
- tx 提供数据库访问
- static_file_provider 提供静态文件访问
#![allow(unused)] fn main() { impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> { fn transactions_by_tx_range_with_cursor<C>( &self, range: impl RangeBounds<TxNumber>, cursor: &mut C, ) -> ProviderResult<Vec<TxTy<N>>> where C: DbCursorRO<tables::Transactions<TxTy<N>>>, { self.static_file_provider.get_range_with_static_file_or_database( StaticFileSegment::Transactions, to_range(range), |static_file, range, _| static_file.transactions_by_tx_range(range), |range, _| self.cursor_collect(cursor, range), |_| true, ) } }
- get_range_with_static_file_or_database
#![allow(unused)] fn main() { /// Gets data within a specified range, potentially spanning different `static_files` and_ /// database._ ///_ /// # Arguments_ /// * `segment` - The segment of the static file to query._ /// * `block_range` - The range of data to fetch._ /// * `fetch_from_static_file` - A function to fetch data from the `static_file`._ /// * `fetch_from_database` - A function to fetch data from the database._ /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is_ /// terminated when this function returns false, thereby filtering the data based on the_ /// provided condition._ pub fn get_range_with_static_file_or_database<T, P, FS, FD>( &self, segment: StaticFileSegment, mut block_or_tx_range: Range<u64>, fetch_from_static_file: FS, mut fetch_from_database: FD, mut predicate: P, ) -> ProviderResult<Vec<T>> where FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>, FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>, P: FnMut(&T) -> bool, { let mut data = Vec::_new_(); // If there is, check the maximum block or transaction number of the segment. if let _Some_(static_file_upper_bound) = if segment.is_block_based() { self.get_highest_static_file_block(segment) } else { self.get_highest_static_file_tx(segment) } { if block_or_tx_range.start <= static_file_upper_bound { let end = block_or_tx_range.end.min(static_file_upper_bound + 1); data.extend(fetch_from_static_file( self, block_or_tx_range.start..end, &mut predicate, )?); block_or_tx_range.start = end; } } if block_or_tx_range.end > block_or_tx_range.start { data.extend(fetch_from_database(block_or_tx_range, predicate)?) } _Ok_(data) } }
核心业务逻辑
- 获取当前段的最大静态文件范围
#![allow(unused)] fn main() { let static_file_upper_bound = if segment.is_block_based() { self.get_highest_static_file_block(segment) } else { self.get_highest_static_file_tx(segment) }; }
segment
可以基于区块或交易,函数会调用相应的方法来获取当前静态文件的最大已保存边界(例如最大区块号或最大交易号)。
- 如果目标范围的一部分在静态文件中,就优先从静态文件读取
#![allow(unused)] fn main() { if block_or_tx_range.start <= static_file_upper_bound { let end = block_or_tx_range.end.min(static_file_upper_bound + 1); data.extend(fetch_from_static_file( self, block_or_tx_range.start..end, &mut predicate, )?); block_or_tx_range.start = end; } }
- 如果目标范围的起始值小于等于静态文件边界,说明这部分数据可以从静态文件中读取。
- 读取的实际区间是
[start, min(end, static_file_upper_bound + 1))
- 调用
fetch_from_static_file
,并传入可变谓词&mut predicate
用于过滤。 - 然后更新
block_or_tx_range.start = end
,只保留未处理的区间。
- 对于剩下的区间(即静态文件中没有的),从数据库读取
#![allow(unused)] fn main() { if block_or_tx_range.end > block_or_tx_range.start { data.extend(fetch_from_database(block_or_tx_range, predicate)?) } }
- 如果仍有未处理的区间(即
end > start
),则调用fetch_from_database
获取数据。
- 返回所有结果
#![allow(unused)] fn main() { Ok(data) }