Reth 存储层源码走读

Reth 是什么?

Reth is an Execution Layer (EL) and is compatible with all Ethereum Consensus Layer (CL) implementations that support the Engine API.

在以太坊 The Merge 之后,节点分成了两个部分,其中部分负责共识,另部分负责状态维护。

graph LR
    subgraph 共识层 CL
        cl1[Slot / Epoch 驱动]
        cl2[区块提议 / 投票]
        cl3[Finality 决定]
        cl4[Beacon 链同步]
    end

    subgraph 执行层 EL
        el1[交易执行(EVM)]
        el2[状态更新(State)]
        el3[交易池管理]
        el4[账户 / 区块存储]
    end

    cl1 -.-> API[Engine API]
    cl2 -.-> API
    cl3 -.-> API
    cl4 -.-> API
    API -.-> el1
    API -.-> el2
    API -.-> el3
    API -.-> el4


以太坊的状态

  • 区块

    • 区块头
    • 区块体
    • 父叔
    • 高度
  • 账户状态 [MPT]

    • balance

共识层通过 EngineAPI 调用执行层,那么 Engine API 有哪些语义?

方法名
用途
场景
engine_newPayloadVx
提交一个新区块给执行层
共识层接收到区块头(比如来自其他节点)
engine_forkchoiceUpdatedVx
告诉执行层哪个区块是链头
切换链头、出块前、同步时
engine_getPayloadVx
要求执行层构造一个新区块
共识层要提议新区块(出块)时
engine_exchangeTransitionConfiguration
启动时交换合并配置

共识客户端刚启动时
engine_getPayloadBodies
- ByHash
- ByRange
请求区块体(交易列表)

共识节点快速同步时

为了实现 Engine API 的语义,执行层要有哪些能力?

方法名
数据
能力
engine_newPayloadVx
- 链头
- 账户状态
- 获取当前链头
- 通过账户ID获取账户状态
- 记录临时状态
engine_forkchoiceUpdatedVx
- 历史某一个区块
- 更新链的头部状态 [最高的链]
- 快速切换/回滚到不同的链
- 维护 finalized/safe block
engine_getPayloadVx
- 链头
- 账户状态
- 构建新区块
- 获取当前链头
- 快速生成 block header/body
engine_exchangeTransitionConfiguration
- 配置项


engine_getPayloadBodies
- ByHash
- ByRange
- 链头
- 链高度
- 链哈希
- 通过哈希定位一个块
- 通过高度范围定位一系列块

Reth [执行层] = 逻辑层 + 存储层,存储层给逻辑层暴露了哪些能力?怎么样暴露的?

怎么样暴露的 ?

  • Reth 中 BlockchainProvider 作为整个存储层的门户。

暴露了哪些能力 ?

  • 实现了 数据结构的 Trait + 链状态的 Trait 【门面模式】,将存储的能力暴露给逻辑层。
    • Header
    • Block
    • Receipts
    • State
      • Latest
      • by_block_num
      • by_block_hash
#![allow(unused)]
fn main() {
crates/storage/provider/src/providers/blockchain_provider.rs

/// The main type for interacting with the blockchain.
///
/// This type serves as the main entry point for interacting with the blockchain and provides data
/// from database storage and from the blockchain tree (pending state etc.) It is a simple wrapper
/// type that holds an instance of the database and the blockchain tree.
#[derive(Debug)]
pub struct BlockchainProvider<N: NodeTypesWithDB> {
    /// Provider factory used to access the database.
    pub(crate) database: ProviderFactory<N>,
    /// Tracks the chain info wrt forkchoice updates and in memory canonical
    /// state.
    pub(crate) canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
}
}
字段
类型
作用
服务对象
database

ProviderFactory
持久化层的访问接口
- mdbx
- static_file
历史数据访问
canonical_in_memory_state
CanonicalInMemoryState
管理和缓存当前主链的内存状态,包括:
- 当前的 pending block
- 当前的 head block
- 当前的 finalized block
- 当前的 safe block
- 验证区块
- 更新链头
- 产生区块

内存中管理的 Block

名称
定义
特性
Head Block
当前 forkchoice 算法选出的主链 tip 区块,是执行引擎所“跟随”的最新区块
- 最前沿/高的合法块
- 可能被重组
- 实时
Safe Block
被所有 honest validator 知晓、投票支持的区块,虽然未 final,但几乎不会 reorg
- 大概率不会被重组
- 尚未最终确定
Finalized Block

被 >2/3 超级多数 validator 通过 Casper FFG 投票最终确定的区块
- 永不可重组
- 状态可裁剪
- 滞后几个 epoch
Pending Block

还未被正式打包并广播到区块链上的下一个候选区块,由以太坊节点本地构建,包含已知合法但尚未上链的交易。
- 未上链
- 可能被丢弃
- 模拟

通过 canonical_in_memory_state 访问了哪些接口

BlockNumReader

#![allow(unused)]
fn main() {
impl<N: ProviderNodeTypes> BlockNumReader for BlockchainProvider<N> {
    fn chain_info(&self) -> ProviderResult<ChainInfo> {
        Ok(self.canonical_in_memory_state.chain_info())
    }

    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
        Ok(self.canonical_in_memory_state.get_canonical_block_number())
    }

    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
        self.database.last_block_number()
    }

    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
        self.consistent_provider()?.block_number(hash)
    }
}
}
方法名
数据源
原因
`chain_info()`
`canonical_in_memory_state`
主链状态缓存,可快速返回
`best_block_number()`
`canonical_in_memory_state`
最新同步高度,在 forkchoice 更新时写入
`last_block_number()`
`database`
实际持久化层中最后一个区块号
`block_number(hash)`
`consistent_provider`(数据库)
反查,没有维护在内存里,只能查表

BlockIdReader

#![allow(unused)]
fn main() {
impl<N: ProviderNodeTypes> BlockIdReader for BlockchainProvider<N> {
    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
        Ok(self.canonical_in_memory_state.pending_block_num_hash())
    }

    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
        Ok(self.canonical_in_memory_state.get_safe_num_hash())
    }

    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
    }
}
}
状态类型
含义
更新频率
使用场景
`pending`
正在构建但未打包的区块(如打包交易)
非常频繁
出块者构造区块时使用
`safe`
被大多数节点认为“短期内不会被重组”的区块
同步期间频繁更新
提供相对稳定视图
`finalized`
绝大多数网络认为“永不回滚”的区块
较稳定,偶尔更新
对外暴露、存档数据

BlockReader

#![allow(unused)]
fn main() {
impl<N: ProviderNodeTypes> BlockReader for BlockchainProvider<N> {
    type Block = BlockTy<N>;

    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        self.consistent_provider()?.find_block_by_hash(hash, source)
    }

    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        self.consistent_provider()?.block(id)
    }

    fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
        Ok(self.canonical_in_memory_state.pending_block())
    }

    fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(self.canonical_in_memory_state.pending_recovered_block())
    }

    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(self.canonical_in_memory_state.pending_block_and_receipts())
    }
}
方法名
数据来源
用途/功能
典型使用场景
`find_block_by_hash(hash, src)`
database
查找任意区块
同步校验
历史查询
`block(id)`
database
获取指定高度或哈希的区块
JSON-RPC 接口
执行层读取历史状态
`pending_block()`
canonical_in_memory_state
获取当前内存中的 pending 区块
构造新区块、
RPC 返回“pending”数据
`pending_block_with_senders()`
canonical_in_memory_state
获取 pending 区块,并附带交易的 sender 信息
区块分析
前端展示发送方地址
`pending_block_and_receipts()`
canonical_in_memory_state
获取 pending 区块及其 receipts(模拟执行)

提前分析 Gas 消耗
出块准备
调试用

StateProviderFactory

#![allow(unused)]
fn main() {
impl<N: ProviderNodeTypes> StateProviderFactory for BlockchainProvider<N> {
    /// Storage provider for latest block
    fn latest(&self) -> ProviderResult<StateProviderBox> {
        trace!(target: "providers::blockchain", "Getting latest block state provider");
        // use latest state provider if the head state exists
        if let Some(state) = self.canonical_in_memory_state.head_state() {
            trace!(target: "providers::blockchain", "Using head state for latest state provider");
            Ok(self.block_state_provider(&state)?.boxed())
        } else {
            trace!(target: "providers::blockchain", "Using database state for latest state provider");
            self.database.latest()
  

    }
    /// Returns the state provider for pending state.
    ///
    /// If there's no pending block available then the latest state provider is returned:
    /// [`Self::latest`]
    fn pending(&self) -> ProviderResult<StateProviderBox> {
        trace!(target: "providers::blockchain", "Getting provider for pending state");

        if let Some(pending) = self.canonical_in_memory_state.pending_state() {
            // we have a pending block
            return Ok(Box::new(self.block_state_provider(&pending)?));
        }

        // fallback to latest state if the pending block is not available
        self.latest()
    }

    fn pending_state_by_hash(&self, block_hash: B256) -> ProviderResult<Option<StateProviderBox>> {
        if let Some(pending) = self.canonical_in_memory_state.pending_state() {
            if pending.hash() == block_hash {
                return Ok(Some(Box::new(self.block_state_provider(&pending)?)));
            }
        }
        Ok(None)
    }
}
}
方法名
数据来源
用途/功能
典型使用场景
`latest()`
优先内存`head_state()`
否则从数据库
返回当前最新链头区块对应的状态提供器(用于访问该区块的账户/存储状态)
查询 `head` 区块的状态(比如 RPC、执行层模拟执行)
`pending()`
优先内存 `pending_state()`
否则 `latest()`
返回当前构建中(未打包)区块的状态提供器,用于模拟交易、出块构建等
出块构建器在构造 `pending block` 时,访问预执行状态
`pending_state_by_hash(h)`
内存中 `pending_state()` 匹配 hash
根据区块 hash 判断是否为当前 pending 状态,如果是,则返回对应状态提供器
共识层校验传入的 block 是否与本地 `pending` 一致,用于状态一致性检测等用途

CanonChainTracker

#![allow(unused)]
fn main() {
impl<N: ProviderNodeTypes> CanonChainTracker for BlockchainProvider<N> {
    type Header = HeaderTy<N>;

    fn on_forkchoice_update_received(&self, _update: &ForkchoiceState) {
        // update timestamp
        self.canonical_in_memory_state.on_forkchoice_update_received();
    }

    fn last_received_update_timestamp(&self) -> Option<Instant> {
        self.canonical_in_memory_state.last_received_update_timestamp()
    }

    fn set_canonical_head(&self, header: SealedHeader<Self::Header>) {
        self.canonical_in_memory_state.set_canonical_head(header);
    }

    fn set_safe(&self, header: SealedHeader<Self::Header>) {
        self.canonical_in_memory_state.set_safe(header);
    }

    fn set_finalized(&self, header: SealedHeader<Self::Header>) {
        self.canonical_in_memory_state.set_finalized(header);
    }
}
}
方法名
触发来源
作用描述
更新目标
场景示例
`on_forkchoice_update_received(&ForkchoiceState)`
来自共识层的 forkchoiceUpdated 请求
记录本地最近一次接收到 forkchoiceUpdated 的时间戳
`last_forkchoice_update_timestamp`(在内存中)
收到共识客户端的链状态更新通知时
`last_received_update_timestamp()`
查询方法
获取上一次 forkchoiceUpdated 的时间戳

提供给监控系统或判断节点是否活跃
`set_canonical_head(header)`
共识层通知、执行成功后调用
设置新的链头(head block)
`head` 区块(包括 hash、高度、timestamp)
出块成功、同步完成、切换链分支时
`set_safe(header)`
共识层 forkchoice 更新
设置“安全区块” safe block,不能轻易被回滚
`safe` 区块信息
执行层配合共识做状态保存、避免不必要的回滚
`set_finalized(header)`
共识层 forkchoice 更新
设置“最终确定”的 finalized block,表示不可再被回滚
`finalized` 区块信息
数据归档触发、状态持久化、RPC 提供稳定数据源

持久化层如何工作?

ProviderFactory 对外提供了哪些方法?

Reth 的架构鼓励通过 trait 来抽象每一种数据访问行为。ProviderFactory 实现了如下 trait 接口:

Trait
方法示例
用途
`HeaderProvider`
`header_by_number()`
读取区块头
`BlockReader`
`block(id)`
查询区块体
`TransactionsProvider`
`transaction_by_hash()`
查询交易内容
`ReceiptProvider`
`receipts_by_block()`
获取交易回执
`BlockNumReader`
`best_block_number()`
获取链上高度信息
`PruneCheckpointReader`
`get_prune_checkpoint()`
查询数据修剪进度
`StaticFileProviderFactory`
`static_file_provider()`
获取静态文件提供器

所有 trait 的背后实际调用的是 DatabaseProvider 对象,该对象则由 ProviderFactory 管理并创建。

典型工作场景

数据库层如何工作:数据分层

#![allow(unused)]
fn main() {
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Database transaction.
    tx: TX,
    /// Chain spec
    chain_spec: Arc<N::ChainSpec>,
    /// Static File provider
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration
    prune_modes: PruneModes,
    /// Node storage handler.
    storage: Arc<N::Storage>,
}
}
  • tx 提供数据库访问
  • static_file_provider 提供静态文件访问
#![allow(unused)]
fn main() {
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    fn transactions_by_tx_range_with_cursor<C>(
        &self,
        range: impl RangeBounds<TxNumber>,
        cursor: &mut C,
    ) -> ProviderResult<Vec<TxTy<N>>>
    where
        C: DbCursorRO<tables::Transactions<TxTy<N>>>,
    {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Transactions,
            to_range(range),
            |static_file, range, _| static_file.transactions_by_tx_range(range),
            |range, _| self.cursor_collect(cursor, range),
            |_| true,
        )
    }
}
  • get_range_with_static_file_or_database
#![allow(unused)]
fn main() {
/// Gets data within a specified range, potentially spanning different `static_files` and_
/// database._
///_
/// # Arguments_
/// * `segment` - The segment of the static file to query._
/// * `block_range` - The range of data to fetch._
/// * `fetch_from_static_file` - A function to fetch data from the `static_file`._
/// * `fetch_from_database` - A function to fetch data from the database._
/// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is_
///   terminated when this function returns false, thereby filtering the data based on the_
///   provided condition._
pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
    &self,
    segment: StaticFileSegment,
    mut block_or_tx_range: Range<u64>,
    fetch_from_static_file: FS,
    mut fetch_from_database: FD,
    mut predicate: P,
) -> ProviderResult<Vec<T>>
where
    FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
    FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
    P: FnMut(&T) -> bool,
{
    let mut data = Vec::_new_();

    // If there is, check the maximum block or transaction number of the segment.
    if let _Some_(static_file_upper_bound) = if segment.is_block_based() {
        self.get_highest_static_file_block(segment)
    } else {
        self.get_highest_static_file_tx(segment)
    } {
        if block_or_tx_range.start <= static_file_upper_bound {
            let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
            data.extend(fetch_from_static_file(
                self,
                block_or_tx_range.start..end,
                &mut predicate,
            )?);
            block_or_tx_range.start = end;
        }
    }

    if block_or_tx_range.end > block_or_tx_range.start {
        data.extend(fetch_from_database(block_or_tx_range, predicate)?)
    }

    _Ok_(data)
}
}

核心业务逻辑

  1. 获取当前段的最大静态文件范围
#![allow(unused)]
fn main() {
let static_file_upper_bound = if segment.is_block_based() {
    self.get_highest_static_file_block(segment)
} else {
    self.get_highest_static_file_tx(segment)
};
}
  • segment 可以基于区块或交易,函数会调用相应的方法来获取当前静态文件的最大已保存边界(例如最大区块号或最大交易号)。
  1. 如果目标范围的一部分在静态文件中,就优先从静态文件读取
#![allow(unused)]
fn main() {
if block_or_tx_range.start <= static_file_upper_bound {
    let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
    data.extend(fetch_from_static_file(
        self,
        block_or_tx_range.start..end,
        &mut predicate,
    )?);
    block_or_tx_range.start = end;
}
}
  • 如果目标范围的起始值小于等于静态文件边界,说明这部分数据可以从静态文件中读取。
  • 读取的实际区间是 [start, min(end, static_file_upper_bound + 1))
  • 调用 fetch_from_static_file,并传入可变谓词 &mut predicate 用于过滤。
  • 然后更新 block_or_tx_range.start = end,只保留未处理的区间。
  1. 对于剩下的区间(即静态文件中没有的),从数据库读取
#![allow(unused)]
fn main() {
if block_or_tx_range.end > block_or_tx_range.start {
    data.extend(fetch_from_database(block_or_tx_range, predicate)?)
}
}
  • 如果仍有未处理的区间(即 end > start),则调用 fetch_from_database 获取数据。
  1. 返回所有结果
#![allow(unused)]
fn main() {
Ok(data)
}