From a81a5f08b8b4050391b4d55951cba3b7337abb04 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Fri, 26 Jul 2013 07:27:14 -0500 Subject: [PATCH] ConnectionPool now only creates the connections it needs --- connection_pool.go | 99 +++++++++++++++++++++++++++++++++-------- connection_pool_test.go | 46 ++++++++++++++++++- 2 files changed, 125 insertions(+), 20 deletions(-) diff --git a/connection_pool.go b/connection_pool.go index 9a040030..4d5741f2 100644 --- a/connection_pool.go +++ b/connection_pool.go @@ -1,42 +1,83 @@ package pgx +import ( + "sync" +) + type ConnectionPoolOptions struct { - MaxConnections int // max simultaneous connections to use (currently all are immediately connected) + MaxConnections int // max simultaneous connections to use AfterConnect func(*Connection) error } type ConnectionPool struct { - connectionChannel chan *Connection - parameters ConnectionParameters // parameters used when establishing connection - maxConnections int - afterConnect func(*Connection) error + allConnections []*Connection + availableConnections []*Connection + cond *sync.Cond + parameters ConnectionParameters // parameters used when establishing connection + maxConnections int + afterConnect func(*Connection) error +} + +type ConnectionPoolStat struct { + MaxConnections int // max simultaneous connections to use + CurrentConnections int // current live connections + AvailableConnections int // unused live connections } // NewConnectionPool creates a new ConnectionPool. parameters are passed through to // Connect directly. func NewConnectionPool(parameters ConnectionParameters, options ConnectionPoolOptions) (p *ConnectionPool, err error) { p = new(ConnectionPool) - p.connectionChannel = make(chan *Connection, options.MaxConnections) - p.parameters = parameters p.maxConnections = options.MaxConnections p.afterConnect = options.AfterConnect - for i := 0; i < p.maxConnections; i++ { - var c *Connection - c, err = p.createConnection() - if err != nil { - return - } - p.connectionChannel <- c + p.allConnections = make([]*Connection, 0, p.maxConnections) + p.availableConnections = make([]*Connection, 0, p.maxConnections) + p.cond = sync.NewCond(new(sync.Mutex)) + + // Initially establish one connection + var c *Connection + c, err = p.createConnection() + if err != nil { + return } + p.allConnections = append(p.allConnections, c) + p.availableConnections = append(p.availableConnections, c) return } // Acquire takes exclusive use of a connection until it is released. func (p *ConnectionPool) Acquire() (c *Connection, err error) { - c = <-p.connectionChannel + p.cond.L.Lock() + defer p.cond.L.Unlock() + + // A connection is available + if len(p.availableConnections) > 0 { + c = p.availableConnections[len(p.availableConnections)-1] + p.availableConnections = p.availableConnections[:len(p.availableConnections)-1] + return + } + + // No connections are available, but we can create more + if len(p.allConnections) < p.maxConnections { + c, err = p.createConnection() + if err != nil { + return + } + p.allConnections = append(p.allConnections, c) + return + } + + // All connections are in use and we cannot create more + for len(p.availableConnections) == 0 { + p.cond.Wait() + } + + c = p.availableConnections[len(p.availableConnections)-1] + p.availableConnections = p.availableConnections[:len(p.availableConnections)-1] + return } @@ -45,17 +86,39 @@ func (p *ConnectionPool) Release(c *Connection) { if c.TxStatus != 'I' { c.Execute("rollback") } - p.connectionChannel <- c + p.cond.L.Lock() + p.availableConnections = append(p.availableConnections, c) + p.cond.L.Unlock() + p.cond.Signal() } // Close ends the use of a connection by closing all underlying connections. func (p *ConnectionPool) Close() { for i := 0; i < p.maxConnections; i++ { - c := <-p.connectionChannel - _ = c.Close() + if c, err := p.Acquire(); err != nil { + _ = c.Close() + } } } +func (p *ConnectionPool) Stat() (s ConnectionPoolStat) { + p.cond.L.Lock() + defer p.cond.L.Unlock() + + s.MaxConnections = p.maxConnections + s.CurrentConnections = len(p.allConnections) + s.AvailableConnections = len(p.availableConnections) + return +} + +func (p *ConnectionPool) MaxConnectionCount() int { + return p.maxConnections +} + +func (p *ConnectionPool) CurrentConnectionCount() int { + return p.maxConnections +} + func (p *ConnectionPool) createConnection() (c *Connection, err error) { c, err = Connect(p.parameters) if err != nil { diff --git a/connection_pool_test.go b/connection_pool_test.go index 597728eb..fa3d4b1d 100644 --- a/connection_pool_test.go +++ b/connection_pool_test.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "github.com/JackC/pgx" + "sync" "testing" ) @@ -30,8 +31,10 @@ func TestNewConnectionPool(t *testing.T) { } defer pool.Close() - if numCallbacks != 2 { - t.Errorf("Expected AfterConnect callback to fire %v times but only fired %v times", numCallbacks, numCallbacks) + // It initially connects once + stat := pool.Stat() + if stat.CurrentConnections != 1 { + t.Errorf("Expected 1 connection to be established immediately, but %v were", numCallbacks) } // Pool creation returns an error if any AfterConnect callback does @@ -158,3 +161,42 @@ func TestPoolReleaseWithTransactions(t *testing.T) { t.Fatalf("Expected release to rollback uncommitted transaction, but it did not: '%c'", conn.TxStatus) } } + +func TestPoolAcquireAndReleaseCycleAutoConnect(t *testing.T) { + maxConnections := 3 + pool := createConnectionPool(t, maxConnections) + defer pool.Close() + + doSomething := func() { + c, err := pool.Acquire() + if err != nil { + t.Fatalf("Unable to Acquire: %v", err) + } + c.SelectValue("select 1") + pool.Release(c) + } + + for i := 0; i < 1000; i++ { + doSomething() + } + + stat := pool.Stat() + if stat.CurrentConnections != 1 { + t.Fatalf("Pool shouldn't have established more connections when no contention: %v", stat.CurrentConnections) + } + + var wg sync.WaitGroup + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + defer wg.Done() + doSomething() + }() + } + wg.Wait() + + stat = pool.Stat() + if stat.CurrentConnections != stat.MaxConnections { + t.Fatalf("Pool should have used all possible connections: %v", stat.CurrentConnections) + } +}