Begin extracting context handling
This commit is contained in:
@@ -88,6 +88,10 @@ type Conn struct {
|
|||||||
closingLock sync.Mutex
|
closingLock sync.Mutex
|
||||||
alive bool
|
alive bool
|
||||||
causeOfDeath error
|
causeOfDeath error
|
||||||
|
|
||||||
|
// context support
|
||||||
|
doneChan chan struct{}
|
||||||
|
closedChan chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// PreparedStatement is a description of a prepared statement
|
// PreparedStatement is a description of a prepared statement
|
||||||
@@ -257,6 +261,8 @@ func (c *Conn) connect(config ConnConfig, network, address string, tlsConfig *tl
|
|||||||
c.channels = make(map[string]struct{})
|
c.channels = make(map[string]struct{})
|
||||||
c.alive = true
|
c.alive = true
|
||||||
c.lastActivityTime = time.Now()
|
c.lastActivityTime = time.Now()
|
||||||
|
c.doneChan = make(chan struct{})
|
||||||
|
c.closedChan = make(chan struct{})
|
||||||
|
|
||||||
if tlsConfig != nil {
|
if tlsConfig != nil {
|
||||||
if c.shouldLog(LogLevelDebug) {
|
if c.shouldLog(LogLevelDebug) {
|
||||||
@@ -619,8 +625,7 @@ func (c *Conn) Prepare(name, sql string) (ps *PreparedStatement, err error) {
|
|||||||
// name and sql arguments. This allows a code path to PrepareEx and Query/Exec without
|
// name and sql arguments. This allows a code path to PrepareEx and Query/Exec without
|
||||||
// concern for if the statement has already been prepared.
|
// concern for if the statement has already been prepared.
|
||||||
func (c *Conn) PrepareEx(name, sql string, opts *PrepareExOptions) (ps *PreparedStatement, err error) {
|
func (c *Conn) PrepareEx(name, sql string, opts *PrepareExOptions) (ps *PreparedStatement, err error) {
|
||||||
return c.PrepareExContext(context.Background(), name, sql, opts)
|
return c.prepareEx(name, sql, opts)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) PrepareExContext(ctx context.Context, name, sql string, opts *PrepareExOptions) (ps *PreparedStatement, err error) {
|
func (c *Conn) PrepareExContext(ctx context.Context, name, sql string, opts *PrepareExOptions) (ps *PreparedStatement, err error) {
|
||||||
@@ -630,25 +635,14 @@ func (c *Conn) PrepareExContext(ctx context.Context, name, sql string, opts *Pre
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
doneChan := make(chan struct{})
|
go c.contextHandler(ctx)
|
||||||
closedChan := make(chan struct{})
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
c.cancelQuery()
|
|
||||||
c.Close()
|
|
||||||
closedChan <- struct{}{}
|
|
||||||
case <-doneChan:
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
ps, err = c.prepareEx(name, sql, opts)
|
ps, err = c.prepareEx(name, sql, opts)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-closedChan:
|
case <-c.closedChan:
|
||||||
return nil, ctx.Err()
|
return nil, ctx.Err()
|
||||||
case doneChan <- struct{}{}:
|
case c.doneChan <- struct{}{}:
|
||||||
return ps, err
|
return ps, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1383,25 +1377,24 @@ func (c *Conn) ExecContext(ctx context.Context, sql string, arguments ...interfa
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
doneChan := make(chan struct{})
|
go c.contextHandler(ctx)
|
||||||
closedChan := make(chan struct{})
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
c.cancelQuery()
|
|
||||||
c.Close()
|
|
||||||
closedChan <- struct{}{}
|
|
||||||
case <-doneChan:
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
commandTag, err = c.Exec(sql, arguments...)
|
commandTag, err = c.Exec(sql, arguments...)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-closedChan:
|
case <-c.closedChan:
|
||||||
return "", ctx.Err()
|
return "", ctx.Err()
|
||||||
case doneChan <- struct{}{}:
|
case c.doneChan <- struct{}{}:
|
||||||
return commandTag, err
|
return commandTag, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Conn) contextHandler(ctx context.Context) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
c.cancelQuery()
|
||||||
|
c.Close()
|
||||||
|
c.closedChan <- struct{}{}
|
||||||
|
case <-c.doneChan:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -51,9 +51,7 @@ type Rows struct {
|
|||||||
unlockConn bool
|
unlockConn bool
|
||||||
closed bool
|
closed bool
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
doneChan chan struct{}
|
|
||||||
closedChan chan bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rows *Rows) FieldDescriptions() []FieldDescription {
|
func (rows *Rows) FieldDescriptions() []FieldDescription {
|
||||||
@@ -128,9 +126,9 @@ func (rows *Rows) Close() {
|
|||||||
|
|
||||||
if rows.ctx != nil {
|
if rows.ctx != nil {
|
||||||
select {
|
select {
|
||||||
case <-rows.closedChan:
|
case <-rows.conn.closedChan:
|
||||||
rows.err = rows.ctx.Err()
|
rows.err = rows.ctx.Err()
|
||||||
case rows.doneChan <- struct{}{}:
|
case rows.conn.doneChan <- struct{}{}:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -508,33 +506,20 @@ func (c *Conn) QueryRow(sql string, args ...interface{}) *Row {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) QueryContext(ctx context.Context, sql string, args ...interface{}) (*Rows, error) {
|
func (c *Conn) QueryContext(ctx context.Context, sql string, args ...interface{}) (*Rows, error) {
|
||||||
doneChan := make(chan struct{})
|
go c.contextHandler(ctx)
|
||||||
closedChan := make(chan bool)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
c.cancelQuery()
|
|
||||||
c.Close()
|
|
||||||
closedChan <- true
|
|
||||||
case <-doneChan:
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
rows, err := c.Query(sql, args...)
|
rows, err := c.Query(sql, args...)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
select {
|
select {
|
||||||
case <-closedChan:
|
case <-c.closedChan:
|
||||||
return rows, ctx.Err()
|
return rows, ctx.Err()
|
||||||
case doneChan <- struct{}{}:
|
case c.doneChan <- struct{}{}:
|
||||||
return rows, err
|
return rows, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rows.ctx = ctx
|
rows.ctx = ctx
|
||||||
rows.doneChan = doneChan
|
|
||||||
rows.closedChan = closedChan
|
|
||||||
|
|
||||||
return rows, nil
|
return rows, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user