diff --git a/go.mod b/go.mod index d029c05..0ee0258 100644 --- a/go.mod +++ b/go.mod @@ -122,7 +122,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/polydawn/refmt v0.0.0-20190807091052-3d65705ee9f1 // indirect github.com/prometheus/client_model v0.6.0 // indirect - github.com/prometheus/common v0.50.0 // indirect + github.com/prometheus/common v0.51.1 // indirect github.com/prometheus/procfs v0.13.0 // indirect github.com/protolambda/bls12-381-util v0.1.0 // indirect github.com/quic-go/qpack v0.4.0 // indirect diff --git a/go.sum b/go.sum index 0eff705..0978caf 100644 --- a/go.sum +++ b/go.sum @@ -939,6 +939,8 @@ github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16 github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/common v0.50.0 h1:YSZE6aa9+luNa2da6/Tik0q0A5AbR+U003TItK57CPQ= github.com/prometheus/common v0.50.0/go.mod h1:wHFBCEVWVmHMUpg7pYcOm2QUR/ocQdYSJVQJKnHc3xQ= +github.com/prometheus/common v0.51.1 h1:eIjN50Bwglz6a/c3hAgSMcofL3nD+nFQkV6Dd4DsQCw= +github.com/prometheus/common v0.51.1/go.mod h1:lrWtQx+iDfn2mbH5GUzlH9TSHyfZpHkSiG1W7y3sF2Q= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= diff --git a/pkg/crawler/ethereum.go b/pkg/crawler/ethereum.go index 1c9d790..65904cc 100644 --- a/pkg/crawler/ethereum.go +++ b/pkg/crawler/ethereum.go @@ -237,8 +237,13 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) func (c *EthereumCrawler) Run() { // init all the eth_protocols c.EthNode.ServeBeaconPing(c.Host.Host()) + c.EthNode.ServeBeaconGoodbye(c.Host.Host()) c.EthNode.ServeBeaconStatus(c.Host.Host()) c.EthNode.ServeBeaconMetadata(c.Host.Host()) + c.EthNode.ServeBeaconBlocksByRootV2(c.Host.Host()) + c.EthNode.ServeBeaconBlocksByRangeV2(c.Host.Host()) + c.EthNode.ServeBeaconBlobsByRootV1(c.Host.Host()) + c.EthNode.ServeBeaconBlobsByRangeV1(c.Host.Host()) // initialization secuence for the crawler c.Events.Start(c.ctx) diff --git a/pkg/networks/ethereum/beacon_blobs.go b/pkg/networks/ethereum/beacon_blobs.go new file mode 100644 index 0000000..50f5962 --- /dev/null +++ b/pkg/networks/ethereum/beacon_blobs.go @@ -0,0 +1,66 @@ +package ethereum + +import ( + "context" + + "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/methods" + "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + log "github.com/sirupsen/logrus" +) + +func (en *LocalEthereumNode) ServeBeaconBlobsByRangeV1(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + blobsRange := new(methods.BlobsByRangeReqV1) + err := handler.ReadRequest(blobsRange) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse blobs_by_range request") + log.Errorf("failed to read blobs_by_range request: %v from %s", err, peerId.String()) + } else { + log.Info("dropped blobs_by_range request", *blobsRange) + } + } + b := methods.BlobsByRangeRPCv1 + streamHandler := b.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(b.Protocol, streamHandler) + log.Info("Started serving blobs_by_range") + // wait untill the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving blobs_by_range") + }() +} + +func (en *LocalEthereumNode) ServeBeaconBlobsByRootV1(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + blobRoots := new(methods.BlobByRootV1) + err := handler.ReadRequest(blobRoots) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse blobs_by_root request") + log.Errorf("failed to read blobs_by_root request: %v from %s", err, peerId.String()) + } else { + log.Info("dropped blobs_by_root request", *blobRoots) + } + } + b := methods.BlobsByRootRPCv1 + streamHandler := b.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(b.Protocol, streamHandler) + log.Info("Started serving blobs_by_root") + // wait untill the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving blobs_by_root") + }() +} diff --git a/pkg/networks/ethereum/beacon_blocks.go b/pkg/networks/ethereum/beacon_blocks.go new file mode 100644 index 0000000..76918f0 --- /dev/null +++ b/pkg/networks/ethereum/beacon_blocks.go @@ -0,0 +1,66 @@ +package ethereum + +import ( + "context" + + "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/methods" + "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + log "github.com/sirupsen/logrus" +) + +func (en *LocalEthereumNode) ServeBeaconBlocksByRangeV2(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + blockRange := new(methods.BlocksByRootReq) + err := handler.ReadRequest(blockRange) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse block_by_range request") + log.Errorf("failed to read block_by_range request: %v from %s", err, peerId.String()) + } else { + log.Infof("dropped block_by_range request %v", *blockRange) + } + } + m := methods.BlocksByRangeRPCv2 + streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(m.Protocol, streamHandler) + log.Info("Started serving block_by_range") + // wait untill the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving block_by_range") + }() +} + +func (en *LocalEthereumNode) ServeBeaconBlocksByRootV2(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + blockRoot := new(methods.BlocksByRootReq) + err := handler.ReadRequest(blockRoot) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse block_by_root request") + log.Error("failed to read block_by_root request: %v from %s", err, peerId.String()) + } else { + log.Infof("dropped block_by_root request %v", *blockRoot) + } + } + m := methods.BlocksByRootRPCv2 + streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(m.Protocol, streamHandler) + log.Info("Started serving block_by_root") + // wait untill the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving block_by_root") + }() +} diff --git a/pkg/networks/ethereum/rpc/methods/blobs.go b/pkg/networks/ethereum/rpc/methods/blobs.go new file mode 100644 index 0000000..5d921fa --- /dev/null +++ b/pkg/networks/ethereum/rpc/methods/blobs.go @@ -0,0 +1,140 @@ +package methods + +import ( + "encoding/hex" + "fmt" + "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" + "github.com/protolambda/ztyp/codec" + "github.com/protolambda/ztyp/tree" + "github.com/protolambda/ztyp/view" +) + +// https://github.com/ethereum/consensus-specs/blob/dev/specs/deneb/p2p-interface.md#blobsidecarsbyroot-v1 +const ( + MAX_BLOBS_PER_BLOCK int = 6 + MAX_BLOBS_PER_RPC_REQ int = MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK +) + +type BlobIdentifier struct { + BlockRoot Root + Index view.Uint64View +} + +func (blobId *BlobIdentifier) Deserialize(dr *codec.DecodingReader) error { + return dr.FixedLenContainer(&blobId.BlockRoot, &blobId.Index) +} + +func (blobId *BlobIdentifier) Serialize(w *codec.EncodingWriter) error { + return w.FixedLenContainer(&blobId.BlockRoot, &blobId.Index) +} + +func (blobId *BlobIdentifier) ByteLength() uint64 { + return blobId.BlockRoot.FixedLength() + blobId.Index.FixedLength() +} + +func (blobId *BlobIdentifier) FixedLength() uint64 { + return blobId.BlockRoot.FixedLength() + blobId.Index.FixedLength() +} + +func (blobId *BlobIdentifier) HashTreeRoot(hFn tree.HashFn) Root { + return hFn.HashTreeRoot(&blobId.BlockRoot, &blobId.Index) +} + +func (blobId *BlobIdentifier) String() string { + return fmt.Sprintf("%v", *blobId) +} + +type BlobByRootV1 []BlobIdentifier + +func (b BlobByRootV1) Deserialize(dr *codec.DecodingReader) error { + var idx int = 0 + return dr.List( + func() codec.Deserializable { + i := idx + idx++ + return &b[i] + }, + uint64(len(b)), + uint64(MAX_BLOBS_PER_RPC_REQ)) +} + +func (b BlobByRootV1) Serialize(w *codec.EncodingWriter) error { + return w.List(func(i uint64) codec.Serializable { + return &b[i] + }, + uint64(len(b)), + uint64(MAX_BLOBS_PER_RPC_REQ)) +} + +func (b BlobByRootV1) ByteLength() uint64 { + return uint64(len(b) * (32 + 8)) +} + +func (b BlobByRootV1) FixedLength() uint64 { + return 0 +} + +func (b BlobByRootV1) String() string { + if len(b) == 0 { + return "empty blobs-by-root request" + } + out := make([]byte, 0, len(b)*66) + for i, bId := range b { + hex.Encode(out[i*66:], bId.BlockRoot[:]) + out[(i+1)*66-2] = ',' + out[(i+1)*66-1] = ' ' + } + return "blobs-by-root requested: " + string(out[:len(out)-1]) +} + +type BlobsByRangeReqV1 struct { + StartSlot Slot + Count view.Uint64View +} + +func (b *BlobsByRangeReqV1) Data() map[string]interface{} { + return map[string]interface{}{ + "start_slot": b.StartSlot, + "count": b.Count, + } +} + +func (b *BlobsByRangeReqV1) Deserialize(dr *codec.DecodingReader) error { + return dr.FixedLenContainer(&b.StartSlot, &b.Count) +} + +func (b *BlobsByRangeReqV1) Serialize(w *codec.EncodingWriter) error { + return w.FixedLenContainer(&b.StartSlot, &b.Count) +} + +const blobsByRangeReqBytes uint64 = 8 + 8 + +func (b BlobsByRangeReqV1) ByteLength() uint64 { + return blobsByRangeReqBytes +} + +func (b *BlobsByRangeReqV1) FixedLength() uint64 { + return blobsByRangeReqBytes +} + +func (b *BlobsByRangeReqV1) HashTreeRoot(hFn tree.HashFn) Root { + return hFn.HashTreeRoot(&b.StartSlot, &b.Count) +} + +func (b *BlobsByRangeReqV1) String() string { + return fmt.Sprintf("%v", *b) +} + +var BlobsByRangeRPCv1 = reqresp.RPCMethod{ + Protocol: "/eth2/beacon_chain/req/blob_sidecars_by_range/1/ssz_snappy", + RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlobsByRangeReqV1) }, blobsByRangeReqBytes, blobsByRangeReqBytes), + ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlobsByRangeReqV1) }, 0, uint64(0)), + DefaultResponseChunkCount: 20, +} + +var BlobsByRootRPCv1 = reqresp.RPCMethod{ + Protocol: "/eth2/beacon_chain/req/blob_sidecars_by_root/1/ssz_snappy", + RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlobByRootV1) }, 0, uint64((32+8)*MAX_BLOBS_PER_RPC_REQ)), + ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlobByRootV1) }, 0, uint64(0)), + DefaultResponseChunkCount: 20, +} diff --git a/pkg/networks/ethereum/rpc/methods/blocks.go b/pkg/networks/ethereum/rpc/methods/blocks.go index 950368b..5fd5eb3 100644 --- a/pkg/networks/ethereum/rpc/methods/blocks.go +++ b/pkg/networks/ethereum/rpc/methods/blocks.go @@ -3,43 +3,21 @@ package methods import ( "encoding/hex" "fmt" - "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" - "github.com/protolambda/zrnt/eth2/beacon" - "github.com/protolambda/zrnt/eth2/beacon/common" "github.com/protolambda/ztyp/codec" "github.com/protolambda/ztyp/tree" "github.com/protolambda/ztyp/view" ) -// instead of parsing the whole body, we can just leave it as bytes. -type BeaconBlockBodyRaw []byte - -func (b *BeaconBlockBodyRaw) Limit() uint64 { - // just cap block body size at 1 MB - return 1 << 20 -} - -type BeaconBlock struct { - Slot Slot - ProposerIndex ValidatorIndex - ParentRoot Root - StateRoot Root - Body BeaconBlockBodyRaw -} - -type SignedBeaconBlock struct { - Message BeaconBlock - Signature BLSSignature -} +const MAX_REQUEST_BLOCKS_DENEB int = 128 -type BlocksByRangeReqV1 struct { +type BlocksByRangeReqV2 struct { StartSlot Slot Count view.Uint64View Step view.Uint64View } -func (r *BlocksByRangeReqV1) Data() map[string]interface{} { +func (r *BlocksByRangeReqV2) Data() map[string]interface{} { return map[string]interface{}{ "start_slot": r.StartSlot, "count": r.Count, @@ -47,41 +25,32 @@ func (r *BlocksByRangeReqV1) Data() map[string]interface{} { } } -func (d *BlocksByRangeReqV1) Deserialize(dr *codec.DecodingReader) error { +func (d *BlocksByRangeReqV2) Deserialize(dr *codec.DecodingReader) error { return dr.FixedLenContainer(&d.StartSlot, &d.Count, &d.Step) } -func (d *BlocksByRangeReqV1) Serialize(w *codec.EncodingWriter) error { +func (d *BlocksByRangeReqV2) Serialize(w *codec.EncodingWriter) error { return w.FixedLenContainer(&d.StartSlot, &d.Count, &d.Step) } const blocksByRangeReqByteLen = 8 + 8 + 8 -func (d BlocksByRangeReqV1) ByteLength() uint64 { +func (d *BlocksByRangeReqV2) ByteLength() uint64 { return blocksByRangeReqByteLen } -func (*BlocksByRangeReqV1) FixedLength() uint64 { +func (*BlocksByRangeReqV2) FixedLength() uint64 { return blocksByRangeReqByteLen } -func (d *BlocksByRangeReqV1) HashTreeRoot(hFn tree.HashFn) Root { +func (d *BlocksByRangeReqV2) HashTreeRoot(hFn tree.HashFn) Root { return hFn.HashTreeRoot(&d.StartSlot, &d.Count, &d.Step) } -func (r *BlocksByRangeReqV1) String() string { +func (r *BlocksByRangeReqV2) String() string { return fmt.Sprintf("%v", *r) } -func BlocksByRangeRPCv1(spec *common.Spec, opcBlock beacon.OpaqueBlock) *reqresp.RPCMethod { - return &reqresp.RPCMethod{ - Protocol: "/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy", - RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlocksByRangeReqV1) }, blocksByRangeReqByteLen, blocksByRangeReqByteLen), - ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return spec.Wrap(opcBlock) }, 0, opcBlock.ByteLength(spec)), - DefaultResponseChunkCount: 20, - } -} - const MAX_REQUEST_BLOCKS_BY_ROOT = 1024 type BlocksByRootReq []Root @@ -123,11 +92,18 @@ func (r BlocksByRootReq) String() string { return "blocks-by-root requested: " + string(out[:len(out)-1]) } -func BlocksByRootRPCv1(spec *common.Spec, opcBlock beacon.OpaqueBlock) *reqresp.RPCMethod { - return &reqresp.RPCMethod{ - Protocol: "/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz", - RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlocksByRootReq) }, 0, 32*MAX_REQUEST_BLOCKS_BY_ROOT), - ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return spec.Wrap(opcBlock) }, 0, opcBlock.ByteLength(spec)), - DefaultResponseChunkCount: 20, - } +// methods + +var BlocksByRangeRPCv2 = reqresp.RPCMethod{ + Protocol: "/eth2/beacon_chain/req/beacon_blocks_by_range/2/ssz_snappy", + RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlocksByRangeReqV2) }, blocksByRangeReqByteLen, blocksByRangeReqByteLen), + ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlocksByRangeReqV2) }, 0, uint64(0)), + DefaultResponseChunkCount: 20, +} + +var BlocksByRootRPCv2 = reqresp.RPCMethod{ + Protocol: "/eth2/beacon_chain/req/beacon_blocks_by_root/2/ssz_snappy", + RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlocksByRootReq) }, 0, 32*MAX_REQUEST_BLOCKS_BY_ROOT), + ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlocksByRootReq) }, 0, uint64(0)), + DefaultResponseChunkCount: 20, }