Skip to content

Commit

Permalink
core, eth: improve delivery speed on header requests (#23105)
Browse files Browse the repository at this point in the history
  • Loading branch information
cheng762 committed Nov 14, 2023
1 parent 418a648 commit e22abdf
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 20 deletions.
6 changes: 6 additions & 0 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header {
return bc.hc.GetHeaderByNumber(number)
}

// GetHeadersFrom returns a contiguous segment of headers, in rlp-form, going
// backwards from the given number.
func (bc *BlockChain) GetHeadersFrom(number, count uint64) []rlp.RawValue {
return bc.hc.GetHeadersFrom(number, count)
}

// GetBody retrieves a block body (transactions and uncles) from the database by
// hash, caching it if found.
func (bc *BlockChain) GetBody(hash common.Hash) *types.Body {
Expand Down
41 changes: 41 additions & 0 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
crand "crypto/rand"
"errors"
"fmt"
"github.com/PlatONnetwork/PlatON-Go/rlp"
"math"
"math/big"
mrand "math/rand"
Expand Down Expand Up @@ -469,6 +470,46 @@ func (hc *HeaderChain) GetHeaderByNumber(number uint64) *types.Header {
return hc.GetHeader(hash, number)
}

// GetHeadersFrom returns a contiguous segment of headers, in rlp-form, going
// backwards from the given number.
// If the 'number' is higher than the highest local header, this method will
// return a best-effort response, containing the headers that we do have.
func (hc *HeaderChain) GetHeadersFrom(number, count uint64) []rlp.RawValue {
// If the request is for future headers, we still return the portion of
// headers that we are able to serve
if current := hc.CurrentHeader().Number.Uint64(); current < number {
if count > number-current {
count -= number - current
number = current
} else {
return nil
}
}
var headers []rlp.RawValue
// If we have some of the headers in cache already, use that before going to db.
hash := rawdb.ReadCanonicalHash(hc.chainDb, number)
if hash == (common.Hash{}) {
return nil
}
for count > 0 {
header, ok := hc.headerCache.Get(hash)
if !ok {
break
}
h := header.(*types.Header)
rlpData, _ := rlp.EncodeToBytes(h)
headers = append(headers, rlpData)
hash = h.ParentHash
count--
number--
}
// Read remaining from db
if count > 0 {
headers = append(headers, rawdb.ReadHeaderRange(hc.chainDb, number, count)...)
}
return headers
}

func (hc *HeaderChain) GetCanonicalHash(number uint64) common.Hash {
return rawdb.ReadCanonicalHash(hc.chainDb, number)
}
Expand Down
50 changes: 50 additions & 0 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,56 @@ func WriteFastTxLookupLimit(db ethdb.KeyValueWriter, number uint64) {
}
}

// ReadHeaderRange returns the rlp-encoded headers, starting at 'number', and going
// backwards towards genesis. This method assumes that the caller already has
// placed a cap on count, to prevent DoS issues.
// Since this method operates in head-towards-genesis mode, it will return an empty
// slice in case the head ('number') is missing. Hence, the caller must ensure that
// the head ('number') argument is actually an existing header.
//
// N.B: Since the input is a number, as opposed to a hash, it's implicit that
// this method only operates on canon headers.
func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValue {
var rlpHeaders []rlp.RawValue
if count == 0 {
return rlpHeaders
}
i := number
if count-1 > number {
// It's ok to request block 0, 1 item
count = number + 1
}
limit, _ := db.Ancients()
// First read live blocks
if i >= limit {
// If we need to read live blocks, we need to figure out the hash first
hash := ReadCanonicalHash(db, number)
for ; i >= limit && count > 0; i-- {
if data, _ := db.Get(headerKey(i, hash)); len(data) > 0 {
rlpHeaders = append(rlpHeaders, data)
// Get the parent hash for next query
hash = types.HeaderParentHashFromRLP(data)
} else {
break // Maybe got moved to ancients
}
count--
}
}
if count == 0 {
return rlpHeaders
}
// read remaining from ancients
max := count * 700
data, err := db.AncientRange(freezerHeaderTable, i+1-count, count, max)
if err == nil && uint64(len(data)) == count {
// the data is on the order [h, h+1, .., n] -- reordering needed
for i := range data {
rlpHeaders = append(rlpHeaders, data[len(data)-1-i])
}
}
return rlpHeaders
}

// ReadHeaderRLP retrieves a block header in its raw RLP database encoding.
func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
var data []byte
Expand Down
64 changes: 64 additions & 0 deletions core/rawdb/accessors_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
"io/ioutil"
"math/big"
"math/rand"
"os"
Expand Down Expand Up @@ -814,3 +815,66 @@ func makeTestReceipts(n int, nPerBlock int) []types.Receipts {
}
return allReceipts
}

func TestHeadersRLPStorage(t *testing.T) {
// Have N headers in the freezer
frdir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)

db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false)
if err != nil {
t.Fatalf("failed to create database with ancient backend")
}
defer db.Close()
// Create blocks
var chain []*types.Block
var pHash common.Hash
for i := 0; i < 100; i++ {
block := types.NewBlockWithHeader(&types.Header{
Number: big.NewInt(int64(i)),
Extra: []byte("test block"),
TxHash: types.EmptyRootHash,
ReceiptHash: types.EmptyRootHash,
ParentHash: pHash,
})
chain = append(chain, block)
pHash = block.Hash()
}
var receipts []types.Receipts = make([]types.Receipts, 100)
// Write first half to ancients
WriteAncientBlocks(db, chain[:50], receipts[:50])
// Write second half to db
for i := 50; i < 100; i++ {
WriteCanonicalHash(db, chain[i].Hash(), chain[i].NumberU64())
WriteBlock(db, chain[i])
}
checkSequence := func(from, amount int) {
headersRlp := ReadHeaderRange(db, uint64(from), uint64(amount))
if have, want := len(headersRlp), amount; have != want {
t.Fatalf("have %d headers, want %d", have, want)
}
for i, headerRlp := range headersRlp {
var header types.Header
if err := rlp.DecodeBytes(headerRlp, &header); err != nil {
t.Fatal(err)
}
if have, want := header.Number.Uint64(), uint64(from-i); have != want {
t.Fatalf("wrong number, have %d want %d", have, want)
}
}
}
checkSequence(99, 20) // Latest block and 19 parents
checkSequence(99, 50) // Latest block -> all db blocks
checkSequence(99, 51) // Latest block -> one from ancients
checkSequence(99, 52) // Latest blocks -> two from ancients
checkSequence(50, 2) // One from db, one from ancients
checkSequence(49, 1) // One from ancients
checkSequence(49, 50) // All ancient ones
checkSequence(99, 100) // All blocks
checkSequence(0, 1) // Only genesis
checkSequence(1, 1) // Only block 1
checkSequence(1, 2) // Genesis + block 1
}
18 changes: 18 additions & 0 deletions core/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,3 +571,21 @@ func (b Blocks) String() string {
s += "]"
return s
}

// HeaderParentHashFromRLP returns the parentHash of an RLP-encoded
// header. If 'header' is invalid, the zero hash is returned.
func HeaderParentHashFromRLP(header []byte) common.Hash {
// parentHash is the first list element.
listContent, _, err := rlp.SplitList(header)
if err != nil {
return common.Hash{}
}
parentHash, _, err := rlp.SplitString(listContent)
if err != nil {
return common.Hash{}
}
if len(parentHash) != 32 {
return common.Hash{}
}
return common.BytesToHash(parentHash)
}
57 changes: 57 additions & 0 deletions core/types/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,60 @@ func makeBenchBlock() *Block {
}
return NewBlock(header, txs, receipts, newHasher())
}

func TestRlpDecodeParentHash(t *testing.T) {
// A minimum one
want := common.HexToHash("0x112233445566778899001122334455667788990011223344556677889900aabb")
if rlpData, err := rlp.EncodeToBytes(&Header{ParentHash: want}); err != nil {
t.Fatal(err)
} else {
if have := HeaderParentHashFromRLP(rlpData); have != want {
t.Fatalf("have %x, want %x", have, want)
}
}
// And a maximum one
// | Number | dynamic| *big.Int | 64 bits |
// | Extra | dynamic| []byte | 65+32 byte (clique) |
// | BaseFee | dynamic| *big.Int | 64 bits |
if rlpData, err := rlp.EncodeToBytes(&Header{
ParentHash: want,
Number: new(big.Int).SetUint64(math.MaxUint64),
Extra: make([]byte, 65+32),
BaseFee: new(big.Int).SetUint64(math.MaxUint64),
}); err != nil {
t.Fatal(err)
} else {
if have := HeaderParentHashFromRLP(rlpData); have != want {
t.Fatalf("have %x, want %x", have, want)
}
}
// Also test a very very large header.
{
// The rlp-encoding of the heder belowCauses _total_ length of 65540,
// which is the first to blow the fast-path.
h := &Header{
ParentHash: want,
Extra: make([]byte, 65041),
}
if rlpData, err := rlp.EncodeToBytes(h); err != nil {
t.Fatal(err)
} else {
if have := HeaderParentHashFromRLP(rlpData); have != want {
t.Fatalf("have %x, want %x", have, want)
}
}
}
{
// Test some invalid erroneous stuff
for i, rlpData := range [][]byte{
nil,
common.FromHex("0x"),
common.FromHex("0x01"),
common.FromHex("0x3031323334"),
} {
if have, want := HeaderParentHashFromRLP(rlpData), (common.Hash{}); have != want {
t.Fatalf("invalid %d: have %x, want %x", i, have, want)
}
}
}
}
20 changes: 16 additions & 4 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,32 @@ func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
return head.Hash(), head.Number()
}

func unmarshalRlpHeaders(rlpdata []rlp.RawValue) []*types.Header {
var headers = make([]*types.Header, len(rlpdata))
for i, data := range rlpdata {
var h types.Header
if err := rlp.DecodeBytes(data, &h); err != nil {
panic(err)
}
headers[i] = &h
}
return headers
}

// RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed
// origin; associated with a particular peer in the download tester. The returned
// function can be used to retrieve batches of headers from the particular peer.
func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool, sink chan *eth.Response) (*eth.Request, error) {
// Service the header query via the live handler code
headers := eth.ServiceGetBlockHeadersQuery(dlp.chain, &eth.GetBlockHeadersPacket{
rlpHeaders := eth.ServiceGetBlockHeadersQuery(dlp.chain, &eth.GetBlockHeadersPacket{
Origin: eth.HashOrNumber{
Hash: origin,
},
Amount: uint64(amount),
Skip: uint64(skip),
Reverse: reverse,
}, nil)

headers := unmarshalRlpHeaders(rlpHeaders)
// If a malicious peer is simulated withholding headers, delete them
for hash := range dlp.withholdHeaders {
for i, header := range headers {
Expand Down Expand Up @@ -220,15 +232,15 @@ func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount i
// function can be used to retrieve batches of headers from the particular peer.
func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool, sink chan *eth.Response) (*eth.Request, error) {
// Service the header query via the live handler code
headers := eth.ServiceGetBlockHeadersQuery(dlp.chain, &eth.GetBlockHeadersPacket{
rlpHeaders := eth.ServiceGetBlockHeadersQuery(dlp.chain, &eth.GetBlockHeadersPacket{
Origin: eth.HashOrNumber{
Number: origin,
},
Amount: uint64(amount),
Skip: uint64(skip),
Reverse: reverse,
}, nil)

headers := unmarshalRlpHeaders(rlpHeaders)
// If a malicious peer is simulated withholding headers, delete them
for hash := range dlp.withholdHeaders {
for i, header := range headers {
Expand Down
19 changes: 13 additions & 6 deletions eth/protocols/eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,13 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
query *GetBlockHeadersPacket // The query to execute for header retrieval
expect []common.Hash // The hashes of the block whose headers are expected
}{
// A single random block should be retrievable by hash and number too
// A single random block should be retrievable by hash
{
&GetBlockHeadersPacket{Origin: HashOrNumber{Hash: backend.chain.GetBlockByNumber(limit / 2).Hash()}, Amount: 1},
[]common.Hash{backend.chain.GetBlockByNumber(limit / 2).Hash()},
}, {
},
// A single random block should be retrievable by number
{
&GetBlockHeadersPacket{Origin: HashOrNumber{Number: limit / 2}, Amount: 1},
[]common.Hash{backend.chain.GetBlockByNumber(limit / 2).Hash()},
},
Expand Down Expand Up @@ -190,10 +192,15 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
{
&GetBlockHeadersPacket{Origin: HashOrNumber{Number: 0}, Amount: 1},
[]common.Hash{backend.chain.GetBlockByNumber(0).Hash()},
}, {
},
{
&GetBlockHeadersPacket{Origin: HashOrNumber{Number: backend.chain.CurrentBlock().NumberU64()}, Amount: 1},
[]common.Hash{backend.chain.CurrentBlock().Hash()},
},
{ // If the peer requests a bit into the future, we deliver what we have
&GetBlockHeadersPacket{Origin: HashOrNumber{Number: backend.chain.CurrentBlock().NumberU64()}, Amount: 10},
[]common.Hash{backend.chain.CurrentBlock().Hash()},
},
// Ensure protocol limits are honored
{
&GetBlockHeadersPacket{Origin: HashOrNumber{Number: backend.chain.CurrentBlock().NumberU64() - 1}, Amount: limit + 10, Reverse: true},
Expand Down Expand Up @@ -290,7 +297,7 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
RequestId: 456,
BlockHeadersPacket: headers,
}); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err)
t.Errorf("test %d by hash: headers mismatch: %v", i, err)
}
}
}
Expand Down Expand Up @@ -426,9 +433,8 @@ func testGetNodeData(t *testing.T, protocol uint) {
peer, _ := newTestPeer("peer", protocol, backend)
defer peer.close()

// Fetch for now the entire chain db
// Collect all state tree hashes.
var hashes []common.Hash

it := backend.db.NewIterator(nil, nil)
for it.Next() {
if key := it.Key(); len(key) == common.HashLength {
Expand All @@ -437,6 +443,7 @@ func testGetNodeData(t *testing.T, protocol uint) {
}
it.Release()

// Request all hashes.
p2p.Send(peer.app, GetNodeDataMsg, GetNodeDataPacket66{
RequestId: 123,
GetNodeDataPacket: hashes,
Expand Down
Loading

0 comments on commit e22abdf

Please sign in to comment.