Compare commits

..

8 Commits
main ... dev

Author SHA1 Message Date
Owen
40da38708c Update logging 2026-03-20 16:11:10 -07:00
Owen
3af64d8bd3 Merge branch 'dev' of github.com:fosrl/gerbil into dev 2026-03-20 16:04:39 -07:00
Owen
fcead8cc15 Add rate limit to hole punch 2026-03-20 16:02:58 -07:00
Owen Schwartz
20dad7bb8e Merge pull request #60 from LaurenceJJones/split/upstream-dev-relay-worker-scaling
perf(relay): scale packet workers and queue depth for throughput
2026-03-18 15:56:46 -07:00
Owen Schwartz
a955aa6169 Merge pull request #61 from LaurenceJJones/split/upstream-dev-relay-addr-cache
perf(relay): cache resolved UDP destinations with TTL
2026-03-18 15:56:32 -07:00
Laurence
b118fef265 perf(relay): cache resolved UDP destinations with TTL 2026-03-12 12:54:59 +00:00
Laurence
7985f97eb6 perf(relay): scale packet workers and queue depth for throughput 2026-03-12 12:54:02 +00:00
Owen
b9261b8fea Add optional tc 2026-02-27 15:45:17 -08:00
2 changed files with 318 additions and 29 deletions

228
main.go
View File

@@ -33,15 +33,16 @@ import (
) )
var ( var (
interfaceName string interfaceName string
listenAddr string listenAddr string
mtuInt int mtuInt int
lastReadings = make(map[string]PeerReading) lastReadings = make(map[string]PeerReading)
mu sync.Mutex mu sync.Mutex
wgMu sync.Mutex // Protects WireGuard operations wgMu sync.Mutex // Protects WireGuard operations
notifyURL string notifyURL string
proxyRelay *relay.UDPProxyServer proxyRelay *relay.UDPProxyServer
proxySNI *proxy.SNIProxy proxySNI *proxy.SNIProxy
doTrafficShaping bool
) )
type WgConfig struct { type WgConfig struct {
@@ -151,6 +152,7 @@ func main() {
localOverridesStr = os.Getenv("LOCAL_OVERRIDES") localOverridesStr = os.Getenv("LOCAL_OVERRIDES")
trustedUpstreamsStr = os.Getenv("TRUSTED_UPSTREAMS") trustedUpstreamsStr = os.Getenv("TRUSTED_UPSTREAMS")
proxyProtocolStr := os.Getenv("PROXY_PROTOCOL") proxyProtocolStr := os.Getenv("PROXY_PROTOCOL")
doTrafficShapingStr := os.Getenv("DO_TRAFFIC_SHAPING")
if interfaceName == "" { if interfaceName == "" {
flag.StringVar(&interfaceName, "interface", "wg0", "Name of the WireGuard interface") flag.StringVar(&interfaceName, "interface", "wg0", "Name of the WireGuard interface")
@@ -222,6 +224,13 @@ func main() {
flag.BoolVar(&proxyProtocol, "proxy-protocol", true, "Enable PROXY protocol v1 for preserving client IP") flag.BoolVar(&proxyProtocol, "proxy-protocol", true, "Enable PROXY protocol v1 for preserving client IP")
} }
if doTrafficShapingStr != "" {
doTrafficShaping = strings.ToLower(doTrafficShapingStr) == "true"
}
if doTrafficShapingStr == "" {
flag.BoolVar(&doTrafficShaping, "do-traffic-shaping", false, "Whether to set up traffic shaping rules for peers (requires tc command and root privileges)")
}
flag.Parse() flag.Parse()
logger.Init() logger.Init()
@@ -886,17 +895,23 @@ func addPeerInternal(peer Peer) error {
return fmt.Errorf("failed to parse public key: %v", err) return fmt.Errorf("failed to parse public key: %v", err)
} }
logger.Debug("Adding peer %s with AllowedIPs: %v", peer.PublicKey, peer.AllowedIPs)
// parse allowed IPs into array of net.IPNet // parse allowed IPs into array of net.IPNet
var allowedIPs []net.IPNet var allowedIPs []net.IPNet
var wgIPs []string var wgIPs []string
for _, ipStr := range peer.AllowedIPs { for _, ipStr := range peer.AllowedIPs {
logger.Debug("Parsing AllowedIP: %s", ipStr)
_, ipNet, err := net.ParseCIDR(ipStr) _, ipNet, err := net.ParseCIDR(ipStr)
if err != nil { if err != nil {
logger.Warn("Failed to parse allowed IP '%s' for peer %s: %v", ipStr, peer.PublicKey, err)
return fmt.Errorf("failed to parse allowed IP: %v", err) return fmt.Errorf("failed to parse allowed IP: %v", err)
} }
allowedIPs = append(allowedIPs, *ipNet) allowedIPs = append(allowedIPs, *ipNet)
// Extract the IP address from the CIDR for relay cleanup // Extract the IP address from the CIDR for relay cleanup
wgIPs = append(wgIPs, ipNet.IP.String()) extractedIP := ipNet.IP.String()
wgIPs = append(wgIPs, extractedIP)
logger.Debug("Extracted IP %s from AllowedIP %s", extractedIP, ipStr)
} }
peerConfig := wgtypes.PeerConfig{ peerConfig := wgtypes.PeerConfig{
@@ -912,6 +927,18 @@ func addPeerInternal(peer Peer) error {
return fmt.Errorf("failed to add peer: %v", err) return fmt.Errorf("failed to add peer: %v", err)
} }
// Setup bandwidth limiting for each peer IP
if doTrafficShaping {
logger.Debug("doTrafficShaping is true, setting up bandwidth limits for %d IPs", len(wgIPs))
for _, wgIP := range wgIPs {
if err := setupPeerBandwidthLimit(wgIP); err != nil {
logger.Warn("Failed to setup bandwidth limit for peer IP %s: %v", wgIP, err)
}
}
} else {
logger.Debug("doTrafficShaping is false, skipping bandwidth limit setup")
}
// Clear relay connections for the peer's WireGuard IPs // Clear relay connections for the peer's WireGuard IPs
if proxyRelay != nil { if proxyRelay != nil {
for _, wgIP := range wgIPs { for _, wgIP := range wgIPs {
@@ -956,19 +983,17 @@ func removePeerInternal(publicKey string) error {
return fmt.Errorf("failed to parse public key: %v", err) return fmt.Errorf("failed to parse public key: %v", err)
} }
// Get current peer info before removing to clear relay connections // Get current peer info before removing to clear relay connections and bandwidth limits
var wgIPs []string var wgIPs []string
if proxyRelay != nil { device, err := wgClient.Device(interfaceName)
device, err := wgClient.Device(interfaceName) if err == nil {
if err == nil { for _, peer := range device.Peers {
for _, peer := range device.Peers { if peer.PublicKey.String() == publicKey {
if peer.PublicKey.String() == publicKey { // Extract WireGuard IPs from this peer's allowed IPs
// Extract WireGuard IPs from this peer's allowed IPs for _, allowedIP := range peer.AllowedIPs {
for _, allowedIP := range peer.AllowedIPs { wgIPs = append(wgIPs, allowedIP.IP.String())
wgIPs = append(wgIPs, allowedIP.IP.String())
}
break
} }
break
} }
} }
} }
@@ -986,6 +1011,15 @@ func removePeerInternal(publicKey string) error {
return fmt.Errorf("failed to remove peer: %v", err) return fmt.Errorf("failed to remove peer: %v", err)
} }
// Remove bandwidth limits for each peer IP
if doTrafficShaping {
for _, wgIP := range wgIPs {
if err := removePeerBandwidthLimit(wgIP); err != nil {
logger.Warn("Failed to remove bandwidth limit for peer IP %s: %v", wgIP, err)
}
}
}
// Clear relay connections for the peer's WireGuard IPs // Clear relay connections for the peer's WireGuard IPs
if proxyRelay != nil { if proxyRelay != nil {
for _, wgIP := range wgIPs { for _, wgIP := range wgIPs {
@@ -1315,3 +1349,155 @@ func monitorMemory(limit uint64) {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
} }
} }
// setupPeerBandwidthLimit sets up TC (Traffic Control) to limit bandwidth for a specific peer IP.
// The limit is hardcoded to 50 Mbps per peer (see bandwidthLimit below).
// Requires the `tc` command and root privileges. Filter installation failures are
// logged but not returned, so peer addition is never blocked by shaping problems.
func setupPeerBandwidthLimit(peerIP string) error {
	logger.Debug("setupPeerBandwidthLimit called for peer IP: %s", peerIP)

	const bandwidthLimit = "50mbit" // 50 Mbps limit per peer

	// Parse the IP to get just the IP address (strip any CIDR notation if present)
	ip := peerIP
	if strings.Contains(peerIP, "/") {
		parsedIP, _, err := net.ParseCIDR(peerIP)
		if err != nil {
			return fmt.Errorf("failed to parse peer IP: %v", err)
		}
		ip = parsedIP.String()
	}

	// First, ensure we have a root qdisc on the interface (HTB - Hierarchical Token Bucket).
	// Check if qdisc already exists.
	cmd := exec.Command("tc", "qdisc", "show", "dev", interfaceName)
	output, err := cmd.CombinedOutput()
	if err != nil {
		return fmt.Errorf("failed to check qdisc: %v, output: %s", err, string(output))
	}

	// If no HTB qdisc exists, create one. Unclassified traffic falls into the
	// default class 1:9999, which gets no rate limit from this function.
	if !strings.Contains(string(output), "htb") {
		cmd = exec.Command("tc", "qdisc", "add", "dev", interfaceName, "root", "handle", "1:", "htb", "default", "9999")
		if output, err := cmd.CombinedOutput(); err != nil {
			return fmt.Errorf("failed to add root qdisc: %v, output: %s", err, string(output))
		}
		logger.Info("Created HTB root qdisc on %s", interfaceName)
	}

	// Generate a class ID based on the IP address, using the last octet of the IP.
	// NOTE(review): two peer IPs sharing a last octet (e.g. 10.0.0.5 and 10.0.1.5)
	// map to the same class and would share one 50mbit budget — confirm peer IPs
	// are allocated from a single /24 or smaller. Also note tc parses the classid
	// minor as hex, so octet "10" becomes class 1:0x10; harmless as long as the
	// same scheme is used here and in removePeerBandwidthLimit.
	ipParts := strings.Split(ip, ".")
	if len(ipParts) != 4 {
		// IPv6 peers are not supported by this shaping scheme.
		return fmt.Errorf("invalid IPv4 address: %s", ip)
	}
	lastOctet := ipParts[3]
	classID := fmt.Sprintf("1:%s", lastOctet)
	logger.Debug("Generated class ID %s for peer IP %s", classID, ip)

	// Create a class for this peer with bandwidth limit (rate == ceil, so no borrowing).
	cmd = exec.Command("tc", "class", "add", "dev", interfaceName, "parent", "1:", "classid", classID,
		"htb", "rate", bandwidthLimit, "ceil", bandwidthLimit)
	if output, err := cmd.CombinedOutput(); err != nil {
		logger.Debug("tc class add failed for %s: %v, output: %s", ip, err, string(output))
		// If class already exists, try to replace it so a re-added peer gets fresh limits.
		if strings.Contains(string(output), "File exists") {
			cmd = exec.Command("tc", "class", "replace", "dev", interfaceName, "parent", "1:", "classid", classID,
				"htb", "rate", bandwidthLimit, "ceil", bandwidthLimit)
			if output, err := cmd.CombinedOutput(); err != nil {
				return fmt.Errorf("failed to replace class: %v, output: %s", err, string(output))
			}
			logger.Debug("Successfully replaced existing class %s for peer IP %s", classID, ip)
		} else {
			return fmt.Errorf("failed to add class: %v, output: %s", err, string(output))
		}
	} else {
		logger.Debug("Successfully added new class %s for peer IP %s", classID, ip)
	}

	// Add a filter to match traffic from this peer IP (ingress).
	cmd = exec.Command("tc", "filter", "add", "dev", interfaceName, "protocol", "ip", "parent", "1:",
		"prio", "1", "u32", "match", "ip", "src", ip, "flowid", classID)
	if output, err := cmd.CombinedOutput(); err != nil {
		// If filter fails, log but don't fail the peer addition.
		logger.Warn("Failed to add ingress filter for peer IP %s: %v, output: %s", ip, err, string(output))
	}

	// Add a filter to match traffic to this peer IP (egress).
	cmd = exec.Command("tc", "filter", "add", "dev", interfaceName, "protocol", "ip", "parent", "1:",
		"prio", "1", "u32", "match", "ip", "dst", ip, "flowid", classID)
	if output, err := cmd.CombinedOutput(); err != nil {
		// If filter fails, log but don't fail the peer addition.
		logger.Warn("Failed to add egress filter for peer IP %s: %v, output: %s", ip, err, string(output))
	}

	logger.Info("Setup bandwidth limit of %s for peer IP %s (class %s)", bandwidthLimit, ip, classID)
	return nil
}
// removePeerBandwidthLimit removes TC rules for a specific peer IP.
// Best-effort: individual filter/class deletion failures are logged, not returned,
// so peer removal always proceeds.
func removePeerBandwidthLimit(peerIP string) error {
	// Parse the IP to get just the IP address (strip any CIDR notation if present).
	ip := peerIP
	if strings.Contains(peerIP, "/") {
		parsedIP, _, err := net.ParseCIDR(peerIP)
		if err != nil {
			return fmt.Errorf("failed to parse peer IP: %v", err)
		}
		ip = parsedIP.String()
	}

	// Generate the class ID based on the IP. This MUST mirror the scheme used in
	// setupPeerBandwidthLimit (last octet of the IPv4 address), or the class/filters
	// created there will never be found and removed.
	ipParts := strings.Split(ip, ".")
	if len(ipParts) != 4 {
		return fmt.Errorf("invalid IPv4 address: %s", ip)
	}
	lastOctet := ipParts[3]
	classID := fmt.Sprintf("1:%s", lastOctet)

	// Remove filters for this IP.
	// List all filters to find the ones for this class.
	cmd := exec.Command("tc", "filter", "show", "dev", interfaceName, "parent", "1:")
	output, err := cmd.CombinedOutput()
	if err != nil {
		logger.Warn("Failed to list filters for peer IP %s: %v, output: %s", ip, err, string(output))
	} else {
		// Parse the output to find filter handles that match this classID.
		// The output format includes lines like:
		//   filter parent 1: protocol ip pref 1 u32 chain 0 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:4
		lines := strings.Split(string(output), "\n")
		for _, line := range lines {
			// Look for lines containing our flowid (classID).
			if strings.Contains(line, "flowid "+classID) && strings.Contains(line, "fh ") {
				// Extract handle (format: fh 800::800).
				parts := strings.Fields(line)
				var handle string
				for j, part := range parts {
					if part == "fh" && j+1 < len(parts) {
						handle = parts[j+1]
						break
					}
				}
				if handle != "" {
					// Delete this filter using the handle.
					// "prio 1" must match the priority used when the filter was added.
					delCmd := exec.Command("tc", "filter", "del", "dev", interfaceName, "parent", "1:", "handle", handle, "prio", "1", "u32")
					if delOutput, delErr := delCmd.CombinedOutput(); delErr != nil {
						logger.Debug("Failed to delete filter handle %s for peer IP %s: %v, output: %s", handle, ip, delErr, string(delOutput))
					} else {
						logger.Debug("Deleted filter handle %s for peer IP %s", handle, ip)
					}
				}
			}
		}
	}

	// Remove the class itself.
	cmd = exec.Command("tc", "class", "del", "dev", interfaceName, "classid", classID)
	if output, err := cmd.CombinedOutput(); err != nil {
		// It's okay if the class doesn't exist (peer may never have been shaped).
		if !strings.Contains(string(output), "No such file or directory") && !strings.Contains(string(output), "Cannot find") {
			logger.Warn("Failed to remove class for peer IP %s: %v, output: %s", ip, err, string(output))
		}
	}

	// NOTE(review): success is logged even when deletions above failed — this line
	// records best-effort completion, not verified removal.
	logger.Info("Removed bandwidth limit for peer IP %s (class %s)", ip, classID)
	return nil
}

View File

@@ -9,6 +9,7 @@ import (
"io" "io"
"net" "net"
"net/http" "net/http"
"runtime"
"sync" "sync"
"time" "time"
@@ -118,6 +119,13 @@ type Packet struct {
n int n int
} }
// holePunchRateLimitEntry tracks hole punch message counts within a sliding 1-second window.
// Entries live in UDPProxyServer.holePunchRateLimiter keyed by "ip:port" and are
// evicted once idle by cleanupHolePunchRateLimiter.
type holePunchRateLimitEntry struct {
	mu          sync.Mutex // guards count and windowStart
	count       int        // messages seen in the current window
	windowStart time.Time  // start of the current 1-second window
}
// WireGuard message types // WireGuard message types
const ( const (
WireGuardMessageTypeHandshakeInitiation = 1 WireGuardMessageTypeHandshakeInitiation = 1
@@ -153,6 +161,11 @@ type UDPProxyServer struct {
// Communication pattern tracking for rebuilding sessions // Communication pattern tracking for rebuilding sessions
// Key format: "clientIP:clientPort-destIP:destPort" // Key format: "clientIP:clientPort-destIP:destPort"
commPatterns sync.Map commPatterns sync.Map
// Rate limiter for encrypted hole punch messages, keyed by "ip:port"
holePunchRateLimiter sync.Map
// Cache for resolved UDP addresses to avoid per-packet DNS lookups
// Key: "ip:port" string, Value: *net.UDPAddr
addrCache sync.Map
// ReachableAt is the URL where this server can be reached // ReachableAt is the URL where this server can be reached
ReachableAt string ReachableAt string
} }
@@ -164,7 +177,7 @@ func NewUDPProxyServer(parentCtx context.Context, addr, serverURL string, privat
addr: addr, addr: addr,
serverURL: serverURL, serverURL: serverURL,
privateKey: privateKey, privateKey: privateKey,
packetChan: make(chan Packet, 1000), packetChan: make(chan Packet, 50000), // Increased from 1000 to handle high throughput
ReachableAt: reachableAt, ReachableAt: reachableAt,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
@@ -189,8 +202,13 @@ func (s *UDPProxyServer) Start() error {
s.conn = conn s.conn = conn
logger.Info("UDP server listening on %s", s.addr) logger.Info("UDP server listening on %s", s.addr)
// Start a fixed number of worker goroutines. // Start worker goroutines based on CPU cores for better parallelism
workerCount := 10 // TODO: Make this configurable or pick it better! // At high throughput (160+ Mbps), we need many workers to avoid bottlenecks
workerCount := runtime.NumCPU() * 10
if workerCount < 20 {
workerCount = 20 // Minimum 20 workers
}
logger.Info("Starting %d packet workers (CPUs: %d)", workerCount, runtime.NumCPU())
for i := 0; i < workerCount; i++ { for i := 0; i < workerCount; i++ {
go s.packetWorker() go s.packetWorker()
} }
@@ -210,6 +228,9 @@ func (s *UDPProxyServer) Start() error {
// Start the communication pattern cleanup routine // Start the communication pattern cleanup routine
go s.cleanupIdleCommunicationPatterns() go s.cleanupIdleCommunicationPatterns()
// Start the hole punch rate limiter cleanup routine
go s.cleanupHolePunchRateLimiter()
return nil return nil
} }
@@ -272,6 +293,27 @@ func (s *UDPProxyServer) packetWorker() {
// Process as a WireGuard packet. // Process as a WireGuard packet.
s.handleWireGuardPacket(packet.data, packet.remoteAddr) s.handleWireGuardPacket(packet.data, packet.remoteAddr)
} else { } else {
// Rate limit: allow at most 2 hole punch messages per IP:Port per second
rateLimitKey := packet.remoteAddr.String()
entryVal, _ := s.holePunchRateLimiter.LoadOrStore(rateLimitKey, &holePunchRateLimitEntry{
windowStart: time.Now(),
})
rlEntry := entryVal.(*holePunchRateLimitEntry)
rlEntry.mu.Lock()
now := time.Now()
if now.Sub(rlEntry.windowStart) >= time.Second {
rlEntry.count = 0
rlEntry.windowStart = now
}
rlEntry.count++
allowed := rlEntry.count <= 2
rlEntry.mu.Unlock()
if !allowed {
// logger.Debug("Rate limiting hole punch message from %s", rateLimitKey)
bufferPool.Put(packet.data[:1500])
continue
}
// Process as an encrypted hole punch message // Process as an encrypted hole punch message
var encMsg EncryptedHolePunchMessage var encMsg EncryptedHolePunchMessage
if err := json.Unmarshal(packet.data, &encMsg); err != nil { if err := json.Unmarshal(packet.data, &encMsg); err != nil {
@@ -291,7 +333,7 @@ func (s *UDPProxyServer) packetWorker() {
// This appears to be an encrypted message // This appears to be an encrypted message
decryptedData, err := s.decryptMessage(encMsg) decryptedData, err := s.decryptMessage(encMsg)
if err != nil { if err != nil {
logger.Error("Failed to decrypt message: %v", err) // logger.Error("Failed to decrypt message: %v", err)
// Return the buffer to the pool for reuse and continue with next packet // Return the buffer to the pool for reuse and continue with next packet
bufferPool.Put(packet.data[:1500]) bufferPool.Put(packet.data[:1500])
continue continue
@@ -416,6 +458,43 @@ func extractWireGuardIndices(packet []byte) (uint32, uint32, bool) {
return 0, 0, false return 0, 0, false
} }
// cachedAddr holds a resolved UDP address with TTL.
type cachedAddr struct {
	addr      *net.UDPAddr
	expiresAt time.Time
}

// addrCacheTTL is how long resolved addresses are cached before re-resolving.
const addrCacheTTL = 5 * time.Minute

// getCachedAddr returns a cached UDP address or resolves and caches it.
// This avoids per-packet DNS lookups which are a major throughput bottleneck.
//
// The key is built with net.JoinHostPort so IPv6 literals are bracketed correctly
// ("[::1]:5000"); the previous fmt.Sprintf("%s:%d", ...) form produced strings
// that net.ResolveUDPAddr rejects for IPv6 addresses. IPv4 keys are unchanged.
func (s *UDPProxyServer) getCachedAddr(ip string, port int) (*net.UDPAddr, error) {
	key := net.JoinHostPort(ip, fmt.Sprint(port))

	// Fast path: serve from cache while the entry is still fresh.
	if cached, ok := s.addrCache.Load(key); ok {
		entry := cached.(*cachedAddr)
		if time.Now().Before(entry.expiresAt) {
			return entry.addr, nil
		}
		// Entry expired — fall through and re-resolve; the Store below overwrites
		// it, so an explicit Delete here would be redundant work.
	}

	// Resolve and cache. Concurrent callers may race to resolve the same key;
	// last writer wins, which is harmless since all resolve identical input.
	addr, err := net.ResolveUDPAddr("udp", key)
	if err != nil {
		return nil, err
	}
	s.addrCache.Store(key, &cachedAddr{
		addr:      addr,
		expiresAt: time.Now().Add(addrCacheTTL),
	})
	return addr, nil
}
// Updated to handle multi-peer WireGuard communication // Updated to handle multi-peer WireGuard communication
func (s *UDPProxyServer) handleWireGuardPacket(packet []byte, remoteAddr *net.UDPAddr) { func (s *UDPProxyServer) handleWireGuardPacket(packet []byte, remoteAddr *net.UDPAddr) {
if len(packet) == 0 { if len(packet) == 0 {
@@ -450,7 +529,7 @@ func (s *UDPProxyServer) handleWireGuardPacket(packet []byte, remoteAddr *net.UD
logger.Debug("Forwarding handshake initiation from %s (sender index: %d) to peers %v", remoteAddr, senderIndex, proxyMapping.Destinations) logger.Debug("Forwarding handshake initiation from %s (sender index: %d) to peers %v", remoteAddr, senderIndex, proxyMapping.Destinations)
for _, dest := range proxyMapping.Destinations { for _, dest := range proxyMapping.Destinations {
destAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", dest.DestinationIP, dest.DestinationPort)) destAddr, err := s.getCachedAddr(dest.DestinationIP, dest.DestinationPort)
if err != nil { if err != nil {
logger.Error("Failed to resolve destination address: %v", err) logger.Error("Failed to resolve destination address: %v", err)
continue continue
@@ -486,7 +565,7 @@ func (s *UDPProxyServer) handleWireGuardPacket(packet []byte, remoteAddr *net.UD
// Forward the response to the original sender // Forward the response to the original sender
for _, dest := range proxyMapping.Destinations { for _, dest := range proxyMapping.Destinations {
destAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", dest.DestinationIP, dest.DestinationPort)) destAddr, err := s.getCachedAddr(dest.DestinationIP, dest.DestinationPort)
if err != nil { if err != nil {
logger.Error("Failed to resolve destination address: %v", err) logger.Error("Failed to resolve destination address: %v", err)
continue continue
@@ -543,7 +622,7 @@ func (s *UDPProxyServer) handleWireGuardPacket(packet []byte, remoteAddr *net.UD
// No known session, fall back to forwarding to all peers // No known session, fall back to forwarding to all peers
logger.Debug("No session found for receiver index %d, forwarding to all destinations", receiverIndex) logger.Debug("No session found for receiver index %d, forwarding to all destinations", receiverIndex)
for _, dest := range proxyMapping.Destinations { for _, dest := range proxyMapping.Destinations {
destAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", dest.DestinationIP, dest.DestinationPort)) destAddr, err := s.getCachedAddr(dest.DestinationIP, dest.DestinationPort)
if err != nil { if err != nil {
logger.Error("Failed to resolve destination address: %v", err) logger.Error("Failed to resolve destination address: %v", err)
continue continue
@@ -571,7 +650,7 @@ func (s *UDPProxyServer) handleWireGuardPacket(packet []byte, remoteAddr *net.UD
// Forward to all peers // Forward to all peers
for _, dest := range proxyMapping.Destinations { for _, dest := range proxyMapping.Destinations {
destAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", dest.DestinationIP, dest.DestinationPort)) destAddr, err := s.getCachedAddr(dest.DestinationIP, dest.DestinationPort)
if err != nil { if err != nil {
logger.Error("Failed to resolve destination address: %v", err) logger.Error("Failed to resolve destination address: %v", err)
continue continue
@@ -1030,6 +1109,30 @@ func (s *UDPProxyServer) tryRebuildSession(pattern *CommunicationPattern) {
} }
// cleanupIdleCommunicationPatterns periodically removes idle communication patterns // cleanupIdleCommunicationPatterns periodically removes idle communication patterns
// cleanupHolePunchRateLimiter periodically evicts stale rate limit entries to
// prevent unbounded growth of the holePunchRateLimiter map. Runs until the
// server context is cancelled; an entry is stale once its window has not been
// refreshed for more than 10 seconds.
func (s *UDPProxyServer) cleanupHolePunchRateLimiter() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-s.ctx.Done():
			return
		case <-ticker.C:
			// Anything whose window started before this cutoff is stale.
			cutoff := time.Now().Add(-10 * time.Second)
			s.holePunchRateLimiter.Range(func(key, value interface{}) bool {
				entry := value.(*holePunchRateLimitEntry)
				entry.mu.Lock()
				expired := entry.windowStart.Before(cutoff)
				entry.mu.Unlock()
				if expired {
					s.holePunchRateLimiter.Delete(key)
				}
				return true
			})
		}
	}
}
func (s *UDPProxyServer) cleanupIdleCommunicationPatterns() { func (s *UDPProxyServer) cleanupIdleCommunicationPatterns() {
ticker := time.NewTicker(10 * time.Minute) ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop() defer ticker.Stop()