Skip to content

Commit

Permalink
Gracefully handle TERM signals (#206)
Browse files Browse the repository at this point in the history
Add a -term_timeout flag. When receiving the TERM signal, the proxy will wait up to `term_timeout` for existing connections to close.
  • Loading branch information
supersam654 authored and hfwang committed Oct 9, 2018
1 parent 3236ed5 commit ac834ce
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 12 deletions.
24 changes: 22 additions & 2 deletions cmd/cloud_sql_proxy/cloud_sql_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"github.com/GoogleCloudPlatform/cloudsql-proxy/logging"
Expand Down Expand Up @@ -77,6 +79,7 @@ can be removed automatically by this program.`)
// Settings for limits
maxConnections = flag.Uint64("max_connections", 0, `If provided, the maximum number of connections to establish before refusing new connections. Defaults to 0 (no limit)`)
fdRlimit = flag.Uint64("fd_rlimit", limits.ExpectedFDs, `Sets the rlimit on the number of open file descriptors for the proxy to the provided value. If set to zero, disables attempts to set the rlimit. Defaults to a value which can support 4K connections to one instance`)
termTimeout = flag.Duration("term_timeout", 0, "When set, the proxy will wait for existing connections to close before terminating. Any connections that haven't closed after the timeout will be dropped")

// Settings for authentication.
token = flag.String("token", "", "When set, the proxy uses this Bearer token for authorization.")
Expand Down Expand Up @@ -496,7 +499,7 @@ func main() {
}
logging.Infof("Ready for new connections")

(&proxy.Client{
proxyClient := &proxy.Client{
Port: port,
MaxConnections: *maxConnections,
Certs: certs.NewCertSourceOpts(client, certs.RemoteOpts{
Expand All @@ -507,5 +510,22 @@ func main() {
}),
Conns: connset,
RefreshCfgThrottle: refreshCfgThrottle,
}).Run(connSrc)
}

signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)

go func() {
<-signals
logging.Infof("Received TERM signal. Waiting up to %s before terminating.", *termTimeout)

err := proxyClient.Shutdown(*termTimeout)

if err == nil {
os.Exit(0)
}
os.Exit(2)
}()

proxyClient.Run(connSrc)
}
33 changes: 23 additions & 10 deletions proxy/proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,15 @@ func (c *Client) Run(connSrc <-chan Conn) {
}

func (c *Client) handleConn(conn Conn) {
// Track connections count only if a maximum connections limit is set to avoid useless overhead
if c.MaxConnections > 0 {
active := atomic.AddUint64(&c.ConnectionsCounter, 1)
active := atomic.AddUint64(&c.ConnectionsCounter, 1)

// Deferred decrement of ConnectionsCounter upon connection closing
defer atomic.AddUint64(&c.ConnectionsCounter, ^uint64(0))
// Deferred decrement of ConnectionsCounter upon connection closing
defer atomic.AddUint64(&c.ConnectionsCounter, ^uint64(0))

if active > c.MaxConnections {
logging.Errorf("too many open connections (max %d)", c.MaxConnections)
conn.Conn.Close()
return
}
if c.MaxConnections > 0 && active > c.MaxConnections {
logging.Errorf("too many open connections (max %d)", c.MaxConnections)
conn.Conn.Close()
return
}

server, err := c.Dial(conn.Instance)
Expand Down Expand Up @@ -323,3 +320,19 @@ func NewConnSrc(instance string, l net.Listener) <-chan Conn {
}()
return ch
}

// Shutdown waits up to termTimeout for all active connections to close.
//
// It polls c.ConnectionsCounter and returns nil as soon as the counter reaches
// zero. If connections are still active once termTimeout has elapsed, it
// returns an error reporting how many remain. A zero or negative termTimeout
// performs a single check without waiting.
//
// Shutdown only observes the counter; it does not itself close any connection.
func (c *Client) Shutdown(termTimeout time.Duration) error {
	// pollInterval bounds how often the counter is re-read. The wait used to be
	// time.Sleep(1), i.e. one nanosecond, which made this loop spin a CPU core
	// for the whole timeout.
	const pollInterval = 10 * time.Millisecond

	deadline := time.Now().Add(termTimeout)
	for atomic.LoadUint64(&c.ConnectionsCounter) > 0 {
		remaining := time.Until(deadline)
		if remaining <= 0 {
			break
		}
		// Never sleep past the deadline.
		if remaining > pollInterval {
			remaining = pollInterval
		}
		time.Sleep(remaining)
	}

	active := atomic.LoadUint64(&c.ConnectionsCounter)
	if active == 0 {
		return nil
	}
	return fmt.Errorf("%d active connections still exist after waiting for %v", active, termTimeout)
}
32 changes: 32 additions & 0 deletions proxy/proxy/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,35 @@ func TestMaximumConnectionsCount(t *testing.T) {
t.Errorf("client should have dialed exactly the maximum of %d connections (%d connections, %d dials)", maxConnections, numConnections, dials)
}
}

// TestShutdownTerminatesEarly verifies that Shutdown returns promptly, and
// without error, when the client has no active connections, instead of
// waiting out the full termination timeout.
func TestShutdownTerminatesEarly(t *testing.T) {
	b := &fakeCerts{}
	c := &Client{
		Certs: &blockingCertSource{
			map[string]*fakeCerts{
				instance: b,
			}},
		Dialer: func(string, string) (net.Conn, error) {
			return nil, nil
		},
	}

	// The termination timeout must be much longer than the guard below.
	// With the previous 1ns timeout, Shutdown returned immediately whether or
	// not it terminated early, so the test could never fail.
	const termTimeout = time.Second

	shutdown := make(chan error, 1)
	go func() {
		shutdown <- c.Shutdown(termTimeout)
	}()

	// In case the code is actually broken and the client doesn't shut down
	// quickly, don't cause the test to hang until it times out.
	select {
	case err := <-shutdown:
		if err != nil {
			t.Errorf("Shutdown returned an error with no active connections: %v", err)
		}
	case <-time.After(100 * time.Millisecond):
		t.Errorf("shutdown should have completed quickly because there are no active connections")
	}
}

0 comments on commit ac834ce

Please sign in to comment.