fix(api-server): enforce max_num_results across all result output paths #2180
goynam wants to merge 1 commit into y-scope:main from
Conversation
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Config
participant FileBackend
participant S3Backend
participant MongoDB
Client->>Config: read SearchJobConfig.max_num_results
Client->>FileBackend: fetch_results_from_file(search_job_id, max_num_results)
FileBackend-->>Client: stream (taken if max_num_results>0)
Client->>S3Backend: fetch_results_from_s3(search_job_id, max_num_results)
S3Backend-->>Client: stream (taken if max_num_results>0)
Client->>MongoDB: fetch_results_from_mongo(search_job_id, max_num_results)
MongoDB->>MongoDB: apply FindOptions.limit if max_num_results>0
MongoDB-->>Client: cursor stream (limited if applicable)
@kirkrodrigues, @junhaoliao, @LinZhihao-723 for awareness.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
components/api-server/src/client.rs (2)
365-368: ⚠️ Potential issue | 🔴 Critical: Undefined variable `max_num_results` in S3 stream — code will not compile.
The stream at lines 427-432 references `max_num_results`, but this variable is not in scope. The function `fetch_results_from_s3` does not accept `max_num_results` as a parameter.
🐛 Proposed fix — add parameter to function signature
  async fn fetch_results_from_s3(
      &self,
      search_job_id: u64,
+     max_num_results: u32,
  ) -> Result<impl Stream<Item = Result<String, ClientError>> + use<>, ClientError> {
Then update the call site at line 210:
- inner: self.fetch_results_from_s3(search_job_id).await?,
+ inner: self.fetch_results_from_s3(search_job_id, job_config.max_num_results).await?,
Also applies to: 403-432
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/api-server/src/client.rs` around lines 365 - 368, The stream uses an out-of-scope variable max_num_results; update the function signature of fetch_results_from_s3 to accept a max_num_results: usize (or appropriate integer type) parameter, update any internal references to use that parameter (e.g., inside the stream closure that currently references max_num_results), and then update all call sites (such as the caller at the earlier invocation) to pass the desired max_num_results value; ensure the return type and error handling remain unchanged and adjust any tests or usages that call fetch_results_from_s3 accordingly.
204-213: ⚠️ Potential issue | 🟠 Major: File output path does not enforce `max_num_results`.
The PR description states the limit should be enforced for all three output paths, but the file path at line 207 does not pass or enforce `max_num_results`. The `fetch_results_from_file` method (lines 308-331) lacks any limiting logic, so file-based results will still return all results regardless of the configured maximum.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/api-server/src/client.rs` around lines 204 - 213, The file-output path doesn't enforce max_num_results: when job_config.write_to_file is true and StreamOutputStorage::Fs is chosen you construct SearchResultStream::File using fetch_results_from_file(search_job_id) which neither receives nor enforces job_config.max_num_results; update the call site to pass the max (e.g., fetch_results_from_file(search_job_id, job_config.max_num_results)) and change the fetch_results_from_file function signature and implementation to cap the returned results accordingly (or apply the limit when constructing SearchResultStream::File), referencing job_config.write_to_file, StreamOutputStorage::Fs, SearchResultStream::File, fetch_results_from_file, and the max_num_results parameter so all three output paths consistently honor the configured limit.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/api-server/src/client.rs`:
- Around line 216-218: In fetch_results, the variable max_num_results is
undefined; update the call to fetch_results_from_mongo(search_job_id,
max_num_results) to pass the correct value from the available job_config (i.e.,
use job_config.max_num_results) so the call becomes
fetch_results_from_mongo(search_job_id, job_config.max_num_results); ensure you
reference job_config in the fetch_results scope and retain mapping to
SearchResultStream::Mongo { inner: s }.
- Around line 473-480: The current FindOptions branch applies a
timestamp-descending sort only when max_num_results > 0, causing inconsistent
ordering between limited and unlimited queries; update the construction of
find_options (the mongodb::options::FindOptions used where max_num_results is
checked) so that the sort(mongodb::bson::doc! { "timestamp": -1 }) is always
applied and only set .limit(i64::from(max_num_results)) when max_num_results > 0
(i.e., build a FindOptions::builder() with the sort unconditionally, and
conditionally add the limit), ensuring consistent ordering for both limited and
unlimited queries.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: e40d351a-f6e6-4bcc-b88f-3ab1c8a3f2d1
📒 Files selected for processing (1)
components/api-server/src/client.rs
let find_options = if max_num_results > 0 {
    mongodb::options::FindOptions::builder()
        .sort(mongodb::bson::doc! { "timestamp": -1 })
        .limit(i64::from(max_num_results))
        .build()
} else {
    mongodb::options::FindOptions::default()
};
🧹 Nitpick | 🔵 Trivial
Result ordering differs between limited and unlimited queries.
When max_num_results > 0, results are sorted by timestamp descending (newest first). When max_num_results == 0 (unlimited), no sort is applied, so results follow MongoDB's natural/insertion order. This inconsistency may surprise consumers who expect the same ordering regardless of the limit.
Consider applying the same sort for both cases to ensure consistent behaviour:
♻️ Suggested refactor for consistent ordering
let find_options = if max_num_results > 0 {
mongodb::options::FindOptions::builder()
.sort(mongodb::bson::doc! { "timestamp": -1 })
.limit(i64::from(max_num_results))
.build()
} else {
- mongodb::options::FindOptions::default()
+ mongodb::options::FindOptions::builder()
+ .sort(mongodb::bson::doc! { "timestamp": -1 })
+ .build()
};
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let find_options = if max_num_results > 0 {
    mongodb::options::FindOptions::builder()
        .sort(mongodb::bson::doc! { "timestamp": -1 })
        .limit(i64::from(max_num_results))
        .build()
} else {
    mongodb::options::FindOptions::builder()
        .sort(mongodb::bson::doc! { "timestamp": -1 })
        .build()
};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/api-server/src/client.rs` around lines 473 - 480, The current
FindOptions branch applies a timestamp-descending sort only when max_num_results
> 0, causing inconsistent ordering between limited and unlimited queries; update
the construction of find_options (the mongodb::options::FindOptions used where
max_num_results is checked) so that the sort(mongodb::bson::doc! { "timestamp":
-1 }) is always applied and only set .limit(i64::from(max_num_results)) when
max_num_results > 0 (i.e., build a FindOptions::builder() with the sort
unconditionally, and conditionally add the limit), ensuring consistent ordering
for both limited and unlimited queries.
Hi @goynam, thank you for submitting this PR. We're aware of this issue -- it's currently tracked in #1858. That said, truncating the number of results in the API server alone wouldn't prevent query workers from submitting a full result set, so a more comprehensive fix will be needed in the future. Do you have a specific motivation for applying this quick patch now?
Hi @hoophalab, I was working on a canary monitoring system that periodically queries an API server (at every tk interval) to verify its operational status. The canary's primary role is to detect issues and trigger alerts when anomalies are detected. During validation testing, I configured max_num_results = to limit the number of responses for simplicity. However, the API server consistently returned more results than this limit allows, which is why I tried to give this a go. But I'm happy to wait for a more comprehensive fix if one is planned for future updates.
  }

- self.fetch_results_from_mongo(search_job_id)
+ self.fetch_results_from_mongo(search_job_id, max_num_results)
- self.fetch_results_from_mongo(search_job_id, max_num_results)
+ self.fetch_results_from_mongo(search_job_id, job_config.max_num_results)
This doesn't compile.
components/api-server/src/client.rs
Outdated
We probably also need to patch fetch_results_from_file
  mongodb::options::FindOptions::builder()
-     .sort(mongodb::bson::doc! { "timestamp": -1 })
      .limit(i64::from(max_num_results))
      .build()
Sort sounds like an expensive operation. S3 also doesn't support sort. Can we drop this sort for consistency?
components/api-server/src/client.rs
Outdated
Ok(stream! {
    while let Some(object_page) = object_pages.next().await {
        let mut num_results: u32 = 0;
- Instead of counting manually, we could utilize StreamExt::take
How about:
let stream = stream! { ... };
Ok(stream.take(max_num_results))
- `max_num_results` isn't passed as an argument of this function
- checking `max_num_results == 0` is unnecessary
async fn fetch_results_from_mongo(
    &self,
    search_job_id: u64,
    max_num_results: u32,
We need to update the docstring of this function
The max_num_results parameter was only enforced in the results-cache (MongoDB) output path at the worker level. The file and S3 output paths had no limit, causing all results to be returned regardless of the configured max.

Apply the limit in the API server when streaming results back:
- File path: counter with labeled break in the stream
- S3 path: same counter pattern
- MongoDB path: use find() with sort and limit options

A value of 0 continues to mean unlimited (no change in behavior).
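The counter-with-labeled-break pattern named in the commit message can be illustrated with a synchronous sketch; the nested loops and the page/record shapes here are hypothetical stand-ins for the async `stream!` block:

```rust
// Hypothetical sketch of the counter + labeled-break pattern from the commit
// message, over nested loops instead of the async stream! block.
fn collect_limited(pages: &[Vec<String>], max_num_results: u32) -> Vec<String> {
    let mut out = Vec::new();
    let mut num_results: u32 = 0;
    'pages: for page in pages {
        for record in page {
            // 0 means unlimited, matching the documented behavior
            if max_num_results > 0 && num_results >= max_num_results {
                break 'pages; // stop before touching any further pages
            }
            out.push(record.clone());
            num_results += 1;
        }
    }
    out
}

fn main() {
    let pages = vec![
        vec!["a".to_string(), "b".to_string()],
        vec!["c".to_string()],
    ];
    assert_eq!(collect_limited(&pages, 2), vec!["a", "b"]);
    assert_eq!(collect_limited(&pages, 0).len(), 3);
    println!("ok");
}
```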
Force-pushed from b25963a to 9b2ab87
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/api-server/src/client.rs (1)
409-440: ⚠️ Potential issue | 🟠 Major: S3 still downloads the full object before the limit can bite.
`take()` caps the number of yielded items, but each touched object is still fully fetched and buffered via `obj.body.collect().await?` before the first record from that object is yielded. With `max_num_results = 1`, a large shard can still incur the full S3 download and memory cost, which undercuts the canary use-case. Please switch this path to incremental decoding so the read can stop as soon as the limit is reached.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/api-server/src/client.rs` around lines 409 - 440, The current stream implementation fetches entire S3 object bodies via obj.body.collect().await? before deserializing which defeats stream.take(max_num_results) — change to incremental decoding by converting the response body into an async reader (e.g., obj.body.into_async_read() or tokio_util::io::StreamReader) and feed that reader into rmp_serde::Deserializer (use from_read or from_read_ref variant that accepts an AsyncRead adapter) and deserialize events one-by-one inside the yielded loop so you can stop reading as soon as the outer stream reaches max_num_results; update the loop inside the stream! block where object_pages, obj.body.collect().await?, Deserialize::deserialize and deserializer are used to perform incremental reads and ensure the stream.take(max_num_results) actually prevents further S3 reads.
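A std-only sketch of the control flow the comment asks for: pull records incrementally from a reader and stop as soon as the cap is hit, instead of buffering the whole body first. Line-delimited text stands in for the real MessagePack decoding, a `Cursor` stands in for the S3 body, and `read_limited` is a hypothetical name:

```rust
use std::io::{BufRead, BufReader, Cursor, Read};

// Hypothetical sketch: line-delimited records stand in for MessagePack, and a
// Cursor stands in for the S3 body. BufReader pulls bytes lazily (in buffered
// chunks), so take(n) stops driving the underlying reads once n records have
// been produced, rather than collecting the whole body up front.
fn read_limited<R: Read>(body: R, max_num_results: usize) -> Vec<String> {
    BufReader::new(body)
        .lines()
        .map_while(Result::ok)
        .take(max_num_results)
        .collect()
}

fn main() {
    let body = Cursor::new("r1\nr2\nr3\nr4\n");
    assert_eq!(read_limited(body, 2), vec!["r1", "r2"]);
    println!("ok");
}
```

The real fix would feed an `AsyncRead` adapter over the response body into the MessagePack deserializer, but the shape of the loop is the same: decode one record per iteration so the outer limit can cut the read short.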
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 7ea73f67-21b6-4e49-95ee-c4a8e61b32d2
📒 Files selected for processing (1)
components/api-server/src/client.rs
+ let max_num_results = job_config.max_num_results;

  if job_config.write_to_file {
      let stream = match &self.config.stream_output.storage {
          StreamOutputStorage::Fs { .. } => SearchResultStream::File {
-             inner: self.fetch_results_from_file(search_job_id),
+             inner: self.fetch_results_from_file(search_job_id, max_num_results),
          },
          StreamOutputStorage::S3 { .. } => SearchResultStream::S3 {
-             inner: self.fetch_results_from_s3(search_job_id).await?,
+             inner: self.fetch_results_from_s3(search_job_id, max_num_results).await?,
          },
      };
      return Ok(stream);
  }

- self.fetch_results_from_mongo(search_job_id)
+ self.fetch_results_from_mongo(search_job_id, max_num_results)
0 still cannot produce an unlimited stream for API-submitted jobs.
fetch_results now treats job_config.max_num_results == 0 as unbounded, but submit_query still rewrites an incoming 0 to default_max_num_query_results on Lines 134-137. In practice, callers of this API still cannot request “no limit”, so the docs on Line 32 and these new 0 branches do not hold end-to-end. This needs a representation that distinguishes “omitted” from 0 (for example, Option<u32>).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/api-server/src/client.rs` around lines 203 - 217, The API
currently conflates "omitted" and "zero" for max_num_results so callers can't
request an unlimited stream; change the representation to Option<u32> for
job_config.max_num_results (and the incoming request type used by submit_query)
so None = omitted (apply default_max_num_query_results later) and Some(0) =
explicit "no limit"; update submit_query to stop rewriting 0 into
default_max_num_query_results and to forward Option<u32> as-is, and update
fetch_results / fetch_results_from_mongo / fetch_results_from_file
/fetch_results_from_s3 call sites to accept Option<u32> and treat Some(0) as
unbounded while treating None as "use default" where appropriate.
The max_num_results parameter was only enforced in the results-cache (MongoDB) output path at the worker level. The file and S3 output paths had no limit applied, so all results were returned regardless of the configured max.
This applies the limit in the API server when streaming results back to the client across all three paths (file, S3, MongoDB). A value of 0 continues to mean unlimited.