Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/skywalking-go.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ jobs:
uses: apache/skywalking-eyes@9bd5feb86b5817aa6072b008f9866a2c3bbc8587
- name: Test
run: make test
- name: Test Race
run: make test-race
- name: Lint
run: make lint

Expand Down
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ Release Notes.
* Add mutex to fix some data races.
* Replace external `goapi` dependency with in-repo generated protocols.
* Support pprof profiling.

#### Plugins

#### Documentation

#### Bug Fixes

* Fix plugin interceptors bypassed on Windows.
* Fix wrong tracing context switch when trace ignore plugin activated.
* Fix data race when sending trace data to the reporter.

#### Issues and PR
- All issues are [here](https://github.com/apache/skywalking/milestone/238?closed=1)
Expand Down
18 changes: 18 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,24 @@ test: ## Run E2E scenario tests
fi; \
done

# test-race visits every directory that contains a go.mod (anything under
# ./test/ is skipped), changes into it, and runs only the tests whose names
# start with "TestRace" with the Go race detector enabled. After each module it
# returns to ${REPODIR}; the first non-zero `go test` status aborts the loop and
# becomes the exit status of the recipe.
# NOTE(review): `[[ ... ]]` is a bash construct - presumably SHELL is set to
# bash elsewhere in this Makefile; confirm.
.PHONY: test-race
test-race: ## Run data-race regression tests (TestRace*) under the race detector
@$(LOG_TARGET)
@for dir in $$(find . -name go.mod -exec dirname {} \; ); do \
if [[ $$dir == "./test/"* ]]; then \
continue; \
fi; \
cd $$dir; \
echo "Race testing $$dir"; \
go test -race -run '^TestRace' ./...; \
test_status=$$?; \
cd ${REPODIR}; \
if [ $$test_status -ne 0 ]; then \
echo "Error occurred during race test, exiting..."; \
exit $$test_status; \
fi; \
done

.PHONY: lint
lint: linter ## Run golangci-lint linter
@$(LOG_TARGET)
Expand Down
59 changes: 46 additions & 13 deletions plugins/core/reporter/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,27 @@ func (r *gRPCReporter) closeGRPCConn() {
}
}

// sendWithRecover runs send and converts a panic raised inside it into a
// "skip this message" signal instead of letting it kill the sending goroutine.
//
// Results:
//   - recovered is true only when send panicked; the panic value is logged
//     through r.logger and err is left nil.
//   - err is whatever send returned when it completed normally.
//
// Callers are expected to drop the current message and carry on with the next
// one when recovered is true. The panic this guards against is the protobuf
// size/marshal failure tracked as #13885, which is raised before any bytes are
// written, so the stream is still usable afterwards. If a panic ever did leave
// the stream in a bad state, the next send would return an error and the
// caller would reconnect.
func (r *gRPCReporter) sendWithRecover(send func() error) (recovered bool, err error) {
	defer func() {
		rec := recover()
		if rec == nil {
			// Normal completion: keep the values set by the return statement.
			return
		}
		r.logger.Errorf("gRPCReporter recovered from panic while sending, skip current message: %v", rec)
		recovered = true
	}()
	return false, send()
}

// nolint
func (r *gRPCReporter) initSendPipeline() {
if r.traceClient == nil {
Expand Down Expand Up @@ -210,9 +231,12 @@ func (r *gRPCReporter) initSendPipeline() {
continue StreamLoop
}
for s := range r.tracingSendCh {
err = stream.Send(s)
if err != nil {
r.logger.Errorf("send segment error %v", err)
recovered, sendErr := r.sendWithRecover(func() error { return stream.Send(s) })
if recovered {
continue
}
if sendErr != nil {
r.logger.Errorf("send segment error %v", sendErr)
r.closeTracingStream(stream)
continue StreamLoop
}
Expand Down Expand Up @@ -245,11 +269,14 @@ func (r *gRPCReporter) initSendPipeline() {
continue StreamLoop
}
for s := range r.metricsSendCh {
err = stream.Send(&agentv3.MeterDataCollection{
MeterData: s,
recovered, sendErr := r.sendWithRecover(func() error {
return stream.Send(&agentv3.MeterDataCollection{MeterData: s})
})
if err != nil {
r.logger.Errorf("send metrics error %v", err)
if recovered {
continue
}
if sendErr != nil {
r.logger.Errorf("send metrics error %v", sendErr)
r.closeMetricsStream(stream)
continue StreamLoop
}
Expand Down Expand Up @@ -281,9 +308,12 @@ func (r *gRPCReporter) initSendPipeline() {
continue StreamLoop
}
for s := range r.logSendCh {
err = stream.Send(s)
if err != nil {
r.logger.Errorf("send log error %v", err)
recovered, sendErr := r.sendWithRecover(func() error { return stream.Send(s) })
if recovered {
continue
}
if sendErr != nil {
r.logger.Errorf("send log error %v", sendErr)
r.closeLogStream(stream)
continue StreamLoop
}
Expand Down Expand Up @@ -325,9 +355,12 @@ func (r *gRPCReporter) initSendPipeline() {
}
r.logger.Infof("Sending profile task: TaskID='%s', PayloadSize=%d, IsLast=%v",
task.TaskID, len(task.Payload), task.IsLast)
err = stream.Send(profileData)
if err != nil {
r.logger.Errorf("send profile data error %v", err)
recovered, sendErr := r.sendWithRecover(func() error { return stream.Send(profileData) })
if recovered {
continue
}
if sendErr != nil {
r.logger.Errorf("send profile data error %v", sendErr)
r.closeProfileStream(stream)
continue StreamLoop
}
Expand Down
32 changes: 28 additions & 4 deletions plugins/core/reporter/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,29 @@ func (r *kafkaReporter) initSendPipeline() {
go r.logSendLoop()
}

// marshalWithRecover runs marshal and converts a panic raised inside it into a
// "skip this message" signal, so a single corrupted payload cannot stop the
// send loop that called it.
//
// Results:
//   - payload and err are whatever marshal returned when it completed normally.
//   - recovered is true only when marshal panicked; the panic value is logged
//     through r.logger, and payload and err are left at their zero values.
//
// Callers are expected to drop the current message and continue with the next
// one when recovered is true.
func (r *kafkaReporter) marshalWithRecover(marshal func() ([]byte, error)) (payload []byte, recovered bool, err error) {
	defer func() {
		rec := recover()
		if rec == nil {
			// Normal completion: keep the values set by the return statement.
			return
		}
		r.logger.Errorf("kafkaReporter recovered from panic while marshalling, skip current message: %v", rec)
		recovered = true
	}()
	encoded, marshalErr := marshal()
	return encoded, false, marshalErr
}

func (r *kafkaReporter) tracingSendLoop() {
consecutiveErrors := 0
logFrequency := 30
for s := range r.tracingSendCh {
payload, err := proto.Marshal(s)
payload, recovered, err := r.marshalWithRecover(func() ([]byte, error) { return proto.Marshal(s) })
if recovered {
continue
}
if err != nil {
r.logger.Errorf("marshal segment error %v", err)
continue
Expand All @@ -172,9 +190,12 @@ func (r *kafkaReporter) metricsSendLoop() {
consecutiveErrors := 0
logFrequency := 30
for s := range r.metricsSendCh {
payload, err := proto.Marshal(&agentv3.MeterDataCollection{
MeterData: s,
payload, recovered, err := r.marshalWithRecover(func() ([]byte, error) {
return proto.Marshal(&agentv3.MeterDataCollection{MeterData: s})
})
if recovered {
continue
}
if err != nil {
r.logger.Errorf("marshal metrics error %v", err)
continue
Expand All @@ -201,7 +222,10 @@ func (r *kafkaReporter) logSendLoop() {
consecutiveErrors := 0
logFrequency := 30
for s := range r.logSendCh {
payload, err := proto.Marshal(s)
payload, recovered, err := r.marshalWithRecover(func() ([]byte, error) { return proto.Marshal(s) })
if recovered {
continue
}
if err != nil {
r.logger.Errorf("marshal log error %v", err)
continue
Expand Down
41 changes: 39 additions & 2 deletions plugins/core/reporter/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package reporter
import (
"time"

commonv3 "github.com/apache/skywalking-go/protocols/collect/common/v3"
agentv3 "github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
)

Expand Down Expand Up @@ -60,8 +61,8 @@ func (r *Transform) TransformSegmentObject(spans []ReportedSpan) *agentv3.Segmen
SpanLayer: s.SpanLayer(),
ComponentId: s.ComponentID(),
IsError: s.IsError(),
Tags: s.Tags(),
Logs: s.Logs(),
Tags: copyKeyStringValuePairs(s.Tags()),
Logs: copyLogs(s.Logs()),
}
srr := make([]*agentv3.SegmentReference, 0)
if i == (spanSize-1) && spanCtx.GetParentSpanID() > -1 {
Expand Down Expand Up @@ -93,6 +94,42 @@ func (r *Transform) TransformSegmentObject(spans []ReportedSpan) *agentv3.Segmen
return segmentObject
}

// copyLogs builds a deep copy of a span's log entries: every non-nil entry is
// re-created as a new agentv3.Log carrying the same Time and a copy of its Data
// made with copyKeyStringValuePairs, so the result shares no Log or key/value
// objects with the input. Nil entries are dropped, and an empty or nil input
// yields nil.
func copyLogs(logs []*agentv3.Log) []*agentv3.Log {
	total := len(logs)
	if total == 0 {
		return nil
	}
	out := make([]*agentv3.Log, 0, total)
	for i := range logs {
		entry := logs[i]
		if entry != nil {
			out = append(out, &agentv3.Log{
				Time: entry.Time,
				Data: copyKeyStringValuePairs(entry.Data),
			})
		}
	}
	return out
}

// copyKeyStringValuePairs builds a deep copy of a key/value pair slice, such as
// span tags or the data of a log entry. Each non-nil pair is re-created as a
// new commonv3.KeyStringValuePair with the same Key and Value; nil pairs are
// dropped, and an empty or nil input yields nil.
//
// The copy exists so that a reported SegmentObject never shares mutable backing
// storage with the live span: the reporter marshals the slice on another
// goroutine, and a span that is still being mutated at that point (for example
// one mistakenly shared across goroutines) would corrupt the protobuf message
// during encoding (apache/skywalking#13885).
func copyKeyStringValuePairs(pairs []*commonv3.KeyStringValuePair) []*commonv3.KeyStringValuePair {
	total := len(pairs)
	if total == 0 {
		return nil
	}
	out := make([]*commonv3.KeyStringValuePair, 0, total)
	for i := range pairs {
		kv := pairs[i]
		if kv != nil {
			out = append(out, &commonv3.KeyStringValuePair{
				Key:   kv.Key,
				Value: kv.Value,
			})
		}
	}
	return out
}

func (r *Transform) TransformMeterData(metrics []ReportedMeter) []*agentv3.MeterData {
if len(metrics) == 0 {
return nil
Expand Down
Loading
Loading