Skip to content

Commit

Permalink
Merge pull request #491 from LF-Decentralized-Trust-labs/peers
Browse files Browse the repository at this point in the history
Peer Management Enhancements (issue #488)
  • Loading branch information
peterbroadhurst authored Jan 8, 2025
2 parents d94666b + 3dcb438 commit 7d67e28
Show file tree
Hide file tree
Showing 103 changed files with 4,023 additions and 2,972 deletions.
35 changes: 33 additions & 2 deletions config/pkg/pldconf/transportmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,46 @@
*/
package pldconf

import "github.com/kaleido-io/paladin/config/pkg/confutil"

type TransportManagerConfig struct {
NodeName string `json:"nodeName"`
Transports map[string]*TransportConfig `json:"transports"`
NodeName string `json:"nodeName"`
SendQueueLen *int `json:"sendQueueLen"`
PeerInactivityTimeout *string `json:"peerInactivityTimeout"`
PeerReaperInterval *string `json:"peerReaperInterval"`
SendRetry RetryConfigWithMax `json:"sendRetry"`
ReliableScanRetry RetryConfig `json:"reliableScanRetry"`
ReliableMessageResend *string `json:"reliableMessageResend"`
ReliableMessageWriter FlushWriterConfig `json:"reliableMessageWriter"`
Transports map[string]*TransportConfig `json:"transports"`
}

type TransportInitConfig struct {
Retry RetryConfig `json:"retry"`
}

var TransportManagerDefaults = &TransportManagerConfig{
SendQueueLen: confutil.P(10),
ReliableMessageResend: confutil.P("30s"),
PeerInactivityTimeout: confutil.P("1m"),
PeerReaperInterval: confutil.P("30s"),
ReliableScanRetry: GenericRetryDefaults.RetryConfig,
// SendRetry defaults are deliberately short
SendRetry: RetryConfigWithMax{
RetryConfig: RetryConfig{
InitialDelay: confutil.P("50ms"),
MaxDelay: confutil.P("1s"),
Factor: confutil.P(2.0),
},
MaxAttempts: confutil.P(3),
},
ReliableMessageWriter: FlushWriterConfig{
WorkerCount: confutil.P(1),
BatchTimeout: confutil.P("250ms"),
BatchMaxSize: confutil.P(50),
},
}

type TransportConfig struct {
Init TransportInitConfig `json:"init"`
Plugin PluginConfig `json:"plugin"`
Expand Down
14 changes: 1 addition & 13 deletions core/go/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ext {
include "mocks/**/*.go"
}

targetCoverage = 92.5
targetCoverage = 93.5
maxCoverageBarGap = 1
coverageExcludedPackages = [
'github.com/kaleido-io/paladin/core/internal/privatetxnmgr/ptmgrtypes/mock_transaction_flow.go',
Expand Down Expand Up @@ -231,18 +231,6 @@ task makeMocks(type: Mockery, dependsOn: [":installMockery", protoc, goGet]) {
outputDir 'mocks/ethclientmocks'
}
mock {
inputDir 'internal/statedistribution'
includeAll true
outputPackage 'statedistributionmocks'
outputDir 'mocks/statedistributionmocks'
}
mock {
inputDir 'internal/preparedtxdistribution'
includeAll true
outputPackage 'preparedtxdistributionmocks'
outputDir 'mocks/preparedtxdistributionmocks'
}
mock {
inputDir 'internal/privatetxnmgr/ptmgrtypes'
name "TransactionFlow"
inpackage true
Expand Down
11 changes: 8 additions & 3 deletions core/go/componenttest/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,12 +915,17 @@ func TestNotaryDelegatedPrepare(t *testing.T) {

assert.Eventually(t,
func() bool {
preparedTx := pldapi.PreparedTransaction{}
var preparedTx *pldapi.PreparedTransaction

err = client1.CallRPC(ctx, &preparedTx, "ptx_getPreparedTransaction", transferA2BTxId)
// The transaction is prepared with a from-address that is local to node3 - so only
// node3 will be able to send it. So that's where it gets persisted.
err = client3.CallRPC(ctx, &preparedTx, "ptx_getPreparedTransaction", transferA2BTxId)
require.NoError(t, err)
assert.Empty(t, preparedTx.Transaction.Domain)

if preparedTx == nil {
return false
}
assert.Empty(t, preparedTx.Transaction.Domain)
return preparedTx.ID == transferA2BTxId && len(preparedTx.States.Spent) == 1 && len(preparedTx.States.Confirmed) == 2

},
Expand Down
2 changes: 2 additions & 0 deletions core/go/componenttest/utils_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ func testConfig(t *testing.T) pldconf.PaladinConfig {
conf.RPCServer.WS.Disabled = true
conf.Log.Level = confutil.P("info")

conf.TransportManagerConfig.ReliableMessageWriter.BatchMaxSize = confutil.P(1)

conf.Wallets[0].Signer.KeyStore.Static.Keys["seed"] = pldconf.StaticKeyEntryConfig{
Encoding: "hex",
Inline: tktypes.RandHex(32),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
BEGIN;
DROP TABLE dispatches;
DROP TABLE state_distribution_acknowledgments;
DROP TABLE state_distributions;
DROP TABLE IF EXISTS dispatches;
DROP TABLE IF EXISTS state_distribution_acknowledgments;
DROP TABLE IF EXISTS state_distributions;
COMMIT;

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
BEGIN;
DROP TABLE prepared_txn_states;
DROP TABLE prepared_txn_distribution_acknowledgments;
DROP TABLE prepared_txn_distributions;
DROP TABLE prepared_txns;
DROP TABLE IF EXISTS prepared_txn_states;
DROP TABLE IF EXISTS prepared_txn_distribution_acknowledgments;
DROP TABLE IF EXISTS prepared_txn_distributions;
DROP TABLE IF EXISTS prepared_txns;
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;
DROP TABLE reliable_msg_acks;
DROP TABLE reliable_msgs;
COMMIT;

30 changes: 30 additions & 0 deletions core/go/db/migrations/postgres/000014_peer_queued_messages.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
BEGIN;

-- These tables are replaced (data is not migrated from initial state distribution specific implementation)
DROP TABLE state_distribution_acknowledgments;
DROP TABLE state_distributions;
DROP TABLE prepared_txn_distribution_acknowledgments;
DROP TABLE prepared_txn_distributions;

CREATE TABLE reliable_msgs (
"sequence" BIGINT GENERATED ALWAYS AS IDENTITY,
"id" UUID NOT NULL,
"created" BIGINT NOT NULL,
"node" TEXT NOT NULL,
"msg_type" TEXT NOT NULL,
"metadata" TEXT
);

CREATE UNIQUE INDEX reliable_msgs_id ON reliable_msgs ("id");
CREATE INDEX reliable_msgs_node ON reliable_msgs ("node");
CREATE INDEX reliable_msgs_created ON reliable_msgs ("created");

CREATE TABLE reliable_msg_acks (
"id" UUID NOT NULL,
"time" BIGINT NOT NULL,
"error" TEXT,
PRIMARY KEY ("id"),
FOREIGN KEY ("id") REFERENCES reliable_msgs ("id") ON DELETE CASCADE
);

COMMIT;
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DROP TABLE dispatches;
DROP TABLE state_distribution_acknowledgments;
DROP TABLE state_distributions;
DROP TABLE IF EXISTS dispatches;
DROP TABLE IF EXISTS state_distribution_acknowledgments;
DROP TABLE IF EXISTS state_distributions;

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DROP TABLE prepared_txn_states;
DROP TABLE prepared_txn_distribution_acknowledgments;
DROP TABLE prepared_txn_distributions;
DROP TABLE prepared_txns;
DROP TABLE IF EXISTS prepared_txn_states;
DROP TABLE IF EXISTS prepared_txn_distribution_acknowledgments;
DROP TABLE IF EXISTS prepared_txn_distributions;
DROP TABLE IF EXISTS prepared_txns;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP TABLE reliable_msg_acks;
DROP TABLE reliable_msgs;

28 changes: 28 additions & 0 deletions core/go/db/migrations/sqlite/000014_peer_queued_messages.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- These tables are replaced (data is not migrated from initial state distribution specific implementation)
DROP TABLE state_distribution_acknowledgments;
DROP TABLE state_distributions;
DROP TABLE prepared_txn_distribution_acknowledgments;
DROP TABLE prepared_txn_distributions;

CREATE TABLE reliable_msgs (
"sequence" INTEGER PRIMARY KEY AUTOINCREMENT,
"id" UUID NOT NULL,
"created" BIGINT NOT NULL,
"node" TEXT NOT NULL,
"msg_type" TEXT NOT NULL,
"metadata" TEXT
);

CREATE UNIQUE INDEX reliable_msgs_id ON reliable_msgs ("id");
CREATE INDEX reliable_msgs_node ON reliable_msgs ("node");
CREATE INDEX reliable_msgs_created ON reliable_msgs ("created");

CREATE TABLE reliable_msg_acks (
"id" UUID NOT NULL,
"time" BIGINT NOT NULL,
"error" TEXT,
PRIMARY KEY ("id"),
FOREIGN KEY ("id") REFERENCES reliable_msgs ("id") ON DELETE CASCADE
);


36 changes: 19 additions & 17 deletions core/go/internal/components/privatetxmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/google/uuid"
"github.com/hyperledger/firefly-signer/pkg/abi"
"github.com/kaleido-io/paladin/toolkit/pkg/tktypes"
"gorm.io/gorm"
)

Expand Down Expand Up @@ -51,29 +52,28 @@ type PrivateTxStatus struct {
FailureMessage string `json:"failureMessage,omitempty"`
}

// If we had lots of these we would probably want to centralize the assignment of the constants to avoid duplication
// but currently there is only 2 ( the other being IDENTITY_RESOLVER_DESTINATION )
const PRIVATE_TX_MANAGER_DESTINATION = "private-tx-manager"

type StateDistributionSet struct {
LocalNode string
SenderNode string
Remote []*StateDistribution
Local []*StateDistribution
Remote []*StateDistributionWithData
Local []*StateDistributionWithData
}

// A StateDistribution is an intent to send private data for a given state to a remote party
type StateDistribution struct {
ID string
StateID string
IdentityLocator string
Domain string
ContractAddress string
SchemaID string
StateDataJson string
NullifierAlgorithm *string
NullifierVerifierType *string
NullifierPayloadType *string
StateID string `json:"stateId"`
IdentityLocator string `json:"identityLocator"`
Domain string `json:"domain"`
ContractAddress string `json:"contractAddress"`
SchemaID string `json:"schemaId"`
NullifierAlgorithm *string `json:"nullifierAlgorithm"`
NullifierVerifierType *string `json:"nullifierVerifierType"`
NullifierPayloadType *string `json:"nullifierPayloadType"`
}

// A StateDistributionWithData is an intent to send private data for a given state to a remote party
type StateDistributionWithData struct {
StateDistribution
StateData tktypes.RawJSON `json:"stateData"`
}

type PrivateTxManager interface {
Expand All @@ -96,4 +96,6 @@ type PrivateTxManager interface {
PrivateTransactionConfirmed(ctx context.Context, receipt *TxCompletion)

BuildStateDistributions(ctx context.Context, tx *PrivateTransaction) (*StateDistributionSet, error)
BuildNullifier(ctx context.Context, kr KeyResolver, s *StateDistributionWithData) (*NullifierUpsert, error)
BuildNullifiers(ctx context.Context, distributions []*StateDistributionWithData) (nullifiers []*NullifierUpsert, err error)
}
14 changes: 7 additions & 7 deletions core/go/internal/components/statemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ type DomainContext interface {
FindAvailableStates(dbTX *gorm.DB, schemaID tktypes.Bytes32, query *query.QueryJSON) (Schema, []*pldapi.State, error)

// Return a snapshot of all currently known state locks
ExportStateLocks() ([]byte, error)
ExportSnapshot() ([]byte, error)

// ImportStateLocks is used to restore the state of the domain context, by adding a set of locks
ImportStateLocks([]byte) error
// ImportSnapshot is used to restore the state of the domain context, by adding a set of locks
ImportSnapshot([]byte) error

// FindAvailableNullifiers is similar to FindAvailableStates, but for domains that leverage
// nullifiers to record spending.
Expand Down Expand Up @@ -171,10 +171,10 @@ type DomainContext interface {
}

type StateUpsert struct {
ID tktypes.HexBytes
SchemaID tktypes.Bytes32
Data tktypes.RawJSON
CreatedBy *uuid.UUID
ID tktypes.HexBytes `json:"id"`
Schema tktypes.Bytes32 `json:"schema"`
Data tktypes.RawJSON `json:"data"`
CreatedBy *uuid.UUID `json:"createdBy,omitempty"` // not exported
}

type StateUpsertOutsideContext struct {
Expand Down
11 changes: 3 additions & 8 deletions core/go/internal/components/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,9 @@ type TransactionStateRefs struct {
Info []tktypes.HexBytes
}

type PrepareTransactionWithRefs struct {
ID uuid.UUID // ID of the original private transaction
Domain string // domain of the original private transaction
To *tktypes.EthAddress // the private smart contract that was invoked
States TransactionStateRefs // the states associated with the original private transaction
Metadata tktypes.RawJSON // metadta produced from the prepare of the original private transaction, in addition to the prepared transaction
Transaction *pldapi.TransactionInput // the downstream transaction - might be public or private
Sender string // the sender of the original private transaction
type PreparedTransactionWithRefs struct {
*pldapi.PreparedTransactionBase
StateRefs TransactionStateRefs `json:"stateRefs"` // the states associated with the original private transaction
}

type TransactionPreAssembly struct {
Expand Down
Loading

0 comments on commit 7d67e28

Please sign in to comment.