Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/appchain demo #40

Open
wants to merge 11 commits into
base: feature/appchain-demo
Choose a base branch
from
6 changes: 3 additions & 3 deletions common/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func Bech32ToAddress(s string) (Address, error) {
if currentAddressHRP == "" {
log.Warn("the address hrp not set yet", "input", s)
} else if currentAddressHRP != hrpDecode {
log.Warn("the address not compare current net", "want", currentAddressHRP, "input", s)
//log.Warn("the address not compare current net", "want", currentAddressHRP, "input", s)
}
return BytesToAddress(converted), nil
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func (a *Address) UnmarshalText(input []byte) error {
return err
}
if !CheckAddressHRP(hrpDecode) {
log.Warn("address prefix does not match the current network", "want", GetAddressHRP(), "have", string(input))
//log.Warn("address prefix does not match the current network", "want", GetAddressHRP(), "have", string(input))
}
a.SetBytes(converted)
return nil
Expand All @@ -277,7 +277,7 @@ func (a *Address) UnmarshalJSON(input []byte) error {
return &json.UnmarshalTypeError{Value: err.Error(), Type: addressT}
}
if !CheckAddressHRP(hrpDecode) {
log.Warn("hrpDecode not compare the current net", "want", GetAddressHRP(), "have", hrpDecode)
log.Trace("hrpDecode not compare the current net", "want", GetAddressHRP(), "have", hrpDecode)
}
a.SetBytes(v)
return nil
Expand Down
41 changes: 30 additions & 11 deletions rootchain/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,9 @@ func (em *EventManager) Listen() error {
}
client.SetNameSpace("platon")
newHeadChan := make(chan *types.Header)
newHeadSubscribe, err := client.SubscribeNewHead(context.Background(), newHeadChan)
if err != nil {
close(newHeadChan)
log.Error("listening to the block header fails", "error", err)
return err
}
go em.subscribeRootChainNewHead(newHeadChan, client)

defer func() {
newHeadSubscribe.Unsubscribe()
client.Close()
}()

Expand All @@ -108,10 +103,6 @@ func (em *EventManager) Listen() error {
case <-em.exit:
log.Warn("event listener exit")
return nil
case err := <-newHeadSubscribe.Err():
log.Error("subscription failure", "error", err)
// TODO 处理订阅区块头失败的情况
return err
case resultBlock := <-em.bftResultCh:
// Clear irreversible, already processed rootChain events
stopBlockNumber := types.DecodeStakeExtra(resultBlock.Extra())
Expand Down Expand Up @@ -175,6 +166,34 @@ func (em *EventManager) Listen() error {
}
}

func (em *EventManager) subscribeRootChainNewHead(headerChan chan *types.Header, client *ethclient.Client) error {
var newHeadSubscribe appchain.Subscription
var subChan chan *types.Header
defer func() {
newHeadSubscribe.Unsubscribe()
}()
reSubscribe:
subChan = make(chan *types.Header)
newHeadSubscribe, err := client.SubscribeNewHead(context.Background(), subChan)
if err != nil {
log.Error("listening to the block header fails", "error", err)
return err
}
for {
select {
case <-em.exit:
log.Warn("event listener exit")
return nil
case err := <-newHeadSubscribe.Err():
log.Error("subscription failure && resubscribe", "error", err)
goto reSubscribe
case newHead := <-subChan:
headerChan <- newHead
}
}
return nil
}

type BlockNumberListSort []uint64

func (bnl BlockNumberListSort) Len() int {
Expand Down
9 changes: 5 additions & 4 deletions rpc/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
wsWriteBuffer = 1024
wsPingInterval = 60 * time.Second
wsPingWriteTimeout = 5 * time.Second
wsDefaultReadLimit = 32 * 1024 * 1024
)

var wsBufferPool = new(sync.Pool)
Expand All @@ -58,7 +59,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler {
log.Debug("WebSocket upgrade failed", "err", err)
return
}
codec := newWebsocketCodec(conn)
codec := newWebsocketCodec(conn, wsDefaultReadLimit)
s.ServeCodec(codec, 0)
})
}
Expand Down Expand Up @@ -195,7 +196,7 @@ func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, diale
}
return nil, hErr
}
return newWebsocketCodec(conn), nil
return newWebsocketCodec(conn, wsDefaultReadLimit), nil
})
}

Expand Down Expand Up @@ -238,8 +239,8 @@ type websocketCodec struct {
pingReset chan struct{}
}

func newWebsocketCodec(conn *websocket.Conn) ServerCodec {
conn.SetReadLimit(maxRequestContentLength)
func newWebsocketCodec(conn *websocket.Conn, readLimit int64) ServerCodec {
conn.SetReadLimit(readLimit)
wc := &websocketCodec{
jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec),
conn: conn,
Expand Down