Skip to content

Commit

Permalink
Use MQTT client and configure timeout (#20)
Browse files Browse the repository at this point in the history
* Use Mqtt client instead of http

* Do no check for http error

* Better logging + get timeout from config

* Use mqtt client and configure timeout
  • Loading branch information
miguelreiswildlife authored Apr 25, 2024
1 parent 2f0d0ac commit 2cca100
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 24 deletions.
25 changes: 8 additions & 17 deletions api/sendmqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/labstack/echo"
log "github.com/sirupsen/logrus"
"github.com/topfreegames/arkadiko/httpclient"
)

// SendMqttHandler is the handler responsible for sending messages to mqtt
Expand Down Expand Up @@ -52,6 +51,7 @@ func SendMqttHandler(app *App) func(c echo.Context) error {
return FailWith(400, err.Error(), c)
}

// Default should_moderate to false so messages sent from the server side are not moderated
if _, exists := msgPayload["should_moderate"]; !exists {
msgPayload["should_moderate"] = false
}
Expand All @@ -75,34 +75,24 @@ func SendMqttHandler(app *App) func(c echo.Context) error {
})

var mqttLatency time.Duration
var beforeMqttTime, afterMqttTime time.Time
var beforeMqttTime time.Time

err = WithSegment("mqtt", c, func() error {
beforeMqttTime = time.Now()
httpError := app.HttpClient.SendMessage(
c.Request().Context(), topic, string(b), retained,
)
afterMqttTime = time.Now()
return httpError
sendMqttErr := app.MqttClient.PublishMessage(c.Request().Context(), topic, string(b), retained)
mqttLatency = time.Now().Sub(beforeMqttTime)

return sendMqttErr
})

status := 200
if err != nil {
lg.WithError(err).Error("failed to send mqtt message")
status = 500
if e, ok := err.(*httpclient.HTTPError); ok {
status = e.StatusCode
}
}
tags := []string{
fmt.Sprintf("error:%t", err != nil),
fmt.Sprintf("status:%d", status),
fmt.Sprintf("retained:%t", retained),
}
if source != "" {
tags = append(tags, fmt.Sprintf("requestor:%s", source))
}
mqttLatency = afterMqttTime.Sub(beforeMqttTime)

app.DDStatsD.Timing("mqtt_latency", mqttLatency, tags...)
lg = lg.WithField("mqttLatency", mqttLatency.Nanoseconds())
lg.Debug("sent mqtt message")
Expand All @@ -111,6 +101,7 @@ func SendMqttHandler(app *App) func(c echo.Context) error {
c.Set("retained", retained)

if err != nil {
lg.WithError(err).Error("failed to send mqtt message")
return FailWith(500, err.Error(), c)
}
return c.String(http.StatusOK, workingString)
Expand Down
25 changes: 18 additions & 7 deletions mqttclient/mqttclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
type MqttClient struct {
MqttServerHost string
MqttServerPort int
Timeout time.Duration
ConfigPath string
Config *viper.Viper
Logger log.FieldLogger
Expand Down Expand Up @@ -62,30 +63,38 @@ func GetMqttClient(configPath string, onConnectHandler mqtt.OnConnectHandler, l

// SendMessage sends the message with the given payload to topic
func (mc *MqttClient) SendMessage(ctx context.Context, topic string, message string) error {
return mc.publishMessage(ctx, topic, message, false)
return mc.PublishMessage(ctx, topic, message, false)
}

// SendRetainedMessage sends the message with the given payload to topic
func (mc *MqttClient) SendRetainedMessage(ctx context.Context, topic string, message string) error {
return mc.publishMessage(ctx, topic, message, true)
return mc.PublishMessage(ctx, topic, message, true)
}

func (mc *MqttClient) publishMessage(ctx context.Context, topic string, message string, retained bool) error {
func (mc *MqttClient) PublishMessage(ctx context.Context, topic string, message string, retained bool) error {
l := mc.Logger.WithFields(
log.Fields{
"method": "PublishMessage",
"topic": topic,
"message": message,
"retained": retained,
},
)
token := mc.MqttClient.WithContext(ctx).Publish(topic, 2, retained, message)
result := token.WaitTimeout(time.Second * 5)
result := token.WaitTimeout(mc.Timeout)

if result == false || token.Error() != nil {
err := token.Error()
if err == nil {
err = errors.New("timeoutError")
}
mc.Logger.WithError(err).Error()
l.WithError(err).Error("Error publishing message to mqtt")
return err
}
return nil
}

//WaitForConnection to mqtt server
// WaitForConnection to mqtt server
func (mc *MqttClient) WaitForConnection(timeout int) error {
start := time.Now()
timedOut := func() bool {
Expand All @@ -102,7 +111,7 @@ func (mc *MqttClient) WaitForConnection(timeout int) error {
}

func (mc *MqttClient) configure(l log.FieldLogger) {
mc.Logger = l
mc.Logger = l.WithField("source", "MqttClient")

mc.setConfigurationDefaults()
mc.loadConfiguration()
Expand All @@ -115,6 +124,7 @@ func (mc *MqttClient) setConfigurationDefaults() {
mc.Config.SetDefault("mqttserver.user", "admin")
mc.Config.SetDefault("mqttserver.pass", "admin")
mc.Config.SetDefault("mqttserver.ca_cert_file", "")
mc.Config.SetDefault("mqttserver.timeout", 5*time.Second)
}

func (mc *MqttClient) loadConfiguration() {
Expand All @@ -135,6 +145,7 @@ func (mc *MqttClient) loadConfiguration() {
func (mc *MqttClient) configureClient() {
mc.MqttServerHost = mc.Config.GetString("mqttserver.host")
mc.MqttServerPort = mc.Config.GetInt("mqttserver.port")
mc.Timeout = mc.Config.GetDuration("mqttserver.timeout")
}

func (mc *MqttClient) start(onConnectHandler mqtt.OnConnectHandler) {
Expand Down

0 comments on commit 2cca100

Please sign in to comment.