Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions pkg/leakybucket/bench_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package leakybucket

import (
"fmt"
"testing"
"time"

log "github.com/sirupsen/logrus"

"github.com/crowdsecurity/crowdsec/pkg/pipeline"
)

// benchEvent returns a fresh, realistic parsed HTTP event. Meta is a single map
// that PourItemToHolders will share (by reference) across every matching bucket.
func benchEvent(now time.Time) pipeline.Event {
return pipeline.Event{
Time: now,
Meta: map[string]string{
"source_ip": "1.2.3.4",
"service": "http",
"log_type": "http_access-log",
"http_status": "404",
"http_path": "/wp-admin/",
"http_verb": "GET",
"http_user_agent": "curl/8.0",
"http_host": "example.com",
"machine": "test",
"datasource_path": "/var/log/traefik/access.log",
"datasource_type": "file",
"k1": "v1",
"k2": "v2",
"k3": "v3",
},
}
}

// runPourBenchmark drives PourItemToHolders against `readers` trigger scenarios
// (each overflows immediately and reads evt.Meta while building its alert). If
// withWriter is true, a final scenario whose filter mutates the shared event via
// evt.SetMeta(...) is appended — reproducing the crowdsecurity/http-technology-probing
// side effect and the crowdsecurity/crowdsec#4459 data race.
func runPourBenchmark(b *testing.B, readers int, withWriter bool) {
b.Helper()

// keep the overflow alert validation noise out of the benchmark output
oldLevel := log.GetLevel()
log.SetLevel(log.PanicLevel)
defer log.SetLevel(oldLevel)

// response collects overflows emitted by every bucket (AllOut). It must be
// drained continuously, otherwise the overflowing trigger goroutines block
// on the send *after* having read the shared map.
response := make(chan pipeline.Event, 4096)

stop := make(chan struct{})
go func() {
for {
select {
case <-stop:
return
case <-response:
}
}
}()
defer close(stop)

holders := make([]BucketFactory, 0, readers+1)

// Readers: each is a trigger that overflows immediately and reads evt.Meta
// while building the alert.
for i := range readers {
holders = append(holders, BucketFactory{
Spec: BucketSpec{
Name: fmt.Sprintf("reader_%d", i),
Description: "reads shared Meta on overflow",
Type: "trigger",
Filter: "true",
},
})
}

if withWriter {
// The writer last: its filter has a side effect on the shared event, just
// like crowdsecurity/http-technology-probing's `evt.SetMeta(...)`.
holders = append(holders, BucketFactory{
Spec: BucketSpec{
Name: "writer",
Description: "mutates shared Meta from its filter",
Type: "trigger",
Filter: `evt.SetMeta("injected", "1")`,
},
})
}

for idx := range holders {
holders[idx].ret = response
if err := holders[idx].LoadBucket(); err != nil {
b.Fatalf("while loading holder %d: %s", idx, err)
}

if err := holders[idx].Validate(); err != nil {
b.Fatalf("while validating holder %d: %s", idx, err)
}
}

ctx := b.Context()
bucketStore := NewBucketStore()
now := time.Now().UTC()

for b.Loop() {
evt := benchEvent(now)
if _, err := PourItemToHolders(ctx, evt, holders, bucketStore, nil); err != nil {
b.Fatalf("while pouring item: %s", err)
}
}
}

// BenchmarkPourSharedMetaRace reproduces the data race reported in
// crowdsecurity/crowdsec#4459.
//
// Root cause: PourItemToHolders pours a single parsed event into every matching
// scenario, but Event.Meta is a map (reference type) and the pour path only
// makes a shallow struct copy (pipeline.Queue.Add). Every bucket queue across
// all matching scenarios therefore ends up sharing the *same* Meta map.
//
// It wires up several "reader" trigger scenarios followed by a "writer" whose
// filter mutates the shared map via evt.SetMeta(...). When runPour evaluates the
// writer's filter (a map write) while an already-poured reader bucket iterates
// the same map during overflow (a map read), the Go runtime aborts with:
//
// fatal error: concurrent map iteration and map write
// fatal error: concurrent map read and map write
//
// Run as a race reproducer (reports a data race before the fix, clean after):
//
// go test -tags 'netgo,osusergo,expr_debug,nomsgpack' -race \
// -run='^$' -bench=BenchmarkPourSharedMetaRace ./pkg/leakybucket/
func BenchmarkPourSharedMetaRace(b *testing.B) {
runPourBenchmark(b, 8, true)
}

// BenchmarkPourNoSideEffect is the same pour workload without the side-effecting
// filter, so no map is ever mutated after being poured. It is race-free both
// before and after the #4459 fix, which makes it the apples-to-apples way to
// measure the fix's only cost: the per-pour CopyForBucket clone. Compare ns/op
// and allocs/op with and without the fix applied:
//
// go test -tags 'netgo,osusergo,expr_debug,nomsgpack' \
// -run='^$' -bench=BenchmarkPourNoSideEffect -benchmem ./pkg/leakybucket/
func BenchmarkPourNoSideEffect(b *testing.B) {
runPourBenchmark(b, 8, false)
}
6 changes: 6 additions & 0 deletions pkg/leakybucket/manager_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func PourItemToBucket(
var buckey = bucket.Mapkey
var err error

// Make a copy of the various maps for this bucket
// Otherwise, we could risk reading and writing to the same map concurrently
// (eg, with a call to evt.SetMeta in a filter, which the same event has lead to an overflow for another bucket)
localEvt := parsed.CopyForBucket()
parsed = &localEvt

sigclosed := 0
failed_sent := 0
attempts := 0
Expand Down
11 changes: 11 additions & 0 deletions pkg/pipeline/event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pipeline

import (
"maps"
"net/netip"
"time"

Expand Down Expand Up @@ -45,6 +46,16 @@ type Event struct {
Meta map[string]string `json:"Meta,omitempty" yaml:"Meta,omitempty"`
}

func (e *Event) CopyForBucket() Event {
out := *e
out.Parsed = maps.Clone(e.Parsed)
out.Enriched = maps.Clone(e.Enriched)
out.Unmarshaled = maps.Clone(e.Unmarshaled)
out.Meta = maps.Clone(e.Meta)

return out
}

func MakeEvent(timeMachine bool, evtType int, process bool) Event {
evt := Event{
Parsed: make(map[string]string),
Expand Down
70 changes: 70 additions & 0 deletions pkg/pipeline/event_copy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package pipeline

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCopyForBucketIndependence(t *testing.T) {
src := Event{
Meta: map[string]string{"source_ip": "1.2.3.4", "log_type": "http_access-log"},
Parsed: map[string]string{"verb": "GET"},
Enriched: map[string]string{"IsoCode": "FR"},
Unmarshaled: map[string]any{"json": map[string]any{"a": 1}},
}

cp := src.CopyForBucket()

// same content right after the copy
assert.Equal(t, src.Meta, cp.Meta)
assert.Equal(t, src.Parsed, cp.Parsed)
assert.Equal(t, src.Enriched, cp.Enriched)

// mutating the copy must not affect the source (this is the #4459 guarantee)
cp.SetMeta("injected", "1")
cp.SetParsed("verb", "POST")
cp.Enriched["IsoCode"] = "US"

_, ok := src.Meta["injected"]
assert.False(t, ok, "writing copy.Meta leaked into source")
assert.Equal(t, "GET", src.Parsed["verb"], "writing copy.Parsed leaked into source")
assert.Equal(t, "FR", src.Enriched["IsoCode"], "writing copy.Enriched leaked into source")

// the top-level Unmarshaled map is cloned (key add/remove is isolated)
cp.Unmarshaled["new"] = 2
_, ok = src.Unmarshaled["new"]
assert.False(t, ok, "adding a key to copy.Unmarshaled leaked into source")
}

func TestCopyForBucketNilMapsStayNil(t *testing.T) {
var src Event

cp := src.CopyForBucket()

// maps.Clone preserves nil, which the downstream nil checks rely on
require.Nil(t, cp.Meta)
require.Nil(t, cp.Parsed)
require.Nil(t, cp.Enriched)
require.Nil(t, cp.Unmarshaled)
}

// BenchmarkCopyForBucket isolates the per-pour cost of the #4459 fix: cloning
// the four data maps of a realistic parsed HTTP event.
func BenchmarkCopyForBucket(b *testing.B) {
src := Event{
Meta: map[string]string{
"source_ip": "1.2.3.4", "service": "http", "log_type": "http_access-log",
"http_status": "404", "http_path": "/wp-admin/", "http_verb": "GET",
"http_user_agent": "curl/8.0", "http_host": "example.com", "machine": "test",
"datasource_path": "/var/log/traefik/access.log", "datasource_type": "file",
},
Parsed: map[string]string{"verb": "GET", "status": "404", "request": "/wp-admin/"},
Enriched: map[string]string{"IsoCode": "FR", "ASNumber": "1234", "ASNOrg": "Example"},
}

for b.Loop() {
_ = src.CopyForBucket()
}
}
Loading