From bb672dccfd761636a91e0d8e5c99414ffacc6403 Mon Sep 17 00:00:00 2001 From: mrproliu <741550557@qq.com> Date: Wed, 27 May 2026 14:52:37 +0800 Subject: [PATCH 1/2] Fix data race when sending trace data to reporter. --- .github/workflows/skywalking-go.yaml | 2 + CHANGES.md | 3 + Makefile | 18 +++ plugins/core/reporter/grpc/grpc.go | 59 ++++++-- plugins/core/reporter/kafka/kafka.go | 32 ++++- plugins/core/reporter/transform.go | 41 +++++- plugins/core/segment_datarace_test.go | 191 ++++++++++++++++++++++++++ 7 files changed, 327 insertions(+), 19 deletions(-) create mode 100644 plugins/core/segment_datarace_test.go diff --git a/.github/workflows/skywalking-go.yaml b/.github/workflows/skywalking-go.yaml index 1a29fe0a..f31138b9 100644 --- a/.github/workflows/skywalking-go.yaml +++ b/.github/workflows/skywalking-go.yaml @@ -37,6 +37,8 @@ jobs: uses: apache/skywalking-eyes@9bd5feb86b5817aa6072b008f9866a2c3bbc8587 - name: Test run: make test + - name: Test Race + run: make test-race - name: Lint run: make lint diff --git a/CHANGES.md b/CHANGES.md index 57f0ccb8..401c55c8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -12,13 +12,16 @@ Release Notes. * Add mutex to fix some data race. * Replace external `goapi` dependency with in-repo generated protocols. * Support pprof profiling. + #### Plugins #### Documentation + #### Bug Fixes * Fix plugin interceptors bypassed on Windows. * Fix wrong tracing context switch when trace ignore plugin activated. +* Fix data race when sending trace data to reporter. #### Issues and PR - All issues are [here](https://github.com/apache/skywalking/milestone/238?closed=1) diff --git a/Makefile b/Makefile index a9dea81c..04471996 100644 --- a/Makefile +++ b/Makefile @@ -89,6 +89,24 @@ test: ## Run E2E scenario tests fi; \ done +.PHONY: test-race +test-race: ## Run data-race regression tests (TestRace*) under the race detector + @$(LOG_TARGET) + @for dir in $$(find . -name go.mod -exec dirname {} \; ); do \ + if [[ $$dir == "./test/"* ]]; then \ + continue; \ + fi; \ + cd $$dir; \ + echo "Race testing $$dir"; \ + go test -race -run '^TestRace' ./...; \ + test_status=$$?; \ + cd ${REPODIR}; \ + if [ $$test_status -ne 0 ]; then \ + echo "Error occurred during race test, exiting..."; \ + exit $$test_status; \ + fi; \ + done + .PHONY: lint lint: linter ## Run golangci-lint linter @$(LOG_TARGET) diff --git a/plugins/core/reporter/grpc/grpc.go b/plugins/core/reporter/grpc/grpc.go index 7f21ad31..be60b385 100644 --- a/plugins/core/reporter/grpc/grpc.go +++ b/plugins/core/reporter/grpc/grpc.go @@ -182,6 +182,27 @@ func (r *gRPCReporter) closeGRPCConn() { } } +// sendWithRecover invokes send and recovers from a panic raised while encoding or +// transmitting a single message, so that one corrupted payload cannot tear down the +// whole send pipeline. On a recovered panic it logs via the existing logger and +// returns recovered=true, telling the caller to skip the current message and keep +// streaming the rest. +// +// Such a panic originates in protobuf size/marshal computation (the #13885 crash), +// which runs before any bytes are written to the stream, so the stream stays valid +// and may be reused for the next message. Should a panic ever leave the stream +// inconsistent, the following send returns an error and the caller reconnects. +func (r *gRPCReporter) sendWithRecover(send func() error) (recovered bool, err error) { + defer func() { + if rec := recover(); rec != nil { + r.logger.Errorf("gRPCReporter recovered from panic while sending, skip current message: %v", rec) + recovered = true + } + }() + err = send() + return recovered, err +} + // nolint func (r *gRPCReporter) initSendPipeline() { if r.traceClient == nil { @@ -210,9 +231,12 @@ func (r *gRPCReporter) initSendPipeline() { continue StreamLoop } for s := range r.tracingSendCh { - err = stream.Send(s) - if err != nil { - r.logger.Errorf("send segment error %v", err) + recovered, sendErr := r.sendWithRecover(func() error { return stream.Send(s) }) + if recovered { + continue + } + if sendErr != nil { + r.logger.Errorf("send segment error %v", sendErr) r.closeTracingStream(stream) continue StreamLoop } @@ -245,11 +269,14 @@ func (r *gRPCReporter) initSendPipeline() { continue StreamLoop } for s := range r.metricsSendCh { - err = stream.Send(&agentv3.MeterDataCollection{ - MeterData: s, + recovered, sendErr := r.sendWithRecover(func() error { + return stream.Send(&agentv3.MeterDataCollection{MeterData: s}) }) - if err != nil { - r.logger.Errorf("send metrics error %v", err) + if recovered { + continue + } + if sendErr != nil { + r.logger.Errorf("send metrics error %v", sendErr) r.closeMetricsStream(stream) continue StreamLoop } @@ -281,9 +308,12 @@ func (r *gRPCReporter) initSendPipeline() { continue StreamLoop } for s := range r.logSendCh { - err = stream.Send(s) - if err != nil { - r.logger.Errorf("send log error %v", err) + recovered, sendErr := r.sendWithRecover(func() error { return stream.Send(s) }) + if recovered { + continue + } + if sendErr != nil { + r.logger.Errorf("send log error %v", sendErr) r.closeLogStream(stream) continue StreamLoop } @@ -325,9 +355,12 @@ func (r *gRPCReporter) initSendPipeline() { } r.logger.Infof("Sending profile task: TaskID='%s', PayloadSize=%d, IsLast=%v", task.TaskID, len(task.Payload), task.IsLast) - err = stream.Send(profileData) - if err != nil { - r.logger.Errorf("send profile data error %v", err) + recovered, sendErr := r.sendWithRecover(func() error { return stream.Send(profileData) }) + if recovered { + continue + } + if sendErr != nil { + r.logger.Errorf("send profile data error %v", sendErr) r.closeProfileStream(stream) continue StreamLoop } diff --git a/plugins/core/reporter/kafka/kafka.go b/plugins/core/reporter/kafka/kafka.go index 012f9f2a..39a4f04b 100644 --- a/plugins/core/reporter/kafka/kafka.go +++ b/plugins/core/reporter/kafka/kafka.go @@ -141,11 +141,29 @@ func (r *kafkaReporter) initSendPipeline() { go r.logSendLoop() } +// marshalWithRecover invokes marshal and recovers from a panic raised while encoding +// a single message, so that one corrupted payload cannot tear down the whole send +// loop. On a recovered panic it logs via the existing logger and returns +// recovered=true, telling the caller to skip the current message and keep going. +func (r *kafkaReporter) marshalWithRecover(marshal func() ([]byte, error)) (payload []byte, recovered bool, err error) { + defer func() { + if rec := recover(); rec != nil { + r.logger.Errorf("kafkaReporter recovered from panic while marshalling, skip current message: %v", rec) + recovered = true + } + }() + payload, err = marshal() + return payload, recovered, err +} + func (r *kafkaReporter) tracingSendLoop() { consecutiveErrors := 0 logFrequency := 30 for s := range r.tracingSendCh { - payload, err := proto.Marshal(s) + payload, recovered, err := r.marshalWithRecover(func() ([]byte, error) { return proto.Marshal(s) }) + if recovered { + continue + } if err != nil { r.logger.Errorf("marshal segment error %v", err) continue @@ -172,9 +190,12 @@ func (r *kafkaReporter) metricsSendLoop() { consecutiveErrors := 0 logFrequency := 30 for s := range r.metricsSendCh { - payload, err := proto.Marshal(&agentv3.MeterDataCollection{ - MeterData: s, + payload, recovered, err := r.marshalWithRecover(func() ([]byte, error) { + return proto.Marshal(&agentv3.MeterDataCollection{MeterData: s}) }) + if recovered { + continue + } if err != nil { r.logger.Errorf("marshal metrics error %v", err) continue @@ -201,7 +222,10 @@ func (r *kafkaReporter) logSendLoop() { consecutiveErrors := 0 logFrequency := 30 for s := range r.logSendCh { - payload, err := proto.Marshal(s) + payload, recovered, err := r.marshalWithRecover(func() ([]byte, error) { return proto.Marshal(s) }) + if recovered { + continue + } if err != nil { r.logger.Errorf("marshal log error %v", err) continue diff --git a/plugins/core/reporter/transform.go b/plugins/core/reporter/transform.go index 4cb89d1d..8533eb1b 100644 --- a/plugins/core/reporter/transform.go +++ b/plugins/core/reporter/transform.go @@ -20,6 +20,7 @@ package reporter import ( "time" + commonv3 "github.com/apache/skywalking-go/protocols/collect/common/v3" agentv3 "github.com/apache/skywalking-go/protocols/collect/language/agent/v3" ) @@ -60,8 +61,8 @@ func (r *Transform) TransformSegmentObject(spans []ReportedSpan) *agentv3.Segmen SpanLayer: s.SpanLayer(), ComponentId: s.ComponentID(), IsError: s.IsError(), - Tags: s.Tags(), - Logs: s.Logs(), + Tags: copyKeyStringValuePairs(s.Tags()), + Logs: copyLogs(s.Logs()), } srr := make([]*agentv3.SegmentReference, 0) if i == (spanSize-1) && spanCtx.GetParentSpanID() > -1 { @@ -93,6 +94,42 @@ func (r *Transform) TransformSegmentObject(spans []ReportedSpan) *agentv3.Segmen return segmentObject } +// copyLogs returns a deep copy of the span logs for the same reason as +// copyKeyStringValuePairs. +func copyLogs(logs []*agentv3.Log) []*agentv3.Log { + if len(logs) == 0 { + return nil + } + cp := make([]*agentv3.Log, 0, len(logs)) + for _, log := range logs { + if log == nil { + continue + } + cp = append(cp, &agentv3.Log{Time: log.Time, Data: copyKeyStringValuePairs(log.Data)}) + } + return cp +} + +// copyKeyStringValuePairs returns a deep copy of a key/value pair slice (span tags, +// or a log's data) so that the reported SegmentObject never shares mutable backing +// storage with the live span. Without this copy the reporter marshals the slice in a +// different goroutine while the span may still be mutated (e.g. when a span is +// mistakenly shared across goroutines), corrupting the protobuf message during encode +// (apache/skywalking#13885). Nil elements are skipped and an empty input returns nil. +func copyKeyStringValuePairs(pairs []*commonv3.KeyStringValuePair) []*commonv3.KeyStringValuePair { + if len(pairs) == 0 { + return nil + } + cp := make([]*commonv3.KeyStringValuePair, 0, len(pairs)) + for _, p := range pairs { + if p == nil { + continue + } + cp = append(cp, &commonv3.KeyStringValuePair{Key: p.Key, Value: p.Value}) + } + return cp +} + func (r *Transform) TransformMeterData(metrics []ReportedMeter) []*agentv3.MeterData { if len(metrics) == 0 { return nil diff --git a/plugins/core/segment_datarace_test.go b/plugins/core/segment_datarace_test.go new file mode 100644 index 00000000..e4071017 --- /dev/null +++ b/plugins/core/segment_datarace_test.go @@ -0,0 +1,191 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build race + +// This file holds data-race regression tests for apache/skywalking#13885. They are +// only meaningful under the race detector, so they are built solely with the `race` +// tag and are run by `make test-race` (which selects them via `-run '^TestRace'`). + +package core + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "google.golang.org/protobuf/proto" + + "github.com/apache/skywalking-go/plugins/core/reporter" +) + +// buildReportedSpan constructs a real *SegmentSpanImpl, the exact type the agent +// hands to Transform.TransformSegmentObject when a finished segment is reported. +func buildReportedSpan() *SegmentSpanImpl { + return &SegmentSpanImpl{ + DefaultSpan: DefaultSpan{ + StartTime: time.Now(), + EndTime: time.Now(), + OperationName: "users/SELECT", + Peer: "127.0.0.1:5432", + SpanType: SpanTypeExit, + }, + SegmentContext: SegmentContext{ + TraceID: "trace-id", + SegmentID: "segment-id", + SpanID: 0, + ParentSpanID: -1, + }, + } +} + +// TestRaceSegmentDecoupledFromLiveSpan is the regression test for +// apache/skywalking#13885 ("Service crash caused by Go agent unexpected fault address"). +// +// Root cause: TransformSegmentObject used to publish the span's Tags/Logs slices +// by reference ("Tags: s.Tags()"). The produced SegmentObject was then queued and +// marshalled by the gRPC reporter goroutine (the "send queue"). When the same span +// was still being mutated - e.g. the gorm plugin stores one span per *gorm.DB and a +// single *gorm.DB session is reused across goroutines - the reporter walked a slice +// that another goroutine was appending to, corrupting the protobuf message and +// faulting inside MessageInfo.sizePointerSlow. +// +// The fix deep-copies Tags/Logs in TransformSegmentObject so the reported segment +// shares no mutable storage with the live span. This test verifies that guarantee: +// once a segment has been transformed for sending, concurrently mutating the span +// must not race with marshalling the segment. +func TestRaceSegmentDecoupledFromLiveSpan(t *testing.T) { + span := buildReportedSpan() + const nTags = 16 + for i := 0; i < nTags; i++ { + span.Tag(fmt.Sprintf("key-%d", i), "init") + } + + transform := reporter.NewTransform(&reporter.Entity{ + ServiceName: "svc", + ServiceInstanceName: "inst", + }) + + // The reporter transforms the finished segment once and hands it to the send queue. + seg := transform.TransformSegmentObject([]reporter.ReportedSpan{span}) + + var stop int32 + var wg sync.WaitGroup + wg.Add(2) + + // goroutine A: a concurrent caller keeps mutating the SAME span (shared-session + // misuse): it both updates existing tag values in place and appends new tags. + go func() { + defer wg.Done() + for i := 0; atomic.LoadInt32(&stop) == 0; i++ { + span.Tag(fmt.Sprintf("key-%d", i%nTags), fmt.Sprintf("v-%d", i)) + span.Tag(fmt.Sprintf("extra-%d", i), "v") + } + }() + + // goroutine B: the send queue marshals the already-transformed segment. + go func() { + defer wg.Done() + for atomic.LoadInt32(&stop) == 0 { + if _, err := proto.Marshal(seg); err != nil { + t.Errorf("marshal segment: %v", err) + return + } + } + }() + + time.Sleep(300 * time.Millisecond) + atomic.StoreInt32(&stop, 1) + wg.Wait() +} + +// TestRaceReporterNeverPanicsWhileSpanMutated is the panic-focused regression test +// for apache/skywalking#13885. It reproduces the exact crash conditions and asserts +// the reporter never panics/faults. +// +// A finished segment is transformed once (as the agent does when a segment ends), +// then several "send queue" goroutines keep marshalling it while ONE caller keeps +// mutating the original span - in particular updating existing tag values IN PLACE, +// which is the write that, before the fix, let the reporter read a half-overwritten +// string and fault inside the protobuf encoder. A single mutator is used on purpose +// so the test isolates the reporter-vs-span race (the fix's guarantee) rather than +// span-vs-span writer races, which are a separate concern. +// +// Each goroutine recovers from panics and the test fails if any occurred, so a +// regression surfaces as a clean failure instead of crashing the test binary. +func TestRaceReporterNeverPanicsWhileSpanMutated(t *testing.T) { + span := buildReportedSpan() + const nTags = 32 + for i := 0; i < nTags; i++ { + span.Tag(fmt.Sprintf("key-%d", i), "init") + span.Log("event", fmt.Sprintf("log-%d", i)) + } + + transform := reporter.NewTransform(&reporter.Entity{ + ServiceName: "svc", + ServiceInstanceName: "inst", + }) + seg := transform.TransformSegmentObject([]reporter.ReportedSpan{span}) + + var stop int32 + var panicked int32 + var wg sync.WaitGroup + + guard := func(fn func()) { + defer wg.Done() + defer func() { + if r := recover(); r != nil { + atomic.AddInt32(&panicked, 1) + t.Errorf("reporter panicked while the span was mutated concurrently: %v", r) + } + }() + fn() + } + + // one caller keeps mutating the live span: in-place tag-value updates, new tags and logs. + wg.Add(1) + go guard(func() { + for i := 0; atomic.LoadInt32(&stop) == 0; i++ { + span.Tag(fmt.Sprintf("key-%d", i%nTags), fmt.Sprintf("v-%d", i)) + span.Tag(fmt.Sprintf("extra-%d", i), "v") + span.Log("k", fmt.Sprintf("v-%d", i)) + } + }) + + // several send-queue goroutines marshal the already-transformed segment. + for r := 0; r < 4; r++ { + wg.Add(1) + go guard(func() { + for atomic.LoadInt32(&stop) == 0 { + if _, err := proto.Marshal(seg); err != nil { + t.Errorf("marshal segment: %v", err) + return + } + } + }) + } + + time.Sleep(500 * time.Millisecond) + atomic.StoreInt32(&stop, 1) + wg.Wait() + + if atomic.LoadInt32(&panicked) > 0 { + t.Fatalf("reporter panicked %d time(s) - the #13885 crash regressed", atomic.LoadInt32(&panicked)) + } +} From 3a836526a5e4b099153ea6f26f1bf70feee42adb Mon Sep 17 00:00:00 2001 From: mrproliu <741550557@qq.com> Date: Wed, 27 May 2026 15:50:29 +0800 Subject: [PATCH 2/2] fix lint --- plugins/core/reporter/kafka/kafka.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/core/reporter/kafka/kafka.go b/plugins/core/reporter/kafka/kafka.go index 39a4f04b..0655d29b 100644 --- a/plugins/core/reporter/kafka/kafka.go +++ b/plugins/core/reporter/kafka/kafka.go @@ -148,7 +148,7 @@ func (r *kafkaReporter) initSendPipeline() { func (r *kafkaReporter) marshalWithRecover(marshal func() ([]byte, error)) (payload []byte, recovered bool, err error) { defer func() { if rec := recover(); rec != nil { - r.logger.Errorf("kafkaReporter recovered from panic while marshalling, skip current message: %v", rec) + r.logger.Errorf("kafkaReporter recovered from panic while marshaling, skip current message: %v", rec) recovered = true } }()