Merge pull request #8 from loeffel-io/feature/keep-alive
added keepAlive
This commit is contained in:
@@ -0,0 +1,25 @@
|
|||||||
|
package recws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type keepAliveResponse struct {
|
||||||
|
lastResponse time.Time
|
||||||
|
sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *keepAliveResponse) setLastResponse() {
|
||||||
|
k.Lock()
|
||||||
|
defer k.Unlock()
|
||||||
|
|
||||||
|
k.lastResponse = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *keepAliveResponse) getLastResponse() time.Time {
|
||||||
|
k.RLock()
|
||||||
|
defer k.RUnlock()
|
||||||
|
|
||||||
|
return k.lastResponse
|
||||||
|
}
|
||||||
@@ -37,6 +37,9 @@ type RecConn struct {
|
|||||||
NonVerbose bool
|
NonVerbose bool
|
||||||
// SubscribeHandler fires after the connection successfully establish.
|
// SubscribeHandler fires after the connection successfully establish.
|
||||||
SubscribeHandler func() error
|
SubscribeHandler func() error
|
||||||
|
// KeepAliveTimeout is an interval for sending ping/pong messages
|
||||||
|
// disabled if 0
|
||||||
|
KeepAliveTimeout time.Duration
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
url string
|
url string
|
||||||
@@ -298,6 +301,47 @@ func (rc *RecConn) hasSubscribeHandler() bool {
|
|||||||
return rc.SubscribeHandler != nil
|
return rc.SubscribeHandler != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rc *RecConn) getKeepAliveTimeout() time.Duration {
|
||||||
|
rc.mu.RLock()
|
||||||
|
defer rc.mu.RUnlock()
|
||||||
|
|
||||||
|
return rc.KeepAliveTimeout
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *RecConn) writeControlPingMessage() error {
|
||||||
|
rc.mu.Lock()
|
||||||
|
defer rc.mu.Unlock()
|
||||||
|
|
||||||
|
return rc.Conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *RecConn) keepAlive() {
|
||||||
|
var (
|
||||||
|
keepAliveResponse = new(keepAliveResponse)
|
||||||
|
ticker = time.NewTicker(rc.getKeepAliveTimeout())
|
||||||
|
)
|
||||||
|
|
||||||
|
rc.mu.Lock()
|
||||||
|
rc.Conn.SetPongHandler(func(msg string) error {
|
||||||
|
keepAliveResponse.setLastResponse()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
rc.mu.Unlock()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
rc.writeControlPingMessage()
|
||||||
|
<-ticker.C
|
||||||
|
if time.Now().Sub(keepAliveResponse.getLastResponse()) > rc.getKeepAliveTimeout() {
|
||||||
|
rc.closeAndReconnect()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
func (rc *RecConn) connect() {
|
func (rc *RecConn) connect() {
|
||||||
b := rc.getBackoff()
|
b := rc.getBackoff()
|
||||||
rand.Seed(time.Now().UTC().UnixNano())
|
rand.Seed(time.Now().UTC().UnixNano())
|
||||||
@@ -326,6 +370,10 @@ func (rc *RecConn) connect() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Dial: connect handler was successfully established with %s\n", rc.url)
|
log.Printf("Dial: connect handler was successfully established with %s\n", rc.url)
|
||||||
|
|
||||||
|
if rc.getKeepAliveTimeout() != 0 {
|
||||||
|
rc.keepAlive()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user