2
0

added rwmutex

This commit is contained in:
Lucas Löffel
2018-10-09 13:06:23 +02:00
parent 3698e5f7fe
commit 3ec54aacf0
+115 -50
View File
@@ -38,7 +38,7 @@ type RecConn struct {
// SubscribeHandler fires after the connection successfully establish. // SubscribeHandler fires after the connection successfully establish.
SubscribeHandler func(rc *RecConn) error SubscribeHandler func(rc *RecConn) error
mu sync.Mutex mu sync.RWMutex
url string url string
reqHeader http.Header reqHeader http.Header
httpResp *http.Response httpResp *http.Response
@@ -55,15 +55,24 @@ func (rc *RecConn) closeAndReconnect() {
go rc.connect() go rc.connect()
} }
// setIsConnected sets state for isConnected
func (rc *RecConn) setIsConnected(state bool) {
rc.mu.Lock()
defer rc.mu.Unlock()
rc.isConnected = state
}
// Close closes the underlying network connection without // Close closes the underlying network connection without
// sending or waiting for a close frame. // sending or waiting for a close frame.
func (rc *RecConn) Close() { func (rc *RecConn) Close() {
rc.mu.Lock() rc.mu.RLock()
defer rc.mu.RUnlock()
if rc.Conn != nil { if rc.Conn != nil {
rc.Conn.Close() rc.Conn.Close()
} }
rc.isConnected = false
rc.mu.Unlock() rc.setIsConnected(false)
} }
// ReadMessage is a helper method for getting a reader // ReadMessage is a helper method for getting a reader
@@ -135,53 +144,98 @@ func (rc *RecConn) ReadJSON(v interface{}) error {
return err return err
} }
func (rc *RecConn) setURL(url string) {
rc.mu.Lock()
defer rc.mu.Unlock()
rc.url = url
}
// parseURL parses current url
func (rc *RecConn) parseURL(urlStr string) (string, error) {
if urlStr == "" {
return "", errors.New("dial: url cannot be empty")
}
u, err := url.Parse(urlStr)
if err != nil {
return "", errors.New("url: " + err.Error())
}
if u.Scheme != "ws" && u.Scheme != "wss" {
return "", errors.New("url: websocket uris must start with ws or wss scheme")
}
if u.User != nil {
return "", errors.New("url: user name and password are not allowed in websocket URIs")
}
return urlStr, nil
}
func (rc *RecConn) setDefaultRecIntvlMin() {
rc.mu.Lock()
defer rc.mu.Unlock()
if rc.RecIntvlMin == 0 {
rc.RecIntvlMin = 2 * time.Second
}
}
func (rc *RecConn) setDefaultRecIntvlMax() {
rc.mu.Lock()
defer rc.mu.Unlock()
if rc.RecIntvlMax == 0 {
rc.RecIntvlMax = 30 * time.Second
}
}
func (rc *RecConn) setDefaultRecIntvlFactor() {
rc.mu.Lock()
defer rc.mu.Unlock()
if rc.RecIntvlFactor == 0 {
rc.RecIntvlFactor = 1.5
}
}
func (rc *RecConn) setDefaultHandshakeTimeout() {
rc.mu.Lock()
defer rc.mu.Unlock()
if rc.HandshakeTimeout == 0 {
rc.HandshakeTimeout = 2 * time.Second
}
}
func (rc *RecConn) setDefaultDialer() {
rc.mu.Lock()
defer rc.mu.Unlock()
rc.dialer = websocket.DefaultDialer
rc.dialer.HandshakeTimeout = rc.HandshakeTimeout
}
// Dial creates a new client connection. // Dial creates a new client connection.
// The URL url specifies the host and request URI. Use requestHeader to specify // The URL url specifies the host and request URI. Use requestHeader to specify
// the origin (Origin), subprotocols (Sec-WebSocket-Protocol) and cookies // the origin (Origin), subprotocols (Sec-WebSocket-Protocol) and cookies
// (Cookie). Use GetHTTPResponse() method for the response.Header to get // (Cookie). Use GetHTTPResponse() method for the response.Header to get
// the selected subprotocol (Sec-WebSocket-Protocol) and cookies (Set-Cookie). // the selected subprotocol (Sec-WebSocket-Protocol) and cookies (Set-Cookie).
func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) { func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) {
rc.mu.Lock() urlStr, err := rc.parseURL(urlStr)
defer rc.mu.Unlock()
if urlStr == "" {
log.Fatal("Dial: Url cannot be empty")
}
u, err := url.Parse(urlStr)
if err != nil { if err != nil {
log.Fatal("Url:", err) log.Fatalf("Dial: %v", err)
} }
if u.Scheme != "ws" && u.Scheme != "wss" { rc.setURL(urlStr)
log.Fatal("Url: websocket URIs must start with ws or wss scheme") rc.setDefaultRecIntvlMin()
} rc.setDefaultRecIntvlMax()
rc.setDefaultRecIntvlFactor()
if u.User != nil { rc.setDefaultHandshakeTimeout()
log.Fatal("Url: user name and password are not allowed in websocket URIs") rc.setDefaultDialer()
}
rc.url = urlStr
if rc.RecIntvlMin == 0 {
rc.RecIntvlMin = 2 * time.Second
}
if rc.RecIntvlMax == 0 {
rc.RecIntvlMax = 30 * time.Second
}
if rc.RecIntvlFactor == 0 {
rc.RecIntvlFactor = 1.5
}
if rc.HandshakeTimeout == 0 {
rc.HandshakeTimeout = 2 * time.Second
}
rc.dialer = websocket.DefaultDialer
rc.dialer.HandshakeTimeout = rc.HandshakeTimeout
go rc.connect() go rc.connect()
@@ -191,9 +245,19 @@ func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) {
// GetURL returns current connection url // GetURL returns current connection url
func (rc *RecConn) GetURL() string { func (rc *RecConn) GetURL() string {
rc.mu.RLock()
defer rc.mu.RUnlock()
return rc.url return rc.url
} }
func (rc *RecConn) getNonVerbose() bool {
rc.mu.RLock()
defer rc.mu.RUnlock()
return rc.NonVerbose
}
func (rc *RecConn) connect() { func (rc *RecConn) connect() {
b := &backoff.Backoff{ b := &backoff.Backoff{
Min: rc.RecIntvlMin, Min: rc.RecIntvlMin,
@@ -206,8 +270,9 @@ func (rc *RecConn) connect() {
for { for {
nextItvl := b.Duration() nextItvl := b.Duration()
rc.mu.RLock()
wsConn, httpResp, err := rc.dialer.Dial(rc.url, rc.reqHeader) wsConn, httpResp, err := rc.dialer.Dial(rc.url, rc.reqHeader)
rc.mu.RUnlock()
rc.mu.Lock() rc.mu.Lock()
rc.Conn = wsConn rc.Conn = wsConn
@@ -217,7 +282,7 @@ func (rc *RecConn) connect() {
rc.mu.Unlock() rc.mu.Unlock()
if err == nil { if err == nil {
if !rc.NonVerbose { if !rc.getNonVerbose() {
log.Printf("Dial: connection was successfully established with %s\n", rc.url) log.Printf("Dial: connection was successfully established with %s\n", rc.url)
if rc.SubscribeHandler == nil { if rc.SubscribeHandler == nil {
@@ -234,7 +299,7 @@ func (rc *RecConn) connect() {
return return
} }
if !rc.NonVerbose { if !rc.getNonVerbose() {
log.Println(err) log.Println(err)
log.Println("Dial: will try again in", nextItvl, "seconds.") log.Println("Dial: will try again in", nextItvl, "seconds.")
} }
@@ -247,8 +312,8 @@ func (rc *RecConn) connect() {
// Useful when WebSocket handshake fails, // Useful when WebSocket handshake fails,
// so that callers can handle redirects, authentication, etc. // so that callers can handle redirects, authentication, etc.
func (rc *RecConn) GetHTTPResponse() *http.Response { func (rc *RecConn) GetHTTPResponse() *http.Response {
rc.mu.Lock() rc.mu.RLock()
defer rc.mu.Unlock() defer rc.mu.RUnlock()
return rc.httpResp return rc.httpResp
} }
@@ -256,16 +321,16 @@ func (rc *RecConn) GetHTTPResponse() *http.Response {
// GetDialError returns the last dialer error. // GetDialError returns the last dialer error.
// nil on successful connection. // nil on successful connection.
func (rc *RecConn) GetDialError() error { func (rc *RecConn) GetDialError() error {
rc.mu.Lock() rc.mu.RLock()
defer rc.mu.Unlock() defer rc.mu.RUnlock()
return rc.dialErr return rc.dialErr
} }
// IsConnected returns the WebSocket connection state // IsConnected returns the WebSocket connection state
func (rc *RecConn) IsConnected() bool { func (rc *RecConn) IsConnected() bool {
rc.mu.Lock() rc.mu.RLock()
defer rc.mu.Unlock() defer rc.mu.RUnlock()
return rc.isConnected return rc.isConnected
} }