-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection.go
36 lines (28 loc) · 997 Bytes
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package kafka
import (
"context"
"github.com/smarty/messaging/v4"
)
// defaultConnection is the package's messaging.Connection implementation
// (it is what newConnection returns). It bundles the configuration with a
// cancelable lifecycle context that is passed to each reader and writer
// created from it.
type defaultConnection struct {
// config is forwarded unchanged to newReader and newWriter.
config configuration
// lifecycle is the cancelable context derived from the parent context in
// newConnection; it is handed to every reader and writer this connection creates.
lifecycle context.Context
// cancel cancels lifecycle; it is invoked by Close.
cancel context.CancelFunc
}
// newConnection builds a messaging.Connection around the given configuration.
//
// A cancelable child of parent is created and stored as the connection's
// lifecycle context, together with its cancel function. Canceling parent
// therefore cancels the lifecycle context as well.
func newConnection(config configuration, parent context.Context) messaging.Connection {
	lifecycle, cancel := context.WithCancel(parent)

	// Returned as a pointer because defaultConnection needs to be comparable
	// for the connection pool.
	return &defaultConnection{
		config:    config,
		lifecycle: lifecycle,
		cancel:    cancel,
	}
}
// Reader creates a new messaging.Reader from the connection's configuration
// and lifecycle context.
//
// The context argument is ignored; the reader is tied to the connection's own
// lifecycle context instead. The returned error is always nil.
func (this *defaultConnection) Reader(_ context.Context) (messaging.Reader, error) {
	reader := newReader(this.config, this.lifecycle)
	return reader, nil
}
// Writer creates a new messaging.Writer from the connection's configuration
// and lifecycle context.
//
// The context argument is ignored; the writer is tied to the connection's own
// lifecycle context instead. The returned error is always nil.
func (this *defaultConnection) Writer(_ context.Context) (messaging.Writer, error) {
	// NOTE(review): the trailing false is the counterpart of the true passed by
	// CommitWriter; presumably it selects a non-committing writer. Confirm
	// against newWriter.
	writer := newWriter(this.config, this.lifecycle, false)
	return writer, nil
}
// CommitWriter creates a new messaging.CommitWriter from the connection's
// configuration and lifecycle context.
//
// The context argument is ignored; the writer is tied to the connection's own
// lifecycle context instead. The returned error is always nil.
func (this *defaultConnection) CommitWriter(_ context.Context) (messaging.CommitWriter, error) {
	// NOTE(review): the trailing true is the counterpart of the false passed by
	// Writer; presumably it enables commit support in the writer. Confirm
	// against newWriter.
	writer := newWriter(this.config, this.lifecycle, true)
	return writer, nil
}
// Close cancels the connection's lifecycle context, which is the context
// passed to every reader and writer created from this connection. How those
// readers and writers react to the cancellation is determined by their own
// implementations.
//
// It always returns nil.
func (this *defaultConnection) Close() error {
	shutdown := this.cancel
	shutdown()
	return nil
}