forked from meshplus/go-lightp2p
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandle.go
116 lines (91 loc) · 2.62 KB
/
handle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package network
import (
"context"
"fmt"
"io"
"time"
ggio "github.com/gogo/protobuf/io"
"github.com/libp2p/go-libp2p-core/network"
"github.com/pkg/errors"
network_pb "github.com/meshplus/go-lightp2p/pb"
)
// handleMessage reads one delimited message from reader and passes its Data
// payload, together with s, to p2p.messageHandler when a handler is set.
//
// A read that ends with io.EOF is treated as a normal end of input and yields
// nil. Any other read error causes s.reset() to be called (a failing reset is
// only logged) and is returned wrapped with "failed on read msg".
func (p2p *P2P) handleMessage(s *stream, reader ggio.ReadCloser) error {
	incoming := new(network_pb.Message)

	switch readErr := reader.ReadMsg(incoming); {
	case readErr == nil:
		// Message decoded; dispatch it below.
	case readErr == io.EOF:
		return nil
	default:
		if resetErr := s.reset(); resetErr != nil {
			p2p.logger.WithField("error", resetErr).Error("Reset stream")
		}
		// The read error, not the reset error, is what the caller gets back.
		return errors.Wrap(readErr, "failed on read msg")
	}

	if handler := p2p.messageHandler; handler != nil {
		handler(s, incoming.Data)
	}
	return nil
}
// handleNewStreamReusable serves a newly connected inbound stream on the
// reusable protocol. It keeps reading delimited messages from s and passes
// each payload to p2p.messageHandler (when set) until a read fails.
//
// On io.EOF the loop simply ends. On any other read error the wrapped stream
// is reset first; a failing reset is logged. The read error itself is not
// reported to anyone.
func (p2p *P2P) handleNewStreamReusable(s network.Stream) {
	// The zero time.Time is passed as the read deadline.
	// NOTE(review): by net.Conn convention this clears the deadline - confirm
	// for the stream implementation in use.
	if deadlineErr := s.SetReadDeadline(time.Time{}); deadlineErr != nil {
		p2p.logger.WithField("error", deadlineErr).Error("Set stream read deadline")
		return
	}

	reader := ggio.NewDelimitedReader(s, network.MessageSizeMax)
	for {
		// A fresh wrapper around s is built for every message.
		wrapped := newStream(s, p2p.config.protocolIDs[reusableProtocolIndex], DirInbound)
		incoming := new(network_pb.Message)

		readErr := reader.ReadMsg(incoming)
		if readErr == io.EOF {
			return
		}
		if readErr != nil {
			if resetErr := wrapped.reset(); resetErr != nil {
				p2p.logger.WithField("error", resetErr).Error("Reset stream")
			}
			return
		}

		if p2p.messageHandler != nil {
			p2p.messageHandler(wrapped, incoming.Data)
		}
	}
}
// handleNewStream serves a newly connected inbound stream on the non-reusable
// protocol: it reads exactly one delimited message from s via handleMessage.
//
// Nothing is returned to the caller, so failures (setting the read deadline,
// or reading the message) are reported through p2p.logger.
func (p2p *P2P) handleNewStream(s network.Stream) {
	// The zero time.Time is passed as the read deadline.
	// NOTE(review): by net.Conn convention this clears the deadline - confirm
	// for the stream implementation in use.
	if err := s.SetReadDeadline(time.Time{}); err != nil {
		p2p.logger.WithField("error", err).Error("Set stream read deadline")
		return
	}

	reader := ggio.NewDelimitedReader(s, network.MessageSizeMax)
	inbound := newStream(s, p2p.config.protocolIDs[nonReusableProtocolIndex], DirInbound)

	// This is the top of the call chain for the stream, so a read failure is
	// logged here instead of being dropped on the floor.
	if err := p2p.handleMessage(inbound, reader); err != nil {
		p2p.logger.WithField("error", err).Error("Handle message")
	}
}
// waitMsg waits up to timeout for one delimited message to arrive on stream.
//
// If the read finishes first, it returns the message together with whatever
// error ReadMsg produced (nil on success). If the timeout elapses first, it
// returns a nil message and a "wait msg timeout" error.
//
// The read runs in its own goroutine. An io.Reader offers no way to cancel a
// read in progress, so after a timeout that goroutine stays inside ReadMsg
// until the stream yields data or fails; callers that time out should close or
// reset the underlying stream to release it.
func waitMsg(stream io.Reader, timeout time.Duration) (*network_pb.Message, error) {
	reader := ggio.NewDelimitedReader(stream, network.MessageSizeMax)
	msg := &network_pb.Message{}

	// Buffered with capacity 1 so the reader goroutine can always hand over
	// its result and exit, even when nobody is receiving any more because the
	// timeout branch already returned. An unbuffered channel would block that
	// goroutine forever.
	done := make(chan error, 1)
	go func() {
		done <- reader.ReadMsg(msg)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	select {
	case err := <-done:
		return msg, err
	case <-ctx.Done():
		return nil, errors.New("wait msg timeout")
	}
}
// send writes msg to the stream behind s as a single delimited
// network_pb.Message whose Data field is msg.
//
// Before writing, the stream's write deadline is set to now + sendTimeout.
// Errors from setting the deadline or from the write are returned wrapped
// with "set deadline" and "write msg" respectively.
func (p2p *P2P) send(s *stream, msg []byte) error {
	expiry := time.Now().Add(sendTimeout)
	setErr := s.getStream().SetWriteDeadline(expiry)
	if setErr != nil {
		return fmt.Errorf("set deadline: %w", setErr)
	}

	out := ggio.NewDelimitedWriter(s.getStream())
	envelope := &network_pb.Message{Data: msg}
	writeErr := out.WriteMsg(envelope)
	if writeErr != nil {
		return fmt.Errorf("write msg: %w", writeErr)
	}
	return nil
}