diff --git a/keepalive.go b/keepalive.go new file mode 100644 index 0000000..bb14889 --- /dev/null +++ b/keepalive.go @@ -0,0 +1,25 @@ +package recws + +import ( + "sync" + "time" +) + +type keepAliveResponse struct { + lastResponse time.Time + sync.RWMutex +} + +func (k *keepAliveResponse) setLastResponse() { + k.Lock() + defer k.Unlock() + + k.lastResponse = time.Now() +} + +func (k *keepAliveResponse) getLastResponse() time.Time { + k.RLock() + defer k.RUnlock() + + return k.lastResponse +} diff --git a/recws.go b/recws.go index b003118..c0da768 100644 --- a/recws.go +++ b/recws.go @@ -37,6 +37,9 @@ type RecConn struct { NonVerbose bool // SubscribeHandler fires after the connection successfully establish. SubscribeHandler func() error + // KeepAliveTimeout is an interval for sending ping/pong messages + // disabled if 0 + KeepAliveTimeout time.Duration mu sync.RWMutex url string @@ -298,6 +301,47 @@ func (rc *RecConn) hasSubscribeHandler() bool { return rc.SubscribeHandler != nil } +func (rc *RecConn) getKeepAliveTimeout() time.Duration { + rc.mu.RLock() + defer rc.mu.RUnlock() + + return rc.KeepAliveTimeout +} + +func (rc *RecConn) writeControlPingMessage() error { + rc.mu.Lock() + defer rc.mu.Unlock() + + return rc.Conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)) +} + +func (rc *RecConn) keepAlive() { + var ( + keepAliveResponse = new(keepAliveResponse) + ticker = time.NewTicker(rc.getKeepAliveTimeout()) + ) + + rc.mu.Lock() + rc.Conn.SetPongHandler(func(msg string) error { + keepAliveResponse.setLastResponse() + return nil + }) + rc.mu.Unlock() + + go func() { + defer ticker.Stop() + + for { + rc.writeControlPingMessage() + <-ticker.C + if time.Now().Sub(keepAliveResponse.getLastResponse()) > rc.getKeepAliveTimeout() { + rc.closeAndReconnect() + return + } + } + }() +} + func (rc *RecConn) connect() { b := rc.getBackoff() rand.Seed(time.Now().UTC().UnixNano()) @@ -326,6 +370,10 @@ func (rc *RecConn) connect() { } log.Printf("Dial: connect handler was successfully established with %s\n", rc.url) + + if rc.getKeepAliveTimeout() != 0 { + rc.keepAlive() + } } return