From f5e4ac14158d4a96801a2a0ae284ae4a65fede8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Fri, 6 Jul 2018 00:17:05 +0200 Subject: [PATCH 01/23] fixed typo --- recws.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/recws.go b/recws.go index 077eff4..2b0c28a 100644 --- a/recws.go +++ b/recws.go @@ -47,8 +47,8 @@ type RecConn struct { *websocket.Conn } -// CloseAndRecconect will try to reconnect. -func (rc *RecConn) closeAndRecconect() { +// CloseAndReconnect will try to reconnect. +func (rc *RecConn) closeAndReconnect() { rc.Close() go func() { rc.connect() @@ -76,7 +76,7 @@ func (rc *RecConn) ReadMessage() (messageType int, message []byte, err error) { if rc.IsConnected() { messageType, message, err = rc.Conn.ReadMessage() if err != nil { - rc.closeAndRecconect() + rc.closeAndReconnect() } } @@ -92,7 +92,7 @@ func (rc *RecConn) WriteMessage(messageType int, data []byte) error { if rc.IsConnected() { err = rc.Conn.WriteMessage(messageType, data) if err != nil { - rc.closeAndRecconect() + rc.closeAndReconnect() } } @@ -110,7 +110,7 @@ func (rc *RecConn) WriteJSON(v interface{}) error { if rc.IsConnected() { err = rc.Conn.WriteJSON(v) if err != nil { - rc.closeAndRecconect() + rc.closeAndReconnect() } } From 6fe8fd6ea5e039612f3fb5d025a97d726de3d354 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Fri, 6 Jul 2018 00:20:22 +0200 Subject: [PATCH 02/23] added goland ignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 8f5fd87..c610222 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .history +/.idea \ No newline at end of file From b501600fcce2f5b04884c55fe9fd6b72ae5ce22c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Fri, 6 Jul 2018 01:09:36 +0200 Subject: [PATCH 03/23] Added ReadJSON --- recws.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/recws.go b/recws.go index 2b0c28a..63c91aa 100644 --- a/recws.go +++ b/recws.go @@ -117,6 +117,25 @@ func (rc *RecConn) WriteJSON(v interface{}) error { return err } +// ReadJSON reads the next JSON-encoded message from the connection and stores +// it in the value pointed to by v. +// +// See the documentation for the encoding/json Unmarshal function for details +// about the conversion of JSON to a Go value. +// +// If the connection is closed ErrNotConnected is returned +func (rc *RecConn) ReadJSON(v interface{}) error { + err := ErrNotConnected + if rc.IsConnected() { + err = rc.Conn.ReadJSON(v) + if err != nil { + rc.closeAndReconnect() + } + } + + return err +} + // Dial creates a new client connection. // The URL url specifies the host and request URI. Use requestHeader to specify // the origin (Origin), subprotocols (Sec-WebSocket-Protocol) and cookies From e8861f25c79e292b79aab4a8524df4586e8749c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Sat, 14 Jul 2018 23:29:03 +0200 Subject: [PATCH 04/23] added goreportcard --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 5fc85ea..dc26c09 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,5 @@ # recws + +[![Go Report Card](https://goreportcard.com/badge/github.com/loeffel-io/recws)](https://goreportcard.com/report/github.com/loeffel-io/recws) + Reconnecting WebSocket is a websocket client based on [gorilla/websocket](https://github.com/gorilla/websocket) that will automatically reconnect if the connection is dropped. From bbc5af6a3cae3a8e509b0ffadfd3830d1399f24b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 11:08:15 +0200 Subject: [PATCH 05/23] added connect handler and GetURL method --- recws.go | 44 ++++++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/recws.go b/recws.go index 63c91aa..35d08b3 100644 --- a/recws.go +++ b/recws.go @@ -35,6 +35,8 @@ type RecConn struct { HandshakeTimeout time.Duration // NonVerbose suppress connecting/reconnecting messages. NonVerbose bool + // ConnectHandler fires after the connection successfully establish. + ConnectHandler func(rc *RecConn) error mu sync.Mutex url string @@ -50,10 +52,7 @@ type RecConn struct { // CloseAndReconnect will try to reconnect. func (rc *RecConn) closeAndReconnect() { rc.Close() - go func() { - rc.connect() - }() - + go rc.connect() } // Close closes the underlying network connection without @@ -180,14 +179,17 @@ func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) { rc.dialer = websocket.DefaultDialer rc.dialer.HandshakeTimeout = rc.HandshakeTimeout - go func() { - rc.connect() - }() + go rc.connect() // wait on first attempt time.Sleep(rc.HandshakeTimeout) } +// GetURL returns current connection url +func (rc *RecConn) GetURL() string { + return rc.url +} + func (rc *RecConn) connect() { b := &backoff.Backoff{ Min: rc.RecIntvlMin, @@ -214,12 +216,23 @@ func (rc *RecConn) connect() { if !rc.NonVerbose { log.Printf("Dial: connection was successfully established with %s\n", rc.url) } - break - } else { - if !rc.NonVerbose { - log.Println(err) - log.Println("Dial: will try again in", nextItvl, "seconds.") + + if rc.ConnectHandler == nil { + break } + + if err := rc.ConnectHandler(rc); err != nil { + log.Fatalf("Dial: connect handler failed with %s", err.Error()) + } + + log.Printf("Dial: connect handler was successfully established with %s\n", rc.url) + + return + } + + if !rc.NonVerbose { + log.Println(err) + log.Println("Dial: will try again in", nextItvl, "seconds.") } time.Sleep(nextItvl) @@ -252,3 +265,10 @@ func (rc *RecConn) IsConnected() bool { return rc.isConnected } + +func (rc *RecConn) GetConnection() *RecConn { + rc.mu.Lock() + defer rc.mu.Unlock() + + return rc +} From 7ad86cf9a647bce347918b82a1fdbeb42f454900 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 11:10:42 +0200 Subject: [PATCH 06/23] break to return --- recws.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/recws.go b/recws.go index 35d08b3..31a7fbc 100644 --- a/recws.go +++ b/recws.go @@ -218,7 +218,7 @@ func (rc *RecConn) connect() { } if rc.ConnectHandler == nil { - break + return } if err := rc.ConnectHandler(rc); err != nil { From 67334dd2414c42d8ddfad989b123abe06a557aa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 11:11:31 +0200 Subject: [PATCH 07/23] removed getconnection --- recws.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/recws.go b/recws.go index 31a7fbc..7724aa6 100644 --- a/recws.go +++ b/recws.go @@ -265,10 +265,3 @@ func (rc *RecConn) IsConnected() bool { return rc.isConnected } - -func (rc *RecConn) GetConnection() *RecConn { - rc.mu.Lock() - defer rc.mu.Unlock() - - return rc -} From 6cd130b00b75f4a2948369f742fa2c6d2ec8233b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 11:14:43 +0200 Subject: [PATCH 08/23] renamed to subscribeHandler --- recws.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/recws.go b/recws.go index 7724aa6..a7d5522 100644 --- a/recws.go +++ b/recws.go @@ -35,8 +35,8 @@ type RecConn struct { HandshakeTimeout time.Duration // NonVerbose suppress connecting/reconnecting messages. NonVerbose bool - // ConnectHandler fires after the connection successfully establish. - ConnectHandler func(rc *RecConn) error + // SubscribeHandler fires after the connection successfully establish. + SubscribeHandler func(rc *RecConn) error mu sync.Mutex url string @@ -217,11 +217,11 @@ func (rc *RecConn) connect() { log.Printf("Dial: connection was successfully established with %s\n", rc.url) } - if rc.ConnectHandler == nil { + if rc.SubscribeHandler == nil { return } - if err := rc.ConnectHandler(rc); err != nil { + if err := rc.SubscribeHandler(rc); err != nil { log.Fatalf("Dial: connect handler failed with %s", err.Error()) } From 7c395c3a92a843fb830f41445975b3e5c4ac1284 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 11:21:45 +0200 Subject: [PATCH 09/23] fix subscribe channel --- recws.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/recws.go b/recws.go index a7d5522..0235d43 100644 --- a/recws.go +++ b/recws.go @@ -215,17 +215,17 @@ func (rc *RecConn) connect() { if err == nil { if !rc.NonVerbose { log.Printf("Dial: connection was successfully established with %s\n", rc.url) - } - if rc.SubscribeHandler == nil { - return - } + if rc.SubscribeHandler == nil { + return + } - if err := rc.SubscribeHandler(rc); err != nil { - log.Fatalf("Dial: connect handler failed with %s", err.Error()) - } + if err := rc.SubscribeHandler(rc); err != nil { + log.Fatalf("Dial: connect handler failed with %s", err.Error()) + } - log.Printf("Dial: connect handler was successfully established with %s\n", rc.url) + log.Printf("Dial: connect handler was successfully established with %s\n", rc.url) + } return } From 3698e5f7fe8e82a0b1edfcc36e4aea3c0824a43e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 12:26:46 +0200 Subject: [PATCH 10/23] added lock to fix concurrency issue --- recws.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/recws.go b/recws.go index 0235d43..aaf3d69 100644 --- a/recws.go +++ b/recws.go @@ -141,9 +141,13 @@ func (rc *RecConn) ReadJSON(v interface{}) error { // (Cookie). Use GetHTTPResponse() method for the response.Header to get // the selected subprotocol (Sec-WebSocket-Protocol) and cookies (Set-Cookie). func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) { + rc.mu.Lock() + defer rc.mu.Unlock() + if urlStr == "" { log.Fatal("Dial: Url cannot be empty") } + u, err := url.Parse(urlStr) if err != nil { From 3ec54aacf014fcaf6950c931f12bd7b4d36bba82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 13:06:23 +0200 Subject: [PATCH 11/23] added rwmutex --- recws.go | 165 ++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 115 insertions(+), 50 deletions(-) diff --git a/recws.go b/recws.go index aaf3d69..ebab96a 100644 --- a/recws.go +++ b/recws.go @@ -38,7 +38,7 @@ type RecConn struct { // SubscribeHandler fires after the connection successfully establish. SubscribeHandler func(rc *RecConn) error - mu sync.Mutex + mu sync.RWMutex url string reqHeader http.Header httpResp *http.Response @@ -55,15 +55,24 @@ func (rc *RecConn) closeAndReconnect() { go rc.connect() } +// setIsConnected sets state for isConnected +func (rc *RecConn) setIsConnected(state bool) { + rc.mu.Lock() + defer rc.mu.Unlock() + + rc.isConnected = state +} + // Close closes the underlying network connection without // sending or waiting for a close frame. func (rc *RecConn) Close() { - rc.mu.Lock() + rc.mu.RLock() + defer rc.mu.RUnlock() if rc.Conn != nil { rc.Conn.Close() } - rc.isConnected = false - rc.mu.Unlock() + + rc.setIsConnected(false) } // ReadMessage is a helper method for getting a reader @@ -135,53 +144,98 @@ func (rc *RecConn) ReadJSON(v interface{}) error { return err } +func (rc *RecConn) setURL(url string) { + rc.mu.Lock() + defer rc.mu.Unlock() + + rc.url = url +} + +// parseURL parses current url +func (rc *RecConn) parseURL(urlStr string) (string, error) { + if urlStr == "" { + return "", errors.New("dial: url cannot be empty") + } + + u, err := url.Parse(urlStr) + + if err != nil { + return "", errors.New("url: " + err.Error()) + } + + if u.Scheme != "ws" && u.Scheme != "wss" { + return "", errors.New("url: websocket uris must start with ws or wss scheme") + } + + if u.User != nil { + return "", errors.New("url: user name and password are not allowed in websocket URIs") + } + + return urlStr, nil +} + +func (rc *RecConn) setDefaultRecIntvlMin() { + rc.mu.Lock() + defer rc.mu.Unlock() + + if rc.RecIntvlMin == 0 { + rc.RecIntvlMin = 2 * time.Second + } +} + +func (rc *RecConn) setDefaultRecIntvlMax() { + rc.mu.Lock() + defer rc.mu.Unlock() + + if rc.RecIntvlMax == 0 { + rc.RecIntvlMax = 30 * time.Second + } +} + +func (rc *RecConn) setDefaultRecIntvlFactor() { + rc.mu.Lock() + defer rc.mu.Unlock() + + if rc.RecIntvlFactor == 0 { + rc.RecIntvlFactor = 1.5 + } +} + +func (rc *RecConn) setDefaultHandshakeTimeout() { + rc.mu.Lock() + defer rc.mu.Unlock() + + if rc.HandshakeTimeout == 0 { + rc.HandshakeTimeout = 2 * time.Second + } +} + +func (rc *RecConn) setDefaultDialer() { + rc.mu.Lock() + defer rc.mu.Unlock() + + rc.dialer = websocket.DefaultDialer + rc.dialer.HandshakeTimeout = rc.HandshakeTimeout +} + // Dial creates a new client connection. // The URL url specifies the host and request URI. Use requestHeader to specify // the origin (Origin), subprotocols (Sec-WebSocket-Protocol) and cookies // (Cookie). Use GetHTTPResponse() method for the response.Header to get // the selected subprotocol (Sec-WebSocket-Protocol) and cookies (Set-Cookie). func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) { - rc.mu.Lock() - defer rc.mu.Unlock() - - if urlStr == "" { - log.Fatal("Dial: Url cannot be empty") - } - - u, err := url.Parse(urlStr) + urlStr, err := rc.parseURL(urlStr) if err != nil { - log.Fatal("Url:", err) + log.Fatalf("Dial: %v", err) } - if u.Scheme != "ws" && u.Scheme != "wss" { - log.Fatal("Url: websocket URIs must start with ws or wss scheme") - } - - if u.User != nil { - log.Fatal("Url: user name and password are not allowed in websocket URIs") - } - - rc.url = urlStr - - if rc.RecIntvlMin == 0 { - rc.RecIntvlMin = 2 * time.Second - } - - if rc.RecIntvlMax == 0 { - rc.RecIntvlMax = 30 * time.Second - } - - if rc.RecIntvlFactor == 0 { - rc.RecIntvlFactor = 1.5 - } - - if rc.HandshakeTimeout == 0 { - rc.HandshakeTimeout = 2 * time.Second - } - - rc.dialer = websocket.DefaultDialer - rc.dialer.HandshakeTimeout = rc.HandshakeTimeout + rc.setURL(urlStr) + rc.setDefaultRecIntvlMin() + rc.setDefaultRecIntvlMax() + rc.setDefaultRecIntvlFactor() + rc.setDefaultHandshakeTimeout() + rc.setDefaultDialer() go rc.connect() @@ -191,9 +245,19 @@ func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) { // GetURL returns current connection url func (rc *RecConn) GetURL() string { + rc.mu.RLock() + defer rc.mu.RUnlock() + return rc.url } +func (rc *RecConn) getNonVerbose() bool { + rc.mu.RLock() + defer rc.mu.RUnlock() + + return rc.NonVerbose +} + func (rc *RecConn) connect() { b := &backoff.Backoff{ Min: rc.RecIntvlMin, @@ -206,8 +270,9 @@ func (rc *RecConn) connect() { for { nextItvl := b.Duration() - + rc.mu.RLock() wsConn, httpResp, err := rc.dialer.Dial(rc.url, rc.reqHeader) + rc.mu.RUnlock() rc.mu.Lock() rc.Conn = wsConn @@ -217,7 +282,7 @@ func (rc *RecConn) connect() { rc.mu.Unlock() if err == nil { - if !rc.NonVerbose { + if !rc.getNonVerbose() { log.Printf("Dial: connection was successfully established with %s\n", rc.url) if rc.SubscribeHandler == nil { @@ -234,7 +299,7 @@ func (rc *RecConn) connect() { return } - if !rc.NonVerbose { + if !rc.getNonVerbose() { log.Println(err) log.Println("Dial: will try again in", nextItvl, "seconds.") } @@ -247,8 +312,8 @@ func (rc *RecConn) connect() { // Useful when WebSocket handshake fails, // so that callers can handle redirects, authentication, etc. func (rc *RecConn) GetHTTPResponse() *http.Response { - rc.mu.Lock() - defer rc.mu.Unlock() + rc.mu.RLock() + defer rc.mu.RUnlock() return rc.httpResp } @@ -256,16 +321,16 @@ func (rc *RecConn) GetHTTPResponse() *http.Response { // GetDialError returns the last dialer error. // nil on successful connection. func (rc *RecConn) GetDialError() error { - rc.mu.Lock() - defer rc.mu.Unlock() + rc.mu.RLock() + defer rc.mu.RUnlock() return rc.dialErr } // IsConnected returns the WebSocket connection state func (rc *RecConn) IsConnected() bool { - rc.mu.Lock() - defer rc.mu.Unlock() + rc.mu.RLock() + defer rc.mu.RUnlock() return rc.isConnected } From 299794442ed5bd45ff1c4efaa8219df835c12780 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 13:16:25 +0200 Subject: [PATCH 12/23] updated lock --- recws.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/recws.go b/recws.go index ebab96a..5969feb 100644 --- a/recws.go +++ b/recws.go @@ -230,6 +230,7 @@ func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) { log.Fatalf("Dial: %v", err) } + // Config rc.setURL(urlStr) rc.setDefaultRecIntvlMin() rc.setDefaultRecIntvlMax() @@ -237,6 +238,7 @@ func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) { rc.setDefaultHandshakeTimeout() rc.setDefaultDialer() + // Connect go rc.connect() // wait on first attempt @@ -270,9 +272,9 @@ func (rc *RecConn) connect() { for { nextItvl := b.Duration() - rc.mu.RLock() + rc.mu.Lock() wsConn, httpResp, err := rc.dialer.Dial(rc.url, rc.reqHeader) - rc.mu.RUnlock() + rc.mu.Unlock() rc.mu.Lock() rc.Conn = wsConn From 1986820ec3670f5097617b9e5ac7194a35880d4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 13:29:19 +0200 Subject: [PATCH 13/23] added getHandshakeTimeout --- recws.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/recws.go b/recws.go index 5969feb..d033aa2 100644 --- a/recws.go +++ b/recws.go @@ -211,11 +211,19 @@ func (rc *RecConn) setDefaultHandshakeTimeout() { } func (rc *RecConn) setDefaultDialer() { + handshakeTimeout := rc.getHandshakeTimeout() rc.mu.Lock() defer rc.mu.Unlock() rc.dialer = websocket.DefaultDialer - rc.dialer.HandshakeTimeout = rc.HandshakeTimeout + rc.dialer.HandshakeTimeout = handshakeTimeout +} + +func (rc *RecConn) getHandshakeTimeout() time.Duration { + rc.mu.RLock() + defer rc.mu.RUnlock() + + return rc.HandshakeTimeout } // Dial creates a new client connection. From 112dbdc9f6c27e246bcf066aca4b424d8ddb9dd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 14:04:37 +0200 Subject: [PATCH 14/23] rm param --- recws.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/recws.go b/recws.go index d033aa2..f0aade2 100644 --- a/recws.go +++ b/recws.go @@ -36,7 +36,7 @@ type RecConn struct { // NonVerbose suppress connecting/reconnecting messages. NonVerbose bool // SubscribeHandler fires after the connection successfully establish. - SubscribeHandler func(rc *RecConn) error + SubscribeHandler func() error mu sync.RWMutex url string @@ -299,7 +299,7 @@ func (rc *RecConn) connect() { return } - if err := rc.SubscribeHandler(rc); err != nil { + if err := rc.SubscribeHandler(); err != nil { log.Fatalf("Dial: connect handler failed with %s", err.Error()) } From 2e0bb1fcd55eb8235a8462e0e491394769848fb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 14:09:45 +0200 Subject: [PATCH 15/23] removed go routine for dial --- recws.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/recws.go b/recws.go index f0aade2..6c49e0f 100644 --- a/recws.go +++ b/recws.go @@ -247,7 +247,7 @@ func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) { rc.setDefaultDialer() // Connect - go rc.connect() + rc.connect() // wait on first attempt time.Sleep(rc.HandshakeTimeout) @@ -280,9 +280,7 @@ func (rc *RecConn) connect() { for { nextItvl := b.Duration() - rc.mu.Lock() wsConn, httpResp, err := rc.dialer.Dial(rc.url, rc.reqHeader) - rc.mu.Unlock() rc.mu.Lock() rc.Conn = wsConn From 7b4667c944536ee40a83e823a67d63e9370dee7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 14:26:17 +0200 Subject: [PATCH 16/23] back to goroutine and added lock for get handshake --- recws.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/recws.go b/recws.go index 6c49e0f..1832575 100644 --- a/recws.go +++ b/recws.go @@ -247,10 +247,10 @@ func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) { rc.setDefaultDialer() // Connect - rc.connect() + go rc.connect() // wait on first attempt - time.Sleep(rc.HandshakeTimeout) + time.Sleep(rc.getHandshakeTimeout()) } // GetURL returns current connection url From f463fb24970938937b587adf64b4a321dfc16006 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 14:58:34 +0200 Subject: [PATCH 17/23] fixed websocket default dialer (fixed concurrency) --- recws.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/recws.go b/recws.go index 1832575..ba97de5 100644 --- a/recws.go +++ b/recws.go @@ -210,13 +210,13 @@ func (rc *RecConn) setDefaultHandshakeTimeout() { } } -func (rc *RecConn) setDefaultDialer() { - handshakeTimeout := rc.getHandshakeTimeout() +func (rc *RecConn) setDefaultDialer(handshakeTimeout time.Duration) { rc.mu.Lock() defer rc.mu.Unlock() - rc.dialer = websocket.DefaultDialer - rc.dialer.HandshakeTimeout = handshakeTimeout + rc.dialer = &websocket.Dialer{ + HandshakeTimeout: handshakeTimeout, + } } func (rc *RecConn) getHandshakeTimeout() time.Duration { @@ -244,7 +244,7 @@ func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) { rc.setDefaultRecIntvlMax() rc.setDefaultRecIntvlFactor() rc.setDefaultHandshakeTimeout() - rc.setDefaultDialer() + rc.setDefaultDialer(rc.getHandshakeTimeout()) // Connect go rc.connect() @@ -268,19 +268,28 @@ func (rc *RecConn) getNonVerbose() bool { return rc.NonVerbose } -func (rc *RecConn) connect() { - b := &backoff.Backoff{ +func (rc *RecConn) getBackoff() *backoff.Backoff { + rc.mu.RLock() + defer rc.mu.RUnlock() + + return &backoff.Backoff{ Min: rc.RecIntvlMin, Max: rc.RecIntvlMax, Factor: rc.RecIntvlFactor, Jitter: true, } +} + +func (rc *RecConn) connect() { + b := rc.getBackoff() rand.Seed(time.Now().UTC().UnixNano()) for { nextItvl := b.Duration() + rc.mu.RLock() wsConn, httpResp, err := rc.dialer.Dial(rc.url, rc.reqHeader) + rc.mu.RUnlock() rc.mu.Lock() rc.Conn = wsConn From 14925171deeb68a17a3be098d79d48e7b54568e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 15:01:06 +0200 Subject: [PATCH 18/23] added hasSubscribeHandler --- recws.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/recws.go b/recws.go index ba97de5..5d0e351 100644 --- a/recws.go +++ b/recws.go @@ -280,6 +280,13 @@ func (rc *RecConn) getBackoff() *backoff.Backoff { } } +func (rc *RecConn) hasSubscribeHandler() bool { + rc.mu.RLock() + defer rc.mu.RUnlock() + + return rc.SubscribeHandler != nil +} + func (rc *RecConn) connect() { b := rc.getBackoff() @@ -302,7 +309,7 @@ func (rc *RecConn) connect() { if !rc.getNonVerbose() { log.Printf("Dial: connection was successfully established with %s\n", rc.url) - if rc.SubscribeHandler == nil { + if !rc.hasSubscribeHandler() { return } From 37041c8dfcd1d1f0906c4358f0f598a7b62d6827 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 15:04:49 +0200 Subject: [PATCH 19/23] removed mutex on func --- recws.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/recws.go b/recws.go index 5d0e351..b589d2a 100644 --- a/recws.go +++ b/recws.go @@ -294,9 +294,7 @@ func (rc *RecConn) connect() { for { nextItvl := b.Duration() - rc.mu.RLock() wsConn, httpResp, err := rc.dialer.Dial(rc.url, rc.reqHeader) - rc.mu.RUnlock() rc.mu.Lock() rc.Conn = wsConn From f073e940a675c3d3798c715e73210f566940c924 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 15:07:24 +0200 Subject: [PATCH 20/23] removed whiteline --- recws.go | 1 - 1 file changed, 1 deletion(-) diff --git a/recws.go b/recws.go index b589d2a..9abf55a 100644 --- a/recws.go +++ b/recws.go @@ -289,7 +289,6 @@ func (rc *RecConn) hasSubscribeHandler() bool { func (rc *RecConn) connect() { b := rc.getBackoff() - rand.Seed(time.Now().UTC().UnixNano()) for { From 797d4c1d5108b2bc0e23cfc8fb40ac21dfaa2a60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 15:39:29 +0200 Subject: [PATCH 21/23] added basic example --- examples/basic.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 examples/basic.go diff --git a/examples/basic.go b/examples/basic.go new file mode 100644 index 0000000..6784f4b --- /dev/null +++ b/examples/basic.go @@ -0,0 +1,46 @@ +package main + +import ( + "context" + "github.com/loeffel-io/recws" + "log" + "time" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + ws := recws.RecConn{} + ws.Dial("wss://echo.websocket.org", nil) + + go func() { + time.Sleep(2 * time.Second) + cancel() + }() + + for { + select { + case <-ctx.Done(): + go ws.Close() + log.Printf("Websocket closed %s", ws.GetURL()) + return + default: + if !ws.IsConnected() { + log.Printf("Websocket disconnected %s", ws.GetURL()) + continue + } + + if err := ws.WriteMessage(1, []byte("Incoming")); err != nil { + log.Printf("Error: WriteMessage %s", ws.GetURL()) + return + } + + _, message, err := ws.ReadMessage() + if err != nil { + log.Printf("Error: ReadMessage %s", ws.GetURL()) + return + } + + log.Printf("Success: %s", message) + } + } +} From 6a3257a9967723dd13497a5102bcba4e2ea0f1d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 15:41:05 +0200 Subject: [PATCH 22/23] added example --- README.md | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/README.md b/README.md index dc26c09..2448782 100644 --- a/README.md +++ b/README.md @@ -3,3 +3,54 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/loeffel-io/recws)](https://goreportcard.com/report/github.com/loeffel-io/recws) Reconnecting WebSocket is a websocket client based on [gorilla/websocket](https://github.com/gorilla/websocket) that will automatically reconnect if the connection is dropped. + +## Basic example + +```go +package main + +import ( + "context" + "github.com/loeffel-io/recws" + "log" + "time" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + ws := recws.RecConn{} + ws.Dial("wss://echo.websocket.org", nil) + + go func() { + time.Sleep(2 * time.Second) + cancel() + }() + + for { + select { + case <-ctx.Done(): + go ws.Close() + log.Printf("Websocket closed %s", ws.GetURL()) + return + default: + if !ws.IsConnected() { + log.Printf("Websocket disconnected %s", ws.GetURL()) + continue + } + + if err := ws.WriteMessage(1, []byte("Incoming")); err != nil { + log.Printf("Error: WriteMessage %s", ws.GetURL()) + return + } + + _, message, err := ws.ReadMessage() + if err != nil { + log.Printf("Error: ReadMessage %s", ws.GetURL()) + return + } + + log.Printf("Success: %s", message) + } + } +} +``` \ No newline at end of file From c38eca71e74dc537ba673cc88b3c2ee6c99e63b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Wed, 10 Oct 2018 13:38:22 +0200 Subject: [PATCH 23/23] added license badge --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 2448782..a146202 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # recws [![Go Report Card](https://goreportcard.com/badge/github.com/loeffel-io/recws)](https://goreportcard.com/report/github.com/loeffel-io/recws) +[![GitHub license](https://img.shields.io/github/license/Naereen/StrapDown.js.svg)](https://github.com/Naereen/StrapDown.js/blob/master/LICENSE) Reconnecting WebSocket is a websocket client based on [gorilla/websocket](https://github.com/gorilla/websocket) that will automatically reconnect if the connection is dropped.