-
Notifications
You must be signed in to change notification settings - Fork 102
/
Copy pathlogreader.go
146 lines (127 loc) · 3.55 KB
/
logreader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package tfe
import (
"context"
"errors"
"fmt"
"io"
"math"
"net/http"
"net/url"
"time"
)
// LogReader implements io.Reader for streaming logs.
type LogReader struct {
client *Client
ctx context.Context
done func() (bool, error)
logURL *url.URL
offset int64
reads int
startOfText bool
endOfText bool
}
func (r *LogReader) Read(l []byte) (int, error) {
if written, err := r.read(l); !errors.Is(err, io.ErrNoProgress) {
return written, err
}
// Loop until we can any data, the context is canceled or the
// run is finsished. If we would return right away without any
// data, we could end up causing a io.ErrNoProgress error.
for r.reads = 1; ; r.reads++ {
select {
case <-r.ctx.Done():
return 0, r.ctx.Err()
case <-time.After(backoff(500, 2000, r.reads)):
if written, err := r.read(l); !errors.Is(err, io.ErrNoProgress) {
return written, err
}
}
}
}
func (r *LogReader) read(l []byte) (int, error) {
// Update the query string.
r.logURL.RawQuery = fmt.Sprintf("limit=%d&offset=%d", len(l), r.offset)
// Create a new request.
req, err := http.NewRequest("GET", r.logURL.String(), nil)
if err != nil {
return 0, err
}
req = req.WithContext(r.ctx)
// Attach the default headers.
for k, v := range r.client.headers {
req.Header[k] = v
}
// Retrieve the next chunk.
resp, err := r.client.http.HTTPClient.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
// Basic response checking.
if err := checkResponseCode(resp); err != nil {
return 0, err
}
// Read the retrieved chunk.
written, err := resp.Body.Read(l)
if err != nil && !errors.Is(err, io.EOF) {
// Ignore io.EOF errors returned when reading from the response
// body as this indicates the end of the chunk and not the end
// of the logfile.
return written, err
}
if written > 0 {
// Check for an STX (Start of Text) ASCII control marker.
if !r.startOfText && l[0] == byte(2) {
r.startOfText = true
// Remove the STX marker from the received chunk.
copy(l[:written-1], l[1:])
l[written-1] = byte(0)
r.offset++
written--
// Return early if we only received the STX marker.
if written == 0 {
return 0, io.ErrNoProgress
}
}
// If we found an STX ASCII control character, start looking for
// the ETX (End of Text) control character.
if r.startOfText && l[written-1] == byte(3) {
r.endOfText = true
// Remove the ETX marker from the received chunk.
l[written-1] = byte(0)
r.offset++
written--
}
}
// Check if we need to continue the loop and wait 500 miliseconds
// before checking if there is a new chunk available or that the
// run is finished and we are done reading all chunks.
if written != 0 {
// Update the offset for the next read.
r.offset += int64(written)
return written, nil
}
if (r.startOfText && r.endOfText) || // The logstream finished without issues.
(r.startOfText && r.reads%10 == 0) || // The logstream terminated unexpectedly.
(!r.startOfText && r.reads > 1) { // The logstream doesn't support STX/ETX.
done, err := r.done()
if err != nil {
return 0, err
}
if done {
return 0, io.EOF
}
}
return 0, io.ErrNoProgress
}
// backoff will perform exponential backoff based on the iteration and
// limited by the provided min and max (in milliseconds) durations.
func backoff(min, max float64, iter int) time.Duration {
backoff := math.Pow(2, float64(iter)/5) * min
if backoff > max {
backoff = max
}
return time.Duration(backoff) * time.Millisecond
}