2
0

Merge pull request #7 from nikepan/keepalive_data_response

allow recognize data as keepalive
This commit is contained in:
Nikolay Pavlovich
2022-09-07 01:28:07 +03:00
committed by GitHub
2 changed files with 43 additions and 12 deletions
+17
View File
@@ -6,6 +6,7 @@ import (
) )
type keepAliveResponse struct { type keepAliveResponse struct {
allowDataResponse bool
lastResponse time.Time lastResponse time.Time
sync.RWMutex sync.RWMutex
} }
@@ -17,6 +18,22 @@ func (k *keepAliveResponse) setLastResponse() {
k.lastResponse = time.Now() k.lastResponse = time.Now()
} }
func (k *keepAliveResponse) getAllowDataResponse() bool {
k.RLock()
allow := k.allowDataResponse
k.RUnlock()
return allow
}
func (k *keepAliveResponse) setLastDataResponse() {
allow := k.getAllowDataResponse()
if allow {
k.Lock()
k.lastResponse = time.Now()
k.Unlock()
}
}
func (k *keepAliveResponse) getLastResponse() time.Time { func (k *keepAliveResponse) getLastResponse() time.Time {
k.RLock() k.RLock()
defer k.RUnlock() defer k.RUnlock()
+17 -3
View File
@@ -49,6 +49,8 @@ type RecConn struct {
LogHandler func(v LogValues) LogHandler func(v LogValues)
// NonVerbose suppress connecting/reconnecting messages. // NonVerbose suppress connecting/reconnecting messages.
NonVerbose bool NonVerbose bool
// AllowKeepAliveDataResponse allows recognize data response like keepalive response
AllowKeepAliveDataResponse bool
isConnected bool isConnected bool
mu sync.RWMutex mu sync.RWMutex
@@ -57,6 +59,7 @@ type RecConn struct {
httpResp *http.Response httpResp *http.Response
dialErr error dialErr error
dialer *websocket.Dialer dialer *websocket.Dialer
keepAliveResponse *keepAliveResponse
*websocket.Conn *websocket.Conn
} }
@@ -133,6 +136,9 @@ func (rc *RecConn) ReadMessage() (messageType int, message []byte, err error) {
rc.CloseAndReconnect() rc.CloseAndReconnect()
} }
} }
if err == nil {
rc.getKeepAliveResponse().setLastDataResponse()
}
return return
} }
@@ -206,6 +212,13 @@ func (rc *RecConn) ReadJSON(v interface{}) error {
return err return err
} }
func (rc *RecConn) getKeepAliveResponse() *keepAliveResponse {
rc.mu.RLock()
ka := rc.keepAliveResponse
rc.mu.RUnlock()
return ka
}
func (rc *RecConn) setURL(url string) { func (rc *RecConn) setURL(url string) {
rc.mu.Lock() rc.mu.Lock()
defer rc.mu.Unlock() defer rc.mu.Unlock()
@@ -409,13 +422,12 @@ func (rc *RecConn) writeControlPingMessage() error {
func (rc *RecConn) keepAlive() { func (rc *RecConn) keepAlive() {
var ( var (
keepAliveResponse = new(keepAliveResponse)
ticker = time.NewTicker(rc.getKeepAliveTimeout()) ticker = time.NewTicker(rc.getKeepAliveTimeout())
) )
rc.mu.Lock() rc.mu.Lock()
rc.Conn.SetPongHandler(func(msg string) error { rc.Conn.SetPongHandler(func(msg string) error {
keepAliveResponse.setLastResponse() rc.getKeepAliveResponse().setLastResponse()
return nil return nil
}) })
rc.mu.Unlock() rc.mu.Unlock()
@@ -434,7 +446,7 @@ func (rc *RecConn) keepAlive() {
<-ticker.C <-ticker.C
timeoutOffset := time.Millisecond * 500 timeoutOffset := time.Millisecond * 500
if time.Since(keepAliveResponse.getLastResponse()) > rc.getKeepAliveTimeout()+timeoutOffset { if time.Since(rc.getKeepAliveResponse().getLastResponse()) > rc.getKeepAliveTimeout()+timeoutOffset {
rc.log(LogValues{Err: errors.New("keepalive timeout"), Msg: "Reconnect", Url: rc.url}) rc.log(LogValues{Err: errors.New("keepalive timeout"), Msg: "Reconnect", Url: rc.url})
rc.CloseAndReconnect() rc.CloseAndReconnect()
return return
@@ -456,6 +468,8 @@ func (rc *RecConn) connect() {
rc.dialErr = err rc.dialErr = err
rc.isConnected = err == nil rc.isConnected = err == nil
rc.httpResp = httpResp rc.httpResp = httpResp
rc.keepAliveResponse = new(keepAliveResponse)
rc.keepAliveResponse.allowDataResponse = rc.AllowKeepAliveDataResponse
rc.mu.Unlock() rc.mu.Unlock()
if err == nil { if err == nil {