diff --git a/agent/resources/test/flow_generator/http/openai_normal_usage.pcap b/agent/resources/test/flow_generator/http/openai_normal_usage.pcap new file mode 100644 index 00000000000..df21d493f9b Binary files /dev/null and b/agent/resources/test/flow_generator/http/openai_normal_usage.pcap differ diff --git a/agent/resources/test/flow_generator/http/openai_stream.pcap b/agent/resources/test/flow_generator/http/openai_stream.pcap new file mode 100644 index 00000000000..2e0f460b95e Binary files /dev/null and b/agent/resources/test/flow_generator/http/openai_stream.pcap differ diff --git a/agent/resources/test/flow_generator/http/openai_stream_usage.pcap b/agent/resources/test/flow_generator/http/openai_stream_usage.pcap new file mode 100644 index 00000000000..37e3dee487d Binary files /dev/null and b/agent/resources/test/flow_generator/http/openai_stream_usage.pcap differ diff --git a/agent/resources/test/flow_generator/http/openai_stream_v537.pcap b/agent/resources/test/flow_generator/http/openai_stream_v537.pcap new file mode 100644 index 00000000000..552544e0a22 Binary files /dev/null and b/agent/resources/test/flow_generator/http/openai_stream_v537.pcap differ diff --git a/agent/src/config/config.rs b/agent/src/config/config.rs index 9a1b1c25aaa..76ca91aa38d 100644 --- a/agent/src/config/config.rs +++ b/agent/src/config/config.rs @@ -2028,6 +2028,92 @@ impl Default for InferenceWhitelist { } } +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct OpenAIBizDimExtractor { + pub headers: Vec, + pub json_paths: Vec, +} + +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct OpenAIBizDimExtractors { + pub org_path: OpenAIBizDimExtractor, + pub user_id: OpenAIBizDimExtractor, + pub app_id: OpenAIBizDimExtractor, +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct OpenAIUsageFieldPaths { + /// JSON paths (dot-notation) to read the input token count, tried in order. + pub input_tokens: Vec, + /// JSON paths (dot-notation) to read the output token count, tried in order. + pub output_tokens: Vec, + /// JSON paths (dot-notation) to read the total token count, tried in order. + pub total_tokens: Vec, + /// JSON paths (dot-notation) to read the cached token count, tried in order. + pub cached_tokens: Vec, +} + +impl Default for OpenAIUsageFieldPaths { + fn default() -> Self { + Self { + input_tokens: vec!["usage.prompt_tokens".to_string()], + output_tokens: vec!["usage.completion_tokens".to_string()], + total_tokens: vec!["usage.total_tokens".to_string()], + cached_tokens: vec![ + "usage.prompt_tokens_details.cached_tokens".to_string(), + "usage.cache_read_input_tokens".to_string(), + ], + } + } +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct OpenAIApiConfig { + pub enabled: bool, + pub path_prefixes: Vec, + pub path_suffixes: Vec, + pub request_body_max_bytes: usize, + pub response_event_max_bytes: usize, + pub sse_buffer_max_bytes: usize, + pub usage_field_paths: OpenAIUsageFieldPaths, + pub biz_dimension_extractors: OpenAIBizDimExtractors, +} + +impl Default for OpenAIApiConfig { + fn default() -> Self { + Self { + enabled: false, + path_prefixes: vec![], + path_suffixes: vec![ + "/v1/chat/completions".to_string(), + "/v1/responses".to_string(), + ], + request_body_max_bytes: 65536, + response_event_max_bytes: 32768, + sse_buffer_max_bytes: 131072, + usage_field_paths: OpenAIUsageFieldPaths::default(), + biz_dimension_extractors: OpenAIBizDimExtractors { + org_path: OpenAIBizDimExtractor { + headers: vec!["x-org-path".to_string()], + json_paths: vec![], + }, + user_id: OpenAIBizDimExtractor { + headers: vec!["x-user-id".to_string()], + json_paths: vec![], + }, + app_id: OpenAIBizDimExtractor { + headers: vec!["appid".to_string()], + json_paths: vec![], + }, + }, + } + } +} + #[derive(Clone, Default, Debug, Deserialize, PartialEq, Eq)] #[serde(default)] pub struct ProtocolSpecialConfig { @@ -2037,6 +2123,7 @@ pub struct ProtocolSpecialConfig { pub net_sign: NetSignConfig, pub mysql: MysqlConfig, pub grpc: GrpcConfig, + pub openai_api: OpenAIApiConfig, } #[derive(Clone, Debug, Deserialize, PartialEq, Eq)] diff --git a/agent/src/config/handler.rs b/agent/src/config/handler.rs index 50bfe7ebeb4..0e8de068278 100644 --- a/agent/src/config/handler.rs +++ b/agent/src/config/handler.rs @@ -55,9 +55,10 @@ use super::config::{Ebpf, EbpfFileIoEvent, ProcessMatcher, SymbolTable}; use super::{ config::{ ApiResources, Config, DpdkSource, ExtraLogFields, ExtraLogFieldsInfo, HttpEndpoint, - HttpEndpointMatchRule, Iso8583ParseConfig, NetSignParseConfig, OracleConfig, PcapStream, - PortConfig, ProcessorsFlowLogTunning, RequestLogTunning, SessionTimeout, TagFilterOperator, - Timeouts, UserConfig, WebSphereMqParseConfig, GRPC_BUFFER_SIZE_MIN, + HttpEndpointMatchRule, Iso8583ParseConfig, NetSignParseConfig, OpenAIApiConfig, + OracleConfig, PcapStream, PortConfig, ProcessorsFlowLogTunning, RequestLogTunning, + SessionTimeout, TagFilterOperator, Timeouts, UserConfig, WebSphereMqParseConfig, + GRPC_BUFFER_SIZE_MIN, }, ConfigError, KubernetesPollerType, TrafficOverflowAction, }; @@ -1205,6 +1206,7 @@ pub struct LogParserConfig { pub unconcerned_dns_nxdomain_trie: DomainNameTrie, pub mysql_decompress_payload: bool, pub mysql_endpoint_disabled: bool, + pub openai_api: OpenAIApiConfig, pub custom_app: CustomAppConfig, } @@ -1225,6 +1227,7 @@ impl Default for LogParserConfig { unconcerned_dns_nxdomain_trie: DomainNameTrie::default(), mysql_decompress_payload: true, mysql_endpoint_disabled: true, + openai_api: OpenAIApiConfig::default(), custom_app: CustomAppConfig::default(), } } @@ -1272,6 +1275,7 @@ impl fmt::Debug for LogParserConfig { ) .field("mysql_decompress_payload", &self.mysql_decompress_payload) .field("mysql_endpoint_disabled", &self.mysql_endpoint_disabled) + .field("openai_api_enabled", &self.openai_api.enabled) .field("custom_app", &self.custom_app) .finish() } @@ -2382,6 +2386,13 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig { .protocol_special_config .mysql .endpoint_disabled, + openai_api: conf + .processors + .request_log + .application_protocol_inference + .protocol_special_config + .openai_api + .clone(), #[cfg(not(feature = "enterprise"))] custom_app: CustomAppConfig::default(), #[cfg(feature = "enterprise")] diff --git a/agent/src/flow_generator/protocol_logs.rs b/agent/src/flow_generator/protocol_logs.rs index 13a6d8efb4b..7f48d454a8b 100644 --- a/agent/src/flow_generator/protocol_logs.rs +++ b/agent/src/flow_generator/protocol_logs.rs @@ -19,6 +19,7 @@ pub(crate) mod dns; pub(crate) mod fastcgi; pub(crate) mod http; pub(crate) mod mq; +pub(crate) mod openai_api; mod parser; pub mod pb_adapter; pub(crate) mod ping; @@ -413,15 +414,30 @@ impl AppProtoLogsBaseInfo { } // go http2 uprobe may merge multi times, if not req and resp merge can not set to session + // Save whether this entry was already a Session before the merge so we can + // decide whether to recompute rrt below. + let was_already_session = self.head.msg_type == LogMessageType::Session; if self.head.msg_type != log.head.msg_type { self.head.msg_type = LogMessageType::Session; } - self.head.rrt = if self.end_time > self.start_time { - (self.end_time.as_micros() - self.start_time.as_micros()) as u64 - } else { - 0 - }; + // Freeze rrt after the first req→resp merge. + // + // On the initial merge (Request + Response → Session), end_time equals the + // first-response packet time, so `end_time - start_time` naturally gives + // first-response latency — the same semantics as non-streaming HTTP. + // + // For multi-merge protocols (SSE streaming, Go HTTP2 uprobe), each + // continuation packet would push end_time forward and make rrt equal to + // the total stream duration instead. Skipping the recomputation once the + // entry is already a Session preserves the first-response latency value. + if !was_already_session { + self.head.rrt = if self.end_time > self.start_time { + (self.end_time.as_micros() - self.start_time.as_micros()) as u64 + } else { + 0 + }; + } if self.biz_type == 0 { self.biz_type = log.biz_type; diff --git a/agent/src/flow_generator/protocol_logs/consts.rs b/agent/src/flow_generator/protocol_logs/consts.rs index ea57563e0a3..b451ede801d 100644 --- a/agent/src/flow_generator/protocol_logs/consts.rs +++ b/agent/src/flow_generator/protocol_logs/consts.rs @@ -33,7 +33,7 @@ pub const HTTP_STATUS_CLIENT_ERROR_MIN: u16 = 400; pub const HTTP_STATUS_CLIENT_ERROR_MAX: u16 = 499; pub const HTTP_STATUS_SERVER_ERROR_MIN: u16 = 500; pub const HTTP_STATUS_SERVER_ERROR_MAX: u16 = 600; -pub const HTTP_RESP_MIN_LEN: usize = 13; // 响应行:"HTTP/1.1 200 " +pub const HTTP_RESP_MIN_LEN: usize = 12; // 响应行:"HTTP/1.1 200"(reason phrase 可省略,RFC 7230 允许) pub const HTTP_HOST_OFFSET: usize = 6; pub const HTTP_CONTENT_LENGTH_OFFSET: usize = 16; diff --git a/agent/src/flow_generator/protocol_logs/http.rs b/agent/src/flow_generator/protocol_logs/http.rs index d3a788f5ac6..f359de60dfa 100644 --- a/agent/src/flow_generator/protocol_logs/http.rs +++ b/agent/src/flow_generator/protocol_logs/http.rs @@ -23,6 +23,7 @@ use std::{ }; use hpack::Decoder; +use log::debug; use nom::{AsBytes, ParseTo}; use serde::Serialize; @@ -33,6 +34,7 @@ use public_derive::L7Log; use super::{ consts::*, + openai_api, pb_adapter::{ ExtendedInfo, KeyVal, L7ProtocolSendLog, L7Request, L7Response, MetricKeyVal, TraceInfo, }, @@ -339,6 +341,16 @@ pub struct HttpInfo { #[serde(skip)] metrics: Vec, + /// OpenAI API accumulated session state; Some only when this HttpInfo + /// carries OpenAI-specific data (request biz-dims, response metrics, etc.). + #[serde(skip)] + pub openai_session: Option>, + + /// True when this is an OpenAI streaming request that requires multi-merge. + /// Drives `need_merge()` for HTTP/1 streaming sessions. + #[serde(skip)] + openai_need_merge: bool, + #[serde(skip)] is_on_blacklist: bool, @@ -395,7 +407,7 @@ impl L7ProtocolInfoInterface for HttpInfo { fn need_merge(&self) -> bool { match self.raw_data_type { L7ProtoRawDataType::GoHttp2Uprobe => true, - _ => false, + _ => self.openai_need_merge, } } @@ -604,6 +616,14 @@ impl HttpInfo { if other.is_resp_end { self.is_resp_end = true; } + // For OpenAI multi-merge: the request entry is cached with is_req_end=false + // so the session aggregator doesn't discard it. Each response packet + // carries is_req_end=true so we propagate it here to let is_session_end() + // = is_req_end && is_resp_end eventually return true. + // For normal HTTP this is a no-op since responses never set is_req_end. + if other.is_req_end { + self.is_req_end = true; + } self.captured_response_byte += other.captured_response_byte; if other.status != L7ResponseStatus::Ok { @@ -642,6 +662,34 @@ impl HttpInfo { super::swap_if!(self, x_request_id_1, is_default, other); self.attributes.append(&mut other.attributes); self.metrics.append(&mut other.metrics); + + // Merge OpenAI session: the response (or final SSE packet) carries the + // fully-accumulated session. Always prefer the incoming session over the + // stored one because: + // • For non-streaming: the request stores a partial clone; the response + // has the complete session with parsed usage. + // • For streaming: the request stores None; the final SSE packet has the + // complete session. + // Replacing unconditionally is safe — SSE continuations that have not + // completed yet carry None, so the `if let` guard prevents overwriting. + if let Some(other_session) = other.openai_session.take() { + debug!( + "openai: merge – {} openai_session (kind={:?} events={} usage={:?})", + if self.openai_session.is_some() { + "replacing" + } else { + "setting" + }, + other_session.kind, + other_session.stream_event_count, + other_session.usage.as_ref().map(|u| u.total_tokens), + ); + self.openai_session = Some(other_session); + } + if other.openai_need_merge { + self.openai_need_merge = true; + } + Ok(()) } @@ -857,6 +905,38 @@ impl From for L7ProtocolSendLog { } } + // OpenAI API: populate attributes/metrics from the accumulated session state. + let openai_protocol_str = if let Some(session) = f.openai_session.take() { + let (ttft, tpot) = session.compute_timings(); + debug!( + "openai: converting to send log: kind={:?} stream={} usage_status={:?} \ + events={} ttft={:?} tpot={:?} tokens={:?} stream_end_ts={:?} req_ts={}", + session.kind, + session.is_stream, + session.usage_status, + session.stream_event_count, + ttft, + tpot, + session.usage.as_ref().map(|u| u.total_tokens), + session.stream_end_ts_us, + session.request_ts_us, + ); + // Biz dimension attrs (org_path/user_id/app_id) are pushed directly to + // f.attributes at REQUEST time so they appear even on timed-out sessions. + // populate_log also emits them (to capture body-sourced attrs from TCP + // continuation segments). Remove the direct-push duplicates first so the + // merged log has each attr exactly once with the latest session value. + f.attributes.retain(|kv| { + kv.key != openai_api::ATTR_BIZ_ORG_PATH + && kv.key != openai_api::ATTR_BIZ_USER_ID + && kv.key != openai_api::ATTR_BIZ_APP_ID + }); + session.populate_log(&mut f.attributes, &mut f.metrics); + Some(openai_api::BIZ_PROTOCOL.to_string()) + } else { + None + }; + L7ProtocolSendLog { req_len: f.req_content_length, resp_len: f.resp_content_length, @@ -913,6 +993,7 @@ impl From for L7ProtocolSendLog { user_agent: f.user_agent, referer: f.referer, rpc_service: f.service_name, + protocol_str: openai_protocol_str, attributes: { if f.attributes.is_empty() { None @@ -966,6 +1047,11 @@ pub struct HttpLog { http2_req_decoder: Option>, http2_resp_decoder: Option>, + /// Per-session OpenAI state accumulated across multiple response packets. + /// Created when an OpenAI streaming request is first seen; cleared when the + /// stream ends or the session is reset. + openai_session: Option>, + #[cfg(feature = "enterprise")] custom_field_store: Store, } @@ -1049,6 +1135,23 @@ impl L7ProtocolParserInterface for HttpLog { match self.proto { L7Protocol::Http1 => { + // Per-packet trace: only at debug level to avoid flooding production logs. + if config.openai_api.enabled && param.direction == PacketDirection::ServerToClient { + debug!( + "openai: flow={} parse_payload Http1 direction={:?} \ + len={} starts={:?} session={}", + param.flow_id, + param.direction, + payload.len(), + &payload[..payload.len().min(8)], + if self.openai_session.is_some() { + "Some" + } else { + "None" + }, + ); + } + let mut info = HttpInfo { proto: self.proto, is_tls: param.is_tls(), @@ -1056,27 +1159,51 @@ impl L7ProtocolParserInterface for HttpLog { ..Default::default() }; - let l7_payload = self.parse_http_v1( + // Try standard HTTP/1 parsing first. + let parse_result = self.parse_http_v1( payload, param, &mut info, #[cfg(feature = "enterprise")] custom_policies, - )?; - self.set_info_by_config( - param, - config, - payload, - Some(l7_payload), - &mut info, - #[cfg(feature = "enterprise")] - custom_policies, ); - if param.parse_log { - Ok(L7ParseResult::Single(L7ProtocolInfo::HttpInfo(info))) - } else { - Ok(L7ParseResult::None) + match parse_result { + Ok(l7_payload) => { + self.set_info_by_config( + param, + config, + payload, + Some(l7_payload), + &mut info, + #[cfg(feature = "enterprise")] + custom_policies, + ); + + // OpenAI API enhancement after successful HTTP parse. + self.handle_openai_http1(payload, l7_payload, param, config, &mut info); + + if param.parse_log { + Ok(L7ParseResult::Single(L7ProtocolInfo::HttpInfo(info))) + } else { + Ok(L7ParseResult::None) + } + } + Err(http_err) => { + // Not a valid HTTP/1 header – check if this is an OpenAI SSE + // continuation packet belonging to an active streaming session. + if let Some(sse_info) = + self.handle_openai_sse_continuation(payload, param, config) + { + if param.parse_log { + Ok(L7ParseResult::Single(L7ProtocolInfo::HttpInfo(sse_info))) + } else { + Ok(L7ParseResult::None) + } + } else { + Err(http_err) + } + } } } L7Protocol::Http2 | L7Protocol::Grpc | L7Protocol::Triple => { @@ -1190,6 +1317,11 @@ impl L7ProtocolParserInterface for HttpLog { new_log.perf_stats = self.perf_stats(); new_log.http2_req_decoder = self.http2_req_decoder.take(); new_log.http2_resp_decoder = self.http2_resp_decoder.take(); + // Preserve an active OpenAI streaming session across per-packet resets. + // reset() is called after every packet; without this the RESPONSE packet + // would find openai_session=None because the REQUEST packet's session was + // discarded. Matches the same pattern as http2_req/resp_decoder above. + new_log.openai_session = self.openai_session.take(); *self = new_log; } @@ -1346,6 +1478,457 @@ impl HttpLog { self.http2_resp_decoder = Some(Decoder::new_with_expected_headers(expected_headers_set)); } + // ─── OpenAI API integration helpers ───────────────────────────────────── + + /// Called after a successful HTTP/1 parse to apply OpenAI-specific logic: + /// - On request: create/init an OpenAISession if the path matches. + /// - On response: feed the body into the SSE state machine or parse JSON usage. + fn handle_openai_http1( + &mut self, + full_payload: &[u8], + body: &[u8], + param: &ParseParam, + config: &LogParserConfig, + info: &mut HttpInfo, + ) { + if !config.openai_api.enabled { + return; + } + + match info.msg_type { + LogMessageType::Request => { + if info.method != Method::Post { + return; + } + if !openai_api::is_openai_path(&info.path, config) { + debug!( + "openai: flow={} path={} not matched by prefixes={:?} suffixes={:?}", + param.flow_id, + info.path, + config.openai_api.path_prefixes, + config.openai_api.path_suffixes + ); + return; + } + + let kind = openai_api::kind_from_path(&info.path); + let mut session = Box::new(openai_api::OpenAISession::new( + kind, + false, + param.time, + config.openai_api.sse_buffer_max_bytes, + &config.openai_api.usage_field_paths, + )); + + self.extract_openai_headers_from_payload(full_payload, &mut session, config); + + if !body.is_empty() { + openai_api::parse_request_body(&mut session, body, config); + } + + // is_req_end must stay false: the session aggregator discards any + // first-seen packet with need_merge=true && (req_end || resp_end). + // The response side propagates is_req_end back via merge(). + info.stream_id = Some(session.stream_id); + info.is_resp_end = false; + info.openai_need_merge = true; + + debug!( + "openai: flow={} REQUEST path={} kind={:?} stream={} stream_id={} \ + biz_user={:?} biz_app={:?} biz_org={:?} body_bytes={}", + param.flow_id, + info.path, + kind, + session.is_stream, + session.stream_id, + session.biz_user_id, + session.biz_app_id, + session.biz_org_path, + body.len(), + ); + + // Write biz-dimension attributes on the request packet so they survive + // even if the final merged entry loses them. + if let Some(v) = &session.biz_org_path { + info.attributes.push(KeyVal { + key: openai_api::ATTR_BIZ_ORG_PATH.to_string(), + val: v.clone(), + }); + } + if let Some(v) = &session.biz_user_id { + info.attributes.push(KeyVal { + key: openai_api::ATTR_BIZ_USER_ID.to_string(), + val: v.clone(), + }); + } + if let Some(v) = &session.biz_app_id { + info.attributes.push(KeyVal { + key: openai_api::ATTR_BIZ_APP_ID.to_string(), + val: v.clone(), + }); + } + + // For non-streaming sessions, also attach a clone of the initial + // session state to the REQUEST info. If the response is never seen + // (packet drop, MTU issue, etc.) the session-aggregator times out + // the REQUEST entry but it will still be tagged as openai-api with + // the request-side metadata (path, biz dimensions). + // When the real response arrives, merge() replaces this clone with + // the response's fully-populated session. + if !session.is_stream { + info.openai_session = Some(session.clone()); + } + + self.openai_session = Some(session); + } + + LogMessageType::Response => { + let (stream_id, is_already_stream) = match self.openai_session.as_ref() { + Some(s) => (s.stream_id, s.is_stream), + None => { + debug!( + "openai: flow={} RESPONSE arrived but no openai_session \ + (mid-flow capture or session already finished)", + param.flow_id + ); + return; + } + }; + + info.stream_id = Some(stream_id); + // is_req_end=true will be propagated into the stored REQUEST entry + // via merge(), satisfying is_session_end() = is_req_end && is_resp_end. + info.is_req_end = true; + info.openai_need_merge = true; + + // Scan headers once and derive both flags from the same pass. + let (is_sse, is_chunked) = Self::response_sse_and_chunked(full_payload, body); + let is_stream = is_already_stream || is_sse; + + debug!( + "openai: flow={} RESPONSE stream_id={} status={:?} is_already_stream={} \ + is_sse={} is_chunked={} body_bytes={}", + param.flow_id, + stream_id, + info.status_code, + is_already_stream, + is_sse, + is_chunked, + body.len(), + ); + + // Propagate chunked flag to the session so continuation packets + // can decode chunk framing before feeding to the SSE state machine. + if is_chunked { + if let Some(s) = self.openai_session.as_mut() { + s.is_chunked_transfer = true; + } + } + + if is_stream { + let done = { + let session = self.openai_session.as_mut().unwrap(); + session.is_stream = true; + // Upgrade Unknown → Missing now that we know this is a stream. + // Unknown means "not yet determined"; Missing means "expected but + // not yet seen", which is the correct state for an in-progress SSE. + if session.usage_status == openai_api::UsageStatus::Unknown { + session.usage_status = openai_api::UsageStatus::Missing; + } + // For chunked SSE, the first response body is empty (headers + // only) so feed_sse is a no-op here; actual SSE events arrive + // in subsequent continuation packets. + session.feed_sse(body, param.time) + }; + info.is_resp_end = done; + debug!( + "openai: flow={} SSE response fed stream_id={} done={} \ + events={} usage_status={:?}", + param.flow_id, + stream_id, + done, + self.openai_session + .as_ref() + .map(|s| s.stream_event_count) + .unwrap_or(0), + self.openai_session + .as_ref() + .map(|s| s.usage_status) + .unwrap_or_default(), + ); + if done { + info.openai_session = self.openai_session.take(); + } + } else { + { + let session = self.openai_session.as_mut().unwrap(); + if !body.is_empty() { + openai_api::parse_response_json(session, body, config); + } + // Non-streaming: stream ends at the response packet. + session.stream_end_ts_us = Some(param.time); + } + info.is_resp_end = true; + info.openai_session = self.openai_session.take(); + debug!( + "openai: flow={} non-stream RESPONSE done stream_id={} usage_status={:?}", + param.flow_id, + stream_id, + info.openai_session + .as_ref() + .map(|s| s.usage_status) + .unwrap_or_default(), + ); + } + } + + _ => {} + } + } + + /// Scan raw HTTP/1 headers in `payload` and extract OpenAI biz dimensions. + fn extract_openai_headers_from_payload( + &self, + payload: &[u8], + session: &mut openai_api::OpenAISession, + config: &LogParserConfig, + ) { + let mut headers = parse_v1_headers(payload); + let _ = headers.next(); // skip request line + for line in headers { + if let Some(col) = line.find(':') { + if col + 1 >= line.len() { + continue; + } + // extract_biz_from_header uses eq_ignore_ascii_case internally, + // so no need to lowercase here. + let key = line[..col].trim(); + let val = line[col + 1..].trim(); + openai_api::extract_biz_from_header(session, key, val, config); + } + } + } + + /// Scan the HTTP/1 response headers once and return `(is_sse, is_chunked)`. + /// + /// Combining both checks avoids two separate O(n) scans for `\r\n\r\n`. + fn response_sse_and_chunked(full_payload: &[u8], body: &[u8]) -> (bool, bool) { + // Fast path: body already starts with SSE markers (no header scan needed). + let body_is_sse = body.starts_with(b"data:") || body.starts_with(b"event:"); + + // Find the end of headers (single O(n) scan). + let header_end = full_payload + .windows(4) + .position(|w| w == b"\r\n\r\n") + .unwrap_or(full_payload.len()); + let headers = &full_payload[..header_end]; + + // Case-insensitive header search without allocation: compare each + // window byte-by-byte with the lower-case needle. + let header_contains = |needle: &[u8]| -> bool { + if headers.len() < needle.len() { + return false; + } + headers.windows(needle.len()).any(|w| { + w.iter() + .zip(needle.iter()) + .all(|(a, b)| a.to_ascii_lowercase() == *b) + }) + }; + + let is_sse = body_is_sse || header_contains(b"text/event-stream"); + let is_chunked = header_contains(b"transfer-encoding: chunked"); + (is_sse, is_chunked) + } + + /// Handle raw TCP payload that is NOT a valid HTTP/1 header but belongs to + /// an active OpenAI session. Covers three cases: + /// + /// 1. **Request body continuation** (`ClientToServer`): when the POST body arrives + /// in a separate TCP segment from the headers, parse it to detect `"stream": true` + /// before the response arrives. + /// + /// 2. **SSE continuation** (`is_stream=true`, `ServerToClient`): feed the raw bytes + /// into the SSE state machine and forward progress to the session aggregator. + /// + /// 3. **Non-streaming fallback** (`is_stream=false`, `ServerToClient`): when + /// `parse_http_v1` fails for a non-streaming response (e.g., body-only TCP + /// segment, or an unusual response format), complete the session immediately + /// so the cached REQUEST is not left to time out. + fn handle_openai_sse_continuation( + &mut self, + payload: &[u8], + param: &ParseParam, + config: &LogParserConfig, + ) -> Option { + if !config.openai_api.enabled { + return None; + } + + let (stream_id, is_stream) = match self.openai_session.as_ref() { + Some(s) => (s.stream_id, s.is_stream), + None => { + // No active session — normal for non-OpenAI flows. + if param.direction == PacketDirection::ServerToClient { + debug!( + "openai: flow={} non-HTTP server→client payload but no active session", + param.flow_id, + ); + } + return None; + } + }; + + // ── Client→server continuation (request body in separate TCP segment) ── + // When the POST body arrives after the HTTP headers in a later TCP segment, + // parse_http_v1 fails for that segment. Parse the payload as a request body + // to pick up the "stream" flag before the response arrives. + if param.direction == PacketDirection::ClientToServer { + if !is_stream && !payload.is_empty() { + let session = self.openai_session.as_mut().unwrap(); + openai_api::parse_request_body(session, payload, config); + debug!( + "openai: flow={} request body continuation parsed stream_id={} is_stream={}", + param.flow_id, stream_id, session.is_stream, + ); + } + return None; + } + + // ── Non-streaming fallback ──────────────────────────────────────────── + // parse_http_v1 failed for a server→client packet while a non-streaming + // session is active. The packet is likely a body-continuation segment + // (headers were in a prior segment) or an oddly-formatted first response. + // Complete the session now so the cached REQUEST is not left to time out. + // + // EXCEPTION: any HTTP-looking payload (starts with "HTTP/") where + // parse_http_v1 failed — e.g. 1xx informational responses (100 Continue, + // 103 Early Hints) or a reason-phrase-less "HTTP/1.1 NNN" that was + // rejected by a too-strict length check. Preserve the session so the + // real response that follows can be matched. + if !is_stream { + // Don't consume the session for any packet that looks like an HTTP + // response header (starts with "HTTP/"). parse_http_v1 may have + // legitimately rejected it (1xx status, reason-phrase-less status + // line, unsupported version), but the actual response is coming. + if payload.starts_with(b"HTTP/") { + debug!( + "openai: flow={} HTTP-header-like packet rejected by parse_http_v1 \ + (preserving session stream_id={}), starts={:?}", + param.flow_id, + stream_id, + &payload[..payload.len().min(16)], + ); + return None; + } + let session = self.openai_session.as_mut().unwrap(); + if !payload.is_empty() { + // Best-effort: the payload might be the JSON body; extract usage + // if it parses. Failure is silent (usage_status stays Missing). + openai_api::parse_response_json(session, payload, config); + } + session.stream_end_ts_us = Some(param.time); + let mut info = HttpInfo { + proto: self.proto, + is_tls: param.is_tls(), + msg_type: LogMessageType::Response, + stream_id: Some(stream_id), + is_req_end: true, + is_resp_end: true, + openai_need_merge: true, + ..Default::default() + }; + info.openai_session = self.openai_session.take(); + debug!( + "openai: flow={} non-stream RESPONSE fallback stream_id={} \ + (HTTP parse failed, completing session) usage_status={:?}", + param.flow_id, + stream_id, + info.openai_session + .as_ref() + .map(|s| s.usage_status) + .unwrap_or_default(), + ); + return Some(info); + } + + // ── SSE continuation ───────────────────────────────────────────────── + + let done = { + let session = self.openai_session.as_mut().unwrap(); + if session.is_chunked_transfer { + // Decode HTTP chunked framing into the session's reusable scratch + // buffer (zero extra allocation per continuation packet). + let ok = { + // Temporarily move the scratch buffer out so we can mutably + // borrow both it and the rest of `session`. + let mut scratch = std::mem::take(&mut session.chunked_decode_buf); + // Terminal chunk can appear in the same TCP segment as the + // final SSE events (usage + [DONE]). Always feed decoded data + // first; mark stream done afterward. + let is_terminal = openai_api::decode_chunked_sse_into(payload, &mut scratch); + let sse_done = if !scratch.is_empty() { + session.feed_sse(&scratch, param.time) + } else { + false + }; + let result = if is_terminal { + if !session.stream_completed { + session.stream_completed = true; + } + session.stream_end_ts_us.get_or_insert(param.time); + true + } else { + sse_done + }; + session.chunked_decode_buf = scratch; // restore (reuses capacity) + result + }; + ok + } else { + session.feed_sse(payload, param.time) + } + }; + + debug!( + "openai: flow={} SSE continuation stream_id={} payload_bytes={} done={} \ + events={} usage_status={:?}", + param.flow_id, + stream_id, + payload.len(), + done, + self.openai_session + .as_ref() + .map(|s| s.stream_event_count) + .unwrap_or(0), + self.openai_session + .as_ref() + .map(|s| s.usage_status) + .unwrap_or_default(), + ); + + let mut info = HttpInfo { + proto: self.proto, + is_tls: param.is_tls(), + msg_type: LogMessageType::Response, + stream_id: Some(stream_id), + is_req_end: true, + is_resp_end: done, + openai_need_merge: true, + ..Default::default() + }; + + if done { + info.openai_session = self.openai_session.take(); + debug!( + "openai: flow={} SSE stream DONE stream_id={} – moving session to HttpInfo", + param.flow_id, stream_id, + ); + } + + Some(info) + } + fn http1_check_protocol(&mut self, payload: &[u8]) -> Option { let mut headers = parse_v1_headers(payload); let Some(first_line) = headers.next() else { @@ -3375,4 +3958,331 @@ mod tests { .check_payload("GET / HTTP/1.1\r\n\r\n".as_bytes(), ¶m) .is_some()); } + + // ── OpenAI API tests ──────────────────────────────────────────────────── + + /// Build a LogParserConfig with OpenAI API enabled, accepting any path that + /// contains "completions" as suffix, so that the test pcap paths (which may + /// not start with "/v1/") still match. + fn openai_test_config() -> LogParserConfig { + use crate::config::config::{ + OpenAIApiConfig, OpenAIBizDimExtractor, OpenAIBizDimExtractors, OpenAIUsageFieldPaths, + }; + LogParserConfig { + openai_api: OpenAIApiConfig { + enabled: true, + // Accept paths that end with "completions" so that the test pcap + // path /model-center/api/llm/openai/v1/chat/completions matches. + path_prefixes: vec![], + path_suffixes: vec!["completions".to_string()], + request_body_max_bytes: 65536, + response_event_max_bytes: 32768, + sse_buffer_max_bytes: 524288, + usage_field_paths: OpenAIUsageFieldPaths::default(), + biz_dimension_extractors: OpenAIBizDimExtractors { + org_path: OpenAIBizDimExtractor { + headers: vec!["x-org-path".to_string()], + json_paths: vec!["metadata.org_path".to_string()], + }, + user_id: OpenAIBizDimExtractor { + headers: vec!["x-user-id".to_string()], + json_paths: vec![ + "safety_identifier".to_string(), + "user".to_string(), + "source_aigc_appid".to_string(), + "metadata.user_id".to_string(), + ], + }, + app_id: OpenAIBizDimExtractor { + headers: vec!["x-app-id".to_string()], + json_paths: vec![ + "appid".to_string(), + "source_appid".to_string(), + "metadata.app_id".to_string(), + ], + }, + }, + }, + ..Default::default() + } + } + + /// Run all packets in a pcap file through the HTTP1 parser and collect the + /// resulting `HttpInfo` objects (merged request+response pairs). + fn run_openai_pcap(pcap_name: &str, config: &LogParserConfig) -> Vec { + let capture = Capture::load_pcap(Path::new(FILE_DIR).join(pcap_name)); + let log_cache = Rc::new(RefCell::new(L7PerfCache::new(L7_RRT_CACHE_CAPACITY))); + let mut packets: Vec<_> = capture.collect(); + if packets.is_empty() { + return vec![]; + } + + let first_dst_port = packets[0].lookup_key.dst_port; + let mut parser = HttpLog::new_v1(); + let mut results: Vec = Vec::new(); + + for packet in packets.iter_mut() { + packet.lookup_key.direction = if packet.lookup_key.dst_port == first_dst_port { + PacketDirection::ClientToServer + } else { + PacketDirection::ServerToClient + }; + let payload = match packet.get_l4_payload() { + Some(p) => p, + None => continue, + }; + + let param = &mut ParseParam::new( + packet as &MetaPacket, + Some(log_cache.clone()), + Default::default(), + #[cfg(any(target_os = "linux", target_os = "android"))] + Default::default(), + true, + true, + ); + param.set_captured_byte(payload.len()); + param.set_log_parser_config(config); + + match parser.parse_payload(payload, param) { + Ok(L7ParseResult::Single(L7ProtocolInfo::HttpInfo(info))) => { + // Merge responses / SSE continuations into the previous entry. + // A Request starts a new session; everything else merges into the current one. + if info.msg_type != LogMessageType::Request { + if let Some(last) = results.last_mut() { + let mut other = info; + let _ = last.merge(&mut other); + continue; + } + } + results.push(info); + } + _ => {} + } + } + results + } + + fn attr_val<'a>(info: &'a HttpInfo, key: &str) -> Option<&'a str> { + info.attributes + .iter() + .find(|kv| kv.key == key) + .map(|kv| kv.val.as_str()) + } + + fn metric_val(info: &HttpInfo, key: &str) -> Option { + info.metrics + .iter() + .find(|kv| kv.key == key) + .map(|kv| kv.val) + } + + #[test] + fn test_openai_normal_usage_pcap() { + let config = openai_test_config(); + let results = run_openai_pcap("openai_normal_usage.pcap", &config); + + // Expect at least one merged request+response. + assert!( + !results.is_empty(), + "no results from openai_normal_usage.pcap" + ); + let info = &results[0]; + + // openai_session should carry the final state. + let session = info.openai_session.as_ref().expect("openai_session absent"); + + // For a non-streaming response, usage should be available. + assert!( + matches!( + session.usage_status, + crate::flow_generator::protocol_logs::openai_api::UsageStatus::Available + ), + "usage_status should be Available, got {:?}", + session.usage_status + ); + let usage = session.usage.as_ref().expect("usage absent"); + assert!(usage.input_tokens > 0, "input_tokens should be > 0"); + assert!(usage.output_tokens > 0, "output_tokens should be > 0"); + } + + #[test] + fn test_openai_stream_usage_pcap() { + let config = openai_test_config(); + let results = run_openai_pcap("openai_stream_usage.pcap", &config); + + assert!( + !results.is_empty(), + "no results from openai_stream_usage.pcap" + ); + // Find the merged entry that has openai_session populated. + let info = results + .iter() + .find(|i| i.openai_session.is_some()) + .expect("no result with openai_session"); + + let session = info.openai_session.as_ref().unwrap(); + + // Streaming pcap should be detected as stream. + assert!(session.is_stream, "should be detected as stream"); + + // Usage should be available (stream includes usage in chunks). + assert!( + matches!( + session.usage_status, + crate::flow_generator::protocol_logs::openai_api::UsageStatus::Available + ), + "usage_status should be Available" + ); + + let usage = session.usage.as_ref().expect("usage absent"); + assert!(usage.input_tokens > 0, "input_tokens should be > 0"); + assert!(usage.output_tokens > 0, "output_tokens should be > 0"); + assert!( + session.stream_event_count > 0, + "stream_event_count should be > 0" + ); + } + + /// Regression test for `openai_stream_v537.pcap`. + /// + /// This pcap uses HTTP chunked-transfer-encoding where each SSE event is + /// spread across three chunks: `data:`, `{json}\n`, and `\r\n` (blank line). + /// After chunk decoding the event boundary is `\n\r\n` (not `\n\n`). + /// Also, every chunk in this pcap includes inline usage data. + #[test] + fn test_openai_stream_v537_pcap() { + let config = openai_test_config(); + let results = run_openai_pcap("openai_stream_v537.pcap", &config); + + assert!( + !results.is_empty(), + "no results from openai_stream_v537.pcap" + ); + + let info = results + .iter() + .find(|i| i.openai_session.is_some()) + .expect("no result with openai_session"); + let session = info.openai_session.as_ref().unwrap(); + + assert!(session.is_stream, "should be detected as stream"); + assert!( + matches!( + session.usage_status, + crate::flow_generator::protocol_logs::openai_api::UsageStatus::Available + ), + "usage_status should be Available (inline usage in every chunk), got {:?}", + session.usage_status, + ); + let usage = session.usage.as_ref().expect("usage absent"); + assert!(usage.input_tokens > 0, "input_tokens should be > 0"); + assert!(usage.output_tokens > 0, "output_tokens should be > 0"); + assert!( + session.stream_event_count > 0, + "stream_event_count should be > 0" + ); + assert!(session.stream_completed, "stream should be completed"); + } + + #[test] + fn test_openai_stream_pcap() { + let config = openai_test_config(); + let results = run_openai_pcap("openai_stream.pcap", &config); + + // The stream pcap without explicit usage may still produce a result. + // Just verify parsing doesn't panic and produces reasonable output. + // If there are results, verify stream is detected. + for info in &results { + if let Some(session) = &info.openai_session { + // If detected as openai, it should at least be kind=ChatCompletions. + assert!( + matches!( + session.kind, + crate::flow_generator::protocol_logs::openai_api::OpenAIKind::ChatCompletions + ), + "kind should be ChatCompletions" + ); + } + } + } + + /// Test that OpenAI metrics are correctly propagated into the L7ProtocolSendLog. + #[test] + fn test_openai_metrics_in_send_log() { + use crate::flow_generator::protocol_logs::openai_api::{ + OpenAIKind, OpenAISession, OpenAIUsage, UsageStatus, + }; + + // request_ts_us = 0, stream_end_ts_us = 5_000_000 µs → 5000 ms total. + let mut session = Box::new(OpenAISession::new( + OpenAIKind::ChatCompletions, + true, + 0, + 131072, + &Default::default(), + )); + session.usage = Some(OpenAIUsage { + input_tokens: 100, + output_tokens: 50, + total_tokens: 150, + cached_tokens: None, + }); + session.usage_status = UsageStatus::Available; + session.first_output_ts_us = Some(100_000); + session.last_output_ts_us = Some(600_000); + session.stream_end_ts_us = Some(5_000_000); // 5 s after request + session.stream_event_count = 5; + session.stream_completed = true; + session.biz_user_id = Some("test-user".to_string()); + session.biz_app_id = Some("test-app".to_string()); + + let info = HttpInfo { + proto: L7Protocol::Http1, + msg_type: LogMessageType::Session, + openai_session: Some(session), + ..Default::default() + }; + + let send_log: L7ProtocolSendLog = info.into(); + let ext = send_log.ext_info.expect("ext_info absent"); + + // protocol_str should be "openai-api" + assert_eq!( + ext.protocol_str.as_deref(), + Some("openai-api"), + "protocol_str should be openai-api" + ); + + // Attributes should contain biz_user_id and biz_app_id. + let attrs = ext.attributes.expect("attributes absent"); + let attr_map: std::collections::HashMap<_, _> = attrs + .iter() + .map(|kv| (kv.key.as_str(), kv.val.as_str())) + .collect(); + assert_eq!(attr_map.get("biz_user_id"), Some(&"test-user")); + assert_eq!(attr_map.get("biz_app_id"), Some(&"test-app")); + + // Metrics should contain token counts. + let metrics = ext.metrics.expect("metrics absent"); + let metric_map: std::collections::HashMap<_, _> = + metrics.iter().map(|kv| (kv.key.as_str(), kv.val)).collect(); + assert_eq!(metric_map.get("llm_input_tokens"), Some(&100.0f32)); + assert_eq!(metric_map.get("llm_output_tokens"), Some(&50.0f32)); + assert_eq!(metric_map.get("llm_total_tokens"), Some(&150.0f32)); + + // TTFT and TPOT should be present. + assert!(metric_map.contains_key("llm_ttft_us"), "ttft missing"); + assert!(metric_map.contains_key("llm_tpot_us"), "tpot missing"); + + // Total stream duration: 5_000_000 µs - 0 µs = 5_000_000 µs. + let total_us = metric_map + .get("llm_total_stream_us") + .copied() + .expect("llm_total_stream_us missing"); + assert!( + (total_us - 5_000_000.0).abs() < 1.0, + "expected ~5_000_000 µs total stream, got {total_us}" + ); + } } diff --git a/agent/src/flow_generator/protocol_logs/openai_api.rs b/agent/src/flow_generator/protocol_logs/openai_api.rs new file mode 100644 index 00000000000..19f7d40c5a3 --- /dev/null +++ b/agent/src/flow_generator/protocol_logs/openai_api.rs @@ -0,0 +1,1265 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! OpenAI API business sub-protocol enhancement layer. +//! +//! This module implements incremental recognition, field extraction, SSE state machine, +//! and metric calculation (TTFT/TPOT/tokens) on top of the existing HTTP1/HTTP2 parser. +//! It does NOT introduce a new L7Protocol; the `biz_protocol` in the final log is set +//! to "openai-api" while the native L7 protocol remains HTTP1 or HTTP2. + +use std::sync::atomic::{AtomicU32, Ordering}; + +use serde_json::Value; + +use crate::config::config::OpenAIUsageFieldPaths; +use crate::config::handler::LogParserConfig; +use crate::flow_generator::protocol_logs::pb_adapter::{KeyVal, MetricKeyVal}; + +// ─── synthetic stream-id counter (per-process, wraps around) ─────────────── +static OPENAI_SESSION_COUNTER: AtomicU32 = AtomicU32::new(1); + +fn next_openai_session_id() -> u32 { + OPENAI_SESSION_COUNTER.fetch_add(1, Ordering::Relaxed) +} + +// ─── Public constants ─────────────────────────────────────────────────────── +pub const BIZ_PROTOCOL: &str = "openai-api"; + +// Attribute names +pub const ATTR_API_KIND: &str = "openai_api_kind"; +pub const ATTR_STREAM: &str = "openai_stream"; +pub const ATTR_USAGE_STATUS: &str = "openai_usage_status"; +pub const ATTR_STREAM_COMPLETE: &str = "llm_stream_complete"; +pub const ATTR_ABORT_REASON: &str = "llm_abort_reason"; +pub const ATTR_BIZ_ORG_PATH: &str = "biz_org_path"; +pub const ATTR_BIZ_USER_ID: &str = "biz_user_id"; +pub const ATTR_BIZ_APP_ID: &str = "biz_app_id"; + +// Metric names +pub const METRIC_REQUEST: &str = "llm_request"; +pub const METRIC_STREAM_REQUEST: &str = "llm_stream_request"; +pub const METRIC_TTFT_US: &str = "llm_ttft_us"; +pub const METRIC_TPOT_US: &str = "llm_tpot_us"; +pub const METRIC_INPUT_TOKENS: &str = "llm_input_tokens"; +pub const METRIC_OUTPUT_TOKENS: &str = "llm_output_tokens"; +pub const METRIC_TOTAL_TOKENS: &str = "llm_total_tokens"; +pub const METRIC_CACHED_TOKENS: &str = "llm_cached_tokens"; +pub const METRIC_STREAM_EVENT_COUNT: &str = "llm_stream_event_count"; +pub const METRIC_TOTAL_STREAM_US: &str = "llm_total_stream_us"; + +// ─── Enumerations ────────────────────────────────────────────────────────── + +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub enum OpenAIKind { + #[default] + Unknown, + ChatCompletions, + Responses, +} + +impl OpenAIKind { + pub fn as_str(self) -> &'static str { + match self { + OpenAIKind::ChatCompletions => "chat_completions", + OpenAIKind::Responses => "responses", + OpenAIKind::Unknown => "unknown", + } + } +} + +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub enum UsageStatus { + #[default] + Unknown, + Available, + Missing, + NotRequested, + StreamInterrupted, +} + +impl UsageStatus { + pub fn as_str(self) -> &'static str { + match self { + UsageStatus::Available => "available", + UsageStatus::Missing => "missing", + UsageStatus::NotRequested => "not_requested", + UsageStatus::StreamInterrupted => "stream_interrupted", + UsageStatus::Unknown => "unknown", + } + } +} + +// ─── Token usage ───────────────────────────────────────────────────────── + +#[derive(Clone, Debug, Default)] +pub struct OpenAIUsage { + pub input_tokens: u64, + pub output_tokens: u64, + pub total_tokens: u64, + pub cached_tokens: Option, +} + +// ─── Pre-compiled usage path pointers ──────────────────────────────────── + +/// Dot-separated paths compiled once into JSON Pointer strings (`/a/b/c`). +/// Stored in `OpenAISession` so the hot SSE path does zero allocation. +#[derive(Clone, Debug)] +pub struct CompiledUsagePaths { + pub input_tokens: Vec, + pub output_tokens: Vec, + pub total_tokens: Vec, + pub cached_tokens: Vec, +} + +impl CompiledUsagePaths { + pub fn from_config(cfg: &OpenAIUsageFieldPaths) -> Self { + let compile = |paths: &[String]| -> Vec { + paths.iter().map(|p| dot_path_to_pointer(p)).collect() + }; + Self { + input_tokens: compile(&cfg.input_tokens), + output_tokens: compile(&cfg.output_tokens), + total_tokens: compile(&cfg.total_tokens), + cached_tokens: compile(&cfg.cached_tokens), + } + } +} + +/// Compile a dot-separated path into a JSON Pointer string (`/a/b/c`). +/// Escapes `~` → `~0` and `/` → `~1` as required by RFC 6901. +fn dot_path_to_pointer(path: &str) -> String { + let mut ptr = String::with_capacity(path.len() + 1); + ptr.push('/'); + for ch in path.chars() { + match ch { + '.' => ptr.push('/'), + '~' => ptr.push_str("~0"), + '/' => ptr.push_str("~1"), + c => ptr.push(c), + } + } + ptr +} + +/// Lookup a u64 using pre-compiled JSON Pointer strings. Zero allocation. +#[inline] +fn extract_u64_by_ptrs(json: &Value, ptrs: &[String]) -> Option { + for ptr in ptrs { + if let Some(n) = json.pointer(ptr.as_str()).and_then(|v| v.as_u64()) { + return Some(n); + } + } + None +} + +/// Parse token usage from the top-level JSON using pre-compiled pointers. +pub fn parse_usage_from_json(json: &Value, ptrs: &CompiledUsagePaths) -> Option { + let input_tokens = extract_u64_by_ptrs(json, &ptrs.input_tokens)?; + let output_tokens = extract_u64_by_ptrs(json, &ptrs.output_tokens)?; + let total_tokens = + extract_u64_by_ptrs(json, &ptrs.total_tokens).unwrap_or(input_tokens + output_tokens); + let cached_tokens = extract_u64_by_ptrs(json, &ptrs.cached_tokens); + Some(OpenAIUsage { + input_tokens, + output_tokens, + total_tokens, + cached_tokens, + }) +} + +/// Best-effort extraction of usage field values from raw (potentially truncated) +/// response bytes without full JSON parsing. +/// +/// Searches for patterns like `"prompt_tokens":18` in the raw byte slice. +/// Returns `None` if any required field is absent (e.g., body is truncated +/// before those bytes). Only supports simple leaf paths (the last path +/// component is the field name to search for). +fn extract_usage_raw(data: &[u8], ptrs: &CompiledUsagePaths) -> Option { + // Max field name length we expect (e.g., "completion_tokens" = 17 chars). + // Pattern is `"":` so max pattern len = 1 + 17 + 2 = 20 bytes. + const MAX_PATTERN: usize = 32; + let find_field = |field: &[u8]| -> Option { + let pattern_len = field.len() + 3; // '"' + field + '":' + if pattern_len > MAX_PATTERN { + return None; + } + let mut pattern = [0u8; MAX_PATTERN]; + pattern[0] = b'"'; + pattern[1..1 + field.len()].copy_from_slice(field); + pattern[1 + field.len()] = b'"'; + pattern[2 + field.len()] = b':'; + let pattern_slice = &pattern[..pattern_len]; + + let pos = data.windows(pattern_len).position(|w| w == pattern_slice)?; + let rest = &data[pos + pattern_len..]; + // Skip optional whitespace. + let start = rest + .iter() + .position(|&b| !b.is_ascii_whitespace()) + .unwrap_or(0); + let digits = &rest[start..]; + let end = digits + .iter() + .position(|b| !b.is_ascii_digit()) + .unwrap_or(digits.len()); + if end == 0 { + return None; + } + std::str::from_utf8(&digits[..end]) + .ok()? + .parse::() + .ok() + }; + + // Extract the leaf field name from the last configured path (e.g., + // "usage.prompt_tokens" → "prompt_tokens"). + let leaf = |paths: &[String]| -> Option { + for path in paths { + let field = path.rsplit('.').next().unwrap_or(path.as_str()); + if let Some(v) = find_field(field.as_bytes()) { + return Some(v); + } + } + None + }; + + let input_tokens = leaf(&ptrs.input_tokens)?; + let output_tokens = leaf(&ptrs.output_tokens)?; + let total_tokens = leaf(&ptrs.total_tokens).unwrap_or(input_tokens + output_tokens); + Some(OpenAIUsage { + input_tokens, + output_tokens, + total_tokens, + cached_tokens: None, + }) +} + +// ─── Per-session state (lives in HttpLog) ──────────────────────────────── + +/// State accumulated across multiple packets for one OpenAI streaming session. +#[derive(Clone, Debug)] +pub struct OpenAISession { + pub kind: OpenAIKind, + pub is_stream: bool, + /// True when the HTTP response uses `Transfer-Encoding: chunked`. + /// SSE data is then wrapped in chunk framing and must be decoded before + /// the SSE state machine can parse events. + pub is_chunked_transfer: bool, + /// Synthetic stream-id assigned to the session for multi-merge matching. + pub stream_id: u32, + + /// Request packet timestamp (microseconds). + pub request_ts_us: u64, + /// Timestamp when the SSE stream ended (microseconds). + /// `None` until the terminal event ([DONE] / response.completed) is received, + /// or until the non-streaming JSON response is parsed. + pub stream_end_ts_us: Option, + /// Timestamp of the first SSE output event (microseconds). + pub first_output_ts_us: Option, + /// Timestamp of the most recent SSE output event (microseconds). + pub last_output_ts_us: Option, + + pub stream_event_count: u32, + pub stream_completed: bool, + + pub usage: Option, + pub usage_status: UsageStatus, + + pub biz_org_path: Option, + pub biz_user_id: Option, + pub biz_app_id: Option, + + /// Partial SSE bytes not yet forming a complete event (any supported separator). + pub sse_buf: Vec, + pub sse_buf_overflowed: bool, + + pub abort_reason: Option, + + pub config_sse_max: usize, + /// Pre-compiled JSON Pointer strings for token extraction. Computed once at + /// session creation so the SSE hot path does zero allocation. + pub usage_ptrs: CompiledUsagePaths, + /// Scratch buffer reused across calls to decode HTTP chunked framing. + /// Avoids a fresh heap allocation per SSE continuation packet for chunked streams. + pub chunked_decode_buf: Vec, +} + +impl OpenAISession { + pub fn new( + kind: OpenAIKind, + is_stream: bool, + request_ts_us: u64, + sse_buffer_max_bytes: usize, + usage_paths: &OpenAIUsageFieldPaths, + ) -> Self { + Self { + kind, + is_stream, + is_chunked_transfer: false, + stream_id: next_openai_session_id(), + request_ts_us, + stream_end_ts_us: None, + first_output_ts_us: None, + last_output_ts_us: None, + stream_event_count: 0, + stream_completed: false, + usage: None, + usage_status: if is_stream { + UsageStatus::Missing + } else { + UsageStatus::Unknown + }, + biz_org_path: None, + biz_user_id: None, + biz_app_id: None, + sse_buf: Vec::new(), + sse_buf_overflowed: false, + abort_reason: None, + config_sse_max: sse_buffer_max_bytes, + usage_ptrs: CompiledUsagePaths::from_config(usage_paths), + chunked_decode_buf: Vec::new(), + } + } + + /// Feed raw bytes (from a streaming HTTP response chunk) into the SSE buffer + /// and process complete events. Returns `true` when the stream has ended. + pub fn feed_sse(&mut self, data: &[u8], packet_ts_us: u64) -> bool { + // Append to SSE buffer with overflow protection. + let available = self.config_sse_max.saturating_sub(self.sse_buf.len()); + let done = if available == 0 { + if !self.sse_buf_overflowed { + self.sse_buf_overflowed = true; + self.abort_reason = Some("sse_buffer_overflow".to_string()); + } + // Still try to scan for terminal events in the new data. + self.has_terminal_in(data) + } else { + let to_append = available.min(data.len()); + self.sse_buf.extend_from_slice(&data[..to_append]); + if to_append < data.len() { + self.sse_buf_overflowed = true; + self.abort_reason = Some("sse_buffer_overflow".to_string()); + } + self.drain_events(packet_ts_us) + }; + if done { + self.stream_end_ts_us.get_or_insert(packet_ts_us); + } + done + } + + /// Returns true if the stream has ended (terminal marker found in raw bytes). + /// Operates directly on bytes to avoid any allocation. + fn has_terminal_in(&self, data: &[u8]) -> bool { + // Both `data:[DONE]` (no space) and `data: [DONE]` (with space) are valid. + contains_bytes(data, b"data:[DONE]") + || contains_bytes(data, b"data: [DONE]") + || contains_bytes(data, b"\"response.completed\"") + } + + /// Drain all complete SSE events from the buffer. + /// Handles all four separator forms (`\n\n`, `\n\r\n`, `\r\n\n`, `\r\n\r\n`). + /// Returns `true` when the stream has ended. + fn drain_events(&mut self, packet_ts_us: u64) -> bool { + // Temporarily take the buffer so we can borrow slices from it while + // mutating the rest of `self` inside `process_sse_event`. This avoids + // per-event Vec allocations and reduces drain() calls to one. + let mut buf = std::mem::take(&mut self.sse_buf); + let mut cursor = 0; + let mut done = false; + + loop { + let Some((rel_end, sep_len)) = find_event_end(&buf[cursor..]) else { + break; + }; + let abs_end = cursor + rel_end; + done = self.process_sse_event(&buf[cursor..abs_end], packet_ts_us); + cursor = abs_end + sep_len; + if done { + break; + } + } + + // Put remaining (unprocessed) bytes back in one shot. + buf.drain(..cursor); + // Release excess capacity: if the buffer shrank to less than half its + // allocated capacity, shrink to avoid holding onto a large allocation + // for the rest of the stream after a burst of unprocessed data. + if buf.capacity() > 4096 && buf.len() < buf.capacity() / 2 { + buf.shrink_to_fit(); + } + self.sse_buf = buf; + done + } + + /// Process one complete SSE event. Returns `true` if this is the terminal event. + fn process_sse_event(&mut self, event_bytes: &[u8], packet_ts_us: u64) -> bool { + // Skip events with invalid UTF-8 silently. + let Ok(text) = std::str::from_utf8(event_bytes) else { + return false; + }; + + let mut event_type = ""; + let mut data_line = ""; + + for line in text.lines() { + if let Some(v) = line.strip_prefix("event:") { + event_type = v.trim(); + } else if let Some(v) = line.strip_prefix("data:") { + data_line = v.trim(); + } + } + + // Empty data line: SSE comment or keepalive (e.g. `: ping`). Nothing to parse. + if data_line.is_empty() { + return false; + } + + // Check for Chat Completions stream terminator. + if data_line == "[DONE]" { + self.stream_completed = true; + if self.usage.is_some() { + self.usage_status = UsageStatus::Available; + } + return true; + } + + // Parse JSON data payload. + let json: Value = match serde_json::from_str(data_line) { + Ok(v) => v, + Err(_) => return false, + }; + + // Check for Responses API completion event. + let is_responses_completed = event_type == "response.completed" + || json.get("type").and_then(|t| t.as_str()) == Some("response.completed"); + + if is_responses_completed { + if let Some(usage) = parse_usage_from_json(&json, &self.usage_ptrs) { + self.usage = Some(usage); + self.usage_status = UsageStatus::Available; + } + self.stream_completed = true; + return true; + } + + // Extract usage whenever it is present. This covers both the dedicated + // Chat Completions usage chunk (choices=[]) and providers that embed + // usage in every content chunk. + if let Some(usage) = parse_usage_from_json(&json, &self.usage_ptrs) { + self.usage = Some(usage); + self.usage_status = UsageStatus::Available; + } + + // Usage-only chunk (choices=[]): not a terminal event; [DONE] follows. + if json + .get("choices") + .and_then(|v| v.as_array()) + .map(|a| a.is_empty()) + .unwrap_or(false) + { + return false; + } + + // Check if this is a valid output event. + let is_output_event = match self.kind { + OpenAIKind::ChatCompletions => { + // choices[0].delta.content non-empty + json.pointer("/choices/0/delta/content") + .and_then(|c| c.as_str()) + .map(|s| !s.is_empty()) + .unwrap_or(false) + } + OpenAIKind::Responses => { + (event_type == "response.output_text.delta" + && json + .pointer("/delta") + .and_then(|d| d.as_str()) + .map(|s| !s.is_empty()) + .unwrap_or(false)) + || event_type == "response.output_item.done" + } + OpenAIKind::Unknown => false, + }; + + if is_output_event { + if self.first_output_ts_us.is_none() { + self.first_output_ts_us = Some(packet_ts_us); + } + self.last_output_ts_us = Some(packet_ts_us); + self.stream_event_count = self.stream_event_count.saturating_add(1); + } + + false + } + + /// Compute final TTFT and TPOT from the accumulated state. + /// Returns `(ttft_us, tpot_us)` in microseconds. + /// + /// **Streaming**: TTFT = time to first SSE content event; TPOT = inter-event + /// span divided by (output_tokens − 1). + /// + /// **Non-streaming**: TTFT = response latency (request → response received); + /// TPOT = response latency / output_tokens. + pub fn compute_timings(&self) -> (Option, Option) { + if self.is_stream { + let ttft_us = self + .first_output_ts_us + .map(|first| first.saturating_sub(self.request_ts_us) as f64); + + let tpot_us = match ( + self.first_output_ts_us, + self.last_output_ts_us, + self.usage.as_ref(), + ) { + (Some(first), Some(last), Some(usage)) if usage.output_tokens > 0 => { + let span_us = last.saturating_sub(first); + let divisor = usage.output_tokens.saturating_sub(1).max(1); + Some(span_us as f64 / divisor as f64) + } + _ => None, + }; + + (ttft_us, tpot_us) + } else { + // Non-streaming: all tokens arrive with the response body. + // TTFT = response latency. TPOT = latency / output_tokens. + let Some(end_ts) = self.stream_end_ts_us else { + return (None, None); + }; + let ttft_us = end_ts.saturating_sub(self.request_ts_us) as f64; + let tpot_us = self.usage.as_ref().and_then(|u| { + if u.output_tokens > 0 { + Some(ttft_us / u.output_tokens as f64) + } else { + None + } + }); + (Some(ttft_us), tpot_us) + } + } + + /// Write computed attributes and metrics into the provided vectors. + pub fn populate_log(&self, attrs: &mut Vec, metrics: &mut Vec) { + // Attributes. + push_attr(attrs, ATTR_API_KIND, self.kind.as_str()); + push_attr( + attrs, + ATTR_STREAM, + if self.is_stream { "true" } else { "false" }, + ); + push_attr(attrs, ATTR_USAGE_STATUS, self.usage_status.as_str()); + + if self.is_stream { + push_attr( + attrs, + ATTR_STREAM_COMPLETE, + if self.stream_completed { + "true" + } else { + "false" + }, + ); + if let Some(reason) = &self.abort_reason { + push_attr(attrs, ATTR_ABORT_REASON, reason); + } + } + + if let Some(v) = &self.biz_org_path { + push_attr(attrs, ATTR_BIZ_ORG_PATH, v); + } + if let Some(v) = &self.biz_user_id { + push_attr(attrs, ATTR_BIZ_USER_ID, v); + } + if let Some(v) = &self.biz_app_id { + push_attr(attrs, ATTR_BIZ_APP_ID, v); + } + + // Metrics. + push_metric(metrics, METRIC_REQUEST, 1.0); + push_metric( + metrics, + METRIC_STREAM_REQUEST, + if self.is_stream { 1.0 } else { 0.0 }, + ); + + if self.is_stream && self.stream_event_count > 0 { + push_metric( + metrics, + METRIC_STREAM_EVENT_COUNT, + self.stream_event_count as f64, + ); + } + + let (ttft_us, tpot_us) = self.compute_timings(); + if let Some(v) = ttft_us { + push_metric(metrics, METRIC_TTFT_US, v); + } + if let Some(v) = tpot_us { + push_metric(metrics, METRIC_TPOT_US, v); + } + + if let Some(usage) = &self.usage { + push_metric(metrics, METRIC_INPUT_TOKENS, usage.input_tokens as f64); + push_metric(metrics, METRIC_OUTPUT_TOKENS, usage.output_tokens as f64); + push_metric(metrics, METRIC_TOTAL_TOKENS, usage.total_tokens as f64); + if let Some(cached) = usage.cached_tokens { + push_metric(metrics, METRIC_CACHED_TOKENS, cached as f64); + } + } + + // Total request-to-completion duration in microseconds. + // For streaming: request → final [DONE] event (full stream wall-clock time). + // For non-streaming: request → response body received (same as llm_ttft_us). + if let Some(end_ts) = self.stream_end_ts_us { + let total_us = end_ts.saturating_sub(self.request_ts_us) as f64; + push_metric(metrics, METRIC_TOTAL_STREAM_US, total_us); + } + } +} + +// ─── Request parsing helpers ────────────────────────────────────────────── + +/// Check whether a request path matches the configured OpenAI API paths. +/// +/// Matching rules (OR logic): +/// - The path matches if it starts with **any** entry in `path_prefixes` +/// **or** ends with **any** entry in `path_suffixes`. +/// - An empty list means that group contributes nothing to the match. +/// - If **both** lists are empty, no path matches (explicit configuration required). +pub fn is_openai_path(path: &str, config: &LogParserConfig) -> bool { + if !config.openai_api.enabled { + return false; + } + let cfg = &config.openai_api; + // Both empty → no explicit paths configured, match nothing. + if cfg.path_prefixes.is_empty() && cfg.path_suffixes.is_empty() { + return false; + } + let matches_prefix = cfg + .path_prefixes + .iter() + .any(|p| path.starts_with(p.as_str())); + let matches_suffix = cfg.path_suffixes.iter().any(|s| path.ends_with(s.as_str())); + matches_prefix || matches_suffix +} + +/// Determine the API kind from the path. +pub fn kind_from_path(path: &str) -> OpenAIKind { + if path.contains("/chat/completions") { + OpenAIKind::ChatCompletions + } else if path.contains("/responses") { + OpenAIKind::Responses + } else { + OpenAIKind::Unknown + } +} + +/// Extract business dimensions from HTTP request headers. +/// `key` is the raw (mixed-case) header name; comparison is case-insensitive. +pub fn extract_biz_from_header( + session: &mut OpenAISession, + key: &str, + val: &str, + config: &LogParserConfig, +) { + // Short-circuit once all dims are populated — avoids iterating extractor + // lists for every remaining header in a request with many headers. + if session.biz_org_path.is_some() + && session.biz_user_id.is_some() + && session.biz_app_id.is_some() + { + return; + } + + let extractors = &config.openai_api.biz_dimension_extractors; + + if session.biz_org_path.is_none() + && extractors + .org_path + .headers + .iter() + .any(|h| h.eq_ignore_ascii_case(key)) + { + session.biz_org_path = Some(val.to_string()); + } + if session.biz_user_id.is_none() + && extractors + .user_id + .headers + .iter() + .any(|h| h.eq_ignore_ascii_case(key)) + { + session.biz_user_id = Some(val.to_string()); + } + if session.biz_app_id.is_none() + && extractors + .app_id + .headers + .iter() + .any(|h| h.eq_ignore_ascii_case(key)) + { + session.biz_app_id = Some(val.to_string()); + } +} + +/// Parse an OpenAI request JSON body and update the session: +/// - Extract `stream` field +/// - Extract business dimensions via json_paths +/// +/// When the body is a TCP continuation segment (partial JSON), full parsing +/// fails. In that case a raw byte search is used to detect `"stream": true` +/// so streaming is correctly identified even when the request spans multiple +/// TCP segments and the `stream` key is not in the first segment. +pub fn parse_request_body(session: &mut OpenAISession, body: &[u8], config: &LogParserConfig) { + let limit = config.openai_api.request_body_max_bytes; + let slice = if body.len() > limit { + &body[..limit] + } else { + body + }; + + let Ok(json) = serde_json::from_slice::(slice) else { + // Partial body segment: fall back to a byte search for the stream flag. + // This handles cases where "stream": true lives in a later TCP segment + // of a multi-segment POST body. + if !session.is_stream { + if contains_bytes(slice, b"\"stream\":true") + || contains_bytes(slice, b"\"stream\": true") + { + session.is_stream = true; + // Upgrade Unknown → Missing: we now know this is a streaming + // request, so usage is expected but not yet seen. + if session.usage_status == UsageStatus::Unknown { + session.usage_status = UsageStatus::Missing; + } + } + } + return; + }; + + // stream flag + if let Some(stream) = json.get("stream").and_then(|v| v.as_bool()) { + session.is_stream = stream; + if stream && session.usage_status == UsageStatus::Unknown { + session.usage_status = UsageStatus::Missing; + } + } + + let extractors = &config.openai_api.biz_dimension_extractors; + + if session.biz_org_path.is_none() { + session.biz_org_path = extract_json_paths(&json, &extractors.org_path.json_paths); + } + if session.biz_user_id.is_none() { + session.biz_user_id = extract_json_paths(&json, &extractors.user_id.json_paths); + } + if session.biz_app_id.is_none() { + session.biz_app_id = extract_json_paths(&json, &extractors.app_id.json_paths); + } +} + +/// Extract the first matching non-empty string value from a list of dot-notation JSON paths. +/// Supports arbitrary depth: "a.b.c" → json pointer "/a/b/c". +fn extract_json_paths(json: &Value, paths: &[String]) -> Option { + for path in paths { + let ptr = dot_path_to_pointer(path); + if let Some(s) = json.pointer(&ptr).and_then(|v| v.as_str()) { + if !s.is_empty() { + return Some(s.to_string()); + } + } + } + None +} + +/// Parse a non-streaming (JSON) response body and extract token usage. +/// +/// If JSON parsing fails (most commonly because the body is truncated by +/// `l7_log_packet_size`), the usage fields are searched for directly in the +/// raw bytes as a best-effort fallback. Usage may still be unavailable when +/// the fields are in the portion of the body beyond the capture limit. +pub fn parse_response_json(session: &mut OpenAISession, body: &[u8], config: &LogParserConfig) { + let limit = config.openai_api.response_event_max_bytes; + let slice = if body.len() > limit { + &body[..limit] + } else { + body + }; + + let Ok(json) = serde_json::from_slice::(slice) else { + // JSON parse failed — body is likely truncated by l7_log_packet_size. + // Try a best-effort raw-byte extraction of the usage fields. This + // succeeds only when the fields happen to fall within the captured bytes. + if let Some(usage) = extract_usage_raw(slice, &session.usage_ptrs) { + session.usage = Some(usage); + session.usage_status = UsageStatus::Available; + } else { + session.usage_status = UsageStatus::Missing; + } + return; + }; + + if let Some(usage) = parse_usage_from_json(&json, &session.usage_ptrs) { + session.usage = Some(usage); + session.usage_status = UsageStatus::Available; + } else { + session.usage_status = UsageStatus::Missing; + } +} + +// ─── Chunked transfer encoding decoder ─────────────────────────────────── + +/// Strip HTTP chunked-transfer-encoding framing from `payload`, writing the +/// decoded bytes into `out` (which is cleared first so the caller's buffer +/// capacity is reused across calls — zero extra allocation per invocation). +/// +/// Returns `true` when the HTTP terminal chunk (`0\r\n\r\n`) is found, +/// signalling stream end. Returns `false` otherwise (normal chunk or partial +/// packet). `out` may contain decoded data even when `true` is returned — +/// callers must feed it before acting on the terminal signal. +/// +/// If the payload does not look like valid chunk headers the remaining bytes +/// are appended to `out` unchanged so the SSE parser can still attempt to +/// process them. +pub fn decode_chunked_sse_into(payload: &[u8], out: &mut Vec) -> bool { + out.clear(); + let mut pos = 0; + + while pos < payload.len() { + // Find the end of the chunk-size line (terminated by \r\n). + let Some(crlf) = payload[pos..].windows(2).position(|w| w == b"\r\n") else { + // No \r\n — treat the rest as raw data (e.g., partial packet). + out.extend_from_slice(&payload[pos..]); + break; + }; + + let size_bytes = &payload[pos..pos + crlf]; + let Ok(size_str) = std::str::from_utf8(size_bytes) else { + out.extend_from_slice(&payload[pos..]); + break; + }; + // Ignore chunk extensions (after ';'). + let size_str = size_str + .find(';') + .map(|i| &size_str[..i]) + .unwrap_or(size_str) + .trim(); + + let Ok(chunk_size) = usize::from_str_radix(size_str, 16) else { + // Not a valid hex size — not chunked encoding; copy as-is. + out.extend_from_slice(&payload[pos..]); + break; + }; + + if chunk_size == 0 { + return true; // Terminal chunk — HTTP stream has ended. + } + + pos += crlf + 2; // Skip the chunk-size line including \r\n. + + // Append chunk data (handle partial last chunk). + // saturating_add guards against crafted chunk sizes near usize::MAX. + let data_end = pos.saturating_add(chunk_size).min(payload.len()); + out.extend_from_slice(&payload[pos..data_end]); + pos = data_end; + + // Skip the chunk's trailing \r\n terminator (may be absent if partial). + if pos + 2 <= payload.len() && payload[pos] == b'\r' && payload[pos + 1] == b'\n' { + pos += 2; + } + } + + false +} + +/// Allocating wrapper around `decode_chunked_sse_into` for call sites that +/// cannot pass a reusable buffer (e.g., tests). Returns `None` when the +/// terminal chunk is found, `Some(decoded)` otherwise. +pub fn decode_chunked_sse(payload: &[u8]) -> Option> { + let mut out = Vec::with_capacity(payload.len()); + if decode_chunked_sse_into(payload, &mut out) { + None + } else { + Some(out) + } +} + +// ─── SSE helper ─────────────────────────────────────────────────────────── + +/// Find the end of the first complete SSE event in `buf`. +/// +/// All four separator forms allowed by the SSE spec share the invariant that +/// a `\n` is immediately followed by either another `\n` (blank LF line) or +/// `\r\n` (blank CRLF line). Scanning for that covers `\n\n` (2 B), +/// `\n\r\n` / `\r\n\n` (3 B), and `\r\n\r\n` (4 B via the inner `\n`). +/// +/// Returns `(end, sep_len)` where `buf[..end]` is the event content and +/// `buf[end..end+sep_len]` is the separator. Any trailing `\r` left in the +/// event content by a CRLF line ending is stripped by `.lines()` in +/// `process_sse_event`. +#[inline] +fn find_event_end(buf: &[u8]) -> Option<(usize, usize)> { + let len = buf.len(); + if len < 2 { + return None; + } + for i in 0..len - 1 { + if buf[i] != b'\n' { + continue; + } + if buf[i + 1] == b'\n' { + return Some((i, 2)); + } + if i + 2 < len && buf[i + 1] == b'\r' && buf[i + 2] == b'\n' { + return Some((i, 3)); + } + } + None +} + +/// Byte-level substring search. Zero allocation. +#[inline] +fn contains_bytes(haystack: &[u8], needle: &[u8]) -> bool { + !needle.is_empty() && haystack.windows(needle.len()).any(|w| w == needle) +} + +// ─── Attribute/metric push helpers ──────────────────────────────────────── + +fn push_attr(attrs: &mut Vec, key: &str, val: &str) { + attrs.push(KeyVal { + key: key.to_string(), + val: val.to_string(), + }); +} + +fn push_metric(metrics: &mut Vec, key: &str, val: f64) { + metrics.push(MetricKeyVal { + key: key.to_string(), + val: val as f32, + }); +} + +// ─── Unit tests ───────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + fn make_session(kind: OpenAIKind, is_stream: bool) -> OpenAISession { + OpenAISession::new(kind, is_stream, 1_000_000, 131072, &Default::default()) + } + + #[test] + fn test_chat_completions_sse_ttft_tpot() { + let mut s = make_session(OpenAIKind::ChatCompletions, true); + s.request_ts_us = 0; + + // First output event at t=100ms (1 token). + let chunk1 = b"data: {\"choices\":[{\"delta\":{\"content\":\"Hello\"}}]}\n\n"; + let done = s.feed_sse(chunk1, 100_000); + assert!(!done); + assert_eq!(s.stream_event_count, 1); + assert_eq!(s.first_output_ts_us, Some(100_000)); + assert_eq!(s.last_output_ts_us, Some(100_000)); + + // More content tokens at t=500ms (4 more tokens = 5 total output tokens), + // then usage chunk and DONE – all in the same feed call. + let chunk2 = concat!( + "data: {\"choices\":[{\"delta\":{\"content\":\" w\"}}]}\n\n", + "data: {\"choices\":[{\"delta\":{\"content\":\"or\"}}]}\n\n", + "data: {\"choices\":[{\"delta\":{\"content\":\"ld\"}}]}\n\n", + "data: {\"choices\":[{\"delta\":{\"content\":\"!\"}}]}\n\n", + "data: {\"choices\":[],\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":5,\"total_tokens\":15}}\n\n", + "data: [DONE]\n\n", + ); + let done = s.feed_sse(chunk2.as_bytes(), 500_000); + assert!(done); + assert!(s.stream_completed); + assert_eq!(s.usage_status, UsageStatus::Available); + let usage = s.usage.as_ref().unwrap(); + assert_eq!(usage.input_tokens, 10); + assert_eq!(usage.output_tokens, 5); + assert_eq!(s.stream_event_count, 5); // 1 + 4 + assert_eq!(s.last_output_ts_us, Some(500_000)); + + let (ttft, tpot) = s.compute_timings(); + // ttft = (100_000 - 0) = 100_000 µs + assert!((ttft.unwrap() - 100_000.0).abs() < 1.0); + // tpot = (500_000 - 100_000) / max(5-1, 1) = 400_000 / 4 = 100_000 µs + assert!((tpot.unwrap() - 100_000.0).abs() < 1.0); + } + + #[test] + fn test_responses_api_sse_completed() { + // The Responses API `response.completed` event embeds usage under + // `response.usage.*`, so we configure paths accordingly. + use crate::config::config::OpenAIUsageFieldPaths; + let paths = OpenAIUsageFieldPaths { + input_tokens: vec!["response.usage.input_tokens".to_string()], + output_tokens: vec!["response.usage.output_tokens".to_string()], + total_tokens: vec!["response.usage.total_tokens".to_string()], + ..Default::default() + }; + let mut s = OpenAISession::new(OpenAIKind::Responses, true, 0, 131072, &paths); + + let chunk = b"event: response.output_text.delta\ndata: {\"delta\":\"Hi\"}\n\nevent: response.completed\ndata: {\"type\":\"response.completed\",\"response\":{\"usage\":{\"input_tokens\":5,\"output_tokens\":3,\"total_tokens\":8}}}\n\n"; + let done = s.feed_sse(chunk, 200_000); + assert!(done); + assert!(s.stream_completed); + assert_eq!(s.usage_status, UsageStatus::Available); + let usage = s.usage.as_ref().unwrap(); + assert_eq!(usage.input_tokens, 5); + assert_eq!(usage.output_tokens, 3); + assert_eq!(s.stream_event_count, 1); + } + + #[test] + fn test_non_streaming_usage_extraction() { + let mut s = make_session(OpenAIKind::ChatCompletions, false); + let body = br#"{"usage":{"prompt_tokens":20,"completion_tokens":10,"total_tokens":30}}"#; + let config = crate::config::handler::LogParserConfig::default(); + parse_response_json(&mut s, body, &config); + assert_eq!(s.usage_status, UsageStatus::Available); + let usage = s.usage.as_ref().unwrap(); + assert_eq!(usage.input_tokens, 20); + assert_eq!(usage.output_tokens, 10); + assert_eq!(usage.total_tokens, 30); + } + + #[test] + fn test_non_streaming_timings() { + // Non-streaming: request at t=0, response at t=2_000_000 µs. + // TTFT = TOTAL = 2_000_000 µs, TPOT = 2_000_000 / 10 output_tokens = 200_000 µs. + let mut s = OpenAISession::new( + OpenAIKind::ChatCompletions, + false, + 0, + 131072, + &Default::default(), + ); + s.usage = Some(OpenAIUsage { + input_tokens: 20, + output_tokens: 10, + total_tokens: 30, + cached_tokens: None, + }); + s.usage_status = UsageStatus::Available; + s.stream_end_ts_us = Some(2_000_000); // 2 seconds in µs + + let (ttft, tpot) = s.compute_timings(); + assert!( + (ttft.unwrap() - 2_000_000.0).abs() < 1.0, + "ttft should be 2_000_000 µs, got {:?}", + ttft + ); + assert!( + (tpot.unwrap() - 200_000.0).abs() < 1.0, + "tpot should be 200_000 µs, got {:?}", + tpot + ); + + // Verify populate_log emits the metrics. + let mut attrs = Vec::new(); + let mut metrics = Vec::new(); + s.populate_log(&mut attrs, &mut metrics); + + let metric_map: std::collections::HashMap<_, _> = + metrics.iter().map(|kv| (kv.key.as_str(), kv.val)).collect(); + assert!(metric_map.contains_key("llm_ttft_us"), "ttft missing"); + assert!(metric_map.contains_key("llm_tpot_us"), "tpot missing"); + assert!( + metric_map.contains_key("llm_total_stream_us"), + "total_stream_us missing" + ); + assert!((metric_map["llm_ttft_us"] - 2_000_000.0).abs() < 1.0); + assert!((metric_map["llm_tpot_us"] - 200_000.0).abs() < 1.0); + assert!((metric_map["llm_total_stream_us"] - 2_000_000.0).abs() < 1.0); + } + + fn make_config_with_json_paths() -> crate::config::handler::LogParserConfig { + use crate::config::config::{ + OpenAIApiConfig, OpenAIBizDimExtractor, OpenAIBizDimExtractors, OpenAIUsageFieldPaths, + }; + crate::config::handler::LogParserConfig { + openai_api: OpenAIApiConfig { + enabled: true, + path_prefixes: vec!["/v1/chat/completions".to_string()], + path_suffixes: vec![], + request_body_max_bytes: 65536, + response_event_max_bytes: 32768, + sse_buffer_max_bytes: 131072, + usage_field_paths: OpenAIUsageFieldPaths::default(), + biz_dimension_extractors: OpenAIBizDimExtractors { + org_path: OpenAIBizDimExtractor { + headers: vec![], + json_paths: vec![ + "metadata.org_path".to_string(), + "metadata.department_path".to_string(), + ], + }, + user_id: OpenAIBizDimExtractor { + headers: vec![], + json_paths: vec![ + "safety_identifier".to_string(), + "user".to_string(), + "metadata.user_id".to_string(), + ], + }, + app_id: OpenAIBizDimExtractor { + headers: vec![], + json_paths: vec![ + "metadata.app_id".to_string(), + "metadata.application_id".to_string(), + ], + }, + }, + }, + ..Default::default() + } + } + + #[test] + fn test_biz_dims_from_json() { + let mut s = make_session(OpenAIKind::ChatCompletions, false); + let config = make_config_with_json_paths(); + let body = + br#"{"user":"alice","safety_identifier":"si-001","metadata":{"org_path":"/root/eng","app_id":"app-42"}}"#; + parse_request_body(&mut s, body, &config); + // user_id: "safety_identifier" has priority over "user" + assert_eq!(s.biz_user_id.as_deref(), Some("si-001")); + assert_eq!(s.biz_org_path.as_deref(), Some("/root/eng")); + assert_eq!(s.biz_app_id.as_deref(), Some("app-42")); + } + + #[test] + fn test_biz_dims_three_level_json_path() { + // Regression test: paths deeper than 2 levels must work. + use crate::config::config::{ + OpenAIApiConfig, OpenAIBizDimExtractor, OpenAIBizDimExtractors, OpenAIUsageFieldPaths, + }; + let config = crate::config::handler::LogParserConfig { + openai_api: OpenAIApiConfig { + enabled: true, + path_prefixes: vec!["/v1/chat/completions".to_string()], + path_suffixes: vec![], + request_body_max_bytes: 65536, + response_event_max_bytes: 32768, + sse_buffer_max_bytes: 131072, + usage_field_paths: OpenAIUsageFieldPaths::default(), + biz_dimension_extractors: OpenAIBizDimExtractors { + org_path: OpenAIBizDimExtractor { + headers: vec![], + json_paths: vec!["context.meta.org".to_string()], + }, + user_id: OpenAIBizDimExtractor { + headers: vec![], + json_paths: vec![], + }, + app_id: OpenAIBizDimExtractor { + headers: vec![], + json_paths: vec![], + }, + }, + }, + ..Default::default() + }; + let mut s = make_session(OpenAIKind::ChatCompletions, false); + let body = br#"{"context":{"meta":{"org":"/root/deep"}}}"#; + parse_request_body(&mut s, body, &config); + assert_eq!(s.biz_org_path.as_deref(), Some("/root/deep")); + } + + #[test] + fn test_biz_dims_user_fallback() { + let mut s = make_session(OpenAIKind::ChatCompletions, false); + let config = make_config_with_json_paths(); + let body = br#"{"user":"bob","metadata":{"org_path":"/root/sales"}}"#; + parse_request_body(&mut s, body, &config); + // No safety_identifier, fall back to "user" + assert_eq!(s.biz_user_id.as_deref(), Some("bob")); + assert_eq!(s.biz_org_path.as_deref(), Some("/root/sales")); + } + + #[test] + fn test_sse_across_chunks() { + let mut s = make_session(OpenAIKind::ChatCompletions, true); + s.request_ts_us = 0; + + // Event split across two feed calls. + let part1 = b"data: {\"choices\":[{\"delta\":{\"content\":\"He"; + let done = s.feed_sse(part1, 50_000); + assert!(!done); + assert_eq!(s.stream_event_count, 0); + + let part2 = b"llo\"}}]}\n\ndata: [DONE]\n\n"; + let done = s.feed_sse(part2, 100_000); + assert!(done); + assert_eq!(s.stream_event_count, 1); + assert_eq!(s.first_output_ts_us, Some(100_000)); + } + + /// Verify that `find_event_end` and `feed_sse` handle the `\n\r\n` SSE separator. + /// + /// Some servers (e.g., `openai_stream_v537.pcap`) deliver SSE over HTTP + /// chunked transfer where each chunk only carries part of an SSE event: + /// - Chunk 1: `data:` + /// - Chunk 2: `{json}\n` (single LF from SSE data line) + /// - Chunk 3: `\r\n` (CRLF blank line = event separator) + /// + /// After `decode_chunked_sse_into` concatenates the chunks, the buffer + /// looks like `data:{json}\n\r\n`. This test verifies the parser handles + /// that `\n\r\n` separator correctly. + #[test] + fn test_sse_crlf_event_separator() { + let mut s = make_session(OpenAIKind::ChatCompletions, true); + s.request_ts_us = 0; + + // Simulate the decoded content from a chunked SSE packet in v537 format: + // data:{json}\n ← SSE data line (LF) + // \r\n ← CRLF blank line (event separator) + let event = b"data:{\"choices\":[{\"delta\":{\"content\":\"Hi\"},\"finish_reason\":null}],\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":1,\"total_tokens\":11}}\n\r\n"; + let done = s.feed_sse(event, 100_000); + assert!(!done); + assert_eq!(s.stream_event_count, 1, "should detect output event"); + assert_eq!( + s.usage_status, + UsageStatus::Available, + "inline usage should be extracted" + ); + let usage = s.usage.as_ref().unwrap(); + assert_eq!(usage.input_tokens, 10); + assert_eq!(usage.output_tokens, 1); + + // Terminator: data:[DONE]\n\r\n (same separator style) + let done_event = b"data:[DONE]\n\r\n"; + let done = s.feed_sse(done_event, 200_000); + assert!(done); + assert!(s.stream_completed); + } + + #[test] + fn test_tpot_missing_when_usage_absent() { + let mut s = make_session(OpenAIKind::ChatCompletions, true); + s.request_ts_us = 0; + s.first_output_ts_us = Some(100_000); + s.last_output_ts_us = Some(500_000); + // No usage -> TPOT should be None + let (_, tpot) = s.compute_timings(); + assert!(tpot.is_none()); + } +} diff --git a/server/agent_config/README-CH.md b/server/agent_config/README-CH.md index 6c557bd862c..d9e4492a5e9 100644 --- a/server/agent_config/README-CH.md +++ b/server/agent_config/README-CH.md @@ -2269,97 +2269,6 @@ inputs: - ebpf.profile.off_cpu(注意确认 `inputs.ebpf.profile.off_cpu.disabled` 已配置为 **false**) - ebpf.profile.memory(注意确认 `inputs.ebpf.profile.memory.disabled` 已配置为 **false**) -### 智能体治理 {#inputs.proc.ai_agent} - -#### HTTP 端点 {#inputs.proc.ai_agent.http_endpoints} - -**标签**: - -`hot_update` -ee_feature - -**FQCN**: - -`inputs.proc.ai_agent.http_endpoints` - -**默认值**: -```yaml -inputs: - proc: - ai_agent: - http_endpoints: - - /v1/chat/completions - - /v1/embeddings - - /v1/responses -``` - -**模式**: -| Key | Value | -| ---- | ---------------------------- | -| Type | string | - -**详细描述**: - -用于识别智能体的 HTTP 端点前缀,命中后会标记进程为 AI Agent。 - -#### 最大载荷大小 {#inputs.proc.ai_agent.max_payload_size} - -**标签**: - -`hot_update` -ee_feature - -**FQCN**: - -`inputs.proc.ai_agent.max_payload_size` - -**默认值**: -```yaml -inputs: - proc: - ai_agent: - max_payload_size: 0 -``` - -**模式**: -| Key | Value | -| ---- | ---------------------------- | -| Type | int | -| Unit | byte | -| Range | [0, 2147483647] | - -**详细描述**: - -AI Agent 流重组最大载荷大小,0 表示不限。 - -#### 文件 IO 事件 {#inputs.proc.ai_agent.file_io_enabled} - -**标签**: - -`hot_update` -ee_feature - -**FQCN**: - -`inputs.proc.ai_agent.file_io_enabled` - -**默认值**: -```yaml -inputs: - proc: - ai_agent: - file_io_enabled: true -``` - -**模式**: -| Key | Value | -| ---- | ---------------------------- | -| Type | bool | - -**详细描述**: - -是否开启 AI Agent 文件 IO 事件采集。 - ### 符号表 {#inputs.proc.symbol_table} #### Golang 特有 {#inputs.proc.symbol_table.golang_specific} @@ -8523,6 +8432,543 @@ processors: 开启后所有 gRPC 数据包都认为是 `stream` 类型,并且会将 `data` 类型数据包上报,同时延迟计算的响应使用带有 `grpc-status` 字段的。 +##### OpenAI API {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api} + +OpenAI 兼容 API 协议增强配置。开启后,对命中配置路径的 HTTP/1 和 HTTP/2 流量进行 +OpenAI API 识别,并附加 LLM 专用指标(TTFT/TPOT/Token 用量)和业务维度属性(组织/用户/应用)。 + +###### 开启 OpenAI API 解析 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.enabled} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.enabled` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + enabled: false +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | bool | + +**详细描述**: + +开启 OpenAI 兼容 API 协议增强。 + +###### 路径前缀过滤 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.path_prefixes} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.path_prefixes` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + path_prefixes: [] +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**详细描述**: + +路径以任意配置前缀开头的 HTTP 请求将进行 OpenAI API 增强。path_prefixes 与 +path_suffixes 独立匹配,满足其中任意一个即可。列表为空时该组不参与匹配; +两个列表同时为空则不匹配任何路径。 + +###### 路径后缀过滤 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.path_suffixes} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.path_suffixes` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + path_suffixes: + - /v1/chat/completions + - /v1/responses +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**详细描述**: + +路径以任意配置后缀结尾的 HTTP 请求将进行 OpenAI API 增强。path_prefixes 与 +path_suffixes 独立匹配,满足其中任意一个即可。列表为空时该组不参与匹配; +两个列表同时为空则不匹配任何路径。 + +###### 请求体最大缓存字节数 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.request_body_max_bytes} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.request_body_max_bytes` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + request_body_max_bytes: 65536 +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | int | +| Unit | byte | +| Range | [1024, 1048576] | + +**详细描述**: + +请求体的最大缓存和解析字节数,用于提取 `stream` 标志和业务维度字段。 + +###### 响应事件最大解析字节数 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.response_event_max_bytes} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.response_event_max_bytes` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + response_event_max_bytes: 32768 +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | int | +| Unit | byte | +| Range | [1024, 1048576] | + +**详细描述**: + +单个 SSE 事件 JSON 的最大解析字节数(非流式响应体大小限制)。 + +###### SSE 缓冲区最大字节数 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.sse_buffer_max_bytes} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.sse_buffer_max_bytes` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + sse_buffer_max_bytes: 131072 +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | int | +| Unit | byte | +| Range | [4096, 4194304] | + +**详细描述**: + +SSE 事件跨多个响应包重组时的总缓冲上限。超限后丢弃后续事件并设置 +`attribute.llm_abort_reason=sse_buffer_overflow`。 + +###### Token 用量字段路径配置 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths} + +配置从 API 响应中提取 Token 用量的 JSON 路径。路径为相对于响应顶层 JSON 的点分格式 +(如 "usage.prompt_tokens" 解析为 json["usage"]["prompt_tokens"])。 +多个路径按顺序尝试,取第一个非空值。 + +####### 输入 Token 字段路径 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.input_tokens} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.input_tokens` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + usage_field_paths: + input_tokens: + - usage.prompt_tokens +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**详细描述**: + +按顺序尝试的 JSON 路径,用于读取输入(prompt)Token 数量。 + +####### 输出 Token 字段路径 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.output_tokens} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.output_tokens` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + usage_field_paths: + output_tokens: + - usage.completion_tokens +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**详细描述**: + +按顺序尝试的 JSON 路径,用于读取输出(completion)Token 数量。 + +####### 总 Token 字段路径 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.total_tokens} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.total_tokens` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + usage_field_paths: + total_tokens: + - usage.total_tokens +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**详细描述**: + +按顺序尝试的 JSON 路径,用于读取总 Token 数量。 +若均未命中,则自动计算为 input_tokens + output_tokens。 + +####### 缓存 Token 字段路径 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.cached_tokens} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.cached_tokens` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + usage_field_paths: + cached_tokens: + - usage.prompt_tokens_details.cached_tokens + - usage.cache_read_input_tokens +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**详细描述**: + +按顺序尝试的 JSON 路径,用于读取缓存(Prompt Cache 命中)Token 数量。 +usage.prompt_tokens_details.cached_tokens 对应 OpenAI API; +usage.cache_read_input_tokens 对应 Anthropic API。 +若均未命中,则不输出 llm_cached_tokens 指标。 + +###### 业务维度提取配置 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors} + +配置从请求头或 JSON Body 字段提取 org_path、user_id、app_id 的规则。 +每个维度采用"首个命中即停止"策略:先按顺序尝试 headers,若均未命中则 +按顺序尝试 json_paths,找到第一个非空值后不再继续后续候选项。 + +####### 组织架构路径 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.org_path} + +提取组织架构路径维度的规则。 + +######## 请求头候选列表 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.org_path.headers} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.org_path.headers` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + biz_dimension_extractors: + org_path: + headers: + - x-org-path +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**详细描述**: + +按顺序尝试的 HTTP 请求头名称,用于提取 org_path。 + +######## JSON 字段路径候选列表 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.org_path.json_paths} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.org_path.json_paths` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + biz_dimension_extractors: + org_path: + json_paths: [] +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**详细描述**: + +按顺序尝试的 JSON Body 字段路径,用于提取 org_path。 +支持一级点号路径,如 "metadata.org_path"。 + +####### 用户标识 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.user_id} + +提取业务用户标识维度的规则。 + +######## 请求头候选列表 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.user_id.headers} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.user_id.headers` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + biz_dimension_extractors: + user_id: + headers: + - x-user-id +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**详细描述**: + +按顺序尝试的 HTTP 请求头名称,用于提取 user_id。 + +######## JSON 字段路径候选列表 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.user_id.json_paths} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.user_id.json_paths` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + biz_dimension_extractors: + user_id: + json_paths: [] +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**详细描述**: + +按顺序尝试的 JSON Body 字段路径,用于提取 user_id。 +支持一级点号路径,如 "metadata.user_id"。 + +####### 应用标识 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.app_id} + +提取应用标识维度的规则。 + +######## 请求头候选列表 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.app_id.headers} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.app_id.headers` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + biz_dimension_extractors: + app_id: + headers: + - appid +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**详细描述**: + +按顺序尝试的 HTTP 请求头名称,用于提取 app_id。 + +######## JSON 字段路径候选列表 {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.app_id.json_paths} + +**标签**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.app_id.json_paths` + +**默认值**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + biz_dimension_extractors: + app_id: + json_paths: [] +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**详细描述**: + +按顺序尝试的 JSON Body 字段路径,用于提取 app_id。 +支持一级点号路径,如 "metadata.app_id"。 + #### 自定义协议解析 {#processors.request_log.application_protocol_inference.custom_protocols} **标签**: diff --git a/server/agent_config/README.md b/server/agent_config/README.md index 801c71fed45..f824307f9ae 100644 --- a/server/agent_config/README.md +++ b/server/agent_config/README.md @@ -2299,97 +2299,6 @@ Also ensure the global configuration parameters for related features are enabled - ebpf.profile.off_cpu (Ensure `inputs.ebpf.profile.off_cpu.disabled` is configured to **false**) - ebpf.profile.memory (Ensure `inputs.ebpf.profile.memory.disabled` is configured to **false**) -### AI Agent {#inputs.proc.ai_agent} - -#### HTTP Endpoints {#inputs.proc.ai_agent.http_endpoints} - -**Tags**: - -`hot_update` -ee_feature - -**FQCN**: - -`inputs.proc.ai_agent.http_endpoints` - -**Default value**: -```yaml -inputs: - proc: - ai_agent: - http_endpoints: - - /v1/chat/completions - - /v1/embeddings - - /v1/responses -``` - -**Schema**: -| Key | Value | -| ---- | ---------------------------- | -| Type | string | - -**Description**: - -HTTP endpoints for AI agent recognition. Requests that match any prefix will mark the process as AI Agent. - -#### Max Payload Size {#inputs.proc.ai_agent.max_payload_size} - -**Tags**: - -`hot_update` -ee_feature - -**FQCN**: - -`inputs.proc.ai_agent.max_payload_size` - -**Default value**: -```yaml -inputs: - proc: - ai_agent: - max_payload_size: 0 -``` - -**Schema**: -| Key | Value | -| ---- | ---------------------------- | -| Type | int | -| Unit | byte | -| Range | [0, 2147483647] | - -**Description**: - -Maximum payload size for AI agent reassembly. 0 means unlimited. - -#### File IO Enabled {#inputs.proc.ai_agent.file_io_enabled} - -**Tags**: - -`hot_update` -ee_feature - -**FQCN**: - -`inputs.proc.ai_agent.file_io_enabled` - -**Default value**: -```yaml -inputs: - proc: - ai_agent: - file_io_enabled: true -``` - -**Schema**: -| Key | Value | -| ---- | ---------------------------- | -| Type | bool | - -**Description**: - -Whether to enable AI Agent file IO event collection. - ### Symbol Table {#inputs.proc.symbol_table} #### Golang-specific {#inputs.proc.symbol_table.golang_specific} @@ -8712,6 +8621,552 @@ processors: When enabled, all gRPC packets are considered to be of the `stream` type, and the `data` will be reported, and the rrt calculation of the response will use the `grpc-status` field. +##### OpenAI API {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api} + +Configuration for OpenAI-compatible API protocol enhancement. +When enabled, HTTP/1 and HTTP/2 traffic matching the configured paths will be +recognised as OpenAI API calls and enriched with LLM-specific metrics and +business-dimension attributes (org_path, user_id, app_id). + +###### Enabled {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.enabled} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.enabled` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + enabled: false +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | bool | + +**Description**: + +Enable OpenAI-compatible API protocol enhancement. + +###### Path Prefixes {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.path_prefixes} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.path_prefixes` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + path_prefixes: [] +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**Description**: + +HTTP requests whose path starts with any of these prefixes will be treated +as OpenAI API calls. path_prefixes and path_suffixes are matched independently +with OR logic: a path is accepted if it satisfies either list. An empty list +contributes nothing; if both lists are empty, no paths will be matched. + +###### Path Suffixes {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.path_suffixes} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.path_suffixes` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + path_suffixes: + - /v1/chat/completions + - /v1/responses +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**Description**: + +HTTP requests whose path ends with any of these suffixes will be treated +as OpenAI API calls. path_prefixes and path_suffixes are matched independently +with OR logic: a path is accepted if it satisfies either list. An empty list +contributes nothing; if both lists are empty, no paths will be matched. + +###### Request Body Max Bytes {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.request_body_max_bytes} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.request_body_max_bytes` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + request_body_max_bytes: 65536 +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | int | +| Unit | byte | +| Range | [1024, 1048576] | + +**Description**: + +Maximum bytes of the request body to cache and parse for extracting +the `stream` flag and business-dimension fields. + +###### Response Event Max Bytes {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.response_event_max_bytes} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.response_event_max_bytes` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + response_event_max_bytes: 32768 +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | int | +| Unit | byte | +| Range | [1024, 1048576] | + +**Description**: + +Maximum bytes per SSE event JSON to parse (non-streaming response body limit). + +###### SSE Buffer Max Bytes {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.sse_buffer_max_bytes} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.sse_buffer_max_bytes` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + sse_buffer_max_bytes: 131072 +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | int | +| Unit | byte | +| Range | [4096, 4194304] | + +**Description**: + +Maximum total bytes buffered for SSE event reassembly across multiple +response packets. When exceeded, further events are dropped and +`attribute.llm_abort_reason=sse_buffer_overflow` is set. + +###### Usage Field Paths {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths} + +Configure the JSON paths used to extract token usage counts from API +responses. Paths use dot-notation relative to the top-level response JSON +(e.g. "usage.prompt_tokens" resolves as json["usage"]["prompt_tokens"]). +Multiple paths are tried in order; the first non-null value wins. + +####### Input Tokens {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.input_tokens} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.input_tokens` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + usage_field_paths: + input_tokens: + - usage.prompt_tokens +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**Description**: + +JSON paths to read the input (prompt) token count, tried in order. + +####### Output Tokens {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.output_tokens} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.output_tokens` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + usage_field_paths: + output_tokens: + - usage.completion_tokens +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**Description**: + +JSON paths to read the output (completion) token count, tried in order. + +####### Total Tokens {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.total_tokens} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.total_tokens` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + usage_field_paths: + total_tokens: + - usage.total_tokens +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**Description**: + +JSON paths to read the total token count, tried in order. +If none match, total is computed as input_tokens + output_tokens. + +####### Cached Tokens {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.cached_tokens} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.usage_field_paths.cached_tokens` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + usage_field_paths: + cached_tokens: + - usage.prompt_tokens_details.cached_tokens + - usage.cache_read_input_tokens +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**Description**: + +JSON paths to read the cached (prompt cache hit) token count, tried in order. +usage.prompt_tokens_details.cached_tokens covers the OpenAI API; +usage.cache_read_input_tokens covers the Anthropic API. +If none match, the llm_cached_tokens metric is omitted. + +###### Business Dimension Extractors {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors} + +Configure how to extract org_path, user_id, and app_id from request +headers or JSON body fields. Extraction uses a first-match-wins strategy: +for each dimension, headers are tried in listed order first; if none match, +json_paths are tried in listed order. The first non-empty value found stops +further attempts for that dimension. + +####### Org Path {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.org_path} + +Rules for extracting the organization/department path dimension. + +######## Headers {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.org_path.headers} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.org_path.headers` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + biz_dimension_extractors: + org_path: + headers: + - x-org-path +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**Description**: + +HTTP request header names to try, in order, for org_path extraction. + +######## JSON Paths {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.org_path.json_paths} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.org_path.json_paths` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + biz_dimension_extractors: + org_path: + json_paths: [] +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**Description**: + +JSON body field paths to try, in order, for org_path extraction. +Supports one level of dot notation, e.g. "metadata.org_path". + +####### User ID {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.user_id} + +Rules for extracting the business user identifier dimension. + +######## Headers {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.user_id.headers} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.user_id.headers` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + biz_dimension_extractors: + user_id: + headers: + - x-user-id +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**Description**: + +HTTP request header names to try, in order, for user_id extraction. + +######## JSON Paths {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.user_id.json_paths} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.user_id.json_paths` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + biz_dimension_extractors: + user_id: + json_paths: [] +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**Description**: + +JSON body field paths to try, in order, for user_id extraction. +Supports one level of dot notation, e.g. "metadata.user_id". + +####### App ID {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.app_id} + +Rules for extracting the application identifier dimension. + +######## Headers {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.app_id.headers} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.app_id.headers` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + biz_dimension_extractors: + app_id: + headers: + - appid +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**Description**: + +HTTP request header names to try, in order, for app_id extraction. + +######## JSON Paths {#processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.app_id.json_paths} + +**Tags**: + +agent_restart + +**FQCN**: + +`processors.request_log.application_protocol_inference.protocol_special_config.openai_api.biz_dimension_extractors.app_id.json_paths` + +**Default value**: +```yaml +processors: + request_log: + application_protocol_inference: + protocol_special_config: + openai_api: + biz_dimension_extractors: + app_id: + json_paths: [] +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string[] | + +**Description**: + +JSON body field paths to try, in order, for app_id extraction. +Supports one level of dot notation, e.g. "metadata.app_id". + #### Custom Protocol Parsing {#processors.request_log.application_protocol_inference.custom_protocols} **Tags**: diff --git a/server/agent_config/template.yaml b/server/agent_config/template.yaml index b10a5ba85c8..f5d1502064d 100644 --- a/server/agent_config/template.yaml +++ b/server/agent_config/template.yaml @@ -6138,6 +6138,341 @@ processors: # ch: |- # 开启后所有 gRPC 数据包都认为是 `stream` 类型,并且会将 `data` 类型数据包上报,同时延迟计算的响应使用带有 `grpc-status` 字段的。 streaming_data_enabled: false + # type: section + # name: OpenAI API + # description: + # en: |- + # Configuration for OpenAI-compatible API protocol enhancement. + # When enabled, HTTP/1 and HTTP/2 traffic matching the configured paths will be + # recognised as OpenAI API calls and enriched with LLM-specific metrics and + # business-dimension attributes (org_path, user_id, app_id). + # ch: |- + # OpenAI 兼容 API 协议增强配置。开启后,对命中配置路径的 HTTP/1 和 HTTP/2 流量进行 + # OpenAI API 识别,并附加 LLM 专用指标(TTFT/TPOT/Token 用量)和业务维度属性(组织/用户/应用)。 + openai_api: + # type: bool + # name: + # en: Enabled + # ch: 开启 OpenAI API 解析 + # unit: + # range: [] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: |- + # Enable OpenAI-compatible API protocol enhancement. + # ch: |- + # 开启 OpenAI 兼容 API 协议增强。 + enabled: false + # type: string[] + # name: + # en: Path Prefixes + # ch: 路径前缀过滤 + # unit: + # range: [] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: |- + # HTTP requests whose path starts with any of these prefixes will be treated + # as OpenAI API calls. path_prefixes and path_suffixes are matched independently + # with OR logic: a path is accepted if it satisfies either list. An empty list + # contributes nothing; if both lists are empty, no paths will be matched. + # ch: |- + # 路径以任意配置前缀开头的 HTTP 请求将进行 OpenAI API 增强。path_prefixes 与 + # path_suffixes 独立匹配,满足其中任意一个即可。列表为空时该组不参与匹配; + # 两个列表同时为空则不匹配任何路径。 + path_prefixes: [] + # type: string[] + # name: + # en: Path Suffixes + # ch: 路径后缀过滤 + # unit: + # range: [] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: |- + # HTTP requests whose path ends with any of these suffixes will be treated + # as OpenAI API calls. path_prefixes and path_suffixes are matched independently + # with OR logic: a path is accepted if it satisfies either list. An empty list + # contributes nothing; if both lists are empty, no paths will be matched. + # ch: |- + # 路径以任意配置后缀结尾的 HTTP 请求将进行 OpenAI API 增强。path_prefixes 与 + # path_suffixes 独立匹配,满足其中任意一个即可。列表为空时该组不参与匹配; + # 两个列表同时为空则不匹配任何路径。 + path_suffixes: + - /v1/chat/completions + - /v1/responses + # type: int + # name: + # en: Request Body Max Bytes + # ch: 请求体最大缓存字节数 + # unit: byte + # range: [1024, 1048576] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: |- + # Maximum bytes of the request body to cache and parse for extracting + # the `stream` flag and business-dimension fields. + # ch: |- + # 请求体的最大缓存和解析字节数,用于提取 `stream` 标志和业务维度字段。 + request_body_max_bytes: 65536 + # type: int + # name: + # en: Response Event Max Bytes + # ch: 响应事件最大解析字节数 + # unit: byte + # range: [1024, 1048576] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: |- + # Maximum bytes per SSE event JSON to parse (non-streaming response body limit). + # ch: |- + # 单个 SSE 事件 JSON 的最大解析字节数(非流式响应体大小限制)。 + response_event_max_bytes: 32768 + # type: int + # name: + # en: SSE Buffer Max Bytes + # ch: SSE 缓冲区最大字节数 + # unit: byte + # range: [4096, 4194304] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: |- + # Maximum total bytes buffered for SSE event reassembly across multiple + # response packets. When exceeded, further events are dropped and + # `attribute.llm_abort_reason=sse_buffer_overflow` is set. + # ch: |- + # SSE 事件跨多个响应包重组时的总缓冲上限。超限后丢弃后续事件并设置 + # `attribute.llm_abort_reason=sse_buffer_overflow`。 + sse_buffer_max_bytes: 131072 + # type: section + # name: + # en: Usage Field Paths + # ch: Token 用量字段路径配置 + # description: + # en: |- + # Configure the JSON paths used to extract token usage counts from API + # responses. Paths use dot-notation relative to the top-level response JSON + # (e.g. "usage.prompt_tokens" resolves as json["usage"]["prompt_tokens"]). + # Multiple paths are tried in order; the first non-null value wins. + # ch: |- + # 配置从 API 响应中提取 Token 用量的 JSON 路径。路径为相对于响应顶层 JSON 的点分格式 + # (如 "usage.prompt_tokens" 解析为 json["usage"]["prompt_tokens"])。 + # 多个路径按顺序尝试,取第一个非空值。 + usage_field_paths: + # type: string[] + # name: + # en: Input Tokens + # ch: 输入 Token 字段路径 + # unit: + # range: [] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: JSON paths to read the input (prompt) token count, tried in order. + # ch: 按顺序尝试的 JSON 路径,用于读取输入(prompt)Token 数量。 + input_tokens: + - usage.prompt_tokens + # type: string[] + # name: + # en: Output Tokens + # ch: 输出 Token 字段路径 + # unit: + # range: [] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: JSON paths to read the output (completion) token count, tried in order. + # ch: 按顺序尝试的 JSON 路径,用于读取输出(completion)Token 数量。 + output_tokens: + - usage.completion_tokens + # type: string[] + # name: + # en: Total Tokens + # ch: 总 Token 字段路径 + # unit: + # range: [] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: |- + # JSON paths to read the total token count, tried in order. + # If none match, total is computed as input_tokens + output_tokens. + # ch: |- + # 按顺序尝试的 JSON 路径,用于读取总 Token 数量。 + # 若均未命中,则自动计算为 input_tokens + output_tokens。 + total_tokens: + - usage.total_tokens + # type: string[] + # name: + # en: Cached Tokens + # ch: 缓存 Token 字段路径 + # unit: + # range: [] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: |- + # JSON paths to read the cached (prompt cache hit) token count, tried in order. + # usage.prompt_tokens_details.cached_tokens covers the OpenAI API; + # usage.cache_read_input_tokens covers the Anthropic API. + # If none match, the llm_cached_tokens metric is omitted. + # ch: |- + # 按顺序尝试的 JSON 路径,用于读取缓存(Prompt Cache 命中)Token 数量。 + # usage.prompt_tokens_details.cached_tokens 对应 OpenAI API; + # usage.cache_read_input_tokens 对应 Anthropic API。 + # 若均未命中,则不输出 llm_cached_tokens 指标。 + cached_tokens: + - usage.prompt_tokens_details.cached_tokens + - usage.cache_read_input_tokens + # type: section + # name: + # en: Business Dimension Extractors + # ch: 业务维度提取配置 + # description: + # en: |- + # Configure how to extract org_path, user_id, and app_id from request + # headers or JSON body fields. Extraction uses a first-match-wins strategy: + # for each dimension, headers are tried in listed order first; if none match, + # json_paths are tried in listed order. The first non-empty value found stops + # further attempts for that dimension. + # ch: |- + # 配置从请求头或 JSON Body 字段提取 org_path、user_id、app_id 的规则。 + # 每个维度采用"首个命中即停止"策略:先按顺序尝试 headers,若均未命中则 + # 按顺序尝试 json_paths,找到第一个非空值后不再继续后续候选项。 + biz_dimension_extractors: + # type: section + # name: + # en: Org Path + # ch: 组织架构路径 + # description: + # en: Rules for extracting the organization/department path dimension. + # ch: 提取组织架构路径维度的规则。 + org_path: + # type: string[] + # name: + # en: Headers + # ch: 请求头候选列表 + # unit: + # range: [] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: HTTP request header names to try, in order, for org_path extraction. + # ch: 按顺序尝试的 HTTP 请求头名称,用于提取 org_path。 + headers: + - x-org-path + # type: string[] + # name: + # en: JSON Paths + # ch: JSON 字段路径候选列表 + # unit: + # range: [] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: |- + # JSON body field paths to try, in order, for org_path extraction. + # Supports one level of dot notation, e.g. "metadata.org_path". + # ch: |- + # 按顺序尝试的 JSON Body 字段路径,用于提取 org_path。 + # 支持一级点号路径,如 "metadata.org_path"。 + json_paths: [] + # type: section + # name: + # en: User ID + # ch: 用户标识 + # description: + # en: Rules for extracting the business user identifier dimension. + # ch: 提取业务用户标识维度的规则。 + user_id: + # type: string[] + # name: + # en: Headers + # ch: 请求头候选列表 + # unit: + # range: [] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: HTTP request header names to try, in order, for user_id extraction. + # ch: 按顺序尝试的 HTTP 请求头名称,用于提取 user_id。 + headers: + - x-user-id + # type: string[] + # name: + # en: JSON Paths + # ch: JSON 字段路径候选列表 + # unit: + # range: [] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: |- + # JSON body field paths to try, in order, for user_id extraction. + # Supports one level of dot notation, e.g. "metadata.user_id". + # ch: |- + # 按顺序尝试的 JSON Body 字段路径,用于提取 user_id。 + # 支持一级点号路径,如 "metadata.user_id"。 + json_paths: [] + # type: section + # name: + # en: App ID + # ch: 应用标识 + # description: + # en: Rules for extracting the application identifier dimension. + # ch: 提取应用标识维度的规则。 + app_id: + # type: string[] + # name: + # en: Headers + # ch: 请求头候选列表 + # unit: + # range: [] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: HTTP request header names to try, in order, for app_id extraction. + # ch: 按顺序尝试的 HTTP 请求头名称,用于提取 app_id。 + headers: + - appid + # type: string[] + # name: + # en: JSON Paths + # ch: JSON 字段路径候选列表 + # unit: + # range: [] + # enum_options: [] + # modification: agent_restart + # ee_feature: false + # description: + # en: |- + # JSON body field paths to try, in order, for app_id extraction. + # Supports one level of dot notation, e.g. "metadata.app_id". + # ch: |- + # 按顺序尝试的 JSON Body 字段路径,用于提取 app_id。 + # 支持一级点号路径,如 "metadata.app_id"。 + json_paths: [] # type: dict # name: # en: Custom Protocol Parsing