Merge pull request #1 from loeffel-io/feature/keep-alive
added keepAlive
This commit is contained in:
@@ -37,6 +37,9 @@ type RecConn struct {
|
||||
NonVerbose bool
|
||||
// SubscribeHandler fires after the connection successfully establish.
|
||||
SubscribeHandler func() error
|
||||
// KeepAliveTimeout is an interval for sending ping/pong messages
|
||||
// disabled if 0
|
||||
KeepAliveTimeout time.Duration
|
||||
|
||||
mu sync.RWMutex
|
||||
url string
|
||||
@@ -298,6 +301,41 @@ func (rc *RecConn) hasSubscribeHandler() bool {
|
||||
return rc.SubscribeHandler != nil
|
||||
}
|
||||
|
||||
func (rc *RecConn) getKeepAliveTimeout() time.Duration {
|
||||
rc.mu.RLock()
|
||||
defer rc.mu.RUnlock()
|
||||
|
||||
return rc.KeepAliveTimeout
|
||||
}
|
||||
|
||||
func (rc *RecConn) keepAlive() {
|
||||
var (
|
||||
lastResponse = time.Now()
|
||||
ticker = time.NewTicker(rc.getKeepAliveTimeout())
|
||||
)
|
||||
|
||||
rc.Conn.SetPongHandler(func(msg string) error {
|
||||
lastResponse = time.Now()
|
||||
return nil
|
||||
})
|
||||
|
||||
go func() {
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
if err := rc.Conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
<-ticker.C
|
||||
if time.Now().Sub(lastResponse) > rc.getKeepAliveTimeout() {
|
||||
rc.closeAndReconnect()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (rc *RecConn) connect() {
|
||||
b := rc.getBackoff()
|
||||
rand.Seed(time.Now().UTC().UnixNano())
|
||||
@@ -326,6 +364,10 @@ func (rc *RecConn) connect() {
|
||||
}
|
||||
|
||||
log.Printf("Dial: connect handler was successfully established with %s\n", rc.url)
|
||||
|
||||
if rc.getKeepAliveTimeout() != 0 {
|
||||
rc.keepAlive()
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user