Client occasionally returns an error about ErrConnectionClosed #1892
I'm not sure I understand what the issue is here, can you post a reproducible example?

client -> nginx -> server.

func HttpPostJson(nginxUrl string, data interface{}) (respBody []byte, err error) {
	// (body elided in the original comment)
}

func ReceiveRequest(data interface{}) {
	// (body elided in the original comment)
}

Please post the server-side processing code and Nginx configuration together.
Hello, I've encountered this issue many times. I am developing a tool that also sends concurrent requests (1-10k req/s), and I've always wanted to write a test case to reproduce the error. One day I managed to reproduce it somehow. Use my test server:

package main
import (
"bufio"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"flag"
"fmt"
"io"
"log"
"math/big"
"net"
"os"
"strings"
"sync"
"time"
)
// To trigger timeout errors when testing: ./http-echo-server.exe -port 80 -tlsport 443 -v -template timeout -timeout 5000
// To trigger server closed connection before returning first byte: ./http-echo-server.exe -port 80 -tlsport 443 -v -template timeout -timeout 200
const (
colorReset = "\033[0m"
colorGreen = "\033[32m"
colorYellow = "\033[33m"
colorWhite = "\033[37m"
)
var verbose bool
func generateTLSConfig() (*tls.Config, error) {
privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return nil, fmt.Errorf("failed to generate private key: %v", err)
}
template := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{
Organization: []string{"HTTP Echo Server"},
},
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour * 24 * 365), // 1 year
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
BasicConstraintsValid: true,
IPAddresses: []net.IP{net.ParseIP("127.0.0.1")},
DNSNames: []string{"localhost"},
}
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
if err != nil {
return nil, fmt.Errorf("failed to create certificate: %v", err)
}
cert := tls.Certificate{
Certificate: [][]byte{certDER},
PrivateKey: privateKey,
}
return &tls.Config{
Certificates: []tls.Certificate{cert},
}, nil
}
func main() {
log.SetFlags(log.Lshortfile)
// cli args
dumpFlag := flag.String("dump", "", "Dump incoming request to a file")
portFlag := flag.String("port", "", "HTTP listening port")
tlsPortFlag := flag.String("tlsport", "", "HTTPS/TLS listening port")
verboseFlag := flag.Bool("v", false, "Display request with special characters")
templateFlag := flag.String("template", "echo", "Response template (echo, timeout)")
timeoutFlag := flag.Int("timeout", 200, "Timeout to close connection (ms)")
helpFlag := flag.Bool("h", false, "Show help")
// helper
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: http-echo-server [flags]\n\n")
fmt.Fprintf(os.Stderr, "Echo server accepting malformed HTTP requests\n\n")
fmt.Fprintf(os.Stderr, "Flags:\n")
flag.PrintDefaults()
fmt.Fprintf(os.Stderr, "\nExamples:\n")
fmt.Fprintf(os.Stderr, " HTTP only: http-echo-server -port 8888\n")
fmt.Fprintf(os.Stderr, " HTTPS only: http-echo-server -tlsport 8443\n")
fmt.Fprintf(os.Stderr, " Both HTTP and HTTPS: http-echo-server -port 8888 -tlsport 8443\n")
fmt.Fprintf(os.Stderr, " Dump request to file: http-echo-server -port 8888 -d request.txt\n")
fmt.Fprintf(os.Stderr, " Show special chars: http-echo-server -port 8888 -v\n")
fmt.Fprintf(os.Stderr, " Timeout template: http-echo-server -port 8888 -template timeout\n")
fmt.Fprintf(os.Stderr, " Timeout template: http-echo-server -port 8888 -template timeout -timeout 5000\n")
}
flag.Parse()
if *helpFlag {
flag.Usage()
os.Exit(0)
}
if *templateFlag != "echo" && *templateFlag != "timeout" {
log.Fatal("Template must be either 'echo' or 'timeout'")
}
if *portFlag == "" && *tlsPortFlag == "" {
log.Fatal("At least one of -port or -tlsport must be specified")
}
verbose = *verboseFlag
var wg sync.WaitGroup
if *portFlag != "" {
wg.Add(1)
go func() {
defer wg.Done()
httpPort := fmt.Sprintf(":%s", *portFlag)
ln, err := net.Listen("tcp", httpPort)
if err != nil {
log.Fatalf("Failed to start HTTP listener: %v", err)
}
defer ln.Close()
log.Printf("HTTP Server listening on %s", httpPort)
for {
conn, err := ln.Accept()
if err != nil {
log.Printf("Failed to accept HTTP connection: %v", err)
continue
}
conn.SetDeadline(time.Now().Add(time.Duration(*timeoutFlag) * time.Millisecond))
go handleConnection(conn, *dumpFlag, *timeoutFlag, *templateFlag)
}
}()
}
// Start HTTPS server if tlsport specified
if *tlsPortFlag != "" {
wg.Add(1)
go func() {
defer wg.Done()
tlsPort := fmt.Sprintf(":%s", *tlsPortFlag)
tlsConfig, err := generateTLSConfig()
if err != nil {
log.Fatalf("Failed to generate TLS config: %v", err)
}
ln, err := tls.Listen("tcp", tlsPort, tlsConfig)
if err != nil {
log.Fatalf("Failed to start HTTPS listener: %v", err)
}
defer ln.Close()
log.Printf("HTTPS Server listening on %s", tlsPort)
for {
conn, err := ln.Accept()
if err != nil {
log.Printf("Failed to accept HTTPS connection: %v", err)
continue
}
conn.SetDeadline(time.Now().Add(time.Duration(*timeoutFlag) * time.Millisecond))
go handleConnection(conn, *dumpFlag, *timeoutFlag, *templateFlag)
}
}()
}
wg.Wait()
}
func handleConnection(conn net.Conn, dump string, timeout int, template string) {
// Set a deadline for the entire connection
if timeout > 0 {
conn.SetDeadline(time.Now().Add(time.Duration(timeout) * time.Millisecond))
}
defer conn.Close()
// Determine if connection is TLS
_, isTLS := conn.(*tls.Conn)
// Read the request first
reader := bufio.NewReader(conn)
var request strings.Builder
// Read headers
for {
line, err := reader.ReadString('\n')
if err != nil {
if err != io.EOF &&
!strings.Contains(err.Error(), "timeout") &&
!strings.Contains(err.Error(), "closed network connection") {
log.Printf("Read error: %v", err)
}
return
}
request.WriteString(line)
if line == "\r\n" || line == "\n" {
break
}
}
requestStr := request.String()
// Print the request with proper formatting
if requestStr != "" {
printRequest(requestStr, verbose, isTLS)
}
// Handle different templates
switch template {
case "timeout":
fmt.Printf("Sleeping for 1 seconds...\n")
time.Sleep(1 * time.Second)
fmt.Printf("Sleep done, sending response\n")
response := fmt.Sprintf("HTTP/1.1 200 OK\r\n"+
"Content-Type: text/plain\r\n"+
"Content-Length: %d\r\n"+
"Connection: close\r\n"+
"\r\n%s",
len(requestStr), requestStr)
conn.Write([]byte(response))
case "echo":
// Immediately send complete response
response := fmt.Sprintf("HTTP/1.1 200 OK\r\n"+
"Content-Type: text/plain\r\n"+
"Content-Length: %d\r\n"+
"Connection: close\r\n"+
"\r\n%s",
len(requestStr), requestStr)
conn.Write([]byte(response))
}
// Handle request dumping if enabled
if dump != "" && requestStr != "" {
if err := os.WriteFile(dump, []byte(requestStr), 0644); err != nil {
log.Printf("Failed to dump request: %v", err)
} else {
log.Printf("\nRequest dumped to: %s\n", dump)
}
}
}
// Helper function to print requests
func printRequest(req string, verbose bool, isTLS bool) {
if verbose {
// Replace special characters with colored versions
specialChars := map[string]string{
"\r": colorGreen + "\\r" + colorReset,
"\n": colorGreen + "\\n\n" + colorReset, // Keep the extra newline for readability
"\t": colorGreen + "\\t" + colorReset,
"\v": colorGreen + "\\v" + colorReset, // Vertical tab
"\f": colorGreen + "\\f" + colorReset, // Form feed
"\b": colorGreen + "\\b" + colorReset, // Backspace
"\a": colorGreen + "\\a" + colorReset, // Alert/Bell
}
for char, replacement := range specialChars {
req = strings.ReplaceAll(req, char, replacement)
}
}
// Color the text terminal req
if isTLS {
fmt.Print(colorYellow + req + colorReset)
} else {
fmt.Print(colorWhite + req + colorReset)
}
}

Start it like this:

./http-echo-server.exe -port 80 -tlsport 443 -v -template timeout -timeout 200

Then run this test case:

package tests
import (
"crypto/tls"
"fmt"
"testing"
"github.com/valyala/fasthttp"
)
func TestServerClosedConnectionBeforeReturningTheFirstResponseByte(t *testing.T) {
client := &fasthttp.Client{
StreamResponseBody: false,
DisablePathNormalizing: true,
DisableHeaderNamesNormalizing: true,
TLSConfig: &tls.Config{
InsecureSkipVerify: true,
},
}
req := fasthttp.AcquireRequest()
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(resp)
req.SetRequestURI("https://localhost/test")
err := client.Do(req, resp)
if err == nil {
t.Fatalf("i expect an error: %v", err)
}
fmt.Println(err)
}

Result:
See if it helps to conclude whether there is an issue in fasthttp core or just a broken server.
@slicingmelon This happens because the server closes the TCP connection 200 milliseconds after Accept. By the time the client writes the HTTP request data, the TCP connection has already been closed, which is expected behavior. Try changing the timeout to 3 seconds to see the difference. Line 3035 in 80d3e44
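For anyone who wants to see this failure mode in isolation, here is a minimal sketch (my own construction, not code from this thread; the loopback port is arbitrary): a raw TCP listener that closes every connection immediately after Accept, so the client never receives a single response byte.

package main

import (
	"fmt"
	"net"

	"github.com/valyala/fasthttp"
)

func main() {
	// Raw TCP "server" that closes every connection right after Accept,
	// before reading the request or writing a single response byte.
	ln, err := net.Listen("tcp", "127.0.0.1:8081")
	if err != nil {
		panic(err)
	}
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return
			}
			conn.Close()
		}
	}()

	client := &fasthttp.Client{}
	req := fasthttp.AcquireRequest()
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(req)
	defer fasthttp.ReleaseResponse(resp)

	req.SetRequestURI("http://127.0.0.1:8081/")
	// Expected: fasthttp.ErrConnectionClosed, or a connection-reset error,
	// depending on how the close races with the request write.
	fmt.Println(client.Do(req, resp))
}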
@byte0o Yes, you are right. In this scenario, the problem is the server. I tried to reproduce this in an isolated case because when I was sending multiple concurrent requests to a different server, the client was hanging. To prove this further, I also sent the same requests with curl, and curl closed the connection right away. Let me see if I can pull the nginx configuration for that server, as I was testing in an isolated environment, and run it in a container.
Hello, I've managed to retrieve the nginx conf of the server where I was doing my tests, but it is hard to reproduce locally, as the testing server is configured to handle a lot more requests (it is a heavy-load server). However, I managed to reproduce it using a separate nginx conf.

nginx.conf

worker_processes 1;
events {
worker_connections 128;
}
http {
include mime.types;
default_type application/octet-stream;
access_log off;
error_log logs/error.log error;
sendfile on;
tcp_nopush off;
server_tokens off;
proxy_temp_path /home/nginx/proxy_temp 1 2;
server_names_hash_max_size 2048;
server_names_hash_bucket_size 256;
keepalive_timeout 10s;
keepalive_requests 100;
client_body_timeout 60;
client_header_timeout 60;
gzip on;
gzip_min_length 1000;
gzip_proxied expired no-cache no-store private auth;
large_client_header_buffers 8 1024k;
send_timeout 60;
server {
listen 80;
server_name localhost;
root /usr/share/nginx/html;
index index.html;
location / {
try_files $uri $uri/ =404;
}
}
}

index.html

<html>
<head>
<title>Test Page</title>
</head>
<body>
<h1>Hello, World!</h1>
<p>This is a simple nginx test page.</p>
</body>
</html>

Dockerfile

FROM nginx:alpine
RUN mkdir -p /home/nginx/proxy_temp && mkdir -p /etc/nginx/logs
COPY nginx.conf /etc/nginx/nginx.conf
COPY index.html /usr/share/nginx/html/index.html
EXPOSE 80

Run the container on an external server:

docker build -t test-nginx .
docker run -p 80:80 test-nginx

The following test case mimics some of the configuration I use in my fasthttp-based client, except that I use a worker pool to send concurrent requests.

func TestServerClosedConnectionBeforeReturningTheFirstResponseByte2(t *testing.T) {
client := &fasthttp.Client{
StreamResponseBody: true,
DisablePathNormalizing: true,
DisableHeaderNamesNormalizing: true,
MaxConnsPerHost: 768, // 512 + 50% additional (I use the same config on my main tool)
MaxConnWaitTimeout: 1 * time.Second,
MaxIdleConnDuration: 1 * time.Minute,
TLSConfig: &tls.Config{
InsecureSkipVerify: true,
},
}
const totalRequests = 10000 // so a total of 10k requests to be sent
const concurrentRequests = 512 // and 512 concurrent requests
var (
wg sync.WaitGroup
errCount atomic.Int32
successCount atomic.Int32
startTime = time.Now()
sem = make(chan struct{}, concurrentRequests)
)
for i := 0; i < totalRequests; i++ {
wg.Add(1)
sem <- struct{}{}
go func(reqNum int) {
defer func() {
<-sem
wg.Done()
}()
req := fasthttp.AcquireRequest()
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(resp)
// Change this to point to your server
req.SetRequestURI(fmt.Sprintf("http://external-test-server/test%d", reqNum))
err := client.Do(req, resp)
if err != nil {
errCount.Add(1)
if reqNum%100 == 0 {
t.Logf("Error on request %d: %v", reqNum, err)
}
} else {
successCount.Add(1)
if reqNum%1000 == 0 {
t.Logf("Success on request %d: %d %s", reqNum, resp.StatusCode(), resp.Body())
}
}
}(i)
}
wg.Wait()
elapsed := time.Since(startTime)
t.Logf("Test completed in %v", elapsed)
t.Logf("Total requests: %d", totalRequests)
t.Logf("Successful requests: %d", successCount.Load())
t.Logf("Failed requests: %d", errCount.Load())
t.Logf("Requests per second: %.2f", float64(totalRequests)/elapsed.Seconds())
// most common errors
if errCount.Load() > 0 {
t.Logf("Errors occurred during the test")
}
}

Run the unit test with:

go.exe test -timeout 30s -v -run ^TestServerClosedConnectionBeforeReturningTheFirstResponseByte2$ github.com/slicingmelon/go-bypass-403/tests/bugs

Result:
=== RUN TestServerClosedConnectionBeforeReturningTheFirstResponseByte2
byte_header_error_resp_test.go:88: Success on request 0: 404 <html>
<head><title>404 Not Found</title></head>
<body>
<center><h1>404 Not Found</h1></center>
<hr><center>nginx</center>
</body>
</html>
byte_header_error_resp_test.go:88: Success on request 1000: 404 <html>
<head><title>404 Not Found</title></head>
<body>
<center><h1>404 Not Found</h1></center>
<hr><center>nginx</center>
</body>
</html>
[...]
byte_header_error_resp_test.go:88: Success on request 8000: 404 <html>
<head><title>404 Not Found</title></head>
<body>
<center><h1>404 Not Found</h1></center>
<hr><center>nginx</center>
</body>
</html>
byte_header_error_resp_test.go:88: Success on request 9000: 404 <html>
<head><title>404 Not Found</title></head>
<body>
<center><h1>404 Not Found</h1></center>
<hr><center>nginx</center>
</body>
</html>
byte_header_error_resp_test.go:83: Error on request 9900: the server closed connection before returning the first response byte. Make sure the server returns 'Connection: close' response header before closing the connection
byte_header_error_resp_test.go:97: Test completed in 8.663774s
byte_header_error_resp_test.go:98: Total requests: 10000
byte_header_error_resp_test.go:99: Successful requests: 9937
byte_header_error_resp_test.go:100: Failed requests: 63
byte_header_error_resp_test.go:101: Requests per second: 1154.23
byte_header_error_resp_test.go:104: Errors occurred during the test
--- PASS: TestServerClosedConnectionBeforeReturningTheFirstResponseByte2 (8.66s)
PASS
ok github.com/slicingmelon/go-bypass-403/tests/bugs 10.506s

As you can see, 63 out of 10000 requests failed with the error "the server closed connection before returning the first response byte".
@slicingmelon I used the test configuration you provided, tested locally, and found that nginx actively sends RST packets after establishing the TCP connection, which is a bit different from your test error.
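As a side note, the difference between these two observations can be checked directly on a raw connection. A minimal sketch (my own, with a hypothetical target address; Linux error codes assumed): a clean server close (FIN) surfaces as io.EOF on the first read, which fasthttp maps to ErrConnectionClosed, while a server RST surfaces as ECONNRESET.

package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"syscall"
)

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:80") // hypothetical target
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	if _, err := conn.Write([]byte("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n")); err != nil {
		fmt.Println("write error:", err)
		return
	}

	buf := make([]byte, 1)
	_, err = conn.Read(buf)
	switch {
	case err == nil:
		fmt.Println("got first response byte")
	case errors.Is(err, io.EOF):
		fmt.Println("clean close (FIN) before first byte") // fasthttp reports this as ErrConnectionClosed
	case errors.Is(err, syscall.ECONNRESET):
		fmt.Println("connection aborted (RST) before first byte")
	default:
		fmt.Println("other read error:", err)
	}
}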
Hi @byte0o, I re-ran the test case above and captured the traffic using Wireshark. I noticed some TCP ACKed unseen segments. So I exported the capture and tried to perform some analysis using a Python script. It needs refinement and improvement.

import pyshark
import collections
import sys
from datetime import datetime
import statistics
def analyze_fasthttp_tcp_behavior(pcap_file):
print(f"Analyzing FastHTTP TCP behavior in {pcap_file}...")
cap = pyshark.FileCapture(pcap_file)
# Track TCP streams
tcp_streams = {}
http_requests_by_stream = {}
http_responses_by_stream = {}
rst_after_response = []
premature_rst = []
keepalive_connections = set()
total_rst_packets = 0
total_fin_packets = 0
connection_close_headers = 0
partial_handshakes = []
complete_handshakes = []
retransmissions = 0
# Connection reuse tracking
reused_connections = set()
potential_reused_streams = {}
tcp_ports_by_ip = {}
# Silent termination detection
idle_periods = []
silently_terminated_streams = []
acked_unseen_segments = 0
rst_ack_packets = 0
for i, packet in enumerate(cap):
if i % 10000 == 0 and i > 0:
print(f"Processed {i} packets...")
try:
if 'TCP' in packet:
# Track TCP stream ID for correlation
stream_id = packet.tcp.stream
# Record client and server information for port reuse detection
src_ip = packet.ip.src
dst_ip = packet.ip.dst
src_port = packet.tcp.srcport
dst_port = packet.tcp.dstport
# Track ports used by each IP to detect connection reuse
if src_ip not in tcp_ports_by_ip:
tcp_ports_by_ip[src_ip] = {}
if dst_ip not in tcp_ports_by_ip:
tcp_ports_by_ip[dst_ip] = {}
if src_port not in tcp_ports_by_ip[src_ip]:
tcp_ports_by_ip[src_ip][src_port] = set()
if dst_port not in tcp_ports_by_ip[dst_ip]:
tcp_ports_by_ip[dst_ip][dst_port] = set()
tcp_ports_by_ip[src_ip][src_port].add(stream_id)
tcp_ports_by_ip[dst_ip][dst_port].add(stream_id)
# If a port has multiple streams, it may indicate connection reuse
if len(tcp_ports_by_ip[src_ip][src_port]) > 1 or len(tcp_ports_by_ip[dst_ip][dst_port]) > 1:
reused_connections.add(stream_id)
# Track TCP handshake
if hasattr(packet.tcp, 'flags_syn') and packet.tcp.flags_syn == '1' and packet.tcp.flags_ack == '0':
if stream_id not in tcp_streams:
tcp_streams[stream_id] = {}
tcp_streams[stream_id]['has_syn'] = True
tcp_streams[stream_id]['syn_time'] = float(packet.sniff_timestamp)
if hasattr(packet.tcp, 'flags_syn') and packet.tcp.flags_syn == '1' and packet.tcp.flags_ack == '1':
if stream_id not in tcp_streams:
tcp_streams[stream_id] = {}
tcp_streams[stream_id]['has_syn_ack'] = True
if hasattr(packet.tcp, 'flags_ack') and packet.tcp.flags_ack == '1' and not hasattr(packet.tcp, 'flags_syn'):
if stream_id not in tcp_streams:
tcp_streams[stream_id] = {}
tcp_streams[stream_id]['has_ack'] = True
# Count RST packets
if hasattr(packet.tcp, 'flags_reset') and packet.tcp.flags_reset == '1':
total_rst_packets += 1
if stream_id not in tcp_streams:
tcp_streams[stream_id] = {}
tcp_streams[stream_id]['has_rst'] = True
tcp_streams[stream_id]['rst_time'] = float(packet.sniff_timestamp)
tcp_streams[stream_id]['rst_from'] = src_ip
# Check if this stream had a response
if stream_id in http_responses_by_stream:
# This is a RST after a response was sent
rst_after_response.append({
'stream_id': stream_id,
'time': float(packet.sniff_timestamp),
'src': packet.ip.src,
'dst': packet.ip.dst
})
# Count FIN packets
if hasattr(packet.tcp, 'flags_fin') and packet.tcp.flags_fin == '1':
total_fin_packets += 1
if stream_id not in tcp_streams:
tcp_streams[stream_id] = {}
tcp_streams[stream_id]['has_fin'] = True
tcp_streams[stream_id]['fin_time'] = float(packet.sniff_timestamp)
tcp_streams[stream_id]['fin_from'] = src_ip
# Track TCP streams
if stream_id not in tcp_streams:
tcp_streams[stream_id] = {
'packets': 0,
'first_packet_time': float(packet.sniff_timestamp),
'last_packet_time': float(packet.sniff_timestamp),
'client_ip': src_ip,
'server_ip': dst_ip,
'client_port': src_port,
'server_port': dst_port,
'packet_timestamps': [float(packet.sniff_timestamp)]
}
                else:
                    # streams first created by the SYN/FIN/RST handlers above may
                    # lack first_packet_time; set it here to avoid a KeyError later
                    tcp_streams[stream_id].setdefault('first_packet_time', float(packet.sniff_timestamp))
                    tcp_streams[stream_id]['last_packet_time'] = float(packet.sniff_timestamp)
                    if 'packet_timestamps' not in tcp_streams[stream_id]:
                        tcp_streams[stream_id]['packet_timestamps'] = []
                    tcp_streams[stream_id]['packet_timestamps'].append(float(packet.sniff_timestamp))
tcp_streams[stream_id]['packets'] = tcp_streams[stream_id].get('packets', 0) + 1
# Track HTTP information in this TCP stream
if 'HTTP' in packet:
# HTTP Request
if hasattr(packet.http, 'request'):
if stream_id not in http_requests_by_stream:
http_requests_by_stream[stream_id] = []
request_info = {
'time': float(packet.sniff_timestamp),
'method': getattr(packet.http, 'request_method', 'UNKNOWN'),
'uri': getattr(packet.http, 'request_uri', 'UNKNOWN')
}
# Check for keep-alive header -- not present in the request but default in HTTP/1.1
if hasattr(packet.http, 'connection'):
request_info['connection'] = packet.http.connection
if 'keep-alive' in packet.http.connection.lower():
keepalive_connections.add(stream_id)
else:
# In HTTP/1.1, connections are keep-alive by default
request_info['connection'] = 'default-keepalive'
keepalive_connections.add(stream_id)
http_requests_by_stream[stream_id].append(request_info)
# HTTP Response
if hasattr(packet.http, 'response'):
if stream_id not in http_responses_by_stream:
http_responses_by_stream[stream_id] = []
response_info = {
'time': float(packet.sniff_timestamp),
'status_code': getattr(packet.http, 'response_code', 'UNKNOWN')
}
# Check for Connection: close header -- for other tests
if hasattr(packet.http, 'connection'):
response_info['connection'] = packet.http.connection
if 'close' in packet.http.connection.lower():
connection_close_headers += 1
http_responses_by_stream[stream_id].append(response_info)
if 'TCP' in packet and hasattr(packet, 'tcp'):
if hasattr(packet.tcp, 'analysis_retransmission'):
retransmissions += 1
# Look for ACKed unseen segment markers
if hasattr(packet.tcp, 'analysis_ack_lost_segment') or \
hasattr(packet.tcp, 'analysis_acked_unseen_segment'):
acked_unseen_segments += 1
# Look for RST+ACK packets
if hasattr(packet.tcp, 'flags_reset') and packet.tcp.flags_reset == '1' and \
hasattr(packet.tcp, 'flags_ack') and packet.tcp.flags_ack == '1':
rst_ack_packets += 1
except Exception as e:
print(f"Error processing packet {i}: {e}")
# Track TCP handshake states
for stream_id, stream_data in tcp_streams.items():
# Check if we have SYN, SYN-ACK, ACK sequence
if 'has_syn' not in stream_data or 'has_syn_ack' not in stream_data or 'has_ack' not in stream_data:
partial_handshakes.append(stream_id)
else:
complete_handshakes.append(stream_id)
# Analyze idle periods and potential silent terminations
for stream_id, stream_data in tcp_streams.items():
if 'packet_timestamps' in stream_data and len(stream_data['packet_timestamps']) > 1:
timestamps = sorted(stream_data['packet_timestamps'])
time_diffs = [timestamps[i+1] - timestamps[i] for i in range(len(timestamps)-1)]
if len(time_diffs) > 0:
max_idle = max(time_diffs)
avg_idle = sum(time_diffs) / len(time_diffs)
idle_periods.append(max_idle)
# Look for significant idle periods (could be refined)
if max_idle > avg_idle * 5 and max_idle > 1.0:
if stream_id in http_requests_by_stream:
silently_terminated_streams.append({
'stream_id': stream_id,
'max_idle': max_idle,
'avg_idle': avg_idle,
'requests': len(http_requests_by_stream.get(stream_id, [])),
'responses': len(http_responses_by_stream.get(stream_id, []))
})
print("\n===== FASTHTTP TCP BEHAVIOR ANALYSIS =====")
print(f"Total TCP streams: {len(tcp_streams)}")
print(f"Total HTTP requests: {sum(len(reqs) for reqs in http_requests_by_stream.values())}")
print(f"Total HTTP responses: {sum(len(resps) for resps in http_responses_by_stream.values())}")
print(f"Total RST packets: {total_rst_packets}")
print(f"Total FIN packets: {total_fin_packets}")
print(f"Responses with 'Connection: close' header: {connection_close_headers}")
print(f"Keep-alive connections (explicit or implicit HTTP/1.1): {len(keepalive_connections)}")
print(f"TCP retransmissions: {retransmissions}")
print(f"Potentially reused connections: {len(reused_connections)}")
print(f"TCP ACKed unseen segments: {acked_unseen_segments}")
print(f"RST+ACK packets: {rst_ack_packets}")
print("\n===== PROBLEMATIC TCP BEHAVIOR =====")
# 1. RST after response without Connection: close (from other tests)
rst_streams_after_response = set(item['stream_id'] for item in rst_after_response)
print(f"TCP streams with RST after response: {len(rst_streams_after_response)}")
# 2. Analyze TCP stream durations
stream_durations = []
for stream_id, data in tcp_streams.items():
duration = data['last_packet_time'] - data['first_packet_time']
stream_durations.append((stream_id, duration))
# Sort by duration
stream_durations.sort(key=lambda x: x[1])
# Find very short-lived streams with HTTP activity
short_streams = []
for stream_id, duration in stream_durations:
if duration < 0.5 and (stream_id in http_requests_by_stream or stream_id in http_responses_by_stream):
short_streams.append((stream_id, duration))
print(f"Short-lived TCP streams with HTTP activity: {len(short_streams)}")
# 3. Find streams where response was interrupted (has request but no complete response)
incomplete_streams = []
for stream_id in http_requests_by_stream.keys():
if stream_id not in http_responses_by_stream and stream_id in rst_streams_after_response:
incomplete_streams.append(stream_id)
print(f"Incomplete HTTP transactions (request without response, ended by RST): {len(incomplete_streams)}")
# 4. Analyze timing between last response and RST for problematic connections
timing_issues = []
for item in rst_after_response:
stream_id = item['stream_id']
if stream_id in http_responses_by_stream:
# Get the last response time
response_times = [resp['time'] for resp in http_responses_by_stream[stream_id]]
if response_times:
last_response_time = max(response_times)
# Time between last response and RST
time_to_rst = item['time'] - last_response_time
timing_issues.append((stream_id, time_to_rst))
timing_issues.sort(key=lambda x: x[1])
if timing_issues:
print("\n===== TIMING BETWEEN RESPONSE AND RST =====")
print("Time (seconds) between last HTTP response and RST packet:")
# Group by time ranges
time_ranges = {
"< 0.01s": 0,
"0.01s - 0.1s": 0,
"0.1s - 1s": 0,
"> 1s": 0
}
for _, time_to_rst in timing_issues:
if time_to_rst < 0.01:
time_ranges["< 0.01s"] += 1
elif time_to_rst < 0.1:
time_ranges["0.01s - 0.1s"] += 1
elif time_to_rst < 1:
time_ranges["0.1s - 1s"] += 1
else:
time_ranges["> 1s"] += 1
for range_name, count in time_ranges.items():
print(f" {range_name}: {count} streams")
# 5. Check for silent connection terminations
if silently_terminated_streams:
print("\n===== SILENT CONNECTION TERMINATION EVIDENCE =====")
print(f"Found {len(silently_terminated_streams)} streams with suspicious idle periods:")
for i, stream in enumerate(silently_terminated_streams[:5]): # Show top 5
print(f" Stream {stream['stream_id']}:")
print(f" Max idle period: {stream['max_idle']:.2f} seconds")
print(f" Avg idle period: {stream['avg_idle']:.2f} seconds")
print(f" HTTP requests: {stream['requests']}")
print(f" HTTP responses: {stream['responses']}")
# Idle period statistics
if idle_periods:
print(f"\nIdle period statistics across all streams:")
print(f" Minimum: {min(idle_periods):.2f} seconds")
print(f" Maximum: {max(idle_periods):.2f} seconds")
print(f" Average: {sum(idle_periods) / len(idle_periods):.2f} seconds")
print(f" Median: {statistics.median(idle_periods):.2f} seconds")
# 6. FastHTTP specific connection issues
print("\n===== POSSIBLE ISSUES =====")
# Calculate percentages for better analysis
total_streams_with_responses = len(http_responses_by_stream)
if total_streams_with_responses > 0:
rst_after_resp_percent = (len(rst_streams_after_response) / total_streams_with_responses) * 100
print(f"Percentage of streams with RST after response: {rst_after_resp_percent:.2f}%")
if connection_close_headers > 0:
connection_close_percent = (connection_close_headers / sum(len(resps) for resps in http_responses_by_stream.values())) * 100
print(f"Percentage of responses with 'Connection: close' header: {connection_close_percent:.2f}%")
else:
print("No 'Connection: close' headers found in any responses")
# Find keep-alive connections that were RST
keepalive_rst = keepalive_connections.intersection(rst_streams_after_response)
if keepalive_rst:
print(f"Keep-alive connections terminated with RST: {len(keepalive_rst)} streams")
# Connection reuse analysis
if reused_connections:
print(f"Connections with port reuse: {len(reused_connections)}")
print("FastHTTP may be reusing local ports for new connections after previous connections have been closed or become idle")
print("\n===== CONCLUSION =====")
silence_issue = (len(silently_terminated_streams) > 0 or
(total_rst_packets > 0 and connection_close_headers == 0) or
(acked_unseen_segments > 0 and len(short_streams) > 0))
if silence_issue:
print("\nPossible Issue Evidence:")
print(f"- {acked_unseen_segments} TCP ACKed unseen segments: Direct evidence of packets missing from capture")
print(f"- {len(short_streams)} short-lived connections: Connections terminating abnormally")
#print(f"- 'Connection: close' headers despite connection terminations")
if __name__ == "__main__":
if len(sys.argv) > 1:
pcap_file = sys.argv[1]
else:
pcap_file = "wireshark_capture_1.pcapng"
analyze_fasthttp_tcp_behavior(pcap_file)

Output
The output is not great at all; the script needs to be updated. Still trying to analyze based on what we have...
So the server terminates idle connections while fasthttp keeps these dead connections in the pool (see AcquireConn), without checking whether they have been terminated? So is it possible that the server terminated the connection while it was still in fasthttp's pool, and then it hits ErrConnectionClosed? The error might come from ReadLimitBody, with fasthttp translating io.EOF into ErrConnectionClosed? I am keen to find out the problem because the client happened to trigger this error when I was testing servers configured to handle heavy traffic loads.
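If that hypothesis holds, here is a minimal sketch of the client-side mitigations it suggests (my own, with a hypothetical URL and retry count; not an official fasthttp recommendation): keep MaxIdleConnDuration below the server's keepalive_timeout (10s in the nginx conf above) so the client discards idle pooled connections before nginx closes them, and retry on fasthttp.ErrConnectionClosed to cover the remaining race.

package main

import (
	"errors"
	"time"

	"github.com/valyala/fasthttp"
)

// doWithRetry retries only when fasthttp reports that the connection (likely a
// stale pooled one) was closed before the first response byte arrived.
func doWithRetry(c *fasthttp.Client, req *fasthttp.Request, resp *fasthttp.Response, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		err = c.Do(req, resp)
		if !errors.Is(err, fasthttp.ErrConnectionClosed) {
			return err // success, or an error a retry will not fix
		}
	}
	return err
}

func main() {
	client := &fasthttp.Client{
		// Below nginx's keepalive_timeout (10s above), so idle pooled
		// connections are dropped by the client before the server closes them.
		MaxIdleConnDuration: 5 * time.Second,
	}
	req := fasthttp.AcquireRequest()
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(req)
	defer fasthttp.ReleaseResponse(resp)

	req.SetRequestURI("http://external-test-server/test") // placeholder host from the test above
	_ = doWithRetry(client, req, resp, 3)
}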
Maybe you can take a look at this article about TCP full connection queue overflow.
It took about 70 ms from sending the request to getting the error response: "the server closed connection before returning the first response byte. Make sure the server returns 'Connection: close' response header before closing the connection." The code is as follows:

c := fasthttp.Client{}
c.TLSConfig = &tls.Config{
InsecureSkipVerify: true,
}
if err = c.DoTimeout(req, resp, 10*time.Second); err != nil {
return
}
Server message packet: