Skip to content

Commit

Permalink
refactor confirmation logic to be coordinated
Browse files Browse the repository at this point in the history
Signed-off-by: Chengxuan Xing <[email protected]>
  • Loading branch information
Chengxuan committed Sep 13, 2024
1 parent 2112c00 commit bc04c88
Show file tree
Hide file tree
Showing 27 changed files with 671 additions and 1,015 deletions.
22 changes: 11 additions & 11 deletions core/go/internal/engine/baseledgertx/dispatch_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ func (baseLedgerEngine *baseLedgerTxEngine) dispatchAction(ctx context.Context,
}
}

func (te *orchestrator) dispatchAction(ctx context.Context, mtx *baseTypes.ManagedTX, action APIRequestType, response chan<- APIResponse) {
func (oc *orchestrator) dispatchAction(ctx context.Context, mtx *baseTypes.ManagedTX, action APIRequestType, response chan<- APIResponse) {
switch action {
case ActionSuspend, ActionResume:
te.InFlightTxsMux.Lock()
defer te.InFlightTxsMux.Unlock()
oc.InFlightTxsMux.Lock()
defer oc.InFlightTxsMux.Unlock()
var pending *InFlightTransactionStageController
for _, inflight := range te.InFlightTxs {
for _, inflight := range oc.InFlightTxs {
if inflight.stateManager.GetTxID() == mtx.ID {
pending = inflight
break
Expand All @@ -101,21 +101,21 @@ func (te *orchestrator) dispatchAction(ctx context.Context, mtx *baseTypes.Manag
}
if pending == nil {
// transaction not in flight yet, update the DB directly and tell the engine to not pick up the transaction until we completed
te.transactionIDsInStatusUpdate = append(te.transactionIDsInStatusUpdate, mtx.ID)
oc.transactionIDsInStatusUpdate = append(oc.transactionIDsInStatusUpdate, mtx.ID)
go func() {
defer func() {
te.InFlightTxsMux.Lock()
defer te.InFlightTxsMux.Unlock()
newTransactionIDsInStatusUpdate := make([]string, 0, len(te.transactionIDsInStatusUpdate)-1)
for _, txID := range te.transactionIDsInStatusUpdate {
oc.InFlightTxsMux.Lock()
defer oc.InFlightTxsMux.Unlock()
newTransactionIDsInStatusUpdate := make([]string, 0, len(oc.transactionIDsInStatusUpdate)-1)
for _, txID := range oc.transactionIDsInStatusUpdate {
if txID != mtx.ID {
newTransactionIDsInStatusUpdate = append(newTransactionIDsInStatusUpdate, txID)
}
}
te.transactionIDsInStatusUpdate = newTransactionIDsInStatusUpdate
oc.transactionIDsInStatusUpdate = newTransactionIDsInStatusUpdate
}()
log.L(ctx).Debugf("Setting status to '%s' for transaction %s", newStatus, mtx.ID)
err := te.txStore.UpdateTransaction(ctx, mtx.ID, &baseTypes.BaseTXUpdates{
err := oc.txStore.UpdateTransaction(ctx, mtx.ID, &baseTypes.BaseTXUpdates{
Status: &newStatus,
})
if err != nil {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

type testInFlightTransactionWithMocksAndConf struct {
it *InFlightTransactionStageController
mCL *enginemocks.TransactionConfirmationListener
mBI *componentmocks.BlockIndexer
mEC *componentmocks.EthClient
mEN *enginemocks.ManagedTxEventNotifier
mTS *enginemocks.TransactionStore
Expand All @@ -46,21 +46,21 @@ func NewTestInFlightTransactionWithMocks(t *testing.T) *testInFlightTransactionW
ble, conf := NewTestTransactionEngine(t)
mockBalanceManager, mEC, _ := NewTestBalanceManager(context.Background(), t)
ble.gasPriceClient = NewTestFixedPriceGasPriceClient(t)
mCL := enginemocks.NewTransactionConfirmationListener(t)
mBI := componentmocks.NewBlockIndexer(t)
mTS := enginemocks.NewTransactionStore(t)
mEN := enginemocks.NewManagedTxEventNotifier(t)
mKM := componentmocks.NewKeyManager(t)
ble.Init(ctx, mEC, mKM, mTS, mEN, mCL)
ble.Init(ctx, mEC, mKM, mTS, mEN, mBI)
ble.ctx = ctx
ble.balanceManager = mockBalanceManager
orchestratorConf := conf.SubSection(OrchestratorSection)
te := NewOrchestrator(ble, imtxs.GetFrom(), orchestratorConf)
it := NewInFlightTransactionStageController(ble, te, imtxs.GetTx())
oc := NewOrchestrator(ble, imtxs.GetFrom(), orchestratorConf)
it := NewInFlightTransactionStageController(ble, oc, imtxs.GetTx())
it.timeLineLoggingEnabled = true
it.testOnlyNoActionMode = true
return &testInFlightTransactionWithMocksAndConf{
it: it,
mCL: mCL,
mBI: mBI,
mEC: mEC,
mEN: mEN,
mTS: mTS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestProduceLatestInFlightStageContextConfirming(t *testing.T) {
assert.True(t, tOut.TransactionSubmitted)
rsc = it.stateManager.GetRunningStageContext(ctx)
assert.NotNil(t, rsc.StageOutputsToBePersisted)
assert.Equal(t, testConfirmation, rsc.StageOutputsToBePersisted.Confirmations)
assert.Equal(t, testConfirmation, rsc.StageOutputsToBePersisted.IndexedTransaction)

// persisting error waiting for persistence retry timeout
rsc.StageErrored = false
Expand Down Expand Up @@ -312,7 +312,7 @@ func TestProduceLatestInFlightStageContextSanityChecksForCompletedTransactions(t
ProtocolID: "0000/0001",
}

imtxs.Receipt = testReceipt
imtxs.IndexedTransaction = testReceipt
tOut := it.ProduceLatestInFlightStageContext(ctx, &baseTypes.OrchestratorContext{
AvailableToSpend: nil,
PreviousNonceCostUnknown: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ func TestProduceLatestInFlightStageContextReceiptingCheckExistingHashes(t *testi
// set existing transaction hashes
oldHash := mtx.TransactionHash
imtxs.policyInfo = &baseTypes.EnterprisePolicyInfo{
SubmittedTxHashes: []string{oldHash, "hash1"},
SubmittedHashes: []string{oldHash, "hash1"},
}
called := make(chan bool, 3)
mEC := it.ethClient.(*componentmocks.EthClient)
mEC.On("GetTransactionReceipt", ctx, "hash1").Run(func(args mock.Arguments) {
mEC.On("GetIndexedTransaction", ctx, "hash1").Run(func(args mock.Arguments) {
called <- true
}).Return(nil, nil).Once()
mTS.On("AddSubStatusAction", mock.Anything, mtx.ID, baseTypes.BaseTxSubStatusTracking, baseTypes.BaseTxActionStateTransition, fftypes.JSONAnyPtr(`{"txHash":"hash1"}`), (*fftypes.JSONAny)(nil), mock.Anything).Return(nil).Maybe()
Expand All @@ -155,9 +155,9 @@ func TestProduceLatestInFlightStageContextReceiptingCheckExistingHashes(t *testi
persistenceCalled <- true
}).Return(nil).Maybe()

mCL := testInFlightTransactionStateManagerWithMocks.mCL
addMock := mCL.On("Add", mock.Anything, mtx.ID, "hash1", mock.Anything, mock.Anything)
mCL.On("Remove", mock.Anything, oldHash).Return(nil).Maybe()
mBI := testInFlightTransactionStateManagerWithMocks.mBI
addMock := mBI.On("Add", mock.Anything, mtx.ID, "hash1", mock.Anything, mock.Anything)
mBI.On("Remove", mock.Anything, oldHash).Return(nil).Maybe()
eventHandlerCalled := make(chan bool, 3)
addMock.Run(func(args mock.Arguments) {
addMock.Return(nil)
Expand Down Expand Up @@ -192,11 +192,11 @@ func TestProduceLatestInFlightStageContextReceiptingCheckExistingHashesPersisten
mTS := testInFlightTransactionStateManagerWithMocks.mTS
// set existing transaction hashes
imtxs.policyInfo = &baseTypes.EnterprisePolicyInfo{
SubmittedTxHashes: []string{mtx.TransactionHash, "hash1"},
SubmittedHashes: []string{mtx.TransactionHash, "hash1"},
}
called := make(chan bool, 3)
mEC := testInFlightTransactionStateManagerWithMocks.mEC
mEC.On("GetTransactionReceipt", ctx, "hash1").Run(func(args mock.Arguments) {
mEC.On("GetIndexedTransaction", ctx, "hash1").Run(func(args mock.Arguments) {
called <- true
}).Return(nil, nil).Once()
mTS.On("AddSubStatusAction", mock.Anything, mtx.ID, baseTypes.BaseTxSubStatusTracking, baseTypes.BaseTxActionStateTransition, fftypes.JSONAnyPtr(`{"txHash":"hash1"}`), (*fftypes.JSONAny)(nil), mock.Anything).Return(nil).Maybe()
Expand Down Expand Up @@ -233,11 +233,11 @@ func TestProduceLatestInFlightStageContextReceiptingCheckExistingHashesTrackingF
// set existing transaction hashes
oldHash := mtx.TransactionHash
imtxs.policyInfo = &baseTypes.EnterprisePolicyInfo{
SubmittedTxHashes: []string{oldHash, "hash1"},
SubmittedHashes: []string{oldHash, "hash1"},
}
getReceiptCalled := make(chan bool, 3)
mEC := testInFlightTransactionStateManagerWithMocks.mEC
mEC.On("GetTransactionReceipt", ctx, "hash1").Run(func(args mock.Arguments) {
mEC.On("GetIndexedTransaction", ctx, "hash1").Run(func(args mock.Arguments) {
getReceiptCalled <- true
}).Return(nil, nil).Once()
mTS.On("AddSubStatusAction", mock.Anything, mtx.ID, baseTypes.BaseTxSubStatusTracking, baseTypes.BaseTxActionStateTransition, fftypes.JSONAnyPtr(`{"txHash":"hash1"}`), (*fftypes.JSONAny)(nil), mock.Anything).Return(nil).Maybe()
Expand All @@ -246,9 +246,9 @@ func TestProduceLatestInFlightStageContextReceiptingCheckExistingHashesTrackingF
persistenceCalled <- true
}).Return(nil).Maybe()

mCL := testInFlightTransactionStateManagerWithMocks.mCL
addMock := mCL.On("Add", mock.Anything, mtx.ID, "hash1", mock.Anything, mock.Anything)
mCL.On("Remove", mock.Anything, oldHash).Return(nil).Maybe()
mBI := testInFlightTransactionStateManagerWithMocks.mBI
addMock := mBI.On("Add", mock.Anything, mtx.ID, "hash1", mock.Anything, mock.Anything)
mBI.On("Remove", mock.Anything, oldHash).Return(nil).Maybe()
eventHandlerCalled := make(chan bool, 3)
addMock.Run(func(args mock.Arguments) {
addMock.Return(fmt.Errorf("failed to add"))
Expand Down Expand Up @@ -286,10 +286,10 @@ func TestProduceLatestInFlightStageContextReceiptingExceededTimeout(t *testing.T

assert.Equal(t, baseTypes.InFlightTxStageReceipting, inFlightStageMananger.stage)

mCL := testInFlightTransactionStateManagerWithMocks.mCL
mCL.On("Add", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
mBI := testInFlightTransactionStateManagerWithMocks.mBI
mBI.On("Add", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()

removeMock := mCL.On("Remove", mock.Anything, mock.Anything)
removeMock := mBI.On("Remove", mock.Anything, mock.Anything)
removeMock.Run(func(args mock.Arguments) {
removeMock.Return(nil)
}).Maybe()
Expand Down Expand Up @@ -327,9 +327,9 @@ func TestProduceLatestInFlightStageContextReceiptingExceededTimeoutIgnoreRemoval
assert.True(t, tOut.TransactionSubmitted)

assert.Equal(t, baseTypes.InFlightTxStageReceipting, inFlightStageMananger.stage)
mCL := testInFlightTransactionStateManagerWithMocks.mCL
removeMock := mCL.On("Remove", mock.Anything, mock.Anything)
mCL.On("Add", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
mBI := testInFlightTransactionStateManagerWithMocks.mBI
removeMock := mBI.On("Remove", mock.Anything, mock.Anything)
mBI.On("Add", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()

removeCalled := make(chan struct{})
removeMock.Run(func(args mock.Arguments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestProduceLatestInFlightStageContextSubmitComplete(t *testing.T) {
mTS := testInFlightTransactionStateManagerWithMocks.mTS
mtx.FirstSubmit = nil
imtxs.policyInfo = &baseTypes.EnterprisePolicyInfo{
SubmittedTxHashes: []string{},
SubmittedHashes: []string{},
}

submissionTime := fftypes.Now()
Expand All @@ -93,7 +93,7 @@ func TestProduceLatestInFlightStageContextSubmitComplete(t *testing.T) {
assert.Equal(t, 1, len(rsc.StageOutputsToBePersisted.HistoryUpdates))
mTS.On("AddSubStatusAction", mock.Anything, mtx.ID, baseTypes.BaseTxSubStatusReceived, baseTypes.BaseTxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"txHash":"`+txHash+`"}`), (*fftypes.JSONAny)(nil), mock.Anything).Return(nil).Maybe()
_ = rsc.StageOutputsToBePersisted.HistoryUpdates[0](mTS)
assert.Equal(t, []string{txHash}, rsc.StageOutputsToBePersisted.PolicyInfo.SubmittedTxHashes)
assert.Equal(t, []string{txHash}, rsc.StageOutputsToBePersisted.PolicyInfo.SubmittedHashes)
assert.Equal(t, submissionTime, rsc.StageOutputsToBePersisted.TxUpdates.FirstSubmit)
assert.Equal(t, submissionTime, rsc.StageOutputsToBePersisted.TxUpdates.LastSubmit)

Expand All @@ -113,7 +113,7 @@ func TestProduceLatestInFlightStageContextSubmitComplete(t *testing.T) {
assert.NotNil(t, rsc.StageOutputsToBePersisted)
mTS.On("AddSubStatusAction", mock.Anything, mtx.ID, baseTypes.BaseTxSubStatusReceived, baseTypes.BaseTxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"txHash":"`+txHash+`"}`), (*fftypes.JSONAny)(nil), mock.Anything).Return(nil).Maybe()
_ = rsc.StageOutputsToBePersisted.HistoryUpdates[0](mTS)
assert.Equal(t, []string{txHash}, rsc.StageOutputsToBePersisted.PolicyInfo.SubmittedTxHashes)
assert.Equal(t, []string{txHash}, rsc.StageOutputsToBePersisted.PolicyInfo.SubmittedHashes)
assert.Equal(t, submissionTime, rsc.StageOutputsToBePersisted.TxUpdates.LastSubmit)
assert.Equal(t, txHash, *rsc.StageOutputsToBePersisted.TxUpdates.TransactionHash)
}
Expand All @@ -135,7 +135,7 @@ func TestProduceLatestInFlightStageContextCannotSubmit(t *testing.T) {
mtx.TransactionHash = ""
mtx.FirstSubmit = nil
imtxs.policyInfo = &baseTypes.EnterprisePolicyInfo{
SubmittedTxHashes: []string{},
SubmittedHashes: []string{},
}

tOut := it.ProduceLatestInFlightStageContext(ctx, &baseTypes.OrchestratorContext{
Expand All @@ -150,7 +150,7 @@ func TestProduceLatestInFlightStageContextCannotSubmit(t *testing.T) {
inFlightStageMananger.bufferedStageOutputs = make([]*baseTypes.StageOutput, 0)
mtx.TransactionHash = "test"
imtxs.policyInfo = &baseTypes.EnterprisePolicyInfo{
SubmittedTxHashes: []string{},
SubmittedHashes: []string{},
}
mtx.GasLimit = ethtypes.NewHexInteger64(-1) // invalid limit
it.stateManager.SetValidatedTransactionHashMatchState(ctx, false)
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestProduceLatestInFlightStageContextSubmitCompleteAlreadyKnown(t *testing.
mtx.TransactionHash = ""
mtx.FirstSubmit = nil
imtxs.policyInfo = &baseTypes.EnterprisePolicyInfo{
SubmittedTxHashes: []string{},
SubmittedHashes: []string{},
}

submissionTime := fftypes.Now()
Expand All @@ -190,7 +190,7 @@ func TestProduceLatestInFlightStageContextSubmitCompleteAlreadyKnown(t *testing.
mtx.FirstSubmit = fftypes.Now()
rsc = it.stateManager.GetRunningStageContext(ctx)
imtxs.policyInfo = &baseTypes.EnterprisePolicyInfo{
SubmittedTxHashes: []string{txHash},
SubmittedHashes: []string{txHash},
}
it.stateManager.AddSubmitOutput(ctx, txHash, submissionTime, baseTypes.SubmissionOutcomeAlreadyKnown, ethclient.ErrorReason(""), nil)
tOut := it.ProduceLatestInFlightStageContext(ctx, &baseTypes.OrchestratorContext{
Expand All @@ -202,7 +202,7 @@ func TestProduceLatestInFlightStageContextSubmitCompleteAlreadyKnown(t *testing.
assert.Equal(t, baseTypes.InFlightTxStageSubmitting, rsc.Stage)
assert.NotNil(t, rsc.StageOutputsToBePersisted)
assert.Empty(t, rsc.StageOutputsToBePersisted.HistoryUpdates)
assert.Equal(t, []string{txHash}, rsc.StageOutputsToBePersisted.PolicyInfo.SubmittedTxHashes)
assert.Equal(t, []string{txHash}, rsc.StageOutputsToBePersisted.PolicyInfo.SubmittedHashes)
}
func TestProduceLatestInFlightStageContextSubmitErrors(t *testing.T) {
ctx := context.Background()
Expand All @@ -222,7 +222,7 @@ func TestProduceLatestInFlightStageContextSubmitErrors(t *testing.T) {
mtx.TransactionHash = ""
mtx.FirstSubmit = nil
imtxs.policyInfo = &baseTypes.EnterprisePolicyInfo{
SubmittedTxHashes: []string{},
SubmittedHashes: []string{},
}

submissionTime := fftypes.Now()
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestProduceLatestInFlightStageContextSubmitRePrepare(t *testing.T) {
mtx.TransactionHash = ""
mtx.FirstSubmit = nil
imtxs.policyInfo = &baseTypes.EnterprisePolicyInfo{
SubmittedTxHashes: []string{},
SubmittedHashes: []string{},
}

// persisted stage error - require re-preparation
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestProduceLatestInFlightStageContextSubmitSuccess(t *testing.T) {

imtxs := inFlightStageMananger.InMemoryTxStateManager.(*inMemoryTxState)
imtxs.policyInfo = &baseTypes.EnterprisePolicyInfo{
SubmittedTxHashes: []string{},
SubmittedHashes: []string{},
}

// persisted stage error - require re-preparation
Expand Down
Loading

0 comments on commit bc04c88

Please sign in to comment.