Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/ingester/flow_log/dbwriter/span_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (t *SpanWithTraceID) Encode() {
encoder.WriteString255(t.RequestResource)
encoder.WriteString255(t.ResponseResult)
encoder.WriteString255(t.BizProtocol)
encoder.WriteString255(t.BizResponseCode)
encoder.WriteString255(t.ResponseException)
if t.RequestId != nil {
encoder.WriteVarintU64(*t.RequestId)
Expand Down
2 changes: 1 addition & 1 deletion server/ingester/flow_log/log_data/l7_flow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func L7FlowLogColumns() []*ckdb.Column {
l7Columns = append(l7Columns, ckdb.NewColumn("_id", ckdb.UInt64))
l7Columns = append(l7Columns, L7BaseColumns()...)
l7Columns = append(l7Columns,
ckdb.NewColumn("l7_protocol", ckdb.UInt8).SetIndex(ckdb.IndexNone).SetComment("0:未知 1:其他, 20:http1, 21:http2, 40:dubbo, 60:mysql, 80:redis, 100:kafka, 101:mqtt, 120:dns"),
ckdb.NewColumn("l7_protocol", ckdb.UInt8).SetIndex(ckdb.IndexNone).SetComment("0:未知, 20:http1, 21:http2, 40:dubbo, 60:mysql, 61:postgresql, 62:oracle, 63:dameng, 64:db2, 65:tdsql, 66:oceanbase, 67:goldendb, 68:kingbase, 80:redis, 100:kafka, 101:mqtt, 120:dns"),
ckdb.NewColumn("biz_protocol", ckdb.LowCardinalityString).SetIndex(ckdb.IndexNone).SetComment("应用协议"),
ckdb.NewColumn("version", ckdb.LowCardinalityString).SetComment("协议版本"),
ckdb.NewColumn("type", ckdb.UInt8).SetIndex(ckdb.IndexNone).SetComment("日志类型, 0:请求, 1:响应, 2:会话"),
Expand Down
40 changes: 40 additions & 0 deletions server/libs/datatype/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ const (
L7_PROTOCOL_POSTGRE L7Protocol = 61
L7_PROTOCOL_ORACLE L7Protocol = 62
L7_PROTOCOL_DAMENG L7Protocol = 63
L7_PROTOCOL_DB2 L7Protocol = 64
L7_PROTOCOL_TDSQL L7Protocol = 65
L7_PROTOCOL_OCEANBASE L7Protocol = 66
L7_PROTOCOL_GOLDENDB L7Protocol = 67
L7_PROTOCOL_KINGBASE L7Protocol = 68
L7_PROTOCOL_REDIS L7Protocol = 80
L7_PROTOCOL_MONGODB L7Protocol = 81
L7_PROTOCOL_MEMCACHED L7Protocol = 82
Expand Down Expand Up @@ -700,6 +705,36 @@ func (p L7Protocol) String(isTLS bool) string {
} else {
return "Dameng"
}
case L7_PROTOCOL_DB2:
if isTLS {
return "DB2_TLS"
} else {
return "DB2"
}
case L7_PROTOCOL_TDSQL:
if isTLS {
return "TDSQL_TLS"
} else {
return "TDSQL"
}
case L7_PROTOCOL_OCEANBASE:
if isTLS {
return "OceanBase_TLS"
} else {
return "OceanBase"
}
case L7_PROTOCOL_GOLDENDB:
if isTLS {
return "GoldenDB_TLS"
} else {
return "GoldenDB"
}
case L7_PROTOCOL_KINGBASE:
if isTLS {
return "Kingbase_TLS"
} else {
return "Kingbase"
}
case L7_PROTOCOL_ISO8583:
if isTLS {
return "ISO-8583_TLS"
Expand Down Expand Up @@ -823,6 +858,11 @@ var L7ProtocolStringMap = map[string]L7Protocol{
strings.ToLower(L7_PROTOCOL_POSTGRE.String(false)): L7_PROTOCOL_POSTGRE,
strings.ToLower(L7_PROTOCOL_ORACLE.String(false)): L7_PROTOCOL_ORACLE,
strings.ToLower(L7_PROTOCOL_DAMENG.String(false)): L7_PROTOCOL_DAMENG,
strings.ToLower(L7_PROTOCOL_DB2.String(false)): L7_PROTOCOL_DB2,
strings.ToLower(L7_PROTOCOL_TDSQL.String(false)): L7_PROTOCOL_TDSQL,
strings.ToLower(L7_PROTOCOL_OCEANBASE.String(false)): L7_PROTOCOL_OCEANBASE,
strings.ToLower(L7_PROTOCOL_GOLDENDB.String(false)): L7_PROTOCOL_GOLDENDB,
strings.ToLower(L7_PROTOCOL_KINGBASE.String(false)): L7_PROTOCOL_KINGBASE,
strings.ToLower(L7_PROTOCOL_ISO8583.String(false)): L7_PROTOCOL_ISO8583,
strings.ToLower(L7_PROTOCOL_NET_SIGN.String(false)): L7_PROTOCOL_NET_SIGN,
strings.ToLower(L7_PROTOCOL_TRIPLE.String(false)): L7_PROTOCOL_TRIPLE,
Expand Down
22 changes: 22 additions & 0 deletions server/libs/datatype/tag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,25 @@ func TestTagEncodeAndDecode(t *testing.T) {
t.Errorf("编解码函数实现错误")
}
}

func TestCommercialDatabaseL7ProtocolMappings(t *testing.T) {
expected := map[string]L7Protocol{
"mysql": L7_PROTOCOL_MYSQL,
"postgresql": L7_PROTOCOL_POSTGRE,
"oracle": L7_PROTOCOL_ORACLE,
"dameng": L7_PROTOCOL_DAMENG,
"db2": L7_PROTOCOL_DB2,
"tdsql": L7_PROTOCOL_TDSQL,
"oceanbase": L7_PROTOCOL_OCEANBASE,
"goldendb": L7_PROTOCOL_GOLDENDB,
"kingbase": L7_PROTOCOL_KINGBASE,
}
for name, protocol := range expected {
if got := L7ProtocolStringMap[name]; got != protocol {
t.Errorf("L7ProtocolStringMap[%q] = %v, want %v", name, got, protocol)
}
if got := protocol.String(false); got == "N/A" {
t.Errorf("L7Protocol(%d).String(false) = %q", protocol, got)
}
}
}
2 changes: 1 addition & 1 deletion server/libs/flow-metrics/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ func GenTagColumns(code Code) []*ckdb.Column {
columns = append(columns, ckdb.NewColumnWithGroupBy("l3_epc_id_1", ckdb.Int32).SetComment("ip4/6_1对应的EPC ID"))
}
if code&L7Protocol != 0 {
columns = append(columns, ckdb.NewColumnWithGroupBy("l7_protocol", ckdb.UInt8).SetComment("应用协议0: unknown, 1: http, 2: dns, 3: mysql, 4: redis, 5: dubbo, 6: kafka"))
columns = append(columns, ckdb.NewColumnWithGroupBy("l7_protocol", ckdb.UInt8).SetComment("应用协议0: unknown, 20: http1, 21: http2, 40: dubbo, 60: mysql, 61: postgresql, 62: oracle, 63: dameng, 64: db2, 65: tdsql, 66: oceanbase, 67: goldendb, 68: kingbase, 80: redis, 100: kafka"))
columns = append(columns, ckdb.NewColumnWithGroupBy("app_service", ckdb.LowCardinalityString))
columns = append(columns, ckdb.NewColumnWithGroupBy("app_instance", ckdb.LowCardinalityString))
columns = append(columns, ckdb.NewColumnWithGroupBy("endpoint", ckdb.String))
Expand Down
5 changes: 4 additions & 1 deletion server/libs/tracetree/spantrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ const SPAN_TRACE_VERSION_0x13 = 0x13 // before 20251108
const SPAN_TRACE_VERSION_0x14 = 0x14 // before 20251211
const SPAN_TRACE_VERSION_0x15 = 0x15 // before 20251227
const SPAN_TRACE_VERSION_0x16 = 0x16
const SPAN_TRACE_VERSION = 0x17
const SPAN_TRACE_VERSION_0x17 = 0x17 // before 20260507
const SPAN_TRACE_VERSION = 0x18

type SpanTrace struct {
QuerierRegion string // not store, easy to use when calculating
Expand Down Expand Up @@ -75,6 +76,7 @@ type SpanTrace struct {
RequestResource string // notice: will be cut to 255 when write
ResponseResult string
BizProtocol string
BizResponseCode string
ResponseException string
RequestId uint64
SyscallTraceIDRequest uint64
Expand Down Expand Up @@ -137,6 +139,7 @@ func (t *SpanTrace) Decode(decoder *codec.SimpleDecoder) error {
t.RequestResource = decoder.ReadString255()
t.ResponseResult = decoder.ReadString255()
t.BizProtocol = decoder.ReadString255()
t.BizResponseCode = decoder.ReadString255()
t.ResponseException = decoder.ReadString255()
t.RequestId = decoder.ReadVarintU64()

Expand Down
82 changes: 74 additions & 8 deletions server/libs/tracetree/tracetree.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ const TRACE_TREE_VERSION_0x19 = 0x19 // before 20260127
const TRACE_TREE_VERSION_0x20 = 0x20 // before 20260129
const TRACE_TREE_VERSION_0x21 = 0x21 // before 20260202
const TRACE_TREE_VERSION_0x22 = 0x22 // before 20260317
const TRACE_TREE_VERSION = 0x23
const TRACE_TREE_VERSION_0x23 = 0x23 // before 20260507
const TRACE_TREE_VERSION = 0x24

func HashSearchIndex(key string) uint64 {
return utils.DJBHash(17, key)
Expand All @@ -56,6 +57,14 @@ type TraceTree struct {
encodedTreeNodes []byte
}

type EndpointStats struct {
BizResponseCode string `json:"biz_response_code"`
ResponseException string `json:"response_exception"`
ResponseCode uint32 `json:"response_code"`
Total uint32 `json:"total"`
ResponseStatus uint8 `json:"response_status"`
}

type SpanInfo struct {
SignalSource uint8
AutoServiceType0 uint8
Expand All @@ -66,6 +75,7 @@ type SpanInfo struct {
AppService1 string
ObservationPoint string
Endpoints []string
EndpointStat []EndpointStats

IsIPv4 bool
IP40 uint32
Expand All @@ -82,6 +92,8 @@ type NodeInfo struct {
ObservationPoint string
Endpoints0 []string
Endpoints1 []string
EndpointStat0 []EndpointStats
EndpointStat1 []EndpointStats

IsIPv4 bool
IP4 uint32
Expand All @@ -102,6 +114,7 @@ type TreeNode struct {
Topic string
QuerierRegion string

BizResponseCode string
ResponseException string
ResponseDurationSum uint64
ResponseCode uint32
Expand Down Expand Up @@ -176,6 +189,14 @@ func (t *TraceTree) Encode() {
for _, e := range s.Endpoints {
encoder.WriteString255(e)
}
encoder.WriteU16(uint16(len(s.EndpointStat)))
for _, e := range s.EndpointStat {
encoder.WriteString255(e.BizResponseCode)
encoder.WriteString255(e.ResponseException)
encoder.WriteVarintU32(e.ResponseCode)
encoder.WriteVarintU32(e.Total)
encoder.WriteU8(e.ResponseStatus)
}

encoder.WriteBool(s.IsIPv4)
if s.IsIPv4 {
Expand Down Expand Up @@ -209,6 +230,22 @@ func (t *TraceTree) Encode() {
for _, e := range nodeInfo.Endpoints1 {
encoder.WriteString255(e)
}
encoder.WriteU16(uint16(len(nodeInfo.EndpointStat0)))
for _, e := range nodeInfo.EndpointStat0 {
encoder.WriteString255(e.BizResponseCode)
encoder.WriteString255(e.ResponseException)
encoder.WriteVarintU32(e.ResponseCode)
encoder.WriteVarintU32(e.Total)
encoder.WriteU8(e.ResponseStatus)
}
encoder.WriteU16(uint16(len(nodeInfo.EndpointStat1)))
for _, e := range nodeInfo.EndpointStat1 {
encoder.WriteString255(e.BizResponseCode)
encoder.WriteString255(e.ResponseException)
encoder.WriteVarintU32(e.ResponseCode)
encoder.WriteVarintU32(e.Total)
encoder.WriteU8(e.ResponseStatus)
}

encoder.WriteBool(nodeInfo.IsIPv4)
if nodeInfo.IsIPv4 {
Expand All @@ -223,6 +260,7 @@ func (t *TraceTree) Encode() {
encoder.WriteU8(node.PseudoLink)
encoder.WriteString255(node.Topic)
encoder.WriteString255(node.QuerierRegion)
encoder.WriteString255(node.BizResponseCode)
encoder.WriteString255(node.ResponseException)
encoder.WriteVarintU64(node.ResponseDurationSum)
encoder.WriteVarintU32(node.ResponseCode)
Expand All @@ -237,7 +275,7 @@ func (t *TraceTree) Encode() {

func (t *TraceTree) Decode(decoder *codec.SimpleDecoder) error {
version := decoder.ReadU8()
if version != TRACE_TREE_VERSION && version != TRACE_TREE_VERSION_0X12 && version != TRACE_TREE_VERSION_0X13 {
if version != TRACE_TREE_VERSION {
return fmt.Errorf("trace tree data version is %d expect version is %d", version, TRACE_TREE_VERSION)
}
t.UID = decoder.ReadU64()
Expand Down Expand Up @@ -268,6 +306,16 @@ func (t *TraceTree) Decode(decoder *codec.SimpleDecoder) error {
for k := 0; k < endpointCount; k++ {
s.Endpoints[k] = decoder.ReadString255()
}
endpointStatCount := int(decoder.ReadU16())
s.EndpointStat = make([]EndpointStats, endpointStatCount)
for k := 0; k < endpointStatCount; k++ {
e := &s.EndpointStat[k]
e.BizResponseCode = decoder.ReadString255()
e.ResponseException = decoder.ReadString255()
e.ResponseCode = decoder.ReadVarintU32()
e.Total = decoder.ReadVarintU32()
e.ResponseStatus = decoder.ReadU8()
}

s.IsIPv4 = decoder.ReadBool()
if s.IsIPv4 {
Expand Down Expand Up @@ -298,6 +346,27 @@ func (t *TraceTree) Decode(decoder *codec.SimpleDecoder) error {
for j := 0; j < endpointCount; j++ {
nodeInfo.Endpoints1[j] = decoder.ReadString255()
}
endpointStatCount := int(decoder.ReadU16())
nodeInfo.EndpointStat0 = make([]EndpointStats, endpointStatCount)
for j := 0; j < endpointStatCount; j++ {
e := &nodeInfo.EndpointStat0[j]
e.BizResponseCode = decoder.ReadString255()
e.ResponseException = decoder.ReadString255()
e.ResponseCode = decoder.ReadVarintU32()
e.Total = decoder.ReadVarintU32()
e.ResponseStatus = decoder.ReadU8()
}
endpointStatCount = int(decoder.ReadU16())
nodeInfo.EndpointStat1 = make([]EndpointStats, endpointStatCount)
for j := 0; j < endpointStatCount; j++ {
e := &nodeInfo.EndpointStat1[j]
e.BizResponseCode = decoder.ReadString255()
e.ResponseException = decoder.ReadString255()
e.ResponseCode = decoder.ReadVarintU32()
e.Total = decoder.ReadVarintU32()
e.ResponseStatus = decoder.ReadU8()
}

nodeInfo.IsIPv4 = decoder.ReadBool()
if nodeInfo.IsIPv4 {
nodeInfo.IP4 = decoder.ReadU32()
Expand All @@ -307,15 +376,12 @@ func (t *TraceTree) Decode(decoder *codec.SimpleDecoder) error {
}
n.ChildIndices = n.ChildIndices[:0]
n.Level = 0
if version == TRACE_TREE_VERSION_0X12 {
n.PseudoLink = 0
} else {
n.PseudoLink = decoder.ReadU8()
}
n.PseudoLink = decoder.ReadU8()
n.UID = ""
n.Topic = decoder.ReadString255()
n.QuerierRegion = decoder.ReadString255()
if version >= TRACE_TREE_VERSION {
n.QuerierRegion = decoder.ReadString255()
n.BizResponseCode = decoder.ReadString255()
}
n.ResponseException = decoder.ReadString255()
n.ResponseDurationSum = decoder.ReadVarintU64()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
61 , PostgreSQL ,
62 , Oracle ,
63 , Dameng ,
64 , DB2 ,
65 , TDSQL ,
66 , OceanBase ,
67 , GoldenDB ,
68 , Kingbase ,
80 , Redis ,
81 , MongoDB ,
82 , Memcached ,
Expand Down
39 changes: 39 additions & 0 deletions server/querier/db_descriptions/clickhouse/tag/enum/l7_protocol.ch
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Value , DisplayName , Description
0 , N/A ,
20 , HTTP ,
21 , HTTP2 ,
40 , Dubbo ,
41 , gRPC ,
43 , SofaRPC ,
44 , FastCGI ,
45 , bRPC ,
46 , Tars ,
47 , Some/IP ,
48 , ISO-8583 ,
49 , Triple ,
50 , NetSign ,
60 , MySQL ,
61 , PostgreSQL ,
62 , Oracle ,
63 , 达梦 ,
64 , DB2 ,
65 , TDSQL ,
66 , OceanBase ,
67 , GoldenDB ,
68 , 人大金仓 ,
80 , Redis ,
81 , MongoDB ,
82 , Memcached ,
100 , Kafka ,
101 , MQTT ,
102 , AMQP , RabbitMQ
103 , OpenWire , ActiveMQ
104 , NATS ,
105 , Pulsar ,
106 , ZMTP , ZeroMQ
107 , RocketMQ ,
108 , WebSphereMQ ,
120 , DNS ,
121 , TLS ,
122 , Ping ,
127 , Custom ,
39 changes: 39 additions & 0 deletions server/querier/db_descriptions/clickhouse/tag/enum/l7_protocol.en
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Value , DisplayName , Description
0 , N/A ,
20 , HTTP ,
21 , HTTP2 ,
40 , Dubbo ,
41 , gRPC ,
43 , SofaRPC ,
44 , FastCGI ,
45 , bRPC ,
46 , Tars ,
47 , Some/IP ,
48 , ISO-8583 ,
49 , Triple ,
50 , NetSign ,
60 , MySQL ,
61 , PostgreSQL ,
62 , Oracle ,
63 , Dameng ,
64 , DB2 ,
65 , TDSQL ,
66 , OceanBase ,
67 , GoldenDB ,
68 , Kingbase ,
80 , Redis ,
81 , MongoDB ,
82 , Memcached ,
100 , Kafka ,
101 , MQTT ,
102 , AMQP , RabbitMQ
103 , OpenWire , ActiveMQ
104 , NATS ,
105 , Pulsar ,
106 , ZMTP , ZeroMQ
107 , RocketMQ ,
108 , WebSphereMQ ,
120 , DNS ,
121 , TLS ,
122 , Ping ,
127 , Custom ,
Loading
Loading