diff --git a/common/address.go b/common/address.go index b457efab14..faa1ef53e9 100644 --- a/common/address.go +++ b/common/address.go @@ -99,7 +99,7 @@ func Bech32ToAddress(s string) (Address, error) { if currentAddressHRP == "" { log.Warn("the address hrp not set yet", "input", s) } else if currentAddressHRP != hrpDecode { - log.Warn("the address not compare current net", "want", currentAddressHRP, "input", s) + //log.Warn("the address not compare current net", "want", currentAddressHRP, "input", s) } return BytesToAddress(converted), nil } @@ -258,7 +258,7 @@ func (a *Address) UnmarshalText(input []byte) error { return err } if !CheckAddressHRP(hrpDecode) { - log.Warn("address prefix does not match the current network", "want", GetAddressHRP(), "have", string(input)) + //log.Warn("address prefix does not match the current network", "want", GetAddressHRP(), "have", string(input)) } a.SetBytes(converted) return nil @@ -277,7 +277,7 @@ func (a *Address) UnmarshalJSON(input []byte) error { return &json.UnmarshalTypeError{Value: err.Error(), Type: addressT} } if !CheckAddressHRP(hrpDecode) { - log.Warn("hrpDecode not compare the current net", "want", GetAddressHRP(), "have", hrpDecode) + log.Trace("hrpDecode not compare the current net", "want", GetAddressHRP(), "have", hrpDecode) } a.SetBytes(v) return nil diff --git a/rootchain/handler.go b/rootchain/handler.go index 8f1676a3dd..1db0aede82 100644 --- a/rootchain/handler.go +++ b/rootchain/handler.go @@ -90,14 +90,9 @@ func (em *EventManager) Listen() error { } client.SetNameSpace("platon") newHeadChan := make(chan *types.Header) - newHeadSubscribe, err := client.SubscribeNewHead(context.Background(), newHeadChan) - if err != nil { - close(newHeadChan) - log.Error("listening to the block header fails", "error", err) - return err - } + go em.subscribeRootChainNewHead(newHeadChan, client) + defer func() { - newHeadSubscribe.Unsubscribe() client.Close() }() @@ -108,10 +103,6 @@ func (em *EventManager) Listen() error { case <-em.exit: log.Warn("event listener exit") return nil - case err := <-newHeadSubscribe.Err(): - log.Error("subscription failure", "error", err) - // TODO 处理订阅区块头失败的情况 - return err case resultBlock := <-em.bftResultCh: // Clear irreversible, already processed rootChain events stopBlockNumber := types.DecodeStakeExtra(resultBlock.Extra()) @@ -175,6 +166,34 @@ func (em *EventManager) Listen() error { } } +func (em *EventManager) subscribeRootChainNewHead(headerChan chan *types.Header, client *ethclient.Client) error { + var newHeadSubscribe appchain.Subscription + var subChan chan *types.Header + defer func() { + newHeadSubscribe.Unsubscribe() + }() +reSubscribe: + subChan = make(chan *types.Header) + newHeadSubscribe, err := client.SubscribeNewHead(context.Background(), subChan) + if err != nil { + log.Error("listening to the block header fails", "error", err) + return err + } + for { + select { + case <-em.exit: + log.Warn("event listener exit") + return nil + case err := <-newHeadSubscribe.Err(): + log.Error("subscription failure && resubscribe", "error", err) + goto reSubscribe + case newHead := <-subChan: + headerChan <- newHead + } + } + return nil +} + type BlockNumberListSort []uint64 func (bnl BlockNumberListSort) Len() int { diff --git a/rpc/websocket.go b/rpc/websocket.go index e06ee751a1..0981d42a05 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -37,6 +37,7 @@ const ( wsWriteBuffer = 1024 wsPingInterval = 60 * time.Second wsPingWriteTimeout = 5 * time.Second + wsDefaultReadLimit = 32 * 1024 * 1024 ) var wsBufferPool = new(sync.Pool) @@ -58,7 +59,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { log.Debug("WebSocket upgrade failed", "err", err) return } - codec := newWebsocketCodec(conn) + codec := newWebsocketCodec(conn, wsDefaultReadLimit) s.ServeCodec(codec, 0) }) } @@ -195,7 +196,7 @@ func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, diale } return nil, hErr } - return newWebsocketCodec(conn), nil + return newWebsocketCodec(conn, wsDefaultReadLimit), nil }) } @@ -238,8 +239,8 @@ type websocketCodec struct { pingReset chan struct{} } -func newWebsocketCodec(conn *websocket.Conn) ServerCodec { - conn.SetReadLimit(maxRequestContentLength) +func newWebsocketCodec(conn *websocket.Conn, readLimit int64) ServerCodec { + conn.SetReadLimit(readLimit) wc := &websocketCodec{ jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec), conn: conn,