From 40a86b719cd8a163aac428d8846081893a51ae26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Wed, 13 Feb 2019 13:05:04 +0100 Subject: [PATCH 1/2] added keepAlive --- recws.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/recws.go b/recws.go index b003118..d983bdd 100644 --- a/recws.go +++ b/recws.go @@ -37,6 +37,9 @@ type RecConn struct { NonVerbose bool // SubscribeHandler fires after the connection successfully establish. SubscribeHandler func() error + // KeepAliveTimeout is an interval for sending ping/pong messages + // disabled if 0 + KeepAliveTimeout time.Duration mu sync.RWMutex url string @@ -298,6 +301,41 @@ func (rc *RecConn) hasSubscribeHandler() bool { return rc.SubscribeHandler != nil } +func (rc *RecConn) getKeepAliveTimeout() time.Duration { + rc.mu.RLock() + defer rc.mu.RUnlock() + + return rc.KeepAliveTimeout +} + +func (rc *RecConn) keepAlive() { + var ( + lastResponse = time.Now() + ticker = time.NewTicker(rc.getKeepAliveTimeout()) + ) + + rc.Conn.SetPongHandler(func(msg string) error { + lastResponse = time.Now() + return nil + }) + + go func() { + defer ticker.Stop() + + for { + if err := rc.Conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { + return + } + + <-ticker.C + if time.Now().Sub(lastResponse) > rc.getKeepAliveTimeout() { + rc.closeAndReconnect() + return + } + } + }() +} + func (rc *RecConn) connect() { b := rc.getBackoff() rand.Seed(time.Now().UTC().UnixNano()) @@ -326,6 +364,10 @@ func (rc *RecConn) connect() { } log.Printf("Dial: connect handler was successfully established with %s\n", rc.url) + + if rc.getKeepAliveTimeout() != 0 { + rc.keepAlive() + } } return From b18b68d8ff9ba4c75f6c172192e7d0028865b7bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Wed, 13 Feb 2019 21:34:50 +0100 Subject: [PATCH 2/2] fixed data race --- keepalive.go | 25 +++++++++++++++++++++++++ recws.go | 22 ++++++++++++++-------- 2 files changed, 39 insertions(+), 8 deletions(-) create mode 100644 keepalive.go diff --git a/keepalive.go b/keepalive.go new file mode 100644 index 0000000..bb14889 --- /dev/null +++ b/keepalive.go @@ -0,0 +1,25 @@ +package recws + +import ( + "sync" + "time" +) + +type keepAliveResponse struct { + lastResponse time.Time + sync.RWMutex +} + +func (k *keepAliveResponse) setLastResponse() { + k.Lock() + defer k.Unlock() + + k.lastResponse = time.Now() +} + +func (k *keepAliveResponse) getLastResponse() time.Time { + k.RLock() + defer k.RUnlock() + + return k.lastResponse +} diff --git a/recws.go b/recws.go index d983bdd..c0da768 100644 --- a/recws.go +++ b/recws.go @@ -308,27 +308,33 @@ func (rc *RecConn) getKeepAliveTimeout() time.Duration { return rc.KeepAliveTimeout } +func (rc *RecConn) writeControlPingMessage() error { + rc.mu.Lock() + defer rc.mu.Unlock() + + return rc.Conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)) +} + func (rc *RecConn) keepAlive() { var ( - lastResponse = time.Now() - ticker = time.NewTicker(rc.getKeepAliveTimeout()) + keepAliveResponse = new(keepAliveResponse) + ticker = time.NewTicker(rc.getKeepAliveTimeout()) ) + rc.mu.Lock() rc.Conn.SetPongHandler(func(msg string) error { - lastResponse = time.Now() + keepAliveResponse.setLastResponse() return nil }) + rc.mu.Unlock() go func() { defer ticker.Stop() for { - if err := rc.Conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { - return - } - + rc.writeControlPingMessage() <-ticker.C - if time.Now().Sub(lastResponse) > rc.getKeepAliveTimeout() { + if time.Now().Sub(keepAliveResponse.getLastResponse()) > rc.getKeepAliveTimeout() { rc.closeAndReconnect() return }