-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsimple.go
119 lines (104 loc) · 3.5 KB
/
simple.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package simple
import (
"context"
"errors"
"fmt"
"strings"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mproxy/pkg/session"
)
var (
	// errSessionMissing is the error the Handler methods in this file log
	// and return when session.FromContext reports that no session is present.
	errSessionMissing = errors.New("session is missing")

	// Compile-time assertion that *Handler satisfies session.Handler.
	_ session.Handler = (*Handler)(nil)
)
// Handler is a session.Handler implementation (see the compile-time
// assertion in this file) whose methods write each event to a logger.
type Handler struct {
	// logger receives the Info and Error messages emitted by the methods.
	logger logger.Logger
}
// New returns a pointer to a Handler that writes its messages to the
// supplied logger.
func New(logger logger.Logger) *Handler {
	return &Handler{logger: logger}
}
// AuthConnect is called on device connection, prior to forwarding to the
// MQTT broker. It logs the session ID, username and client certificate
// common name taken from the session carried by ctx.
//
// The password is deliberately never written to the log; a placeholder that
// only indicates whether one was supplied is emitted in its place.
//
// It returns errSessionMissing when ctx carries no session, nil otherwise.
func (h *Handler) AuthConnect(ctx context.Context) error {
	s, ok := session.FromContext(ctx)
	if !ok {
		h.logger.Error(errSessionMissing.Error())
		return errSessionMissing
	}

	// Credentials must not reach log output, so mask the secret while still
	// showing whether the client sent one.
	password := "<empty>"
	if len(s.Password) > 0 {
		password = "<redacted>"
	}

	h.logger.Info(fmt.Sprintf("AuthConnect() - sessionID: %s, username: %s, password: %s, client_CN: %s", s.ID, s.Username, password, s.Cert.Subject.CommonName))
	return nil
}
// AuthPublish is called on device publish, prior to forwarding to the MQTT
// broker. It logs the session ID together with the topic and payload that
// the two pointers refer to.
//
// It returns errSessionMissing when ctx carries no session, nil otherwise.
func (h *Handler) AuthPublish(ctx context.Context, topic *string, payload *[]byte) error {
	sess, found := session.FromContext(ctx)
	if !found {
		h.logger.Error(errSessionMissing.Error())
		return errSessionMissing
	}

	msg := fmt.Sprintf("AuthPublish() - sessionID: %s, topic: %s, payload: %s", sess.ID, *topic, string(*payload))
	h.logger.Info(msg)
	return nil
}
// AuthSubscribe is called on device subscribe, prior to forwarding to the
// MQTT broker. It logs the session ID and the requested topics joined with
// commas.
//
// It returns errSessionMissing when ctx carries no session, nil otherwise.
func (h *Handler) AuthSubscribe(ctx context.Context, topics *[]string) error {
	sess, found := session.FromContext(ctx)
	if !found {
		h.logger.Error(errSessionMissing.Error())
		return errSessionMissing
	}

	joined := strings.Join(*topics, ",")
	msg := fmt.Sprintf("AuthSubscribe() - sessionID: %s, topics: %s", sess.ID, joined)
	h.logger.Info(msg)
	return nil
}
// Connect is the hook for a client that has successfully connected. It logs
// the username and session ID of the session carried by ctx.
//
// It returns errSessionMissing when ctx carries no session, nil otherwise.
func (h *Handler) Connect(ctx context.Context) error {
	sess, found := session.FromContext(ctx)
	if !found {
		h.logger.Error(errSessionMissing.Error())
		return errSessionMissing
	}

	msg := fmt.Sprintf("Connect() - username: %s, sessionID: %s", sess.Username, sess.ID)
	h.logger.Info(msg)
	return nil
}
// Publish is the hook for a client that has successfully published. It logs
// the username, session ID, topic and payload.
//
// It returns errSessionMissing when ctx carries no session, nil otherwise.
func (h *Handler) Publish(ctx context.Context, topic *string, payload *[]byte) error {
	sess, found := session.FromContext(ctx)
	if !found {
		h.logger.Error(errSessionMissing.Error())
		return errSessionMissing
	}

	msg := fmt.Sprintf("Publish() - username: %s, sessionID: %s, topic: %s, payload: %s", sess.Username, sess.ID, *topic, string(*payload))
	h.logger.Info(msg)
	return nil
}
// Subscribe is the hook for a client that has successfully subscribed. It
// logs the username, session ID and the topics joined with commas.
//
// It returns errSessionMissing when ctx carries no session, nil otherwise.
func (h *Handler) Subscribe(ctx context.Context, topics *[]string) error {
	sess, found := session.FromContext(ctx)
	if !found {
		h.logger.Error(errSessionMissing.Error())
		return errSessionMissing
	}

	joined := strings.Join(*topics, ",")
	msg := fmt.Sprintf("Subscribe() - username: %s, sessionID: %s, topics: %s", sess.Username, sess.ID, joined)
	h.logger.Info(msg)
	return nil
}
// Unsubscribe is the hook for a client that has unsubscribed. It logs the
// username, session ID and the topics joined with commas.
//
// It returns errSessionMissing when ctx carries no session, nil otherwise.
func (h *Handler) Unsubscribe(ctx context.Context, topics *[]string) error {
	sess, found := session.FromContext(ctx)
	if !found {
		h.logger.Error(errSessionMissing.Error())
		return errSessionMissing
	}

	joined := strings.Join(*topics, ",")
	msg := fmt.Sprintf("Unsubscribe() - username: %s, sessionID: %s, topics: %s", sess.Username, sess.ID, joined)
	h.logger.Info(msg)
	return nil
}
// Disconnect is the hook for a lost connection. It logs the username and
// ID of the session carried by ctx.
//
// It returns errSessionMissing when ctx carries no session, nil otherwise.
func (h *Handler) Disconnect(ctx context.Context) error {
	sess, found := session.FromContext(ctx)
	if !found {
		h.logger.Error(errSessionMissing.Error())
		return errSessionMissing
	}

	msg := fmt.Sprintf("Disconnect() - client with username: %s and ID: %s disconnected", sess.Username, sess.ID)
	h.logger.Info(msg)
	return nil
}