From c1ec811cb148d3d1a648460c3a8e62f4792835ea Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Wed, 7 Sep 2022 01:26:05 +0300 Subject: [PATCH] allow recognize data as keepalive --- keepalive.go | 19 ++++++++++++++++++- recws.go | 36 +++++++++++++++++++++++++----------- 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/keepalive.go b/keepalive.go index bb14889..55d71fa 100644 --- a/keepalive.go +++ b/keepalive.go @@ -6,7 +6,8 @@ import ( ) type keepAliveResponse struct { - lastResponse time.Time + allowDataResponse bool + lastResponse time.Time sync.RWMutex } @@ -17,6 +18,22 @@ func (k *keepAliveResponse) setLastResponse() { k.lastResponse = time.Now() } +func (k *keepAliveResponse) getAllowDataResponse() bool { + k.RLock() + allow := k.allowDataResponse + k.RUnlock() + return allow +} + +func (k *keepAliveResponse) setLastDataResponse() { + allow := k.getAllowDataResponse() + if allow { + k.Lock() + k.lastResponse = time.Now() + k.Unlock() + } +} + func (k *keepAliveResponse) getLastResponse() time.Time { k.RLock() defer k.RUnlock() diff --git a/recws.go b/recws.go index 4faa7af..1d95489 100644 --- a/recws.go +++ b/recws.go @@ -46,14 +46,17 @@ type RecConn struct { KeepAliveTimeout time.Duration // NonVerbose suppress connecting/reconnecting messages. NonVerbose bool + // AllowKeepAliveDataResponse allows recognize data response like keepalive response + AllowKeepAliveDataResponse bool - isConnected bool - mu sync.RWMutex - url string - reqHeader http.Header - httpResp *http.Response - dialErr error - dialer *websocket.Dialer + isConnected bool + mu sync.RWMutex + url string + reqHeader http.Header + httpResp *http.Response + dialErr error + dialer *websocket.Dialer + keepAliveResponse *keepAliveResponse *websocket.Conn } @@ -119,6 +122,9 @@ func (rc *RecConn) ReadMessage() (messageType int, message []byte, err error) { rc.CloseAndReconnect() } } + if err == nil { + rc.getKeepAliveResponse().setLastDataResponse() + } return } @@ -192,6 +198,13 @@ func (rc *RecConn) ReadJSON(v interface{}) error { return err } +func (rc *RecConn) getKeepAliveResponse() *keepAliveResponse { + rc.mu.RLock() + ka := rc.keepAliveResponse + rc.mu.RUnlock() + return ka +} + func (rc *RecConn) setURL(url string) { rc.mu.Lock() defer rc.mu.Unlock() @@ -385,13 +398,12 @@ func (rc *RecConn) writeControlPingMessage() error { func (rc *RecConn) keepAlive() { var ( - keepAliveResponse = new(keepAliveResponse) - ticker = time.NewTicker(rc.getKeepAliveTimeout()) + ticker = time.NewTicker(rc.getKeepAliveTimeout()) ) rc.mu.Lock() rc.Conn.SetPongHandler(func(msg string) error { - keepAliveResponse.setLastResponse() + rc.getKeepAliveResponse().setLastResponse() return nil }) rc.mu.Unlock() @@ -409,7 +421,7 @@ func (rc *RecConn) keepAlive() { } <-ticker.C - if time.Since(keepAliveResponse.getLastResponse()) > rc.getKeepAliveTimeout() { + if time.Since(rc.getKeepAliveResponse().getLastResponse()) > rc.getKeepAliveTimeout() { rc.CloseAndReconnect() return } @@ -430,6 +442,8 @@ func (rc *RecConn) connect() { rc.dialErr = err rc.isConnected = err == nil rc.httpResp = httpResp + rc.keepAliveResponse = new(keepAliveResponse) + rc.keepAliveResponse.allowDataResponse = rc.AllowKeepAliveDataResponse rc.mu.Unlock() if err == nil {