From f60e81b0c92a2806f7155a8f5c2c62344f238445 Mon Sep 17 00:00:00 2001 From: Parham Alvani Date: Mon, 1 Jan 2024 20:00:44 +0000 Subject: [PATCH] feat: add handlers for nats connection states --- internal/infra/cmq/cmq.go | 51 +++++++++++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/internal/infra/cmq/cmq.go b/internal/infra/cmq/cmq.go index fe61884..3ba9be3 100644 --- a/internal/infra/cmq/cmq.go +++ b/internal/infra/cmq/cmq.go @@ -21,37 +21,62 @@ type CMQ struct { } func Provide(lc fx.Lifecycle, cfg Config, logger *zap.Logger) (*CMQ, error) { + cmq := &CMQ{ + nats: nil, + logger: logger, + jetstream: nil, + } + nc, err := nats.Connect(cfg.URL) if err != nil { return nil, fmt.Errorf("nats connection failed %w", err) } + cmq.nats = nc + logger.Info("nats connection successful", zap.String("connected-addr", nc.ConnectedAddr()), zap.Strings("discovered-servers", nc.DiscoveredServers())) - nc.SetDisconnectErrHandler(func(_ *nats.Conn, err error) { - logger.Fatal("nats disconnected", zap.Error(err)) - }) - - nc.SetReconnectHandler(func(_ *nats.Conn) { - logger.Warn("nats reconnected") - }) + nc.SetDisconnectErrHandler(cmq.disconnectHandler) + nc.SetClosedHandler(cmq.closeHandler) + nc.SetReconnectHandler(cmq.reconnectHandler) js, err := jetstream.New(nc) if err != nil { - return nil, fmt.Errorf("jetstream context extraction failed %w", err) + return nil, fmt.Errorf("jetstream creation failed %w", err) } + cmq.jetstream = js + lc.Append( fx.StopHook(func() { nc.Close() }), ) - return &CMQ{ - nats: nc, - logger: logger, - jetstream: js, - }, nil + return cmq, nil +} + +// close handler is called when the connection is closed which means +// we are not going to retry again for getting a live connection +// to server anymore. +func (c *CMQ) closeHandler(nc *nats.Conn) { + c.logger.Fatal("connection closed", + zap.Strings("urls", nc.DiscoveredServers()), + zap.Error(nc.LastError()), + ) +} + +// disconnection handler is called when we lost connection +// and we are going to retry, so we may get connected in the future. +func (c *CMQ) disconnectHandler(nc *nats.Conn, err error) { + c.logger.Error("got disconnected", + zap.Strings("urls", nc.DiscoveredServers()), + zap.Error(err), + ) +} + +func (c *CMQ) reconnectHandler(nc *nats.Conn) { + c.logger.Info("got reconnected", zap.String("url", nc.ConnectedUrl())) } // Streams creates required streams on jetstream.