Commit: rpc proxy currently deployed

hrissan committed May 13, 2024
1 parent 56cd486 commit ef9a680
Showing 3 changed files with 102 additions and 9 deletions.
108 changes: 101 additions & 7 deletions internal/aggregator/ingress_proxy.go
@@ -18,6 +18,8 @@ import (
"sync"
"time"

"go.uber.org/atomic"

"github.com/vkcom/statshouse/internal/vkgo/basictl"
"github.com/vkcom/statshouse/internal/vkgo/build"
"github.com/vkcom/statshouse/internal/vkgo/rpc"
@@ -36,11 +38,28 @@ type clientPool struct {
clients map[string]*rpc.Client
}

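// longpollClient describes a hijacked request waiting for an upstream response;
// requestLen is kept only for per-request statistics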
type longpollClient struct {
queryID int64
requestLen int
}

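// longpollShard owns one slice of the in-flight longpoll requests;
// each shard has its own mutex, so unrelated requests rarely contend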
type longpollShard struct {
proxy *IngressProxy
mu sync.Mutex
clientList map[*rpc.HandlerContext]longpollClient
}

const longPollShardsCount = 256 // we shard the lock to reduce contention

type IngressProxy struct {
nextShardLock atomic.Uint64 // round-robin selection of lock shards should spread load well enough

sh2 *agent.Agent
pool *clientPool
server *rpc.Server
config ConfigIngressProxy

longpollShards [longPollShardsCount]*longpollShard
}

type ConfigIngressProxy struct {
@@ -100,16 +119,22 @@ func RunIngressProxy(sh2 *agent.Agent, aesPwd string, config ConfigIngressProxy)
// TODO - server settings must be tuned
config: config,
}
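// create all lock shards up front, so a request later picks its shard
// by a simple array index with no allocation on the hot path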
for i := 0; i < longPollShardsCount; i++ {
proxy.longpollShards[i] = &longpollShard{
proxy: proxy,
clientList: map[*rpc.HandlerContext]longpollClient{},
}
}
proxy.server = rpc.NewServer(rpc.ServerWithCryptoKeys(config.IngressKeys),
rpc.ServerWithHandler(proxy.handler),
rpc.ServerWithSyncHandler(proxy.syncHandler),
rpc.ServerWithForceEncryption(true),
rpc.ServerWithLogf(log.Printf),
rpc.ServerWithDisableContextTimeout(true),
rpc.ServerWithTrustedSubnetGroups(build.TrustedSubnetGroups()),
rpc.ServerWithVersion(build.Info()),
rpc.ServerWithDefaultResponseTimeout(data_model.MaxConveyorDelay*time.Second),
rpc.ServerWithMaxInflightPackets(aggregatorMaxInflightPackets*100), // enough for up to 100 shards
rpc.ServerWithMaxWorkers(2<<20), // almost infinite
rpc.ServerWithResponseBufSize(1024),
rpc.ServerWithResponseMemEstimate(1024),
rpc.ServerWithRequestMemoryLimit(8<<30)) // see server settings in aggregator. We do not multiply here
@@ -128,6 +153,81 @@ func keyFromHctx(hctx *rpc.HandlerContext, resultTag int32) data_model.Key {
}
}

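// callback is invoked by the rpc.Client when the proxied call to the aggregator
// completes; it relays the response (or error) to the hijacked agent request,
// if that request is still registered in this shard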
func (ls *longpollShard) callback(client *rpc.Client, queryID int64, resp *rpc.Response, err error, userData any) {
hctx := userData.(*rpc.HandlerContext)
ls.mu.Lock()
defer ls.mu.Unlock()
lpc, ok := ls.clientList[hctx]
if !ok || lpc.queryID != queryID {
// the server already cancelled the longpoll call,
// or hctx was cancelled and reused by the server before the client response arrived;
// since we have no client-side cancellation, we rely on client query IDs repeating rarely
return
}
delete(ls.clientList, hctx)
var key data_model.Key
if err != nil {
key = keyFromHctx(hctx, format.TagValueIDRPCRequestsStatusErrUpstream)
} else {
key = keyFromHctx(hctx, format.TagValueIDRPCRequestsStatusOK)
}
ls.proxy.sh2.AddValueCounter(key, float64(lpc.requestLen), 1, nil)
if resp != nil {
hctx.Response = append(hctx.Response, resp.Body...)
}
hctx.SendHijackedResponse(err)
}

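// CancelHijack is called by the rpc.Server when a hijacked request is cancelled
// before the upstream response arrives; we record the cancellation and forget the request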
func (ls *longpollShard) CancelHijack(hctx *rpc.HandlerContext) {
ls.mu.Lock()
defer ls.mu.Unlock()
if lpc, ok := ls.clientList[hctx]; ok {
key := keyFromHctx(hctx, format.TagValueIDRPCRequestsStatusErrCancel)
ls.proxy.sh2.AddValueCounter(key, float64(lpc.requestLen), 1, nil)
}
delete(ls.clientList, hctx)
}

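// syncHandler (installed via rpc.ServerWithSyncHandler) runs before the request
// is handed off to a worker; it records a per-result-tag statistic for every
// request it fully decides here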
func (proxy *IngressProxy) syncHandler(ctx context.Context, hctx *rpc.HandlerContext) error {
requestLen := len(hctx.Request)
resultTag, err := proxy.syncHandlerImpl(ctx, hctx)
if resultTag != 0 {
key := keyFromHctx(hctx, resultTag)
proxy.sh2.AddValueCounter(key, float64(requestLen), 1, nil)
}
return err
}

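// syncHandlerImpl returns resultTag 0 when no statistic must be recorded yet:
// either the request is deferred to the regular handler, or it was hijacked
// and will be accounted for later in callback or CancelHijack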
func (proxy *IngressProxy) syncHandlerImpl(ctx context.Context, hctx *rpc.HandlerContext) (resultTag int32, err error) {
requestLen := len(hctx.Request)
switch hctx.RequestTag() {
case constants.StatshouseGetTagMapping2,
constants.StatshouseSendKeepAlive2, constants.StatshouseSendSourceBucket2,
constants.StatshouseTestConnection2, constants.StatshouseGetTargets2,
constants.StatshouseGetTagMappingBootstrap, constants.StatshouseGetMetrics3,
constants.StatshouseAutoCreate:
case constants.StatshouseGetConfig2:
return 0, rpc.ErrNoHandler // defer to the regular handler, which runs in a worker
default:
// we want to reject unknown requests quickly, right here in the sync handler
return format.TagValueIDRPCRequestsStatusNoHandler, fmt.Errorf("ingress proxy does not support tag 0x%x", hctx.RequestTag())
}
req, client, address, err := proxy.fillProxyRequest(hctx)
if err != nil {
return format.TagValueIDRPCRequestsStatusErrLocal, err
}

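// pick a lock shard round-robin, so concurrent longpolls spread over
// longPollShardsCount mutexes instead of contending on a single one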
lockShardID := int(proxy.nextShardLock.Inc() % longPollShardsCount)
ls := proxy.longpollShards[lockShardID]
ls.mu.Lock() // to avoid race with longpoll cancellation, all code below must run under lock
defer ls.mu.Unlock()
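// the upstream callback also takes ls.mu, so it cannot complete this request
// before we register it in clientList below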
if _, err := client.DoCallback(ctx, proxy.config.Network, address, req, ls.callback, hctx); err != nil {
return format.TagValueIDRPCRequestsStatusErrLocal, err
}
ls.clientList[hctx] = longpollClient{queryID: req.QueryID(), requestLen: requestLen}
return 0, hctx.HijackResponse(ls)
}

func (proxy *IngressProxy) handler(ctx context.Context, hctx *rpc.HandlerContext) error {
requestLen := len(hctx.Request)
resultTag, err := proxy.handlerImpl(ctx, hctx)
@@ -157,12 +257,6 @@ func (proxy *IngressProxy) handlerImpl(ctx context.Context, hctx *rpc.HandlerCon
ret.PreviousAddresses = proxy.sh2.GetConfigResult.PreviousAddresses
hctx.Response, _ = args.WriteResult(hctx.Response[:0], ret)
return format.TagValueIDRPCRequestsStatusOK, nil
case constants.StatshouseGetTagMapping2,
constants.StatshouseSendKeepAlive2, constants.StatshouseSendSourceBucket2,
constants.StatshouseTestConnection2, constants.StatshouseGetTargets2,
constants.StatshouseGetTagMappingBootstrap, constants.StatshouseGetMetrics3,
constants.StatshouseAutoCreate:
return proxy.syncProxyRequest(ctx, hctx)
default:
return format.TagValueIDRPCRequestsStatusNoHandler, fmt.Errorf("ingress proxy does not support tag 0x%x", hctx.RequestTag())
}
2 changes: 0 additions & 2 deletions internal/data_model/constants.go
@@ -65,8 +65,6 @@ const (

MaxMissedSecondsIntoContributors = 60 // If a source sends more MissedSeconds, they will be truncated. Do not make this large: we scan 4 arrays of this size on each insert.

ClientRPCPongTimeout = 30 * time.Second

AgentMappingTimeout1 = 10 * time.Second
AgentMappingTimeout2 = 30 * time.Second
AutoConfigTimeout = 30 * time.Second
1 change: 1 addition & 0 deletions internal/format/builtin.go
@@ -166,6 +166,7 @@ const (
TagValueIDRPCRequestsStatusErrUpstream = 3
TagValueIDRPCRequestsStatusHijack = 4
TagValueIDRPCRequestsStatusNoHandler = 5
TagValueIDRPCRequestsStatusErrCancel = 6 // on the proxy: the agent request was cancelled before the aggregator response arrived

TagValueIDProduction = 1
TagValueIDStaging = 2
