mirror of
https://github.com/fosrl/olm.git
synced 2026-03-22 06:10:48 -05:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d83518951 | ||
|
|
964532777a |
@@ -205,12 +205,13 @@ func (o *Olm) handleConnect(msg websocket.WSMessage) {
|
||||
// Register JIT handler: when the DNS proxy resolves a local record, check whether
|
||||
// the owning site is already connected and, if not, initiate a JIT connection.
|
||||
o.dnsProxy.SetJITHandler(func(siteId int) {
|
||||
if o.peerManager == nil || o.websocket == nil {
|
||||
pm := o.getPeerManager()
|
||||
if pm == nil || o.websocket == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Site already has an active peer connection - nothing to do.
|
||||
if _, exists := o.peerManager.GetPeer(siteId); exists {
|
||||
if _, exists := pm.GetPeer(siteId); exists {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
49
olm/data.go
49
olm/data.go
@@ -32,21 +32,27 @@ func (o *Olm) handleWgPeerAddData(msg websocket.WSMessage) {
|
||||
return
|
||||
}
|
||||
|
||||
if _, exists := o.peerManager.GetPeer(addSubnetsData.SiteId); !exists {
|
||||
pm := o.getPeerManager()
|
||||
if pm == nil {
|
||||
logger.Debug("Ignoring add-remote-subnets-aliases message: peerManager is nil (shutdown in progress)")
|
||||
return
|
||||
}
|
||||
|
||||
if _, exists := pm.GetPeer(addSubnetsData.SiteId); !exists {
|
||||
logger.Debug("Peer %d not found for removing remote subnets and aliases", addSubnetsData.SiteId)
|
||||
return
|
||||
}
|
||||
|
||||
// Add new subnets
|
||||
for _, subnet := range addSubnetsData.RemoteSubnets {
|
||||
if err := o.peerManager.AddRemoteSubnet(addSubnetsData.SiteId, subnet); err != nil {
|
||||
if err := pm.AddRemoteSubnet(addSubnetsData.SiteId, subnet); err != nil {
|
||||
logger.Error("Failed to add allowed IP %s: %v", subnet, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Add new aliases
|
||||
for _, alias := range addSubnetsData.Aliases {
|
||||
if err := o.peerManager.AddAlias(addSubnetsData.SiteId, alias); err != nil {
|
||||
if err := pm.AddAlias(addSubnetsData.SiteId, alias); err != nil {
|
||||
logger.Error("Failed to add alias %s: %v", alias.Alias, err)
|
||||
}
|
||||
}
|
||||
@@ -73,21 +79,27 @@ func (o *Olm) handleWgPeerRemoveData(msg websocket.WSMessage) {
|
||||
return
|
||||
}
|
||||
|
||||
if _, exists := o.peerManager.GetPeer(removeSubnetsData.SiteId); !exists {
|
||||
pm := o.getPeerManager()
|
||||
if pm == nil {
|
||||
logger.Debug("Ignoring remove-remote-subnets-aliases message: peerManager is nil (shutdown in progress)")
|
||||
return
|
||||
}
|
||||
|
||||
if _, exists := pm.GetPeer(removeSubnetsData.SiteId); !exists {
|
||||
logger.Debug("Peer %d not found for removing remote subnets and aliases", removeSubnetsData.SiteId)
|
||||
return
|
||||
}
|
||||
|
||||
// Remove subnets
|
||||
for _, subnet := range removeSubnetsData.RemoteSubnets {
|
||||
if err := o.peerManager.RemoveRemoteSubnet(removeSubnetsData.SiteId, subnet); err != nil {
|
||||
if err := pm.RemoveRemoteSubnet(removeSubnetsData.SiteId, subnet); err != nil {
|
||||
logger.Error("Failed to remove allowed IP %s: %v", subnet, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove aliases
|
||||
for _, alias := range removeSubnetsData.Aliases {
|
||||
if err := o.peerManager.RemoveAlias(removeSubnetsData.SiteId, alias.Alias); err != nil {
|
||||
if err := pm.RemoveAlias(removeSubnetsData.SiteId, alias.Alias); err != nil {
|
||||
logger.Error("Failed to remove alias %s: %v", alias.Alias, err)
|
||||
}
|
||||
}
|
||||
@@ -114,7 +126,13 @@ func (o *Olm) handleWgPeerUpdateData(msg websocket.WSMessage) {
|
||||
return
|
||||
}
|
||||
|
||||
if _, exists := o.peerManager.GetPeer(updateSubnetsData.SiteId); !exists {
|
||||
pm := o.getPeerManager()
|
||||
if pm == nil {
|
||||
logger.Debug("Ignoring update-remote-subnets-aliases message: peerManager is nil (shutdown in progress)")
|
||||
return
|
||||
}
|
||||
|
||||
if _, exists := pm.GetPeer(updateSubnetsData.SiteId); !exists {
|
||||
logger.Debug("Peer %d not found for updating remote subnets and aliases", updateSubnetsData.SiteId)
|
||||
return
|
||||
}
|
||||
@@ -123,14 +141,14 @@ func (o *Olm) handleWgPeerUpdateData(msg websocket.WSMessage) {
|
||||
// This ensures that if an old and new subnet are the same on different peers,
|
||||
// the route won't be temporarily removed
|
||||
for _, subnet := range updateSubnetsData.NewRemoteSubnets {
|
||||
if err := o.peerManager.AddRemoteSubnet(updateSubnetsData.SiteId, subnet); err != nil {
|
||||
if err := pm.AddRemoteSubnet(updateSubnetsData.SiteId, subnet); err != nil {
|
||||
logger.Error("Failed to add allowed IP %s: %v", subnet, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove old subnets after new ones are added
|
||||
for _, subnet := range updateSubnetsData.OldRemoteSubnets {
|
||||
if err := o.peerManager.RemoveRemoteSubnet(updateSubnetsData.SiteId, subnet); err != nil {
|
||||
if err := pm.RemoveRemoteSubnet(updateSubnetsData.SiteId, subnet); err != nil {
|
||||
logger.Error("Failed to remove allowed IP %s: %v", subnet, err)
|
||||
}
|
||||
}
|
||||
@@ -139,14 +157,14 @@ func (o *Olm) handleWgPeerUpdateData(msg websocket.WSMessage) {
|
||||
// This ensures that if an old and new alias share the same IP, the IP won't be
|
||||
// temporarily removed from the allowed IPs list
|
||||
for _, alias := range updateSubnetsData.NewAliases {
|
||||
if err := o.peerManager.AddAlias(updateSubnetsData.SiteId, alias); err != nil {
|
||||
if err := pm.AddAlias(updateSubnetsData.SiteId, alias); err != nil {
|
||||
logger.Error("Failed to add alias %s: %v", alias.Alias, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove old aliases after new ones are added
|
||||
for _, alias := range updateSubnetsData.OldAliases {
|
||||
if err := o.peerManager.RemoveAlias(updateSubnetsData.SiteId, alias.Alias); err != nil {
|
||||
if err := pm.RemoveAlias(updateSubnetsData.SiteId, alias.Alias); err != nil {
|
||||
logger.Error("Failed to remove alias %s: %v", alias.Alias, err)
|
||||
}
|
||||
}
|
||||
@@ -163,7 +181,8 @@ func (o *Olm) handleSync(msg websocket.WSMessage) {
|
||||
return
|
||||
}
|
||||
|
||||
if o.peerManager == nil {
|
||||
pm := o.getPeerManager()
|
||||
if pm == nil {
|
||||
logger.Warn("Peer manager not initialized, ignoring sync request")
|
||||
return
|
||||
}
|
||||
@@ -190,7 +209,7 @@ func (o *Olm) handleSync(msg websocket.WSMessage) {
|
||||
}
|
||||
|
||||
// Get all current peers
|
||||
currentPeers := o.peerManager.GetAllPeers()
|
||||
currentPeers := pm.GetAllPeers()
|
||||
currentPeerMap := make(map[int]peers.SiteConfig)
|
||||
for _, peer := range currentPeers {
|
||||
currentPeerMap[peer.SiteId] = peer
|
||||
@@ -200,7 +219,7 @@ func (o *Olm) handleSync(msg websocket.WSMessage) {
|
||||
for siteId := range currentPeerMap {
|
||||
if _, exists := expectedPeers[siteId]; !exists {
|
||||
logger.Info("Sync: Removing peer for site %d (no longer in expected config)", siteId)
|
||||
if err := o.peerManager.RemovePeer(siteId); err != nil {
|
||||
if err := pm.RemovePeer(siteId); err != nil {
|
||||
logger.Error("Sync: Failed to remove peer %d: %v", siteId, err)
|
||||
} else {
|
||||
// Remove any exit nodes associated with this peer from hole punching
|
||||
@@ -301,7 +320,7 @@ func (o *Olm) handleSync(msg websocket.WSMessage) {
|
||||
siteConfig.Aliases = expectedSite.Aliases
|
||||
}
|
||||
|
||||
if err := o.peerManager.UpdatePeer(siteConfig); err != nil {
|
||||
if err := pm.UpdatePeer(siteConfig); err != nil {
|
||||
logger.Error("Sync: Failed to update peer %d: %v", siteId, err)
|
||||
} else {
|
||||
// If the endpoint changed, trigger holepunch to refresh NAT mappings
|
||||
|
||||
26
olm/olm.go
26
olm/olm.go
@@ -47,6 +47,7 @@ type Olm struct {
|
||||
websocket *websocket.Client
|
||||
holePunchManager *holepunch.Manager
|
||||
peerManager *peers.PeerManager
|
||||
peerManagerMu sync.RWMutex
|
||||
// Power mode management
|
||||
currentPowerMode string
|
||||
powerModeMu sync.Mutex
|
||||
@@ -76,6 +77,15 @@ type Olm struct {
|
||||
tunnelWg sync.WaitGroup
|
||||
}
|
||||
|
||||
// getPeerManager safely returns the current peerManager under a read-lock.
|
||||
// Callers must check the returned value for nil before using it.
|
||||
func (o *Olm) getPeerManager() *peers.PeerManager {
|
||||
o.peerManagerMu.RLock()
|
||||
pm := o.peerManager
|
||||
o.peerManagerMu.RUnlock()
|
||||
return pm
|
||||
}
|
||||
|
||||
// initTunnelInfo creates the shared UDP socket and holepunch manager.
|
||||
// This is used during initial tunnel setup and when switching organizations.
|
||||
func (o *Olm) initTunnelInfo(clientID string) error {
|
||||
@@ -457,7 +467,7 @@ func (o *Olm) StartTunnel(config TunnelConfig) {
|
||||
"userToken": userToken,
|
||||
"fingerprint": o.fingerprint,
|
||||
"postures": o.postures,
|
||||
}, 2*time.Second, 10)
|
||||
}, 2*time.Second, 20)
|
||||
|
||||
// Invoke onRegistered callback if configured
|
||||
if o.olmConfig.OnRegistered != nil {
|
||||
@@ -591,10 +601,12 @@ func (o *Olm) Close() {
|
||||
}
|
||||
|
||||
// Close() also calls Stop() internally
|
||||
o.peerManagerMu.Lock()
|
||||
if o.peerManager != nil {
|
||||
o.peerManager.Close()
|
||||
o.peerManager = nil
|
||||
}
|
||||
o.peerManagerMu.Unlock()
|
||||
|
||||
if o.uapiListener != nil {
|
||||
_ = o.uapiListener.Close()
|
||||
@@ -806,14 +818,14 @@ func (o *Olm) SetPowerMode(mode string) error {
|
||||
|
||||
lowPowerInterval := 10 * time.Minute
|
||||
|
||||
if o.peerManager != nil {
|
||||
peerMonitor := o.peerManager.GetPeerMonitor()
|
||||
if pm := o.getPeerManager(); pm != nil {
|
||||
peerMonitor := pm.GetPeerMonitor()
|
||||
if peerMonitor != nil {
|
||||
peerMonitor.SetPeerInterval(lowPowerInterval, lowPowerInterval)
|
||||
peerMonitor.SetPeerHolepunchInterval(lowPowerInterval, lowPowerInterval)
|
||||
logger.Info("Set monitoring intervals to 10 minutes for low power mode")
|
||||
}
|
||||
o.peerManager.UpdateAllPeersPersistentKeepalive(0) // disable
|
||||
pm.UpdateAllPeersPersistentKeepalive(0) // disable
|
||||
}
|
||||
|
||||
if o.holePunchManager != nil {
|
||||
@@ -858,14 +870,14 @@ func (o *Olm) SetPowerMode(mode string) error {
|
||||
}
|
||||
|
||||
// Restore intervals and reconnect websocket
|
||||
if o.peerManager != nil {
|
||||
peerMonitor := o.peerManager.GetPeerMonitor()
|
||||
if pm := o.getPeerManager(); pm != nil {
|
||||
peerMonitor := pm.GetPeerMonitor()
|
||||
if peerMonitor != nil {
|
||||
peerMonitor.ResetPeerHolepunchInterval()
|
||||
peerMonitor.ResetPeerInterval()
|
||||
}
|
||||
|
||||
o.peerManager.UpdateAllPeersPersistentKeepalive(5)
|
||||
pm.UpdateAllPeersPersistentKeepalive(5)
|
||||
}
|
||||
|
||||
if o.holePunchManager != nil {
|
||||
|
||||
38
olm/peer.go
38
olm/peer.go
@@ -20,7 +20,8 @@ func (o *Olm) handleWgPeerAdd(msg websocket.WSMessage) {
|
||||
return
|
||||
}
|
||||
|
||||
if o.peerManager == nil {
|
||||
pm := o.getPeerManager()
|
||||
if pm == nil {
|
||||
logger.Debug("Ignoring add-peer message: peerManager is nil (shutdown in progress)")
|
||||
return
|
||||
}
|
||||
@@ -64,7 +65,7 @@ func (o *Olm) handleWgPeerAdd(msg websocket.WSMessage) {
|
||||
|
||||
_ = o.holePunchManager.TriggerHolePunch() // Trigger immediate hole punch attempt so that if the peer decides to relay we have already punched close to when we need it
|
||||
|
||||
if err := o.peerManager.AddPeer(siteConfigMsg.SiteConfig); err != nil {
|
||||
if err := pm.AddPeer(siteConfigMsg.SiteConfig); err != nil {
|
||||
logger.Error("Failed to add peer: %v", err)
|
||||
return
|
||||
}
|
||||
@@ -81,7 +82,8 @@ func (o *Olm) handleWgPeerRemove(msg websocket.WSMessage) {
|
||||
return
|
||||
}
|
||||
|
||||
if o.peerManager == nil {
|
||||
pm := o.getPeerManager()
|
||||
if pm == nil {
|
||||
logger.Debug("Ignoring remove-peer message: peerManager is nil (shutdown in progress)")
|
||||
return
|
||||
}
|
||||
@@ -98,7 +100,7 @@ func (o *Olm) handleWgPeerRemove(msg websocket.WSMessage) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := o.peerManager.RemovePeer(removeData.SiteId); err != nil {
|
||||
if err := pm.RemovePeer(removeData.SiteId); err != nil {
|
||||
logger.Error("Failed to remove peer: %v", err)
|
||||
return
|
||||
}
|
||||
@@ -123,7 +125,8 @@ func (o *Olm) handleWgPeerUpdate(msg websocket.WSMessage) {
|
||||
return
|
||||
}
|
||||
|
||||
if o.peerManager == nil {
|
||||
pm := o.getPeerManager()
|
||||
if pm == nil {
|
||||
logger.Debug("Ignoring update-peer message: peerManager is nil (shutdown in progress)")
|
||||
return
|
||||
}
|
||||
@@ -141,7 +144,7 @@ func (o *Olm) handleWgPeerUpdate(msg websocket.WSMessage) {
|
||||
}
|
||||
|
||||
// Get existing peer from PeerManager
|
||||
existingPeer, exists := o.peerManager.GetPeer(updateData.SiteId)
|
||||
existingPeer, exists := pm.GetPeer(updateData.SiteId)
|
||||
if !exists {
|
||||
logger.Warn("Peer with site ID %d not found", updateData.SiteId)
|
||||
return
|
||||
@@ -169,7 +172,7 @@ func (o *Olm) handleWgPeerUpdate(msg websocket.WSMessage) {
|
||||
siteConfig.RemoteSubnets = updateData.RemoteSubnets
|
||||
}
|
||||
|
||||
if err := o.peerManager.UpdatePeer(siteConfig); err != nil {
|
||||
if err := pm.UpdatePeer(siteConfig); err != nil {
|
||||
logger.Error("Failed to update peer: %v", err)
|
||||
return
|
||||
}
|
||||
@@ -188,7 +191,8 @@ func (o *Olm) handleWgPeerRelay(msg websocket.WSMessage) {
|
||||
logger.Debug("Received relay-peer message: %v", msg.Data)
|
||||
|
||||
// Check if peerManager is still valid (may be nil during shutdown)
|
||||
if o.peerManager == nil {
|
||||
pm := o.getPeerManager()
|
||||
if pm == nil {
|
||||
logger.Debug("Ignoring relay message: peerManager is nil (shutdown in progress)")
|
||||
return
|
||||
}
|
||||
@@ -208,7 +212,7 @@ func (o *Olm) handleWgPeerRelay(msg websocket.WSMessage) {
|
||||
return
|
||||
}
|
||||
|
||||
if monitor := o.peerManager.GetPeerMonitor(); monitor != nil {
|
||||
if monitor := pm.GetPeerMonitor(); monitor != nil {
|
||||
monitor.CancelRelaySend(relayData.ChainId)
|
||||
}
|
||||
|
||||
@@ -222,14 +226,15 @@ func (o *Olm) handleWgPeerRelay(msg websocket.WSMessage) {
|
||||
// Update HTTP server to mark this peer as using relay
|
||||
o.apiServer.UpdatePeerRelayStatus(relayData.SiteId, relayData.RelayEndpoint, true)
|
||||
|
||||
o.peerManager.RelayPeer(relayData.SiteId, primaryRelay, relayData.RelayPort)
|
||||
pm.RelayPeer(relayData.SiteId, primaryRelay, relayData.RelayPort)
|
||||
}
|
||||
|
||||
func (o *Olm) handleWgPeerUnrelay(msg websocket.WSMessage) {
|
||||
logger.Debug("Received unrelay-peer message: %v", msg.Data)
|
||||
|
||||
// Check if peerManager is still valid (may be nil during shutdown)
|
||||
if o.peerManager == nil {
|
||||
pm := o.getPeerManager()
|
||||
if pm == nil {
|
||||
logger.Debug("Ignoring unrelay message: peerManager is nil (shutdown in progress)")
|
||||
return
|
||||
}
|
||||
@@ -249,7 +254,7 @@ func (o *Olm) handleWgPeerUnrelay(msg websocket.WSMessage) {
|
||||
return
|
||||
}
|
||||
|
||||
if monitor := o.peerManager.GetPeerMonitor(); monitor != nil {
|
||||
if monitor := pm.GetPeerMonitor(); monitor != nil {
|
||||
monitor.CancelRelaySend(relayData.ChainId)
|
||||
}
|
||||
|
||||
@@ -262,7 +267,7 @@ func (o *Olm) handleWgPeerUnrelay(msg websocket.WSMessage) {
|
||||
// Update HTTP server to mark this peer as using relay
|
||||
o.apiServer.UpdatePeerRelayStatus(relayData.SiteId, relayData.Endpoint, false)
|
||||
|
||||
o.peerManager.UnRelayPeer(relayData.SiteId, primaryRelay)
|
||||
pm.UnRelayPeer(relayData.SiteId, primaryRelay)
|
||||
}
|
||||
|
||||
func (o *Olm) handleWgPeerHolepunchAddSite(msg websocket.WSMessage) {
|
||||
@@ -317,7 +322,12 @@ func (o *Olm) handleWgPeerHolepunchAddSite(msg websocket.WSMessage) {
|
||||
}
|
||||
|
||||
// Get existing peer from PeerManager
|
||||
_, exists := o.peerManager.GetPeer(handshakeData.SiteId)
|
||||
pm := o.getPeerManager()
|
||||
if pm == nil {
|
||||
logger.Debug("Ignoring peer-handshake message: peerManager is nil (shutdown in progress)")
|
||||
return
|
||||
}
|
||||
_, exists := pm.GetPeer(handshakeData.SiteId)
|
||||
if exists {
|
||||
logger.Warn("Peer with site ID %d already added", handshakeData.SiteId)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user