Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eventProducer: Implement event producer #94

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions cmd/planner-api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"os/signal"
"syscall"

"github.com/IBM/sarama"
pkgKafka "github.com/kubev2v/migration-event-streamer/pkg/kafka"
apiserver "github.com/kubev2v/migration-planner/internal/api_server"
"github.com/kubev2v/migration-planner/internal/api_server/agentserver"
"github.com/kubev2v/migration-planner/internal/config"
"github.com/kubev2v/migration-planner/internal/events"
"github.com/kubev2v/migration-planner/internal/store"
"github.com/kubev2v/migration-planner/pkg/log"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -56,6 +59,9 @@ var runCmd = &cobra.Command{
zap.S().Fatalf("running initial migration: %v", err)
}

// initilize event writer
ep, _ := getEventProducer(cfg)

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
defer cancel()
Expand All @@ -64,7 +70,7 @@ var runCmd = &cobra.Command{
zap.S().Fatalf("creating listener: %s", err)
}

server := apiserver.New(cfg, store, listener)
server := apiserver.New(cfg, store, ep, listener)
if err := server.Run(ctx); err != nil {
zap.S().Fatalf("Error running server: %s", err)
}
Expand All @@ -77,13 +83,15 @@ var runCmd = &cobra.Command{
zap.S().Fatalf("creating listener: %s", err)
}

agentserver := agentserver.New(cfg, store, listener)
agentserver := agentserver.New(cfg, store, ep, listener)
if err := agentserver.Run(ctx); err != nil {
zap.S().Fatalf("Error running server: %s", err)
}
}()

<-ctx.Done()
_ = ep.Close()

return nil
},
}
Expand All @@ -94,3 +102,30 @@ func newListener(address string) (net.Listener, error) {
}
return net.Listen("tcp", address)
}

func getEventProducer(cfg *config.Config) (*events.EventProducer, error) {
if len(cfg.Service.Kafka.Brokers) == 0 {
stdWriter := &events.StdoutWriter{}
ew := events.NewEventProducer(stdWriter)
return ew, nil
}

saramaConfig := sarama.NewConfig()
if cfg.Service.Kafka.SaramaConfig != nil {
saramaConfig = cfg.Service.Kafka.SaramaConfig
}
saramaConfig.Version = sarama.V3_6_0_0

kp, err := pkgKafka.NewKafkaProducer(cfg.Service.Kafka.Brokers, saramaConfig)
if err != nil {
return nil, err
}

zap.S().Named("planner-api").Infof("connected to kafka: %v", cfg.Service.Kafka.Brokers)

if cfg.Service.Kafka.Topic != "" {
return events.NewEventProducer(kp, events.WithOutputTopic(cfg.Service.Kafka.Topic)), nil
}

return events.NewEventProducer(kp), nil
}
39 changes: 28 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
module github.com/kubev2v/migration-planner

go 1.21.11
go 1.22.2
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this part of registry.access.redhat.com/ubi9/go-toolset already?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not me. maybe go mod.


require (
github.com/IBM/sarama v1.43.3
github.com/cloudevents/sdk-go/v2 v2.15.2
github.com/coreos/butane v0.22.0
github.com/getkin/kin-openapi v0.126.0
github.com/go-chi/chi v1.5.5
github.com/go-chi/chi/v5 v5.1.0
github.com/go-chi/render v1.0.3
github.com/google/uuid v1.6.0
github.com/konveyor/forklift-controller v0.0.0-20221102112227-e73b65a01cda
github.com/kubev2v/migration-event-streamer v0.0.0-20241125102656-9cdf9e64a16b
github.com/leosunmo/zapchi v0.2.0
github.com/libvirt/libvirt-go v7.4.0+incompatible
github.com/lthibault/jitterbug v2.0.0+incompatible
Expand All @@ -24,6 +27,7 @@ require (
github.com/thoas/go-funk v0.9.3
github.com/vmware/govmomi v0.39.0
go.uber.org/zap v1.26.0
golang.org/x/sync v0.8.0
gorm.io/driver/postgres v1.5.9
gorm.io/driver/sqlite v1.5.6
gorm.io/gorm v1.25.11
Expand All @@ -42,13 +46,17 @@ require (
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.0 // indirect
github.com/clarketm/json v1.17.1 // indirect
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.15.2 // indirect
github.com/coreos/go-json v0.0.0-20230131223807-18775e0fb4fb // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/coreos/ignition/v2 v2.18.0 // indirect
github.com/coreos/vcontext v0.0.0-20230201181013-d72178a18687 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/diskfs/go-diskfs v1.4.0 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elliotwutingfeng/asciiset v0.0.0-20230602022725-51bbb787efab // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
Expand All @@ -69,24 +77,34 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/imdario/mergo v0.3.7 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/invopop/yaml v0.3.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.5.5 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.4.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand All @@ -99,31 +117,30 @@ require (
github.com/openshift/api v0.0.0-20230613151523-ba04973d3ed1 // indirect
github.com/openshift/custom-resource-status v1.1.2 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/perimeterx/marshmallow v1.1.5 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/xattr v0.4.9 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/ulikunitz/xz v0.5.11 // indirect
github.com/vincent-petithory/dataurl v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.4.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/oauth2 v0.17.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/djherbis/times.v1 v1.3.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
Loading
Loading