mirror of
https://github.com/fosrl/newt.git
synced 2026-05-08 08:58:10 -05:00
Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
542c70b326 | ||
|
|
663e98af60 | ||
|
|
901ec71baf | ||
|
|
9bc0204f57 | ||
|
|
1e77b09e3b | ||
|
|
74fd3f3aa3 | ||
|
|
e8dc19a62b | ||
|
|
9ff32b8a8b | ||
|
|
9edaac9c11 | ||
|
|
ced87b1d5e | ||
|
|
3aaebe64fb | ||
|
|
27f7ca6bb9 | ||
|
|
5090907307 | ||
|
|
a6533b3fa0 |
2
.github/workflows/cicd.yml
vendored
2
.github/workflows/cicd.yml
vendored
@@ -764,7 +764,7 @@ jobs:
|
||||
cosign public-key --key env://COSIGN_PRIVATE_KEY >/dev/null
|
||||
|
||||
- name: Generate SBOM (SPDX JSON) from GHCR digest
|
||||
uses: aquasecurity/trivy-action@57a97c7e7821a5776cebc9bb87c984fa69cba8f1 # v0.35.0
|
||||
uses: aquasecurity/trivy-action@ed142fd0673e97e23eac54620cfb913e5ce36c25 # v0.36.0
|
||||
with:
|
||||
image-ref: ${{ env.GHCR_REF }}
|
||||
format: spdx-json
|
||||
|
||||
82
common.go
82
common.go
@@ -208,6 +208,7 @@ func pingWithRetry(tnet *netstack.Net, dst string, timeout time.Duration) (stopC
|
||||
logger.Warn(msgHealthFileWriteFailed, err)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
case <-pingStopChan:
|
||||
// Stop the goroutine when signaled
|
||||
@@ -220,6 +221,25 @@ func pingWithRetry(tnet *netstack.Net, dst string, timeout time.Duration) (stopC
|
||||
return stopChan, fmt.Errorf("initial ping attempts failed, continuing in background")
|
||||
}
|
||||
|
||||
// shouldFireRecovery decides whether the data-plane recovery flow in
|
||||
// startPingCheck should run on this tick. Recovery fires once when the
|
||||
// consecutive-failure counter first crosses the threshold; the connectionLost
|
||||
// flag prevents re-firing until a successful ping resets the state.
|
||||
//
|
||||
// This condition was previously inlined into startPingCheck and AND-ed with
|
||||
// `currentInterval < maxInterval`, which silently broke recovery once
|
||||
// pingInterval's default was bumped to 15s while maxInterval stayed at 6s
|
||||
// (commit 8161fa6, March 2026): the gate became permanently false on default
|
||||
// settings, so the recovery code never executed and ping failures climbed
|
||||
// forever — the proximate cause of fosrl/newt#284, #310 and pangolin#1004.
|
||||
//
|
||||
// Recovery and backoff are independent concerns; the backoff ramp is now
|
||||
// computed separately in the caller. Do not re-introduce currentInterval
|
||||
// here.
|
||||
func shouldFireRecovery(consecutiveFailures, failureThreshold int, connectionLost bool) bool {
|
||||
return consecutiveFailures >= failureThreshold && !connectionLost
|
||||
}
|
||||
|
||||
func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client, tunnelID string) chan struct{} {
|
||||
maxInterval := 6 * time.Second
|
||||
currentInterval := pingInterval
|
||||
@@ -279,42 +299,44 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien
|
||||
|
||||
// More lenient threshold for declaring connection lost under load
|
||||
failureThreshold := 4
|
||||
if consecutiveFailures >= failureThreshold && currentInterval < maxInterval {
|
||||
if !connectionLost {
|
||||
connectionLost = true
|
||||
logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures)
|
||||
if tunnelID != "" {
|
||||
telemetry.IncReconnect(context.Background(), tunnelID, "client", telemetry.ReasonTimeout)
|
||||
}
|
||||
pingChainId := generateChainId()
|
||||
pendingPingChainId = pingChainId
|
||||
stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{
|
||||
"chainId": pingChainId,
|
||||
}, 3*time.Second)
|
||||
// Send registration message to the server for backward compatibility
|
||||
bcChainId := generateChainId()
|
||||
pendingRegisterChainId = bcChainId
|
||||
err := client.SendMessage("newt/wg/register", map[string]interface{}{
|
||||
"publicKey": publicKey.String(),
|
||||
"backwardsCompatible": true,
|
||||
"chainId": bcChainId,
|
||||
})
|
||||
if shouldFireRecovery(consecutiveFailures, failureThreshold, connectionLost) {
|
||||
connectionLost = true
|
||||
logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures)
|
||||
if tunnelID != "" {
|
||||
telemetry.IncReconnect(context.Background(), tunnelID, "client", telemetry.ReasonTimeout)
|
||||
}
|
||||
pingChainId := generateChainId()
|
||||
pendingPingChainId = pingChainId
|
||||
stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{
|
||||
"chainId": pingChainId,
|
||||
}, 3*time.Second)
|
||||
// Send registration message to the server for backward compatibility
|
||||
bcChainId := generateChainId()
|
||||
pendingRegisterChainId = bcChainId
|
||||
err := client.SendMessage("newt/wg/register", map[string]interface{}{
|
||||
"publicKey": publicKey.String(),
|
||||
"backwardsCompatible": true,
|
||||
"chainId": bcChainId,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("Failed to send registration message: %v", err)
|
||||
}
|
||||
if healthFile != "" {
|
||||
err = os.Remove(healthFile)
|
||||
if err != nil {
|
||||
logger.Error("Failed to send registration message: %v", err)
|
||||
}
|
||||
if healthFile != "" {
|
||||
err = os.Remove(healthFile)
|
||||
if err != nil {
|
||||
logger.Error("Failed to remove health file: %v", err)
|
||||
}
|
||||
logger.Error("Failed to remove health file: %v", err)
|
||||
}
|
||||
}
|
||||
currentInterval = time.Duration(float64(currentInterval) * 1.3) // Slower increase
|
||||
}
|
||||
// Backoff: ramp the periodic-ping interval up while we are
|
||||
// past the failure threshold, capped at maxInterval. Kept
|
||||
// independent of the recovery trigger above so the trigger
|
||||
// fires on every outage regardless of pingInterval.
|
||||
if consecutiveFailures >= failureThreshold && currentInterval < maxInterval {
|
||||
currentInterval = time.Duration(float64(currentInterval) * 1.3)
|
||||
if currentInterval > maxInterval {
|
||||
currentInterval = maxInterval
|
||||
}
|
||||
ticker.Reset(currentInterval)
|
||||
logger.Debug("Increased ping check interval to %v due to consecutive failures", currentInterval)
|
||||
}
|
||||
} else {
|
||||
// Track recent latencies
|
||||
|
||||
@@ -210,3 +210,42 @@ func TestParseTargetStringNetDialCompatibility(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestShouldFireRecovery is the regression guard for the broken trigger gate
|
||||
// that prevented data-plane recovery from ever firing under default settings
|
||||
// (fosrl/newt#284, #310, pangolin#1004). The pre-fix condition was
|
||||
//
|
||||
// consecutiveFailures >= failureThreshold && currentInterval < maxInterval
|
||||
//
|
||||
// which became permanently false once pingInterval's default was bumped from
|
||||
// 3s to 15s in commit 8161fa6 — currentInterval starts at pingInterval=15s,
|
||||
// maxInterval stayed at 6s, so 15<6 is false and the recovery branch never
|
||||
// executed.
|
||||
//
|
||||
// The fix is to drop currentInterval from the trigger condition entirely;
|
||||
// backoff is a separate concern computed in the caller. The cases below
|
||||
// exercise the documented contract.
|
||||
func TestShouldFireRecovery(t *testing.T) {
|
||||
const threshold = 4
|
||||
cases := []struct {
|
||||
name string
|
||||
failures int
|
||||
connectionLost bool
|
||||
want bool
|
||||
}{
|
||||
{"below threshold, fresh", 3, false, false},
|
||||
{"below threshold, already lost", 3, true, false},
|
||||
{"at threshold, fresh — recovery must fire", threshold, false, true},
|
||||
{"at threshold, already lost — gate prevents re-fire", threshold, true, false},
|
||||
{"far above threshold, fresh", 100, false, true},
|
||||
{"far above threshold, already lost", 100, true, false},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
if got := shouldFireRecovery(c.failures, threshold, c.connectionLost); got != c.want {
|
||||
t.Errorf("shouldFireRecovery(failures=%d, threshold=%d, lost=%v) = %v, want %v",
|
||||
c.failures, threshold, c.connectionLost, got, c.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
inherit (pkgs) lib;
|
||||
|
||||
# Update version when releasing
|
||||
version = "1.11.0";
|
||||
version = "1.12.4";
|
||||
in
|
||||
{
|
||||
default = self.packages.${system}.pangolin-newt;
|
||||
@@ -35,7 +35,7 @@
|
||||
inherit version;
|
||||
src = pkgs.nix-gitignore.gitignoreSource [ ] ./.;
|
||||
|
||||
vendorHash = "sha256-+zMSzNbqmWm/DXL2xMUd5uPP5tSIybsRokwJ2zd0pf0=";
|
||||
vendorHash = "sha256-WfIK+Q8WQ372NzLw6DRapv1nYPduShi4KnVJBPk0Oz0=";
|
||||
|
||||
nativeInstallCheckInputs = [ pkgs.versionCheckHook ];
|
||||
|
||||
|
||||
14
go.mod
14
go.mod
@@ -17,14 +17,14 @@ require (
|
||||
go.opentelemetry.io/otel/metric v1.43.0
|
||||
go.opentelemetry.io/otel/sdk v1.43.0
|
||||
go.opentelemetry.io/otel/sdk/metric v1.43.0
|
||||
golang.org/x/crypto v0.49.0
|
||||
golang.org/x/crypto v0.50.0
|
||||
golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6
|
||||
golang.org/x/net v0.52.0
|
||||
golang.org/x/sys v0.42.0
|
||||
golang.org/x/net v0.53.0
|
||||
golang.org/x/sys v0.43.0
|
||||
golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb
|
||||
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20241231184526-a9ab2273dd10
|
||||
golang.zx2c4.com/wireguard/windows v0.5.3
|
||||
google.golang.org/grpc v1.80.0
|
||||
google.golang.org/grpc v1.81.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
gvisor.dev/gvisor v0.0.0-20250503011706-39ed1f5ac29c
|
||||
software.sslmate.com/src/go-pkcs12 v0.7.0
|
||||
@@ -65,11 +65,11 @@ require (
|
||||
go.opentelemetry.io/otel/trace v1.43.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.10.0 // indirect
|
||||
go.yaml.in/yaml/v2 v2.4.4 // indirect
|
||||
golang.org/x/mod v0.33.0 // indirect
|
||||
golang.org/x/mod v0.34.0 // indirect
|
||||
golang.org/x/sync v0.20.0 // indirect
|
||||
golang.org/x/text v0.35.0 // indirect
|
||||
golang.org/x/text v0.36.0 // indirect
|
||||
golang.org/x/time v0.12.0 // indirect
|
||||
golang.org/x/tools v0.42.0 // indirect
|
||||
golang.org/x/tools v0.43.0 // indirect
|
||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
|
||||
|
||||
28
go.sum
28
go.sum
@@ -125,26 +125,26 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ=
|
||||
go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ=
|
||||
golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4=
|
||||
golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA=
|
||||
golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
|
||||
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
|
||||
golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6 h1:zfMcR1Cs4KNuomFFgGefv5N0czO2XZpUbxGUy8i8ug0=
|
||||
golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0=
|
||||
golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8=
|
||||
golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w=
|
||||
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
|
||||
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
|
||||
golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI=
|
||||
golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY=
|
||||
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
|
||||
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
|
||||
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
|
||||
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
|
||||
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
|
||||
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
|
||||
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
|
||||
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
|
||||
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg=
|
||||
golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
|
||||
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
|
||||
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
|
||||
golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
|
||||
golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=
|
||||
golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s=
|
||||
golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0=
|
||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 h1:B82qJJgjvYKsXS9jeunTOisW56dUokqW/FOteYJJ/yg=
|
||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2/go.mod h1:deeaetjYA+DHMHg+sMSMI58GrEteJUUzzw7en6TJQcI=
|
||||
golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb h1:whnFRlWMcXI9d+ZbWg+4sHnLp52d5yiIPUxMBSt4X9A=
|
||||
@@ -159,8 +159,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:7QBABkRtR8z+TEnmXTqIqwJLlzrZKVfAUm7tY3yGv0M=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
|
||||
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
|
||||
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
|
||||
google.golang.org/grpc v1.81.0 h1:W3G9N3KQf3BU+YuCtGKJk0CmxQNbAISICD/9AORxLIw=
|
||||
google.golang.org/grpc v1.81.0/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I=
|
||||
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
||||
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
||||
@@ -356,16 +356,16 @@ func (h *HTTPHandler) handleRequest(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// If the rule is plain HTTP but has a TLS certificate configured, redirect
|
||||
// the client to the HTTPS equivalent of the requested URL.
|
||||
if rule.Protocol == "http" && rule.TLSCert != "" && rule.TLSKey != "" {
|
||||
// If the rule is HTTPS and a TLS certificate is configured, but the
|
||||
// incoming request arrived over plain HTTP, redirect to HTTPS.
|
||||
if rule.Protocol == "https" && rule.TLSCert != "" && rule.TLSKey != "" && r.TLS == nil {
|
||||
host := r.Host
|
||||
if host == "" {
|
||||
host = r.URL.Host
|
||||
}
|
||||
httpsURL := "https://" + host + r.RequestURI
|
||||
logger.Info("HTTP handler: redirecting %s %s -> %s (TLS cert present)", r.Method, r.URL.RequestURI(), httpsURL)
|
||||
http.Redirect(w, r, httpsURL, http.StatusMovedPermanently)
|
||||
http.Redirect(w, r, httpsURL, http.StatusPermanentRedirect)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -572,6 +572,18 @@ func (p *ProxyHandler) HandleIncomingPacket(packet []byte) bool {
|
||||
|
||||
// Store destination rewrite for handler lookups
|
||||
p.destRewriteTable[dKey] = newDst
|
||||
|
||||
// Also store the resource ID under the rewritten destination key so that
|
||||
// TCP/UDP handlers can find it after DNAT (they see the post-NAT dst IP).
|
||||
if matchedRule.ResourceId != 0 {
|
||||
rewrittenKey := destKey{
|
||||
srcIP: srcAddr.String(),
|
||||
dstIP: newDst.String(),
|
||||
dstPort: dstPort,
|
||||
proto: uint8(protocol),
|
||||
}
|
||||
p.resourceTable[rewrittenKey] = matchedRule.ResourceId
|
||||
}
|
||||
p.natMu.Unlock()
|
||||
logger.Debug("New NAT entry for connection: %s -> %s", dstAddr, newDst)
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ type Client struct {
|
||||
metricsCtx context.Context
|
||||
configNeedsSave bool // Flag to track if config needs to be saved
|
||||
serverVersion string
|
||||
configVersion int64 // Latest config version received from server
|
||||
configVersion int64 // Latest config version received from server
|
||||
configVersionMux sync.RWMutex
|
||||
processingMessage bool // Flag to track if a message is currently being processed
|
||||
processingMux sync.RWMutex // Protects processingMessage
|
||||
@@ -271,13 +271,17 @@ func (c *Client) SendMessageInterval(messageType string, data interface{}, inter
|
||||
stopChan := make(chan struct{})
|
||||
go func() {
|
||||
count := 0
|
||||
maxAttempts := 10
|
||||
maxAttempts := 16
|
||||
|
||||
c.reconnectMux.RLock()
|
||||
connected := c.isConnected
|
||||
c.reconnectMux.RUnlock()
|
||||
err := c.SendMessage(messageType, data) // Send immediately
|
||||
if err != nil {
|
||||
logger.Error("Failed to send initial message: %v", err)
|
||||
} else if connected {
|
||||
count++
|
||||
}
|
||||
count++
|
||||
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
@@ -288,11 +292,15 @@ func (c *Client) SendMessageInterval(messageType string, data interface{}, inter
|
||||
logger.Info("SendMessageInterval timed out after %d attempts for message type: %s", maxAttempts, messageType)
|
||||
return
|
||||
}
|
||||
c.reconnectMux.RLock()
|
||||
connected = c.isConnected
|
||||
c.reconnectMux.RUnlock()
|
||||
err = c.SendMessage(messageType, data)
|
||||
if err != nil {
|
||||
logger.Error("Failed to send message: %v", err)
|
||||
} else if connected {
|
||||
count++
|
||||
}
|
||||
count++
|
||||
case <-stopChan:
|
||||
return
|
||||
}
|
||||
@@ -836,7 +844,7 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) {
|
||||
logger.Error("WebSocket failed to parse message: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
c.setConfigVersion(msg.ConfigVersion)
|
||||
|
||||
c.handlersMux.RLock()
|
||||
|
||||
Reference in New Issue
Block a user