Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/skywalking-go.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ jobs:
uses: apache/skywalking-eyes@9bd5feb86b5817aa6072b008f9866a2c3bbc8587
- name: Test
run: make test
- name: Test Race
run: make test-race
- name: Lint
run: make lint

Expand Down
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ Release Notes.
* Add mutex to fix some data race.
* Replace external `goapi` dependency with in-repo generated protocols.
* Support pprof profiling.

#### Plugins

#### Documentation

#### Bug Fixes

* Fix plugin interceptors bypassed on Windows.
* Fix wrong tracing context switch when trace ignore plugin activated.
* Fix data race when sending trace data to reporter.

#### Issues and PR
- All issues are [here](https://github.com/apache/skywalking/milestone/238?closed=1)
Expand Down
18 changes: 18 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,24 @@ test: ## Run E2E scenario tests
fi; \
done

.PHONY: test-race
test-race: ## Run data-race regression tests (TestRace*) under the race detector
@$(LOG_TARGET)
@for dir in $$(find . -name go.mod -exec dirname {} \; ); do \
if [[ $$dir == "./test/"* ]]; then \
continue; \
fi; \
cd $$dir; \
echo "Race testing $$dir"; \
go test -race -run '^TestRace' ./...; \
test_status=$$?; \
cd ${REPODIR}; \
if [ $$test_status -ne 0 ]; then \
echo "Error occurred during race test, exiting..."; \
exit $$test_status; \
fi; \
done

.PHONY: lint
lint: linter ## Run golangci-lint linter
@$(LOG_TARGET)
Expand Down
59 changes: 46 additions & 13 deletions plugins/core/reporter/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,27 @@ func (r *gRPCReporter) closeGRPCConn() {
}
}

// sendWithRecover invokes send and recovers from a panic raised while encoding or
// transmitting a single message, so that one corrupted payload cannot tear down the
// whole send pipeline. On a recovered panic it logs via the existing logger and
// returns recovered=true, telling the caller to skip the current message and keep
// streaming the rest.
//
// Such a panic originates in protobuf size/marshal computation (the #13885 crash),
// which runs before any bytes are written to the stream, so the stream stays valid
// and may be reused for the next message. Should a panic ever leave the stream
// inconsistent, the following send returns an error and the caller reconnects.
func (r *gRPCReporter) sendWithRecover(send func() error) (recovered bool, err error) {
defer func() {
if rec := recover(); rec != nil {
r.logger.Errorf("gRPCReporter recovered from panic while sending, skip current message: %v", rec)
recovered = true
}
}()
err = send()
return recovered, err
}

// nolint
func (r *gRPCReporter) initSendPipeline() {
if r.traceClient == nil {
Expand Down Expand Up @@ -210,9 +231,12 @@ func (r *gRPCReporter) initSendPipeline() {
continue StreamLoop
}
for s := range r.tracingSendCh {
err = stream.Send(s)
if err != nil {
r.logger.Errorf("send segment error %v", err)
recovered, sendErr := r.sendWithRecover(func() error { return stream.Send(s) })
if recovered {
continue
}
if sendErr != nil {
r.logger.Errorf("send segment error %v", sendErr)
r.closeTracingStream(stream)
continue StreamLoop
}
Expand Down Expand Up @@ -245,11 +269,14 @@ func (r *gRPCReporter) initSendPipeline() {
continue StreamLoop
}
for s := range r.metricsSendCh {
err = stream.Send(&agentv3.MeterDataCollection{
MeterData: s,
recovered, sendErr := r.sendWithRecover(func() error {
return stream.Send(&agentv3.MeterDataCollection{MeterData: s})
})
if err != nil {
r.logger.Errorf("send metrics error %v", err)
if recovered {
continue
}
if sendErr != nil {
r.logger.Errorf("send metrics error %v", sendErr)
r.closeMetricsStream(stream)
continue StreamLoop
}
Expand Down Expand Up @@ -281,9 +308,12 @@ func (r *gRPCReporter) initSendPipeline() {
continue StreamLoop
}
for s := range r.logSendCh {
err = stream.Send(s)
if err != nil {
r.logger.Errorf("send log error %v", err)
recovered, sendErr := r.sendWithRecover(func() error { return stream.Send(s) })
if recovered {
continue
}
if sendErr != nil {
r.logger.Errorf("send log error %v", sendErr)
r.closeLogStream(stream)
continue StreamLoop
}
Expand Down Expand Up @@ -325,9 +355,12 @@ func (r *gRPCReporter) initSendPipeline() {
}
r.logger.Infof("Sending profile task: TaskID='%s', PayloadSize=%d, IsLast=%v",
task.TaskID, len(task.Payload), task.IsLast)
err = stream.Send(profileData)
if err != nil {
r.logger.Errorf("send profile data error %v", err)
recovered, sendErr := r.sendWithRecover(func() error { return stream.Send(profileData) })
if recovered {
continue
}
if sendErr != nil {
r.logger.Errorf("send profile data error %v", sendErr)
r.closeProfileStream(stream)
continue StreamLoop
}
Expand Down
32 changes: 28 additions & 4 deletions plugins/core/reporter/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,29 @@ func (r *kafkaReporter) initSendPipeline() {
go r.logSendLoop()
}

// marshalWithRecover invokes marshal and recovers from a panic raised while encoding
// a single message, so that one corrupted payload cannot tear down the whole send
// loop. On a recovered panic it logs via the existing logger and returns
// recovered=true, telling the caller to skip the current message and keep going.
func (r *kafkaReporter) marshalWithRecover(marshal func() ([]byte, error)) (payload []byte, recovered bool, err error) {
defer func() {
if rec := recover(); rec != nil {
r.logger.Errorf("kafkaReporter recovered from panic while marshaling, skip current message: %v", rec)
recovered = true
}
}()
payload, err = marshal()
return payload, recovered, err
}

func (r *kafkaReporter) tracingSendLoop() {
consecutiveErrors := 0
logFrequency := 30
for s := range r.tracingSendCh {
payload, err := proto.Marshal(s)
payload, recovered, err := r.marshalWithRecover(func() ([]byte, error) { return proto.Marshal(s) })
if recovered {
continue
}
if err != nil {
r.logger.Errorf("marshal segment error %v", err)
continue
Expand All @@ -172,9 +190,12 @@ func (r *kafkaReporter) metricsSendLoop() {
consecutiveErrors := 0
logFrequency := 30
for s := range r.metricsSendCh {
payload, err := proto.Marshal(&agentv3.MeterDataCollection{
MeterData: s,
payload, recovered, err := r.marshalWithRecover(func() ([]byte, error) {
return proto.Marshal(&agentv3.MeterDataCollection{MeterData: s})
})
if recovered {
continue
}
if err != nil {
r.logger.Errorf("marshal metrics error %v", err)
continue
Expand All @@ -201,7 +222,10 @@ func (r *kafkaReporter) logSendLoop() {
consecutiveErrors := 0
logFrequency := 30
for s := range r.logSendCh {
payload, err := proto.Marshal(s)
payload, recovered, err := r.marshalWithRecover(func() ([]byte, error) { return proto.Marshal(s) })
if recovered {
continue
}
if err != nil {
r.logger.Errorf("marshal log error %v", err)
continue
Expand Down
41 changes: 39 additions & 2 deletions plugins/core/reporter/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package reporter
import (
"time"

commonv3 "github.com/apache/skywalking-go/protocols/collect/common/v3"
agentv3 "github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
)

Expand Down Expand Up @@ -60,8 +61,8 @@ func (r *Transform) TransformSegmentObject(spans []ReportedSpan) *agentv3.Segmen
SpanLayer: s.SpanLayer(),
ComponentId: s.ComponentID(),
IsError: s.IsError(),
Tags: s.Tags(),
Logs: s.Logs(),
Tags: copyKeyStringValuePairs(s.Tags()),
Logs: copyLogs(s.Logs()),
}
srr := make([]*agentv3.SegmentReference, 0)
if i == (spanSize-1) && spanCtx.GetParentSpanID() > -1 {
Expand Down Expand Up @@ -93,6 +94,42 @@ func (r *Transform) TransformSegmentObject(spans []ReportedSpan) *agentv3.Segmen
return segmentObject
}

// copyLogs returns a deep copy of the span logs for the same reason as
// copyKeyStringValuePairs.
func copyLogs(logs []*agentv3.Log) []*agentv3.Log {
if len(logs) == 0 {
return nil
}
cp := make([]*agentv3.Log, 0, len(logs))
for _, log := range logs {
if log == nil {
continue
}
cp = append(cp, &agentv3.Log{Time: log.Time, Data: copyKeyStringValuePairs(log.Data)})
}
return cp
}

// copyKeyStringValuePairs returns a deep copy of a key/value pair slice (span tags,
// or a log's data) so that the reported SegmentObject never shares mutable backing
// storage with the live span. Without this copy the reporter marshals the slice in a
// different goroutine while the span may still be mutated (e.g. when a span is
// mistakenly shared across goroutines), corrupting the protobuf message during encode
// (apache/skywalking#13885). Nil elements are skipped and an empty input returns nil.
func copyKeyStringValuePairs(pairs []*commonv3.KeyStringValuePair) []*commonv3.KeyStringValuePair {
if len(pairs) == 0 {
return nil
}
cp := make([]*commonv3.KeyStringValuePair, 0, len(pairs))
for _, p := range pairs {
if p == nil {
continue
}
cp = append(cp, &commonv3.KeyStringValuePair{Key: p.Key, Value: p.Value})
}
return cp
}

func (r *Transform) TransformMeterData(metrics []ReportedMeter) []*agentv3.MeterData {
if len(metrics) == 0 {
return nil
Expand Down
Loading
Loading