Skip to content

Commit

Permalink
feat: experimental stats
Browse files Browse the repository at this point in the history
  • Loading branch information
chronark committed Aug 10, 2023
1 parent a889425 commit aa6d1e7
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 64 deletions.
27 changes: 15 additions & 12 deletions apps/agent/cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var AgentCmd = &cobra.Command{
if enableAxiom {
t, closeTracer, err := tracing.New(context.Background(), tracing.Config{
Dataset: "tracing",
Service: "api",
Service: "agent",
Version: version.Version,
AxiomOrgId: e.String("AXIOM_ORG_ID"),
AxiomToken: e.String("AXIOM_TOKEN"),
Expand Down Expand Up @@ -119,21 +119,24 @@ var AgentCmd = &cobra.Command{
}
}

db, err := database.New(database.Config{
Logger: logger,
PrimaryUs: e.String("DATABASE_DSN"),
ReplicaEu: e.String("DATABASE_DSN_EU", ""),
ReplicaAsia: e.String("DATABASE_DSN_ASIA", ""),
FlyRegion: region,
PlanetscaleBoost: e.Bool("PLANETSCALE_BOOST", false),
},
database.WithTracing(tracer))
db, err := database.New(
database.Config{
Logger: logger,
PrimaryUs: e.String("DATABASE_DSN"),
ReplicaEu: e.String("DATABASE_DSN_EU", ""),
ReplicaAsia: e.String("DATABASE_DSN_ASIA", ""),
FlyRegion: region,
PlanetscaleBoost: e.Bool("PLANETSCALE_BOOST", false),
},
database.WithLogging(logger),
database.WithTracing(tracer),
)
if err != nil {
logger.Fatal("Failed to connect to database", zap.Error(err))
}

keyCache := cache.New[entities.Key](cache.Config[entities.Key]{
Fresh: time.Minute,
Fresh: time.Minute * 5,
Stale: time.Minute * 15,
RefreshFromOrigin: db.FindKeyByHash,
Logger: logger,
Expand All @@ -142,7 +145,7 @@ var AgentCmd = &cobra.Command{
keyCache = cacheMiddleware.WithLogging[entities.Key](keyCache, logger.With(zap.String("cacheType", "key")))

apiCache := cache.New[entities.Api](cache.Config[entities.Api]{
Fresh: time.Minute,
Fresh: time.Minute * 5,
Stale: time.Minute * 15,
RefreshFromOrigin: db.FindApi,
Logger: logger,
Expand Down
2 changes: 1 addition & 1 deletion apps/agent/pkg/database/decrement_key_remaining.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func (db *database) DecrementRemainingKeyUsage(ctx context.Context, keyId string) (entities.Key, error) {

tx, err := db.primary.db.BeginTx(ctx, nil)
tx, err := db.writeReplica.db.BeginTx(ctx, nil)
if err != nil {
return entities.Key{}, fmt.Errorf("unable to begin transaction: %w", err)
}
Expand Down
34 changes: 21 additions & 13 deletions apps/agent/pkg/database/planetscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ type replica struct {
}

type database struct {
primary *replica
readReplica *replica
logger *zap.Logger
writeReplica *replica
readReplica *replica
logger *zap.Logger
}

type Config struct {
Expand Down Expand Up @@ -66,13 +66,21 @@ func New(config Config, middlewares ...Middleware) (Database, error) {
if err != nil {
return nil, fmt.Errorf("error opening database: %w", err)
}
} else {
logger.Info("Adding database read replica", zap.String("continent", "us"))

readDB, err = sql.Open("mysql", fmt.Sprintf("%s&parseTime=true", config.PrimaryUs))
if err != nil {
return nil, fmt.Errorf("error opening database: %w", err)
}
}

if readDB != nil {
err = readDB.Ping()
if err != nil {
return nil, fmt.Errorf("unable to ping read replica")
}
logger.Info("pinged read db")
if config.PlanetscaleBoost {
logger.Info("enabling planetscale boost for replica")

Expand All @@ -83,22 +91,24 @@ func New(config Config, middlewares ...Middleware) (Database, error) {
}
}

primaryReplica := &replica{
writeReplica := &replica{
db: primary,
query: gen.New(primary),
}
var readReplica *replica
if readDB != nil {

logger.Info("creating readReplica")
readReplica = &replica{
db: readDB,
query: gen.New(readDB),
}
}

var db Database = &database{
primary: primaryReplica,
readReplica: readReplica,
logger: logger,
writeReplica: writeReplica,
readReplica: readReplica,
logger: logger,
}
for _, mw := range middlewares {
db = mw(db)
Expand All @@ -108,17 +118,15 @@ func New(config Config, middlewares ...Middleware) (Database, error) {

// read returns the primary writable db
func (d *database) write() *gen.Queries {
return d.primary.query
return d.writeReplica.query
}

// read returns the closests read replica or primary as fallback
func (d *database) read() *gen.Queries {
d.logger.Info("read replica", zap.Bool("isNotNil", d.readReplica != nil))
if d.readReplica != nil && d.readReplica.query != nil {
return d.readReplica.query
}
if d.primary != nil && d.primary.query != nil {
return d.primary.query
}

panic("neither primary, nor read replica are available")
d.logger.Warn("falling back to primary ")
return d.writeReplica.query
}
37 changes: 3 additions & 34 deletions apps/agent/pkg/database/regions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,11 @@ const (

func getClosestContinent(flyRegion string) continent {
switch flyRegion {
case "atl":
case "bog":
case "bos":
case "den":
case "dfw":
case "ewr":
case "iad":
case "lax":
case "mia":
case "ord":
case "qro":
case "scl":
case "sea":
case "sjc":
case "yul":
case "yyz":
case "atl", "bog", "bos", "den", "dfw", "ewr", "iad", "lax", "mia", "ord", "qro", "scl", "sea", "sjc", "yul", "yyz":
return continentUs
case "ams":
case "arn":
case "cdg":
case "eze":
case "fra":
case "gdl":
case "gig":
case "gru":
case "jnb":
case "lhr":
case "mad":
case "otp":
case "waw":
case "ams", "arn", "cdg", "eze", "fra", "gdl", "gig", "gru", "jnb", "lhr", "mad", "otp", "waw":
return continentEu
case "hkg":
case "bom":
case "nrt":
case "sin":
case "syd":
case "hkg", "bom", "nrt", "sin", "syd":
return continentAsia
}
return continentUs
Expand Down
79 changes: 79 additions & 0 deletions apps/agent/pkg/server/key_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package server

import (
"fmt"

"github.com/gofiber/fiber/v2"
"github.com/unkeyed/unkey/apps/agent/pkg/errors"
)

type GetKeyStatsRequest struct {
KeyId string `validate:"required"`
}

type usageRecord struct {
Time int64 `json:"time"`
Value int64 `json:"value"`
}

type GetKeyStatsResponse struct {
Usage []usageRecord `json:"usage"`
}

func (s *Server) getKeyStats(c *fiber.Ctx) error {
ctx, span := s.tracer.Start(c.UserContext(), "server.getKey")
defer span.End()
req := GetKeyStatsRequest{
KeyId: c.Params("keyId"),
}

err := s.validator.Struct(req)
if err != nil {
return errors.NewHttpError(c, errors.BAD_REQUEST, err.Error())
}

authHash, err := getKeyHash(c.Get("Authorization"))
if err != nil {
return errors.NewHttpError(c, errors.UNAUTHORIZED, err.Error())
}

authKey, found, err := s.db.FindKeyByHash(ctx, authHash)
if err != nil {
return errors.NewHttpError(c, errors.INTERNAL_SERVER_ERROR, fmt.Sprintf("unable to find key: %s", err.Error()))
}
if !found {
return errors.NewHttpError(c, errors.NOT_FOUND, fmt.Sprintf("unable to find key by hash: %s", authHash))
}

if authKey.ForWorkspaceId == "" {
return errors.NewHttpError(c, errors.BAD_REQUEST, "wrong key type")
}

key, found, err := s.db.FindKeyById(ctx, req.KeyId)
if err != nil {
return errors.NewHttpError(c, errors.INTERNAL_SERVER_ERROR, err.Error())
}
if !found {
return errors.NewHttpError(c, errors.NOT_FOUND, fmt.Sprintf("key %s not found", req.KeyId))
}
if key.WorkspaceId != authKey.ForWorkspaceId {
return errors.NewHttpError(c, errors.UNAUTHORIZED, "workspace access denied")
}

keyStats, err := s.tinybird.GetKeyStats(ctx, key.Id)
if err != nil {
return errors.NewHttpError(c, errors.INTERNAL_SERVER_ERROR, "unable to load stats")
}

res := GetKeyStatsResponse{
Usage: make([]usageRecord, len(keyStats.Usage)),
}
for i, day := range keyStats.Usage {
res.Usage[i] = usageRecord{
Time: day.Time,
Value: day.Value,
}
}

return c.JSON(res)
}
8 changes: 6 additions & 2 deletions apps/agent/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,13 @@ func New(config Config) *Server {
attribute.String("method", c.Route().Method),
attribute.String("path", c.Path()),
attribute.String("edgeRegion", edgeRegion),
attribute.String("region", s.region),
))
defer span.End()
c.SetUserContext(ctx)
traceId := span.SpanContext().TraceID().String()

c.Set("Unkey-Trace-Id", fmt.Sprintf("%s:%s::%s", s.region, edgeRegion, span.SpanContext().TraceID().String()))
c.Set("Unkey-Trace-Id", fmt.Sprintf("%s:%s::%s", s.region, edgeRegion, traceId))
c.Set("Unkey-Version", s.version)
start := time.Now()
err := c.Next()
Expand All @@ -129,7 +131,7 @@ func New(config Config) *Server {
zap.Error(err),
zap.Int64("serviceLatency", latency.Milliseconds()),
zap.String("edgeRegion", edgeRegion),
zap.Int("status", c.Response().StatusCode()),
zap.String("traceId", traceId),
)

if err != nil || c.Response().StatusCode() >= 500 {
Expand All @@ -156,6 +158,8 @@ func New(config Config) *Server {
s.app.Get("/v1/apis/:apiId", s.getApi)
s.app.Get("/v1/apis/:apiId/keys", s.listKeys)

s.app.Get("/vx/keys/:keyId/stats", s.getKeyStats)

return s
}

Expand Down
66 changes: 65 additions & 1 deletion apps/agent/pkg/tinybird/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package tinybird

import (
"bytes"
"context"
"encoding/json"
"fmt"
"go.uber.org/zap"
"io"
"net/http"
"time"

"go.uber.org/zap"
)

type Tinybird struct {
Expand Down Expand Up @@ -94,3 +98,63 @@ func (t *Tinybird) publishEvent(datasource string, event interface{}) error {

return nil
}

type usage struct {
Time int64
Value int64
}
type KeyStats struct {
Usage []usage
}

func (t *Tinybird) GetKeyStats(ctx context.Context, keyId string) (KeyStats, error) {

req, err := http.NewRequest("GET", fmt.Sprintf("https://api.tinybird.co/v0/pipes/x__endpoint_get_daily_key_stats.json?keyId=%s&token=%s", keyId, t.token), nil)
if err != nil {
return KeyStats{}, fmt.Errorf("unable to prepare request: %w", err)
}

res, err := http.DefaultClient.Do(req)
if err != nil {
return KeyStats{}, fmt.Errorf("unable to call tinybird: %w", err)
}
defer res.Body.Close()
if res.StatusCode != 200 {
return KeyStats{}, fmt.Errorf("unable to call tinybird: status=%d", res.StatusCode)
}

type tinybirdResponse struct {
Data []struct {
Time string `json:"time"`
Usage int64 `json:"usage"`
} `json:"data"`
}

body, err := io.ReadAll(res.Body)
if err != nil {
return KeyStats{}, fmt.Errorf("unable to read body: %w", err)
}
tr := tinybirdResponse{}

err = json.Unmarshal(body, &tr)
if err != nil {
return KeyStats{}, fmt.Errorf("unable to unmarshal body: %w", err)
}

stats := KeyStats{
Usage: make([]usage, len(tr.Data)),
}
for i, day := range tr.Data {
t, err := time.Parse(time.DateTime, day.Time)
if err != nil {
return KeyStats{}, fmt.Errorf("unable to parse time %s: %w", day.Time, err)
}
stats.Usage[i] = usage{
Time: t.UnixMilli(),
Value: day.Usage,
}
}

return stats, nil

}
1 change: 1 addition & 0 deletions apps/agent/pkg/tracing/axiom.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tracing
import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"

Expand Down
Loading

1 comment on commit aa6d1e7

@vercel
Copy link

@vercel vercel bot commented on aa6d1e7 Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

unkey – ./

unkey-unkey.vercel.app
unkey.dev
www.unkey.dev
unkey-git-main-unkey.vercel.app
unkey.vercel.app

Please sign in to comment.