From 9c1f2120206f20034b66b6a094c9c3b0860660ea Mon Sep 17 00:00:00 2001 From: Cosmin Tupangiu Date: Tue, 3 Dec 2024 10:52:40 +0100 Subject: [PATCH] eventProducer: Implement event producer This commit adds an event producer that writes events to a underlying event (stdout or kafka). For now, the supported events are: inventory updated and agent state changed. Signed-off-by: Cosmin Tupangiu --- cmd/planner-api/run.go | 39 ++++++- go.mod | 39 +++++-- go.sum | 113 ++++++++++++++------ internal/api_server/agentserver/server.go | 6 +- internal/api_server/server.go | 6 +- internal/config/config.go | 23 ++-- internal/events/buffer.go | 56 ++++++++++ internal/events/buffer_test.go | 74 +++++++++++++ internal/events/kafka_suite_test.go | 13 +++ internal/events/models.go | 16 +++ internal/events/options.go | 9 ++ internal/events/producer.go | 124 ++++++++++++++++++++++ internal/events/producer_test.go | 53 +++++++++ internal/events/writer_stdout.go | 20 ++++ internal/service/agent/handler.go | 56 +++++++++- internal/service/agent/handler_test.go | 55 ++++++++-- internal/service/agent_test.go | 13 ++- internal/service/handler.go | 9 +- internal/service/source_test.go | 34 +++++- 19 files changed, 685 insertions(+), 73 deletions(-) create mode 100644 internal/events/buffer.go create mode 100644 internal/events/buffer_test.go create mode 100644 internal/events/kafka_suite_test.go create mode 100644 internal/events/models.go create mode 100644 internal/events/options.go create mode 100644 internal/events/producer.go create mode 100644 internal/events/producer_test.go create mode 100644 internal/events/writer_stdout.go diff --git a/cmd/planner-api/run.go b/cmd/planner-api/run.go index 3a4fed2..89b67c8 100644 --- a/cmd/planner-api/run.go +++ b/cmd/planner-api/run.go @@ -7,9 +7,12 @@ import ( "os/signal" "syscall" + "github.com/IBM/sarama" + pkgKafka "github.com/kubev2v/migration-event-streamer/pkg/kafka" apiserver "github.com/kubev2v/migration-planner/internal/api_server" "github.com/kubev2v/migration-planner/internal/api_server/agentserver" "github.com/kubev2v/migration-planner/internal/config" + "github.com/kubev2v/migration-planner/internal/events" "github.com/kubev2v/migration-planner/internal/store" "github.com/kubev2v/migration-planner/pkg/log" "github.com/spf13/cobra" @@ -56,6 +59,9 @@ var runCmd = &cobra.Command{ zap.S().Fatalf("running initial migration: %v", err) } + // initilize event writer + ep, _ := getEventProducer(cfg) + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT) go func() { defer cancel() @@ -64,7 +70,7 @@ var runCmd = &cobra.Command{ zap.S().Fatalf("creating listener: %s", err) } - server := apiserver.New(cfg, store, listener) + server := apiserver.New(cfg, store, ep, listener) if err := server.Run(ctx); err != nil { zap.S().Fatalf("Error running server: %s", err) } @@ -77,13 +83,15 @@ var runCmd = &cobra.Command{ zap.S().Fatalf("creating listener: %s", err) } - agentserver := agentserver.New(cfg, store, listener) + agentserver := agentserver.New(cfg, store, ep, listener) if err := agentserver.Run(ctx); err != nil { zap.S().Fatalf("Error running server: %s", err) } }() <-ctx.Done() + _ = ep.Close() + return nil }, } @@ -94,3 +102,30 @@ func newListener(address string) (net.Listener, error) { } return net.Listen("tcp", address) } + +func getEventProducer(cfg *config.Config) (*events.EventProducer, error) { + if len(cfg.Service.Kafka.Brokers) == 0 { + stdWriter := &events.StdoutWriter{} + ew := events.NewEventProducer(stdWriter) + return ew, nil + } + + saramaConfig := sarama.NewConfig() + if cfg.Service.Kafka.SaramaConfig != nil { + saramaConfig = cfg.Service.Kafka.SaramaConfig + } + saramaConfig.Version = sarama.V3_6_0_0 + + kp, err := pkgKafka.NewKafkaProducer(cfg.Service.Kafka.Brokers, saramaConfig) + if err != nil { + return nil, err + } + + zap.S().Named("planner-api").Infof("connected to kafka: %v", cfg.Service.Kafka.Brokers) + + if cfg.Service.Kafka.Topic != "" { + return events.NewEventProducer(kp, events.WithOutputTopic(cfg.Service.Kafka.Topic)), nil + } + + return events.NewEventProducer(kp), nil +} diff --git a/go.mod b/go.mod index f26dcb2..0c580ee 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,10 @@ module github.com/kubev2v/migration-planner -go 1.21.11 +go 1.22.2 require ( + github.com/IBM/sarama v1.43.3 + github.com/cloudevents/sdk-go/v2 v2.15.2 github.com/coreos/butane v0.22.0 github.com/getkin/kin-openapi v0.126.0 github.com/go-chi/chi v1.5.5 @@ -10,6 +12,7 @@ require ( github.com/go-chi/render v1.0.3 github.com/google/uuid v1.6.0 github.com/konveyor/forklift-controller v0.0.0-20221102112227-e73b65a01cda + github.com/kubev2v/migration-event-streamer v0.0.0-20241125102656-9cdf9e64a16b github.com/leosunmo/zapchi v0.2.0 github.com/libvirt/libvirt-go v7.4.0+incompatible github.com/lthibault/jitterbug v2.0.0+incompatible @@ -24,6 +27,7 @@ require ( github.com/thoas/go-funk v0.9.3 github.com/vmware/govmomi v0.39.0 go.uber.org/zap v1.26.0 + golang.org/x/sync v0.8.0 gorm.io/driver/postgres v1.5.9 gorm.io/driver/sqlite v1.5.6 gorm.io/gorm v1.25.11 @@ -42,6 +46,7 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect github.com/chenzhuoyu/iasm v0.9.0 // indirect github.com/clarketm/json v1.17.1 // indirect + github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.15.2 // indirect github.com/coreos/go-json v0.0.0-20230131223807-18775e0fb4fb // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect @@ -49,6 +54,9 @@ require ( github.com/coreos/vcontext v0.0.0-20230201181013-d72178a18687 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/diskfs/go-diskfs v1.4.0 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect github.com/elliotwutingfeng/asciiset v0.0.0-20230602022725-51bbb787efab // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect @@ -69,12 +77,16 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/websocket v1.5.1 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/imdario/mergo v0.3.7 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/invopop/yaml v0.3.1 // indirect @@ -82,11 +94,17 @@ require ( github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgx/v5 v5.5.5 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.4.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/leodido/go-urn v1.2.4 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -99,12 +117,13 @@ require ( github.com/openshift/api v0.0.0-20230613151523-ba04973d3ed1 // indirect github.com/openshift/custom-resource-status v1.1.2 // indirect github.com/pborman/uuid v1.2.1 // indirect - github.com/pelletier/go-toml/v2 v2.1.0 // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/perimeterx/marshmallow v1.1.5 // indirect - github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pkg/xattr v0.4.9 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/stretchr/testify v1.9.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.11 // indirect @@ -112,18 +131,16 @@ require ( github.com/vincent-petithory/dataurl v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.4.0 // indirect - golang.org/x/crypto v0.23.0 // indirect - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect - golang.org/x/net v0.25.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/net v0.28.0 // indirect golang.org/x/oauth2 v0.17.0 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/term v0.20.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/sys v0.23.0 // indirect + golang.org/x/term v0.23.0 // indirect + golang.org/x/text v0.17.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect google.golang.org/appengine v1.6.8 // indirect - google.golang.org/protobuf v1.33.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/djherbis/times.v1 v1.3.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index afd8a23..25d70b2 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= +github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= @@ -22,8 +24,8 @@ github.com/bytedance/sonic v1.10.0-rc3/go.mod h1:iZcSUejdk5aukTND/Eu/ivjQuEL0Cu9 github.com/cavaliercoder/go-cpio v0.0.0-20180626203310-925f9528c45e h1:hHg27A0RSSp2Om9lubZpiMgVbvn39bsUmW9U5h0twqc= github.com/cavaliercoder/go-cpio v0.0.0-20180626203310-925f9528c45e/go.mod h1:oDpT4efm8tSYHXV5tHSdRvBet/b/QzxZ+XyyPehvm3A= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d h1:77cEq6EriyTZ0g/qfRdp61a3Uu/AWrgIq2s0ClJV1g0= @@ -36,6 +38,10 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/clarketm/json v1.17.1 h1:U1IxjqJkJ7bRK4L6dyphmoO840P6bdhPdbbLySourqI= github.com/clarketm/json v1.17.1/go.mod h1:ynr2LRfb0fQU34l07csRNBTcivjySLLiY1YzQqKVfdo= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.15.2 h1:dl2xbFLV2FGd3OBNC6ncSN9l+gPNEP0DYE+1yKVV5DQ= +github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.15.2/go.mod h1:jXfl9I1Q78+4zdYGTjHNQcrbNtJL63jpzSgVE2rE79U= +github.com/cloudevents/sdk-go/v2 v2.15.2 h1:54+I5xQEnI73RBhWHxbI1XJcqOFOVJN85vb41+8mHUc= +github.com/cloudevents/sdk-go/v2 v2.15.2/go.mod h1:lL7kSWAE/V8VI4Wh0jbL2v/jvqsm6tjmaQBSvxcv4uE= github.com/coreos/butane v0.22.0 h1:nmXfiGqJMvPzBd2DfGyoayvO/KjpO6bES4uOEmtGTu8= github.com/coreos/butane v0.22.0/go.mod h1:3OKS5qaH58O2yLAKgAtOgBpUQSm7aIOU09IpG+IvmF4= github.com/coreos/go-json v0.0.0-20230131223807-18775e0fb4fb h1:rmqyI19j3Z/74bIRhuC59RB442rXUazKNueVpfJPxg4= @@ -57,6 +63,12 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc github.com/diskfs/go-diskfs v1.4.0 h1:MAybY6TPD+fmhY+a2qFhmdvMeIKvCqlgh4QIc1uCmBs= github.com/diskfs/go-diskfs v1.4.0/go.mod h1:G8cyy+ngM+3yKlqjweMmtqvE+TxsnIo1xumbJX1AeLg= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/elliotwutingfeng/asciiset v0.0.0-20230602022725-51bbb787efab h1:h1UgjJdAAhj+uPL68n7XASS6bU+07ZX1WJvVS2eyoeY= github.com/elliotwutingfeng/asciiset v0.0.0-20230602022725-51bbb787efab/go.mod h1:GLo/8fDswSAniFG+BFIaiSPcK610jyzgEhWYPQwuQdw= @@ -70,10 +82,12 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww= github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= github.com/getkin/kin-openapi v0.76.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg= @@ -158,6 +172,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -185,9 +201,19 @@ github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97Dwqy github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= @@ -205,6 +231,18 @@ github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= @@ -221,6 +259,8 @@ github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.4.0 h1:V github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.4.0/go.mod h1:nqCI7aelBJU61wiBeeZWJ6oi4bJy5nrjkM6lWIMA4j0= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= @@ -236,6 +276,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kubev2v/forklift v0.0.0-20241129095927-4890e072e015 h1:lKoe5Sy+faux6gdX/pCsBsEOQYvqAwhpAOglmGjhWMo= github.com/kubev2v/forklift v0.0.0-20241129095927-4890e072e015/go.mod h1:fHaGLhv09dWXKv0/0GNl3rgLe/KH5Y6IyG6eGLYaA6k= +github.com/kubev2v/migration-event-streamer v0.0.0-20241125102656-9cdf9e64a16b h1:xOHUPs9sVGie2EpTZDfSsxUPZHMBgm8XYvthdAMzJD4= +github.com/kubev2v/migration-event-streamer v0.0.0-20241125102656-9cdf9e64a16b/go.mod h1:xpo9o779xi1mM0142E8KqVc205ahAP1wswoN+DKbX8E= github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= @@ -304,12 +346,12 @@ github.com/openshift/custom-resource-status v1.1.2 h1:C3DL44LEbvlbItfd8mT5jWrqPf github.com/openshift/custom-resource-status v1.1.2/go.mod h1:DB/Mf2oTeiAmVVX1gN+NEqweonAPY0TKUwADizj8+ZA= github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= -github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= -github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= +github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= +github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/perimeterx/marshmallow v1.1.5 h1:a2LALqQ1BlHM8PZblsDdidgv1mWi1DgC2UmX50IvK2s= github.com/perimeterx/marshmallow v1.1.5/go.mod h1:dsXbUu8CRzfYP5a87xpp0xq9S3u0Vchtcl8we9tYaXw= -github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= -github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -318,15 +360,17 @@ github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6kt github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= -github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= +github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= +github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= -github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= -github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= -github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= -github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= -github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= @@ -344,6 +388,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= @@ -366,6 +411,8 @@ github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4d github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/ulikunitz/xz v0.5.11 h1:kpFauv27b6ynzBNT/Xy+1k+fK4WswhN/6PN5WhFAGw8= github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/vincent-petithory/dataurl v1.0.0 h1:cXw+kPto8NLuJtlMsI152irrVw9fRDX8AbShPRpg2CI= github.com/vincent-petithory/dataurl v1.0.0/go.mod h1:FHafX5vmDzyP+1CQATJn7WFKc9CvnvxyvZy6I1MrG/U= github.com/vmware/govmomi v0.34.1 h1:Hqu2Uke2itC+cNoIcFQBLEZvX9wBRTTOP04J7V1fqRw= @@ -398,8 +445,9 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= -golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= @@ -421,6 +469,7 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -433,8 +482,10 @@ golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ= golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA= @@ -445,8 +496,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -478,12 +529,13 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw= -golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -491,8 +543,9 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -544,8 +597,8 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/api_server/agentserver/server.go b/internal/api_server/agentserver/server.go index 7b5ca47..bed90bc 100644 --- a/internal/api_server/agentserver/server.go +++ b/internal/api_server/agentserver/server.go @@ -13,6 +13,7 @@ import ( api "github.com/kubev2v/migration-planner/api/v1alpha1/agent" server "github.com/kubev2v/migration-planner/internal/api/server/agent" "github.com/kubev2v/migration-planner/internal/config" + "github.com/kubev2v/migration-planner/internal/events" service "github.com/kubev2v/migration-planner/internal/service/agent" "github.com/kubev2v/migration-planner/internal/store" "github.com/leosunmo/zapchi" @@ -28,17 +29,20 @@ type AgentServer struct { cfg *config.Config store store.Store listener net.Listener + evWriter *events.EventProducer } // New returns a new instance of a migration-planner server. func New( cfg *config.Config, store store.Store, + ew *events.EventProducer, listener net.Listener, ) *AgentServer { return &AgentServer{ cfg: cfg, store: store, + evWriter: ew, listener: listener, } } @@ -68,7 +72,7 @@ func (s *AgentServer) Run(ctx context.Context) error { oapimiddleware.OapiRequestValidatorWithOptions(swagger, &oapiOpts), ) - h := service.NewAgentServiceHandler(s.store) + h := service.NewAgentServiceHandler(s.store, s.evWriter) server.HandlerFromMux(server.NewStrictHandler(h, nil), router) srv := http.Server{Addr: s.cfg.Service.Address, Handler: router} diff --git a/internal/api_server/server.go b/internal/api_server/server.go index 58e5488..46ea1ae 100644 --- a/internal/api_server/server.go +++ b/internal/api_server/server.go @@ -13,6 +13,7 @@ import ( api "github.com/kubev2v/migration-planner/api/v1alpha1" "github.com/kubev2v/migration-planner/internal/api/server" "github.com/kubev2v/migration-planner/internal/config" + "github.com/kubev2v/migration-planner/internal/events" "github.com/kubev2v/migration-planner/internal/image" "github.com/kubev2v/migration-planner/internal/service" "github.com/kubev2v/migration-planner/internal/store" @@ -29,18 +30,21 @@ type Server struct { cfg *config.Config store store.Store listener net.Listener + evWriter *events.EventProducer } // New returns a new instance of a migration-planner server. func New( cfg *config.Config, store store.Store, + ew *events.EventProducer, listener net.Listener, ) *Server { return &Server{ cfg: cfg, store: store, listener: listener, + evWriter: ew, } } @@ -80,7 +84,7 @@ func (s *Server) Run(ctx context.Context) error { withResponseWriter, ) - h := service.NewServiceHandler(s.store) + h := service.NewServiceHandler(s.store, s.evWriter) server.HandlerFromMux(server.NewStrictHandler(h, nil), router) srv := http.Server{Addr: s.cfg.Service.Address, Handler: router} diff --git a/internal/config/config.go b/internal/config/config.go index 4ec1f89..c24741c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" + "github.com/IBM/sarama" "github.com/kubev2v/migration-planner/internal/util" "sigs.k8s.io/yaml" ) @@ -29,11 +30,21 @@ type dbConfig struct { } type svcConfig struct { - Address string `json:"address,omitempty"` - AgentEndpointAddress string `json:"agentEndpointAddress,omitempty"` - BaseUrl string `json:"baseUrl,omitempty"` - BaseAgentEndpointUrl string `json:"baseAgentEndpointUrl,omitempty"` - LogLevel string `json:"logLevel,omitempty"` + Address string `json:"address,omitempty"` + AgentEndpointAddress string `json:"agentEndpointAddress,omitempty"` + BaseUrl string `json:"baseUrl,omitempty"` + BaseAgentEndpointUrl string `json:"baseAgentEndpointUrl,omitempty"` + LogLevel string `json:"logLevel,omitempty"` + Kafka kafkaConfig `json:"kafka,omitempty"` +} + +type kafkaConfig struct { + Brokers []string `yaml:"brokers"` + Topic string `yaml:"topic"` + Version sarama.KafkaVersion `yaml:"-"` + ClientID string `yaml:"clientID"` + + SaramaConfig *sarama.Config } func ConfigDir() string { @@ -120,7 +131,7 @@ func Validate(cfg *Config) error { } func (cfg *Config) String() string { - contents, err := json.Marshal(cfg) + contents, err := json.Marshal(cfg) // nolint: staticcheck if err != nil { return "" } diff --git a/internal/events/buffer.go b/internal/events/buffer.go new file mode 100644 index 0000000..4a5a936 --- /dev/null +++ b/internal/events/buffer.go @@ -0,0 +1,56 @@ +package events + +import "sync" + +type message struct { + Kind string + Data []byte + prev *message +} + +type buffer struct { + lock sync.Mutex + head *message + tail *message + size int +} + +func newBuffer() *buffer { + return &buffer{} +} + +func (b *buffer) PushBack(msg *message) error { + b.lock.Lock() + defer b.lock.Unlock() + + if b.head == nil { + b.head = msg + b.tail = msg + } else { + b.tail.prev = msg + b.tail = msg + } + b.size++ + + return nil +} + +func (b *buffer) Pop() *message { + if b.head == nil { + return nil + } + tmp := b.head + if b.head.prev != nil { + b.head = b.head.prev + } else { + // removing the last one + b.head = nil + b.tail = nil + } + b.size-- + return tmp +} + +func (b *buffer) Size() int { + return b.size +} diff --git a/internal/events/buffer_test.go b/internal/events/buffer_test.go new file mode 100644 index 0000000..12377ad --- /dev/null +++ b/internal/events/buffer_test.go @@ -0,0 +1,74 @@ +package events + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("buffer", Ordered, func() { + Context("buffer", func() { + It("add successfully", func() { + buffer := newBuffer() + + // add the first message + err := buffer.PushBack(&message{Kind: InventoryMessageKind, Data: []byte("msg1")}) + Expect(err).To(BeNil()) + Expect(buffer.Size()).To(Equal(1)) + Expect(buffer.head).NotTo(BeNil()) + Expect(buffer.tail).NotTo(BeNil()) + + // second + err = buffer.PushBack(&message{Kind: InventoryMessageKind, Data: []byte("msg2")}) + Expect(err).To(BeNil()) + Expect(buffer.Size()).To(Equal(2)) + Expect(buffer.head).NotTo(BeNil()) + Expect(buffer.tail).NotTo(BeNil()) + + Expect(buffer.head.Data).To(Equal([]byte("msg1"))) + Expect(buffer.tail.Data).To(Equal([]byte("msg2"))) + + // third + err = buffer.PushBack(&message{Kind: InventoryMessageKind, Data: []byte("msg3")}) + Expect(err).To(BeNil()) + Expect(buffer.Size()).To(Equal(3)) + Expect(buffer.head).NotTo(BeNil()) + Expect(buffer.tail).NotTo(BeNil()) + + Expect(buffer.head.Data).To(Equal([]byte("msg1"))) + Expect(buffer.tail.Data).To(Equal([]byte("msg3"))) + }) + + It("pop", func() { + buffer := newBuffer() + + // add the first message + err := buffer.PushBack(&message{Kind: InventoryMessageKind, Data: []byte("msg1")}) + Expect(err).To(BeNil()) + err = buffer.PushBack(&message{Kind: InventoryMessageKind, Data: []byte("msg2")}) + Expect(err).To(BeNil()) + err = buffer.PushBack(&message{Kind: InventoryMessageKind, Data: []byte("msg3")}) + Expect(err).To(BeNil()) + Expect(buffer.Size()).To(Equal(3)) + + m := buffer.Pop() + Expect(m).NotTo(BeNil()) + Expect(m.Data).To(Equal([]byte("msg1"))) + Expect(buffer.Size()).To(Equal(2)) + + m = buffer.Pop() + Expect(m).NotTo(BeNil()) + Expect(m.Data).To(Equal([]byte("msg2"))) + Expect(buffer.Size()).To(Equal(1)) + + m = buffer.Pop() + Expect(m).NotTo(BeNil()) + Expect(m.Data).To(Equal([]byte("msg3"))) + Expect(buffer.Size()).To(Equal(0)) + Expect(buffer.head).To(BeNil()) + Expect(buffer.tail).To(BeNil()) + + m = buffer.Pop() + Expect(m).To(BeNil()) + }) + }) +}) diff --git a/internal/events/kafka_suite_test.go b/internal/events/kafka_suite_test.go new file mode 100644 index 0000000..01316ee --- /dev/null +++ b/internal/events/kafka_suite_test.go @@ -0,0 +1,13 @@ +package events + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestKafka(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Kafka Suite") +} diff --git a/internal/events/models.go b/internal/events/models.go new file mode 100644 index 0000000..f856437 --- /dev/null +++ b/internal/events/models.go @@ -0,0 +1,16 @@ +package events + +import ( + api "github.com/kubev2v/migration-planner/api/v1alpha1" +) + +type InventoryEvent struct { + SourceID string `json:"source_id"` + Inventory api.Inventory `json:"inventory"` +} + +type AgentEvent struct { + AgentID string `json:"agent_id"` + State string `json:"state"` + StateInfo string `json:"state_info"` +} diff --git a/internal/events/options.go b/internal/events/options.go new file mode 100644 index 0000000..1305ebb --- /dev/null +++ b/internal/events/options.go @@ -0,0 +1,9 @@ +package events + +type ProducerOptions func(e *EventProducer) + +func WithOutputTopic(topic string) ProducerOptions { + return func(e *EventProducer) { + e.topic = topic + } +} diff --git a/internal/events/producer.go b/internal/events/producer.go new file mode 100644 index 0000000..50c5eab --- /dev/null +++ b/internal/events/producer.go @@ -0,0 +1,124 @@ +package events + +import ( + "context" + "io" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +const ( + InventoryMessageKind string = "assisted.migrations.events.inventory" + AgentMessageKind string = "assisted.migrations.events.agent" + defaultTopic string = "assisted.migrations.events" +) + +// Writer is the interface to be implemented by the underlying writer. +type Writer interface { + Write(ctx context.Context, topic string, e cloudevents.Event) error + Close(ctx context.Context) error +} + +// EventProducer is a wrapper around a Writer with the buffer. +// It has a buffer to store pending events to not block the caller if the writer takes time to write the event. +type EventProducer struct { + buffer *buffer + startConsumingCh chan any + doneCh chan any + writer Writer + topic string +} + +func NewEventProducer(w Writer, opts ...ProducerOptions) *EventProducer { + ep := &EventProducer{ + buffer: newBuffer(), + startConsumingCh: make(chan any), + doneCh: make(chan any), + writer: w, + topic: defaultTopic, + } + + for _, o := range opts { + o(ep) + } + + go ep.run() + return ep +} + +func (ep *EventProducer) Write(ctx context.Context, kind string, body io.Reader) error { + d, err := io.ReadAll(body) + if err != nil { + return err + } + + prevSize := ep.buffer.Size() + if err := ep.buffer.PushBack(&message{ + Kind: kind, + Data: d, + }); err != nil { + return err + } + + if prevSize == 0 { + // unblock the producer and start sending messages + ep.startConsumingCh <- struct{}{} + } + + return nil +} + +func (ep *EventProducer) Close() error { + closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + g, ctx := errgroup.WithContext(closeCtx) + g.Go(func() error { + ep.doneCh <- struct{}{} + return ep.writer.Close(ctx) + }) + if err := g.Wait(); err != nil { + zap.S().Errorf("event producer closed with error: %s", err) + return err + } + + zap.S().Named("event producer").Info("event producer closed") + + return nil +} + +func (ep *EventProducer) run() { + for { + select { + case <-ep.doneCh: + return + default: + } + + if ep.buffer.Size() == 0 { + select { + case <-ep.startConsumingCh: + case <-ep.doneCh: + } + } + + msg := ep.buffer.Pop() + if msg == nil { + continue + } + + e := cloudevents.NewEvent() + e.SetID(uuid.NewString()) + e.SetSource("assisted.migrations.planner") + e.SetType(string(msg.Kind)) + _ = e.SetData(*cloudevents.StringOfApplicationJSON(), msg.Data) + + if err := ep.writer.Write(context.TODO(), ep.topic, e); err != nil { + zap.S().Named("kafka_producer").Errorw("failed to send message", "error", err, "event", e) + } + } +} diff --git a/internal/events/producer_test.go b/internal/events/producer_test.go new file mode 100644 index 0000000..293706d --- /dev/null +++ b/internal/events/producer_test.go @@ -0,0 +1,53 @@ +package events + +import ( + "bytes" + "context" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("producer", Ordered, func() { + Context("write", func() { + It("writes succsessfully", func() { + w := newTestWriter() + kp := NewEventProducer(w) + + // add the first message + msg := []byte("msg1") + err := kp.Write(context.TODO(), "topic1", bytes.NewReader(msg)) + Expect(err).To(BeNil()) + Expect(len(w.Messages)).To(Equal(1)) + Expect(w.Messages[0].Context.GetType()).To(Equal("topic1")) + + msg = []byte("msg2") + err = kp.Write(context.TODO(), "topic2", bytes.NewReader(msg)) + Expect(err).To(BeNil()) + + <-time.After(1 * time.Second) + Expect(len(w.Messages)).To(Equal(2)) + + kp.Close() + }) + }) +}) + +type testwriter struct { + Messages []cloudevents.Event +} + +func newTestWriter() *testwriter { + return &testwriter{Messages: []cloudevents.Event{}} +} + +func (t *testwriter) Write(ctx context.Context, topic string, e cloudevents.Event) error { + t.Messages = append(t.Messages, e) + return nil +} + +func (t *testwriter) Close(_ context.Context) error { + return nil +} diff --git a/internal/events/writer_stdout.go b/internal/events/writer_stdout.go new file mode 100644 index 0000000..3c04ec4 --- /dev/null +++ b/internal/events/writer_stdout.go @@ -0,0 +1,20 @@ +package events + +import ( + "context" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "go.uber.org/zap" +) + +// event writer used in dev +type StdoutWriter struct{} + +func (s *StdoutWriter) Write(ctx context.Context, topic string, e cloudevents.Event) error { + zap.S().Named("stout_writer").Infow("event wrote", "event", e, "topic", topic) + return nil +} + +func (s *StdoutWriter) Close(_ context.Context) error { + return nil +} diff --git a/internal/service/agent/handler.go b/internal/service/agent/handler.go index 2db5aac..1689c3c 100644 --- a/internal/service/agent/handler.go +++ b/internal/service/agent/handler.go @@ -1,24 +1,31 @@ package service import ( + "bytes" "context" + "encoding/json" "errors" + "io" + api "github.com/kubev2v/migration-planner/api/v1alpha1" agentServer "github.com/kubev2v/migration-planner/internal/api/server/agent" + "github.com/kubev2v/migration-planner/internal/events" "github.com/kubev2v/migration-planner/internal/store" "go.uber.org/zap" ) type AgentServiceHandler struct { - store store.Store + store store.Store + eventWriter *events.EventProducer } // Make sure we conform to servers Service interface var _ agentServer.Service = (*AgentServiceHandler)(nil) -func NewAgentServiceHandler(store store.Store) *AgentServiceHandler { +func NewAgentServiceHandler(store store.Store, ew *events.EventProducer) *AgentServiceHandler { return &AgentServiceHandler{ - store: store, + store: store, + eventWriter: ew, } } @@ -81,6 +88,11 @@ func (h *AgentServiceHandler) ReplaceSourceStatus(ctx context.Context, request a return agentServer.ReplaceSourceStatus500JSONResponse{}, nil } + kind, inventoryEvent := h.newInventoryEvent(request.Id.String(), request.Body.Inventory) + if err := h.eventWriter.Write(ctx, kind, inventoryEvent); err != nil { + zap.S().Named("agent_handler").Errorw("failed to write event", "error", err, "event_kind", kind) + } + return agentServer.ReplaceSourceStatus200JSONResponse(*result), nil } @@ -100,12 +112,19 @@ func (h *AgentServiceHandler) UpdateAgentStatus(ctx context.Context, request age } if agent == nil { - if _, err := h.store.Agent().Create(ctx, *request.Body); err != nil { + a, err := h.store.Agent().Create(ctx, *request.Body) + if err != nil { return agentServer.UpdateAgentStatus400JSONResponse{}, nil } if _, err := store.Commit(ctx); err != nil { return agentServer.UpdateAgentStatus500JSONResponse{}, nil } + + kind, agentEvent := h.newAgentEvent(*a) + if err := h.eventWriter.Write(ctx, kind, agentEvent); err != nil { + zap.S().Named("agent_handler").Errorw("failed to write event", "error", err, "event_kind", kind) + } + return agentServer.UpdateAgentStatus201Response{}, nil } @@ -122,5 +141,34 @@ func (h *AgentServiceHandler) UpdateAgentStatus(ctx context.Context, request age if _, err := store.Commit(ctx); err != nil { return agentServer.UpdateAgentStatus500JSONResponse{}, nil } + + kind, agentEvent := h.newAgentEvent(*agent) + if err := h.eventWriter.Write(ctx, kind, agentEvent); err != nil { + zap.S().Named("agent_handler").Errorw("failed to write event", "error", err, "event_kind", kind) + } + return agentServer.UpdateAgentStatus200Response{}, nil } + +func (h *AgentServiceHandler) newAgentEvent(agent api.Agent) (string, io.Reader) { + event := events.AgentEvent{ + AgentID: agent.Id, + State: string(agent.Status), + StateInfo: agent.StatusInfo, + } + + data, _ := json.Marshal(event) + + return events.AgentMessageKind, bytes.NewReader(data) +} + +func (h *AgentServiceHandler) newInventoryEvent(sourceID string, inventory api.Inventory) (string, io.Reader) { + event := events.InventoryEvent{ + SourceID: sourceID, + Inventory: inventory, + } + + data, _ := json.Marshal(event) + + return events.InventoryMessageKind, bytes.NewReader(data) +} diff --git a/internal/service/agent/handler_test.go b/internal/service/agent/handler_test.go index f4b9b09..8804cad 100644 --- a/internal/service/agent/handler_test.go +++ b/internal/service/agent/handler_test.go @@ -6,11 +6,13 @@ import ( "reflect" "time" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/google/uuid" v1alpha1 "github.com/kubev2v/migration-planner/api/v1alpha1" apiAgent "github.com/kubev2v/migration-planner/api/v1alpha1/agent" server "github.com/kubev2v/migration-planner/internal/api/server/agent" "github.com/kubev2v/migration-planner/internal/config" + "github.com/kubev2v/migration-planner/internal/events" service "github.com/kubev2v/migration-planner/internal/service/agent" "github.com/kubev2v/migration-planner/internal/store" . "github.com/onsi/ginkgo/v2" @@ -48,7 +50,8 @@ var _ = Describe("agent store", Ordered, func() { It("successfully creates the agent", func() { agentID := uuid.New() - srv := service.NewAgentServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewAgentServiceHandler(s, events.NewEventProducer(eventWriter)) resp, err := srv.UpdateAgentStatus(context.TODO(), server.UpdateAgentStatusRequestObject{ Id: agentID, Body: &apiAgent.UpdateAgentStatusJSONRequestBody{ @@ -71,6 +74,10 @@ var _ = Describe("agent store", Ordered, func() { tx = gormdb.Raw(fmt.Sprintf("SELECT status from agents WHERE id = '%s';", agentID)).Scan(&status) Expect(tx.Error).To(BeNil()) Expect(status).To(Equal("waiting-for-credentials")) + + // should find one event + <-time.After(500 * time.Millisecond) + Expect(eventWriter.Messages).To(HaveLen(1)) }) It("successfully updates the agent", func() { @@ -78,7 +85,8 @@ var _ = Describe("agent store", Ordered, func() { tx := gormdb.Exec(fmt.Sprintf(insertAgentStm, agentID, "not-connected", "status-info-1", "cred_url-1")) Expect(tx.Error).To(BeNil()) - srv := service.NewAgentServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewAgentServiceHandler(s, events.NewEventProducer(eventWriter)) resp, err := srv.UpdateAgentStatus(context.TODO(), server.UpdateAgentStatusRequestObject{ Id: agentID, Body: &apiAgent.UpdateAgentStatusJSONRequestBody{ @@ -101,6 +109,10 @@ var _ = Describe("agent store", Ordered, func() { tx = gormdb.Raw(fmt.Sprintf("SELECT status from agents WHERE id = '%s';", agentID)).Scan(&status) Expect(tx.Error).To(BeNil()) Expect(status).To(Equal("waiting-for-credentials")) + + // should find one event + <-time.After(500 * time.Millisecond) + Expect(eventWriter.Messages).To(HaveLen(1)) }) It("should receive 410 when agent is soft deleted", func() { @@ -108,7 +120,8 @@ var _ = Describe("agent store", Ordered, func() { tx := gormdb.Exec(fmt.Sprintf(insertAgentWithDeletedAtStm, agentID, "not-connected", "status-info-1", "cred_url-1", time.Now().Format(time.RFC3339), time.Now().Format(time.RFC3339), time.Now().Format(time.RFC3339))) Expect(tx.Error).To(BeNil()) - srv := service.NewAgentServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewAgentServiceHandler(s, events.NewEventProducer(eventWriter)) resp, err := srv.UpdateAgentStatus(context.TODO(), server.UpdateAgentStatusRequestObject{ Id: agentID, Body: &apiAgent.UpdateAgentStatusJSONRequestBody{ @@ -135,7 +148,8 @@ var _ = Describe("agent store", Ordered, func() { Expect(tx.Error).To(BeNil()) sourceID := uuid.New() - srv := service.NewAgentServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewAgentServiceHandler(s, events.NewEventProducer(eventWriter)) resp, err := srv.ReplaceSourceStatus(context.TODO(), server.ReplaceSourceStatusRequestObject{ Id: sourceID, Body: &apiAgent.SourceStatusUpdate{ @@ -157,6 +171,10 @@ var _ = Describe("agent store", Ordered, func() { Expect(source.Agents).ToNot(BeNil()) Expect(*source.Agents).To(HaveLen(1)) Expect((*source.Agents)[0].Id).To(Equal(agentID)) + + // should have one 1 event only + <-time.After(500 * time.Millisecond) + Expect(eventWriter.Messages).To(HaveLen(1)) }) It("agents not associated with the source are not allowed to update inventory", func() { @@ -165,7 +183,8 @@ var _ = Describe("agent store", Ordered, func() { Expect(tx.Error).To(BeNil()) sourceID := uuid.New() - srv := service.NewAgentServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewAgentServiceHandler(s, events.NewEventProducer(eventWriter)) resp, err := srv.ReplaceSourceStatus(context.TODO(), server.ReplaceSourceStatusRequestObject{ Id: sourceID, Body: &apiAgent.SourceStatusUpdate{ @@ -176,6 +195,10 @@ var _ = Describe("agent store", Ordered, func() { Expect(err).To(BeNil()) Expect(reflect.TypeOf(resp)).To(Equal(reflect.TypeOf(server.ReplaceSourceStatus200JSONResponse{}))) + // should have 1 event (inventory) + <-time.After(500 * time.Millisecond) + Expect(eventWriter.Messages).To(HaveLen(1)) + // according to the multi source model the agent should be associated with the source agent, err := s.Agent().Get(context.TODO(), agentID.String()) Expect(err).To(BeNil()) @@ -202,6 +225,10 @@ var _ = Describe("agent store", Ordered, func() { }) Expect(err).To(BeNil()) Expect(reflect.TypeOf(resp)).To(Equal(reflect.TypeOf(server.ReplaceSourceStatus400JSONResponse{}))) + + // should have one 1 event only + <-time.After(500 * time.Millisecond) + Expect(eventWriter.Messages).To(HaveLen(1)) }) AfterEach(func() { @@ -209,5 +236,21 @@ var _ = Describe("agent store", Ordered, func() { gormdb.Exec("DELETE FROM sources;") }) }) - }) + +type testwriter struct { + Messages []cloudevents.Event +} + +func newTestWriter() *testwriter { + return &testwriter{Messages: []cloudevents.Event{}} +} + +func (t *testwriter) Write(ctx context.Context, topic string, e cloudevents.Event) error { + t.Messages = append(t.Messages, e) + return nil +} + +func (t *testwriter) Close(_ context.Context) error { + return nil +} diff --git a/internal/service/agent_test.go b/internal/service/agent_test.go index 42d20cd..3959fcc 100644 --- a/internal/service/agent_test.go +++ b/internal/service/agent_test.go @@ -8,6 +8,7 @@ import ( "github.com/google/uuid" "github.com/kubev2v/migration-planner/internal/api/server" "github.com/kubev2v/migration-planner/internal/config" + "github.com/kubev2v/migration-planner/internal/events" "github.com/kubev2v/migration-planner/internal/service" "github.com/kubev2v/migration-planner/internal/store" . "github.com/onsi/ginkgo/v2" @@ -46,7 +47,8 @@ var _ = Describe("agent handler", Ordered, func() { tx = gormdb.Exec(fmt.Sprintf(insertAgentStm, "agent-2", "not-connected", "status-info-2", "cred_url-2")) Expect(tx.Error).To(BeNil()) - srv := service.NewServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewServiceHandler(s, events.NewEventProducer(eventWriter)) resp, err := srv.ListAgents(context.TODO(), server.ListAgentsRequestObject{}) Expect(err).To(BeNil()) Expect(reflect.TypeOf(resp)).To(Equal(reflect.TypeOf(server.ListAgents200JSONResponse{}))) @@ -64,7 +66,8 @@ var _ = Describe("agent handler", Ordered, func() { tx := gormdb.Exec(fmt.Sprintf(insertAgentStm, agentID, "not-connected", "status-info-1", "cred_url-1")) Expect(tx.Error).To(BeNil()) - srv := service.NewServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewServiceHandler(s, events.NewEventProducer(eventWriter)) resp, err := srv.DeleteAgent(context.TODO(), server.DeleteAgentRequestObject{Id: agentID}) Expect(err).To(BeNil()) Expect(reflect.TypeOf(resp)).To(Equal(reflect.TypeOf(server.DeleteAgent200JSONResponse{}))) @@ -79,7 +82,8 @@ var _ = Describe("agent handler", Ordered, func() { tx := gormdb.Exec(fmt.Sprintf(insertAgentStm, agentID, "not-connected", "status-info-1", "cred_url-1")) Expect(tx.Error).To(BeNil()) - srv := service.NewServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewServiceHandler(s, events.NewEventProducer(eventWriter)) resp, err := srv.DeleteAgent(context.TODO(), server.DeleteAgentRequestObject{Id: uuid.New()}) Expect(err).To(BeNil()) Expect(reflect.TypeOf(resp)).To(Equal(reflect.TypeOf(server.DeleteAgent404JSONResponse{}))) @@ -90,7 +94,8 @@ var _ = Describe("agent handler", Ordered, func() { tx := gormdb.Exec(fmt.Sprintf(insertAssociatedAgentStm, agentID)) Expect(tx.Error).To(BeNil()) - srv := service.NewServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewServiceHandler(s, events.NewEventProducer(eventWriter)) resp, err := srv.DeleteAgent(context.TODO(), server.DeleteAgentRequestObject{Id: agentID}) Expect(err).To(BeNil()) Expect(reflect.TypeOf(resp)).To(Equal(reflect.TypeOf(server.DeleteAgent400JSONResponse{}))) diff --git a/internal/service/handler.go b/internal/service/handler.go index b2b9a4b..f6f3e90 100644 --- a/internal/service/handler.go +++ b/internal/service/handler.go @@ -2,18 +2,21 @@ package service import ( "github.com/kubev2v/migration-planner/internal/api/server" + "github.com/kubev2v/migration-planner/internal/events" "github.com/kubev2v/migration-planner/internal/store" ) type ServiceHandler struct { - store store.Store + store store.Store + eventWriter *events.EventProducer } // Make sure we conform to servers Service interface var _ server.Service = (*ServiceHandler)(nil) -func NewServiceHandler(store store.Store) *ServiceHandler { +func NewServiceHandler(store store.Store, ew *events.EventProducer) *ServiceHandler { return &ServiceHandler{ - store: store, + store: store, + eventWriter: ew, } } diff --git a/internal/service/source_test.go b/internal/service/source_test.go index 3f71b76..128d0ab 100644 --- a/internal/service/source_test.go +++ b/internal/service/source_test.go @@ -5,9 +5,11 @@ import ( "fmt" "reflect" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/google/uuid" "github.com/kubev2v/migration-planner/internal/api/server" "github.com/kubev2v/migration-planner/internal/config" + "github.com/kubev2v/migration-planner/internal/events" "github.com/kubev2v/migration-planner/internal/service" "github.com/kubev2v/migration-planner/internal/store" . "github.com/onsi/ginkgo/v2" @@ -46,7 +48,8 @@ var _ = Describe("source handler", Ordered, func() { tx = gormdb.Exec(fmt.Sprintf(insertSourceStm, uuid.NewString())) Expect(tx.Error).To(BeNil()) - srv := service.NewServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewServiceHandler(s, events.NewEventProducer(eventWriter)) resp, err := srv.ListSources(context.TODO(), server.ListSourcesRequestObject{}) Expect(err).To(BeNil()) Expect(reflect.TypeOf(resp)).To(Equal(reflect.TypeOf(server.ListSources200JSONResponse{}))) @@ -67,7 +70,8 @@ var _ = Describe("source handler", Ordered, func() { tx = gormdb.Exec(fmt.Sprintf(insertSourceStm, uuid.NewString())) Expect(tx.Error).To(BeNil()) - srv := service.NewServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewServiceHandler(s, events.NewEventProducer(eventWriter)) resp, err := srv.ReadSource(context.TODO(), server.ReadSourceRequestObject{Id: firstSource}) Expect(err).To(BeNil()) Expect(reflect.TypeOf(resp)).To(Equal(reflect.TypeOf(server.ReadSource200JSONResponse{}))) @@ -79,7 +83,8 @@ var _ = Describe("source handler", Ordered, func() { tx = gormdb.Exec(fmt.Sprintf(insertSourceStm, uuid.NewString())) Expect(tx.Error).To(BeNil()) - srv := service.NewServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewServiceHandler(s, events.NewEventProducer(eventWriter)) resp, err := srv.ReadSource(context.TODO(), server.ReadSourceRequestObject{Id: uuid.New()}) Expect(err).To(BeNil()) Expect(reflect.TypeOf(resp)).To(Equal(reflect.TypeOf(server.ReadSource404JSONResponse{}))) @@ -98,7 +103,8 @@ var _ = Describe("source handler", Ordered, func() { tx = gormdb.Exec(fmt.Sprintf(insertSourceStm, uuid.NewString())) Expect(tx.Error).To(BeNil()) - srv := service.NewServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewServiceHandler(s, events.NewEventProducer(eventWriter)) _, err := srv.DeleteSources(context.TODO(), server.DeleteSourcesRequestObject{}) Expect(err).To(BeNil()) @@ -117,7 +123,8 @@ var _ = Describe("source handler", Ordered, func() { tx = gormdb.Exec(fmt.Sprintf(insertAgentWithSourceStm, agent.String(), source.String())) Expect(tx.Error).To(BeNil()) - srv := service.NewServiceHandler(s) + eventWriter := newTestWriter() + srv := service.NewServiceHandler(s, events.NewEventProducer(eventWriter)) _, err := srv.DeleteSource(context.TODO(), server.DeleteSourceRequestObject{Id: source}) Expect(err).To(BeNil()) @@ -143,3 +150,20 @@ var _ = Describe("source handler", Ordered, func() { }) }) }) + +type testwriter struct { + Messages []cloudevents.Event +} + +func newTestWriter() *testwriter { + return &testwriter{Messages: []cloudevents.Event{}} +} + +func (t *testwriter) Write(ctx context.Context, topic string, e cloudevents.Event) error { + t.Messages = append(t.Messages, e) + return nil +} + +func (t *testwriter) Close(_ context.Context) error { + return nil +}