From 3a9306e9c736f64ebeb6eaa2c6c4ea0e0dbb24f3 Mon Sep 17 00:00:00 2001 From: Cody Littley <56973212+cody-littley@users.noreply.github.com> Date: Tue, 7 Jan 2025 10:54:03 -0600 Subject: [PATCH 1/6] Ignore .DS_Store files, which are generated by the OSX Finder app. (#1070) Signed-off-by: Cody Littley --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 40fe5f34c..f4dc62269 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,6 @@ lightnode/docker/args.sh .vscode icicle/* + +# OSX specific +.DS_Store From 7459132b398c818298be5ae7e0da51c54af73654 Mon Sep 17 00:00:00 2001 From: anupsv Date: Wed, 8 Jan 2025 01:34:30 +0530 Subject: [PATCH 2/6] Update codeQL-scanning.yaml (#1020) --- .github/workflows/codeQL-scanning.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/codeQL-scanning.yaml b/.github/workflows/codeQL-scanning.yaml index 897553f96..4338c4f63 100644 --- a/.github/workflows/codeQL-scanning.yaml +++ b/.github/workflows/codeQL-scanning.yaml @@ -19,6 +19,8 @@ on: - 'api/**' - '.github/codeql/**' - '.github/workflows/codeql-analysis.yml' + paths-ignore: + - 'contracts/bindings/**' schedule: - cron: '0 9 * * *' From 9dfd08b60bd5d696fd93dcd5fce057d2d203cbdf Mon Sep 17 00:00:00 2001 From: hopeyen <60078528+hopeyen@users.noreply.github.com> Date: Tue, 7 Jan 2025 15:09:45 -0600 Subject: [PATCH 3/6] fix: payment metadata invalid if both payment fields empty (#1073) --- disperser/apiserver/disperse_blob_v2.go | 2 +- disperser/apiserver/server_v2_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/disperser/apiserver/disperse_blob_v2.go b/disperser/apiserver/disperse_blob_v2.go index d8fcd3dd8..4e3e8ca7d 100644 --- a/disperser/apiserver/disperse_blob_v2.go +++ b/disperser/apiserver/disperse_blob_v2.go @@ -118,7 +118,7 @@ func (s *DispersalServerV2) validateDispersalRequest(ctx context.Context, req *p return api.NewErrorInvalidArg("payment metadata is required") } - if len(blobHeader.PaymentMetadata.AccountID) == 0 || blobHeader.PaymentMetadata.ReservationPeriod == 0 || blobHeader.PaymentMetadata.CumulativePayment == nil { + if len(blobHeader.PaymentMetadata.AccountID) == 0 || (blobHeader.PaymentMetadata.ReservationPeriod == 0 && blobHeader.PaymentMetadata.CumulativePayment.Cmp(big.NewInt(0)) == 0) { return api.NewErrorInvalidArg("invalid payment metadata") } diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go index 367d3c9bb..305b6bd43 100644 --- a/disperser/apiserver/server_v2_test.go +++ b/disperser/apiserver/server_v2_test.go @@ -223,7 +223,7 @@ func TestV2DisperseBlobRequestValidation(t *testing.T) { PaymentHeader: &pbcommon.PaymentHeader{ AccountId: accountID, ReservationPeriod: 0, - CumulativePayment: big.NewInt(100).Bytes(), + CumulativePayment: big.NewInt(0).Bytes(), }, } blobHeader, err := corev2.BlobHeaderFromProtobuf(invalidReqProto) From c521c447bf97ab4fd511ded3876dc8e041284629 Mon Sep 17 00:00:00 2001 From: Ian Shim <100327837+ian-shim@users.noreply.github.com> Date: Wed, 8 Jan 2025 06:10:18 +0900 Subject: [PATCH 4/6] [v2] Retriever server (#1048) --- api/clients/v2/mock/retrieval_client.go | 27 +++ api/docs/eigenda-protos.html | 120 ++++++++++ api/docs/eigenda-protos.md | 76 +++++++ api/docs/retriever.html | 38 ++-- api/docs/retriever.md | 35 ++- api/grpc/retriever/v2/retriever.pb.go | 248 +++++++++++++++++++++ api/grpc/retriever/v2/retriever_grpc.pb.go | 113 ++++++++++ api/proto/retriever/v2/retriever.proto | 39 ++++ core/data.go | 4 + core/v2/types.go | 3 + retriever/cmd/main.go | 94 ++++---- retriever/config.go | 37 ++- retriever/flags/flags.go | 20 +- retriever/v2/server.go | 85 +++++++ retriever/v2/server_test.go | 134 +++++++++++ tools/traffic/config/config.go | 8 +- 16 files changed, 954 insertions(+), 127 deletions(-) create mode 100644 api/clients/v2/mock/retrieval_client.go create mode 100644 api/grpc/retriever/v2/retriever.pb.go create mode 100644 api/grpc/retriever/v2/retriever_grpc.pb.go create mode 100644 api/proto/retriever/v2/retriever.proto create mode 100644 retriever/v2/server.go create mode 100644 retriever/v2/server_test.go diff --git a/api/clients/v2/mock/retrieval_client.go b/api/clients/v2/mock/retrieval_client.go new file mode 100644 index 000000000..6f0e23b72 --- /dev/null +++ b/api/clients/v2/mock/retrieval_client.go @@ -0,0 +1,27 @@ +package mock + +import ( + "context" + + "github.com/Layr-Labs/eigenda/api/clients/v2" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/stretchr/testify/mock" +) + +type MockRetrievalClient struct { + mock.Mock +} + +var _ clients.RetrievalClient = (*MockRetrievalClient)(nil) + +func NewRetrievalClient() *MockRetrievalClient { + return &MockRetrievalClient{} +} + +func (c *MockRetrievalClient) GetBlob(ctx context.Context, blobHeader *corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error) { + args := c.Called() + + result := args.Get(0) + return result.([]byte), args.Error(1) +} diff --git a/api/docs/eigenda-protos.html b/api/docs/eigenda-protos.html index 44474db3c..3981fbfb1 100644 --- a/api/docs/eigenda-protos.html +++ b/api/docs/eigenda-protos.html @@ -624,6 +624,29 @@

Table of Contents

+ +
  • + retriever/v2/retriever.proto + +
  • +
  • Scalar Value Types
  • @@ -3955,6 +3978,103 @@

    Retriever

    + +
    +

    retriever/v2/retriever.proto

    Top +
    +

    + + +

    BlobReply

    +

    + + + + + + + + + + + + + + + + +
    FieldTypeLabelDescription
    databytes

    The blob retrieved and reconstructed from the EigenDA Nodes per BlobRequest.

    + + + + + +

    BlobRequest

    +

    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    FieldTypeLabelDescription
    blob_headercommon.v2.BlobHeader

    header of the blob to be retrieved

    reference_block_numberuint32

    The Ethereum block number at which the batch for this blob was constructed.

    quorum_iduint32

    Which quorum of the blob this is requesting for (note a blob can participate in +multiple quorums).

    + + + + + + + + + + + +

    Retriever

    +

    The Retriever is a service for retrieving chunks corresponding to a blob from

    the EigenDA operator nodes and reconstructing the original blob from the chunks.

    This is a client-side library that the users are supposed to operationalize.

    Note: Users generally have two ways to retrieve a blob from EigenDA V2:

    1) Retrieve from the relay that the blob is assigned to: the API

    is Relay.GetBlob() as defined in api/proto/relay/relay.proto

    2) Retrieve directly from the EigenDA Nodes, which is supported by this Retriever.

    The Relay.GetBlob() (the 1st approach) is generally faster and cheaper as the

    relay manages the blobs that it has processed, whereas the Retriever.RetrieveBlob()

    (the 2nd approach here) removes the need to trust the relay, with the downside of

    worse cost and performance.

    + + + + + + + + + + + + + + +
    Method NameRequest TypeResponse TypeDescription
    RetrieveBlobBlobRequestBlobReply

    This fans out request to EigenDA Nodes to retrieve the chunks and returns the +reconstructed original blob in response.

    + + +

    Scalar Value Types

    diff --git a/api/docs/eigenda-protos.md b/api/docs/eigenda-protos.md index 60d2a21c9..69f441396 100644 --- a/api/docs/eigenda-protos.md +++ b/api/docs/eigenda-protos.md @@ -119,6 +119,12 @@ - [Retriever](#retriever-Retriever) +- [retriever/v2/retriever.proto](#retriever_v2_retriever-proto) + - [BlobReply](#retriever-v2-BlobReply) + - [BlobRequest](#retriever-v2-BlobRequest) + + - [Retriever](#retriever-v2-Retriever) + - [Scalar Value Types](#scalar-value-types) @@ -1737,6 +1743,76 @@ worse cost and performance. + +

    Top

    + +## retriever/v2/retriever.proto + + + + + +### BlobReply + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| data | [bytes](#bytes) | | The blob retrieved and reconstructed from the EigenDA Nodes per BlobRequest. | + + + + + + + + +### BlobRequest + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| blob_header | [common.v2.BlobHeader](#common-v2-BlobHeader) | | header of the blob to be retrieved | +| reference_block_number | [uint32](#uint32) | | The Ethereum block number at which the batch for this blob was constructed. | +| quorum_id | [uint32](#uint32) | | Which quorum of the blob this is requesting for (note a blob can participate in multiple quorums). | + + + + + + + + + + + + + + +### Retriever +The Retriever is a service for retrieving chunks corresponding to a blob from +the EigenDA operator nodes and reconstructing the original blob from the chunks. +This is a client-side library that the users are supposed to operationalize. + +Note: Users generally have two ways to retrieve a blob from EigenDA V2: + 1) Retrieve from the relay that the blob is assigned to: the API + is Relay.GetBlob() as defined in api/proto/relay/relay.proto + 2) Retrieve directly from the EigenDA Nodes, which is supported by this Retriever. + +The Relay.GetBlob() (the 1st approach) is generally faster and cheaper as the +relay manages the blobs that it has processed, whereas the Retriever.RetrieveBlob() +(the 2nd approach here) removes the need to trust the relay, with the downside of +worse cost and performance. + +| Method Name | Request Type | Response Type | Description | +| ----------- | ------------ | ------------- | ------------| +| RetrieveBlob | [BlobRequest](#retriever-v2-BlobRequest) | [BlobReply](#retriever-v2-BlobReply) | This fans out request to EigenDA Nodes to retrieve the chunks and returns the reconstructed original blob in response. | + + + + + ## Scalar Value Types | .proto Type | Notes | C++ | Java | Python | Go | C# | PHP | Ruby | diff --git a/api/docs/retriever.html b/api/docs/retriever.html index 07a0cef13..c5de205af 100644 --- a/api/docs/retriever.html +++ b/api/docs/retriever.html @@ -175,22 +175,22 @@

    Table of Contents

  • - retriever/retriever.proto + retriever/v2/retriever.proto @@ -203,12 +203,12 @@

    Table of Contents

    -

    retriever/retriever.proto

    Top +

    retriever/v2/retriever.proto

    Top

    -

    BlobReply

    +

    BlobReply

    @@ -232,7 +232,7 @@

    BlobReply

    -

    BlobRequest

    +

    BlobRequest

    @@ -243,20 +243,10 @@

    BlobRequest

  • - - - - - - - - - + + - + @@ -287,8 +277,8 @@

    BlobRequest

    -

    Retriever

    -

    The Retriever is a service for retrieving chunks corresponding to a blob from

    the EigenDA operator nodes and reconstructing the original blob from the chunks.

    This is a client-side library that the users are supposed to operationalize.

    Note: Users generally have two ways to retrieve a blob from EigenDA:

    1) Retrieve from the Disperser that the user initially used for dispersal: the API

    is Disperser.RetrieveBlob() as defined in api/proto/disperser/disperser.proto

    2) Retrieve directly from the EigenDA Nodes, which is supported by this Retriever.

    The Disperser.RetrieveBlob() (the 1st approach) is generally faster and cheaper as the

    Disperser manages the blobs that it has processed, whereas the Retriever.RetrieveBlob()

    (the 2nd approach here) removes the need to trust the Disperser, with the downside of

    worse cost and performance.

    +

    Retriever

    +

    The Retriever is a service for retrieving chunks corresponding to a blob from

    the EigenDA operator nodes and reconstructing the original blob from the chunks.

    This is a client-side library that the users are supposed to operationalize.

    Note: Users generally have two ways to retrieve a blob from EigenDA V2:

    1) Retrieve from the relay that the blob is assigned to: the API

    is Relay.GetBlob() as defined in api/proto/relay/relay.proto

    2) Retrieve directly from the EigenDA Nodes, which is supported by this Retriever.

    The Relay.GetBlob() (the 1st approach) is generally faster and cheaper as the

    relay manages the blobs that it has processed, whereas the Retriever.RetrieveBlob()

    (the 2nd approach here) removes the need to trust the relay, with the downside of

    worse cost and performance.

    batch_header_hashbytes

    The hash of the ReducedBatchHeader defined onchain, see: -https://github.com/Layr-Labs/eigenda/blob/master/contracts/src/interfaces/IEigenDAServiceManager.sol#L43 -This identifies the batch that this blob belongs to.

    blob_indexuint32blob_headercommon.v2.BlobHeader

    Which blob in the batch this is requesting for (note: a batch is logically an -ordered list of blobs).

    header of the blob to be retrieved

    @@ -297,8 +287,8 @@

    Retriever

    - - + + diff --git a/api/docs/retriever.md b/api/docs/retriever.md index d78b27fe9..27452d2f4 100644 --- a/api/docs/retriever.md +++ b/api/docs/retriever.md @@ -3,24 +3,24 @@ ## Table of Contents -- [retriever/retriever.proto](#retriever_retriever-proto) - - [BlobReply](#retriever-BlobReply) - - [BlobRequest](#retriever-BlobRequest) +- [retriever/v2/retriever.proto](#retriever_v2_retriever-proto) + - [BlobReply](#retriever-v2-BlobReply) + - [BlobRequest](#retriever-v2-BlobRequest) - - [Retriever](#retriever-Retriever) + - [Retriever](#retriever-v2-Retriever) - [Scalar Value Types](#scalar-value-types) - +

    Top

    -## retriever/retriever.proto +## retriever/v2/retriever.proto - + ### BlobReply @@ -35,7 +35,7 @@ - + ### BlobRequest @@ -43,8 +43,7 @@ | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| batch_header_hash | [bytes](#bytes) | | The hash of the ReducedBatchHeader defined onchain, see: https://github.com/Layr-Labs/eigenda/blob/master/contracts/src/interfaces/IEigenDAServiceManager.sol#L43 This identifies the batch that this blob belongs to. | -| blob_index | [uint32](#uint32) | | Which blob in the batch this is requesting for (note: a batch is logically an ordered list of blobs). | +| blob_header | [common.v2.BlobHeader](#common-v2-BlobHeader) | | header of the blob to be retrieved | | reference_block_number | [uint32](#uint32) | | The Ethereum block number at which the batch for this blob was constructed. | | quorum_id | [uint32](#uint32) | | Which quorum of the blob this is requesting for (note a blob can participate in multiple quorums). | @@ -59,26 +58,26 @@ - + ### Retriever The Retriever is a service for retrieving chunks corresponding to a blob from the EigenDA operator nodes and reconstructing the original blob from the chunks. This is a client-side library that the users are supposed to operationalize. -Note: Users generally have two ways to retrieve a blob from EigenDA: - 1) Retrieve from the Disperser that the user initially used for dispersal: the API - is Disperser.RetrieveBlob() as defined in api/proto/disperser/disperser.proto +Note: Users generally have two ways to retrieve a blob from EigenDA V2: + 1) Retrieve from the relay that the blob is assigned to: the API + is Relay.GetBlob() as defined in api/proto/relay/relay.proto 2) Retrieve directly from the EigenDA Nodes, which is supported by this Retriever. -The Disperser.RetrieveBlob() (the 1st approach) is generally faster and cheaper as the -Disperser manages the blobs that it has processed, whereas the Retriever.RetrieveBlob() -(the 2nd approach here) removes the need to trust the Disperser, with the downside of +The Relay.GetBlob() (the 1st approach) is generally faster and cheaper as the +relay manages the blobs that it has processed, whereas the Retriever.RetrieveBlob() +(the 2nd approach here) removes the need to trust the relay, with the downside of worse cost and performance. | Method Name | Request Type | Response Type | Description | | ----------- | ------------ | ------------- | ------------| -| RetrieveBlob | [BlobRequest](#retriever-BlobRequest) | [BlobReply](#retriever-BlobReply) | This fans out request to EigenDA Nodes to retrieve the chunks and returns the reconstructed original blob in response. | +| RetrieveBlob | [BlobRequest](#retriever-v2-BlobRequest) | [BlobReply](#retriever-v2-BlobReply) | This fans out request to EigenDA Nodes to retrieve the chunks and returns the reconstructed original blob in response. | diff --git a/api/grpc/retriever/v2/retriever.pb.go b/api/grpc/retriever/v2/retriever.pb.go new file mode 100644 index 000000000..d9ef75061 --- /dev/null +++ b/api/grpc/retriever/v2/retriever.pb.go @@ -0,0 +1,248 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v4.23.4 +// source: retriever/v2/retriever.proto + +package v2 + +import ( + v2 "github.com/Layr-Labs/eigenda/api/grpc/common/v2" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type BlobRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // header of the blob to be retrieved + BlobHeader *v2.BlobHeader `protobuf:"bytes,1,opt,name=blob_header,json=blobHeader,proto3" json:"blob_header,omitempty"` + // The Ethereum block number at which the batch for this blob was constructed. + ReferenceBlockNumber uint32 `protobuf:"varint,2,opt,name=reference_block_number,json=referenceBlockNumber,proto3" json:"reference_block_number,omitempty"` + // Which quorum of the blob this is requesting for (note a blob can participate in + // multiple quorums). + QuorumId uint32 `protobuf:"varint,3,opt,name=quorum_id,json=quorumId,proto3" json:"quorum_id,omitempty"` +} + +func (x *BlobRequest) Reset() { + *x = BlobRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_retriever_v2_retriever_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BlobRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BlobRequest) ProtoMessage() {} + +func (x *BlobRequest) ProtoReflect() protoreflect.Message { + mi := &file_retriever_v2_retriever_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BlobRequest.ProtoReflect.Descriptor instead. +func (*BlobRequest) Descriptor() ([]byte, []int) { + return file_retriever_v2_retriever_proto_rawDescGZIP(), []int{0} +} + +func (x *BlobRequest) GetBlobHeader() *v2.BlobHeader { + if x != nil { + return x.BlobHeader + } + return nil +} + +func (x *BlobRequest) GetReferenceBlockNumber() uint32 { + if x != nil { + return x.ReferenceBlockNumber + } + return 0 +} + +func (x *BlobRequest) GetQuorumId() uint32 { + if x != nil { + return x.QuorumId + } + return 0 +} + +type BlobReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The blob retrieved and reconstructed from the EigenDA Nodes per BlobRequest. + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *BlobReply) Reset() { + *x = BlobReply{} + if protoimpl.UnsafeEnabled { + mi := &file_retriever_v2_retriever_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BlobReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BlobReply) ProtoMessage() {} + +func (x *BlobReply) ProtoReflect() protoreflect.Message { + mi := &file_retriever_v2_retriever_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BlobReply.ProtoReflect.Descriptor instead. +func (*BlobReply) Descriptor() ([]byte, []int) { + return file_retriever_v2_retriever_proto_rawDescGZIP(), []int{1} +} + +func (x *BlobReply) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +var File_retriever_v2_retriever_proto protoreflect.FileDescriptor + +var file_retriever_v2_retriever_proto_rawDesc = []byte{ + 0x0a, 0x1c, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x72, + 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, + 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x1a, 0x16, 0x63, 0x6f, + 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x98, 0x01, 0x0a, 0x0b, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, 0x62, 0x6c, 0x6f, 0x62, 0x5f, 0x68, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, + 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x52, 0x0a, 0x62, 0x6c, 0x6f, 0x62, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x34, 0x0a, 0x16, + 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, + 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x14, 0x72, 0x65, + 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, + 0x65, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x71, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x5f, 0x69, 0x64, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x71, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x49, 0x64, 0x22, + 0x1f, 0x0a, 0x09, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x32, 0x51, 0x0a, 0x09, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x12, 0x44, 0x0a, + 0x0c, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x19, 0x2e, + 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, + 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x72, 0x65, 0x74, 0x72, 0x69, + 0x65, 0x76, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, + 0x79, 0x22, 0x00, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, + 0x6e, 0x64, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x72, 0x65, 0x74, + 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, +} + +var ( + file_retriever_v2_retriever_proto_rawDescOnce sync.Once + file_retriever_v2_retriever_proto_rawDescData = file_retriever_v2_retriever_proto_rawDesc +) + +func file_retriever_v2_retriever_proto_rawDescGZIP() []byte { + file_retriever_v2_retriever_proto_rawDescOnce.Do(func() { + file_retriever_v2_retriever_proto_rawDescData = protoimpl.X.CompressGZIP(file_retriever_v2_retriever_proto_rawDescData) + }) + return file_retriever_v2_retriever_proto_rawDescData +} + +var file_retriever_v2_retriever_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_retriever_v2_retriever_proto_goTypes = []interface{}{ + (*BlobRequest)(nil), // 0: retriever.v2.BlobRequest + (*BlobReply)(nil), // 1: retriever.v2.BlobReply + (*v2.BlobHeader)(nil), // 2: common.v2.BlobHeader +} +var file_retriever_v2_retriever_proto_depIdxs = []int32{ + 2, // 0: retriever.v2.BlobRequest.blob_header:type_name -> common.v2.BlobHeader + 0, // 1: retriever.v2.Retriever.RetrieveBlob:input_type -> retriever.v2.BlobRequest + 1, // 2: retriever.v2.Retriever.RetrieveBlob:output_type -> retriever.v2.BlobReply + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_retriever_v2_retriever_proto_init() } +func file_retriever_v2_retriever_proto_init() { + if File_retriever_v2_retriever_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_retriever_v2_retriever_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BlobRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_retriever_v2_retriever_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BlobReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_retriever_v2_retriever_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_retriever_v2_retriever_proto_goTypes, + DependencyIndexes: file_retriever_v2_retriever_proto_depIdxs, + MessageInfos: file_retriever_v2_retriever_proto_msgTypes, + }.Build() + File_retriever_v2_retriever_proto = out.File + file_retriever_v2_retriever_proto_rawDesc = nil + file_retriever_v2_retriever_proto_goTypes = nil + file_retriever_v2_retriever_proto_depIdxs = nil +} diff --git a/api/grpc/retriever/v2/retriever_grpc.pb.go b/api/grpc/retriever/v2/retriever_grpc.pb.go new file mode 100644 index 000000000..31fa39a90 --- /dev/null +++ b/api/grpc/retriever/v2/retriever_grpc.pb.go @@ -0,0 +1,113 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.23.4 +// source: retriever/v2/retriever.proto + +package v2 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + Retriever_RetrieveBlob_FullMethodName = "/retriever.v2.Retriever/RetrieveBlob" +) + +// RetrieverClient is the client API for Retriever service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type RetrieverClient interface { + // This fans out request to EigenDA Nodes to retrieve the chunks and returns the + // reconstructed original blob in response. + RetrieveBlob(ctx context.Context, in *BlobRequest, opts ...grpc.CallOption) (*BlobReply, error) +} + +type retrieverClient struct { + cc grpc.ClientConnInterface +} + +func NewRetrieverClient(cc grpc.ClientConnInterface) RetrieverClient { + return &retrieverClient{cc} +} + +func (c *retrieverClient) RetrieveBlob(ctx context.Context, in *BlobRequest, opts ...grpc.CallOption) (*BlobReply, error) { + out := new(BlobReply) + err := c.cc.Invoke(ctx, Retriever_RetrieveBlob_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// RetrieverServer is the server API for Retriever service. +// All implementations must embed UnimplementedRetrieverServer +// for forward compatibility +type RetrieverServer interface { + // This fans out request to EigenDA Nodes to retrieve the chunks and returns the + // reconstructed original blob in response. + RetrieveBlob(context.Context, *BlobRequest) (*BlobReply, error) + mustEmbedUnimplementedRetrieverServer() +} + +// UnimplementedRetrieverServer must be embedded to have forward compatible implementations. +type UnimplementedRetrieverServer struct { +} + +func (UnimplementedRetrieverServer) RetrieveBlob(context.Context, *BlobRequest) (*BlobReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method RetrieveBlob not implemented") +} +func (UnimplementedRetrieverServer) mustEmbedUnimplementedRetrieverServer() {} + +// UnsafeRetrieverServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to RetrieverServer will +// result in compilation errors. +type UnsafeRetrieverServer interface { + mustEmbedUnimplementedRetrieverServer() +} + +func RegisterRetrieverServer(s grpc.ServiceRegistrar, srv RetrieverServer) { + s.RegisterService(&Retriever_ServiceDesc, srv) +} + +func _Retriever_RetrieveBlob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BlobRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RetrieverServer).RetrieveBlob(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Retriever_RetrieveBlob_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RetrieverServer).RetrieveBlob(ctx, req.(*BlobRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Retriever_ServiceDesc is the grpc.ServiceDesc for Retriever service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Retriever_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "retriever.v2.Retriever", + HandlerType: (*RetrieverServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "RetrieveBlob", + Handler: _Retriever_RetrieveBlob_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "retriever/v2/retriever.proto", +} diff --git a/api/proto/retriever/v2/retriever.proto b/api/proto/retriever/v2/retriever.proto new file mode 100644 index 000000000..87a758ec1 --- /dev/null +++ b/api/proto/retriever/v2/retriever.proto @@ -0,0 +1,39 @@ +syntax = "proto3"; + +import "common/v2/common.proto"; +option go_package = "github.com/Layr-Labs/eigenda/api/grpc/retriever/v2"; +package retriever.v2; + +// The Retriever is a service for retrieving chunks corresponding to a blob from +// the EigenDA operator nodes and reconstructing the original blob from the chunks. +// This is a client-side library that the users are supposed to operationalize. +// +// Note: Users generally have two ways to retrieve a blob from EigenDA V2: +// 1) Retrieve from the relay that the blob is assigned to: the API +// is Relay.GetBlob() as defined in api/proto/relay/relay.proto +// 2) Retrieve directly from the EigenDA Nodes, which is supported by this Retriever. +// +// The Relay.GetBlob() (the 1st approach) is generally faster and cheaper as the +// relay manages the blobs that it has processed, whereas the Retriever.RetrieveBlob() +// (the 2nd approach here) removes the need to trust the relay, with the downside of +// worse cost and performance. +service Retriever { + // This fans out request to EigenDA Nodes to retrieve the chunks and returns the + // reconstructed original blob in response. + rpc RetrieveBlob(BlobRequest) returns (BlobReply) {} +} + +message BlobRequest { + // header of the blob to be retrieved + common.v2.BlobHeader blob_header = 1; + // The Ethereum block number at which the batch for this blob was constructed. + uint32 reference_block_number = 2; + // Which quorum of the blob this is requesting for (note a blob can participate in + // multiple quorums). + uint32 quorum_id = 3; +} + +message BlobReply { + // The blob retrieved and reconstructed from the EigenDA Nodes per BlobRequest. + bytes data = 1; +} diff --git a/core/data.go b/core/data.go index 367faad32..d68e598a8 100644 --- a/core/data.go +++ b/core/data.go @@ -596,6 +596,10 @@ func (pm *PaymentMetadata) ToProtobuf() *commonpb.PaymentHeader { // ConvertToProtoPaymentHeader converts a PaymentMetadata to a protobuf payment header func ConvertToPaymentMetadata(ph *commonpb.PaymentHeader) *PaymentMetadata { + if ph == nil { + return nil + } + return &PaymentMetadata{ AccountID: ph.AccountId, ReservationPeriod: ph.ReservationPeriod, diff --git a/core/v2/types.go b/core/v2/types.go index 3ff941108..41f58c677 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -109,6 +109,9 @@ func BlobHeaderFromProtobuf(proto *commonpb.BlobHeader) (*BlobHeader, error) { } paymentMetadata := core.ConvertToPaymentMetadata(proto.GetPaymentHeader()) + if paymentMetadata == nil { + return nil, errors.New("payment metadata is nil") + } return &BlobHeader{ BlobVersion: BlobVersion(proto.GetVersion()), diff --git a/retriever/cmd/main.go b/retriever/cmd/main.go index 8ea7b0429..bfa963a80 100644 --- a/retriever/cmd/main.go +++ b/retriever/cmd/main.go @@ -2,26 +2,28 @@ package main import ( "context" + "errors" "fmt" "log" "net" "os" "github.com/Layr-Labs/eigenda/api/clients" + clientsv2 "github.com/Layr-Labs/eigenda/api/clients/v2" pb "github.com/Layr-Labs/eigenda/api/grpc/retriever" + pbv2 "github.com/Layr-Labs/eigenda/api/grpc/retriever/v2" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/geth" "github.com/Layr-Labs/eigenda/common/healthcheck" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/eth" - coreindexer "github.com/Layr-Labs/eigenda/core/indexer" "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" "github.com/Layr-Labs/eigenda/retriever" retrivereth "github.com/Layr-Labs/eigenda/retriever/eth" "github.com/Layr-Labs/eigenda/retriever/flags" + retrieverv2 "github.com/Layr-Labs/eigenda/retriever/v2" gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/rpc" "github.com/urfave/cli" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -89,69 +91,65 @@ func RetrieverMain(ctx *cli.Context) error { log.Fatalln("could not start tcp listener", err) } - // TODO(ian-shim): uncomment when https://github.com/Layr-Labs/eigenda-internal/issues/77 is done - // store, err := leveldb.NewHeaderStore(config.IndexerDataDir) - // if err != nil { - // return err - // } - tx, err := eth.NewReader(logger, gethClient, config.BLSOperatorStateRetrieverAddr, config.EigenDAServiceManagerAddr) if err != nil { log.Fatalln("could not start tcp listener", err) } cs := eth.NewChainState(tx, gethClient) - rpcClient, err := rpc.Dial(config.EthClientConfig.RPCURLs[0]) if err != nil { log.Fatalln("could not start tcp listener", err) } - var ics core.IndexedChainState - if config.UseGraph { - logger.Info("Using graph node") - - logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint) - ics = thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) - } else { - logger.Info("Using built-in indexer") - - indexer, err := coreindexer.CreateNewIndexer( - &config.IndexerConfig, - gethClient, - rpcClient, - config.EigenDAServiceManagerAddr, - logger, - ) + logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint) + ics := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) + + if config.EigenDAVersion == 1 { + agn := &core.StdAssignmentCoordinator{} + retrievalClient, err := clients.NewRetrievalClient(logger, ics, agn, nodeClient, v, config.NumConnections) if err != nil { - return err + log.Fatalln("could not start tcp listener", err) } - ics, err = coreindexer.NewIndexedChainState(cs, indexer) - if err != nil { - return err + + chainClient := retrivereth.NewChainClient(gethClient, logger) + retrieverServiceServer := retriever.NewServer(config, logger, retrievalClient, ics, chainClient) + if err = retrieverServiceServer.Start(context.Background()); err != nil { + log.Fatalln("failed to start retriever service server", err) } - } - agn := &core.StdAssignmentCoordinator{} - retrievalClient, err := clients.NewRetrievalClient(logger, ics, agn, nodeClient, v, config.NumConnections) - if err != nil { - log.Fatalln("could not start tcp listener", err) - } + // Register reflection service on gRPC server + // This makes "grpcurl -plaintext localhost:9000 list" command work + reflection.Register(gs) + + pb.RegisterRetrieverServer(gs, retrieverServiceServer) - chainClient := retrivereth.NewChainClient(gethClient, logger) - retrieverServiceServer := retriever.NewServer(config, logger, retrievalClient, ics, chainClient) - if err = retrieverServiceServer.Start(context.Background()); err != nil { - log.Fatalln("failed to start retriever service server", err) + // Register Server for Health Checks + name := pb.Retriever_ServiceDesc.ServiceName + healthcheck.RegisterHealthServer(name, gs) + + log.Printf("server listening at %s", addr) + return gs.Serve(listener) } - // Register reflection service on gRPC server - // This makes "grpcurl -plaintext localhost:9000 list" command work - reflection.Register(gs) + if config.EigenDAVersion == 2 { + retrievalClient := clientsv2.NewRetrievalClient(logger, tx, ics, v, config.NumConnections) + retrieverServiceServer := retrieverv2.NewServer(config, logger, retrievalClient, ics) + if err = retrieverServiceServer.Start(context.Background()); err != nil { + log.Fatalln("failed to start retriever service server", err) + } - pb.RegisterRetrieverServer(gs, retrieverServiceServer) + // Register reflection service on gRPC server + // This makes "grpcurl -plaintext localhost:9000 list" command work + reflection.Register(gs) - // Register Server for Health Checks - name := pb.Retriever_ServiceDesc.ServiceName - healthcheck.RegisterHealthServer(name, gs) + pbv2.RegisterRetrieverServer(gs, retrieverServiceServer) + + // Register Server for Health Checks + name := pb.Retriever_ServiceDesc.ServiceName + healthcheck.RegisterHealthServer(name, gs) + + log.Printf("server listening at %s", addr) + return gs.Serve(listener) + } - log.Printf("server listening at %s", addr) - return gs.Serve(listener) + return errors.New("invalid EigenDA version") } diff --git a/retriever/config.go b/retriever/config.go index c31798095..6476fa353 100644 --- a/retriever/config.go +++ b/retriever/config.go @@ -1,13 +1,13 @@ package retriever import ( + "errors" "time" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/geth" "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/encoding/kzg" - "github.com/Layr-Labs/eigenda/indexer" "github.com/Layr-Labs/eigenda/retriever/flags" "github.com/urfave/cli" ) @@ -16,44 +16,39 @@ type Config struct { EncoderConfig kzg.KzgConfig EthClientConfig geth.EthClientConfig LoggerConfig common.LoggerConfig - IndexerConfig indexer.Config MetricsConfig MetricsConfig ChainStateConfig thegraph.Config - IndexerDataDir string Timeout time.Duration NumConnections int BLSOperatorStateRetrieverAddr string EigenDAServiceManagerAddr string - UseGraph bool + + EigenDAVersion int } -func ReadRetrieverConfig(ctx *cli.Context) *Config { +func NewConfig(ctx *cli.Context) (*Config, error) { + version := ctx.GlobalInt(flags.EigenDAVersionFlag.Name) + if version != 1 && version != 2 { + return nil, errors.New("invalid EigenDA version") + } + loggerConfig, err := common.ReadLoggerCLIConfig(ctx, flags.FlagPrefix) + if err != nil { + return nil, err + } + return &Config{ + LoggerConfig: *loggerConfig, EncoderConfig: kzg.ReadCLIConfig(ctx), EthClientConfig: geth.ReadEthClientConfig(ctx), - IndexerConfig: indexer.ReadIndexerConfig(ctx), MetricsConfig: MetricsConfig{ HTTPPort: ctx.GlobalString(flags.MetricsHTTPPortFlag.Name), }, ChainStateConfig: thegraph.ReadCLIConfig(ctx), - IndexerDataDir: ctx.GlobalString(flags.IndexerDataDirFlag.Name), Timeout: ctx.Duration(flags.TimeoutFlag.Name), NumConnections: ctx.Int(flags.NumConnectionsFlag.Name), BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name), EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name), - UseGraph: ctx.GlobalBool(flags.UseGraphFlag.Name), - } -} - -func NewConfig(ctx *cli.Context) (*Config, error) { - loggerConfig, err := common.ReadLoggerCLIConfig(ctx, flags.FlagPrefix) - if err != nil { - return nil, err - } - - config := ReadRetrieverConfig(ctx) - config.LoggerConfig = *loggerConfig - - return config, nil + EigenDAVersion: version, + }, nil } diff --git a/retriever/flags/flags.go b/retriever/flags/flags.go index c8f1276a7..e8f07cba8 100644 --- a/retriever/flags/flags.go +++ b/retriever/flags/flags.go @@ -5,7 +5,6 @@ import ( "github.com/Layr-Labs/eigenda/common/geth" "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/encoding/kzg" - "github.com/Layr-Labs/eigenda/indexer" "github.com/urfave/cli" ) @@ -55,12 +54,6 @@ var ( EnvVar: common.PrefixEnvVar(envPrefix, "NUM_CONNECTIONS"), Value: 20, } - IndexerDataDirFlag = cli.StringFlag{ - Name: common.PrefixFlag(FlagPrefix, "indexer-data-dir"), - Usage: "the data directory for the indexer", - EnvVar: common.PrefixEnvVar(envPrefix, "DATA_DIR"), - Value: "./data/retriever", - } MetricsHTTPPortFlag = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "metrics-http-port"), Usage: "the http port which the metrics prometheus server is listening", @@ -68,11 +61,12 @@ var ( Value: "9100", EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_HTTP_PORT"), } - UseGraphFlag = cli.BoolFlag{ - Name: common.PrefixFlag(FlagPrefix, "use-graph"), - Usage: "Whether to use the graph node", + EigenDAVersionFlag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "eigenda-version"), + Usage: "EigenDA version: currently supports 1 and 2", Required: false, - EnvVar: common.PrefixEnvVar(envPrefix, "USE_GRAPH"), + EnvVar: common.PrefixEnvVar(envPrefix, "EIGENDA_VERSION"), + Value: 1, } ) @@ -84,9 +78,8 @@ func RetrieverFlags(envPrefix string) []cli.Flag { BlsOperatorStateRetrieverFlag, EigenDAServiceManagerFlag, NumConnectionsFlag, - IndexerDataDirFlag, MetricsHTTPPortFlag, - UseGraphFlag, + EigenDAVersionFlag, } } @@ -98,6 +91,5 @@ func init() { Flags = append(Flags, kzg.CLIFlags(envPrefix)...) Flags = append(Flags, geth.EthClientFlags(envPrefix)...) Flags = append(Flags, common.LoggerCLIFlags(envPrefix, FlagPrefix)...) - Flags = append(Flags, indexer.CLIFlags(envPrefix)...) Flags = append(Flags, thegraph.CLIFlags(envPrefix)...) } diff --git a/retriever/v2/server.go b/retriever/v2/server.go new file mode 100644 index 000000000..f212a68ce --- /dev/null +++ b/retriever/v2/server.go @@ -0,0 +1,85 @@ +package v2 + +import ( + "bytes" + "context" + "encoding/hex" + "errors" + + "github.com/Layr-Labs/eigenda/api/clients/v2" + pb "github.com/Layr-Labs/eigenda/api/grpc/retriever/v2" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/Layr-Labs/eigenda/retriever" + "github.com/Layr-Labs/eigensdk-go/logging" +) + +type Config = retriever.Config + +type Server struct { + pb.UnimplementedRetrieverServer + + config *Config + retrievalClient clients.RetrievalClient + indexedState core.IndexedChainState + logger logging.Logger + metrics *retriever.Metrics +} + +func NewServer( + config *Config, + logger logging.Logger, + retrievalClient clients.RetrievalClient, + indexedState core.IndexedChainState, +) *Server { + metrics := retriever.NewMetrics(config.MetricsConfig.HTTPPort, logger) + + return &Server{ + config: config, + retrievalClient: retrievalClient, + indexedState: indexedState, + logger: logger.With("component", "RetrieverServer"), + metrics: metrics, + } +} + +func (s *Server) Start(ctx context.Context) error { + s.metrics.Start(ctx) + return s.indexedState.Start(ctx) +} + +func (s *Server) RetrieveBlob(ctx context.Context, req *pb.BlobRequest) (*pb.BlobReply, error) { + if req.GetBlobHeader() == nil { + return nil, errors.New("blob header is nil") + } + if req.GetReferenceBlockNumber() == 0 { + return nil, errors.New("reference block number is 0") + } + + blobHeader, err := corev2.BlobHeaderFromProtobuf(req.GetBlobHeader()) + if err != nil { + return nil, err + } + + blobKey, err := blobHeader.BlobKey() + if err != nil { + return nil, err + } + + s.logger.Info("Received request: ", "blobKey", hex.EncodeToString(blobKey[:]), "referenceBlockNumber", req.GetReferenceBlockNumber(), "quorumId", req.GetQuorumId()) + s.metrics.IncrementRetrievalRequestCounter() + + ctxWithTimeout, cancel := context.WithTimeout(ctx, s.config.Timeout) + defer cancel() + data, err := s.retrievalClient.GetBlob(ctxWithTimeout, blobHeader, uint64(req.GetReferenceBlockNumber()), core.QuorumID(req.GetQuorumId())) + if err != nil { + return nil, err + } + restored := bytes.TrimRight(data, "\x00") + restored = codec.RemoveEmptyByteFromPaddedBytes(restored) + + return &pb.BlobReply{ + Data: restored, + }, nil +} diff --git a/retriever/v2/server_test.go b/retriever/v2/server_test.go new file mode 100644 index 000000000..5585c9d4f --- /dev/null +++ b/retriever/v2/server_test.go @@ -0,0 +1,134 @@ +package v2_test + +import ( + "context" + "math/big" + "runtime" + "testing" + + clientsmock "github.com/Layr-Labs/eigenda/api/clients/v2/mock" + commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" + commonpbv2 "github.com/Layr-Labs/eigenda/api/grpc/common/v2" + pb "github.com/Layr-Labs/eigenda/api/grpc/retriever/v2" + "github.com/Layr-Labs/eigenda/core" + coremock "github.com/Layr-Labs/eigenda/core/mock" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/encoding/kzg" + "github.com/Layr-Labs/eigenda/encoding/kzg/prover" + "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/Layr-Labs/eigenda/retriever/mock" + retriever "github.com/Layr-Labs/eigenda/retriever/v2" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/stretchr/testify/require" +) + +const numOperators = 10 + +var ( + indexedChainState core.IndexedChainState + retrievalClient *clientsmock.MockRetrievalClient + chainClient *mock.MockChainClient + gettysburgAddressBytes = []byte("Fourscore and seven years ago our fathers brought forth, on this continent, a new nation, conceived in liberty, and dedicated to the proposition that all men are created equal. Now we are engaged in a great civil war, testing whether that nation, or any nation so conceived, and so dedicated, can long endure. We are met on a great battle-field of that war. We have come to dedicate a portion of that field, as a final resting-place for those who here gave their lives, that that nation might live. It is altogether fitting and proper that we should do this. But, in a larger sense, we cannot dedicate, we cannot consecrate—we cannot hallow—this ground. The brave men, living and dead, who struggled here, have consecrated it far above our poor power to add or detract. The world will little note, nor long remember what we say here, but it can never forget what they did here. It is for us the living, rather, to be dedicated here to the unfinished work which they who fought here have thus far so nobly advanced. It is rather for us to be here dedicated to the great task remaining before us—that from these honored dead we take increased devotion to that cause for which they here gave the last full measure of devotion—that we here highly resolve that these dead shall not have died in vain—that this nation, under God, shall have a new birth of freedom, and that government of the people, by the people, for the people, shall not perish from the earth.") +) + +func makeTestComponents() (encoding.Prover, encoding.Verifier, error) { + config := &kzg.KzgConfig{ + G1Path: "../../inabox/resources/kzg/g1.point", + G2Path: "../../inabox/resources/kzg/g2.point", + CacheDir: "../../inabox/resources/kzg/SRSTables", + SRSOrder: 3000, + SRSNumberToLoad: 3000, + NumWorker: uint64(runtime.GOMAXPROCS(0)), + LoadG2Points: true, + } + + p, err := prover.NewProver(config, nil) + if err != nil { + return nil, nil, err + } + + v, err := verifier.NewVerifier(config, nil) + if err != nil { + return nil, nil, err + } + + return p, v, nil +} + +func newTestServer(t *testing.T) *retriever.Server { + var err error + config := &retriever.Config{} + + logger := logging.NewNoopLogger() + + indexedChainState, err = coremock.MakeChainDataMock(map[uint8]int{ + 0: numOperators, + 1: numOperators, + 2: numOperators, + }) + require.NoError(t, err) + + _, _, err = makeTestComponents() + require.NoError(t, err) + + retrievalClient = &clientsmock.MockRetrievalClient{} + chainClient = mock.NewMockChainClient() + return retriever.NewServer(config, logger, retrievalClient, indexedChainState) +} + +func TestRetrieveBlob(t *testing.T) { + server := newTestServer(t) + data := codec.ConvertByPaddingEmptyByte(gettysburgAddressBytes) + retrievalClient.On("GetBlob").Return(data, nil) + + var X1, Y1 fp.Element + X1 = *X1.SetBigInt(big.NewInt(1)) + Y1 = *Y1.SetBigInt(big.NewInt(2)) + + var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element + _, err := lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781") + require.NoError(t, err) + _, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634") + require.NoError(t, err) + _, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930") + require.NoError(t, err) + _, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531") + require.NoError(t, err) + + var lengthProof, lengthCommitment bn254.G2Affine + lengthProof.X.A0 = lengthXA0 + lengthProof.X.A1 = lengthXA1 + lengthProof.Y.A0 = lengthYA0 + lengthProof.Y.A1 = lengthYA1 + + lengthCommitment = lengthProof + + mockCommitment := encoding.BlobCommitments{ + Commitment: &encoding.G1Commitment{ + X: X1, + Y: Y1, + }, + LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment), + LengthProof: (*encoding.G2Commitment)(&lengthProof), + Length: 16, + } + c, err := mockCommitment.ToProtobuf() + require.NoError(t, err) + retrievalReply, err := server.RetrieveBlob(context.Background(), &pb.BlobRequest{ + BlobHeader: &commonpbv2.BlobHeader{ + Version: 0, + QuorumNumbers: []uint32{0}, + Commitment: c, + PaymentHeader: &commonpb.PaymentHeader{ + AccountId: "account_id", + }, + }, + ReferenceBlockNumber: 100, + QuorumId: 0, + }) + require.NoError(t, err) + require.Equal(t, gettysburgAddressBytes, retrievalReply.Data) +} diff --git a/tools/traffic/config/config.go b/tools/traffic/config/config.go index 2b147f1a4..9702c7be8 100644 --- a/tools/traffic/config/config.go +++ b/tools/traffic/config/config.go @@ -3,10 +3,11 @@ package config import ( "errors" "fmt" + "time" + "github.com/Layr-Labs/eigenda/api/clients" "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/retriever" - "time" "github.com/Layr-Labs/eigenda/common" "github.com/urfave/cli" @@ -55,7 +56,10 @@ func NewConfig(ctx *cli.Context) (*Config, error) { customQuorumsUint8[i] = uint8(q) } - retrieverConfig := retriever.ReadRetrieverConfig(ctx) + retrieverConfig, err := retriever.NewConfig(ctx) + if err != nil { + return nil, err + } config := &Config{ DisperserClientConfig: &clients.Config{ From 5aace774c1c7e651e69517a58b3bf2cb3e3befb1 Mon Sep 17 00:00:00 2001 From: Ian Shim <100327837+ian-shim@users.noreply.github.com> Date: Wed, 8 Jan 2025 06:10:36 +0900 Subject: [PATCH 5/6] Compress relay auth signature (#1068) --- api/clients/v2/relay_client.go | 6 ++++-- core/attestation.go | 4 ++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/api/clients/v2/relay_client.go b/api/clients/v2/relay_client.go index d6be36064..00ad65639 100644 --- a/api/clients/v2/relay_client.go +++ b/api/clients/v2/relay_client.go @@ -4,9 +4,10 @@ import ( "context" "errors" "fmt" + "sync" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/relay/auth" - "sync" relaygrpc "github.com/Layr-Labs/eigenda/api/grpc/relay" corev2 "github.com/Layr-Labs/eigenda/core/v2" @@ -122,7 +123,8 @@ func (c *relayClient) signGetChunksRequest(ctx context.Context, request *relaygr if err != nil { return fmt.Errorf("failed to sign get chunks request: %v", err) } - request.OperatorSignature = signature.Serialize() + sig := signature.SerializeCompressed() + request.OperatorSignature = sig[:] return nil } diff --git a/core/attestation.go b/core/attestation.go index b7ac39036..a18b9f1d9 100644 --- a/core/attestation.go +++ b/core/attestation.go @@ -47,6 +47,10 @@ func (p *G1Point) VerifyEquivalence(p2 *G2Point) (bool, error) { return bn254utils.CheckG1AndG2DiscreteLogEquality(p.G1Affine, p2.G2Affine) } +func (p *G1Point) SerializeCompressed() [32]byte { + return p.Bytes() +} + func (p *G1Point) Serialize() []byte { res := p.RawBytes() return res[:] From c31aea77990d6080e3acbde8e4f2652f3d7a0deb Mon Sep 17 00:00:00 2001 From: hopeyen <60078528+hopeyen@users.noreply.github.com> Date: Tue, 7 Jan 2025 15:11:33 -0600 Subject: [PATCH 6/6] feat: add basic logging in disperser meterer (#1069) --- core/meterer/meterer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index f9b9dfd3b..55c271211 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -74,6 +74,7 @@ func (m *Meterer) Start(ctx context.Context) { // TODO: return error if there's a rejection (with reasoning) or internal error (should be very rare) func (m *Meterer) MeterRequest(ctx context.Context, header core.PaymentMetadata, numSymbols uint, quorumNumbers []uint8) error { accountID := gethcommon.HexToAddress(header.AccountID) + m.logger.Info("Validating incoming request's payment metadata", "paymentMetadata", header, "numSymbols", numSymbols, "quorumNumbers", quorumNumbers) // Validate against the payment method if header.CumulativePayment.Sign() == 0 { reservation, err := m.ChainPaymentState.GetReservedPaymentByAccount(ctx, accountID) @@ -98,6 +99,7 @@ func (m *Meterer) MeterRequest(ctx context.Context, header core.PaymentMetadata, // ServeReservationRequest handles the rate limiting logic for incoming requests func (m *Meterer) ServeReservationRequest(ctx context.Context, header core.PaymentMetadata, reservation *core.ReservedPayment, numSymbols uint, quorumNumbers []uint8) error { + m.logger.Info("Recording and validating reservation usage", "header", header, "reservation", reservation) if !reservation.IsActive(uint64(time.Now().Unix())) { return fmt.Errorf("reservation not active") } @@ -186,6 +188,7 @@ func GetReservationPeriod(timestamp uint64, binInterval uint32) uint32 { // On-demand requests doesn't have additional quorum settings and should only be // allowed by ETH and EIGEN quorums func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, numSymbols uint, headerQuorums []uint8) error { + m.logger.Info("Recording and validating on-demand usage", "header", header, "onDemandPayment", onDemandPayment) quorumNumbers, err := m.ChainPaymentState.GetOnDemandQuorumNumbers(ctx) if err != nil { return fmt.Errorf("failed to get on-demand quorum numbers: %w", err)
    Method NameRequest TypeResponse TypeDescription
    RetrieveBlobBlobRequestBlobReplyBlobRequestBlobReply

    This fans out request to EigenDA Nodes to retrieve the chunks and returns the reconstructed original blob in response.