From 40a86b719cd8a163aac428d8846081893a51ae26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Wed, 13 Feb 2019 13:05:04 +0100 Subject: [PATCH] added keepAlive --- recws.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/recws.go b/recws.go index b003118..d983bdd 100644 --- a/recws.go +++ b/recws.go @@ -37,6 +37,9 @@ type RecConn struct { NonVerbose bool // SubscribeHandler fires after the connection successfully establish. SubscribeHandler func() error + // KeepAliveTimeout is an interval for sending ping/pong messages + // disabled if 0 + KeepAliveTimeout time.Duration mu sync.RWMutex url string @@ -298,6 +301,41 @@ func (rc *RecConn) hasSubscribeHandler() bool { return rc.SubscribeHandler != nil } +func (rc *RecConn) getKeepAliveTimeout() time.Duration { + rc.mu.RLock() + defer rc.mu.RUnlock() + + return rc.KeepAliveTimeout +} + +func (rc *RecConn) keepAlive() { + var ( + lastResponse = time.Now() + ticker = time.NewTicker(rc.getKeepAliveTimeout()) + ) + + rc.Conn.SetPongHandler(func(msg string) error { + lastResponse = time.Now() + return nil + }) + + go func() { + defer ticker.Stop() + + for { + if err := rc.Conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { + return + } + + <-ticker.C + if time.Now().Sub(lastResponse) > rc.getKeepAliveTimeout() { + rc.closeAndReconnect() + return + } + } + }() +} + func (rc *RecConn) connect() { b := rc.getBackoff() rand.Seed(time.Now().UTC().UnixNano()) @@ -326,6 +364,10 @@ func (rc *RecConn) connect() { } log.Printf("Dial: connect handler was successfully established with %s\n", rc.url) + + if rc.getKeepAliveTimeout() != 0 { + rc.keepAlive() + } } return