From 3ec54aacf014fcaf6950c931f12bd7b4d36bba82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20L=C3=B6ffel?= Date: Tue, 9 Oct 2018 13:06:23 +0200 Subject: [PATCH] added rwmutex --- recws.go | 165 ++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 115 insertions(+), 50 deletions(-) diff --git a/recws.go b/recws.go index aaf3d69..ebab96a 100644 --- a/recws.go +++ b/recws.go @@ -38,7 +38,7 @@ type RecConn struct { // SubscribeHandler fires after the connection successfully establish. SubscribeHandler func(rc *RecConn) error - mu sync.Mutex + mu sync.RWMutex url string reqHeader http.Header httpResp *http.Response @@ -55,15 +55,24 @@ func (rc *RecConn) closeAndReconnect() { go rc.connect() } +// setIsConnected sets state for isConnected +func (rc *RecConn) setIsConnected(state bool) { + rc.mu.Lock() + defer rc.mu.Unlock() + + rc.isConnected = state +} + // Close closes the underlying network connection without // sending or waiting for a close frame. func (rc *RecConn) Close() { - rc.mu.Lock() + rc.mu.RLock() + defer rc.mu.RUnlock() if rc.Conn != nil { rc.Conn.Close() } - rc.isConnected = false - rc.mu.Unlock() + + rc.setIsConnected(false) } // ReadMessage is a helper method for getting a reader @@ -135,53 +144,98 @@ func (rc *RecConn) ReadJSON(v interface{}) error { return err } +func (rc *RecConn) setURL(url string) { + rc.mu.Lock() + defer rc.mu.Unlock() + + rc.url = url +} + +// parseURL parses current url +func (rc *RecConn) parseURL(urlStr string) (string, error) { + if urlStr == "" { + return "", errors.New("dial: url cannot be empty") + } + + u, err := url.Parse(urlStr) + + if err != nil { + return "", errors.New("url: " + err.Error()) + } + + if u.Scheme != "ws" && u.Scheme != "wss" { + return "", errors.New("url: websocket uris must start with ws or wss scheme") + } + + if u.User != nil { + return "", errors.New("url: user name and password are not allowed in websocket URIs") + } + + return urlStr, nil +} + +func (rc *RecConn) setDefaultRecIntvlMin() { + rc.mu.Lock() + defer rc.mu.Unlock() + + if rc.RecIntvlMin == 0 { + rc.RecIntvlMin = 2 * time.Second + } +} + +func (rc *RecConn) setDefaultRecIntvlMax() { + rc.mu.Lock() + defer rc.mu.Unlock() + + if rc.RecIntvlMax == 0 { + rc.RecIntvlMax = 30 * time.Second + } +} + +func (rc *RecConn) setDefaultRecIntvlFactor() { + rc.mu.Lock() + defer rc.mu.Unlock() + + if rc.RecIntvlFactor == 0 { + rc.RecIntvlFactor = 1.5 + } +} + +func (rc *RecConn) setDefaultHandshakeTimeout() { + rc.mu.Lock() + defer rc.mu.Unlock() + + if rc.HandshakeTimeout == 0 { + rc.HandshakeTimeout = 2 * time.Second + } +} + +func (rc *RecConn) setDefaultDialer() { + rc.mu.Lock() + defer rc.mu.Unlock() + + rc.dialer = websocket.DefaultDialer + rc.dialer.HandshakeTimeout = rc.HandshakeTimeout +} + // Dial creates a new client connection. // The URL url specifies the host and request URI. Use requestHeader to specify // the origin (Origin), subprotocols (Sec-WebSocket-Protocol) and cookies // (Cookie). Use GetHTTPResponse() method for the response.Header to get // the selected subprotocol (Sec-WebSocket-Protocol) and cookies (Set-Cookie). func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) { - rc.mu.Lock() - defer rc.mu.Unlock() - - if urlStr == "" { - log.Fatal("Dial: Url cannot be empty") - } - - u, err := url.Parse(urlStr) + urlStr, err := rc.parseURL(urlStr) if err != nil { - log.Fatal("Url:", err) + log.Fatalf("Dial: %v", err) } - if u.Scheme != "ws" && u.Scheme != "wss" { - log.Fatal("Url: websocket URIs must start with ws or wss scheme") - } - - if u.User != nil { - log.Fatal("Url: user name and password are not allowed in websocket URIs") - } - - rc.url = urlStr - - if rc.RecIntvlMin == 0 { - rc.RecIntvlMin = 2 * time.Second - } - - if rc.RecIntvlMax == 0 { - rc.RecIntvlMax = 30 * time.Second - } - - if rc.RecIntvlFactor == 0 { - rc.RecIntvlFactor = 1.5 - } - - if rc.HandshakeTimeout == 0 { - rc.HandshakeTimeout = 2 * time.Second - } - - rc.dialer = websocket.DefaultDialer - rc.dialer.HandshakeTimeout = rc.HandshakeTimeout + rc.setURL(urlStr) + rc.setDefaultRecIntvlMin() + rc.setDefaultRecIntvlMax() + rc.setDefaultRecIntvlFactor() + rc.setDefaultHandshakeTimeout() + rc.setDefaultDialer() go rc.connect() @@ -191,9 +245,19 @@ func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) { // GetURL returns current connection url func (rc *RecConn) GetURL() string { + rc.mu.RLock() + defer rc.mu.RUnlock() + return rc.url } +func (rc *RecConn) getNonVerbose() bool { + rc.mu.RLock() + defer rc.mu.RUnlock() + + return rc.NonVerbose +} + func (rc *RecConn) connect() { b := &backoff.Backoff{ Min: rc.RecIntvlMin, @@ -206,8 +270,9 @@ func (rc *RecConn) connect() { for { nextItvl := b.Duration() - + rc.mu.RLock() wsConn, httpResp, err := rc.dialer.Dial(rc.url, rc.reqHeader) + rc.mu.RUnlock() rc.mu.Lock() rc.Conn = wsConn @@ -217,7 +282,7 @@ func (rc *RecConn) connect() { rc.mu.Unlock() if err == nil { - if !rc.NonVerbose { + if !rc.getNonVerbose() { log.Printf("Dial: connection was successfully established with %s\n", rc.url) if rc.SubscribeHandler == nil { @@ -234,7 +299,7 @@ func (rc *RecConn) connect() { return } - if !rc.NonVerbose { + if !rc.getNonVerbose() { log.Println(err) log.Println("Dial: will try again in", nextItvl, "seconds.") } @@ -247,8 +312,8 @@ func (rc *RecConn) connect() { // Useful when WebSocket handshake fails, // so that callers can handle redirects, authentication, etc. func (rc *RecConn) GetHTTPResponse() *http.Response { - rc.mu.Lock() - defer rc.mu.Unlock() + rc.mu.RLock() + defer rc.mu.RUnlock() return rc.httpResp } @@ -256,16 +321,16 @@ func (rc *RecConn) GetHTTPResponse() *http.Response { // GetDialError returns the last dialer error. // nil on successful connection. func (rc *RecConn) GetDialError() error { - rc.mu.Lock() - defer rc.mu.Unlock() + rc.mu.RLock() + defer rc.mu.RUnlock() return rc.dialErr } // IsConnected returns the WebSocket connection state func (rc *RecConn) IsConnected() bool { - rc.mu.Lock() - defer rc.mu.Unlock() + rc.mu.RLock() + defer rc.mu.RUnlock() return rc.isConnected }