From f3ccd79a30324d71b007a4d695bb66431555a830 Mon Sep 17 00:00:00 2001 From: Ola <1386739+olahol@users.noreply.github.com> Date: Mon, 12 Feb 2024 18:58:22 +0100 Subject: [PATCH] Concurrent message handling --- config.go | 11 +++---- melody.go | 5 ++++ melody_test.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++ session.go | 17 +++++++---- 4 files changed, 104 insertions(+), 10 deletions(-) diff --git a/config.go b/config.go index 81ebd05..18814d7 100644 --- a/config.go +++ b/config.go @@ -4,11 +4,12 @@ import "time" // Config melody configuration struct. type Config struct { - WriteWait time.Duration // Milliseconds until write times out. - PongWait time.Duration // Timeout for waiting on pong. - PingPeriod time.Duration // Milliseconds between pings. - MaxMessageSize int64 // Maximum size in bytes of a message. - MessageBufferSize int // The max amount of messages that can be in a sessions buffer before it starts dropping them. + WriteWait time.Duration // Duration until write times out. + PongWait time.Duration // Timeout for waiting on pong. + PingPeriod time.Duration // Duration between pings. + MaxMessageSize int64 // Maximum size in bytes of a message. + MessageBufferSize int // The max amount of messages that can be in a sessions buffer before it starts dropping them. + ConcurrentMessageHandling bool // Handle messages from sessions concurrently. } func newConfig() *Config { diff --git a/melody.go b/melody.go index ca2c5dd..991e9c2 100644 --- a/melody.go +++ b/melody.go @@ -112,6 +112,11 @@ func (m *Melody) HandlePong(fn func(*Session)) { } // HandleMessage fires fn when a text message comes in. +// NOTE: by default Melody handles messages sequentially for each +// session. This has the effect that a message handler exceeding the +// read deadline (Config.PongWait, by default 1 minute) will time out +// the session. Concurrent message handling can be turned on by setting +// Config.ConcurrentMessageHandling to true. func (m *Melody) HandleMessage(fn func(*Session, []byte)) { m.messageHandler = fn } diff --git a/melody_test.go b/melody_test.go index d1098cf..e256272 100644 --- a/melody_test.go +++ b/melody_test.go @@ -765,3 +765,84 @@ func TestHandleSentMessage(t *testing.T) { conn.WriteMessage(websocket.BinaryMessage, TestMsg) }) } + +func TestConcurrentMessageHandling(t *testing.T) { + testTimeout := func(cmh bool, msgType int) bool { + base := time.Millisecond * 100 + done := make(chan struct{}) + + handler := func(s *Session, msg []byte) { + if len(msg) == 0 { + done <- struct{}{} + return + } + + time.Sleep(base * 2) + } + + ws := NewTestServerHandler(func(session *Session, msg []byte) {}) + if msgType == websocket.TextMessage { + ws.m.HandleMessage(handler) + } else { + ws.m.HandleMessageBinary(handler) + } + + ws.m.Config.ConcurrentMessageHandling = cmh + ws.m.Config.PongWait = base + + var errorSet bool + ws.m.HandleError(func(s *Session, err error) { + errorSet = true + done <- struct{}{} + }) + + server := httptest.NewServer(ws) + defer server.Close() + + conn := MustNewDialer(server.URL) + defer conn.Close() + + conn.WriteMessage(msgType, TestMsg) + conn.WriteMessage(msgType, TestMsg) + + time.Sleep(base / 4) + + conn.WriteMessage(msgType, nil) + + <-done + + return errorSet + } + + t.Run("text should error", func(t *testing.T) { + errorSet := testTimeout(false, websocket.TextMessage) + + if !errorSet { + t.FailNow() + } + }) + + t.Run("text should not error", func(t *testing.T) { + errorSet := testTimeout(true, websocket.TextMessage) + + if errorSet { + t.FailNow() + } + }) + + t.Run("binary should error", func(t *testing.T) { + errorSet := testTimeout(false, websocket.BinaryMessage) + + if !errorSet { + t.FailNow() + } + }) + + t.Run("binary should not error", func(t *testing.T) { + errorSet := testTimeout(true, websocket.BinaryMessage) + + if errorSet { + t.FailNow() + } + }) +} diff --git a/session.go b/session.go index ad79d54..36de197 100644 --- a/session.go +++ b/session.go @@ -133,13 +133,20 @@ func (s *Session) readPump() { break } - if t == websocket.TextMessage { - s.melody.messageHandler(s, message) + if s.melody.Config.ConcurrentMessageHandling { + go s.handleMessage(t, message) + } else { + s.handleMessage(t, message) } + } +} - if t == websocket.BinaryMessage { - s.melody.messageHandlerBinary(s, message) - } +func (s *Session) handleMessage(t int, message []byte) { + switch t { + case websocket.TextMessage: + s.melody.messageHandler(s, message) + case websocket.BinaryMessage: + s.melody.messageHandlerBinary(s, message) } }