mirror of
https://github.com/go-vikunja/vikunja.git
synced 2026-05-07 12:37:14 -05:00
Add the core WebSocket infrastructure: - Message type definitions for the wire protocol (subscribe, unsubscribe, auth, error, push events) - In-memory connection hub that tracks per-user connections and routes messages to subscribed clients - Connection wrapper with auth-after-connect flow: connections start unauthenticated, client sends JWT as first message, only then can subscribe to event topics Includes auth timeout (30s), shared cancellation context for read/write loops, hub map cleanup on last connection removal, and proper error delivery before closing on auth failure.
86 lines
2.5 KiB
Go
86 lines
2.5 KiB
Go
// Vikunja is a to-do list application to facilitate your life.
|
|
// Copyright 2018-present Vikunja and contributors. All rights reserved.
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
package websocket
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"code.vikunja.io/api/pkg/log"
|
|
)
|
|
|
|
// Hub maintains the set of active connections and delivers messages to them.
|
|
type Hub struct {
|
|
mu sync.RWMutex
|
|
connections map[int64][]*Connection // userID -> connections
|
|
}
|
|
|
|
// NewHub creates a new Hub.
|
|
func NewHub() *Hub {
|
|
return &Hub{
|
|
connections: make(map[int64][]*Connection),
|
|
}
|
|
}
|
|
|
|
// Register adds a connection to the hub.
|
|
func (h *Hub) Register(conn *Connection) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
h.connections[conn.userID] = append(h.connections[conn.userID], conn)
|
|
log.Debugf("WebSocket: registered connection for user %d (total: %d)", conn.userID, len(h.connections[conn.userID]))
|
|
}
|
|
|
|
// Unregister removes a connection from the hub.
|
|
func (h *Hub) Unregister(conn *Connection) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
conns := h.connections[conn.userID]
|
|
for i, c := range conns {
|
|
if c == conn {
|
|
h.connections[conn.userID] = append(conns[:i], conns[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
remaining := len(h.connections[conn.userID])
|
|
if remaining == 0 {
|
|
delete(h.connections, conn.userID)
|
|
}
|
|
log.Debugf("WebSocket: unregistered connection for user %d (remaining: %d)", conn.userID, remaining)
|
|
}
|
|
|
|
// PublishForUser sends an event to all connections of a specific user that are subscribed to the given event.
|
|
func (h *Hub) PublishForUser(userID int64, event string, data any) {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
|
|
conns := h.connections[userID]
|
|
msg := OutgoingMessage{
|
|
Event: event,
|
|
Data: data,
|
|
}
|
|
|
|
for _, conn := range conns {
|
|
if !conn.IsSubscribed(event) {
|
|
continue
|
|
}
|
|
select {
|
|
case conn.send <- msg:
|
|
default:
|
|
log.Warningf("WebSocket: send buffer full for user %d, dropping message", userID)
|
|
}
|
|
}
|
|
}
|