-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpacket_handler.go
97 lines (90 loc) · 2.28 KB
/
packet_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package mongocaputils
import (
"io"
"log"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/tcpassembly"
)
// PacketHandler pulls packets from a pcap handle and feeds their TCP segments
// into a stream reassembler (see Handle).
type PacketHandler struct {
	// Verbose enables progress and summary logging through the standard
	// log package while Handle runs.
	Verbose bool

	// pcap is the capture handle packets are read from.
	pcap *pcap.Handle

	// numDropped is the dropped-packet count reported in Handle's verbose
	// summary. Nothing in this file updates it.
	numDropped int64
}
// NewPacketHandler returns a PacketHandler that reads from pcapHandle.
// Verbose is left false and the dropped-packet counter starts at zero.
func NewPacketHandler(pcapHandle *pcap.Handle) *PacketHandler {
	handler := new(PacketHandler)
	handler.pcap = pcapHandle
	return handler
}
// StreamHandler is a tcpassembly.StreamFactory that can also be closed.
// Handle builds its stream pool from it and calls Close once packet
// processing has finished.
type StreamHandler interface {
	io.Closer
	tcpassembly.StreamFactory
}
// SetFirstSeener is an optional interface for stream handlers. When the
// handler passed to Handle implements it, Handle calls SetFirstSeen with the
// capture timestamp of the first packet it reads.
type SetFirstSeener interface {
	// SetFirstSeen records t as the time the first packet was seen.
	SetFirstSeen(t time.Time)
}
// Handle reads packets from the pcap handle and passes every TCP segment to a
// tcpassembly.Assembler whose streams are created by streamHandler.
//
// It returns when the packet source is exhausted or, if numToHandle is
// positive, once numToHandle packets have been read. A numToHandle of zero or
// less means no limit. Every packet read counts toward the limit, TCP or not.
//
// If streamHandler also implements SetFirstSeener, it receives the capture
// timestamp of the first packet. Once per second the assembler is asked to
// flush streams older than five seconds before the current wall-clock time.
// On return the assembler is fully flushed and streamHandler is closed.
//
// The returned error is always nil. A failure to close streamHandler is
// logged rather than returned so that existing callers see no new failure
// mode.
func (p *PacketHandler) Handle(streamHandler StreamHandler, numToHandle int) error {
	count := int64(0)
	start := time.Now()
	if p.Verbose && numToHandle > 0 {
		log.Println("Processing", numToHandle, "packets")
	}

	source := gopacket.NewPacketSource(p.pcap, p.pcap.LinkType())
	streamPool := tcpassembly.NewStreamPool(streamHandler)
	assembler := tcpassembly.NewAssembler(streamPool)

	// Deferred calls run last-in-first-out: the ticker is stopped, then the
	// statistics are printed, then the assembler is flushed and the handler
	// is closed.
	defer func() {
		if p.Verbose {
			log.Println("flushing assembler.")
			log.Println("num flushed/closed:", assembler.FlushAll())
			log.Println("closing stream handler.")
		} else {
			assembler.FlushAll()
		}
		if err := streamHandler.Close(); err != nil {
			// Previously dropped silently; surface it without changing the
			// return value.
			log.Println("error closing stream handler:", err)
		}
	}()
	defer func() {
		if p.Verbose {
			log.Println("Dropped", p.numDropped, "packets out of", count)
			runTime := time.Since(start).Seconds()
			log.Println("Processed", float64(count-p.numDropped)/runTime, "packets per second")
		}
	}()

	// A stoppable ticker is used instead of time.Tick so that its resources
	// are released as soon as Handle returns.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	packets := source.Packets()
	for {
		select {
		case pkt := <-packets:
			if pkt == nil { // end of pcap file: a closed channel yields nil
				if p.Verbose {
					log.Println("end of stream")
				}
				return nil
			}
			if tcpLayer := pkt.Layer(layers.LayerTypeTCP); tcpLayer != nil {
				// NOTE(review): tcpassembly documents the first argument as
				// the network-layer flow. The transport flow is passed here,
				// unchanged from the original; confirm whether
				// pkt.NetworkLayer().NetworkFlow() was intended.
				assembler.AssembleWithTimestamp(
					pkt.TransportLayer().TransportFlow(),
					tcpLayer.(*layers.TCP),
					pkt.Metadata().Timestamp)
			}
			if count == 0 {
				if firstSeener, ok := streamHandler.(SetFirstSeener); ok {
					firstSeener.SetFirstSeen(pkt.Metadata().Timestamp)
				}
			}
			count++
			if numToHandle > 0 && count >= int64(numToHandle) {
				if p.Verbose {
					log.Println("Count exceeds requested packets, returning.")
				}
				// Return explicitly: a bare break here would only leave the
				// select, not the enclosing for loop, so the limit would
				// never take effect.
				return nil
			}
		case <-ticker.C:
			if p.Verbose {
				log.Println("flushing old streams")
			}
			assembler.FlushOlderThan(time.Now().Add(time.Second * -5))
		}
	}
}