diff --git a/internal/api/handler.go b/internal/api/handler.go
index edb8bd8e7..91a6ff206 100644
--- a/internal/api/handler.go
+++ b/internal/api/handler.go
@@ -128,7 +128,7 @@ const (
 	maxFunctions = 10
 
 	cacheInvalidateCheckInterval = 1 * time.Second
-	cacheInvalidateCheckTimeout  = 5 * time.Second
+	cacheInvalidateCheckTimeout  = 8 * time.Second // should be greater than dial timeout see chDialTimeout
 	cacheInvalidateMaxRows       = 100_000
 
 	cacheDefaultDropEvery = 90 * time.Second
diff --git a/internal/util/chutil.go b/internal/util/chutil.go
index 96508c3b4..b0dd5ec6f 100644
--- a/internal/util/chutil.go
+++ b/internal/util/chutil.go
@@ -25,45 +25,47 @@ import (
 	"github.com/vkcom/statshouse/internal/util/queue"
 )
 
-type connPool struct {
-	rnd     *rand.Rand
-	servers []*chpool.Pool
-	sem     *queue.Queue
-
-	userActive map[string]int
-	mx         sync.Mutex
-	userWait   map[string]int
-	waitMx     sync.Mutex
-}
+type (
+	connPool struct {
+		rnd     *rand.Rand
+		servers []*severConnPool
+		sem     *queue.Queue
+
+		userActive map[string]int
+		mx         sync.Mutex
+		userWait   map[string]int
+		waitMx     sync.Mutex
+	}
 
-type ClickHouse struct {
-	pools [4]*connPool
-}
+	ClickHouse struct {
+		pools [4]*connPool
+	}
 
-type QueryMetaInto struct {
-	IsFast  bool
-	IsLight bool
-	User    string
-	Metric  int32
-	Table   string
-	Kind    string
-}
+	QueryMetaInto struct {
+		IsFast  bool
+		IsLight bool
+		User    string
+		Metric  int32
+		Table   string
+		Kind    string
+	}
 
-type QueryHandleInfo struct {
-	Duration time.Duration
-	Profile  proto.Profile
-}
+	QueryHandleInfo struct {
+		Duration time.Duration
+		Profile  proto.Profile
+	}
 
-type ChConnOptions struct {
-	Addrs             []string
-	User              string
-	Password          string
-	DialTimeout       time.Duration
-	FastLightMaxConns int
-	FastHeavyMaxConns int
-	SlowLightMaxConns int
-	SlowHeavyMaxConns int
-}
+	ChConnOptions struct {
+		Addrs             []string
+		User              string
+		Password          string
+		DialTimeout       time.Duration
+		FastLightMaxConns int
+		FastHeavyMaxConns int
+		SlowLightMaxConns int
+		SlowHeavyMaxConns int
+	}
+)
 
 const (
 	fastLight = 0
@@ -78,10 +80,10 @@ func OpenClickHouse(opt ChConnOptions) (*ClickHouse, error) {
 	}
 
 	result := &ClickHouse{[4]*connPool{
-		{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.FastLightMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // fastLight
-		{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.FastHeavyMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // fastHeavy
-		{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.SlowLightMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // slowLight
-		{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.SlowHeavyMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // slowHeavy
+		{rand.New(), make([]*severConnPool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.FastLightMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // fastLight
+		{rand.New(), make([]*severConnPool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.FastHeavyMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // fastHeavy
+		{rand.New(), make([]*severConnPool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.SlowLightMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // slowLight
+		{rand.New(), make([]*severConnPool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.SlowHeavyMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // slowHeavy
 	}}
 	for _, addr := range opt.Addrs {
 		for _, pool := range result.pools {
@@ -99,7 +101,7 @@ func OpenClickHouse(opt ChConnOptions) (*ClickHouse, error) {
 				result.Close()
 				return nil, err
 			}
-			pool.servers = append(pool.servers, server)
+			pool.servers = append(pool.servers, newSeverConnPool(server))
 		}
 	}
 
@@ -117,7 +119,7 @@ func (c *connPool) countOfReqLocked(m map[string]int) int {
 func (ch *ClickHouse) Close() {
 	for _, a := range ch.pools {
 		for _, b := range a.servers {
-			b.Close()
+			b.ch.Close()
 		}
 	}
 }
@@ -162,7 +164,12 @@ func (ch *ClickHouse) Select(ctx context.Context, meta QueryMetaInto, query ch.Q
 	}
 	kind := QueryKind(meta.IsFast, meta.IsLight)
 	pool := ch.pools[kind]
-	servers := append(make([]*chpool.Pool, 0, len(pool.servers)), pool.servers...)
+	servers := make([]*severConnPool, 0, len(pool.servers))
+	for _, p := range pool.servers {
+		if p.IsReady() {
+			servers = append(servers, p)
+		}
+	}
 	for safetyCounter := 0; safetyCounter < len(pool.servers); safetyCounter++ {
 		var i int
 		i, err = pickRandomServer(servers, pool.rnd)
@@ -220,7 +227,7 @@ func (ch *ClickHouse) Select(ctx context.Context, meta QueryMetaInto, query ch.Q
 	return info, err
 }
 
-func pickRandomServer(s []*chpool.Pool, r *rand.Rand) (int, error) {
+func pickRandomServer(s []*severConnPool, r *rand.Rand) (int, error) {
 	if len(s) == 0 {
 		return 0, fmt.Errorf("all ClickHouse servers are dead")
 	}
@@ -232,7 +239,7 @@ func pickRandomServer(s []*chpool.Pool, r *rand.Rand) (int, error) {
 	if i2 >= i1 {
 		i2++
 	}
-	if s[i1].Stat().AcquiredResources() < s[i2].Stat().AcquiredResources() {
+	if s[i1].ch.Stat().AcquiredResources() < s[i2].ch.Stat().AcquiredResources() {
 		return i1, nil
 	} else {
 		return i2, nil
diff --git a/internal/util/conn_proxy.go b/internal/util/conn_proxy.go
new file mode 100644
index 000000000..8d222aa84
--- /dev/null
+++ b/internal/util/conn_proxy.go
@@ -0,0 +1,114 @@
+package util
+
+import (
+	"context"
+	"errors"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/ClickHouse/ch-go"
+	"github.com/ClickHouse/ch-go/chpool"
+	"pgregory.net/rand"
+)
+
+// Health-tracking parameters of severConnPool (see passResult and IsReady):
+// errCount reaching maxFailErrAlive moves alive -> halfOpen, reaching
+// maxFailErrHalfOpen moves halfOpen -> dead, and dropping to maxSucc moves the
+// pool back to alive. A dead pool is rejected by IsReady for deadInterval.
+const (
+	maxSucc            = -5
+	maxFailErrAlive    = 10
+	maxFailErrHalfOpen = 5
+	deadInterval       = time.Second * 30
+
+	alive    = 0
+	dead     = 1
+	halfOpen = 2
+
+	halfOpenSendReq = 10 // when conn has halfOpen state send only 1/halfOpenSendReq requests
+)
+
+// severConnPool wraps a chpool.Pool and tracks its health from query results.
+type severConnPool struct {
+	ch *chpool.Pool
+
+	mx         sync.Mutex // guards the fields below
+	errCount   int
+	state      int
+	stateStart time.Time
+}
+
+// newSeverConnPool returns a tracker for ch that starts in the alive state.
+func newSeverConnPool(ch *chpool.Pool) *severConnPool {
+	return &severConnPool{
+		ch:         ch,
+		errCount:   0,
+		state:      alive,
+		stateStart: time.Now(),
+	}
+}
+
+// Do runs q on the wrapped pool and feeds the outcome into the health state.
+func (p *severConnPool) Do(ctx context.Context, q ch.Query) (err error) {
+	err = p.ch.Do(ctx, q)
+	p.passResult(ctx, err)
+	return err
+}
+
+// passResult updates errCount and state from the result of one query.
+// Results are ignored while the pool is dead; IsReady revives it.
+func (p *severConnPool) passResult(ctx context.Context, err error) {
+	p.mx.Lock()
+	defer p.mx.Unlock()
+	if p.state == dead {
+		return
+	}
+	var netErr *net.OpError
+	switch {
+	case err == nil:
+		if p.errCount > maxSucc {
+			p.errCount--
+		}
+	case errors.As(err, &netErr) || ctx.Err() == nil:
+		// Network errors always count. Other errors count only while ctx is
+		// still live: some query can timeout and this is ok (in this case
+		// err != nil and ctx.Err() != nil).
+		p.errCount++
+	}
+	if p.state == alive && p.errCount >= maxFailErrAlive {
+		p.errCount = 0
+		p.state = halfOpen
+		p.stateStart = time.Now()
+	}
+	if p.state == halfOpen && p.errCount >= maxFailErrHalfOpen {
+		p.errCount = 0
+		p.state = dead
+		p.stateStart = time.Now()
+	}
+	if p.errCount <= maxSucc {
+		p.errCount = 0
+		p.state = alive
+		p.stateStart = time.Now()
+	}
+}
+
+// IsReady reports whether a request may be sent to this pool now. A pool that
+// has been dead for at least deadInterval becomes halfOpen; a halfOpen pool
+// accepts one request in halfOpenSendReq on average.
+func (p *severConnPool) IsReady() bool {
+	p.mx.Lock()
+	defer p.mx.Unlock()
+	if p.state == dead && time.Since(p.stateStart) < deadInterval {
+		return false
+	}
+	if p.state == dead {
+		p.errCount = 0
+		p.state = halfOpen
+		p.stateStart = time.Now() // every other transition stamps stateStart too
+	}
+	if p.state == halfOpen {
+		return rand.Intn(halfOpenSendReq) < 1
+	}
+	return true
+}
diff --git a/internal/util/conn_proxy_test.go b/internal/util/conn_proxy_test.go
new file mode 100644
index 000000000..7ed3a7218
--- /dev/null
+++ b/internal/util/conn_proxy_test.go
@@ -0,0 +1,33 @@
+package util
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+// Test_severConnPool_Do drives passResult through alive -> halfOpen -> dead.
+func Test_severConnPool_Do(t *testing.T) {
+	p := &severConnPool{
+		ch:         nil,
+		errCount:   0,
+		state:      alive,
+		stateStart: time.Now(),
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+	for i := 0; i < maxFailErrAlive; i++ {
+		p.passResult(ctx, &net.OpError{})
+	}
+	require.Equal(t, 0, p.errCount)
+	require.Equal(t, halfOpen, p.state)
+	for i := 0; i < maxFailErrHalfOpen; i++ {
+		p.passResult(context.Background(), fmt.Errorf("FAIL"))
+	}
+	require.Equal(t, 0, p.errCount)
+	require.Equal(t, dead, p.state)
+}