2
0

allow recognize data as keepalive

This commit is contained in:
Nikolay Pavlovich
2022-09-07 01:26:05 +03:00
parent 4baede84bc
commit c1ec811cb1
2 changed files with 43 additions and 12 deletions
+18 -1
View File
@@ -6,7 +6,8 @@ import (
) )
type keepAliveResponse struct { type keepAliveResponse struct {
lastResponse time.Time allowDataResponse bool
lastResponse time.Time
sync.RWMutex sync.RWMutex
} }
@@ -17,6 +18,22 @@ func (k *keepAliveResponse) setLastResponse() {
k.lastResponse = time.Now() k.lastResponse = time.Now()
} }
func (k *keepAliveResponse) getAllowDataResponse() bool {
k.RLock()
allow := k.allowDataResponse
k.RUnlock()
return allow
}
func (k *keepAliveResponse) setLastDataResponse() {
allow := k.getAllowDataResponse()
if allow {
k.Lock()
k.lastResponse = time.Now()
k.Unlock()
}
}
func (k *keepAliveResponse) getLastResponse() time.Time { func (k *keepAliveResponse) getLastResponse() time.Time {
k.RLock() k.RLock()
defer k.RUnlock() defer k.RUnlock()
+25 -11
View File
@@ -46,14 +46,17 @@ type RecConn struct {
KeepAliveTimeout time.Duration KeepAliveTimeout time.Duration
// NonVerbose suppress connecting/reconnecting messages. // NonVerbose suppress connecting/reconnecting messages.
NonVerbose bool NonVerbose bool
// AllowKeepAliveDataResponse allows recognize data response like keepalive response
AllowKeepAliveDataResponse bool
isConnected bool isConnected bool
mu sync.RWMutex mu sync.RWMutex
url string url string
reqHeader http.Header reqHeader http.Header
httpResp *http.Response httpResp *http.Response
dialErr error dialErr error
dialer *websocket.Dialer dialer *websocket.Dialer
keepAliveResponse *keepAliveResponse
*websocket.Conn *websocket.Conn
} }
@@ -119,6 +122,9 @@ func (rc *RecConn) ReadMessage() (messageType int, message []byte, err error) {
rc.CloseAndReconnect() rc.CloseAndReconnect()
} }
} }
if err == nil {
rc.getKeepAliveResponse().setLastDataResponse()
}
return return
} }
@@ -192,6 +198,13 @@ func (rc *RecConn) ReadJSON(v interface{}) error {
return err return err
} }
func (rc *RecConn) getKeepAliveResponse() *keepAliveResponse {
rc.mu.RLock()
ka := rc.keepAliveResponse
rc.mu.RUnlock()
return ka
}
func (rc *RecConn) setURL(url string) { func (rc *RecConn) setURL(url string) {
rc.mu.Lock() rc.mu.Lock()
defer rc.mu.Unlock() defer rc.mu.Unlock()
@@ -385,13 +398,12 @@ func (rc *RecConn) writeControlPingMessage() error {
func (rc *RecConn) keepAlive() { func (rc *RecConn) keepAlive() {
var ( var (
keepAliveResponse = new(keepAliveResponse) ticker = time.NewTicker(rc.getKeepAliveTimeout())
ticker = time.NewTicker(rc.getKeepAliveTimeout())
) )
rc.mu.Lock() rc.mu.Lock()
rc.Conn.SetPongHandler(func(msg string) error { rc.Conn.SetPongHandler(func(msg string) error {
keepAliveResponse.setLastResponse() rc.getKeepAliveResponse().setLastResponse()
return nil return nil
}) })
rc.mu.Unlock() rc.mu.Unlock()
@@ -409,7 +421,7 @@ func (rc *RecConn) keepAlive() {
} }
<-ticker.C <-ticker.C
if time.Since(keepAliveResponse.getLastResponse()) > rc.getKeepAliveTimeout() { if time.Since(rc.getKeepAliveResponse().getLastResponse()) > rc.getKeepAliveTimeout() {
rc.CloseAndReconnect() rc.CloseAndReconnect()
return return
} }
@@ -430,6 +442,8 @@ func (rc *RecConn) connect() {
rc.dialErr = err rc.dialErr = err
rc.isConnected = err == nil rc.isConnected = err == nil
rc.httpResp = httpResp rc.httpResp = httpResp
rc.keepAliveResponse = new(keepAliveResponse)
rc.keepAliveResponse.allowDataResponse = rc.AllowKeepAliveDataResponse
rc.mu.Unlock() rc.mu.Unlock()
if err == nil { if err == nil {