Deadlock in ResponseChain when reading large parallel responses #714

@Ezzer17

Description

Under high concurrent load with large responses, a deadlock can occur, causing nuclei to hang. I use the nuclei engine with the following settings:

engine, err := nuclei.NewNucleiEngine(
		nuclei.WithTemplatesOrWorkflows(nuclei.TemplateSources{
			Templates: []string{"nuclei-templates"},
		}),
		nuclei.WithTemplateFilters(
			nuclei.TemplateFilters{
				IDs: templateIDs,
			},
		),
		nuclei.WithVerbosity(nuclei.VerbosityOptions{
			Silent: true,
		}),
		nuclei.WithNetworkConfig(
			nuclei.NetworkConfig{
				Retries: 3,
				Timeout: 10,
			},
		),
		nuclei.WithGlobalRateLimit(1000, time.Second),
		nuclei.UseStatsWriter(r.statsWriter),
		nuclei.WithConcurrency(nuclei.Concurrency{
			TemplateConcurrency:           5,
			HostConcurrency:               700,
			HeadlessHostConcurrency:       10,
			HeadlessTemplateConcurrency:   10,
			JavascriptTemplateConcurrency: 10,
			TemplatePayloadConcurrency:    10,
			ProbeConcurrency:              1000,
		}),
		nuclei.WithResponseBodyLimit(1024*1024),
		nuclei.WithCatalog(r.catalog),
	)

I've experienced hangs during http-only scans of 100000+ hosts. Root cause:

In respChain.go:
When putBuffer() discards a buffer larger than largeBufferThreshold while largeBufferSem is full, it does not release the semaphore slot acquired in getBuffer(). Repeated discards exhaust the semaphore slots, and every subsequent getBuffer() blocks forever. This test reproduces the issue:

package httputil

import (
	"bytes"
	"io"
	"net/http"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/projectdiscovery/utils/sync/sizedpool"
)

// TestSemaphoreLeakDeadlock reproduces the semaphore leak deadlock.
//
// Bug: When putBuffer() discards a buffer (pool full or buffer too large),
// it doesn't release the semaphore slot acquired in getBuffer().
// This exhausts semaphore slots and causes deadlock.
func TestSemaphoreLeakDeadlock(t *testing.T) {
	// Save original state
	origBufPool := bufPool
	origSem := largeBufferSem
	defer func() {
		bufPool = origBufPool
		largeBufferSem = origSem
	}()

	// Shrink the pool size for test speed. This has to be done by
	// replacing bufPool directly, because helpers like SetBufferSize
	// are effectively no-ops.
	var p = &sync.Pool{
		New: func() any {
			return new(bytes.Buffer)
		},
	}
	var err error
	bufPool, err = sizedpool.New(
		sizedpool.WithPool[*bytes.Buffer](p),
		sizedpool.WithSize[*bytes.Buffer](20),
	)
	if err != nil {
		t.Fatal(err)
	}
	// Allow only one large buffer (the default is 20).
	setLargeBufferSemSize(1)

	responses := 2
	iteration := 0

	// Repeatedly:
	// - allocate 2 parallel responses with large bodies
	// - close them
	// until the leaked semaphore slots cause a deadlock.
	for {

		done := make(chan bool, 1)
		iteration++

		go func() {

			chains := make([]*ResponseChain, responses)
			for i := range chains {
				t.Logf("iteration %d, response %d", iteration, i)
				largeBody := strings.Repeat("X", largeBufferThreshold+1)
				resp := &http.Response{
					StatusCode: 200,
					Body:       io.NopCloser(strings.NewReader(largeBody)),
					Header:     http.Header{},
				}
				// Two new buffers are allocated here. They start small,
				// so largeBufferSem is not touched yet; across both
				// responses we end up with four buffers in total.
				rc := NewResponseChain(resp, -1)
				rc.Fill() // Read body to expand buffers
				chains[i] = rc
			}

			// Close all - this triggers the bug: the first buffers go
			// back to the pool; then a large buffer is discarded while
			// largeBufferSem is full, and its semaphore slot is never
			// released. Each iteration leaks a slot until getBuffer()
			// blocks forever.
			for _, rc := range chains {
				rc.Close()
			}
			done <- true
		}()
		select {
		case <-done:
		case <-time.After(time.Second):
			t.Fatalf("deadlock detected at iteration %d", iteration)
		}
	}

}
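For clarity, the leak and the likely fix can be modeled in isolation with a channel-based semaphore. The names getBuffer, putBuffer, and largeBufferThreshold mirror the ones in respChain.go, but this is a standalone sketch under assumed values (semaphore size 1, threshold 1024), not the actual implementation:

```go
package main

import (
	"bytes"
	"fmt"
)

// Assumed value for illustration only.
const largeBufferThreshold = 1024

// sem models largeBufferSem: a slot must be held for every live large buffer.
var sem = make(chan struct{}, 1)

func getBuffer(size int) *bytes.Buffer {
	if size > largeBufferThreshold {
		sem <- struct{}{} // acquire a large-buffer slot (blocks when exhausted)
	}
	return bytes.NewBuffer(make([]byte, 0, size))
}

// putBuffer with the fix applied: the slot is released even when the
// buffer is discarded instead of being returned to a pool.
func putBuffer(buf *bytes.Buffer) {
	if buf.Cap() > largeBufferThreshold {
		// The buggy version returned here without draining sem,
		// leaking the slot acquired in getBuffer.
		<-sem // release the slot
		return
	}
	// (small buffers would go back to the pool here)
}

func main() {
	// With the release in place, repeated get/put cycles of large
	// buffers never exhaust the semaphore and never block.
	for i := 0; i < 3; i++ {
		putBuffer(getBuffer(largeBufferThreshold + 1))
	}
	fmt.Println("slots leaked:", len(sem)) // → slots leaked: 0
}
```

The buggy behavior corresponds to returning from putBuffer() before draining sem; the fix is to release the slot on every path that retires a large buffer, whether or not it goes back to the pool.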
