Skip to content

Commit

Permalink
Remove closed sessions and close their conn to reduce ram usage
Browse files Browse the repository at this point in the history
Signed-off-by: Yilun <[email protected]>
  • Loading branch information
yilunzhang committed Aug 22, 2020
1 parent fba43ee commit d3f32ab
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 75 deletions.
180 changes: 105 additions & 75 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ import (
ncp "github.com/nknorg/ncp-go"
nkn "github.com/nknorg/nkn-sdk-go"
"github.com/nknorg/tuna"
gocache "github.com/patrickmn/go-cache"
)

const (
DefaultSessionAllowAddr = nkn.DefaultSessionAllowAddr
SessionIDSize = nkn.SessionIDSize
acceptSessionBufSize = 128
DefaultSessionAllowAddr = nkn.DefaultSessionAllowAddr
SessionIDSize = nkn.SessionIDSize
acceptSessionBufSize = 128
closedSessionKeyExpiration = 5 * time.Minute
closedSessionKeyCleanupInterval = time.Minute
)

type TunaSessionClient struct {
Expand All @@ -36,14 +39,15 @@ type TunaSessionClient struct {
onClose chan struct{}

sync.RWMutex
listeners []net.Listener
tunaExits []*tuna.TunaExit
acceptAddrs []*regexp.Regexp
sessions map[string]*ncp.Session
sessionConns map[string]map[string]*Conn
sharedKeys map[string]*[sharedKeySize]byte
connCount map[string]int
isClosed bool
listeners []net.Listener
tunaExits []*tuna.TunaExit
acceptAddrs []*regexp.Regexp
sessions map[string]*ncp.Session
sessionConns map[string]map[string]*Conn
sharedKeys map[string]*[sharedKeySize]byte
connCount map[string]int
closedSessionKey *gocache.Cache
isClosed bool
}

func NewTunaSessionClient(clientAccount *nkn.Account, m *nkn.MultiClient, wallet *nkn.Wallet, config *Config) (*TunaSessionClient, error) {
Expand All @@ -53,18 +57,21 @@ func NewTunaSessionClient(clientAccount *nkn.Account, m *nkn.MultiClient, wallet
}

c := &TunaSessionClient{
config: config,
clientAccount: clientAccount,
multiClient: m,
wallet: wallet,
addr: m.Addr(),
acceptSession: make(chan *ncp.Session, acceptSessionBufSize),
onClose: make(chan struct{}, 0),
sessions: make(map[string]*ncp.Session),
sessionConns: make(map[string]map[string]*Conn),
sharedKeys: make(map[string]*[sharedKeySize]byte),
connCount: make(map[string]int),
}
config: config,
clientAccount: clientAccount,
multiClient: m,
wallet: wallet,
addr: m.Addr(),
acceptSession: make(chan *ncp.Session, acceptSessionBufSize),
onClose: make(chan struct{}, 0),
sessions: make(map[string]*ncp.Session),
sessionConns: make(map[string]map[string]*Conn),
sharedKeys: make(map[string]*[sharedKeySize]byte),
connCount: make(map[string]int),
closedSessionKey: gocache.New(closedSessionKeyExpiration, closedSessionKeyCleanupInterval),
}

go c.removeClosedSessions()

return c, nil
}
Expand Down Expand Up @@ -270,12 +277,17 @@ func (c *TunaSessionClient) listenNet(i int) {
return
}

sessionKey := sessionKey(remoteAddr, sessionID)
sessKey := sessionKey(remoteAddr, sessionID)

c.Lock()
sess, ok := c.sessions[sessionKey]
sess, ok := c.sessions[sessKey]
if !ok {
if !c.shouldAcceptAddr(remoteAddr) {
c.Unlock()
return
}
if _, ok := c.closedSessionKey.Get(sessKey); ok {
c.Unlock()
return
}
connIDs := make([]string, c.config.NumTunaListeners)
Expand All @@ -287,10 +299,10 @@ func (c *TunaSessionClient) listenNet(i int) {
c.Unlock()
return
}
c.sessions[sessionKey] = sess
c.sessionConns[sessionKey] = make(map[string]*Conn, c.config.NumTunaListeners)
c.sessions[sessKey] = sess
c.sessionConns[sessKey] = make(map[string]*Conn, c.config.NumTunaListeners)
}
c.sessionConns[sessionKey][connID(i)] = conn
c.sessionConns[sessKey][connID(i)] = conn
c.Unlock()

if !ok {
Expand All @@ -306,23 +318,7 @@ func (c *TunaSessionClient) listenNet(i int) {
}
}

c.Lock()
c.connCount[sessionKey]++
c.Unlock()

c.handleConn(conn, sess, i)

c.Lock()
c.connCount[sessionKey]--
shouldClose := c.connCount[sessionKey] == 0
if shouldClose {
delete(c.connCount, sessionKey)
}
c.Unlock()

if shouldClose {
sess.Close()
}
c.handleConn(conn, sessKey, i)
}(conn)
}
}
Expand Down Expand Up @@ -471,38 +467,22 @@ func (c *TunaSessionClient) DialWithConfig(remoteAddr string, config *DialConfig
}
}

sessionKey := sessionKey(remoteAddr, sessionID)
sessKey := sessionKey(remoteAddr, sessionID)
sess, err := c.newSession(remoteAddr, sessionID, connIDs, config.SessionConfig)
if err != nil {
return nil, err
}

c.Lock()
c.sessions[sessionKey] = sess
c.sessionConns[sessionKey] = conns
c.sessions[sessKey] = sess
c.sessionConns[sessKey] = conns
c.Unlock()

for i := 0; i < len(pubAddrs.Addrs); i++ {
if conn, ok := conns[connID(i)]; ok {
go func(conn *Conn, i int) {
c.Lock()
c.connCount[sessionKey]++
c.Unlock()

c.handleConn(conn, sess, i)
conn.Close()

c.Lock()
c.connCount[sessionKey]--
shouldClose := c.connCount[sessionKey] == 0
if shouldClose {
delete(c.connCount, sessionKey)
}
c.Unlock()

if shouldClose {
sess.Close()
}
defer conn.Close()
c.handleConn(conn, sessKey, i)
}(conn, i)
}
}
Expand All @@ -521,7 +501,7 @@ func (c *TunaSessionClient) AcceptSession() (*ncp.Session, error) {
case session := <-c.acceptSession:
err := session.Accept()
if err != nil {
log.Println(err)
log.Println("Accept error:", err)
continue
}
return session, nil
Expand All @@ -547,13 +527,13 @@ func (c *TunaSessionClient) Close() error {

err := c.multiClient.Close()
if err != nil {
log.Println(err)
log.Println("MultiClient close error:", err)
}

for _, listener := range c.listeners {
err := listener.Close()
if err != nil {
log.Println(err)
log.Println("Listener close error:", err)
continue
}
}
Expand All @@ -562,7 +542,7 @@ func (c *TunaSessionClient) Close() error {
if !sess.IsClosed() {
err := sess.Close()
if err != nil {
log.Println(err)
log.Println("Session close error:", err)
continue
}
}
Expand All @@ -572,7 +552,7 @@ func (c *TunaSessionClient) Close() error {
for _, conn := range conns {
err := conn.Close()
if err != nil {
log.Println(err)
log.Println("Conn close error:", err)
continue
}
}
Expand All @@ -596,10 +576,10 @@ func (c *TunaSessionClient) IsClosed() bool {
}

func (c *TunaSessionClient) newSession(remoteAddr string, sessionID []byte, connIDs []string, config *ncp.Config) (*ncp.Session, error) {
sessionKey := sessionKey(remoteAddr, sessionID)
sessKey := sessionKey(remoteAddr, sessionID)
return ncp.NewSession(c.addr, nkn.NewClientAddr(remoteAddr), connIDs, nil, (func(connID, _ string, buf []byte, writeTimeout time.Duration) error {
c.RLock()
conn := c.sessionConns[sessionKey][connID]
conn := c.sessionConns[sessKey][connID]
c.RUnlock()
if conn == nil {
return fmt.Errorf("conn %s is nil", connID)
Expand All @@ -610,7 +590,7 @@ func (c *TunaSessionClient) newSession(remoteAddr string, sessionID []byte, conn
}
err = writeMessage(conn, buf)
if err != nil {
log.Println(err)
log.Println("Write message error:", err)
return ncp.ErrConnClosed
}
return nil
Expand All @@ -636,7 +616,33 @@ func (c *TunaSessionClient) handleMsg(conn *Conn, sess *ncp.Session, i int) erro
return nil
}

func (c *TunaSessionClient) handleConn(conn *Conn, sess *ncp.Session, i int) {
func (c *TunaSessionClient) handleConn(conn *Conn, sessKey string, i int) {
c.Lock()
sess := c.sessions[sessKey]
if sess == nil {
c.Unlock()
return
}
c.connCount[sessKey]++
c.Unlock()

defer func() {
c.Lock()
c.connCount[sessKey]--
shouldClose := c.connCount[sessKey] == 0
if shouldClose {
delete(c.sessions, sessKey)
delete(c.sessionConns, sessKey)
delete(c.connCount, sessKey)
c.closedSessionKey.Add(sessKey, nil, gocache.DefaultExpiration)
}
c.Unlock()

if shouldClose {
sess.Close()
}
}()

for {
err := c.handleMsg(conn, sess, i)
if err != nil {
Expand All @@ -654,3 +660,27 @@ func (c *TunaSessionClient) handleConn(conn *Conn, sess *ncp.Session, i int) {
}
}
}

func (c *TunaSessionClient) removeClosedSessions() {
for {
time.Sleep(time.Second)

if c.IsClosed() {
return
}

c.Lock()
for sessKey, sess := range c.sessions {
if sess.IsClosed() {
for _, conn := range c.sessionConns[sessKey] {
conn.Close()
}
delete(c.sessions, sessKey)
delete(c.sessionConns, sessKey)
delete(c.connCount, sessKey)
c.closedSessionKey.Add(sessKey, nil, gocache.DefaultExpiration)
}
}
c.Unlock()
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ require (
github.com/nknorg/nkn-sdk-go v1.3.2
github.com/nknorg/nkn/v2 v2.0.2
github.com/nknorg/tuna v0.0.0-20200821072604-fc469bb4e723
github.com/patrickmn/go-cache v2.1.0+incompatible
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
)

0 comments on commit d3f32ab

Please sign in to comment.