Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
87 changes: 87 additions & 0 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2028,6 +2028,92 @@ impl Default for InferenceWhitelist {
}
}

#[derive(Clone, Debug, Default, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct OpenAIBizDimExtractor {
pub headers: Vec<String>,
pub json_paths: Vec<String>,
}

#[derive(Clone, Debug, Default, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct OpenAIBizDimExtractors {
pub org_path: OpenAIBizDimExtractor,
pub user_id: OpenAIBizDimExtractor,
pub app_id: OpenAIBizDimExtractor,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct OpenAIUsageFieldPaths {
/// JSON paths (dot-notation) to read the input token count, tried in order.
pub input_tokens: Vec<String>,
/// JSON paths (dot-notation) to read the output token count, tried in order.
pub output_tokens: Vec<String>,
/// JSON paths (dot-notation) to read the total token count, tried in order.
pub total_tokens: Vec<String>,
/// JSON paths (dot-notation) to read the cached token count, tried in order.
pub cached_tokens: Vec<String>,
}

impl Default for OpenAIUsageFieldPaths {
fn default() -> Self {
Self {
input_tokens: vec!["usage.prompt_tokens".to_string()],
output_tokens: vec!["usage.completion_tokens".to_string()],
total_tokens: vec!["usage.total_tokens".to_string()],
cached_tokens: vec![
"usage.prompt_tokens_details.cached_tokens".to_string(),
"usage.cache_read_input_tokens".to_string(),
],
}
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct OpenAIApiConfig {
pub enabled: bool,
pub path_prefixes: Vec<String>,
pub path_suffixes: Vec<String>,
pub request_body_max_bytes: usize,
pub response_event_max_bytes: usize,
pub sse_buffer_max_bytes: usize,
pub usage_field_paths: OpenAIUsageFieldPaths,
pub biz_dimension_extractors: OpenAIBizDimExtractors,
}

impl Default for OpenAIApiConfig {
fn default() -> Self {
Self {
enabled: false,
path_prefixes: vec![],
path_suffixes: vec![
"/v1/chat/completions".to_string(),
"/v1/responses".to_string(),
],
request_body_max_bytes: 65536,
response_event_max_bytes: 32768,
sse_buffer_max_bytes: 131072,
usage_field_paths: OpenAIUsageFieldPaths::default(),
biz_dimension_extractors: OpenAIBizDimExtractors {
org_path: OpenAIBizDimExtractor {
headers: vec!["x-org-path".to_string()],
json_paths: vec![],
},
user_id: OpenAIBizDimExtractor {
headers: vec!["x-user-id".to_string()],
json_paths: vec![],
},
app_id: OpenAIBizDimExtractor {
headers: vec!["appid".to_string()],
json_paths: vec![],
},
},
}
}
}

#[derive(Clone, Default, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ProtocolSpecialConfig {
Expand All @@ -2037,6 +2123,7 @@ pub struct ProtocolSpecialConfig {
pub net_sign: NetSignConfig,
pub mysql: MysqlConfig,
pub grpc: GrpcConfig,
pub openai_api: OpenAIApiConfig,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
Expand Down
17 changes: 14 additions & 3 deletions agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ use super::config::{Ebpf, EbpfFileIoEvent, ProcessMatcher, SymbolTable};
use super::{
config::{
ApiResources, Config, DpdkSource, ExtraLogFields, ExtraLogFieldsInfo, HttpEndpoint,
HttpEndpointMatchRule, Iso8583ParseConfig, NetSignParseConfig, OracleConfig, PcapStream,
PortConfig, ProcessorsFlowLogTunning, RequestLogTunning, SessionTimeout, TagFilterOperator,
Timeouts, UserConfig, WebSphereMqParseConfig, GRPC_BUFFER_SIZE_MIN,
HttpEndpointMatchRule, Iso8583ParseConfig, NetSignParseConfig, OpenAIApiConfig,
OracleConfig, PcapStream, PortConfig, ProcessorsFlowLogTunning, RequestLogTunning,
SessionTimeout, TagFilterOperator, Timeouts, UserConfig, WebSphereMqParseConfig,
GRPC_BUFFER_SIZE_MIN,
},
ConfigError, KubernetesPollerType, TrafficOverflowAction,
};
Expand Down Expand Up @@ -1205,6 +1206,7 @@ pub struct LogParserConfig {
pub unconcerned_dns_nxdomain_trie: DomainNameTrie,
pub mysql_decompress_payload: bool,
pub mysql_endpoint_disabled: bool,
pub openai_api: OpenAIApiConfig,
pub custom_app: CustomAppConfig,
}

Expand All @@ -1225,6 +1227,7 @@ impl Default for LogParserConfig {
unconcerned_dns_nxdomain_trie: DomainNameTrie::default(),
mysql_decompress_payload: true,
mysql_endpoint_disabled: true,
openai_api: OpenAIApiConfig::default(),
custom_app: CustomAppConfig::default(),
}
}
Expand Down Expand Up @@ -1272,6 +1275,7 @@ impl fmt::Debug for LogParserConfig {
)
.field("mysql_decompress_payload", &self.mysql_decompress_payload)
.field("mysql_endpoint_disabled", &self.mysql_endpoint_disabled)
.field("openai_api_enabled", &self.openai_api.enabled)
.field("custom_app", &self.custom_app)
.finish()
}
Expand Down Expand Up @@ -2382,6 +2386,13 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig {
.protocol_special_config
.mysql
.endpoint_disabled,
openai_api: conf
.processors
.request_log
.application_protocol_inference
.protocol_special_config
.openai_api
.clone(),
#[cfg(not(feature = "enterprise"))]
custom_app: CustomAppConfig::default(),
#[cfg(feature = "enterprise")]
Expand Down
26 changes: 21 additions & 5 deletions agent/src/flow_generator/protocol_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub(crate) mod dns;
pub(crate) mod fastcgi;
pub(crate) mod http;
pub(crate) mod mq;
pub(crate) mod openai_api;
mod parser;
pub mod pb_adapter;
pub(crate) mod ping;
Expand Down Expand Up @@ -413,15 +414,30 @@ impl AppProtoLogsBaseInfo {
}

// go http2 uprobe may merge multi times, if not req and resp merge can not set to session
// Save whether this entry was already a Session before the merge so we can
// decide whether to recompute rrt below.
let was_already_session = self.head.msg_type == LogMessageType::Session;
if self.head.msg_type != log.head.msg_type {
self.head.msg_type = LogMessageType::Session;
}

self.head.rrt = if self.end_time > self.start_time {
(self.end_time.as_micros() - self.start_time.as_micros()) as u64
} else {
0
};
// Freeze rrt after the first req→resp merge.
//
// On the initial merge (Request + Response → Session), end_time equals the
// first-response packet time, so `end_time - start_time` naturally gives
// first-response latency — the same semantics as non-streaming HTTP.
//
// For multi-merge protocols (SSE streaming, Go HTTP2 uprobe), each
// continuation packet would push end_time forward and make rrt equal to
// the total stream duration instead. Skipping the recomputation once the
// entry is already a Session preserves the first-response latency value.
if !was_already_session {
self.head.rrt = if self.end_time > self.start_time {
(self.end_time.as_micros() - self.start_time.as_micros()) as u64
} else {
0
};
}

if self.biz_type == 0 {
self.biz_type = log.biz_type;
Expand Down
2 changes: 1 addition & 1 deletion agent/src/flow_generator/protocol_logs/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub const HTTP_STATUS_CLIENT_ERROR_MIN: u16 = 400;
pub const HTTP_STATUS_CLIENT_ERROR_MAX: u16 = 499;
pub const HTTP_STATUS_SERVER_ERROR_MIN: u16 = 500;
pub const HTTP_STATUS_SERVER_ERROR_MAX: u16 = 600;
pub const HTTP_RESP_MIN_LEN: usize = 13; // 响应行:"HTTP/1.1 200 "
pub const HTTP_RESP_MIN_LEN: usize = 12; // 响应行:"HTTP/1.1 200"(reason phrase 可省略,RFC 7230 允许)

pub const HTTP_HOST_OFFSET: usize = 6;
pub const HTTP_CONTENT_LENGTH_OFFSET: usize = 16;
Expand Down
Loading
Loading