diff --git a/app/buck2_client_ctx/src/streaming.rs b/app/buck2_client_ctx/src/streaming.rs index b295eed1b5b38..8a6d1d21c2fec 100644 --- a/app/buck2_client_ctx/src/streaming.rs +++ b/app/buck2_client_ctx/src/streaming.rs @@ -300,6 +300,7 @@ fn get_event_log_subscriber( let user_event_log = cmd.user_event_log(); let logdir = paths.log_dir(); + let daemon_startup_config = ctx.immediate_config.daemon_startup_config().ok(); let log = EventLog::new( logdir, ctx.working_dir.clone(), @@ -312,10 +313,10 @@ fn get_event_log_subscriber( T::COMMAND_NAME.to_owned(), ctx.start_time, log_size_counter_bytes, - ctx.immediate_config - .daemon_startup_config() - .map(|daemon_startup_config| daemon_startup_config.retained_event_logs) + daemon_startup_config + .map(|c| c.retained_event_logs) .unwrap_or(DEFAULT_RETAINED_EVENT_LOGS), + daemon_startup_config.and_then(|c| c.log_upload_url.clone()), ); Box::new(log) } diff --git a/app/buck2_client_ctx/src/subscribers/event_log.rs b/app/buck2_client_ctx/src/subscribers/event_log.rs index f13dae56283fa..6ab53c646128e 100644 --- a/app/buck2_client_ctx/src/subscribers/event_log.rs +++ b/app/buck2_client_ctx/src/subscribers/event_log.rs @@ -40,6 +40,7 @@ impl EventLog { start_time: SystemTime, log_size_counter_bytes: Option>, retained_event_logs: usize, + log_upload_url: Option, ) -> EventLog { Self { writer: WriteEventLog::new( @@ -52,6 +53,7 @@ impl EventLog { start_time, log_size_counter_bytes, retained_event_logs, + log_upload_url, ), } } diff --git a/app/buck2_client_ctx/src/subscribers/re_log.rs b/app/buck2_client_ctx/src/subscribers/re_log.rs index 45d736f76ef67..0818461c7ff3a 100644 --- a/app/buck2_client_ctx/src/subscribers/re_log.rs +++ b/app/buck2_client_ctx/src/subscribers/re_log.rs @@ -82,7 +82,7 @@ async fn log_upload_impl( session_id: String, isolation_dir: FileNameBuf, ) -> buck2_error::Result<()> { - if !should_upload_log()? { + if buck2_core::is_open_source() || !should_upload_log()? { return Ok(()); } diff --git a/app/buck2_cmd_debug_client/src/persist_event_logs.rs b/app/buck2_cmd_debug_client/src/persist_event_logs.rs index 30d4b18b0bda7..a7ba889136b2d 100644 --- a/app/buck2_cmd_debug_client/src/persist_event_logs.rs +++ b/app/buck2_cmd_debug_client/src/persist_event_logs.rs @@ -18,6 +18,7 @@ use buck2_common::chunk_reader::ChunkReader; use buck2_common::manifold; use buck2_common::manifold::ManifoldChunkedUploader; use buck2_common::manifold::ManifoldClient; +use buck2_common::upload_client::HttpUploadClient; use buck2_core::soft_error; use buck2_data::InstantEvent; use buck2_data::PersistEventLogSubprocess; @@ -63,6 +64,11 @@ pub struct PersistEventLogsCommand { local_path: String, #[clap(long, help = "If present, only write to disk and don't upload")] no_upload: bool, + #[clap( + long, + help = "If present, also upload the log to this HTTP endpoint" + )] + log_upload_url: Option, #[clap( long, help = "UUID of invocation that called this subcommand for logging purposes" @@ -122,7 +128,7 @@ impl PersistEventLogsCommand { } }; let write = write_task(&file, tx, stdin); - let upload = upload_task(&file, rx, self.manifold_name, self.no_upload); + let upload = upload_task(&file, rx, self.manifold_name, self.log_upload_url, self.no_upload); // Wait for both tasks to finish. If the upload fails we want to keep writing to disk let (write_result, upload_result) = tokio::join!(write, upload); @@ -175,6 +181,7 @@ async fn upload_task( file_mutex: &Mutex, mut rx: tokio::sync::mpsc::UnboundedReceiver, manifold_name: String, + log_upload_url: Option, no_upload: bool, ) -> buck2_error::Result<()> { if no_upload { @@ -218,6 +225,21 @@ async fn upload_task( // Last chunk to upload is smaller than &reader uploader.upload_chunk().await?; + if let Some(url) = log_upload_url { + let mut file = file_mutex.lock().await; + file.seek(io::SeekFrom::Start(0)) + .await + .buck_error_context("Failed to seek log file")?; + let mut buf = Vec::new(); + file.read_to_end(&mut buf) + .await + .buck_error_context("Failed to read log file")?; + drop(file); + + let client = HttpUploadClient::new(url).await?; + client.write(&manifold_name, buf.into()).await?; + } + Ok(()) } diff --git a/app/buck2_common/src/init.rs b/app/buck2_common/src/init.rs index db12bbd893545..737125f7fd229 100644 --- a/app/buck2_common/src/init.rs +++ b/app/buck2_common/src/init.rs @@ -525,6 +525,7 @@ pub struct DaemonStartupConfig { pub log_download_method: LogDownloadMethod, pub health_check_config: HealthCheckConfig, pub retained_event_logs: usize, + pub log_upload_url: Option, pub macos_qos_class: Option, } @@ -608,6 +609,12 @@ impl DaemonStartupConfig { }) .and_then(|s| s.parse::().ok()) .unwrap_or(DEFAULT_RETAINED_EVENT_LOGS), + log_upload_url: config + .get(BuckconfigKeyRef { + section: "buck2", + property: "log_upload_url", + }) + .map(ToOwned::to_owned), macos_qos_class: { let from_config = config .get(BuckconfigKeyRef { @@ -661,6 +668,7 @@ impl DaemonStartupConfig { }, health_check_config: HealthCheckConfig::default(), retained_event_logs: DEFAULT_RETAINED_EVENT_LOGS, + log_upload_url: None, macos_qos_class: None, } } diff --git a/app/buck2_common/src/lib.rs b/app/buck2_common/src/lib.rs index 264e987692d9d..857f81cbe320b 100644 --- a/app/buck2_common/src/lib.rs +++ b/app/buck2_common/src/lib.rs @@ -59,3 +59,4 @@ pub mod sqlite; pub mod starlark_profiler; pub mod target_aliases; pub mod temp_path; +pub mod upload_client; diff --git a/app/buck2_common/src/upload_client.rs b/app/buck2_common/src/upload_client.rs new file mode 100644 index 0000000000000..105c7ec3ec58e --- /dev/null +++ b/app/buck2_common/src/upload_client.rs @@ -0,0 +1,75 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is dual-licensed under either the MIT license found in the + * LICENSE-MIT file in the root directory of this source tree or the Apache + * License, Version 2.0 found in the LICENSE-APACHE file in the root directory + * of this source tree. You may select, at your option, one of the + * above-listed licenses. + */ + +use std::time::Duration; + +use buck2_http::HttpClient; +use buck2_http::HttpClientBuilder; +use buck2_http::retries::HttpError; +use buck2_http::retries::HttpErrorForRetry; +use buck2_http::retries::IntoBuck2Error; +use buck2_http::retries::http_retry; +use bytes::Bytes; +use futures::stream::BoxStream; +use futures::stream::StreamExt; +use hyper::Response; + +#[derive(Debug, buck2_error::Error)] +#[buck2(tag = Http)] +enum HttpWriteError { + #[error(transparent)] + Client(HttpError), +} + +impl HttpErrorForRetry for HttpWriteError { + fn is_retryable(&self) -> bool { + match self { + Self::Client(e) => e.is_retryable(), + } + } +} + +impl IntoBuck2Error for HttpWriteError { + fn into_buck2_error(self) -> buck2_error::Error { + buck2_error::Error::from(self) + } +} + +pub struct HttpUploadClient { + client: HttpClient, + base_url: String, +} + +impl HttpUploadClient { + pub async fn new(base_url: String) -> buck2_error::Result { + let client = HttpClientBuilder::oss().await?.build(); + Ok(Self { client, base_url }) + } + + pub async fn write(&self, path: &str, buf: Bytes) -> buck2_error::Result<()> { + let url = format!("{}/{}", self.base_url, path); + let res = http_retry( + || async { + self.client + .put(&url, buf.clone(), vec![]) + .await + .map_err(|e| HttpWriteError::Client(HttpError::Client(e))) + }, + vec![Duration::from_secs(1), Duration::from_secs(2)], + ) + .await?; + consume_response(res).await; + Ok(()) + } +} + +async fn consume_response<'a>(mut res: Response>>) { + while let Some(_chunk) = res.body_mut().next().await {} +} diff --git a/app/buck2_event_log/src/lib.rs b/app/buck2_event_log/src/lib.rs index 6a9661f750e4a..fc31a8dc866f3 100644 --- a/app/buck2_event_log/src/lib.rs +++ b/app/buck2_event_log/src/lib.rs @@ -31,9 +31,6 @@ pub mod write; pub mod writer; pub fn should_upload_log() -> buck2_error::Result { - if buck2_core::is_open_source() { - return Ok(false); - } Ok(!buck2_env!( "BUCK2_TEST_DISABLE_LOG_UPLOAD", bool, diff --git a/app/buck2_event_log/src/write.rs b/app/buck2_event_log/src/write.rs index 22d55e65fc899..746b9f48bf4e6 100644 --- a/app/buck2_event_log/src/write.rs +++ b/app/buck2_event_log/src/write.rs @@ -64,6 +64,7 @@ pub struct WriteEventLog { buf: Vec, log_size_counter_bytes: Option>, retained_event_logs: usize, + log_upload_url: Option, } impl WriteEventLog { @@ -77,6 +78,7 @@ impl WriteEventLog { start_time: SystemTime, log_size_counter_bytes: Option>, retained_event_logs: usize, + log_upload_url: Option, ) -> Self { Self { state: LogWriterState::Unopened { @@ -91,6 +93,7 @@ impl WriteEventLog { buf: Vec::new(), log_size_counter_bytes, retained_event_logs, + log_upload_url, } } @@ -180,6 +183,7 @@ impl WriteEventLog { path, event.trace_id()?.clone(), self.log_size_counter_bytes.clone(), + self.log_upload_url.clone(), ) .await?; let mut writers = vec![writer]; @@ -255,6 +259,7 @@ async fn start_persist_event_log_subprocess( path: EventLogPathBuf, trace_id: TraceId, bytes_written: Option>, + log_upload_url: Option, ) -> buck2_error::Result { let current_exe = std::env::current_exe().buck_error_context("No current_exe")?; let mut command = buck2_util::process::async_background_command(current_exe); @@ -273,6 +278,8 @@ async fn start_persist_event_log_subprocess( .args(["--trace-id", &trace_id.to_string()]); if !should_upload_log()? { command.arg("--no-upload"); + } else if let Some(url) = log_upload_url { + command.args(["--log-upload-url", &url]); }; command.stdout(Stdio::null()).stdin(Stdio::piped()); @@ -483,6 +490,7 @@ mod tests { log_size_counter_bytes: None, start_time: SystemTime::UNIX_EPOCH, retained_event_logs: 5, + log_upload_url: None, }) } }