Skip to content

Commit

Permalink
feat: add handlers for nats connection states
Browse files Browse the repository at this point in the history
  • Loading branch information
1995parham committed Jan 1, 2024
1 parent a70cf70 commit f60e81b
Showing 1 changed file with 38 additions and 13 deletions.
51 changes: 38 additions & 13 deletions internal/infra/cmq/cmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,62 @@ type CMQ struct {
}

func Provide(lc fx.Lifecycle, cfg Config, logger *zap.Logger) (*CMQ, error) {
cmq := &CMQ{
nats: nil,
logger: logger,
jetstream: nil,
}

nc, err := nats.Connect(cfg.URL)
if err != nil {
return nil, fmt.Errorf("nats connection failed %w", err)
}

cmq.nats = nc

logger.Info("nats connection successful",
zap.String("connected-addr", nc.ConnectedAddr()),
zap.Strings("discovered-servers", nc.DiscoveredServers()))

nc.SetDisconnectErrHandler(func(_ *nats.Conn, err error) {
logger.Fatal("nats disconnected", zap.Error(err))
})

nc.SetReconnectHandler(func(_ *nats.Conn) {
logger.Warn("nats reconnected")
})
nc.SetDisconnectErrHandler(cmq.disconnectHandler)
nc.SetClosedHandler(cmq.closeHandler)
nc.SetReconnectHandler(cmq.reconnectHandler)

js, err := jetstream.New(nc)
if err != nil {
return nil, fmt.Errorf("jetstream context extraction failed %w", err)
return nil, fmt.Errorf("jetstream creation failed %w", err)
}

cmq.jetstream = js

lc.Append(
fx.StopHook(func() { nc.Close() }),
)

return &CMQ{
nats: nc,
logger: logger,
jetstream: js,
}, nil
return cmq, nil
}

// close handler is called when the connection is closed which means
// we are not going to retry again for getting a live connection
// to server anymore.
func (c *CMQ) closeHandler(nc *nats.Conn) {
c.logger.Fatal("connection closed",
zap.Strings("urls", nc.DiscoveredServers()),
zap.Error(nc.LastError()),
)
}

// disconnection handler is called when we lost connection
// and we are going to retry, so we may get connected in the future.
func (c *CMQ) disconnectHandler(nc *nats.Conn, err error) {
c.logger.Error("got disconnected",
zap.Strings("urls", nc.DiscoveredServers()),
zap.Error(err),
)
}

func (c *CMQ) reconnectHandler(nc *nats.Conn) {
c.logger.Info("got reconnected", zap.String("url", nc.ConnectedUrl()))
}

// Streams creates required streams on jetstream.
Expand Down

0 comments on commit f60e81b

Please sign in to comment.