主页 > imtoken苹果地址 > 以太坊源码学习0x08矿工模块


imtoken苹果地址 2023-08-15 05:11:17

我们都知道,从比特币开始以太坊挖矿模块,我们把打包合法区块的节点称为Miner(矿工),这个过程叫做Mining(挖矿)。 这个比喻很贴切,因为比特币和以太坊的代币数量都是有限的,就像地球上的黄金储量一样,如果从金矿中挖出一点,储量就会少一些。

wiki对挖矿的描述这里不再赘述。 直接上黄龙开源吧! ! !


// Miner creates blocks and searches for proof-of-work values.
type Miner struct {
    // 事件锁
    mux *event.TypeMux
    // 真正干活的人
    worker *worker
    // 矿工地址
    coinbase common.Address
    // 表示正在挖矿的状态
    mining   int32
    // Backend对象,Backend是一个自定义接口封装了所有挖矿所需方法
    eth      Backend
    // 共识引擎 以太坊有两种共识引擎ethash和clique
    engine   consensus.Engine
    // 是否能够开始挖矿
    canStart    int32 // can start indicates whether we can start the mining operation
    // 同步后是否应该开始挖矿
    shouldStart int32 // should start indicates whether we should start after sync

Miner只是以太坊对外实现挖矿功能的一个公开类,真正干活的是矿工。 因此,继续深入了解工人结构。

// worker is the main object which takes care of applying messages to the new state
type worker struct {
    // 链的配置属性
    config *params.ChainConfig
    // 前面提到的共识引擎
    engine consensus.Engine
    // 同步锁
    mu sync.Mutex
    // update loop
    mux          *event.TypeMux
    txsCh        chan core.NewTxsEvent
    txsSub       event.Subscription
    chainHeadCh  chan core.ChainHeadEvent
    chainHeadSub event.Subscription
    chainSideCh  chan core.ChainSideEvent
    chainSideSub event.Subscription
    wg           sync.WaitGroup
    // Agent的map集合
    agents map[Agent]struct{}
    recv   chan *Result
    eth     Backend
    chain   *core.BlockChain
    proc    core.Validator
    chainDb ethdb.Database
    coinbase common.Address
    extra    []byte
    currentMu sync.Mutex
    current   *Work
    snapshotMu    sync.RWMutex
    snapshotBlock *types.Block
    snapshotState *state.StateDB
    uncleMu        sync.Mutex
    possibleUncles map[common.Hash]*types.Block
    // 本地挖出的有待确认的区块
    unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations
    // atomic status counters
    mining int32
    atWork int32

这里,我们重点关注一下params.ChainConfig的结构。 顾名思义,就是链的配置属性,针对以太坊的历史问题,定义了一些相关的配置。

// ChainConfig is stored in the database on a per block basis. This means
// that any network, identified by its genesis block, can have its own
// set of configuration options.
type ChainConfig struct {
    // 标识当前链,主键唯一id 也用来防止replay attack重放攻击
    ChainID *big.Int `json:"chainId"` // chainId identifies the current chain and is used for replay protection
    // 以太坊发展蓝图中的一个阶段,当前阶段为Homestead
    // 第一阶段为以太坊面世代号frontier,第二阶段为Homestead即当前阶段
    // 第三阶段为Metropolis(大都会),Metropolis又分为Byzantium(拜占庭硬分叉,引入新型零知识证明算法和pos共识),
    // 然后是constantinople(君士坦丁堡硬分叉,eth正是应用pow和pos混合链)
    // 第四阶段为Serenity(宁静),最终稳定版的以太坊
    HomesteadBlock *big.Int `json:"homesteadBlock,omitempty"` // Homestead switch block (nil = no fork, 0 = already homestead)
    // TheDao硬分叉切换,2017年6月18日应对DAO危机做出的调整
    DAOForkBlock   *big.Int `json:"daoForkBlock,omitempty"`   // TheDAO hard-fork switch block (nil = no fork)
    // 节点是否支持TheDao硬分叉
    DAOForkSupport bool     `json:"daoForkSupport,omitempty"` // Whether the nodes supports or opposes the DAO hard-fork
    // EIP150 implements the Gas price changes (https://github.com/ethereum/EIPs/issues/150)
    // eth改善方案硬分叉  没有硬分叉的置0


EIP150Block *big.Int `json:"eip150Block,omitempty"` // EIP150 HF block (nil = no fork) EIP150Hash common.Hash `json:"eip150Hash,omitempty"` // EIP150 HF hash (needed for header only clients as only gas pricing changed) EIP155Block *big.Int `json:"eip155Block,omitempty"` // EIP155 HF block EIP158Block *big.Int `json:"eip158Block,omitempty"` // EIP158 HF block ByzantiumBlock *big.Int `json:"byzantiumBlock,omitempty"` // Byzantium switch block (nil = no fork, 0 = already on byzantium) ConstantinopleBlock *big.Int `json:"constantinopleBlock,omitempty"` // Constantinople switch block (nil = no fork, 0 = already activated) // Various consensus engines Ethash *EthashConfig `json:"ethash,omitempty"` Clique *CliqueConfig `json:"clique,omitempty"` }

然后worker中有一个agent代理,他们的关系应该是这样的。 一个矿工有一个工人,一个工人同时有多个代理。 这里单个代理可以完成单个区块的挖矿,矿工的多个代理之间应该存在竞争关系。

// Agent can register themself with the worker
type Agent interface {
    Work() chan<- *Work
    SetReturnCh(chan<- *Result)
    GetHashRate() int64


// Work is the workers current environment and holds
// all of the current state information
type Work struct {
    // 链的配置属性
    config *params.ChainConfig
    signer types.Signer
    // 数据库状态
    state     *state.StateDB // apply state changes here
    // 祖先集,用来验证叔父块有效性
    ancestors *set.Set       // ancestor set (used for checking uncle parent validity)
    // 家庭集,用来验证叔块无效
    family    *set.Set       // family set (used for checking uncle invalidity)
    // 叔块集
    uncles    *set.Set       // uncle set
    // 交易量
    tcount    int            // tx count in cycle
    // 用于打包交易的可用天然气
    gasPool   *core.GasPool  // available gas used to pack transactions
    // 新区快
    Block *types.Block // the new block
    // 区块头
    header   *types.Header
    txs      []*types.Transaction
    receipts []*types.Receipt
    createdAt time.Time

这里有两个实现代理接口的类,分别是CpuAgent和RemoteAgent。 CpuAgent使用cpu进行挖矿操作,RemoteAgent用于远程挖矿,它提供了一套RPC接口来实现远程矿机进行挖矿的功能。

type CpuAgent struct {
    // 同步锁
    mu sync.Mutex
    // work通道
    workCh        chan *Work
    // 结构体通道对象
    stop          chan struct{}
    quitCurrentOp chan struct{}
    // Result指针通道
    returnCh      chan<- *Result
    // 共识引擎
    chain  consensus.ChainReader
    engine consensus.Engine
    // 当前agent是否在挖矿
    isMining int32 // isMining indicates whether the agent is currently mining
type RemoteAgent struct {
    mu sync.Mutex
    quitCh   chan struct{}
    workCh   chan *Work
    returnCh chan<- *Result
    chain       consensus.ChainReader
    engine      consensus.Engine
    currentWork *Work
    work        map[common.Hash]*Work
    hashrateMu sync.RWMutex
    hashrate   map[common.Hash]hashrate
    running int32 // running indicates whether the agent is active. Call atomically




// 创建miner对象
func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine) *Miner {
    miner := &Miner{
        eth:      eth,
        mux:      mux,
        engine:   engine,
        // 创建一个worker
        worker:   newWorker(config, engine, common.Address{}, eth, mux),
        canStart: 1,
    // 注册newCpuAgent对象
    miner.Register(NewCpuAgent(eth.BlockChain(), engine))
    go miner.update()
    return miner


// 为miner创建worker
func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker {
    worker := &worker{
        config:         config,
        engine:         engine,
        eth:            eth,
        mux:            mux,
        // NewTxsEvent面熟吧,前面讲交易时 TxPool会发出该事件,当一笔交易被放入到交易池
        // 这时候如果work空闲会把Tx放到work.txs准备下一次打包进块
        txsCh:          make(chan core.NewTxsEvent, txChanSize),
        // ChainHeadEvent事件,表示已经有一个块作为链头 work.ipdate监听到该事件会继续挖矿
        chainHeadCh:    make(chan core.ChainHeadEvent, chainHeadChanSize),
        // ChainSideEvent事件,表示一个新块作为链的旁支可能会被放入possibleUncles中
        chainSideCh:    make(chan core.ChainSideEvent, chainSideChanSize),
        // 区块链数据库
        chainDb:        eth.ChainDb(),
        recv:           make(chan *Result, resultQueueSize),
        chain:          eth.BlockChain(),
        proc:           eth.BlockChain().Validator(),
        // 可能的叔块
        possibleUncles: make(map[common.Hash]*types.Block),
        coinbase:       coinbase,
        agents:         make(map[Agent]struct{}),
        // 挖出的未被确认的区块
        unconfirmed:    newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
    // Subscribe NewTxsEvent for tx pool
    worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
    // Subscribe events for blockchain
    worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
    worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
    go worker.update()
    go worker.wait()
    return worker

从上面可以看出,worker.update方法处理的是上面的event事件。 这是我们上次分析交易模块时处理交易提交的地方。

那么新块的写入是在哪里进行的呢? 让我们看看 worker.wait 来寻找一些线索。

func (self *worker) wait() {
    for {
        for result := range self.recv {
            atomic.AddInt32(&self.atWork, -1)
            if result == nil {
            block := result.Block
            work := result.Work
            // Update the block hash in all logs since it is now available and not when the
            // receipt/log of individual transactions were created.
            // 更新所有日志的块哈希
            for _, r := range work.receipts {
                for _, l := range r.Logs {
                    l.BlockHash = block.Hash()
            for _, log := range work.state.Logs() {
                log.BlockHash = block.Hash()
            stat, err := self.chain.WriteBlockWithState(block, work.receipts, work.state)
            if err != nil {
                log.Error("Failed writing block to chain", "err", err)
            // Broadcast the block and announce chain insertion event
            // 广播新区块并宣布链插入事件
            self.mux.Post(core.NewMinedBlockEvent{Block: block})
            var (
                events []interface{}
                logs   = work.state.Logs()
            events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})


if stat == core.CanonStatTy { events = append(events, core.ChainHeadEvent{Block: block}) } self.chain.PostChainEvents(events, logs) // Insert the block into the set of pending ones to wait for confirmations // 将数据插入待处理块中,等待确认 self.unconfirmed.Insert(block.NumberU64(), block.Hash()) } } }

然后回到矿工的创建。 创建worker后以太坊挖矿模块,会为worker注册agent。

func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent {
    miner := &CpuAgent{
        chain:  chain,
        engine: engine,
        stop:   make(chan struct{}, 1),
        workCh: make(chan *Work, 1),
    return miner
func (self *Miner) Register(agent Agent) {
    if self.Mining() {
func (self *worker) register(agent Agent) {
    defer self.mu.Unlock()
    self.agents[agent] = struct{}{}


// update keeps track of the downloader events. Please be aware that this is a one shot type of update loop.
// It's entered once and as soon as `Done` or `Failed` has been broadcasted the events are unregistered and
// the loop is exited. This to prevent a major security vuln where external parties can DOS you with blocks
// and halt your mining operation for as long as the DOS continues.
// update会跟踪下载程序事件。 请注意,这是一次性更新循环。
// 一旦广播“完成”或“失败”,事件就会被取消注册并退出循环。
// 这可以防止主要的安全漏洞,外部各方可以使用块来阻止你
// 并且只要DOS继续就停止你的挖掘操作
func (self *Miner) update() {
    events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
    for ev := range events.Chan() {
        switch ev.Data.(type) {
        // 下载开始
        case downloader.StartEvent:
            atomic.StoreInt32(&self.canStart, 0)
            if self.Mining() {
                atomic.StoreInt32(&self.shouldStart, 1)
                log.Info("Mining aborted due to sync")
            // 下载完成或失败
        case downloader.DoneEvent, downloader.FailedEvent:
            shouldStart := atomic.LoadInt32(&self.shouldStart) == 1
            atomic.StoreInt32(&self.canStart, 1)
            atomic.StoreInt32(&self.shouldStart, 0)
            if shouldStart {
                // 开始挖矿
            // unsubscribe. we're only interested in this event once
            // stop immediately and ignore all further pending events
            break out
func (self *Miner) Start(coinbase common.Address) {
    atomic.StoreInt32(&self.shouldStart, 1)
    if atomic.LoadInt32(&self.canStart) == 0 {
        log.Info("Network syncing, will start miner afterwards")
    atomic.StoreInt32(&self.mining, 1)
    log.Info("Starting mining operation")
    // 真正开始挖矿

worker.start() 函数表示真正挖矿的是worker。 继续下钻到相应的代码。

func (self *worker) start() {


defer self.mu.Unlock() atomic.StoreInt32(&self.mining, 1) // spin up agents // 遍历所有的agent,通知agent开始挖矿 for agent := range self.agents { agent.Start() } }

那么,它还不是真正的挖矿代码。 我们还得继续深入虎穴,从agent里面开始探索。

func (self *CpuAgent) Start() {
    if !atomic.CompareAndSwapInt32(&self.isMining, 0, 1) {
        return // agent already started
    go self.update()
func (self *CpuAgent) update() {
    for {
        select {
        // 检测是否有工作信号进入
        case work := <-self.workCh:
            if self.quitCurrentOp != nil {
            self.quitCurrentOp = make(chan struct{})
            go self.mine(work, self.quitCurrentOp)
            // 监测停止信号
        case <-self.stop:
            if self.quitCurrentOp != nil {
                self.quitCurrentOp = nil
            break out
func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
    // 通过共识引擎算法来挖矿
    if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
        log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
        self.returnCh <- &Result{work, result}
    } else {
        if err != nil {
            log.Warn("Block sealing failed", "err", err)
        self.returnCh <- nil

这里agent.update和miner.update的逻辑是类似的。 幸运的是,在这里终于看到了最终挖矿中agent封装的共识引擎。


// 完成待挖掘区块的数据组装
func (self *worker) commitNewWork() {
    // 相关锁设置
    defer self.mu.Unlock()
    defer self.uncleMu.Unlock()
    defer self.currentMu.Unlock()
    tstart := time.Now()
    parent := self.chain.CurrentBlock()
    tstamp := tstart.Unix()
    if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 {
        tstamp = parent.Time().Int64() + 1
    // this will ensure we're not going off too far in the future
    // 确保新区块的产生不会花费太多时间
    if now := time.Now().Unix(); tstamp > now+1 {
        wait := time.Duration(tstamp-now) * time.Second
        log.Info("Mining too far in the future", "wait", common.PrettyDuration(wait))
    // 组装区块头信息
    num := parent.Number()
    header := &types.Header{
        ParentHash: parent.Hash(),
        Number:     num.Add(num, common.Big1),
        GasLimit:   core.CalcGasLimit(parent),
        Extra:      self.extra,
        Time:       big.NewInt(tstamp),
    // Only set the coinbase if we are mining (avoid spurious block rewards)


// 如果正在挖掘,设置coinbase if atomic.LoadInt32(&self.mining) == 1 { header.Coinbase = self.coinbase } // 确保区块头信息准备好 if err := self.engine.Prepare(self.chain, header); err != nil { log.Error("Failed to prepare header for mining", "err", err) return } // If we are care about TheDAO hard-fork check whether to override the extra-data or not // TheDAO硬分叉相关设置 if daoBlock := self.config.DAOForkBlock; daoBlock != nil { // Check whether the block is among the fork extra-override range limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange) // 根据新区块Number判断是否受TheDAO硬分叉的影响 if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 { // Depending whether we support or oppose the fork, override differently if self.config.DAOForkSupport { // 支持TheDAO硬分叉 header.Extra = common.CopyBytes(params.DAOForkBlockExtra) } else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) { header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data } } } // Could potentially happen if starting to mine in an odd state. err := self.makeCurrent(parent, header) if err != nil { log.Error("Failed to create mining context", "err", err) return } // Create the current work task and check any fork transitions needed // 创建当前所需的工作环境work work := self.current // 给work设置相关硬分叉设置 if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 { misc.ApplyDAOHardFork(work.state) } // 准备新区块的交易列表 pending, err := self.eth.TxPool().Pending() if err != nil { log.Error("Failed to fetch pending transactions", "err", err) return } // 交易按价格和nonce值排序 txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) // 提交交易 work.commitTransactions(self.mux, txs, self.chain, self.coinbase) // compute uncles for the new block. // 为新的区块计算叔块 var ( uncles []*types.Header badUncles []common.Hash ) for hash, uncle := range self.possibleUncles { if len(uncles) == 2 { break } if err := self.commitUncle(work, uncle.Header()); err != nil { log.Trace("Bad uncle found and will be removed", "hash", hash) log.Trace(fmt.Sprint(uncle)) badUncles = append(badUncles, hash) } else { log.Debug("Committing new uncle to block", "hash", hash) uncles = append(uncles, uncle.Header()) } } for _, hash := range badUncles { delete(self.possibleUncles, hash) } // Create the new block to seal with the consensus engine // 使用共识引擎对新区块进行赋值,包括Header.Root, TxHash, ReceiptHash, UncleHash几个属性 if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil { log.Error("Failed to finalize block for sealing", "err", err) return } // We only care about logging if we're actually mining. if atomic.LoadInt32(&self.mining) == 1 { log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) self.unconfirmed.Shift(work.Block.NumberU64() - 1) } // 加载工作环境 self.push(work) self.updateSnapshot() }

总结一下思路,首先挖矿中存在三种角色:miner,worker和agent。 miner是以太坊封装的对外挖矿接口,worker是给矿工用的,agent是真正实现挖矿的东东。 一个矿工有一个worker属性,一个worker有多个agent,通过共识引擎实现真正的挖矿。 还有一个工作是存储挖矿所需的数据环境。

当挖矿开始时,矿工会创建一个worker并为其注册相应的agent。 然后worker发送一个work object给agent,所有agent根据共识引擎进行挖矿。 当一个代理人完成挖矿后,会形成一个授权区块加上相应的工作,形成一个结果对象返回给矿工。







互联网颠覆世界,区块链颠覆互联网! -------------------------------------------------- ---20181012 01:09