Compare commits

..

1 Commit

Author SHA1 Message Date
Laurence
c7d9c72f29 Add HTTP client reuse and buffer pooling for performance
- Add reusable HTTP client with connection pooling for API requests
- Add sync.Pool for 32KB buffers used in connection piping
- Clear buffers before returning to pool to prevent data leakage
- Reduces GC pressure and improves throughput under load
2026-03-13 15:28:04 +00:00

View File

@@ -18,7 +18,6 @@ import (
"github.com/fosrl/gerbil/logger"
"github.com/patrickmn/go-cache"
"golang.org/x/sync/errgroup"
)
// RouteRecord represents a routing configuration
@@ -70,12 +69,16 @@ type SNIProxy struct {
// Trusted upstream proxies that can send PROXY protocol
trustedUpstreams map[string]struct{}
// Reusable HTTP client for API requests
httpClient *http.Client
// Buffer pool for connection piping
bufferPool *sync.Pool
}
type activeTunnel struct {
ctx context.Context
cancel context.CancelFunc
count int // protected by activeTunnelsLock
conns []net.Conn
}
// readOnlyConn is a wrapper for io.Reader that implements net.Conn
@@ -377,6 +380,20 @@ func NewSNIProxy(port int, remoteConfigURL, publicKey, localProxyAddr string, lo
localOverrides: overridesMap,
activeTunnels: make(map[string]*activeTunnel),
trustedUpstreams: trustedMap,
httpClient: &http.Client{
Timeout: 5 * time.Second,
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
},
},
bufferPool: &sync.Pool{
New: func() interface{} {
buf := make([]byte, 32*1024)
return &buf
},
},
}
return proxy, nil
@@ -591,32 +608,37 @@ func (p *SNIProxy) handleConnection(clientConn net.Conn) {
}
}
// Track this tunnel by SNI using context for cancellation
// Track this tunnel by SNI
p.activeTunnelsLock.Lock()
tunnel, ok := p.activeTunnels[hostname]
if !ok {
ctx, cancel := context.WithCancel(p.ctx)
tunnel = &activeTunnel{ctx: ctx, cancel: cancel}
tunnel = &activeTunnel{}
p.activeTunnels[hostname] = tunnel
}
tunnel.count++
tunnelCtx := tunnel.ctx
tunnel.conns = append(tunnel.conns, actualClientConn)
p.activeTunnelsLock.Unlock()
defer func() {
// Remove this conn from active tunnels
p.activeTunnelsLock.Lock()
tunnel.count--
if tunnel.count == 0 {
tunnel.cancel()
if p.activeTunnels[hostname] == tunnel {
if tunnel, ok := p.activeTunnels[hostname]; ok {
newConns := make([]net.Conn, 0, len(tunnel.conns))
for _, c := range tunnel.conns {
if c != actualClientConn {
newConns = append(newConns, c)
}
}
if len(newConns) == 0 {
delete(p.activeTunnels, hostname)
} else {
tunnel.conns = newConns
}
}
p.activeTunnelsLock.Unlock()
}()
// Start bidirectional data transfer with tunnel context
p.pipe(tunnelCtx, actualClientConn, targetConn, clientReader)
// Start bidirectional data transfer
p.pipe(actualClientConn, targetConn, clientReader)
}
// getRoute retrieves routing information for a hostname
@@ -679,9 +701,8 @@ func (p *SNIProxy) getRoute(hostname, clientAddr string) (*RouteRecord, error) {
}
req.Header.Set("Content-Type", "application/json")
// Make HTTP request
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Do(req)
// Make HTTP request using reusable client
resp, err := p.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("API request failed: %w", err)
}
@@ -752,36 +773,59 @@ func (p *SNIProxy) selectStickyEndpoint(clientAddr string, endpoints []string) s
}
// pipe handles bidirectional data transfer between connections
func (p *SNIProxy) pipe(ctx context.Context, clientConn, targetConn net.Conn, clientReader io.Reader) {
g, gCtx := errgroup.WithContext(ctx)
func (p *SNIProxy) pipe(clientConn, targetConn net.Conn, clientReader io.Reader) {
var wg sync.WaitGroup
wg.Add(2)
// Close connections when context cancels to unblock io.Copy operations
context.AfterFunc(gCtx, func() {
clientConn.Close()
targetConn.Close()
})
// closeOnce ensures we only close connections once
var closeOnce sync.Once
closeConns := func() {
closeOnce.Do(func() {
// Close both connections to unblock any pending reads
clientConn.Close()
targetConn.Close()
})
}
// Copy data from client to target
g.Go(func() error {
buf := make([]byte, 32*1024)
_, err := io.CopyBuffer(targetConn, clientReader, buf)
// Copy data from client to target (using the buffered reader)
go func() {
defer wg.Done()
defer closeConns()
// Get buffer from pool and return when done
bufPtr := p.bufferPool.Get().(*[]byte)
defer func() {
// Clear buffer before returning to pool to prevent data leakage
clear(*bufPtr)
p.bufferPool.Put(bufPtr)
}()
_, err := io.CopyBuffer(targetConn, clientReader, *bufPtr)
if err != nil && err != io.EOF {
logger.Debug("Copy client->target error: %v", err)
}
return err
})
}()
// Copy data from target to client
g.Go(func() error {
buf := make([]byte, 32*1024)
_, err := io.CopyBuffer(clientConn, targetConn, buf)
go func() {
defer wg.Done()
defer closeConns()
// Get buffer from pool and return when done
bufPtr := p.bufferPool.Get().(*[]byte)
defer func() {
// Clear buffer before returning to pool to prevent data leakage
clear(*bufPtr)
p.bufferPool.Put(bufPtr)
}()
_, err := io.CopyBuffer(clientConn, targetConn, *bufPtr)
if err != nil && err != io.EOF {
logger.Debug("Copy target->client error: %v", err)
}
return err
})
}()
g.Wait()
wg.Wait()
}
// GetCacheStats returns cache statistics
@@ -817,14 +861,16 @@ func (p *SNIProxy) UpdateLocalSNIs(fullDomains []string) {
logger.Debug("Updated local SNIs, added %d, removed %d", len(newSNIs), len(removed))
// Terminate tunnels for removed SNIs via context cancellation
// Terminate tunnels for removed SNIs
if len(removed) > 0 {
p.activeTunnelsLock.Lock()
for _, sni := range removed {
if tunnel, ok := p.activeTunnels[sni]; ok {
tunnel.cancel()
if tunnels, ok := p.activeTunnels[sni]; ok {
for _, conn := range tunnels.conns {
conn.Close()
}
delete(p.activeTunnels, sni)
logger.Debug("Cancelled tunnel context for SNI target change: %s", sni)
logger.Debug("Closed tunnels for SNI target change: %s", sni)
}
}
p.activeTunnelsLock.Unlock()