Compare commits

...

17 Commits
msg-opt ... dev

Author SHA1 Message Date
Owen
d4ebb3e2af Send disconnecting message 2026-03-15 17:42:03 -07:00
Owen
bf029b7bb2 Clean up to match olm 2026-03-14 11:57:37 -07:00
Owen
745d2dbc7e Merge branch 'dev' into msg-opt 2026-03-13 17:10:49 -07:00
Owen
c7b01288e0 Clean up previous logging 2026-03-13 11:45:36 -07:00
Owen
539e595c48 Add optional compression 2026-03-12 17:49:05 -07:00
Owen
a1df3d7ff0 Merge branch 'dev' of github.com:fosrl/newt into dev 2026-03-11 17:28:16 -07:00
Laurence
d68a13ea1f feat(installer): prefer /usr/local/bin and improve POSIX compatibility
- Always install to /usr/local/bin instead of ~/.local/bin
  - Use sudo automatically when write access is needed
  - Replace bash-specific syntax with POSIX equivalents:
    - Change shebang from #!/bin/bash to #!/bin/sh
    - Replace [[ == *pattern* ]] with case statements
    - Replace echo -e with printf for colored output
  - Script now works with dash, ash, busybox sh, and bash
2026-03-10 10:01:28 -07:00
Owen
accac75a53 Set newt version in dockerfile 2026-03-08 11:26:35 -07:00
Laurence
768415f90b Parse target strings with IPv6 support and strict validation
Add parseTargetString() for listenPort:host:targetPort using net.SplitHostPort/JoinHostPort. Replace manual split in updateTargets; fix err shadowing on remove. Validate listen port 1–65535 and reject empty host/port; use %w for errors. Add tests for IPv4, IPv6, hostnames, and invalid cases.
2026-03-07 21:32:36 -08:00
Owen
da9825d030 Merge branch 'main' into dev 2026-03-07 12:34:45 -08:00
Owen
afdb1fc977 Make sure to set version and fix prepare issue 2026-03-07 12:32:49 -08:00
Owen
392e4c83bf Make sure to skip prepare 2026-03-07 10:37:44 -08:00
Owen
1bd1133ac2 Make sure to skip prepare 2026-03-07 10:36:18 -08:00
Owen
a85454e770 Build full arn 2026-03-07 10:20:18 -08:00
Owen
fac0f5b197 Build full arn 2026-03-07 10:17:14 -08:00
Marc Schäfer
068145c539 fix(ci): Refactor CI/CD workflow for AWS and image management
Updated CI/CD workflow to improve AWS role handling and image tagging.
2026-03-07 10:07:55 -08:00
Marc Schäfer
91a035f4ab fix(ci): Use AWS SelfHosted runner to fix pull and install request limit 2026-03-07 10:07:55 -08:00
8 changed files with 1113 additions and 482 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -17,7 +17,8 @@ RUN go mod download
COPY . . COPY . .
# Build the application # Build the application
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /newt ARG VERSION=dev
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w -X main.newtVersion=${VERSION}" -o /newt
FROM public.ecr.aws/docker/library/alpine:3.23 AS runner FROM public.ecr.aws/docker/library/alpine:3.23 AS runner

View File

@@ -2,6 +2,9 @@
all: local all: local
VERSION ?= dev
LDFLAGS = -X main.newtVersion=$(VERSION)
local: local:
CGO_ENABLED=0 go build -o ./bin/newt CGO_ENABLED=0 go build -o ./bin/newt
@@ -40,31 +43,31 @@ go-build-release: \
go-build-release-freebsd-arm64 go-build-release-freebsd-arm64
go-build-release-linux-arm64: go-build-release-linux-arm64:
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o bin/newt_linux_arm64 CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -ldflags "$(LDFLAGS)" -o bin/newt_linux_arm64
go-build-release-linux-arm32-v7: go-build-release-linux-arm32-v7:
CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 go build -o bin/newt_linux_arm32 CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 go build -ldflags "$(LDFLAGS)" -o bin/newt_linux_arm32
go-build-release-linux-arm32-v6: go-build-release-linux-arm32-v6:
CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=6 go build -o bin/newt_linux_arm32v6 CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=6 go build -ldflags "$(LDFLAGS)" -o bin/newt_linux_arm32v6
go-build-release-linux-amd64: go-build-release-linux-amd64:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o bin/newt_linux_amd64 CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o bin/newt_linux_amd64
go-build-release-linux-riscv64: go-build-release-linux-riscv64:
CGO_ENABLED=0 GOOS=linux GOARCH=riscv64 go build -o bin/newt_linux_riscv64 CGO_ENABLED=0 GOOS=linux GOARCH=riscv64 go build -ldflags "$(LDFLAGS)" -o bin/newt_linux_riscv64
go-build-release-darwin-arm64: go-build-release-darwin-arm64:
CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -o bin/newt_darwin_arm64 CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -ldflags "$(LDFLAGS)" -o bin/newt_darwin_arm64
go-build-release-darwin-amd64: go-build-release-darwin-amd64:
CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o bin/newt_darwin_amd64 CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o bin/newt_darwin_amd64
go-build-release-windows-amd64: go-build-release-windows-amd64:
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -o bin/newt_windows_amd64.exe CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o bin/newt_windows_amd64.exe
go-build-release-freebsd-amd64: go-build-release-freebsd-amd64:
CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 go build -o bin/newt_freebsd_amd64 CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o bin/newt_freebsd_amd64
go-build-release-freebsd-arm64: go-build-release-freebsd-arm64:
CGO_ENABLED=0 GOOS=freebsd GOARCH=arm64 go build -o bin/newt_freebsd_arm64 CGO_ENABLED=0 GOOS=freebsd GOARCH=arm64 go build -ldflags "$(LDFLAGS)" -o bin/newt_freebsd_arm64

View File

@@ -5,6 +5,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net"
"os" "os"
"os/exec" "os/exec"
"strings" "strings"
@@ -363,27 +364,62 @@ func parseTargetData(data interface{}) (TargetData, error) {
return targetData, nil return targetData, nil
} }
// parseTargetString parses a target string in the format "listenPort:host:targetPort"
// It properly handles IPv6 addresses which must be in brackets: "listenPort:[ipv6]:targetPort"
// Examples:
// - IPv4: "3001:192.168.1.1:80"
// - IPv6: "3001:[::1]:8080" or "3001:[fd70:1452:b736:4dd5:caca:7db9:c588:f5b3]:80"
//
// Returns listenPort, targetAddress (in host:port format suitable for net.Dial), and error
func parseTargetString(target string) (int, string, error) {
// Find the first colon to extract the listen port
firstColon := strings.Index(target, ":")
if firstColon == -1 {
return 0, "", fmt.Errorf("invalid target format, no colon found: %s", target)
}
listenPortStr := target[:firstColon]
var listenPort int
_, err := fmt.Sscanf(listenPortStr, "%d", &listenPort)
if err != nil {
return 0, "", fmt.Errorf("invalid listen port: %s", listenPortStr)
}
if listenPort <= 0 || listenPort > 65535 {
return 0, "", fmt.Errorf("listen port out of range: %d", listenPort)
}
// The remainder is host:targetPort - use net.SplitHostPort which handles IPv6 brackets
remainder := target[firstColon+1:]
host, targetPort, err := net.SplitHostPort(remainder)
if err != nil {
return 0, "", fmt.Errorf("invalid host:port format '%s': %w", remainder, err)
}
// Reject empty host or target port
if host == "" {
return 0, "", fmt.Errorf("empty host in target: %s", target)
}
if targetPort == "" {
return 0, "", fmt.Errorf("empty target port in target: %s", target)
}
// Reconstruct the target address using JoinHostPort (handles IPv6 properly)
targetAddr := net.JoinHostPort(host, targetPort)
return listenPort, targetAddr, nil
}
func updateTargets(pm *proxy.ProxyManager, action string, tunnelIP string, proto string, targetData TargetData) error { func updateTargets(pm *proxy.ProxyManager, action string, tunnelIP string, proto string, targetData TargetData) error {
for _, t := range targetData.Targets { for _, t := range targetData.Targets {
// Split the first number off of the target with : separator and use as the port // Parse the target string, handling both IPv4 and IPv6 addresses
parts := strings.Split(t, ":") port, target, err := parseTargetString(t)
if len(parts) != 3 {
logger.Info("Invalid target format: %s", t)
continue
}
// Get the port as an int
port := 0
_, err := fmt.Sscanf(parts[0], "%d", &port)
if err != nil { if err != nil {
logger.Info("Invalid port: %s", parts[0]) logger.Info("Invalid target format: %s (%v)", t, err)
continue continue
} }
switch action { switch action {
case "add": case "add":
target := parts[1] + ":" + parts[2]
// Call updown script if provided // Call updown script if provided
processedTarget := target processedTarget := target
if updownScript != "" { if updownScript != "" {
@@ -410,8 +446,6 @@ func updateTargets(pm *proxy.ProxyManager, action string, tunnelIP string, proto
case "remove": case "remove":
logger.Info("Removing target with port %d", port) logger.Info("Removing target with port %d", port)
target := parts[1] + ":" + parts[2]
// Call updown script if provided // Call updown script if provided
if updownScript != "" { if updownScript != "" {
_, err := executeUpdownScript(action, proto, target) _, err := executeUpdownScript(action, proto, target)
@@ -420,7 +454,7 @@ func updateTargets(pm *proxy.ProxyManager, action string, tunnelIP string, proto
} }
} }
err := pm.RemoveTarget(proto, tunnelIP, port) err = pm.RemoveTarget(proto, tunnelIP, port)
if err != nil { if err != nil {
logger.Error("Failed to remove target: %v", err) logger.Error("Failed to remove target: %v", err)
return err return err

212
common_test.go Normal file
View File

@@ -0,0 +1,212 @@
package main
import (
"net"
"testing"
)
func TestParseTargetString(t *testing.T) {
tests := []struct {
name string
input string
wantListenPort int
wantTargetAddr string
wantErr bool
}{
// IPv4 test cases
{
name: "valid IPv4 basic",
input: "3001:192.168.1.1:80",
wantListenPort: 3001,
wantTargetAddr: "192.168.1.1:80",
wantErr: false,
},
{
name: "valid IPv4 localhost",
input: "8080:127.0.0.1:3000",
wantListenPort: 8080,
wantTargetAddr: "127.0.0.1:3000",
wantErr: false,
},
{
name: "valid IPv4 same ports",
input: "443:10.0.0.1:443",
wantListenPort: 443,
wantTargetAddr: "10.0.0.1:443",
wantErr: false,
},
// IPv6 test cases
{
name: "valid IPv6 loopback",
input: "3001:[::1]:8080",
wantListenPort: 3001,
wantTargetAddr: "[::1]:8080",
wantErr: false,
},
{
name: "valid IPv6 full address",
input: "80:[fd70:1452:b736:4dd5:caca:7db9:c588:f5b3]:8080",
wantListenPort: 80,
wantTargetAddr: "[fd70:1452:b736:4dd5:caca:7db9:c588:f5b3]:8080",
wantErr: false,
},
{
name: "valid IPv6 link-local",
input: "443:[fe80::1]:443",
wantListenPort: 443,
wantTargetAddr: "[fe80::1]:443",
wantErr: false,
},
{
name: "valid IPv6 all zeros compressed",
input: "8000:[::]:9000",
wantListenPort: 8000,
wantTargetAddr: "[::]:9000",
wantErr: false,
},
{
name: "valid IPv6 mixed notation",
input: "5000:[::ffff:192.168.1.1]:6000",
wantListenPort: 5000,
wantTargetAddr: "[::ffff:192.168.1.1]:6000",
wantErr: false,
},
// Hostname test cases
{
name: "valid hostname",
input: "8080:example.com:80",
wantListenPort: 8080,
wantTargetAddr: "example.com:80",
wantErr: false,
},
{
name: "valid hostname with subdomain",
input: "443:api.example.com:8443",
wantListenPort: 443,
wantTargetAddr: "api.example.com:8443",
wantErr: false,
},
{
name: "valid localhost hostname",
input: "3000:localhost:3000",
wantListenPort: 3000,
wantTargetAddr: "localhost:3000",
wantErr: false,
},
// Error cases
{
name: "invalid - no colons",
input: "invalid",
wantErr: true,
},
{
name: "invalid - empty string",
input: "",
wantErr: true,
},
{
name: "invalid - non-numeric listen port",
input: "abc:192.168.1.1:80",
wantErr: true,
},
{
name: "invalid - missing target port",
input: "3001:192.168.1.1",
wantErr: true,
},
{
name: "invalid - IPv6 without brackets",
input: "3001:fd70:1452:b736:4dd5:caca:7db9:c588:f5b3:80",
wantErr: true,
},
{
name: "invalid - only listen port",
input: "3001:",
wantErr: true,
},
{
name: "invalid - missing host",
input: "3001::80",
wantErr: true,
},
{
name: "invalid - IPv6 unclosed bracket",
input: "3001:[::1:80",
wantErr: true,
},
{
name: "invalid - listen port zero",
input: "0:192.168.1.1:80",
wantErr: true,
},
{
name: "invalid - listen port negative",
input: "-1:192.168.1.1:80",
wantErr: true,
},
{
name: "invalid - listen port out of range",
input: "70000:192.168.1.1:80",
wantErr: true,
},
{
name: "invalid - empty target port",
input: "3001:192.168.1.1:",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
listenPort, targetAddr, err := parseTargetString(tt.input)
if (err != nil) != tt.wantErr {
t.Errorf("parseTargetString(%q) error = %v, wantErr %v", tt.input, err, tt.wantErr)
return
}
if tt.wantErr {
return // Don't check other values if we expected an error
}
if listenPort != tt.wantListenPort {
t.Errorf("parseTargetString(%q) listenPort = %d, want %d", tt.input, listenPort, tt.wantListenPort)
}
if targetAddr != tt.wantTargetAddr {
t.Errorf("parseTargetString(%q) targetAddr = %q, want %q", tt.input, targetAddr, tt.wantTargetAddr)
}
})
}
}
// TestParseTargetStringNetDialCompatibility verifies that the output is compatible with net.Dial
func TestParseTargetStringNetDialCompatibility(t *testing.T) {
tests := []struct {
name string
input string
}{
{"IPv4", "8080:127.0.0.1:80"},
{"IPv6 loopback", "8080:[::1]:80"},
{"IPv6 full", "8080:[2001:db8::1]:80"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, targetAddr, err := parseTargetString(tt.input)
if err != nil {
t.Fatalf("parseTargetString(%q) unexpected error: %v", tt.input, err)
}
// Verify the format is valid for net.Dial by checking it can be split back
// This doesn't actually dial, just validates the format
_, _, err = net.SplitHostPort(targetAddr)
if err != nil {
t.Errorf("parseTargetString(%q) produced invalid net.Dial format %q: %v", tt.input, targetAddr, err)
}
})
}
}

View File

@@ -1,7 +1,7 @@
#!/bin/bash #!/bin/sh
# Get Newt - Cross-platform installation script # Get Newt - Cross-platform installation script
# Usage: curl -fsSL https://raw.githubusercontent.com/fosrl/newt/refs/heads/main/get-newt.sh | bash # Usage: curl -fsSL https://raw.githubusercontent.com/fosrl/newt/refs/heads/main/get-newt.sh | sh
set -e set -e
@@ -17,15 +17,15 @@ GITHUB_API_URL="https://api.github.com/repos/${REPO}/releases/latest"
# Function to print colored output # Function to print colored output
print_status() { print_status() {
echo -e "${GREEN}[INFO]${NC} $1" printf '%b[INFO]%b %s\n' "${GREEN}" "${NC}" "$1"
} }
print_warning() { print_warning() {
echo -e "${YELLOW}[WARN]${NC} $1" printf '%b[WARN]%b %s\n' "${YELLOW}" "${NC}" "$1"
} }
print_error() { print_error() {
echo -e "${RED}[ERROR]${NC} $1" printf '%b[ERROR]%b %s\n' "${RED}" "${NC}" "$1"
} }
# Function to get latest version from GitHub API # Function to get latest version from GitHub API
@@ -113,16 +113,34 @@ get_install_dir() {
if [ "$OS" = "windows" ]; then if [ "$OS" = "windows" ]; then
echo "$HOME/bin" echo "$HOME/bin"
else else
# Try to use a directory in PATH, fallback to ~/.local/bin # Prefer /usr/local/bin for system-wide installation
if echo "$PATH" | grep -q "/usr/local/bin"; then echo "/usr/local/bin"
if [ -w "/usr/local/bin" ] 2>/dev/null; then fi
echo "/usr/local/bin" }
else
echo "$HOME/.local/bin" # Check if we need sudo for installation
fi needs_sudo() {
local install_dir="$1"
if [ -w "$install_dir" ] 2>/dev/null; then
return 1 # No sudo needed
else
return 0 # Sudo needed
fi
}
# Get the appropriate command prefix (sudo or empty)
get_sudo_cmd() {
local install_dir="$1"
if needs_sudo "$install_dir"; then
if command -v sudo >/dev/null 2>&1; then
echo "sudo"
else else
echo "$HOME/.local/bin" print_error "Cannot write to ${install_dir} and sudo is not available."
print_error "Please run this script as root or install sudo."
exit 1
fi fi
else
echo ""
fi fi
} }
@@ -130,21 +148,24 @@ get_install_dir() {
install_newt() { install_newt() {
local platform="$1" local platform="$1"
local install_dir="$2" local install_dir="$2"
local sudo_cmd="$3"
local binary_name="newt_${platform}" local binary_name="newt_${platform}"
local exe_suffix="" local exe_suffix=""
# Add .exe suffix for Windows # Add .exe suffix for Windows
if [[ "$platform" == *"windows"* ]]; then case "$platform" in
binary_name="${binary_name}.exe" *windows*)
exe_suffix=".exe" binary_name="${binary_name}.exe"
fi exe_suffix=".exe"
;;
esac
local download_url="${BASE_URL}/${binary_name}" local download_url="${BASE_URL}/${binary_name}"
local temp_file="/tmp/newt${exe_suffix}" local temp_file="/tmp/newt${exe_suffix}"
local final_path="${install_dir}/newt${exe_suffix}" local final_path="${install_dir}/newt${exe_suffix}"
print_status "Downloading newt from ${download_url}" print_status "Downloading newt from ${download_url}"
# Download the binary # Download the binary
if command -v curl >/dev/null 2>&1; then if command -v curl >/dev/null 2>&1; then
curl -fsSL "$download_url" -o "$temp_file" curl -fsSL "$download_url" -o "$temp_file"
@@ -154,18 +175,22 @@ install_newt() {
print_error "Neither curl nor wget is available. Please install one of them." print_error "Neither curl nor wget is available. Please install one of them."
exit 1 exit 1
fi fi
# Make executable before moving
chmod +x "$temp_file"
# Create install directory if it doesn't exist # Create install directory if it doesn't exist
mkdir -p "$install_dir" if [ -n "$sudo_cmd" ]; then
$sudo_cmd mkdir -p "$install_dir"
# Move binary to install directory print_status "Using sudo to install to ${install_dir}"
mv "$temp_file" "$final_path" $sudo_cmd mv "$temp_file" "$final_path"
else
# Make executable (not needed on Windows, but doesn't hurt) mkdir -p "$install_dir"
chmod +x "$final_path" mv "$temp_file" "$final_path"
fi
print_status "newt installed to ${final_path}" print_status "newt installed to ${final_path}"
# Check if install directory is in PATH # Check if install directory is in PATH
if ! echo "$PATH" | grep -q "$install_dir"; then if ! echo "$PATH" | grep -q "$install_dir"; then
print_warning "Install directory ${install_dir} is not in your PATH." print_warning "Install directory ${install_dir} is not in your PATH."
@@ -179,9 +204,9 @@ verify_installation() {
local install_dir="$1" local install_dir="$1"
local exe_suffix="" local exe_suffix=""
if [[ "$PLATFORM" == *"windows"* ]]; then case "$PLATFORM" in
exe_suffix=".exe" *windows*) exe_suffix=".exe" ;;
fi esac
local newt_path="${install_dir}/newt${exe_suffix}" local newt_path="${install_dir}/newt${exe_suffix}"
@@ -198,34 +223,36 @@ verify_installation() {
# Main installation process # Main installation process
main() { main() {
print_status "Installing latest version of newt..." print_status "Installing latest version of newt..."
# Get latest version # Get latest version
print_status "Fetching latest version from GitHub..." print_status "Fetching latest version from GitHub..."
VERSION=$(get_latest_version) VERSION=$(get_latest_version)
print_status "Latest version: v${VERSION}" print_status "Latest version: v${VERSION}"
# Set base URL with the fetched version # Set base URL with the fetched version
BASE_URL="https://github.com/${REPO}/releases/download/${VERSION}" BASE_URL="https://github.com/${REPO}/releases/download/${VERSION}"
# Detect platform # Detect platform
PLATFORM=$(detect_platform) PLATFORM=$(detect_platform)
print_status "Detected platform: ${PLATFORM}" print_status "Detected platform: ${PLATFORM}"
# Get install directory # Get install directory
INSTALL_DIR=$(get_install_dir) INSTALL_DIR=$(get_install_dir)
print_status "Install directory: ${INSTALL_DIR}" print_status "Install directory: ${INSTALL_DIR}"
# Check if we need sudo
SUDO_CMD=$(get_sudo_cmd "$INSTALL_DIR")
if [ -n "$SUDO_CMD" ]; then
print_status "Root privileges required for installation to ${INSTALL_DIR}"
fi
# Install newt # Install newt
install_newt "$PLATFORM" "$INSTALL_DIR" install_newt "$PLATFORM" "$INSTALL_DIR" "$SUDO_CMD"
# Verify installation # Verify installation
if verify_installation "$INSTALL_DIR"; then if verify_installation "$INSTALL_DIR"; then
print_status "newt is ready to use!" print_status "newt is ready to use!"
if [[ "$PLATFORM" == *"windows"* ]]; then print_status "Run 'newt --help' to get started"
print_status "Run 'newt --help' to get started"
else
print_status "Run 'newt --help' to get started"
fi
else else
exit 1 exit 1
fi fi

View File

@@ -618,8 +618,6 @@ func runNewtMain(ctx context.Context) {
var connected bool var connected bool
var wgData WgData var wgData WgData
var dockerEventMonitor *docker.EventMonitor var dockerEventMonitor *docker.EventMonitor
logger.Debug("++++++++++++++++++++++ the port is %d", port)
if !disableClients { if !disableClients {
setupClients(client) setupClients(client)
@@ -1199,7 +1197,7 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey(
len(syncData.Targets.TCP), len(syncData.Targets.UDP), len(syncData.HealthCheckTargets)) len(syncData.Targets.TCP), len(syncData.Targets.UDP), len(syncData.HealthCheckTargets))
//TODO: TEST AND IMPLEMENT THIS //TODO: TEST AND IMPLEMENT THIS
// // Build sets of desired targets (port -> target string) // // Build sets of desired targets (port -> target string)
// desiredTCP := make(map[int]string) // desiredTCP := make(map[int]string)
// for _, t := range syncData.Targets.TCP { // for _, t := range syncData.Targets.TCP {
@@ -1796,6 +1794,8 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey(
pm.Stop() pm.Stop()
} }
client.SendMessage("newt/disconnecting", map[string]any{})
if client != nil { if client != nil {
client.Close() client.Close()
} }

View File

@@ -2,6 +2,7 @@ package websocket
import ( import (
"bytes" "bytes"
"compress/gzip"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/json" "encoding/json"
@@ -660,7 +661,57 @@ func (c *Client) setupPKCS12TLS() (*tls.Config, error) {
} }
// pingMonitor sends pings at a short interval and triggers reconnect on failure // pingMonitor sends pings at a short interval and triggers reconnect on failure
func (c *Client) sendPing() {
if c.conn == nil {
return
}
// Skip ping if a message is currently being processed
c.processingMux.RLock()
isProcessing := c.processingMessage
c.processingMux.RUnlock()
if isProcessing {
logger.Debug("Skipping ping, message is being processed")
return
}
c.configVersionMux.RLock()
configVersion := c.configVersion
c.configVersionMux.RUnlock()
pingMsg := WSMessage{
Type: "newt/ping",
Data: map[string]interface{}{},
ConfigVersion: configVersion,
}
c.writeMux.Lock()
err := c.conn.WriteJSON(pingMsg)
if err == nil {
telemetry.IncWSMessage(c.metricsContext(), "out", "ping")
}
c.writeMux.Unlock()
if err != nil {
// Check if we're shutting down before logging error and reconnecting
select {
case <-c.done:
// Expected during shutdown
return
default:
logger.Error("Ping failed: %v", err)
telemetry.IncWSKeepaliveFailure(c.metricsContext(), "ping_write")
telemetry.IncWSReconnect(c.metricsContext(), "ping_write")
c.reconnect()
return
}
}
}
func (c *Client) pingMonitor() { func (c *Client) pingMonitor() {
// Send an immediate ping as soon as we connect
c.sendPing()
ticker := time.NewTicker(c.pingInterval) ticker := time.NewTicker(c.pingInterval)
defer ticker.Stop() defer ticker.Stop()
@@ -669,50 +720,7 @@ func (c *Client) pingMonitor() {
case <-c.done: case <-c.done:
return return
case <-ticker.C: case <-ticker.C:
if c.conn == nil { c.sendPing()
return
}
// Skip ping if a message is currently being processed
c.processingMux.RLock()
isProcessing := c.processingMessage
c.processingMux.RUnlock()
if isProcessing {
logger.Debug("Skipping ping, message is being processed")
continue
}
c.configVersionMux.RLock()
configVersion := c.configVersion
c.configVersionMux.RUnlock()
pingMsg := WSMessage{
Type: "newt/ping",
Data: map[string]interface{}{},
ConfigVersion: configVersion,
}
c.writeMux.Lock()
err := c.conn.WriteJSON(pingMsg)
if err == nil {
telemetry.IncWSMessage(c.metricsContext(), "out", "ping")
}
c.writeMux.Unlock()
if err != nil {
// Check if we're shutting down before logging error and reconnecting
select {
case <-c.done:
// Expected during shutdown
return
default:
logger.Error("Ping failed: %v", err)
telemetry.IncWSKeepaliveFailure(c.metricsContext(), "ping_write")
telemetry.IncWSReconnect(c.metricsContext(), "ping_write")
c.reconnect()
return
}
}
} }
} }
} }
@@ -749,10 +757,13 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) {
disconnectResult = "success" disconnectResult = "success"
return return
default: default:
var msg WSMessage msgType, p, err := c.conn.ReadMessage()
err := c.conn.ReadJSON(&msg)
if err == nil { if err == nil {
telemetry.IncWSMessage(c.metricsContext(), "in", "text") if msgType == websocket.BinaryMessage {
telemetry.IncWSMessage(c.metricsContext(), "in", "binary")
} else {
telemetry.IncWSMessage(c.metricsContext(), "in", "text")
}
} }
if err != nil { if err != nil {
// Check if we're shutting down before logging error // Check if we're shutting down before logging error
@@ -778,6 +789,29 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) {
} }
// Update config version from incoming message // Update config version from incoming message
var data []byte
if msgType == websocket.BinaryMessage {
gr, err := gzip.NewReader(bytes.NewReader(p))
if err != nil {
logger.Error("WebSocket failed to create gzip reader: %v", err)
continue
}
data, err = io.ReadAll(gr)
gr.Close()
if err != nil {
logger.Error("WebSocket failed to decompress message: %v", err)
continue
}
} else {
data = p
}
var msg WSMessage
if err = json.Unmarshal(data, &msg); err != nil {
logger.Error("WebSocket failed to parse message: %v", err)
continue
}
c.setConfigVersion(msg.ConfigVersion) c.setConfigVersion(msg.ConfigVersion)
c.handlersMux.RLock() c.handlersMux.RLock()