2
0

Merge branch 'master' into keepalive_data_response

This commit is contained in:
Nikolay Pavlovich
2022-09-07 01:27:54 +03:00
committed by GitHub
4 changed files with 48 additions and 20 deletions
+5 -3
View File
@@ -2,6 +2,8 @@
# recws # recws
***This is fork of [recws-org/recws](https://github.com/recws-org/recws) with some fixes***
Reconnecting WebSocket is a websocket client based on [gorilla/websocket](https://github.com/gorilla/websocket) that will automatically reconnect if the connection is dropped - thread safe! Reconnecting WebSocket is a websocket client based on [gorilla/websocket](https://github.com/gorilla/websocket) that will automatically reconnect if the connection is dropped - thread safe!
[![Build Status](https://travis-ci.com/recws-org/recws.svg?branch=master)](https://travis-ci.com/recws-org/recws) [![Build Status](https://travis-ci.com/recws-org/recws.svg?branch=master)](https://travis-ci.com/recws-org/recws)
@@ -12,7 +14,7 @@ Reconnecting WebSocket is a websocket client based on [gorilla/websocket](https:
## Installation ## Installation
```bash ```bash
go get github.com/recws-org/recws go get github.com/nikepan/recws
``` ```
## Sponsors ## Sponsors
@@ -21,8 +23,8 @@ go get github.com/recws-org/recws
## Logo ## Logo
- Logo by [Anastasia Marx](https://www.behance.net/AnastasiaMarx) - Logo by [Anastasia Marx](https://www.behance.net/AnastasiaMarx)
- Gopher by [Gophers](https://github.com/egonelbre/gophers) - Gopher by [Gophers](https://github.com/egonelbre/gophers)
## License ## License
+2 -1
View File
@@ -2,9 +2,10 @@ package main
import ( import (
"context" "context"
"github.com/recws-org/recws"
"log" "log"
"time" "time"
"github.com/nikepan/recws"
) )
func main() { func main() {
+2 -2
View File
@@ -1,6 +1,6 @@
module github.com/recws-org/recws module github.com/nikepan/recws
go 1.17 go 1.18
require ( require (
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
+39 -14
View File
@@ -5,6 +5,7 @@ package recws
import ( import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt"
"log" "log"
"math/rand" "math/rand"
"net/http" "net/http"
@@ -44,6 +45,8 @@ type RecConn struct {
// KeepAliveTimeout is an interval for sending ping/pong messages // KeepAliveTimeout is an interval for sending ping/pong messages
// disabled if 0 // disabled if 0
KeepAliveTimeout time.Duration KeepAliveTimeout time.Duration
// LogHandler handles all log messages
LogHandler func(v LogValues)
// NonVerbose suppress connecting/reconnecting messages. // NonVerbose suppress connecting/reconnecting messages.
NonVerbose bool NonVerbose bool
// AllowKeepAliveDataResponse allows recognize data response like keepalive response // AllowKeepAliveDataResponse allows recognize data response like keepalive response
@@ -61,6 +64,18 @@ type RecConn struct {
*websocket.Conn *websocket.Conn
} }
// LogValues type includes values for send to logger
type LogValues struct {
// Msg is main message
Msg string
// Err is error for separate and display it
Err error
// Url is connection url
Url string
// Fatal is tag of fatal error
Fatal bool
}
// CloseAndReconnect will try to reconnect. // CloseAndReconnect will try to reconnect.
func (rc *RecConn) CloseAndReconnect() { func (rc *RecConn) CloseAndReconnect() {
rc.Close() rc.Close()
@@ -85,13 +100,12 @@ func (rc *RecConn) getConn() *websocket.Conn {
// Close closes the underlying network connection without // Close closes the underlying network connection without
// sending or waiting for a close frame. // sending or waiting for a close frame.
func (rc *RecConn) Close() { func (rc *RecConn) Close() {
if rc.getConn() != nil { rc.mu.Lock()
rc.mu.Lock() if rc.Conn != nil {
rc.Conn.Close() rc.Conn.Close()
rc.mu.Unlock()
} }
rc.isConnected = false
rc.setIsConnected(false) rc.mu.Unlock()
} }
// Shutdown gracefully closes the connection by sending the websocket.CloseMessage. // Shutdown gracefully closes the connection by sending the websocket.CloseMessage.
@@ -101,7 +115,7 @@ func (rc *RecConn) Shutdown(writeWait time.Duration) {
err := rc.WriteControl(websocket.CloseMessage, msg, time.Now().Add(writeWait)) err := rc.WriteControl(websocket.CloseMessage, msg, time.Now().Add(writeWait))
if err != nil && err != websocket.ErrCloseSent { if err != nil && err != websocket.ErrCloseSent {
// If close message could not be sent, then close without the handshake. // If close message could not be sent, then close without the handshake.
log.Printf("Shutdown: %v", err) rc.log(LogValues{Err: err, Msg: "Shutdown"})
rc.Close() rc.Close()
} }
} }
@@ -328,7 +342,7 @@ func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) {
urlStr, err := rc.parseURL(urlStr) urlStr, err := rc.parseURL(urlStr)
if err != nil { if err != nil {
log.Fatalf("Dial: %v", err) rc.log(LogValues{Msg: "Dial", Err: err, Fatal: true})
} }
// Config // Config
@@ -356,6 +370,16 @@ func (rc *RecConn) GetURL() string {
return rc.url return rc.url
} }
func (rc *RecConn) log(v LogValues) {
if rc.LogHandler != nil {
rc.LogHandler(v)
} else if v.Err != nil {
log.Printf("ERROR: %+v: %+v (%+v)\n", v.Msg, v.Err, v.Url)
} else {
log.Printf("%+v (%+v)\n", v.Msg, v.Url)
}
}
func (rc *RecConn) getNonVerbose() bool { func (rc *RecConn) getNonVerbose() bool {
rc.mu.RLock() rc.mu.RLock()
defer rc.mu.RUnlock() defer rc.mu.RUnlock()
@@ -417,11 +441,13 @@ func (rc *RecConn) keepAlive() {
} }
if err := rc.writeControlPingMessage(); err != nil { if err := rc.writeControlPingMessage(); err != nil {
log.Println(err) rc.log(LogValues{Err: err})
} }
<-ticker.C <-ticker.C
if time.Since(rc.getKeepAliveResponse().getLastResponse()) > rc.getKeepAliveTimeout() { timeoutOffset := time.Millisecond * 500
if time.Since(rc.getKeepAliveResponse().getLastResponse()) > rc.getKeepAliveTimeout()+timeoutOffset {
rc.log(LogValues{Err: errors.New("keepalive timeout"), Msg: "Reconnect", Url: rc.url})
rc.CloseAndReconnect() rc.CloseAndReconnect()
return return
} }
@@ -448,15 +474,15 @@ func (rc *RecConn) connect() {
if err == nil { if err == nil {
if !rc.getNonVerbose() { if !rc.getNonVerbose() {
log.Printf("Dial: connection was successfully established with %s\n", rc.url) rc.log(LogValues{Msg: "Dial: connection was successfully established", Url: rc.url})
} }
if rc.hasSubscribeHandler() { if rc.hasSubscribeHandler() {
if err := rc.SubscribeHandler(); err != nil { if err := rc.SubscribeHandler(); err != nil {
log.Fatalf("Dial: connect handler failed with %s", err.Error()) rc.log(LogValues{Msg: "Dial: connect handler failed", Err: err, Fatal: true})
} }
if !rc.getNonVerbose() { if !rc.getNonVerbose() {
log.Printf("Dial: connect handler was successfully established with %s\n", rc.url) rc.log(LogValues{Msg: "Dial: connect handler was successfully established", Url: rc.url})
} }
} }
@@ -468,8 +494,7 @@ func (rc *RecConn) connect() {
} }
if !rc.getNonVerbose() { if !rc.getNonVerbose() {
log.Println(err) rc.log(LogValues{Err: err, Msg: fmt.Sprintf("Dial: will try again in %+v seconds", nextItvl)})
log.Println("Dial: will try again in", nextItvl, "seconds.")
} }
time.Sleep(nextItvl) time.Sleep(nextItvl)