From cec0f3fdb1903093b6eea7788ffea02df11909e7 Mon Sep 17 00:00:00 2001
From: Kent Quirk
Date: Wed, 6 Dec 2023 15:06:09 -0500
Subject: [PATCH] fix: try to deflake several flaky tests (#934)

## Which problem is this PR solving?

We've had some problematic flaky tests.

* Some were because of probabilistic assertions relating to samplers -- we now run the worst of those a second time if they fail (see the retry sketch below).
* Some were waiting for events on another goroutine that might not have been scheduled; these have been adjusted to use `assert.Eventually`, which is actually pretty useful (also sketched below).

Fixes #896
Fixes #897
Fixes #901
Fixes #902

Plus a couple of other tests that hadn't gotten their own issue.

This might also make the tests run a little bit faster overall.

You might want to hide whitespace changes when reviewing this, because the retry loop changed indentation.
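For reviewers who want the two deflaking patterns in isolation, here is a minimal, self-contained sketch using testify. Everything in it (the package and test names, the simulated work, the bucket bounds, and the timings) is illustrative, not code from this diff:

```go
package deflake_test

import (
	"math/rand"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// Pattern 1: instead of a hand-rolled deadline/poll loop, let testify poll.
// assert.Eventually re-runs the condition every tick until it returns true,
// and fails the test only if the timeout elapses first.
func TestWaitsForAnotherGoroutine(t *testing.T) {
	var count atomic.Int64
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate work scheduled on another goroutine
		count.Add(1)
	}()

	assert.Eventually(t, func() bool {
		return count.Load() > 0
	}, 5*time.Second, 2*time.Millisecond)
}

// Pattern 2: retry a probabilistic check once before asserting, so a single
// unlucky random draw doesn't fail the build, but a real regression (which
// fails twice in a row) still does.
func TestProbabilisticDistribution(t *testing.T) {
	const trials, buckets = 1000, 10
	expected := trials / buckets
	for retry := 0; retry < 2; retry++ {
		counts := make([]int, buckets)
		for i := 0; i < trials; i++ {
			counts[rand.Intn(buckets)]++
		}
		biggest := 0
		for _, c := range counts {
			if c > biggest {
				biggest = c
			}
		}
		if biggest > expected*2 {
			if retry == 0 {
				t.Log("probabilistic check failed once, retrying")
				continue
			}
			// second failure in a row: assert for real
			assert.LessOrEqual(t, biggest, expected*2, "bucket counts too skewed: %v", counts)
		}
		break // passed (or already asserted on the retry), so stop
	}
}
```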
---
 app/app_test.go               |  50 ++----
 collect/collect_test.go       |  13 +-
 internal/peer/peers_test.go   |  10 +-
 sharder/deterministic_test.go | 327 +++++++++++++++++++---------------
 4 files changed, 205 insertions(+), 195 deletions(-)

diff --git a/app/app_test.go b/app/app_test.go
index 075ba984a2..732327d218 100644
--- a/app/app_test.go
+++ b/app/app_test.go
@@ -245,25 +245,14 @@ func TestAppIntegration(t *testing.T) {
 	err = startstop.Stop(graph.Objects(), nil)
 	assert.NoError(t, err)
 
-	// Wait for span to be sent.
-	deadline := time.After(time.Second)
-	for {
-		if out.Len() > 62 {
-			break
-		}
-		select {
-		case <-deadline:
-			t.Error("timed out waiting for output")
-			return
-		case <-time.After(time.Millisecond):
-		}
-	}
+	assert.Eventually(t, func() bool {
+		return out.Len() > 62
+	}, 5*time.Second, 2*time.Millisecond)
 
 	assert.Equal(t, `{"data":{"foo":"bar","meta.refinery.original_sample_rate":1,"trace.trace_id":"1"},"dataset":"dataset"}`+"\n", out.String())
 }
 
 func TestAppIntegrationWithNonLegacyKey(t *testing.T) {
-	// This is failing in Parallel, so disable it for now.
-	// t.Parallel()
+	t.Parallel()
 
 	var out bytes.Buffer
 	a, graph := newStartedApp(t, &transmission.WriterSender{W: &out}, 10500, nil, false)
@@ -288,18 +277,9 @@ func TestAppIntegrationWithNonLegacyKey(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Wait for span to be sent.
-	deadline := time.After(2 * time.Second)
-	for {
-		if out.Len() > 62 {
-			break
-		}
-		select {
-		case <-deadline:
-			t.Error("timed out waiting for output")
-			return
-		case <-time.After(time.Millisecond):
-		}
-	}
+	assert.Eventually(t, func() bool {
+		return out.Len() > 62
+	}, 5*time.Second, 2*time.Millisecond)
 
 	assert.Equal(t, `{"data":{"foo":"bar","meta.refinery.original_sample_rate":1,"trace.trace_id":"1"},"dataset":"dataset"}`+"\n", out.String())
 }
@@ -452,19 +432,9 @@ func TestHostMetadataSpanAdditions(t *testing.T) {
 	err = startstop.Stop(graph.Objects(), nil)
 	assert.NoError(t, err)
 
-	// Wait for span to be sent.
-	deadline := time.After(time.Second)
-	for {
-		if out.Len() > 62 {
-			break
-		}
-		select {
-		case <-deadline:
-			t.Error("timed out waiting for output")
-			return
-		case <-time.After(time.Millisecond):
-		}
-	}
+	assert.Eventually(t, func() bool {
+		return out.Len() > 62
+	}, 5*time.Second, 2*time.Millisecond)
 
 	expectedSpan := `{"data":{"foo":"bar","meta.refinery.local_hostname":"%s","meta.refinery.original_sample_rate":1,"trace.trace_id":"1"},"dataset":"dataset"}` + "\n"
 	assert.Equal(t, fmt.Sprintf(expectedSpan, hostname), out.String())
diff --git a/collect/collect_test.go b/collect/collect_test.go
index 4c4b3edeb2..31a1c36029 100644
--- a/collect/collect_test.go
+++ b/collect/collect_test.go
@@ -152,7 +152,7 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) {
 
 	// Spin until a sample gets triggered
 	sendAttemptCount := 0
-	for getEventsLength(transmission) < 1 || sendAttemptCount > 10 {
+	for getEventsLength(transmission) < 1 {
 		sendAttemptCount++
 		span := &types.Span{
 			TraceID: fmt.Sprintf("trace-%v", sendAttemptCount),
@@ -164,11 +164,11 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) {
 			},
 		}
 		coll.AddSpan(span)
-		time.Sleep(conf.SendTickerVal * 2)
+		time.Sleep(conf.SendTickerVal * 5)
 	}
 
 	transmission.Mux.RLock()
-	assert.Greater(t, len(transmission.Events), 0, "should be some events transmitted")
+	assert.Greater(t, len(transmission.Events), 0, "should be at least one event transmitted")
 	assert.Equal(t, uint(50), transmission.Events[0].Data["meta.refinery.original_sample_rate"], "metadata should be populated with original sample rate")
 	transmission.Mux.RUnlock()
 
@@ -245,8 +245,13 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) {
 
 	time.Sleep(conf.SendTickerVal * 2)
 
+	assert.Eventually(t, func() bool {
+		transmission.Mux.RLock()
+		defer transmission.Mux.RUnlock()
+		return len(transmission.Events) > 0
+	}, 2*time.Second, conf.SendTickerVal*2)
+
 	transmission.Mux.RLock()
-	assert.Equal(t, 1, len(transmission.Events), "should be some events transmitted")
 	assert.Equal(t, uint(1), transmission.Events[0].SampleRate, "SampleRate should be reset to one after starting at zero")
 	transmission.Mux.RUnlock()
 
diff --git a/internal/peer/peers_test.go b/internal/peer/peers_test.go
index 65b89f57bc..e481d0796c 100644
--- a/internal/peer/peers_test.go
+++ b/internal/peer/peers_test.go
@@ -70,8 +70,10 @@ func TestPeerShutdown(t *testing.T) {
 	assert.True(t, strings.HasSuffix(peers[0], "8081"))
 
 	close(done)
-	time.Sleep(500 * time.Millisecond)
-	peers, err = peer.GetPeers()
-	assert.NoError(t, err)
-	assert.Equal(t, 0, len(peers))
+
+	assert.Eventually(t, func() bool {
+		peers, err = peer.GetPeers()
+		assert.NoError(t, err)
+		return len(peers) == 0
+	}, 5*time.Second, 200*time.Millisecond)
 }
diff --git a/sharder/deterministic_test.go b/sharder/deterministic_test.go
index 51ed086108..f5fd90c2de 100644
--- a/sharder/deterministic_test.go
+++ b/sharder/deterministic_test.go
@@ -157,57 +157,66 @@ func TestShardBulk(t *testing.T) {
 	for i := 0; i < 5; i++ {
 		npeers := i*10 + 5
 		t.Run(fmt.Sprintf("bulk npeers=%d", npeers), func(t *testing.T) {
-			peers := []string{
-				"http://" + selfAddr,
-			}
-			for i := 1; i < npeers; i++ {
-				peers = append(peers, fmt.Sprintf("http://2.2.2.%d/:8081", i))
-			}
+			for retry := 0; retry < 2; retry++ {
+				peers := []string{
+					"http://" + selfAddr,
+				}
+				for i := 1; i < npeers; i++ {
+					peers = append(peers, fmt.Sprintf("http://2.2.2.%d/:8081", i))
+				}
 
-			config := &config.MockConfig{
-				GetPeerListenAddrVal: selfAddr,
-				GetPeersVal:          peers,
-				PeerManagementType:   "file",
-			}
-			done := make(chan struct{})
-			defer close(done)
-			filePeers, err := peer.NewPeers(context.Background(), config, done)
-			assert.Equal(t, nil, err)
-			sharder := DeterministicSharder{
-				Config: config,
-				Logger: &logger.NullLogger{},
-				Peers:  filePeers,
-			}
+				config := &config.MockConfig{
+					GetPeerListenAddrVal: selfAddr,
+					GetPeersVal:          peers,
+					PeerManagementType:   "file",
+				}
+				done := make(chan struct{})
+				defer close(done)
+				filePeers, err := peer.NewPeers(context.Background(), config, done)
+				assert.NoError(t, err, "NewPeers should succeed")
+				sharder := DeterministicSharder{
+					Config: config,
+					Logger: &logger.NullLogger{},
+					Peers:  filePeers,
+				}
 
-			assert.NoError(t, sharder.Start(), "starting sharder should not error")
+				assert.NoError(t, sharder.Start(), "starting sharder should not error")
 
-			const ntraces = 1000
-			ids := make([]string, ntraces)
-			for i := 0; i < ntraces; i++ {
-				ids[i] = GenID(32)
-			}
+				const ntraces = 1000
+				ids := make([]string, ntraces)
+				for i := 0; i < ntraces; i++ {
+					ids[i] = GenID(32)
+				}
 
-			results := make(map[string]int)
-			for i := 0; i < ntraces; i++ {
-				s := sharder.WhichShard(ids[i])
-				results[s.GetAddress()]++
-			}
-			min := ntraces
-			max := 0
-			for _, r := range results {
-				if r < min {
-					min = r
+				results := make(map[string]int)
+				for i := 0; i < ntraces; i++ {
+					s := sharder.WhichShard(ids[i])
+					results[s.GetAddress()]++
 				}
-				if r > max {
-					max = r
+				min := ntraces
+				max := 0
+				for _, r := range results {
+					if r < min {
+						min = r
+					}
+					if r > max {
+						max = r
+					}
 				}
-			}
-			// This is probabilistic, so could fail, but shouldn't be flaky as long as
-			// expectedResult is at least 20 or so.
-			expectedResult := ntraces / npeers
-			assert.Greater(t, expectedResult*2, max, "expected smaller max, got %d: %v", max, results)
-			assert.NotEqual(t, 0, min, "expected larger min, got %d: %v", min, results)
+				// This is probabilistic, so could fail, which is why we retry it once if it does.
+				expectedResult := ntraces / npeers
+				if min < expectedResult/3 || max > expectedResult*2 {
+					if retry == 0 {
+						t.Logf("probabilistic test failed once, retrying test with npeers=%d", npeers)
+						continue
+					}
+					assert.Greater(t, expectedResult*2, max, "expected smaller max, got %d: %v", max, results)
+					assert.LessOrEqual(t, expectedResult/3, min, "expected larger min, got %d: %v", min, results)
+				} else {
+					break // don't retry if it passed
+				}
+			}
 		})
 	}
 }
@@ -221,67 +230,77 @@ func TestShardDrop(t *testing.T) {
 	for i := 0; i < 5; i++ {
 		npeers := i*10 + 5
 		t.Run(fmt.Sprintf("drop npeers=%d", npeers), func(t *testing.T) {
-			peers := []string{
-				"http://" + selfAddr,
-			}
-			for i := 1; i < npeers; i++ {
-				peers = append(peers, fmt.Sprintf("http://2.2.2.%d/:8081", i))
-			}
+			for retry := 0; retry < 2; retry++ {
+				peers := []string{
+					"http://" + selfAddr,
+				}
+				for i := 1; i < npeers; i++ {
+					peers = append(peers, fmt.Sprintf("http://2.2.2.%d/:8081", i))
+				}
 
-			config := &config.MockConfig{
-				GetPeerListenAddrVal: selfAddr,
-				GetPeersVal:          peers,
-				PeerManagementType:   "file",
-			}
-			done := make(chan struct{})
-			defer close(done)
-			filePeers, err := peer.NewPeers(context.Background(), config, done)
-			assert.Equal(t, nil, err)
-			sharder := DeterministicSharder{
-				Config: config,
-				Logger: &logger.NullLogger{},
-				Peers:  filePeers,
-			}
+				config := &config.MockConfig{
+					GetPeerListenAddrVal: selfAddr,
+					GetPeersVal:          peers,
+					PeerManagementType:   "file",
+				}
+				done := make(chan struct{})
+				defer close(done)
+				filePeers, err := peer.NewPeers(context.Background(), config, done)
+				assert.Equal(t, nil, err)
+				sharder := DeterministicSharder{
+					Config: config,
+					Logger: &logger.NullLogger{},
+					Peers:  filePeers,
+				}
 
-			assert.NoError(t, sharder.Start(), "starting sharder should not error")
+				assert.NoError(t, sharder.Start(), "starting sharder should not error")
 
-			type placement struct {
-				id    string
-				shard string
-			}
+				type placement struct {
+					id    string
+					shard string
+				}
 
-			const ntraces = 1000
-			placements := make([]placement, ntraces)
-			for i := 0; i < ntraces; i++ {
-				placements[i].id = GenID(32)
-			}
+				const ntraces = 1000
+				placements := make([]placement, ntraces)
+				for i := 0; i < ntraces; i++ {
+					placements[i].id = GenID(32)
+				}
 
-			results := make(map[string]int)
-			for i := 0; i < ntraces; i++ {
-				s := sharder.WhichShard(placements[i].id)
-				results[s.GetAddress()]++
-				placements[i].shard = s.GetAddress()
-			}
+				results := make(map[string]int)
+				for i := 0; i < ntraces; i++ {
+					s := sharder.WhichShard(placements[i].id)
+					results[s.GetAddress()]++
+					placements[i].shard = s.GetAddress()
+				}
 
-			// reach in and delete one of the peers, then reshard
-			config.GetPeersVal = config.GetPeersVal[1:]
-			sharder.loadPeerList()
-
-			results = make(map[string]int)
-			nDiff := 0
-			for i := 0; i < ntraces; i++ {
-				s := sharder.WhichShard(placements[i].id)
-				results[s.GetAddress()]++
-				if s.GetAddress() != placements[i].shard {
-					nDiff++
+				// reach in and delete one of the peers, then reshard
+				config.GetPeersVal = config.GetPeersVal[1:]
+				sharder.loadPeerList()
+
+				results = make(map[string]int)
+				nDiff := 0
+				for i := 0; i < ntraces; i++ {
+					s := sharder.WhichShard(placements[i].id)
+					results[s.GetAddress()]++
+					if s.GetAddress() != placements[i].shard {
+						nDiff++
+					}
 				}
-			}
-			// we have a fairly large range here because it's truly random
-			// and we've been having some flaky tests
-			expected := ntraces / (npeers - 1)
-			assert.Greater(t, expected*2, nDiff)
-			assert.Less(t, expected/2, nDiff)
+				// we have a fairly large range here because it's truly random
+				// and we've been having some flaky tests
+				expected := ntraces / (npeers - 1)
+				if nDiff < expected/2 || nDiff > expected*2 {
+					if retry == 0 {
+						t.Logf("probabilistic test failed once, retrying test with npeers=%d", npeers)
+						continue
+					}
+					assert.Greater(t, expected*2, nDiff)
+					assert.Less(t, expected/2, nDiff)
+				} else {
+					break // don't retry if it passed
+				}
+			}
 		})
 	}
 }
@@ -295,64 +314,78 @@ func TestShardAddHash(t *testing.T) {
 	for i := 0; i < 5; i++ {
 		npeers := i*10 + 7
 		t.Run(fmt.Sprintf("add npeers=%d", npeers), func(t *testing.T) {
-			peers := []string{
-				"http://" + selfAddr,
-			}
-			for i := 1; i < npeers; i++ {
-				peers = append(peers, fmt.Sprintf("http://2.2.2.%d/:8081", i))
-			}
+			for retry := 0; retry < 2; retry++ {
+				peers := []string{
+					"http://" + selfAddr,
+				}
+				for i := 1; i < npeers; i++ {
+					peers = append(peers, fmt.Sprintf("http://2.2.2.%d/:8081", i))
+				}
 
-			config := &config.MockConfig{
-				GetPeerListenAddrVal: selfAddr,
-				GetPeersVal:          peers,
-				PeerManagementType:   "file",
-			}
-			done := make(chan struct{})
-			defer close(done)
-			filePeers, err := peer.NewPeers(context.Background(), config, done)
-			assert.Equal(t, nil, err)
-			sharder := DeterministicSharder{
-				Config: config,
-				Logger: &logger.NullLogger{},
-				Peers:  filePeers,
-			}
+				config := &config.MockConfig{
+					GetPeerListenAddrVal: selfAddr,
+					GetPeersVal:          peers,
+					PeerManagementType:   "file",
+				}
+				done := make(chan struct{})
+				defer close(done)
+				filePeers, err := peer.NewPeers(context.Background(), config, done)
+				assert.Equal(t, nil, err)
+				sharder := DeterministicSharder{
+					Config: config,
+					Logger: &logger.NullLogger{},
+					Peers:  filePeers,
+				}
 
-			assert.NoError(t, sharder.Start(), "starting sharder should not error")
+				assert.NoError(t, sharder.Start(), "starting sharder should not error")
 
-			type placement struct {
-				id    string
-				shard string
-			}
+				type placement struct {
+					id    string
+					shard string
+				}
 
-			const ntraces = 1000
-			placements := make([]placement, ntraces)
-			for i := 0; i < ntraces; i++ {
-				placements[i].id = GenID(32)
-			}
+				const ntraces = 1000
+				placements := make([]placement, ntraces)
+				for i := 0; i < ntraces; i++ {
+					placements[i].id = GenID(32)
+				}
 
-			results := make(map[string]int)
-			for i := 0; i < ntraces; i++ {
-				s := sharder.WhichShard(placements[i].id)
-				results[s.GetAddress()]++
-				placements[i].shard = s.GetAddress()
-			}
+				results := make(map[string]int)
+				for i := 0; i < ntraces; i++ {
+					s := sharder.WhichShard(placements[i].id)
+					results[s.GetAddress()]++
+					placements[i].shard = s.GetAddress()
+				}
 
-			// reach in and add a peer, then reshard
-			config.GetPeersVal = append(config.GetPeersVal, "http://2.2.2.255/:8081")
-			sharder.loadPeerList()
-
-			results = make(map[string]int)
-			nDiff := 0
-			for i := 0; i < ntraces; i++ {
-				s := sharder.WhichShard(placements[i].id)
-				results[s.GetAddress()]++
-				if s.GetAddress() != placements[i].shard {
-					nDiff++
+				// reach in and add a peer, then reshard
+				config.GetPeersVal = append(config.GetPeersVal, "http://2.2.2.255/:8081")
+				sharder.loadPeerList()
+
+				results = make(map[string]int)
+				nDiff := 0
+				for i := 0; i < ntraces; i++ {
+					s := sharder.WhichShard(placements[i].id)
+					results[s.GetAddress()]++
+					if s.GetAddress() != placements[i].shard {
+						nDiff++
+					}
 				}
-			}
-			expected := ntraces / (npeers - 1)
-			assert.Greater(t, expected*2, nDiff)
-			assert.Less(t, expected/2, nDiff)
+				expected := ntraces / (npeers - 1)
+				// we have a fairly large range here because it's truly random
+				// and we've been having some flaky tests
+				if nDiff < expected/2 || nDiff > expected*2 {
+					if retry == 0 {
+						t.Logf("probabilistic test failed once, retrying test with npeers=%d", npeers)
npeers=%d", npeers) + continue + } + assert.Greater(t, expected*2, nDiff) + assert.Less(t, expected/2, nDiff) + } else { + break // don't retry if it passed } + assert.Greater(t, expected*2, nDiff) + assert.Less(t, expected/2, nDiff) } - expected := ntraces / (npeers - 1) - assert.Greater(t, expected*2, nDiff) - assert.Less(t, expected/2, nDiff) }) } }