diff --git a/agent/src/flow_generator/protocol_logs/http.rs b/agent/src/flow_generator/protocol_logs/http.rs index af0ae074370..06389b648a6 100755 --- a/agent/src/flow_generator/protocol_logs/http.rs +++ b/agent/src/flow_generator/protocol_logs/http.rs @@ -482,8 +482,10 @@ impl HttpInfo { self.status_code = Some(code as u16); } - if custom.resp.status != self.status { - self.status = custom.resp.status; + if let Some(status) = custom.resp.status { + if status != self.status { + self.status = status; + } } if !custom.resp.result.is_empty() { diff --git a/agent/src/flow_generator/protocol_logs/mq/web_sphere_mq.rs b/agent/src/flow_generator/protocol_logs/mq/web_sphere_mq.rs index 13a960b883f..8a8685538fe 100644 --- a/agent/src/flow_generator/protocol_logs/mq/web_sphere_mq.rs +++ b/agent/src/flow_generator/protocol_logs/mq/web_sphere_mq.rs @@ -213,9 +213,9 @@ impl WebSphereMqInfo { self.response_code = code; } - if custom.resp.status != L7ResponseStatus::default() { + if let Some(status) = custom.resp.status { self.msg_type = LogMessageType::Response; - self.status = custom.resp.status; + self.status = status; } if !custom.resp.result.is_empty() { diff --git a/agent/src/flow_generator/protocol_logs/plugin/custom_protocol_policy.rs b/agent/src/flow_generator/protocol_logs/plugin/custom_protocol_policy.rs index 895bd941415..df33807aeda 100644 --- a/agent/src/flow_generator/protocol_logs/plugin/custom_protocol_policy.rs +++ b/agent/src/flow_generator/protocol_logs/plugin/custom_protocol_policy.rs @@ -184,7 +184,11 @@ impl From<(&CustomPolicyInfo, PacketDirection)> for CustomInfo { endpoint: info.endpoint.clone(), }, resp: CustomInfoResp { - status: L7ResponseStatus::from(info.response_status.as_str()), + status: if info.response_status.is_empty() { + None + } else { + Some(L7ResponseStatus::from(info.response_status.as_str())) + }, code: info.response_code.clone(), exception: info.response_exception.clone(), result: info.response_result.clone(), diff --git a/agent/src/flow_generator/protocol_logs/rpc/dubbo.rs b/agent/src/flow_generator/protocol_logs/rpc/dubbo.rs index 771fc07042e..0774cdc9ed9 100644 --- a/agent/src/flow_generator/protocol_logs/rpc/dubbo.rs +++ b/agent/src/flow_generator/protocol_logs/rpc/dubbo.rs @@ -291,8 +291,10 @@ impl DubboInfo { self.status_code = Some(code); } - if custom.resp.status != self.resp_status { - self.resp_status = custom.resp.status; + if let Some(status) = custom.resp.status { + if status != self.resp_status { + self.resp_status = status; + } } if !custom.resp.result.is_empty() { diff --git a/agent/src/flow_generator/protocol_logs/rpc/sofa_rpc.rs b/agent/src/flow_generator/protocol_logs/rpc/sofa_rpc.rs index e4aa84491a4..76882b79d18 100644 --- a/agent/src/flow_generator/protocol_logs/rpc/sofa_rpc.rs +++ b/agent/src/flow_generator/protocol_logs/rpc/sofa_rpc.rs @@ -15,7 +15,7 @@ */ mod hessian; -use std::borrow::Cow; +use std::{borrow::Cow, mem}; use nom::InputTakeAtPosition; use public::{ @@ -34,11 +34,17 @@ use crate::{ config::handler::{LogParserConfig, TraceType}, flow_generator::{ protocol_logs::{ - pb_adapter::{ExtendedInfo, L7ProtocolSendLog, L7Request, L7Response, TraceInfo}, - set_captured_byte, swap_if, L7ResponseStatus, PrioFields, BASE_FIELD_PRIORITY, + consts::{APM_SPAN_ID_ATTR, SYS_RESPONSE_CODE_ATTR}, + pb_adapter::{ + ExtendedInfo, KeyVal, L7ProtocolSendLog, L7Request, L7Response, MetricKeyVal, + TraceInfo, + }, + set_captured_byte, swap_if, L7ResponseStatus, PrioField, PrioFields, + BASE_FIELD_PRIORITY, PLUGIN_FIELD_PRIORITY, }, AppProtoHead, Error, Result, }, + plugin::{wasm::WasmData, CustomInfo}, }; use self::hessian::{FieldEnum, HessianObjIterator}; @@ -169,6 +175,8 @@ impl TryFrom<&[u8]> for Hdr { #[derive(Debug, Default, Clone, Serialize)] pub struct SofaRpcInfo { is_tls: bool, + is_async: bool, + is_reversed: bool, proto: u8, req_id: u32, @@ -189,6 +197,15 @@ pub struct SofaRpcInfo { resp_code: u16, status: L7ResponseStatus, + custom_result: Option, + custom_exception: Option, + #[serde(skip)] + attributes: Vec, + #[serde(skip)] + metrics: Vec, + biz_type: u8, + biz_code: String, + biz_scenario: String, #[serde(skip)] is_on_blacklist: bool, @@ -219,6 +236,154 @@ impl SofaRpcInfo { } } + fn response_code_to_attribute(&mut self) { + self.attributes.push(KeyVal { + key: SYS_RESPONSE_CODE_ATTR.to_string(), + val: self.resp_code.to_string(), + }); + } + + fn merge(&mut self, other: &mut Self) { + if other.is_tls { + self.is_tls = other.is_tls; + } + if other.is_async { + self.is_async = other.is_async; + } + if other.is_reversed { + self.is_reversed = other.is_reversed; + } + if other.proto > 0 { + self.proto = other.proto; + } + if other.req_id > 0 { + self.req_id = other.req_id; + } + if other.cmd_code > 0 { + self.cmd_code = other.cmd_code; + } + if other.req_len > 0 { + self.req_len = other.req_len; + } + if other.resp_len > 0 { + self.resp_len = other.resp_len; + } + if other.resp_code > 0 { + self.resp_code = other.resp_code; + } + if other.status != L7ResponseStatus::default() { + self.status = other.status; + } + swap_if!(self, target_serv, is_empty, other); + swap_if!(self, method, is_empty, other); + swap_if!(self, span_id, is_empty, other); + swap_if!(self, parent_span_id, is_empty, other); + swap_if!(self, custom_result, is_none, other); + swap_if!(self, custom_exception, is_none, other); + swap_if!(self, endpoint, is_none, other); + let other_trace_ids = mem::take(&mut other.trace_ids); + self.trace_ids.merge(other_trace_ids); + self.attributes.append(&mut other.attributes); + self.metrics.append(&mut other.metrics); + if other.captured_request_byte > 0 { + self.captured_request_byte = other.captured_request_byte; + } + if other.captured_response_byte > 0 { + self.captured_response_byte = other.captured_response_byte; + } + if other.biz_type > 0 { + self.biz_type = other.biz_type; + } + swap_if!(self, biz_code, is_empty, other); + swap_if!(self, biz_scenario, is_empty, other); + if other.is_on_blacklist { + self.is_on_blacklist = other.is_on_blacklist; + } + } + + pub fn merge_custom_info(&mut self, custom: CustomInfo) { + if !custom.req.domain.is_empty() { + self.target_serv = custom.req.domain; + } + if !custom.req.req_type.is_empty() { + self.method = custom.req.req_type; + } + if !custom.req.endpoint.is_empty() { + self.endpoint = Some(custom.req.endpoint); + } + + if let Some(code) = custom.resp.code { + self.response_code_to_attribute(); + self.resp_code = code as u16; + } + if let Some(status) = custom.resp.status { + if status != self.status { + self.status = status; + } + } + if !custom.resp.result.is_empty() { + self.custom_result = Some(custom.resp.result); + } + if !custom.resp.exception.is_empty() { + self.custom_exception = Some(custom.resp.exception); + } + if !custom.resp.req_type.is_empty() { + self.method = custom.resp.req_type; + } + if !custom.resp.endpoint.is_empty() { + self.endpoint = Some(custom.resp.endpoint); + } + + self.trace_ids + .merge_same_priority(PLUGIN_FIELD_PRIORITY, custom.trace.trace_ids); + + if let Some(span_id) = custom.trace.span_id { + if !span_id.is_empty() { + let prev = mem::replace( + &mut self.span_id, + PrioField::new(PLUGIN_FIELD_PRIORITY, span_id).into_inner(), + ); + if !prev.is_empty() { + self.attributes.push(KeyVal { + key: APM_SPAN_ID_ATTR.to_string(), + val: prev, + }); + } + } + } + if let Some(parent_span_id) = custom.trace.parent_span_id { + if !parent_span_id.is_empty() { + self.parent_span_id = parent_span_id; + } + } + + if !custom.attributes.is_empty() { + self.attributes.extend(custom.attributes); + } + if !custom.metrics.is_empty() { + self.metrics.extend(custom.metrics); + } + if let Some(is_async) = custom.is_async { + self.is_async = is_async; + } + if let Some(is_reversed) = custom.is_reversed { + self.is_reversed = is_reversed; + } + if custom.biz_type > 0 { + self.biz_type = custom.biz_type; + } + if let Some(biz_code) = custom.biz_code { + if !biz_code.is_empty() { + self.biz_code = biz_code; + } + } + if let Some(biz_scenario) = custom.biz_scenario { + if !biz_scenario.is_empty() { + self.biz_scenario = biz_scenario; + } + } + } + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::SofaRPC) { self.is_on_blacklist = t.request_resource.is_on_blacklist(&self.target_serv) @@ -239,14 +404,7 @@ impl L7ProtocolInfoInterface for SofaRpcInfo { fn merge_log(&mut self, other: &mut L7ProtocolInfo) -> Result<()> { if let L7ProtocolInfo::SofaRpcInfo(s) = other { - self.resp_len = s.resp_len; - self.resp_code = s.resp_code; - self.status = s.status; - self.captured_response_byte = s.captured_response_byte; - swap_if!(self, endpoint, is_none, s); - if s.is_on_blacklist { - self.is_on_blacklist = s.is_on_blacklist; - } + self.merge(s); } Ok(()) } @@ -263,10 +421,18 @@ impl L7ProtocolInfoInterface for SofaRpcInfo { self.is_tls } + fn is_reversed(&self) -> bool { + self.is_reversed + } + fn get_endpoint(&self) -> Option { self.endpoint.clone() } + fn get_biz_type(&self) -> u8 { + self.biz_type + } + fn is_on_blacklist(&self) -> bool { self.is_on_blacklist } @@ -274,11 +440,16 @@ impl L7ProtocolInfoInterface for SofaRpcInfo { impl From for L7ProtocolSendLog { fn from(s: SofaRpcInfo) -> Self { - let flags = if s.is_tls { - ApplicationFlags::TLS.bits() - } else { - ApplicationFlags::NONE.bits() - }; + let mut flags = ApplicationFlags::default(); + if s.is_tls { + flags = flags | ApplicationFlags::TLS; + } + if s.is_async { + flags = flags | ApplicationFlags::ASYNC; + } + if s.is_reversed { + flags = flags | ApplicationFlags::REVERSED; + } Self { captured_request_byte: s.captured_request_byte, captured_response_byte: s.captured_response_byte, @@ -286,6 +457,7 @@ impl From for L7ProtocolSendLog { resp_len: Some(s.resp_len), req: L7Request { req_type: s.method.clone(), + domain: s.target_serv.clone(), resource: s.target_serv.clone(), endpoint: s.endpoint.unwrap_or_default(), ..Default::default() @@ -293,6 +465,12 @@ impl From for L7ProtocolSendLog { resp: L7Response { status: s.status, code: Some(s.resp_code as i32), + exception: if s.status != L7ResponseStatus::Ok { + s.custom_exception.unwrap_or_default() + } else { + Default::default() + }, + result: s.custom_result.unwrap_or_default(), ..Default::default() }, trace_info: Some(TraceInfo { @@ -304,9 +482,13 @@ impl From for L7ProtocolSendLog { ext_info: Some(ExtendedInfo { rpc_service: Some(s.target_serv), request_id: Some(s.req_id), + attributes: Some(s.attributes), + metrics: Some(s.metrics), ..Default::default() }), - flags, + flags: flags.bits(), + biz_code: s.biz_code, + biz_scenario: s.biz_scenario, ..Default::default() } } @@ -353,6 +535,7 @@ impl L7ProtocolParserInterface for SofaRpcLog { info.endpoint = info.generate_endpoint(); info.is_tls = param.is_tls(); set_captured_byte!(info, param); + self.wasm_hook(param, payload, &mut info); if let Some(config) = param.parse_config { info.set_is_on_blacklist(config); } @@ -360,7 +543,8 @@ impl L7ProtocolParserInterface for SofaRpcLog { if param.parse_perf { let mut perf_stat = L7PerfStats::default(); if info.msg_type == LogMessageType::Response && info.endpoint.is_none() { - if let Some(endpoint) = info.load_endpoint_from_cache(param, false) { + if let Some(endpoint) = info.load_endpoint_from_cache(param, info.is_reversed) + { info.endpoint = Some(endpoint.to_string()); } } @@ -534,6 +718,17 @@ impl SofaRpcLog { Ok(true) } } + + fn wasm_hook(&mut self, param: &ParseParam, payload: &[u8], info: &mut SofaRpcInfo) { + let mut vm_ref = param.wasm_vm.borrow_mut(); + let Some(vm) = vm_ref.as_mut() else { + return; + }; + let wasm_data = WasmData::new(L7Protocol::SofaRPC); + if let Some(custom) = vm.on_custom_message(payload, param, wasm_data) { + info.merge_custom_info(custom); + } + } } #[derive(Debug)] @@ -679,11 +874,13 @@ mod test { }, flow_generator::{ protocol_logs::{ + pb_adapter::{KeyVal, L7ProtocolSendLog, MetricKeyVal}, rpc::sofa_rpc::{CMD_CODE_REQ, CMD_CODE_RESP, PROTO_BOLT_V1}, L7ResponseStatus, }, L7_RRT_CACHE_CAPACITY, }, + plugin::{CustomInfo, CustomInfoRequest, CustomInfoResp, CustomInfoTrace}, utils::test::Capture, }; @@ -914,4 +1111,156 @@ mod test { } ); } + + #[test] + fn test_sofarpc_merge_custom_info() { + let mut info = super::SofaRpcInfo { + target_serv: "svc-old".into(), + method: "method-old".into(), + req_id: 7, + req_len: 10, + resp_len: 20, + resp_code: 0, + status: L7ResponseStatus::Ok, + endpoint: Some("svc-old/method-old".into()), + ..Default::default() + }; + info.trace_ids + .merge_field(super::BASE_FIELD_PRIORITY, "trace-old".into()); + info.span_id = "span-old".into(); + + info.merge_custom_info(CustomInfo { + req_len: Some(999), + resp_len: Some(1999), + request_id: Some(666), + req: CustomInfoRequest { + req_type: "type-new".into(), + domain: "svc-new".into(), + endpoint: "svc-new/type-new".into(), + ..Default::default() + }, + resp: CustomInfoResp { + status: Some(L7ResponseStatus::ServerError), + code: Some(599), + result: "rewrite result".into(), + exception: "rewrite exception".into(), + ..Default::default() + }, + trace: CustomInfoTrace { + trace_ids: vec!["trace-new".into()], + span_id: Some("span-new".into()), + parent_span_id: Some("parent-new".into()), + ..Default::default() + }, + attributes: vec![ + KeyVal { + key: "k1".into(), + val: "v1".into(), + }, + KeyVal { + key: "k2".into(), + val: "v2".into(), + }, + ], + metrics: vec![MetricKeyVal { + key: "m1".into(), + val: 1.5, + }], + biz_type: 9, + biz_code: Some("biz-code".into()), + biz_scenario: Some("biz-scenario".into()), + is_async: Some(true), + is_reversed: Some(true), + ..Default::default() + }); + + let log: L7ProtocolSendLog = info.into(); + assert_eq!(log.req.req_type, "type-new"); + assert_eq!(log.req.domain, "svc-new"); + assert_eq!(log.req.resource, "svc-new"); + assert_eq!(log.req.endpoint, "svc-new/type-new"); + assert_eq!(log.resp.code, Some(599)); + assert_eq!(log.resp.status, L7ResponseStatus::ServerError); + assert_eq!(log.resp.result, "rewrite result"); + assert_eq!(log.resp.exception, "rewrite exception"); + assert_eq!( + log.trace_info.as_ref().unwrap().trace_ids, + vec!["trace-new".to_string(), "trace-old".to_string()] + ); + assert_eq!( + log.trace_info.as_ref().unwrap().span_id.as_deref(), + Some("span-new") + ); + assert_eq!( + log.trace_info.as_ref().unwrap().parent_span_id.as_deref(), + Some("parent-new") + ); + let ext = log.ext_info.unwrap(); + assert_eq!(ext.request_id, Some(7)); + assert_eq!(ext.rpc_service.as_deref(), Some("svc-new")); + let attrs = ext.attributes.unwrap(); + assert!(attrs.iter().any(|kv| kv.key == "sys_response_code" && kv.val == "0")); + assert!(attrs.iter().any(|kv| kv.key == "apm_span_id" && kv.val == "span-old")); + assert!(attrs.iter().any(|kv| kv.key == "k1" && kv.val == "v1")); + assert_eq!(ext.metrics.unwrap()[0].key, "m1"); + assert_eq!(log.flags & crate::common::meta_packet::ApplicationFlags::ASYNC.bits(), 2); + assert_ne!( + log.flags & crate::common::meta_packet::ApplicationFlags::REVERSED.bits(), + 0 + ); + assert_eq!(log.biz_code, "biz-code"); + assert_eq!(log.biz_scenario, "biz-scenario"); + } + + #[test] + fn test_sofarpc_merge_custom_info_ignore_empty_values() { + let mut info = super::SofaRpcInfo { + target_serv: "svc-old".into(), + method: "method-old".into(), + span_id: "span-old".into(), + parent_span_id: "parent-old".into(), + resp_code: 321, + status: L7ResponseStatus::ServerError, + biz_code: "biz-old".into(), + biz_scenario: "scenario-old".into(), + endpoint: Some("svc-old/method-old".into()), + ..Default::default() + }; + + info.merge_custom_info(CustomInfo { + req: CustomInfoRequest { + req_type: String::new(), + domain: String::new(), + endpoint: String::new(), + ..Default::default() + }, + resp: CustomInfoResp { + status: None, + code: None, + req_type: String::new(), + endpoint: String::new(), + result: String::new(), + exception: String::new(), + ..Default::default() + }, + trace: CustomInfoTrace { + span_id: Some(String::new()), + parent_span_id: Some(String::new()), + ..Default::default() + }, + biz_code: Some(String::new()), + biz_scenario: Some(String::new()), + ..Default::default() + }); + + assert_eq!(info.target_serv, "svc-old"); + assert_eq!(info.method, "method-old"); + assert_eq!(info.endpoint.as_deref(), Some("svc-old/method-old")); + assert_eq!(info.resp_code, 321); + assert_eq!(info.status, L7ResponseStatus::ServerError); + assert_eq!(info.span_id, "span-old"); + assert_eq!(info.parent_span_id, "parent-old"); + assert_eq!(info.biz_code, "biz-old"); + assert_eq!(info.biz_scenario, "scenario-old"); + } } diff --git a/agent/src/plugin/c_ffi.rs b/agent/src/plugin/c_ffi.rs index 9540a7462f7..644c31c9144 100644 --- a/agent/src/plugin/c_ffi.rs +++ b/agent/src/plugin/c_ffi.rs @@ -274,8 +274,10 @@ impl TryFrom for CustomInfo { ( CustomInfoRequest::default(), CustomInfoResp { - status: L7ResponseStatus::try_from(resp.status) - .map_err(|e| e.to_string())?, + status: Some( + L7ResponseStatus::try_from(resp.status) + .map_err(|e| e.to_string())?, + ), code: Some(resp.code), exception: c_str_to_string(&resp.exception).unwrap_or_default(), result: c_str_to_string(&resp.result).unwrap_or_default(), diff --git a/agent/src/plugin/mod.rs b/agent/src/plugin/mod.rs index e2e86d2b131..c87678ee84e 100644 --- a/agent/src/plugin/mod.rs +++ b/agent/src/plugin/mod.rs @@ -60,7 +60,7 @@ pub struct CustomInfoRequest { #[derive(Debug, Default, Serialize, Clone)] pub struct CustomInfoResp { - pub status: L7ResponseStatus, + pub status: Option, pub code: Option, pub exception: String, pub result: String, @@ -269,11 +269,11 @@ impl CustomInfo { // parse resp let status = buf[off]; match status { - 0 => info.resp.status = L7ResponseStatus::Ok, - 2 => info.resp.status = L7ResponseStatus::Timeout, - 3 => info.resp.status = L7ResponseStatus::ServerError, - 4 => info.resp.status = L7ResponseStatus::ClientError, - 5 => info.resp.status = L7ResponseStatus::Unknown, + 0 => info.resp.status = Some(L7ResponseStatus::Ok), + 2 => info.resp.status = Some(L7ResponseStatus::Timeout), + 3 => info.resp.status = Some(L7ResponseStatus::ServerError), + 4 => info.resp.status = Some(L7ResponseStatus::ClientError), + 5 => info.resp.status = Some(L7ResponseStatus::Unknown), _ => { return Err(Error::WasmSerializeFail( "recv unexpected status ".to_string(), @@ -473,16 +473,16 @@ impl CustomInfo { Some(pb::app_info::Info::Resp(r)) => { info.resp = CustomInfoResp { status: match r.status.and_then(|s| pb::AppRespStatus::try_from(s).ok()) { - Some(pb::AppRespStatus::RespOk) => L7ResponseStatus::Ok, - Some(pb::AppRespStatus::RespTimeout) => L7ResponseStatus::Timeout, - Some(pb::AppRespStatus::RespServerError) => L7ResponseStatus::ServerError, - Some(pb::AppRespStatus::RespClientError) => L7ResponseStatus::ClientError, - Some(pb::AppRespStatus::RespUnknown) => L7ResponseStatus::Unknown, - _ => { - return Err(Error::WasmSerializeFail( - "unexpected resp status".to_string(), - )) + Some(pb::AppRespStatus::RespOk) => Some(L7ResponseStatus::Ok), + Some(pb::AppRespStatus::RespTimeout) => Some(L7ResponseStatus::Timeout), + Some(pb::AppRespStatus::RespServerError) => { + Some(L7ResponseStatus::ServerError) + } + Some(pb::AppRespStatus::RespClientError) => { + Some(L7ResponseStatus::ClientError) } + Some(pb::AppRespStatus::RespUnknown) => Some(L7ResponseStatus::Unknown), + None => None, }, code: r.code, result: r.result.unwrap_or_default(), @@ -580,7 +580,7 @@ impl L7ProtocolInfoInterface for CustomInfo { self.captured_request_byte += w.captured_request_byte; // resp merge - if self.resp.status == L7ResponseStatus::default() { + if self.resp.status.is_none() { self.resp.status = w.resp.status; } @@ -686,7 +686,7 @@ impl From for L7ProtocolSendLog { }, }, resp: L7Response { - status: w.resp.status, + status: w.resp.status.unwrap_or_default(), code: w.resp.code, exception: w.resp.exception, result: w.resp.result, @@ -728,7 +728,7 @@ impl From<&CustomInfo> for LogCache { fn from(info: &CustomInfo) -> Self { LogCache { msg_type: info.msg_type, - resp_status: info.resp.status, + resp_status: info.resp.status.unwrap_or_default(), on_blacklist: info.is_on_blacklist, endpoint: info.get_endpoint(), ..Default::default() diff --git a/agent/src/plugin/shared_obj/test.rs b/agent/src/plugin/shared_obj/test.rs index 300428112c2..fe7a57eb69c 100644 --- a/agent/src/plugin/shared_obj/test.rs +++ b/agent/src/plugin/shared_obj/test.rs @@ -208,7 +208,7 @@ fn test_parse() { assert!(info.resp.exception.is_empty()); assert_eq!(info.resp.code.unwrap(), 0); assert_eq!(info.resp.result.as_str(), "110.242.68.66"); - assert_eq!(info.resp.status, L7ResponseStatus::Ok); + assert_eq!(info.resp.status, Some(L7ResponseStatus::Ok)); assert_eq!( info.trace.trace_ids.first().unwrap().as_str(), diff --git a/agent/src/plugin/wasm/test.rs b/agent/src/plugin/wasm/test.rs index 280696ed586..fed60579b24 100644 --- a/agent/src/plugin/wasm/test.rs +++ b/agent/src/plugin/wasm/test.rs @@ -370,7 +370,7 @@ fn test_wasm_parse_payload_resp() { assert_eq!(ci.req_len.unwrap(), 999); assert_eq!(ci.resp_len.unwrap(), 9999); assert_eq!(ci.request_id.unwrap(), 666); - assert_eq!(ci.resp.status, L7ResponseStatus::Ok); + assert_eq!(ci.resp.status, Some(L7ResponseStatus::Ok)); assert_eq!(ci.resp.code.unwrap(), 999); assert_eq!(ci.resp.result, "result"); assert_eq!(ci.resp.exception, "exception"); @@ -399,7 +399,7 @@ fn test_wasm_parse_payload_resp() { assert_eq!(ci.req_len.unwrap(), 999); assert_eq!(ci.resp_len.unwrap(), 9999); assert_eq!(ci.request_id.unwrap(), 666); - assert_eq!(ci.resp.status, L7ResponseStatus::Ok); + assert_eq!(ci.resp.status, Some(L7ResponseStatus::Ok)); assert_eq!(ci.resp.code.unwrap(), 999); assert_eq!(ci.resp.result, "result"); assert_eq!(ci.resp.exception, "exception");