Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peer Management Enhancements (issue #488) #491

Merged
merged 43 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
e64f2ab
Initial framework for peer mgmt code and new queued msg tables
peterbroadhurst Dec 23, 2024
beae08f
Spelling for reliable delivery API
peterbroadhurst Dec 24, 2024
9f34a57
Update transport proto interface for activate/deactive lifecycle
peterbroadhurst Dec 27, 2024
6f15517
Close out on gRPC transport changes for activate/deactivate
peterbroadhurst Dec 27, 2024
44defaf
State distribution message building and sender loop
peterbroadhurst Dec 28, 2024
1725f86
Working through refactor of transport interface
peterbroadhurst Dec 29, 2024
6ea9915
Kick off test reconcile for updated transport manager
peterbroadhurst Dec 29, 2024
1169abc
Work through send loop logic
peterbroadhurst Dec 30, 2024
582b72f
Fix gorm annotations
peterbroadhurst Dec 31, 2024
3342176
Original tests all re-instated
peterbroadhurst Dec 31, 2024
92bd1f1
Reconcile logic for node target validation
peterbroadhurst Dec 31, 2024
a8fa88b
First real DB test of reliable re-delivery
peterbroadhurst Dec 31, 2024
01f1fa7
Work through additional scenarios
peterbroadhurst Jan 1, 2025
dc0ca39
Close out test of send path, ready for receive
peterbroadhurst Jan 2, 2025
7eaa162
Allow active peer without sender connected and add stats
peterbroadhurst Jan 2, 2025
253859c
Initial work on receive message processing with acks
peterbroadhurst Jan 2, 2025
ffb4d12
Peer inactivity reaper
peterbroadhurst Jan 3, 2025
8bdfed8
Work through DB test of state receipt and ack return
peterbroadhurst Jan 3, 2025
d168cbe
Remove quiesce logic now we have simpler reaper based close
peterbroadhurst Jan 3, 2025
a2123a3
Close out on receive/ack logic
peterbroadhurst Jan 3, 2025
6fdfc2d
Add RPC interface for query
peterbroadhurst Jan 3, 2025
2b62d39
Add peer RPC functions to client/docs
peterbroadhurst Jan 3, 2025
5fe2739
Reconcile interfaces for prepared TX
peterbroadhurst Jan 3, 2025
60f76c4
Add support for receipt and prepared TX reliable msgs
peterbroadhurst Jan 4, 2025
55db3c0
Add a ReceivedMessage object for component delivery
peterbroadhurst Jan 4, 2025
4c93d19
Add activate/deactivate node functions
peterbroadhurst Jan 4, 2025
3962e63
Update types in private TX manager and remove redundant logic/interfaces
peterbroadhurst Jan 5, 2025
f36b2f8
Fix migrations
peterbroadhurst Jan 5, 2025
0747909
Allow sender to specify message ID for correlation
peterbroadhurst Jan 5, 2025
3393a64
Update DomainContext export/import to be full snapshot with states
peterbroadhurst Jan 6, 2025
da143aa
Generate nullifiers when writing received states
peterbroadhurst Jan 6, 2025
c1c3d62
Merge branch 'main' of https://github.com/kaleido-io/paladin into peers
peterbroadhurst Jan 6, 2025
a7c5aa9
Fix SQLite migration
peterbroadhurst Jan 6, 2025
456023b
Fix down migrations for later deletion of table
peterbroadhurst Jan 6, 2025
59ae8f9
Up coverage
peterbroadhurst Jan 6, 2025
133e1fd
Fix client for struct change
peterbroadhurst Jan 6, 2025
2c9655b
Add docs for peer JSON/RPC apis
peterbroadhurst Jan 6, 2025
95d72eb
Remove unused fields after activate/deactivate API update
peterbroadhurst Jan 6, 2025
b06c28e
Update naming from review
peterbroadhurst Jan 6, 2025
98619a5
Remove map order indeterminism in test
peterbroadhurst Jan 6, 2025
3f43002
Requesting node may not be the one that prepares a transaction
awrichar Jan 8, 2025
a58fe5d
pente: propagate required signer for prepared transactions
awrichar Jan 8, 2025
3dcb438
Merge branch 'main' into peers
peterbroadhurst Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions config/pkg/pldconf/transportmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,46 @@
*/
package pldconf

import "github.com/kaleido-io/paladin/config/pkg/confutil"

type TransportManagerConfig struct {
NodeName string `json:"nodeName"`
Transports map[string]*TransportConfig `json:"transports"`
NodeName string `json:"nodeName"`
SendQueueLen *int `json:"sendQueueLen"`
PeerInactivityTimeout *string `json:"peerInactivityTimeout"`
PeerReaperInterval *string `json:"peerReaperInterval"`
SendRetry RetryConfigWithMax `json:"sendRetry"`
ReliableScanRetry RetryConfig `json:"reliableScanRetry"`
ReliableMessageResend *string `json:"reliableMessageResend"`
ReliableMessageWriter FlushWriterConfig `json:"reliableMessageWriter"`
Transports map[string]*TransportConfig `json:"transports"`
}

type TransportInitConfig struct {
Retry RetryConfig `json:"retry"`
}

var TransportManagerDefaults = &TransportManagerConfig{
SendQueueLen: confutil.P(10),
ReliableMessageResend: confutil.P("30s"),
PeerInactivityTimeout: confutil.P("1m"),
PeerReaperInterval: confutil.P("30s"),
ReliableScanRetry: GenericRetryDefaults.RetryConfig,
// SendRetry defaults are deliberately short
SendRetry: RetryConfigWithMax{
RetryConfig: RetryConfig{
InitialDelay: confutil.P("50ms"),
MaxDelay: confutil.P("1s"),
Factor: confutil.P(2.0),
},
MaxAttempts: confutil.P(3),
},
ReliableMessageWriter: FlushWriterConfig{
WorkerCount: confutil.P(1),
BatchTimeout: confutil.P("250ms"),
BatchMaxSize: confutil.P(50),
},
}

type TransportConfig struct {
Init TransportInitConfig `json:"init"`
Plugin PluginConfig `json:"plugin"`
Expand Down
14 changes: 1 addition & 13 deletions core/go/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ext {
include "mocks/**/*.go"
}

targetCoverage = 92.5
targetCoverage = 93.5
maxCoverageBarGap = 1
coverageExcludedPackages = [
'github.com/kaleido-io/paladin/core/internal/privatetxnmgr/ptmgrtypes/mock_transaction_flow.go',
Expand Down Expand Up @@ -231,18 +231,6 @@ task makeMocks(type: Mockery, dependsOn: [":installMockery", protoc, goGet]) {
outputDir 'mocks/ethclientmocks'
}
mock {
inputDir 'internal/statedistribution'
includeAll true
outputPackage 'statedistributionmocks'
outputDir 'mocks/statedistributionmocks'
}
mock {
inputDir 'internal/preparedtxdistribution'
includeAll true
outputPackage 'preparedtxdistributionmocks'
outputDir 'mocks/preparedtxdistributionmocks'
}
mock {
inputDir 'internal/privatetxnmgr/ptmgrtypes'
name "TransactionFlow"
inpackage true
Expand Down
11 changes: 8 additions & 3 deletions core/go/componenttest/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,12 +915,17 @@ func TestNotaryDelegatedPrepare(t *testing.T) {

assert.Eventually(t,
func() bool {
preparedTx := pldapi.PreparedTransaction{}
var preparedTx *pldapi.PreparedTransaction

err = client1.CallRPC(ctx, &preparedTx, "ptx_getPreparedTransaction", transferA2BTxId)
// The transaction is prepared with a from-address that is local to node3 - so only
// node3 will be able to send it. So that's where it gets persisted.
err = client3.CallRPC(ctx, &preparedTx, "ptx_getPreparedTransaction", transferA2BTxId)
require.NoError(t, err)
assert.Empty(t, preparedTx.Transaction.Domain)

if preparedTx == nil {
return false
}
assert.Empty(t, preparedTx.Transaction.Domain)
return preparedTx.ID == transferA2BTxId && len(preparedTx.States.Spent) == 1 && len(preparedTx.States.Confirmed) == 2

},
Expand Down
2 changes: 2 additions & 0 deletions core/go/componenttest/utils_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ func testConfig(t *testing.T) pldconf.PaladinConfig {
conf.RPCServer.WS.Disabled = true
conf.Log.Level = confutil.P("info")

conf.TransportManagerConfig.ReliableMessageWriter.BatchMaxSize = confutil.P(1)

conf.Wallets[0].Signer.KeyStore.Static.Keys["seed"] = pldconf.StaticKeyEntryConfig{
Encoding: "hex",
Inline: tktypes.RandHex(32),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
BEGIN;
DROP TABLE dispatches;
DROP TABLE state_distribution_acknowledgments;
DROP TABLE state_distributions;
DROP TABLE IF EXISTS dispatches;
DROP TABLE IF EXISTS state_distribution_acknowledgments;
DROP TABLE IF EXISTS state_distributions;
COMMIT;

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
BEGIN;
DROP TABLE prepared_txn_states;
DROP TABLE prepared_txn_distribution_acknowledgments;
DROP TABLE prepared_txn_distributions;
DROP TABLE prepared_txns;
DROP TABLE IF EXISTS prepared_txn_states;
DROP TABLE IF EXISTS prepared_txn_distribution_acknowledgments;
DROP TABLE IF EXISTS prepared_txn_distributions;
DROP TABLE IF EXISTS prepared_txns;
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;
DROP TABLE reliable_msg_acks;
DROP TABLE reliable_msgs;
COMMIT;

30 changes: 30 additions & 0 deletions core/go/db/migrations/postgres/000014_peer_queued_messages.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
BEGIN;

-- These tables are replaced (data is not migrated from initial state distribution specific implementation)
DROP TABLE state_distribution_acknowledgments;
DROP TABLE state_distributions;
DROP TABLE prepared_txn_distribution_acknowledgments;
DROP TABLE prepared_txn_distributions;

CREATE TABLE reliable_msgs (
"sequence" BIGINT GENERATED ALWAYS AS IDENTITY,
"id" UUID NOT NULL,
"created" BIGINT NOT NULL,
"node" TEXT NOT NULL,
"msg_type" TEXT NOT NULL,
"metadata" TEXT
);

CREATE UNIQUE INDEX reliable_msgs_id ON reliable_msgs ("id");
CREATE INDEX reliable_msgs_node ON reliable_msgs ("node");
CREATE INDEX reliable_msgs_created ON reliable_msgs ("created");

CREATE TABLE reliable_msg_acks (
"id" UUID NOT NULL,
"time" BIGINT NOT NULL,
"error" TEXT,
PRIMARY KEY ("id"),
FOREIGN KEY ("id") REFERENCES reliable_msgs ("id") ON DELETE CASCADE
);

COMMIT;
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DROP TABLE dispatches;
DROP TABLE state_distribution_acknowledgments;
DROP TABLE state_distributions;
DROP TABLE IF EXISTS dispatches;
DROP TABLE IF EXISTS state_distribution_acknowledgments;
DROP TABLE IF EXISTS state_distributions;

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DROP TABLE prepared_txn_states;
DROP TABLE prepared_txn_distribution_acknowledgments;
DROP TABLE prepared_txn_distributions;
DROP TABLE prepared_txns;
DROP TABLE IF EXISTS prepared_txn_states;
DROP TABLE IF EXISTS prepared_txn_distribution_acknowledgments;
DROP TABLE IF EXISTS prepared_txn_distributions;
DROP TABLE IF EXISTS prepared_txns;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP TABLE reliable_msg_acks;
DROP TABLE reliable_msgs;

28 changes: 28 additions & 0 deletions core/go/db/migrations/sqlite/000014_peer_queued_messages.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- These tables are replaced (data is not migrated from initial state distribution specific implementation)
DROP TABLE state_distribution_acknowledgments;
DROP TABLE state_distributions;
DROP TABLE prepared_txn_distribution_acknowledgments;
DROP TABLE prepared_txn_distributions;

CREATE TABLE reliable_msgs (
"sequence" INTEGER PRIMARY KEY AUTOINCREMENT,
"id" UUID NOT NULL,
"created" BIGINT NOT NULL,
"node" TEXT NOT NULL,
"msg_type" TEXT NOT NULL,
"metadata" TEXT
);

CREATE UNIQUE INDEX reliable_msgs_id ON reliable_msgs ("id");
CREATE INDEX reliable_msgs_node ON reliable_msgs ("node");
CREATE INDEX reliable_msgs_created ON reliable_msgs ("created");

CREATE TABLE reliable_msg_acks (
"id" UUID NOT NULL,
"time" BIGINT NOT NULL,
"error" TEXT,
PRIMARY KEY ("id"),
FOREIGN KEY ("id") REFERENCES reliable_msgs ("id") ON DELETE CASCADE
);


36 changes: 19 additions & 17 deletions core/go/internal/components/privatetxmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/google/uuid"
"github.com/hyperledger/firefly-signer/pkg/abi"
"github.com/kaleido-io/paladin/toolkit/pkg/tktypes"
"gorm.io/gorm"
)

Expand Down Expand Up @@ -51,29 +52,28 @@ type PrivateTxStatus struct {
FailureMessage string `json:"failureMessage,omitempty"`
}

// If we had lots of these we would probably want to centralize the assignment of the constants to avoid duplication
// but currently there is only 2 ( the other being IDENTITY_RESOLVER_DESTINATION )
const PRIVATE_TX_MANAGER_DESTINATION = "private-tx-manager"

type StateDistributionSet struct {
LocalNode string
SenderNode string
Remote []*StateDistribution
Local []*StateDistribution
Remote []*StateDistributionWithData
Local []*StateDistributionWithData
}

// A StateDistribution is an intent to send private data for a given state to a remote party
type StateDistribution struct {
ID string
StateID string
IdentityLocator string
Domain string
ContractAddress string
SchemaID string
StateDataJson string
NullifierAlgorithm *string
NullifierVerifierType *string
NullifierPayloadType *string
StateID string `json:"stateId"`
IdentityLocator string `json:"identityLocator"`
Domain string `json:"domain"`
ContractAddress string `json:"contractAddress"`
SchemaID string `json:"schemaId"`
NullifierAlgorithm *string `json:"nullifierAlgorithm"`
NullifierVerifierType *string `json:"nullifierVerifierType"`
NullifierPayloadType *string `json:"nullifierPayloadType"`
}

// A StateDistributionWithData is an intent to send private data for a given state to a remote party
type StateDistributionWithData struct {
StateDistribution
StateData tktypes.RawJSON `json:"stateData"`
}

type PrivateTxManager interface {
Expand All @@ -96,4 +96,6 @@ type PrivateTxManager interface {
PrivateTransactionConfirmed(ctx context.Context, receipt *TxCompletion)

BuildStateDistributions(ctx context.Context, tx *PrivateTransaction) (*StateDistributionSet, error)
BuildNullifier(ctx context.Context, kr KeyResolver, s *StateDistributionWithData) (*NullifierUpsert, error)
BuildNullifiers(ctx context.Context, distributions []*StateDistributionWithData) (nullifiers []*NullifierUpsert, err error)
}
14 changes: 7 additions & 7 deletions core/go/internal/components/statemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ type DomainContext interface {
FindAvailableStates(dbTX *gorm.DB, schemaID tktypes.Bytes32, query *query.QueryJSON) (Schema, []*pldapi.State, error)

// Return a snapshot of all currently known state locks
ExportStateLocks() ([]byte, error)
ExportSnapshot() ([]byte, error)

// ImportStateLocks is used to restore the state of the domain context, by adding a set of locks
ImportStateLocks([]byte) error
// ImportSnapshot is used to restore the state of the domain context, by adding a set of locks
ImportSnapshot([]byte) error

// FindAvailableNullifiers is similar to FindAvailableStates, but for domains that leverage
// nullifiers to record spending.
Expand Down Expand Up @@ -171,10 +171,10 @@ type DomainContext interface {
}

type StateUpsert struct {
ID tktypes.HexBytes
SchemaID tktypes.Bytes32
Data tktypes.RawJSON
CreatedBy *uuid.UUID
ID tktypes.HexBytes `json:"id"`
Schema tktypes.Bytes32 `json:"schema"`
Data tktypes.RawJSON `json:"data"`
CreatedBy *uuid.UUID `json:"createdBy,omitempty"` // not exported
}

type StateUpsertOutsideContext struct {
Expand Down
11 changes: 3 additions & 8 deletions core/go/internal/components/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,9 @@ type TransactionStateRefs struct {
Info []tktypes.HexBytes
}

type PrepareTransactionWithRefs struct {
ID uuid.UUID // ID of the original private transaction
Domain string // domain of the original private transaction
To *tktypes.EthAddress // the private smart contract that was invoked
States TransactionStateRefs // the states associated with the original private transaction
Metadata tktypes.RawJSON // metadta produced from the prepare of the original private transaction, in addition to the prepared transaction
Transaction *pldapi.TransactionInput // the downstream transaction - might be public or private
Sender string // the sender of the original private transaction
type PreparedTransactionWithRefs struct {
*pldapi.PreparedTransactionBase
StateRefs TransactionStateRefs `json:"stateRefs"` // the states associated with the original private transaction
}

type TransactionPreAssembly struct {
Expand Down
Loading
Loading