Browse Source

refactor: Improve WebSocket connection and message handling

- Restructure WebSocket code for better readability and maintainability
- Extract connection registration, unregistration, and broadcast logic into separate functions
- Improve error handling and logging for WebSocket operations
- Simplify message broadcasting and connection management
- Add more descriptive logging for WebSocket events
pull/271/head
Aldino Kemal 1 year ago
parent
commit
a1489e70ed
  1. 100
      src/internal/websocket/websocket.go

100
src/internal/websocket/websocket.go

@ -3,102 +3,111 @@ package websocket
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"log"
domainApp "github.com/aldinokemal/go-whatsapp-web-multidevice/domains/app" domainApp "github.com/aldinokemal/go-whatsapp-web-multidevice/domains/app"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2" "github.com/gofiber/websocket/v2"
"log"
) )
type client struct{} // Add more data to this type if needed
type client struct{}
type BroadcastMessage struct { type BroadcastMessage struct {
Code string `json:"code"` Code string `json:"code"`
Message string `json:"message"` Message string `json:"message"`
Result any `json:"result"` Result any `json:"result"`
} }
var Clients = make(map[*websocket.Conn]client) // Note: although large maps with pointer-like types (e.g. strings) as keys are slow, using pointers themselves as keys is acceptable and fast
var Register = make(chan *websocket.Conn)
var Broadcast = make(chan BroadcastMessage)
var Unregister = make(chan *websocket.Conn)
var (
Clients = make(map[*websocket.Conn]client)
Register = make(chan *websocket.Conn)
Broadcast = make(chan BroadcastMessage)
Unregister = make(chan *websocket.Conn)
)
func RunHub() {
for {
select {
case connection := <-Register:
Clients[connection] = client{}
func handleRegister(conn *websocket.Conn) {
Clients[conn] = client{}
log.Println("connection registered") log.Println("connection registered")
}
case message := <-Broadcast:
log.Println("message received:", message)
func handleUnregister(conn *websocket.Conn) {
delete(Clients, conn)
log.Println("connection unregistered")
}
func broadcastMessage(message BroadcastMessage) {
marshalMessage, err := json.Marshal(message) marshalMessage, err := json.Marshal(message)
if err != nil { if err != nil {
log.Println("write error:", err)
log.Println("marshal error:", err)
return return
} }
// Send the message to all clients
for connection := range Clients {
if err := connection.WriteMessage(websocket.TextMessage, marshalMessage); err != nil {
for conn := range Clients {
if err := conn.WriteMessage(websocket.TextMessage, marshalMessage); err != nil {
log.Println("write error:", err) log.Println("write error:", err)
err := connection.WriteMessage(websocket.CloseMessage, []byte{})
if err != nil {
log.Println("write message close error:", err)
return
closeConnection(conn)
} }
err = connection.Close()
if err != nil {
log.Println("close error:", err)
return
} }
delete(Clients, connection)
} }
func closeConnection(conn *websocket.Conn) {
if err := conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil {
log.Println("write close message error:", err)
}
if err := conn.Close(); err != nil {
log.Println("close connection error:", err)
}
delete(Clients, conn)
} }
case connection := <-Unregister:
// Remove the client from the hub
delete(Clients, connection)
func RunHub() {
for {
select {
case conn := <-Register:
handleRegister(conn)
log.Println("connection unregistered")
case conn := <-Unregister:
handleUnregister(conn)
case message := <-Broadcast:
log.Println("message received:", message)
broadcastMessage(message)
} }
} }
} }
func RegisterRoutes(app *fiber.App, service domainApp.IAppService) { func RegisterRoutes(app *fiber.App, service domainApp.IAppService) {
app.Use("/ws", func(c *fiber.Ctx) error { app.Use("/ws", func(c *fiber.Ctx) error {
if websocket.IsWebSocketUpgrade(c) { // Returns true if the client requested upgrade to the WebSocket protocol
if websocket.IsWebSocketUpgrade(c) {
return c.Next() return c.Next()
} }
return c.SendStatus(fiber.StatusUpgradeRequired) return c.SendStatus(fiber.StatusUpgradeRequired)
}) })
app.Get("/ws", websocket.New(func(c *websocket.Conn) {
// When the function returns, unregister the client and close the connection
app.Get("/ws", websocket.New(func(conn *websocket.Conn) {
defer func() { defer func() {
Unregister <- c
_ = c.Close()
Unregister <- conn
_ = conn.Close()
}() }()
// Register the client
Register <- c
Register <- conn
for { for {
messageType, message, err := c.ReadMessage()
messageType, message, err := conn.ReadMessage()
if err != nil { if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Println("read error:", err) log.Println("read error:", err)
} }
return // Calls the deferred function, i.e. closes the connection on error
return
} }
if messageType == websocket.TextMessage { if messageType == websocket.TextMessage {
// Broadcast the received message
var messageData BroadcastMessage var messageData BroadcastMessage
err := json.Unmarshal(message, &messageData)
if err != nil {
log.Println("error unmarshal message:", err)
if err := json.Unmarshal(message, &messageData); err != nil {
log.Println("unmarshal error:", err)
return return
} }
if messageData.Code == "FETCH_DEVICES" { if messageData.Code == "FETCH_DEVICES" {
devices, _ := service.FetchDevices(context.Background()) devices, _ := service.FetchDevices(context.Background())
Broadcast <- BroadcastMessage{ Broadcast <- BroadcastMessage{
@ -107,9 +116,8 @@ func RegisterRoutes(app *fiber.App, service domainApp.IAppService) {
Result: devices, Result: devices,
} }
} }
} else { } else {
log.Println("websocket message received of type", messageType)
log.Println("unsupported message type:", messageType)
} }
} }
})) }))

Loading…
Cancel
Save