Use envelope as value

This commit is contained in:
Ola
2024-03-06 18:05:19 +01:00
parent 4c79256c40
commit 3823d1271f
4 changed files with 21 additions and 21 deletions
+4 -4
View File
@@ -61,10 +61,10 @@ func (ss *sessionSet) all() []*Session {
type hub struct { type hub struct {
sessions sessionSet sessions sessionSet
broadcast chan *envelope broadcast chan envelope
register chan *Session register chan *Session
unregister chan *Session unregister chan *Session
exit chan *envelope exit chan envelope
open atomic.Bool open atomic.Bool
} }
@@ -73,10 +73,10 @@ func newHub() *hub {
sessions: sessionSet{ sessions: sessionSet{
members: make(map[*Session]struct{}), members: make(map[*Session]struct{}),
}, },
broadcast: make(chan *envelope), broadcast: make(chan envelope),
register: make(chan *Session), register: make(chan *Session),
unregister: make(chan *Session), unregister: make(chan *Session),
exit: make(chan *envelope), exit: make(chan envelope),
} }
} }
+7 -7
View File
@@ -161,7 +161,7 @@ func (m *Melody) HandleRequestWithKeys(w http.ResponseWriter, r *http.Request, k
Request: r, Request: r,
Keys: keys, Keys: keys,
conn: conn, conn: conn,
output: make(chan *envelope, m.Config.MessageBufferSize), output: make(chan envelope, m.Config.MessageBufferSize),
outputDone: make(chan struct{}), outputDone: make(chan struct{}),
melody: m, melody: m,
open: true, open: true,
@@ -193,7 +193,7 @@ func (m *Melody) Broadcast(msg []byte) error {
return ErrClosed return ErrClosed
} }
message := &envelope{t: websocket.TextMessage, msg: msg} message := envelope{t: websocket.TextMessage, msg: msg}
m.hub.broadcast <- message m.hub.broadcast <- message
return nil return nil
@@ -205,7 +205,7 @@ func (m *Melody) BroadcastFilter(msg []byte, fn func(*Session) bool) error {
return ErrClosed return ErrClosed
} }
message := &envelope{t: websocket.TextMessage, msg: msg, filter: fn} message := envelope{t: websocket.TextMessage, msg: msg, filter: fn}
m.hub.broadcast <- message m.hub.broadcast <- message
return nil return nil
@@ -234,7 +234,7 @@ func (m *Melody) BroadcastBinary(msg []byte) error {
return ErrClosed return ErrClosed
} }
message := &envelope{t: websocket.BinaryMessage, msg: msg} message := envelope{t: websocket.BinaryMessage, msg: msg}
m.hub.broadcast <- message m.hub.broadcast <- message
return nil return nil
@@ -246,7 +246,7 @@ func (m *Melody) BroadcastBinaryFilter(msg []byte, fn func(*Session) bool) error
return ErrClosed return ErrClosed
} }
message := &envelope{t: websocket.BinaryMessage, msg: msg, filter: fn} message := envelope{t: websocket.BinaryMessage, msg: msg, filter: fn}
m.hub.broadcast <- message m.hub.broadcast <- message
return nil return nil
@@ -273,7 +273,7 @@ func (m *Melody) Close() error {
return ErrClosed return ErrClosed
} }
m.hub.exit <- &envelope{t: websocket.CloseMessage, msg: []byte{}} m.hub.exit <- envelope{t: websocket.CloseMessage, msg: []byte{}}
return nil return nil
} }
@@ -285,7 +285,7 @@ func (m *Melody) CloseWithMsg(msg []byte) error {
return ErrClosed return ErrClosed
} }
m.hub.exit <- &envelope{t: websocket.CloseMessage, msg: msg} m.hub.exit <- envelope{t: websocket.CloseMessage, msg: msg}
return nil return nil
} }
+2 -2
View File
@@ -573,8 +573,8 @@ func TestErrSessionClosed(t *testing.T) {
assert.ErrorIs(t, s.Close(), ErrSessionClosed) assert.ErrorIs(t, s.Close(), ErrSessionClosed)
assert.ErrorIs(t, ws.m.BroadcastMultiple(TestMsg, []*Session{s}), ErrSessionClosed) assert.ErrorIs(t, ws.m.BroadcastMultiple(TestMsg, []*Session{s}), ErrSessionClosed)
assert.ErrorIs(t, s.writeRaw(nil), ErrWriteClosed) assert.ErrorIs(t, s.writeRaw(envelope{}), ErrWriteClosed)
s.writeMessage(nil) s.writeMessage(envelope{})
} }
func TestErrMessageBufferFull(t *testing.T) { func TestErrMessageBufferFull(t *testing.T) {
+8 -8
View File
@@ -14,14 +14,14 @@ type Session struct {
Request *http.Request Request *http.Request
Keys map[string]interface{} Keys map[string]interface{}
conn *websocket.Conn conn *websocket.Conn
output chan *envelope output chan envelope
outputDone chan struct{} outputDone chan struct{}
melody *Melody melody *Melody
open bool open bool
rwmutex *sync.RWMutex rwmutex *sync.RWMutex
} }
func (s *Session) writeMessage(message *envelope) { func (s *Session) writeMessage(message envelope) {
if s.closed() { if s.closed() {
s.melody.errorHandler(s, ErrWriteClosed) s.melody.errorHandler(s, ErrWriteClosed)
return return
@@ -34,7 +34,7 @@ func (s *Session) writeMessage(message *envelope) {
} }
} }
func (s *Session) writeRaw(message *envelope) error { func (s *Session) writeRaw(message envelope) error {
if s.closed() { if s.closed() {
return ErrWriteClosed return ErrWriteClosed
} }
@@ -68,7 +68,7 @@ func (s *Session) close() {
} }
func (s *Session) ping() { func (s *Session) ping() {
s.writeRaw(&envelope{t: websocket.PingMessage, msg: []byte{}}) s.writeRaw(envelope{t: websocket.PingMessage, msg: []byte{}})
} }
func (s *Session) writePump() { func (s *Session) writePump() {
@@ -156,7 +156,7 @@ func (s *Session) Write(msg []byte) error {
return ErrSessionClosed return ErrSessionClosed
} }
s.writeMessage(&envelope{t: websocket.TextMessage, msg: msg}) s.writeMessage(envelope{t: websocket.TextMessage, msg: msg})
return nil return nil
} }
@@ -167,7 +167,7 @@ func (s *Session) WriteBinary(msg []byte) error {
return ErrSessionClosed return ErrSessionClosed
} }
s.writeMessage(&envelope{t: websocket.BinaryMessage, msg: msg}) s.writeMessage(envelope{t: websocket.BinaryMessage, msg: msg})
return nil return nil
} }
@@ -178,7 +178,7 @@ func (s *Session) Close() error {
return ErrSessionClosed return ErrSessionClosed
} }
s.writeMessage(&envelope{t: websocket.CloseMessage, msg: []byte{}}) s.writeMessage(envelope{t: websocket.CloseMessage, msg: []byte{}})
return nil return nil
} }
@@ -190,7 +190,7 @@ func (s *Session) CloseWithMsg(msg []byte) error {
return ErrSessionClosed return ErrSessionClosed
} }
s.writeMessage(&envelope{t: websocket.CloseMessage, msg: msg}) s.writeMessage(envelope{t: websocket.CloseMessage, msg: msg})
return nil return nil
} }