Skip to content

Commit

Permalink
Work through JSON/RPC subscription to events
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst committed Jan 18, 2025
1 parent f43860c commit e700e74
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 40 deletions.
1 change: 1 addition & 0 deletions core/go/internal/txmgr/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,6 @@ func (tm *txManager) Start() error {
}

func (tm *txManager) Stop() {
tm.rpcEventStreams.stop()
tm.stopReceiptListeners()
}
57 changes: 38 additions & 19 deletions core/go/internal/txmgr/rpc_eventstreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type receiptListenerSubscription struct {
rrc components.ReceiptReceiverCloser
ctrl rpcserver.RPCAsyncControl
acksNacks chan *rpcAckNack
closed chan struct{}
}

func (es *rpcEventStreams) HandleStart(ctx context.Context, req *rpcclient.RPCRequest, ctrl rpcserver.RPCAsyncControl) (rpcserver.RPCAsyncInstance, *rpcclient.RPCResponse) {
Expand All @@ -87,6 +88,7 @@ func (es *rpcEventStreams) HandleStart(ctx context.Context, req *rpcclient.RPCRe
es: es,
ctrl: ctrl,
acksNacks: make(chan *rpcAckNack, 1),
closed: make(chan struct{}),
}
es.receiptSubs[ctrl.ID()] = sub
var err error
Expand All @@ -102,17 +104,21 @@ func (es *rpcEventStreams) HandleStart(ctx context.Context, req *rpcclient.RPCRe
}
}

func (es *rpcEventStreams) popSubForUnsubscribe(subID string) *receiptListenerSubscription {
func (es *rpcEventStreams) cleanupSubscription(subID string) {
es.subLock.Lock()
defer es.subLock.Unlock()

sub := es.receiptSubs[subID]
if sub != nil {
delete(es.receiptSubs, subID)
return sub
es.cleanupLocked(sub)
}
}

func (es *rpcEventStreams) getSubscription(subID string) *receiptListenerSubscription {
es.subLock.Lock()
defer es.subLock.Unlock()

return nil
return es.receiptSubs[subID]
}

func (es *rpcEventStreams) HandleLifecycle(ctx context.Context, req *rpcclient.RPCRequest) *rpcclient.RPCResponse {
Expand All @@ -121,7 +127,7 @@ func (es *rpcEventStreams) HandleLifecycle(ctx context.Context, req *rpcclient.R
return rpcclient.NewRPCErrorResponse(i18n.NewError(ctx, msgs.MsgTxMgrSubIDRequired), req.ID, rpcclient.RPCCodeInvalidRequest)
}
subID := req.Params[0].AsString()
sub := es.popSubForUnsubscribe(subID)
sub := es.getSubscription(subID)
switch req.Method {
case "ptx_ack", "ptx_nack":
if sub != nil {
Expand All @@ -136,6 +142,7 @@ func (es *rpcEventStreams) HandleLifecycle(ctx context.Context, req *rpcclient.R
if sub != nil {
sub.ctrl.Closed()
}
es.cleanupSubscription(subID)
return &rpcclient.RPCResponse{
JSONRpc: "2.0",
ID: req.ID,
Expand All @@ -149,28 +156,40 @@ func (es *rpcEventStreams) HandleLifecycle(ctx context.Context, req *rpcclient.R

func (sub *receiptListenerSubscription) DeliverReceiptBatch(ctx context.Context, batchID uint64, receipts []*pldapi.TransactionReceiptFull) error {
log.L(ctx).Infof("Delivering receipt batch %d to subscription %s over JSON/RPC", batchID, sub.ctrl.ID())
sub.ctrl.Send("ptx_receiptBatch", receipts)
ackNack, ok := <-sub.acksNacks
if !ok {
sub.ctrl.Send("ptx_receiptBatch", &pldapi.TransactionReceiptBatch{
Subscription: sub.ctrl.ID(),
Batch: batchID,
Receipts: receipts,
})
select {
case ackNack := <-sub.acksNacks:
if !ackNack.ack {
log.L(ctx).Warnf("Batch %d negatively acknowledged by subscription %s over JSON/RPC", batchID, sub.ctrl.ID())
return i18n.NewError(ctx, msgs.MsgTxMgrJSONRPCSubscriptionNack, sub.ctrl.ID())
}
log.L(ctx).Infof("Batch %d acknowledged by subscription %s over JSON/RPC", batchID, sub.ctrl.ID())
return nil
case <-sub.closed:
return i18n.NewError(ctx, msgs.MsgTxMgrJSONRPCSubscriptionClosed, sub.ctrl.ID())
}
if !ackNack.ack {
log.L(ctx).Warnf("Batch %d negatively acknowledged by subscription %s over JSON/RPC", batchID, sub.ctrl.ID())
return i18n.NewError(ctx, msgs.MsgTxMgrJSONRPCSubscriptionNack, sub.ctrl.ID())
}
log.L(ctx).Infof("Batch %d acknowledged by subscription %s over JSON/RPC", batchID, sub.ctrl.ID())
return nil
}

func (sub *receiptListenerSubscription) ConnectionClosed() {
sub.es.cleanupSub(sub)
sub.es.cleanupSubscription(sub.ctrl.ID())
}

func (es *rpcEventStreams) cleanupSub(sub *receiptListenerSubscription) {
func (es *rpcEventStreams) cleanupLocked(sub *receiptListenerSubscription) {
delete(sub.es.receiptSubs, sub.ctrl.ID())
sub.rrc.Close()
close(sub.closed)
}

func (es *rpcEventStreams) stop() {
es.subLock.Lock()
defer es.subLock.Unlock()

close(sub.acksNacks) // cancels any DeliverReceiptBatch() in-flight
sub.rrc.Close()
delete(es.receiptSubs, sub.ctrl.ID())
for _, sub := range es.receiptSubs {
es.cleanupLocked(sub)
}

}
161 changes: 161 additions & 0 deletions core/go/internal/txmgr/rpc_eventstreams_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright © 2024 Kaleido, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package txmgr

import (
"encoding/json"
"fmt"
"sync/atomic"
"testing"

"github.com/google/uuid"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/wsclient"
"github.com/hyperledger/firefly-signer/pkg/rpcbackend"
"github.com/kaleido-io/paladin/config/pkg/pldconf"
"github.com/kaleido-io/paladin/core/internal/components"
"github.com/kaleido-io/paladin/toolkit/pkg/pldapi"
"github.com/kaleido-io/paladin/toolkit/pkg/rpcclient"
"github.com/kaleido-io/paladin/toolkit/pkg/tktypes"
"github.com/stretchr/testify/require"
)

var nextReq atomic.Uint64

func rpcTestRequest(method string, params ...any) (uint64, []byte) {
reqID := nextReq.Add(1)
jsonParams := make([]*fftypes.JSONAny, len(params))
for i, p := range params {
jsonParams[i] = fftypes.JSONAnyPtr(tktypes.JSONString((p)).Pretty())
}
req := &rpcbackend.RPCRequest{
JSONRpc: "2.0",
ID: fftypes.JSONAnyPtr(fmt.Sprintf("%d", reqID)),
Method: method,
Params: jsonParams,
}
return reqID, []byte(tktypes.JSONString((req)).Pretty())
}

func TestRPCEventListenerE2E(t *testing.T) {
ctx, url, txm, done := newTestTransactionManagerWithWebSocketRPC(t)
defer done()

wscConf, err := rpcclient.ParseWSConfig(ctx, &pldconf.WSClientConfig{
HTTPClientConfig: pldconf.HTTPClientConfig{URL: url},
})
require.NoError(t, err)

err = txm.CreateReceiptListener(ctx, &pldapi.TransactionReceiptListener{
Name: "listener1",
})
require.NoError(t, err)

wsc, err := wsclient.New(ctx, wscConf, nil, nil)
require.NoError(t, err)
err = wsc.Connect()
require.NoError(t, err)
defer wsc.Close()

subReqID, req := rpcTestRequest("ptx_subscribe", "receipts", "listener1")
err = wsc.Send(ctx, req)
require.NoError(t, err)

subIDChan := make(chan string)
unSubChan := make(chan bool)
receipts := make(chan *pldapi.TransactionReceiptFull)
var unSubReqID atomic.Uint64
var subID atomic.Pointer[string]

go func() {
for payload := range wsc.Receive() {
var rpcPayload *rpcbackend.RPCResponse
err := json.Unmarshal(payload, &rpcPayload)
require.NoError(t, err)

if rpcPayload.Error != nil {
require.NoError(t, rpcPayload.Error.Error())
}

if !rpcPayload.ID.IsNil() {
var rpcID uint64
err := json.Unmarshal(rpcPayload.ID.Bytes(), &rpcID)
require.NoError(t, err)

switch rpcID {
case subReqID: // Subscribe reply
subIDChan <- rpcPayload.Result.AsString()
case unSubReqID.Load(): // Unsubscribe reply
unSubChan <- true
}
}

if rpcPayload.Method == "ptx_receiptBatch" {
var batchPayload pldapi.TransactionReceiptBatch
err := json.Unmarshal(rpcPayload.Params.Bytes(), &batchPayload)
require.NoError(t, err)

for _, r := range batchPayload.Receipts {
receipts <- r
}

_, req := rpcTestRequest("ptx_ack", *subID.Load())
err = wsc.Send(ctx, req)
require.NoError(t, err)
}

}
}()

txs := make([]*components.ReceiptInput, 6)
for i := 0; i < len(txs); i++ {
txs[i] = &components.ReceiptInput{
ReceiptType: components.RT_Success,
TransactionID: uuid.New(),
OnChain: randOnChain(tktypes.RandAddress()),
}
}

// Send first 3
postCommit, err := txm.FinalizeTransactions(ctx, txm.p.DB(), txs[0:3])
require.NoError(t, err)
postCommit()

subIDStr := <-subIDChan
_, err = uuid.Parse(subIDStr)
require.NoError(t, err)
subID.Store(&subIDStr)

for i := 0; i < 3; i++ {
require.Equal(t, txs[i].TransactionID, (<-receipts).ID)
}

// Send rest
postCommit, err = txm.FinalizeTransactions(ctx, txm.p.DB(), txs[3:])
require.NoError(t, err)
postCommit()

for i := 3; i < len(txs); i++ {
require.Equal(t, txs[i].TransactionID, (<-receipts).ID)
}

reqID, req := rpcTestRequest("ptx_unsubscribe", subIDStr)
unSubReqID.Store(reqID)
err = wsc.Send(ctx, req)
require.NoError(t, err)
<-unSubChan

}
27 changes: 27 additions & 0 deletions core/go/internal/txmgr/rpcmodule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,33 @@ func newTestTransactionManagerWithRPC(t *testing.T, init ...func(*pldconf.TxMana

}

func newTestTransactionManagerWithWebSocketRPC(t *testing.T, init ...func(*pldconf.TxManagerConfig, *mockComponents)) (context.Context, string, *txManager, func()) {
ctx, txm, txmDone := newTestTransactionManager(t, true, init...)

rpcServer, err := rpcserver.NewRPCServer(ctx, &pldconf.RPCServerConfig{
HTTP: pldconf.RPCServerConfigHTTP{Disabled: true},
WS: pldconf.RPCServerConfigWS{
HTTPServerConfig: pldconf.HTTPServerConfig{
Port: confutil.P(0),
ShutdownTimeout: confutil.P("0"),
},
},
})
require.NoError(t, err)

rpcServer.Register(txm.rpcModule)
rpcServer.Register(txm.debugRpcModule)

err = rpcServer.Start()
require.NoError(t, err)

return ctx, fmt.Sprintf("ws://%s", rpcServer.WSAddr()), txm, func() {
txmDone()
rpcServer.Stop()
}

}

func mockResolveKeyOKThenFail(t *testing.T, mc *mockComponents, identifier string, senderAddr *tktypes.EthAddress) {
kr := mockKeyResolverForFail(t, mc)
kr.On("ResolveKey", identifier, algorithms.ECDSA_SECP256K1, verifiers.ETH_ADDRESS).
Expand Down
6 changes: 6 additions & 0 deletions toolkit/go/pkg/pldapi/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ type TransactionReceiptFull struct {
DomainReceiptError string `docstruct:"TransactionReceiptFull" json:"domainReceiptError,omitempty"`
}

type TransactionReceiptBatch struct {
Subscription string `docstruct:"TransactionReceiptBatch" json:"subscription,omitempty"`
Batch uint64 `docstruct:"TransactionReceiptBatch" json:"batch,omitempty"`
Receipts []*TransactionReceiptFull `docstruct:"TransactionReceiptBatch" json:"receipts,omitempty"`
}

type TransactionReceiptDataOnchain struct {
TransactionHash *tktypes.Bytes32 `docstruct:"TransactionReceiptDataOnchain" json:"transactionHash,omitempty"`
BlockNumber int64 `docstruct:"TransactionReceiptDataOnchain" json:"blockNumber,omitempty"`
Expand Down
30 changes: 21 additions & 9 deletions toolkit/go/pkg/rpcserver/rpchandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ import (
"github.com/kaleido-io/paladin/toolkit/pkg/tktypes"
)

func (s *rpcServer) rpcHandler(ctx context.Context, r io.Reader, wsc *webSocketConnection) (interface{}, bool) {
type handlerResult struct {
sendRes bool
isOK bool
res any
}

func (s *rpcServer) rpcHandler(ctx context.Context, r io.Reader, wsc *webSocketConnection) handlerResult {

b, err := io.ReadAll(r)
if err != nil {
Expand All @@ -45,25 +51,31 @@ func (s *rpcServer) rpcHandler(ctx context.Context, r io.Reader, wsc *webSocketC
log.L(ctx).Errorf("Bad RPC array received %s", b)
return s.replyRPCParseError(ctx, b, err)
}
return s.handleRPCBatch(ctx, rpcArray, wsc)
batchRes, isOK := s.handleRPCBatch(ctx, rpcArray, wsc)
return handlerResult{isOK: isOK, sendRes: true, res: batchRes}
}

var rpcRequest rpcclient.RPCRequest
err = json.Unmarshal(b, &rpcRequest)
if err != nil {
return s.replyRPCParseError(ctx, b, err)
}
return s.processRPC(ctx, &rpcRequest, wsc)
res, isOK := s.processRPC(ctx, &rpcRequest, wsc)
return handlerResult{isOK: isOK, sendRes: res != nil, res: res}

}

func (s *rpcServer) replyRPCParseError(ctx context.Context, b []byte, err error) (*rpcclient.RPCResponse, bool) {
func (s *rpcServer) replyRPCParseError(ctx context.Context, b []byte, err error) handlerResult {
log.L(ctx).Errorf("Request could not be parsed (err=%v): %s", err, b)
return rpcclient.NewRPCErrorResponse(
i18n.NewError(ctx, tkmsgs.MsgJSONRPCInvalidRequest),
tktypes.RawJSON(`"1"`),
rpcclient.RPCCodeInvalidRequest,
), false
return handlerResult{
isOK: false,
sendRes: true,
res: rpcclient.NewRPCErrorResponse(
i18n.NewError(ctx, tkmsgs.MsgJSONRPCInvalidRequest),
tktypes.RawJSON(`"1"`),
rpcclient.RPCCodeInvalidRequest,
),
}
}

func (s *rpcServer) sniffFirstByte(data []byte) byte {
Expand Down
Loading

0 comments on commit e700e74

Please sign in to comment.