Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions app/buck2_client_ctx/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ fn get_event_log_subscriber<T: StreamingCommand>(
let user_event_log = cmd.user_event_log();

let logdir = paths.log_dir();
let daemon_startup_config = ctx.immediate_config.daemon_startup_config().ok();
let log = EventLog::new(
logdir,
ctx.working_dir.clone(),
Expand All @@ -312,10 +313,10 @@ fn get_event_log_subscriber<T: StreamingCommand>(
T::COMMAND_NAME.to_owned(),
ctx.start_time,
log_size_counter_bytes,
ctx.immediate_config
.daemon_startup_config()
.map(|daemon_startup_config| daemon_startup_config.retained_event_logs)
daemon_startup_config
.map(|c| c.retained_event_logs)
.unwrap_or(DEFAULT_RETAINED_EVENT_LOGS),
daemon_startup_config.and_then(|c| c.log_upload_url.clone()),
);
Box::new(log)
}
Expand Down
2 changes: 2 additions & 0 deletions app/buck2_client_ctx/src/subscribers/event_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl EventLog {
start_time: SystemTime,
log_size_counter_bytes: Option<Arc<AtomicU64>>,
retained_event_logs: usize,
log_upload_url: Option<String>,
) -> EventLog {
Self {
writer: WriteEventLog::new(
Expand All @@ -52,6 +53,7 @@ impl EventLog {
start_time,
log_size_counter_bytes,
retained_event_logs,
log_upload_url,
),
}
}
Expand Down
2 changes: 1 addition & 1 deletion app/buck2_client_ctx/src/subscribers/re_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn log_upload_impl(
session_id: String,
isolation_dir: FileNameBuf,
) -> buck2_error::Result<()> {
if !should_upload_log()? {
if buck2_core::is_open_source() || !should_upload_log()? {
return Ok(());
}

Expand Down
24 changes: 23 additions & 1 deletion app/buck2_cmd_debug_client/src/persist_event_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use buck2_common::chunk_reader::ChunkReader;
use buck2_common::manifold;
use buck2_common::manifold::ManifoldChunkedUploader;
use buck2_common::manifold::ManifoldClient;
use buck2_common::upload_client::HttpUploadClient;
use buck2_core::soft_error;
use buck2_data::InstantEvent;
use buck2_data::PersistEventLogSubprocess;
Expand Down Expand Up @@ -63,6 +64,11 @@ pub struct PersistEventLogsCommand {
local_path: String,
#[clap(long, help = "If present, only write to disk and don't upload")]
no_upload: bool,
#[clap(
long,
help = "If present, also upload the log to this HTTP endpoint"
)]
log_upload_url: Option<String>,
#[clap(
long,
help = "UUID of invocation that called this subcommand for logging purposes"
Expand Down Expand Up @@ -122,7 +128,7 @@ impl PersistEventLogsCommand {
}
};
let write = write_task(&file, tx, stdin);
let upload = upload_task(&file, rx, self.manifold_name, self.no_upload);
let upload = upload_task(&file, rx, self.manifold_name, self.log_upload_url, self.no_upload);

// Wait for both tasks to finish. If the upload fails we want to keep writing to disk
let (write_result, upload_result) = tokio::join!(write, upload);
Expand Down Expand Up @@ -175,6 +181,7 @@ async fn upload_task(
file_mutex: &Mutex<File>,
mut rx: tokio::sync::mpsc::UnboundedReceiver<u64>,
manifold_name: String,
log_upload_url: Option<String>,
no_upload: bool,
) -> buck2_error::Result<()> {
if no_upload {
Expand Down Expand Up @@ -218,6 +225,21 @@ async fn upload_task(
// Last chunk to upload is smaller than &reader
uploader.upload_chunk().await?;

if let Some(url) = log_upload_url {
let mut file = file_mutex.lock().await;
file.seek(io::SeekFrom::Start(0))
.await
.buck_error_context("Failed to seek log file")?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)
.await
.buck_error_context("Failed to read log file")?;
drop(file);

let client = HttpUploadClient::new(url).await?;
client.write(&manifold_name, buf.into()).await?;
}

Ok(())
}

Expand Down
8 changes: 8 additions & 0 deletions app/buck2_common/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ pub struct DaemonStartupConfig {
pub log_download_method: LogDownloadMethod,
pub health_check_config: HealthCheckConfig,
pub retained_event_logs: usize,
pub log_upload_url: Option<String>,
pub macos_qos_class: Option<String>,
}

Expand Down Expand Up @@ -608,6 +609,12 @@ impl DaemonStartupConfig {
})
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(DEFAULT_RETAINED_EVENT_LOGS),
log_upload_url: config
.get(BuckconfigKeyRef {
section: "buck2",
property: "log_upload_url",
})
.map(ToOwned::to_owned),
macos_qos_class: {
let from_config = config
.get(BuckconfigKeyRef {
Expand Down Expand Up @@ -661,6 +668,7 @@ impl DaemonStartupConfig {
},
health_check_config: HealthCheckConfig::default(),
retained_event_logs: DEFAULT_RETAINED_EVENT_LOGS,
log_upload_url: None,
macos_qos_class: None,
}
}
Expand Down
1 change: 1 addition & 0 deletions app/buck2_common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ pub mod sqlite;
pub mod starlark_profiler;
pub mod target_aliases;
pub mod temp_path;
pub mod upload_client;
75 changes: 75 additions & 0 deletions app/buck2_common/src/upload_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is dual-licensed under either the MIT license found in the
* LICENSE-MIT file in the root directory of this source tree or the Apache
* License, Version 2.0 found in the LICENSE-APACHE file in the root directory
* of this source tree. You may select, at your option, one of the
* above-listed licenses.
*/

use std::time::Duration;

use buck2_http::HttpClient;
use buck2_http::HttpClientBuilder;
use buck2_http::retries::HttpError;
use buck2_http::retries::HttpErrorForRetry;
use buck2_http::retries::IntoBuck2Error;
use buck2_http::retries::http_retry;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::stream::StreamExt;
use hyper::Response;

#[derive(Debug, buck2_error::Error)]
#[buck2(tag = Http)]
enum HttpWriteError {
#[error(transparent)]
Client(HttpError),
}

impl HttpErrorForRetry for HttpWriteError {
fn is_retryable(&self) -> bool {
match self {
Self::Client(e) => e.is_retryable(),
}
}
}

impl IntoBuck2Error for HttpWriteError {
fn into_buck2_error(self) -> buck2_error::Error {
buck2_error::Error::from(self)
}
}

pub struct HttpUploadClient {
client: HttpClient,
base_url: String,
}

impl HttpUploadClient {
pub async fn new(base_url: String) -> buck2_error::Result<Self> {
let client = HttpClientBuilder::oss().await?.build();
Ok(Self { client, base_url })
}

pub async fn write(&self, path: &str, buf: Bytes) -> buck2_error::Result<()> {
let url = format!("{}/{}", self.base_url, path);
let res = http_retry(
|| async {
self.client
.put(&url, buf.clone(), vec![])
.await
.map_err(|e| HttpWriteError::Client(HttpError::Client(e)))
},
vec![Duration::from_secs(1), Duration::from_secs(2)],
)
.await?;
consume_response(res).await;
Ok(())
}
}

async fn consume_response<'a>(mut res: Response<BoxStream<'a, hyper::Result<Bytes>>>) {
while let Some(_chunk) = res.body_mut().next().await {}
}
3 changes: 0 additions & 3 deletions app/buck2_event_log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ pub mod write;
pub mod writer;

pub fn should_upload_log() -> buck2_error::Result<bool> {
if buck2_core::is_open_source() {
return Ok(false);
}
Ok(!buck2_env!(
"BUCK2_TEST_DISABLE_LOG_UPLOAD",
bool,
Expand Down
8 changes: 8 additions & 0 deletions app/buck2_event_log/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub struct WriteEventLog {
buf: Vec<u8>,
log_size_counter_bytes: Option<Arc<AtomicU64>>,
retained_event_logs: usize,
log_upload_url: Option<String>,
}

impl WriteEventLog {
Expand All @@ -77,6 +78,7 @@ impl WriteEventLog {
start_time: SystemTime,
log_size_counter_bytes: Option<Arc<AtomicU64>>,
retained_event_logs: usize,
log_upload_url: Option<String>,
) -> Self {
Self {
state: LogWriterState::Unopened {
Expand All @@ -91,6 +93,7 @@ impl WriteEventLog {
buf: Vec::new(),
log_size_counter_bytes,
retained_event_logs,
log_upload_url,
}
}

Expand Down Expand Up @@ -180,6 +183,7 @@ impl WriteEventLog {
path,
event.trace_id()?.clone(),
self.log_size_counter_bytes.clone(),
self.log_upload_url.clone(),
)
.await?;
let mut writers = vec![writer];
Expand Down Expand Up @@ -255,6 +259,7 @@ async fn start_persist_event_log_subprocess(
path: EventLogPathBuf,
trace_id: TraceId,
bytes_written: Option<Arc<AtomicU64>>,
log_upload_url: Option<String>,
) -> buck2_error::Result<NamedEventLogWriter> {
let current_exe = std::env::current_exe().buck_error_context("No current_exe")?;
let mut command = buck2_util::process::async_background_command(current_exe);
Expand All @@ -273,6 +278,8 @@ async fn start_persist_event_log_subprocess(
.args(["--trace-id", &trace_id.to_string()]);
if !should_upload_log()? {
command.arg("--no-upload");
} else if let Some(url) = log_upload_url {
command.args(["--log-upload-url", &url]);
};
command.stdout(Stdio::null()).stdin(Stdio::piped());

Expand Down Expand Up @@ -483,6 +490,7 @@ mod tests {
log_size_counter_bytes: None,
start_time: SystemTime::UNIX_EPOCH,
retained_event_logs: 5,
log_upload_url: None,
})
}
}
Expand Down
Loading