@@ -139,6 +139,12 @@ arrays (or any types) differently can easily override the default transcoding
|
|||||||
(so even using a strict with value and null fields would simply be a matter of
|
(so even using a strict with value and null fields would simply be a matter of
|
||||||
changing transcoders).
|
changing transcoders).
|
||||||
|
|
||||||
|
### Logging
|
||||||
|
|
||||||
|
Pgx defines the pgx.Logger interface. A value that satisfies this interface
|
||||||
|
used as part of ConnectionOptions or ConnectionPoolOptions to enable logging
|
||||||
|
of pgx activities.
|
||||||
|
|
||||||
## Testing
|
## Testing
|
||||||
|
|
||||||
Pgx supports multiple connection and authentication types. Setting up a test
|
Pgx supports multiple connection and authentication types. Setting up a test
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ type ConnectionParameters struct {
|
|||||||
Password string
|
Password string
|
||||||
MsgBufSize int // Size of work buffer used for transcoding messages. For optimal performance, it should be large enough to store a single row from any result set. Default: 1024
|
MsgBufSize int // Size of work buffer used for transcoding messages. For optimal performance, it should be large enough to store a single row from any result set. Default: 1024
|
||||||
TLSConfig *tls.Config // config for TLS connection -- nil disables TLS
|
TLSConfig *tls.Config // config for TLS connection -- nil disables TLS
|
||||||
|
Logger Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connection is a PostgreSQL connection handle. It is not safe for concurrent usage.
|
// Connection is a PostgreSQL connection handle. It is not safe for concurrent usage.
|
||||||
@@ -46,6 +47,7 @@ type Connection struct {
|
|||||||
notifications []*Notification
|
notifications []*Notification
|
||||||
alive bool
|
alive bool
|
||||||
causeOfDeath error
|
causeOfDeath error
|
||||||
|
logger Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
type preparedStatement struct {
|
type preparedStatement struct {
|
||||||
@@ -94,21 +96,33 @@ func Connect(parameters ConnectionParameters) (c *Connection, err error) {
|
|||||||
c = new(Connection)
|
c = new(Connection)
|
||||||
|
|
||||||
c.parameters = parameters
|
c.parameters = parameters
|
||||||
|
if c.parameters.Logger != nil {
|
||||||
|
c.logger = c.parameters.Logger
|
||||||
|
} else {
|
||||||
|
c.logger = nullLogger("null")
|
||||||
|
}
|
||||||
|
|
||||||
if c.parameters.Port == 0 {
|
if c.parameters.Port == 0 {
|
||||||
|
c.logger.Debug("Using default Port")
|
||||||
c.parameters.Port = 5432
|
c.parameters.Port = 5432
|
||||||
}
|
}
|
||||||
if c.parameters.MsgBufSize == 0 {
|
if c.parameters.MsgBufSize == 0 {
|
||||||
|
c.logger.Debug("Using default MsgBufSize")
|
||||||
c.parameters.MsgBufSize = 1024
|
c.parameters.MsgBufSize = 1024
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.parameters.Socket != "" {
|
if c.parameters.Socket != "" {
|
||||||
|
c.logger.Info(fmt.Sprintf("Dialing PostgreSQL server at socket: %s", c.parameters.Socket))
|
||||||
c.conn, err = net.Dial("unix", c.parameters.Socket)
|
c.conn, err = net.Dial("unix", c.parameters.Socket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
c.logger.Error(fmt.Sprintf("Connection failed: %v", err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
} else if c.parameters.Host != "" {
|
} else if c.parameters.Host != "" {
|
||||||
|
c.logger.Info(fmt.Sprintf("Dialing PostgreSQL server at host: %s:%d", c.parameters.Host, c.parameters.Port))
|
||||||
c.conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", c.parameters.Host, c.parameters.Port))
|
c.conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", c.parameters.Host, c.parameters.Port))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
c.logger.Error(fmt.Sprintf("Connection failed: %v", err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -116,6 +130,7 @@ func Connect(parameters ConnectionParameters) (c *Connection, err error) {
|
|||||||
if c != nil && err != nil {
|
if c != nil && err != nil {
|
||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
c.alive = false
|
c.alive = false
|
||||||
|
c.logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@@ -126,7 +141,9 @@ func Connect(parameters ConnectionParameters) (c *Connection, err error) {
|
|||||||
c.alive = true
|
c.alive = true
|
||||||
|
|
||||||
if parameters.TLSConfig != nil {
|
if parameters.TLSConfig != nil {
|
||||||
|
c.logger.Debug("Starting TLS handshake")
|
||||||
if err = c.startTLS(); err != nil {
|
if err = c.startTLS(); err != nil {
|
||||||
|
c.logger.Error(fmt.Sprintf("TLS failed: %v", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -155,6 +172,8 @@ func Connect(parameters ConnectionParameters) (c *Connection, err error) {
|
|||||||
}
|
}
|
||||||
case readyForQuery:
|
case readyForQuery:
|
||||||
c.rxReadyForQuery(r)
|
c.rxReadyForQuery(r)
|
||||||
|
c.logger = newPidLogger(c.Pid, c.logger)
|
||||||
|
c.logger.Info("Connection established")
|
||||||
return c, nil
|
return c, nil
|
||||||
default:
|
default:
|
||||||
if err = c.processContextFreeMsg(t, r); err != nil {
|
if err = c.processContextFreeMsg(t, r); err != nil {
|
||||||
@@ -170,6 +189,7 @@ func Connect(parameters ConnectionParameters) (c *Connection, err error) {
|
|||||||
func (c *Connection) Close() (err error) {
|
func (c *Connection) Close() (err error) {
|
||||||
err = c.txMsg('X', c.getBuf(), true)
|
err = c.txMsg('X', c.getBuf(), true)
|
||||||
c.die(errors.New("Closed"))
|
c.die(errors.New("Closed"))
|
||||||
|
c.logger.Info("Closed connection")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -183,6 +203,12 @@ func (c *Connection) Close() (err error) {
|
|||||||
// it is possible to process some rows and then for an error to occur. Callers
|
// it is possible to process some rows and then for an error to occur. Callers
|
||||||
// should be aware of this possibility.
|
// should be aware of this possibility.
|
||||||
func (c *Connection) SelectFunc(sql string, onDataRow func(*DataRowReader) error, arguments ...interface{}) (err error) {
|
func (c *Connection) SelectFunc(sql string, onDataRow func(*DataRowReader) error, arguments ...interface{}) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Error(fmt.Sprintf("SelectFunc `%s` with %v failed: %v", sql, arguments, err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
var fields []FieldDescription
|
var fields []FieldDescription
|
||||||
|
|
||||||
if ps, present := c.preparedStatements[sql]; present {
|
if ps, present := c.preparedStatements[sql]; present {
|
||||||
@@ -298,6 +324,12 @@ func (c *Connection) SelectValue(sql string, arguments ...interface{}) (v interf
|
|||||||
// Returns a UnexpectedColumnCountError if exactly one column is not found
|
// Returns a UnexpectedColumnCountError if exactly one column is not found
|
||||||
// Returns a NotSingleRowError if exactly one row is not found
|
// Returns a NotSingleRowError if exactly one row is not found
|
||||||
func (c *Connection) SelectValueTo(w io.Writer, sql string, arguments ...interface{}) (err error) {
|
func (c *Connection) SelectValueTo(w io.Writer, sql string, arguments ...interface{}) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Error(fmt.Sprintf("SelectValueTo `%s` with %v failed: %v", sql, arguments, err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
if err = c.sendQuery(sql, arguments...); err != nil {
|
if err = c.sendQuery(sql, arguments...); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -413,6 +445,12 @@ func (c *Connection) SelectValues(sql string, arguments ...interface{}) (values
|
|||||||
// Prepare creates a prepared statement with name and sql. sql can contain placeholders
|
// Prepare creates a prepared statement with name and sql. sql can contain placeholders
|
||||||
// for bound parameters. These placeholders are referenced positional as $1, $2, etc.
|
// for bound parameters. These placeholders are referenced positional as $1, $2, etc.
|
||||||
func (c *Connection) Prepare(name, sql string) (err error) {
|
func (c *Connection) Prepare(name, sql string) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Error(fmt.Sprintf("Prepare `%s` as `%s` failed: %v", name, sql, err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// parse
|
// parse
|
||||||
buf := c.getBuf()
|
buf := c.getBuf()
|
||||||
w := newMessageWriter(buf)
|
w := newMessageWriter(buf)
|
||||||
@@ -638,6 +676,12 @@ func (c *Connection) sendPreparedQuery(ps *preparedStatement, arguments ...inter
|
|||||||
// arguments will be sanitized before being interpolated into sql strings. arguments
|
// arguments will be sanitized before being interpolated into sql strings. arguments
|
||||||
// should be referenced positionally from the sql string as $1, $2, etc.
|
// should be referenced positionally from the sql string as $1, $2, etc.
|
||||||
func (c *Connection) Execute(sql string, arguments ...interface{}) (commandTag string, err error) {
|
func (c *Connection) Execute(sql string, arguments ...interface{}) (commandTag string, err error) {
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Error(fmt.Sprintf("Execute `%s` with %v failed: %v", sql, arguments, err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
if err = c.sendQuery(sql, arguments...); err != nil {
|
if err = c.sendQuery(sql, arguments...); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -978,6 +1022,7 @@ func (c *Connection) txPasswordMessage(password string) (err error) {
|
|||||||
func (c *Connection) getBuf() *bytes.Buffer {
|
func (c *Connection) getBuf() *bytes.Buffer {
|
||||||
c.buf.Reset()
|
c.buf.Reset()
|
||||||
if cap(c.buf.Bytes()) > c.bufSize {
|
if cap(c.buf.Bytes()) > c.bufSize {
|
||||||
|
c.logger.Debug(fmt.Sprintf("c.buf (%d) is larger than c.bufSize (%d) -- resetting", cap(c.buf.Bytes()), c.bufSize))
|
||||||
c.buf = bytes.NewBuffer(make([]byte, 0, c.bufSize))
|
c.buf = bytes.NewBuffer(make([]byte, 0, c.bufSize))
|
||||||
}
|
}
|
||||||
return c.buf
|
return c.buf
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
type ConnectionPoolOptions struct {
|
type ConnectionPoolOptions struct {
|
||||||
MaxConnections int // max simultaneous connections to use
|
MaxConnections int // max simultaneous connections to use
|
||||||
AfterConnect func(*Connection) error
|
AfterConnect func(*Connection) error
|
||||||
|
Logger Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
type ConnectionPool struct {
|
type ConnectionPool struct {
|
||||||
@@ -17,6 +18,7 @@ type ConnectionPool struct {
|
|||||||
parameters ConnectionParameters // parameters used when establishing connection
|
parameters ConnectionParameters // parameters used when establishing connection
|
||||||
maxConnections int
|
maxConnections int
|
||||||
afterConnect func(*Connection) error
|
afterConnect func(*Connection) error
|
||||||
|
logger Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
type ConnectionPoolStat struct {
|
type ConnectionPoolStat struct {
|
||||||
@@ -32,6 +34,11 @@ func NewConnectionPool(parameters ConnectionParameters, options ConnectionPoolOp
|
|||||||
p.parameters = parameters
|
p.parameters = parameters
|
||||||
p.maxConnections = options.MaxConnections
|
p.maxConnections = options.MaxConnections
|
||||||
p.afterConnect = options.AfterConnect
|
p.afterConnect = options.AfterConnect
|
||||||
|
if options.Logger != nil {
|
||||||
|
p.logger = options.Logger
|
||||||
|
} else {
|
||||||
|
p.logger = nullLogger("null")
|
||||||
|
}
|
||||||
|
|
||||||
p.allConnections = make([]*Connection, 0, p.maxConnections)
|
p.allConnections = make([]*Connection, 0, p.maxConnections)
|
||||||
p.availableConnections = make([]*Connection, 0, p.maxConnections)
|
p.availableConnections = make([]*Connection, 0, p.maxConnections)
|
||||||
@@ -72,9 +79,12 @@ func (p *ConnectionPool) Acquire() (c *Connection, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// All connections are in use and we cannot create more
|
// All connections are in use and we cannot create more
|
||||||
|
if len(p.availableConnections) == 0 {
|
||||||
|
p.logger.Warning("All connections in pool are busy - waiting...")
|
||||||
for len(p.availableConnections) == 0 {
|
for len(p.availableConnections) == 0 {
|
||||||
p.cond.Wait()
|
p.cond.Wait()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
c = p.availableConnections[len(p.availableConnections)-1]
|
c = p.availableConnections[len(p.availableConnections)-1]
|
||||||
p.availableConnections = p.availableConnections[:len(p.availableConnections)-1]
|
p.availableConnections = p.availableConnections[:len(p.availableConnections)-1]
|
||||||
|
|||||||
@@ -0,0 +1,34 @@
|
|||||||
|
package pgx
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Logger interface {
|
||||||
|
Error(msg string)
|
||||||
|
Warning(msg string)
|
||||||
|
Info(msg string)
|
||||||
|
Debug(msg string)
|
||||||
|
}
|
||||||
|
|
||||||
|
type nullLogger string
|
||||||
|
|
||||||
|
func (l nullLogger) Error(msg string) {}
|
||||||
|
func (l nullLogger) Warning(msg string) {}
|
||||||
|
func (l nullLogger) Info(msg string) {}
|
||||||
|
func (l nullLogger) Debug(msg string) {}
|
||||||
|
|
||||||
|
type pidLogger struct {
|
||||||
|
prefix string
|
||||||
|
baseLogger Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPidLogger(pid int32, baseLogger Logger) *pidLogger {
|
||||||
|
prefix := "(" + strconv.FormatInt(int64(pid), 10) + ") "
|
||||||
|
return &pidLogger{prefix: prefix, baseLogger: baseLogger}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *pidLogger) Error(msg string) { l.baseLogger.Error(l.prefix + msg) }
|
||||||
|
func (l *pidLogger) Warning(msg string) { l.baseLogger.Warning(l.prefix + msg) }
|
||||||
|
func (l *pidLogger) Info(msg string) { l.baseLogger.Info(l.prefix + msg) }
|
||||||
|
func (l *pidLogger) Debug(msg string) { l.baseLogger.Debug(l.prefix + msg) }
|
||||||
Reference in New Issue
Block a user