Skip to content

Commit

Permalink
fix drain test
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Nov 6, 2020
1 parent c4aa826 commit ff2e1c6
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 8 deletions.
1 change: 1 addition & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Client struct {
log log.Logger
trackerURL *url.URL
httpClient http.Client
drainer bool
}

// NewClient creates a new Client.
Expand Down
6 changes: 6 additions & 0 deletions drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Drainer struct {
log log.Logger
shutdown chan struct{}
stopped chan struct{}

stopOnError bool
}

func NewDrainer(c *Config) (*Drainer, error) {
Expand All @@ -42,6 +44,7 @@ func NewDrainer(c *Config) (*Drainer, error) {
if err != nil {
return nil, err
}
clt.drainer = true
logger := log.NewLogger("drain")
d := &Drainer{
config: c,
Expand Down Expand Up @@ -91,6 +94,9 @@ func (d *Drainer) Run() error {
d.log.Infof("moving fid=%v; %v of %v (%v%%)", fid, i+1, len(fids), ((i+1)*100)/len(fids))
if err = d.moveFile(fid); err != nil {
d.log.Error(err)
if d.stopOnError {
return err
}
}
}
return nil
Expand Down
1 change: 1 addition & 0 deletions drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func TestDrain(t *testing.T) {
if err != nil {
t.Fatal(err)
}
dr.stopOnError = true
err = dr.Run()
if err != nil {
t.Fatal(err)
Expand Down
18 changes: 10 additions & 8 deletions filereceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,16 @@ func (f *FileReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
}
ok, err := f.tempfileExists(path)
if err != nil {
f.internalServerError("cannot check tempfile", err, r, w)
return
}
if !ok {
http.Error(w, "tempfile does not exist", http.StatusNotFound)
return
if r.Header.Get("efes-drain") == "" {
ok, err := f.tempfileExists(path)
if err != nil {
f.internalServerError("cannot check tempfile", err, r, w)
return
}
if !ok {
http.Error(w, "tempfile does not exist", http.StatusNotFound)
return
}
}
newOffset, digest, err := saveFile(path, offset, length, r.Body, f.log)
if oerr, ok := err.(*OffsetMismatchError); ok {
Expand Down
8 changes: 8 additions & 0 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ func (c *Client) sendFile(path string, rs io.ReadSeeker, size int64) (*Checksums
}
}
checksums, err = c.send(path, r, offset, size, bo)
if cerr, ok := err.(*ClientError); ok && cerr.Code == 404 {
return backoff.Permanent(cerr)
}
if err != nil {
c.log.Errorf("cannot send chunk: %s", err.Error())
return err
}
remoteSha1, err = hex.DecodeString(checksums.Sha1)
Expand Down Expand Up @@ -156,10 +160,14 @@ func (c *Client) patch(path string, body io.Reader, offset, size int64) (*http.R
if size > -1 {
req.Header.Add("efes-file-length", strconv.FormatInt(size, 10))
}
if c.drainer {
req.Header.Add("efes-drain", "true")
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
resp.Body.Close()
return resp, checkResponseError(resp)
}

Expand Down

0 comments on commit ff2e1c6

Please sign in to comment.