
fix(package): Prevent clp_config column truncation and handle corrupted jobs gracefully (fixes 2151).#2178

Open
goynam wants to merge 4 commits into y-scope:main from goynam:fix/compression-job-config-truncation

Conversation


@goynam goynam commented Apr 6, 2026

Fixes brotli decoder failure caused by VARBINARY(60000) silently truncating compressed job configs. Also makes the scheduler resilient to individual job failures.

Summary by CodeRabbit

  • Bug Fixes

    • Corrupted or unreadable job configurations are now isolated, logged, and marked failed without affecting other compression jobs.
    • Improved resilience across supported input types to reduce cascading failures.
  • Chores

    • Increased storage capacity for orchestration metadata payloads to better support larger job/config data.
  • Refactor

    • Scheduling flow reorganized for clearer, more reliable task batching and submission.

@goynam goynam requested a review from a team as a code owner April 6, 2026 05:08

coderabbitai bot commented Apr 6, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Database DDL changes three binary columns from VARBINARY(60000) to MEDIUMBLOB. The compression scheduler extracts per-job logic into _schedule_job(...), adds deserialization error handling that marks corrupted job configs as FAILED, and continues processing remaining pending jobs.

Changes

Database Schema Migration
components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py
Updated MySQL DDL: changed the clp_config, clp_paths_to_compress, and job_config column types from VARBINARY(60000) NOT NULL to MEDIUMBLOB NOT NULL. No other DDL, constraints, or indexes were modified.

Compression Scheduler Refactor & Error Handling
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
Moved per-job scheduling logic into the new _schedule_job(...). Added a try/except around the brotli+msgpack deserialization to log errors, mark the job FAILED with a fixed status_msg, and return early for that job. Refactored the input-type branches (FS, S3, S3_OBJECT_METADATA, unsupported), moved the buffer flush and dataset-ensure steps outside the branches, and batched and submitted tasks once via _batch_and_submit_tasks(...).

Sequence Diagram(s)

sequenceDiagram
    participant Scheduler
    participant DB as Database
    participant Deserializer as ClpConfigDeserializer
    participant TaskMgr as TaskManager

    Scheduler->>DB: fetch pending jobs
    loop for each job
        Scheduler->>Scheduler: call _schedule_job(job_row)
        Scheduler->>Deserializer: brotli.decompress + msgpack.unpackb(clp_config)
        alt deserialization succeeds
            Deserializer-->>Scheduler: ClpConfig
            Scheduler->>TaskMgr: validate inputs / ensure dataset
            TaskMgr-->>Scheduler: dataset exists/created
            Scheduler->>TaskMgr: flush buffers, batch & submit tasks
            TaskMgr-->>DB: persist tasks/metadata
            Scheduler->>DB: update job metadata (e.g., queued/started)
        else deserialization fails
            Deserializer-->>Scheduler: error
            Scheduler->>DB: mark job FAILED, set status_msg
            Scheduler-->>Scheduler: return from _schedule_job (continue loop)
        end
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 75.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.
  • Title check (✅ Passed): The title accurately reflects both main changes: expanding the VARBINARY columns to MEDIUMBLOB and adding graceful job failure handling.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py`:
- Around line 312-327: The query scheduler loop still does a bare
msgpack.unpackb(job["job_config"]) which can raise on corrupted/truncated data
and halt processing; add the same per-job try/except guard used in
compression_scheduler.py: wrap the unpack/deserialize of job["job_config"] in
try/except Exception, log the exception with logger.exception including the job
id, call the query-job metadata updater (e.g. update_query_job_metadata or the
existing job metadata update function used by query_scheduler) to set the job to
FAILED with a clear status_msg about corrupted/truncated config, and continue
the loop so subsequent query jobs are processed.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: c4b0df4f-119d-4998-b7a1-1df2f98341c9

📥 Commits

Reviewing files that changed from the base of the PR and between 3b4d13f and 513d846.

📒 Files selected for processing (2)
  • components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py

Comment on lines +312 to +327
try:
    clp_io_config = ClpIoConfig.model_validate(
        msgpack.unpackb(brotli.decompress(job_row["clp_config"]))
    )
except Exception:
    logger.exception("Failed to decompress clp_config for job %s", job_id)
    update_compression_job_metadata(
        db_context,
        job_id,
        {
            "status": CompressionJobStatus.FAILED,
            "status_msg": "Failed to decompress job config. The config data may be"
            " corrupted or truncated.",
        },
    )
    continue

@coderabbitai coderabbitai bot Apr 6, 2026


⚠️ Potential issue | 🟠 Major

Mirror this per-job deserialization guard in query_scheduler.py.

components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py:690-710 still does a bare msgpack.unpackb(job["job_config"]). One corrupt or truncated query job can still prevent later query jobs from being processed, so the resilience fix is incomplete until that loop also marks the bad job FAILED and continues.
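A minimal sketch of the guard being requested, with the deserializer injected (in the real query scheduler it would be msgpack.unpackb, and the metadata updater and status constant would be whatever query_scheduler.py already uses; every name here is illustrative):

```python
import logging


def process_pending_query_jobs(jobs, deserialize, update_job_metadata, logger):
    """Deserialize each pending job's config; mark undecodable jobs FAILED
    and keep processing the rest instead of letting one bad row halt the loop."""
    configs = {}
    for job in jobs:
        job_id = job["id"]
        try:
            configs[job_id] = deserialize(job["job_config"])
        except Exception:
            logger.exception("Failed to unpack job_config for job %s", job_id)
            update_job_metadata(
                job_id,
                {
                    "status": "FAILED",
                    "status_msg": "Job config is corrupted or truncated.",
                },
            )
            continue  # one corrupted job must not block the others
    return configs
```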


goynam (Author) replied:

Agreed this would be a good improvement. However, the query scheduler's job_config uses plain msgpack (no brotli compression) and is written by the Rust API server via rmp_serde, so it's not affected by the VARBINARY truncation issue that triggered this PR. The MEDIUMBLOB schema change here also prevents future truncation for that column. Additionally, query configs are inherently small (just a query string, timestamps, and a few flags), so hitting the old 60KB limit was extremely unlikely in practice.

I'd prefer to keep this PR focused on the compression scheduler fix to keep the review scope tight.

coderabbitai (Contributor) replied:

@goynam, that's a well-reasoned scoping decision. The distinction makes sense — since job_config in the query scheduler is plain msgpack without brotli compression, it's not subject to the same decompression failure vector, and the MEDIUMBLOB change here already addresses the truncation risk going forward. Happy to leave this for a future PR.

Would you like me to open a follow-up issue to track adding the resilience guard (try/except around deserialization) to the query scheduler loop, so it isn't lost?


✏️ Learnings added
Learnt from: goynam
Repo: y-scope/clp PR: 2178
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:0-0
Timestamp: 2026-04-13T18:49:55.259Z
Learning: In the query scheduler (`components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py`), `job["job_config"]` is serialized as plain msgpack (no brotli compression), written by the Rust API server via `rmp_serde`. It is not subject to brotli decompression failures. Query configs are also inherently small (query string, timestamps, a few flags), making VARBINARY(60000) truncation extremely unlikely in practice. The MEDIUMBLOB schema change in PR `#2178` further prevents future truncation for that column.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: haiqi96
Repo: y-scope/clp PR: 651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.

Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.

Learnt from: gibber9809
Repo: y-scope/clp PR: 1169
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:463-469
Timestamp: 2025-09-19T18:28:26.747Z
Learning: In the compression scheduler (components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py), the SIGTERM handling uses a coarse-grained approach that only checks received_sigterm before calling search_and_schedule_new_tasks. The maintainers consider the race condition where SIGTERM arrives mid-execution to be benign, as scheduling a few extra tasks during shutdown is acceptable and will be handled by cleanup mechanisms.

Learnt from: quinntaylormitchell
Repo: y-scope/clp PR: 1125
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:267-291
Timestamp: 2025-09-15T22:20:40.750Z
Learning: For CLP compression jobs, the team has decided to fail the entire job immediately upon encountering any invalid input path, rather than continuing to process valid paths. This decision was made during PR `#1125` development.

Learnt from: junhaoliao
Repo: y-scope/clp PR: 1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR `#1152` discussion.

Learnt from: haiqi96
Repo: y-scope/clp PR: 569
File: components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py:380-392
Timestamp: 2024-11-15T20:07:22.256Z
Learning: The current implementation assumes single-threaded execution, so race conditions in functions like `is_target_extracted` in `components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py` are not currently a concern.

Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.

sitaowang1998 (Contributor) replied:

@CodeRabbit Please open an issue about this.

coderabbitai (Contributor) replied:

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
gh issue create \
  --repo y-scope/clp \
  --title "fix(scheduler): Add deserialization resilience guard to query scheduler loop" \
  --body "## Summary

In \`components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py\`, the loop that processes pending query jobs performs a bare \`msgpack.unpackb(job[\"job_config\"])\` without any error handling. A corrupted or otherwise undeserializable job config could raise an exception and halt processing of all subsequent pending query jobs.

## Suggested Fix

Wrap the \`msgpack.unpackb(job[\"job_config\"])\` call in a \`try/except\` block (similar to the guard added for the compression scheduler in PR `#2178`). On failure:
- Log the exception with \`logger.exception\`, including the job ID.
- Update the query job's status to \`FAILED\` with a clear \`status_msg\` indicating the config is corrupted or undeserializable.
- \`continue\` the loop so subsequent query jobs are still processed.

## Context

- PR `#2178` added this resilience pattern to the compression scheduler (around \`brotli.decompress\` + \`msgpack.unpackb\` of \`clp_config\`).
- The query scheduler's \`job_config\` is plain msgpack (no brotli compression), written by the Rust API server via \`rmp_serde\`, so it is not affected by the VARBINARY truncation issue that triggered PR `#2178`. The MEDIUMBLOB schema change in PR `#2178` further prevents future truncation for that column.
- This issue was raised during the review of PR `#2178`: https://github.com/y-scope/clp/pull/2178#discussion_r3038022880
- Requested by `@sitaowang1998`." \
  --assignee sitaowang1998



@sitaowang1998, done! I've opened #2200 to track adding the deserialization resilience guard to the query scheduler loop. It's assigned to you and includes the relevant context from this PR's discussion.



@sitaowang1998 sitaowang1998 left a comment


The db schema change looks good.

The loop with a lot of continue statements is hard to reason about. How about extracting the loop body into a helper function?

- Change VARBINARY(60000) to MEDIUMBLOB for clp_config, clp_paths_to_compress,
  and job_config columns to prevent silent data truncation.
- Wrap per-job deserialization in try/except so a corrupted job is marked FAILED
  and the scheduler continues processing remaining jobs.
- Change error-handling return statements to continue so one failed job doesn't
  block processing of other pending jobs.
@goynam goynam force-pushed the fix/compression-job-config-truncation branch from ca63786 to 904db82 on April 9, 2026 at 18:07

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py (1)

62-84: ⚠️ Potential issue | 🟠 Major

Existing deployments will not benefit from this schema change.

CREATE TABLE IF NOT EXISTS only creates new tables; existing tables with VARBINARY(60000) columns will remain unchanged and continue to experience truncation. Consider providing an ALTER TABLE migration script for existing deployments, or documenting that users must manually migrate their schema.

Example migration statements:

ALTER TABLE compression_jobs MODIFY clp_config MEDIUMBLOB NOT NULL;
ALTER TABLE compression_tasks MODIFY clp_paths_to_compress MEDIUMBLOB NOT NULL;
ALTER TABLE query_jobs MODIFY job_config MEDIUMBLOB NOT NULL;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py` around
lines 62 - 84, Existing deployments won't get the new MEDIUMBLOB columns because
CREATE TABLE IF NOT EXISTS only affects new tables; add a migration step that
alters existing tables to change the relevant columns to MEDIUMBLOB (for example
alter COMPRESSION_JOBS_TABLE_NAME.clp_config,
compression_tasks.clp_paths_to_compress, and query_jobs.job_config) and ensure
the migration runs during initialization or is documented for users; implement
this as an ALTER TABLE migration script invoked from the initialization flow (or
provide documentation and a script file) so existing rows/columns are converted
to MEDIUMBLOB without truncation.
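One way the suggested migration could be wired in, sketched in Python against a generic DB-API cursor; the table and column names are taken from the example ALTER statements above and may not match the real schema constants in initialize-orchestration-db.py:

```python
# Columns the review suggests widening; names follow the example ALTER
# statements in the comment above, not necessarily the real schema constants.
COLUMNS_TO_WIDEN = [
    ("compression_jobs", "clp_config"),
    ("compression_tasks", "clp_paths_to_compress"),
    ("query_jobs", "job_config"),
]


def build_migration_statements(columns=COLUMNS_TO_WIDEN):
    """Build ALTER TABLE statements widening each column to MEDIUMBLOB.
    MODIFY is effectively idempotent here: re-running it on a column that is
    already MEDIUMBLOB leaves the table unchanged."""
    return [
        f"ALTER TABLE {table} MODIFY {column} MEDIUMBLOB NOT NULL;"
        for table, column in columns
    ]


def run_migration(cursor, columns=COLUMNS_TO_WIDEN):
    """Execute the migration through an existing DB-API cursor."""
    for statement in build_migration_statements(columns):
        cursor.execute(statement)
```

Invoking something like run_migration(...) from the initialization flow would upgrade existing deployments in place, which CREATE TABLE IF NOT EXISTS alone cannot do.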
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py`:
- Around line 461-464: The file compression_scheduler.py contains three
consecutive blank lines between top-level definitions which violates PEP 8; open
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
and remove the extra blank line so there are at most two blank lines separating
the surrounding top-level definitions (e.g., between the enclosing class or
function near the blank region and the next top-level function/class in
compression_scheduler.py).
- Line 431: Replace the f-string in the logger call with lazy formatting:
instead of logger.error(f"Unsupported input type {input_type}") use
logger.error("Unsupported input type %s", input_type) so the string is only
formatted when the error level is enabled; update the logger.error call in
compression_scheduler.py (same occurrence referencing input_type) to use the
"%s" placeholder form.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 5dd88b1a-54bc-423d-a359-78a2a2a569f6

📥 Commits

Reviewing files that changed from the base of the PR and between 513d846 and 904db82.

📒 Files selected for processing (2)
  • components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py


goynam commented Apr 9, 2026

The db schema change looks good.

The loop with a lot of continue statements is hard to reason about. How about extracting the loop body into a helper function?

Good call — extracted the loop body into a _schedule_job helper. The continue statements are now early returns, and search_and_schedule_new_tasks just iterates and delegates. Same behavior, easier to follow.
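The shape of that refactor, sketched with stub bodies and injected helpers (the real signatures in compression_scheduler.py differ; this only illustrates the continue-to-early-return change):

```python
def _schedule_job(job_row, deserialize, mark_failed):
    """Schedule one pending job; any per-job failure becomes an early return."""
    job_id = job_row["id"]
    try:
        clp_config = deserialize(job_row["clp_config"])
    except Exception:
        # Early return replaces the old in-loop `continue`.
        mark_failed(job_id, "Failed to decompress job config.")
        return False
    # ... input-type handling, buffer flush, _batch_and_submit_tasks(...)
    # would consume clp_config here ...
    return True


def search_and_schedule_new_tasks(pending_jobs, deserialize, mark_failed):
    """Iterate pending jobs and delegate; one bad job never halts the rest."""
    return sum(
        1
        for job_row in pending_jobs
        if _schedule_job(job_row, deserialize, mark_failed)
    )
```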


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py`:
- Around line 470-472: Remove the stray duplicate function header "def
poll_running_jobs(" that appears before the real definition; keep the valid
function declaration for poll_running_jobs (the actual implementation that
begins later) and delete the extra incomplete line so the file parses correctly
and the poll_running_jobs function remains defined only once.
- Around line 461-467: The duplicate call to _batch_and_submit_tasks should be
removed because it re-enters the helper immediately after the first submission
and can overwrite scheduled_jobs[job_id] or double-queue work; locate the two
consecutive calls to _batch_and_submit_tasks (the one that processes
paths_to_compress_buffer, updates the job row, inserts task rows and submits
work) and delete the second invocation so only a single submission occurs per
batch, ensuring the original handle in scheduled_jobs[job_id] is preserved and
task rows are not duplicated.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 3419d495-770f-4cf6-8dcc-864c0bbf638f

📥 Commits

Reviewing files that changed from the base of the PR and between 904db82 and 4aa77a1.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py


@sitaowang1998 sitaowang1998 left a comment


Nitpick

@sitaowang1998 sitaowang1998 changed the title fix: prevent clp_config truncation and handle corrupted jobs gracefully fix(package): Prevent clp_config column truncation and handle corrupted jobs gracefully (fixes 2151). Apr 10, 2026
@goynam goynam force-pushed the fix/compression-job-config-truncation branch from 4aa77a1 to 467feee on April 13, 2026 at 11:28

@sitaowang1998 sitaowang1998 left a comment


Just a few more changes to satisfy the formatter.

@goynam goynam force-pushed the fix/compression-job-config-truncation branch 2 times, most recently from 21c2914 to b050de8 on April 13, 2026 at 18:20
sitaowang1998
sitaowang1998 previously approved these changes Apr 13, 2026

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py`:
- Around line 375-385: The current code in _schedule_job calls
_write_user_failure_log and raises RuntimeError when user_log_relative_path is
None, which aborts scheduling of subsequent jobs; instead, catch the
failed-write case, mark the current job as FAILED (use the existing job state
update logic for job_id), attach a fallback failure message indicating the user
log couldn't be written (include logs_directory and job_id in the message),
persist that status/message, and return from _schedule_job so scheduling
continues for other PENDING jobs rather than raising an exception from the
unwritable logs_directory.
- Around line 440-451: Wrap the dataset initialization (call to
_ensure_dataset_exists) in the per-job failure path instead of after
paths_to_compress_buffer.flush(): move the _ensure_dataset_exists check to run
before calling paths_to_compress_buffer.flush(), and/or surround the existing
_ensure_dataset_exists call with a try/except that on any exception marks the
current job as FAILED (update the job status via the same mechanism used
elsewhere in this file) and returns early so _batch_and_submit_tasks() is not
skipped; ensure you reference and update the same job record used by
_batch_and_submit_tasks() so there are no dangling buffered partitions for a
failed job.
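A sketch of the first suggestion's failure path, with stand-in names (the helper name, message wording, and metadata-update hook are all illustrative, not the module's real API):

```python
def handle_unwritable_user_log(job_id, logs_directory, update_job_metadata):
    """Mark a single job FAILED when its user-facing failure log cannot be
    written, instead of raising and aborting the whole scheduling pass."""
    status_msg = (
        f"Could not write user failure log under {logs_directory} "
        f"for job {job_id}."
    )
    update_job_metadata(job_id, {"status": "FAILED", "status_msg": status_msg})
    return status_msg  # caller returns from _schedule_job after this
```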

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: d3a90796-3fab-4c34-95ae-07de6e033a81

📥 Commits

Reviewing files that changed from the base of the PR and between 467feee and b050de8.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
@goynam goynam force-pushed the fix/compression-job-config-truncation branch from 5aa4adc to f89a892 on April 13, 2026 at 18:40
@goynam goynam requested a review from sitaowang1998 April 13, 2026 18:40


Development

Successfully merging this pull request may close these issues.

Silent data truncation in orchestration DB causes compression scheduler crash — VARBINARY(60000) too small for clp_config blobs

2 participants