Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the RPCBlockHeaderSubscriber for indexing finalized results #728

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

m-Peter
Copy link
Collaborator

@m-Peter m-Peter commented Jan 21, 2025

Work Towards: #727

Description


For contributor use:

  • Targeted PR against master branch
  • Linked to Github issue with discussion and accepted design OR link to spec that describes this work.
  • Code follows the standards mentioned here.
  • Updated relevant documentation
  • Re-reviewed Files changed in the Github PR explorer
  • Added appropriate labels

Summary by CodeRabbit

  • New Features

    • Added Makefile targets for starting testnet and mainnet environments.
    • Introduced a new block tracking subscriber for event ingestion.
    • Added a method for subscribing to block headers from a specified height.
  • Dependency Updates

    • Upgraded multiple dependencies, including Cadence, Flow Go SDK, and various Google and OpenTelemetry packages.
    • Updated versions of testing and utility libraries.
  • Performance Improvements

    • Enhanced event subscription and tracking mechanisms.

@m-Peter m-Peter self-assigned this Jan 21, 2025
Copy link
Contributor

coderabbitai bot commented Jan 21, 2025

Walkthrough

This pull request introduces several enhancements to the Flow EVM Gateway, focusing on event ingestion and deployment processes. The changes include updating the Makefile with new targets for testnet and mainnet environments, modifying the event subscription mechanism in the bootstrap process, and updating various dependencies. A new RPCBlockTrackingSubscriber has been implemented to improve block event tracking, and the cross-spork client has been extended with a method to subscribe to block headers from a specific height.

Changes

File Change Summary
Makefile Added .PHONY targets start-testnet and start-mainnet for simplified environment-specific application startup
bootstrap/bootstrap.go Replaced NewRPCEventSubscriber with NewRPCBlockTrackingSubscriber for event ingestion
services/ingestion/block_tracking_subscriber.go New file introducing RPCBlockTrackingSubscriber with comprehensive event tracking functionality
services/requester/cross-spork_client.go Added SubscribeBlockHeadersFromStartHeight method to support block header subscriptions
go.mod and tests/go.mod Updated multiple dependencies, including Flow-related packages and testing libraries

Possibly related PRs

  • Fix flaky E2E eth_streaming_test JS test #289: The changes in the main PR regarding the Makefile and new targets for starting testnet and mainnet environments may relate to the modifications in the eth_streaming_test.js file, which involves adjustments to event subscriptions and handling, potentially impacting how the application is started and tested in different environments.
  • Update flow go v0.36.0 #341: The updates to the go.mod file in the main PR, which include version upgrades for dependencies, could be relevant to the changes in the go.mod file of this PR, as both involve dependency management and ensuring compatibility with the latest versions.
  • General improvements to the metrics #424: The changes in the main PR regarding the Makefile and the introduction of new commands may connect with the general improvements to the metrics in this PR, as both aim to enhance the functionality and usability of the application.

Suggested labels

Improvement, EVM, Bugfix

Suggested reviewers

  • peterargue
  • zhangchiqing
  • sideninja

Poem

🐰 Hopping through code with glee,
Blocks and headers now flow free!
Testnet, mainnet, subscribe with might,
Our gateway's tracking takes flight!
Rabbit's code dance, smooth and bright! 🚀

✨ Finishing Touches
  • 📝 Generate Docstrings (Beta)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@m-Peter m-Peter force-pushed the mpeter/poc-index-finalized-block-results branch from 10a8bdd to 4fd4a5d Compare January 22, 2025 16:41
Copy link
Contributor

@peterargue peterargue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a couple small comments, but otherwise this looks good. let's get it running against one of the live networks.

services/ingestion/block_header_subscriber.go Outdated Show resolved Hide resolved
services/ingestion/block_header_subscriber.go Outdated Show resolved Hide resolved
services/ingestion/block_header_subscriber.go Outdated Show resolved Hide resolved
services/ingestion/block_header_subscriber.go Outdated Show resolved Hide resolved
services/ingestion/block_header_subscriber.go Outdated Show resolved Hide resolved
@m-Peter
Copy link
Collaborator Author

m-Peter commented Jan 23, 2025

@peterargue Thanks for the review 🙌 I've addressed all the comments.
Did you perhaps have the chance to take a look at the behavior described in #727 (comment) ? This is puzzling to me 🤔

@m-Peter m-Peter marked this pull request as ready for review January 23, 2025 15:29
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
services/ingestion/block_tracking_subscriber.go (1)

137-142: Simplify loop by removing redundant context check.

Since you're already handling context cancellation within the select statement using <-ctx.Done(), the loop condition for ctx.Err() == nil is redundant. Simplify the loop by using for {} to improve code readability.

Apply this diff to simplify the loop:

-       for ctx.Err() == nil {
+       for {
            select {
            case <-ctx.Done():
                r.logger.Info().Msg("event ingestion received done signal")
                return
            // ...
services/requester/cross-spork_client.go (1)

262-265: Avoid type assertion to *grpc.Client; use an interface to ensure compatibility.

Direct type assertion to *grpc.Client can be fragile and may break if the underlying implementation changes. Consider defining an interface that includes the SubscribeBlockHeadersFromStartHeight method and updating your clients to implement this interface. This approach enhances flexibility and maintainability.

For example, define an interface:

type BlockHeaderSubscriber interface {
    SubscribeBlockHeadersFromStartHeight(ctx context.Context, startHeight uint64, blockStatus flow.BlockStatus) (<-chan flow.BlockHeader, <-chan error, error)
}

And ensure that your clients implement this interface. Then, modify the code:

-       grpcClient, ok := (client).(*grpc.Client)
-       if !ok {
-           return nil, nil, fmt.Errorf("unable to convert to Flow grpc.Client")
-       }

-       return grpcClient.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, blockStatus)
+       subscriberClient, ok := client.(BlockHeaderSubscriber)
+       if !ok {
+           return nil, nil, fmt.Errorf("client does not implement BlockHeaderSubscriber interface")
+       }

+       return subscriberClient.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, blockStatus)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 56428e2 and 0d9cfb9.

⛔ Files ignored due to path filters (2)
  • go.sum is excluded by !**/*.sum
  • tests/go.sum is excluded by !**/*.sum
📒 Files selected for processing (6)
  • Makefile (1 hunks)
  • bootstrap/bootstrap.go (1 hunks)
  • go.mod (5 hunks)
  • services/ingestion/block_tracking_subscriber.go (1 hunks)
  • services/requester/cross-spork_client.go (2 hunks)
  • tests/go.mod (5 hunks)
🧰 Additional context used
🪛 Gitleaks (8.21.2)
Makefile

243-243: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.

(generic-api-key)


260-260: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.

(generic-api-key)

🔇 Additional comments (3)
bootstrap/bootstrap.go (1)

145-145: Change to NewRPCBlockTrackingSubscriber looks good.

Replacing NewRPCEventSubscriber with NewRPCBlockTrackingSubscriber appropriately integrates the new block tracking functionality into the event ingestion process.

go.mod (1)

9-12: Verify compatibility with updated dependencies.

The dependency updates look reasonable, but please verify:

  1. Compatibility between flow-go-sdk v1.3.0 and the new block header subscription implementation.
  2. Impact of OpenTelemetry instrumentation updates on existing metrics.

Run this script to check for any breaking changes in the updated dependencies:

Also applies to: 185-186, 208-210

✅ Verification successful

Dependencies are compatible with existing implementation

The verification shows:

  • Block header subscription implementation is compatible with flow-go-sdk v1.3.0
  • No active OpenTelemetry instrumentation in the codebase, so the updates have no impact
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for breaking changes in updated dependencies

# Check release notes and changelogs
echo "Checking flow-go-sdk changelog..."
curl -s https://api.github.com/repos/onflow/flow-go-sdk/releases/tags/v1.3.0 | jq -r '.body'

echo "Checking OpenTelemetry changelog..."
curl -s https://api.github.com/repos/open-telemetry/opentelemetry-go-contrib/releases/tags/v0.49.0 | jq -r '.body'

Length of output: 1220


Script:

#!/bin/bash
# Check block header subscription and OpenTelemetry usage

# Find block header subscription usage
echo "Checking block header subscription usage..."
rg -A 5 "SubscribeBlocks|SubscribeBlockHeaders" --type go

# Check OpenTelemetry imports and usage
echo -e "\nChecking OpenTelemetry imports and instrumentation..."
rg "go.opentelemetry.io/otel" go.mod
rg -A 3 "otel\." --type go

# Look for metric initialization
echo -e "\nChecking metric initialization patterns..."
rg -A 3 "NewMeterProvider|NewMeter" --type go

Length of output: 2663

tests/go.mod (1)

7-15: LGTM! Dependency versions are consistent.

The dependency updates in tests/go.mod correctly mirror the updates in the main go.mod file, maintaining consistency between main and test code.

Comment on lines +219 to +221
if strings.Contains(errors.Unwrap(err).Error(), "DeadlineExceeded") ||
strings.Contains(errors.Unwrap(err).Error(), "unexpected EOF") {
blockHeadersChan, errChan, err = r.client.SubscribeBlockHeadersFromStartHeight(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use error types instead of string matching for error handling.

Using strings.Contains to check error messages is not robust and can lead to unexpected behavior if the error messages change. Instead, use errors.Is or compare against specific error variables like context.DeadlineExceeded and io.EOF to handle errors reliably.

Apply this diff to improve error handling:

-       if strings.Contains(errors.Unwrap(err).Error(), "DeadlineExceeded") ||
-           strings.Contains(errors.Unwrap(err).Error(), "unexpected EOF") {
+       unwrappedErr := errors.Unwrap(err)
+       if errors.Is(unwrappedErr, context.DeadlineExceeded) ||
+           errors.Is(unwrappedErr, io.EOF) {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if strings.Contains(errors.Unwrap(err).Error(), "DeadlineExceeded") ||
strings.Contains(errors.Unwrap(err).Error(), "unexpected EOF") {
blockHeadersChan, errChan, err = r.client.SubscribeBlockHeadersFromStartHeight(
unwrappedErr := errors.Unwrap(err)
if errors.Is(unwrappedErr, context.DeadlineExceeded) ||
errors.Is(unwrappedErr, io.EOF) {
blockHeadersChan, errChan, err = r.client.SubscribeBlockHeadersFromStartHeight(

Comment on lines +230 to +246
.PHONY: start-testnet
start-testnet:
rm -rf testnet-db-block-headers/
rm -rf metrics/data/
go run cmd/main.go run \
--database-dir=testnet-db-block-headers \
--access-node-grpc-host=access.devnet.nodes.onflow.org:9000 \
--access-node-spork-hosts=access-001.devnet51.nodes.onflow.org:9000 \
--flow-network-id=flow-testnet \
--init-cadence-height=211176670 \
--ws-enabled=true \
--coinbase=FACF71692421039876a5BB4F10EF7A439D8ef61E \
--coa-address=0x62631c28c9fc5a91 \
--coa-key=2892fba444f1d5787739708874e3b01160671924610411ac787ac1379d420f49 \
--gas-price=100 \
--log-level=info

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Secure sensitive values and improve code reusability.

  1. Security: Move sensitive values (API keys, addresses) to environment variables instead of hardcoding them.
  2. Code structure: Extract common cleanup commands into a reusable target.

Apply this diff to improve security and reusability:

+.PHONY: clean-metrics
+clean-metrics:
+	rm -rf metrics/data/
+
 .PHONY: start-testnet
 start-testnet:
 	rm -rf testnet-db-block-headers/
-	rm -rf metrics/data/
+	$(MAKE) clean-metrics
 	go run cmd/main.go run \
 		--database-dir=testnet-db-block-headers \
 		--access-node-grpc-host=access.devnet.nodes.onflow.org:9000 \
 		--access-node-spork-hosts=access-001.devnet51.nodes.onflow.org:9000 \
 		--flow-network-id=flow-testnet \
 		--init-cadence-height=211176670 \
 		--ws-enabled=true \
-		--coinbase=FACF71692421039876a5BB4F10EF7A439D8ef61E \
-		--coa-address=0x62631c28c9fc5a91 \
-		--coa-key=2892fba444f1d5787739708874e3b01160671924610411ac787ac1379d420f49 \
+		--coinbase=$(TESTNET_COINBASE) \
+		--coa-address=$(TESTNET_COA_ADDRESS) \
+		--coa-key=$(TESTNET_COA_KEY) \
 		--gas-price=100 \
 		--log-level=info

 .PHONY: start-mainnet
 start-mainnet:
 	rm -rf mainnet-db-block-headers/
-	rm -rf metrics/data/
+	$(MAKE) clean-metrics
 	go run cmd/main.go run \
 		--database-dir=mainnet-db-block-headers \
 		--access-node-grpc-host=access.mainnet.nodes.onflow.org:9000 \
 		--access-node-spork-hosts=access-001.mainnet25.nodes.onflow.org:9000 \
 		--flow-network-id=flow-mainnet \
 		--init-cadence-height=85981135 \
 		--ws-enabled=true \
-		--coinbase=FACF71692421039876a5BB4F10EF7A439D8ef61E \
-		--coa-address=0xf1ab99c82dee3526 \
-		--coa-key=2892fba444f1d5787739708874e3b01160671924610411ac787ac1379d420f49 \
+		--coinbase=$(MAINNET_COINBASE) \
+		--coa-address=$(MAINNET_COA_ADDRESS) \
+		--coa-key=$(MAINNET_COA_KEY) \
 		--gas-price=100 \
 		--log-level=info

Add these variables at the top of the Makefile:

# Network-specific configuration
TESTNET_COINBASE ?= FACF71692421039876a5BB4F10EF7A439D8ef61E
TESTNET_COA_ADDRESS ?= 0x62631c28c9fc5a91
TESTNET_COA_KEY ?= # Set via environment variable

MAINNET_COINBASE ?= FACF71692421039876a5BB4F10EF7A439D8ef61E
MAINNET_COA_ADDRESS ?= 0xf1ab99c82dee3526
MAINNET_COA_KEY ?= # Set via environment variable

Also applies to: 247-262

🧰 Tools
🪛 Gitleaks (8.21.2)

243-243: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.

(generic-api-key)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0d9cfb9 and 314e7cc.

📒 Files selected for processing (1)
  • services/ingestion/block_tracking_subscriber.go (1 hunks)
🔇 Additional comments (5)
services/ingestion/block_tracking_subscriber.go (5)

1-2: File structure and package declaration look good.


31-55: Constructor’s logging strategy is consistent.
Using logger fields for “component” is a clean approach for log filtering and scoping. Looks good!


66-71: Graceful closure of eventsChan is well-handled.
The deferred close(eventsChan) ensures the channel is properly cleaned up when the goroutine exits. Nicely done.


191-193: Use error types instead of string matching for error handling.

- if strings.Contains(errors.Unwrap(err).Error(), "DeadlineExceeded") ||
-     strings.Contains(errors.Unwrap(err).Error(), "unexpected EOF") {
+ unwrappedErr := errors.Unwrap(err)
+ if errors.Is(unwrappedErr, context.DeadlineExceeded) ||
+     errors.Is(unwrappedErr, io.EOF) {

72-93: Verify single-invocation usage of Subscribe.
This logic backfills events from previous sporks and updates r.height. Ensure that no other process or goroutine calls Subscribe concurrently, or consider adding concurrency protection to prevent data races.

✅ Verification successful

Concurrent usage of Subscribe is properly controlled
The codebase shows that Subscribe is used correctly with a single-subscription pattern, primarily called by the engine's main loop. No concurrent invocations were found, and the existing buffered channel (capacity 1000) safely handles event processing.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Searching for other call sites of Subscribe to ensure single usage
rg -A 3 'Subscribe('

Length of output: 93


Script:

#!/bin/bash
# Find Subscribe method declaration
ast-grep --pattern 'func $_($_ *BlockTrackingSubscriber) Subscribe($$$) {
  $$$
}'

# Find Subscribe method calls with proper escaping
rg -p 'Subscribe\(' -A 3

# Find goroutine usages with BlockTrackingSubscriber
rg -p 'go .*BlockTrackingSubscriber'

Length of output: 2772

Comment on lines +21 to +29
type RPCBlockTrackingSubscriber struct {
*RPCEventSubscriber

logger zerolog.Logger
client *requester.CrossSporkClient
chain flowGo.ChainID
keyLock requester.KeyLock
height uint64
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Potential concurrency issue with height.
While height is updated within this struct (e.g., lines 86, 92), consider guarding it with a mutex or using atomic operations if multiple goroutines might invoke or access this subscriber concurrently. This will help prevent data races and undefined behavior.

Comment on lines +236 to +242
if len(evts) != 1 && len(evts[0].Events) != 1 {
return flow.BlockEvents{}, fmt.Errorf(
"received unexpected number of EVM events for height: %d, got: %d, expected: 1",
height,
len(evts),
)
}
Copy link
Contributor

@coderabbitai coderabbitai bot Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent potential out-of-bounds errors when checking for events.
This condition uses logical AND (“&&”) to check both array length and the length of the first item in evts. If evts is empty, evts[0] will be invalid and cause a panic. Consider splitting these checks or using safer indexing to avoid out-of-range access.

-if len(evts) != 1 && len(evts[0].Events) != 1 {
+if len(evts) != 1 || len(evts[0].Events) != 1 {
     return flow.BlockEvents{}, fmt.Errorf(
         "received unexpected number of EVM events for height: %d, got: %d, expected: 1",
         height,
         len(evts),
     )
 }

Also applies to: 265-271

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👆 good catch rabbit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peterargue Thank you for confirming! I'm glad I could help identify this potential issue.

(\ /)
(。◕‿◕。)
c(")(")


If you found this review helpful, would you consider giving us a shout-out on X?
Thank you for using CodeRabbit!

Comment on lines +110 to +131
// subscribe to events by the provided height and handle any errors.
//
// Subscribing to EVM specific events and handle any disconnection errors
// as well as context cancellations.
func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)

blockHeadersChan, errChan, err := r.client.SubscribeBlockHeadersFromStartHeight(
ctx,
height,
flow.BlockStatusFinalized,
)
if err != nil {
eventsChan <- models.NewBlockEventsError(
fmt.Errorf(
"failed to subscribe for finalized block headers on height: %d, with: %w",
height,
err,
),
)
return eventsChan
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure proper channel closure on subscription failure.
When SubscribeBlockHeadersFromStartHeight returns an error on lines 122-130, you return eventsChan without closing it. This can leave consumers of eventsChan blocked. Consider closing the channel before returning.

 if err != nil {
     eventsChan <- models.NewBlockEventsError(
         fmt.Errorf(
             "failed to subscribe for finalized block headers on height: %d, with: %w",
             height,
             err,
         ),
     )
+    close(eventsChan)
     return eventsChan
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// subscribe to events by the provided height and handle any errors.
//
// Subscribing to EVM specific events and handle any disconnection errors
// as well as context cancellations.
func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)
blockHeadersChan, errChan, err := r.client.SubscribeBlockHeadersFromStartHeight(
ctx,
height,
flow.BlockStatusFinalized,
)
if err != nil {
eventsChan <- models.NewBlockEventsError(
fmt.Errorf(
"failed to subscribe for finalized block headers on height: %d, with: %w",
height,
err,
),
)
return eventsChan
}
// subscribe to events by the provided height and handle any errors.
//
// Subscribing to EVM specific events and handle any disconnection errors
// as well as context cancellations.
func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)
blockHeadersChan, errChan, err := r.client.SubscribeBlockHeadersFromStartHeight(
ctx,
height,
flow.BlockStatusFinalized,
)
if err != nil {
eventsChan <- models.NewBlockEventsError(
fmt.Errorf(
"failed to subscribe for finalized block headers on height: %d, with: %w",
height,
err,
),
)
close(eventsChan)
return eventsChan
}

Comment on lines +24 to +28
logger zerolog.Logger
client *requester.CrossSporkClient
chain flowGo.ChainID
keyLock requester.KeyLock
height uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like all of these are also available from the event subscriber, so you don't need to explicitly add them here

type RPCEventSubscriber struct {
logger zerolog.Logger
client *requester.CrossSporkClient
chain flowGo.ChainID
keyLock requester.KeyLock
height uint64
recovery bool
recoveredEvents []flow.Event
}

Comment on lines +38 to +54
eventSubscriber := NewRPCEventSubscriber(
logger,
client,
chainID,
keyLock,
startHeight,
)
logger = logger.With().Str("component", "subscriber").Logger()

return &RPCBlockTrackingSubscriber{
RPCEventSubscriber: eventSubscriber,
logger: logger,
client: client,
chain: chainID,
keyLock: keyLock,
height: startHeight,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider shortening

Suggested change
eventSubscriber := NewRPCEventSubscriber(
logger,
client,
chainID,
keyLock,
startHeight,
)
logger = logger.With().Str("component", "subscriber").Logger()
return &RPCBlockTrackingSubscriber{
RPCEventSubscriber: eventSubscriber,
logger: logger,
client: client,
chain: chainID,
keyLock: keyLock,
height: startHeight,
}
return &RPCBlockTrackingSubscriber{
RPCEventSubscriber: NewRPCEventSubscriber(
logger.With().Str("component", "subscriber").Logger(),
client,
chainID,
keyLock,
startHeight,
)
}


var _ EventSubscriber = &RPCBlockTrackingSubscriber{}

type RPCBlockTrackingSubscriber struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a big warning about the limitations

Suggested change
type RPCBlockTrackingSubscriber struct {
// RPCBlockTrackingSubscriber subscribes to new EVM block events for unsealed finalized blocks.
// This is accomplished by following finalized blocks from the upstream Access node, and using the
// polling endpoint to fetch the events for each finalized block.
//
// IMPORTANT: Since data is downloaded and processed from unsealed blocks, it's possible for the
// data that was downloaded to be incorrect. This subscriber provides no handling or detection for
// cases where the received data differs from the data that was ultimately sealed. The operator must
// handle this manually.
// Since it's not reasonable to expect operators to do this manual tracking, this features should NOT
// be used outside of a limited Proof of Concept. Use at own risk.
//
// A future version of the RPCEventSubscriber will provide this detection and handling functionality
// at which point this subscriber will be removed.
type RPCBlockTrackingSubscriber struct {

Comment on lines +236 to +242
if len(evts) != 1 && len(evts[0].Events) != 1 {
return flow.BlockEvents{}, fmt.Errorf(
"received unexpected number of EVM events for height: %d, got: %d, expected: 1",
height,
len(evts),
)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👆 good catch rabbit

return flow.BlockEvents{}, err
}

if len(evts) != 1 && len(evts[0].Events) != 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if len(evts) != 1 && len(evts[0].Events) != 1 {
if len(evts) != 1 || len(evts[0].Events) != 1 {

blockEvents.Events = append(blockEvents.Events, txEvents.Events...)

return blockEvents, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider refactoring this a bit to make it clearer

func (r *RPCBlockTrackingSubscriber) evmEventsForBlock(
	ctx context.Context,
	blockID flow.Identifier,
) (flow.BlockEvents, error) {
	eventTypes := blocksFilter(r.chain).EventTypes

	// evm Block events
	blockEvents, err := r.getEventsByType(ctx, blockID, eventTypes[0])
	if err != nil {
		return flow.BlockEvents{}, err
	}

	payload, err := events.DecodeBlockEventPayload(blockEvents.Events[0].Value)
	if err != nil {
		return flow.BlockEvents{}, err
	}

	if payload.TransactionHashRoot == types.EmptyTxsHash {
		return blockEvents, nil
	}

	// evm TX events
	txEvents, err := r.getEventsByType(ctx, blockID, eventTypes[1])
	if err != nil {
		return flow.BlockEvents{}, err
	}

	// combine block and tx events to be processed together
	blockEvents.Events = append(blockEvents.Events, txEvents.Events...)

	return blockEvents, nil
}

func (r *RPCBlockTrackingSubscriber) getEventsByType(
	ctx context.Context,
	blockID flow.Identifier,
	eventType string,
) (flow.BlockEvents, error) {
	evts, err := r.client.GetEventsForBlockIDs(
		ctx,
		eventType,
		[]flow.Identifier{blockID},
	)
	if err != nil {
		return flow.BlockEvents{}, err
	}

	if len(evts) != 1 {
		// this shouldn't happen and probably indicates a bug on the Access node.
		return flow.BlockEvents{}, fmt.Errorf(
			"received unexpected number of block events: got: %d, expected: 1",
			len(evts),
		)
	}

	if len(evts[0].Events) != 1 {
		return flow.BlockEvents{}, fmt.Errorf(
			"received unexpected number of events in block: got: %d, expected: 1",
			len(evts[0].Events),
		)
	}

	return evts[0], nil
}

eventTypes := blocksFilter(r.chain).EventTypes
evmBlockEvent := eventTypes[0]

evts, err := r.client.GetEventsForHeightRange(
Copy link
Contributor

@peterargue peterargue Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an important detail from the issue that I missed in the last review is that this needs to use GetEventsForBlockIDs instead of the height range version. The range endpoint will restrict the request to sealed blocks, which defeats the purpose of this change. GetEventsForBlockIDs will search any blocks you provide allowing us to query finalized blocks as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants