From a1489e70edba73af65bd7080fa77e7e5c8abc050 Mon Sep 17 00:00:00 2001 From: Aldino Kemal Date: Sat, 1 Mar 2025 07:15:41 +0700 Subject: [PATCH] refactor: Improve WebSocket connection and message handling - Restructure WebSocket code for better readability and maintainability - Extract connection registration, unregistration, and broadcast logic into separate functions - Improve error handling and logging for WebSocket operations - Simplify message broadcasting and connection management - Add more descriptive logging for WebSocket events --- src/internal/websocket/websocket.go | 116 +++++++++++++++------------- 1 file changed, 62 insertions(+), 54 deletions(-) diff --git a/src/internal/websocket/websocket.go b/src/internal/websocket/websocket.go index 15f337c..4c3fe3a 100644 --- a/src/internal/websocket/websocket.go +++ b/src/internal/websocket/websocket.go @@ -3,102 +3,111 @@ package websocket import ( "context" "encoding/json" + "log" + domainApp "github.com/aldinokemal/go-whatsapp-web-multidevice/domains/app" "github.com/gofiber/fiber/v2" "github.com/gofiber/websocket/v2" - "log" ) -type client struct{} // Add more data to this type if needed +type client struct{} + type BroadcastMessage struct { Code string `json:"code"` Message string `json:"message"` Result any `json:"result"` } -var Clients = make(map[*websocket.Conn]client) // Note: although large maps with pointer-like types (e.g. strings) as keys are slow, using pointers themselves as keys is acceptable and fast -var Register = make(chan *websocket.Conn) -var Broadcast = make(chan BroadcastMessage) -var Unregister = make(chan *websocket.Conn) +var ( + Clients = make(map[*websocket.Conn]client) + Register = make(chan *websocket.Conn) + Broadcast = make(chan BroadcastMessage) + Unregister = make(chan *websocket.Conn) +) + +func handleRegister(conn *websocket.Conn) { + Clients[conn] = client{} + log.Println("connection registered") +} + +func handleUnregister(conn *websocket.Conn) { + delete(Clients, conn) + log.Println("connection unregistered") +} + +func broadcastMessage(message BroadcastMessage) { + marshalMessage, err := json.Marshal(message) + if err != nil { + log.Println("marshal error:", err) + return + } + + for conn := range Clients { + if err := conn.WriteMessage(websocket.TextMessage, marshalMessage); err != nil { + log.Println("write error:", err) + closeConnection(conn) + } + } +} + +func closeConnection(conn *websocket.Conn) { + if err := conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil { + log.Println("write close message error:", err) + } + if err := conn.Close(); err != nil { + log.Println("close connection error:", err) + } + delete(Clients, conn) +} func RunHub() { for { select { - case connection := <-Register: - Clients[connection] = client{} - log.Println("connection registered") + case conn := <-Register: + handleRegister(conn) + + case conn := <-Unregister: + handleUnregister(conn) case message := <-Broadcast: log.Println("message received:", message) - marshalMessage, err := json.Marshal(message) - if err != nil { - log.Println("write error:", err) - return - } - - // Send the message to all clients - for connection := range Clients { - if err := connection.WriteMessage(websocket.TextMessage, marshalMessage); err != nil { - log.Println("write error:", err) - - err := connection.WriteMessage(websocket.CloseMessage, []byte{}) - if err != nil { - log.Println("write message close error:", err) - return - } - err = connection.Close() - if err != nil { - log.Println("close error:", err) - return - } - delete(Clients, connection) - } - } - - case connection := <-Unregister: - // Remove the client from the hub - delete(Clients, connection) - - log.Println("connection unregistered") + broadcastMessage(message) } } } func RegisterRoutes(app *fiber.App, service domainApp.IAppService) { app.Use("/ws", func(c *fiber.Ctx) error { - if websocket.IsWebSocketUpgrade(c) { // Returns true if the client requested upgrade to the WebSocket protocol + if websocket.IsWebSocketUpgrade(c) { return c.Next() } return c.SendStatus(fiber.StatusUpgradeRequired) }) - app.Get("/ws", websocket.New(func(c *websocket.Conn) { - // When the function returns, unregister the client and close the connection + app.Get("/ws", websocket.New(func(conn *websocket.Conn) { defer func() { - Unregister <- c - _ = c.Close() + Unregister <- conn + _ = conn.Close() }() - // Register the client - Register <- c + Register <- conn for { - messageType, message, err := c.ReadMessage() + messageType, message, err := conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Println("read error:", err) } - return // Calls the deferred function, i.e. closes the connection on error + return } if messageType == websocket.TextMessage { - // Broadcast the received message var messageData BroadcastMessage - err := json.Unmarshal(message, &messageData) - if err != nil { - log.Println("error unmarshal message:", err) + if err := json.Unmarshal(message, &messageData); err != nil { + log.Println("unmarshal error:", err) return } + if messageData.Code == "FETCH_DEVICES" { devices, _ := service.FetchDevices(context.Background()) Broadcast <- BroadcastMessage{ @@ -107,9 +116,8 @@ func RegisterRoutes(app *fiber.App, service domainApp.IAppService) { Result: devices, } } - } else { - log.Println("websocket message received of type", messageType) + log.Println("unsupported message type:", messageType) } } }))