Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def main(argv):
`num_tasks` INT NOT NULL DEFAULT '0',
`num_tasks_completed` INT NOT NULL DEFAULT '0',
`clp_binary_version` INT NULL DEFAULT NULL,
`clp_config` VARBINARY(60000) NOT NULL,
`clp_config` MEDIUMBLOB NOT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `JOB_STATUS` (`status`) USING BTREE,
INDEX `JOB_UPDATE_TIME` (`update_time`) USING BTREE
Expand All @@ -92,7 +92,7 @@ def main(argv):
`start_time` DATETIME(3) NULL DEFAULT NULL,
`duration` FLOAT NULL DEFAULT NULL,
`job_id` INT NOT NULL,
`clp_paths_to_compress` VARBINARY(60000) NOT NULL,
`clp_paths_to_compress` MEDIUMBLOB NOT NULL,
`partition_original_size` BIGINT NOT NULL,
`partition_uncompressed_size` BIGINT NULL DEFAULT NULL,
`partition_compressed_size` BIGINT NULL DEFAULT NULL,
Expand All @@ -118,7 +118,7 @@ def main(argv):
`num_tasks_completed` INT NOT NULL DEFAULT '0',
`start_time` DATETIME(3) NULL DEFAULT NULL,
`duration` FLOAT NULL DEFAULT NULL,
`job_config` VARBINARY(60000) NOT NULL,
`job_config` MEDIUMBLOB NOT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `CREATION_TIME` (`creation_time`) USING BTREE,
INDEX `JOB_STATUS` (`status`) USING BTREE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,114 +308,156 @@ def search_and_schedule_new_tasks(
# TODO: revisit why we need to commit here. To end long transactions?
db_context.connection.commit()
for job_row in jobs:
job_id = job_row["id"]
_schedule_job(
clp_config,
clp_metadata_db_connection_config,
task_manager,
db_context,
job_row,
existing_datasets,
)


def _schedule_job(
clp_config: ClpConfig,
clp_metadata_db_connection_config: dict[str, Any],
task_manager: TaskManager,
db_context: DbContext,
job_row: dict[str, Any],
existing_datasets: set[str],
) -> None:
"""
Schedules a single pending compression job: deserializes its config, validates input paths,
and submits compression tasks.

On failure, the job is marked as FAILED in the database and the function returns early.

:param clp_config:
:param clp_metadata_db_connection_config:
:param task_manager:
:param db_context:
:param job_row: A row from the compression jobs table.
:param existing_datasets: [in/out] May be updated with newly created datasets.
"""
job_id = job_row["id"]
try:
clp_io_config = ClpIoConfig.model_validate(
msgpack.unpackb(brotli.decompress(job_row["clp_config"]))
)
input_config = clp_io_config.input

# Prepare paths buffer
paths_to_compress_buffer = PathsToCompressBuffer(
maintain_file_ordering=False,
empty_directories_allowed=True,
scheduling_job_id=job_id,
clp_io_config=clp_io_config,
clp_metadata_db_connection_config=clp_metadata_db_connection_config,
except Exception:
logger.exception("Failed to decompress clp_config for job %s", job_id)
update_compression_job_metadata(
db_context,
job_id,
{
"status": CompressionJobStatus.FAILED,
"status_msg": "Failed to decompress job config. The config data may be"
" corrupted or truncated.",
},
)
return
input_config = clp_io_config.input

# Prepare paths buffer
paths_to_compress_buffer = PathsToCompressBuffer(
maintain_file_ordering=False,
empty_directories_allowed=True,
scheduling_job_id=job_id,
clp_io_config=clp_io_config,
clp_metadata_db_connection_config=clp_metadata_db_connection_config,
)

# Process input paths
input_type = input_config.type
if input_type == InputType.FS.value:
invalid_path_messages = _process_fs_input_paths(input_config, paths_to_compress_buffer)
if len(invalid_path_messages) > 0:
user_log_relative_path = _write_user_failure_log(
title="Failed input paths log.",
content=invalid_path_messages,
logs_directory=clp_config.logs_directory,
job_id=job_id,
filename_suffix="failed_paths",
)
if user_log_relative_path is None:
err_msg = "Failed to write user log for invalid input paths."
raise RuntimeError(err_msg)

error_msg = (
"At least one of your input paths could not be processed."
f" See the error log at '{user_log_relative_path}' inside your configured logs"
" directory (`logs_directory`) for more details."
)
# Process input paths
input_type = input_config.type
if input_type == InputType.FS.value:
invalid_path_messages = _process_fs_input_paths(input_config, paths_to_compress_buffer)
if len(invalid_path_messages) > 0:
user_log_relative_path = _write_user_failure_log(
title="Failed input paths log.",
content=invalid_path_messages,
logs_directory=clp_config.logs_directory,
job_id=job_id,
filename_suffix="failed_paths",
)
if user_log_relative_path is None:
err_msg = "Failed to write user log for invalid input paths."
raise RuntimeError(err_msg)

error_msg = (
"At least one of your input paths could not be processed."
f" See the error log at '{user_log_relative_path}' inside your configured logs"
" directory (`logs_directory`) for more details."
)

update_compression_job_metadata(
db_context,
job_id,
{
"status": CompressionJobStatus.FAILED,
"status_msg": error_msg,
},
)
return
elif input_type == InputType.S3.value:
try:
_process_s3_input(input_config, paths_to_compress_buffer)
except Exception as err:
logger.exception("Failed to process S3 input")
update_compression_job_metadata(
db_context,
job_id,
{
"status": CompressionJobStatus.FAILED,
"status_msg": f"S3 Failure: {err}",
},
)
return
elif input_type == InputType.S3_OBJECT_METADATA.value:
try:
_process_s3_object_metadata_input(
input_config, paths_to_compress_buffer, db_context
)
except Exception as err:
logger.exception("Failed to process S3 object metadata input for job %s", job_id)
update_compression_job_metadata(
db_context,
job_id,
{
"status": CompressionJobStatus.FAILED,
"status_msg": f"S3 object metadata input failure: {err}",
},
)
return
else:
logger.error(f"Unsupported input type {input_type}")
update_compression_job_metadata(
db_context,
job_id,
{
"status": CompressionJobStatus.FAILED,
"status_msg": f"Unsupported input type: {input_type}",
"status_msg": error_msg,
},
)
return
paths_to_compress_buffer.flush()

if StorageEngine.CLP_S == clp_config.package.storage_engine:
table_prefix = clp_metadata_db_connection_config["table_prefix"]
dataset = clp_io_config.input.dataset
_ensure_dataset_exists(
clp_config,
elif input_type == InputType.S3.value:
try:
_process_s3_input(input_config, paths_to_compress_buffer)
except Exception as err:
logger.exception("Failed to process S3 input")
update_compression_job_metadata(
db_context,
table_prefix,
dataset,
existing_datasets,
job_id,
{
"status": CompressionJobStatus.FAILED,
"status_msg": f"S3 Failure: {err}",
},
)
return
elif input_type == InputType.S3_OBJECT_METADATA.value:
try:
_process_s3_object_metadata_input(input_config, paths_to_compress_buffer, db_context)
except Exception as err:
logger.exception("Failed to process S3 object metadata input for job %s", job_id)
update_compression_job_metadata(
db_context,
job_id,
{
"status": CompressionJobStatus.FAILED,
"status_msg": f"S3 object metadata input failure: {err}",
},
)
return
else:
logger.error("Unsupported input type %s", input_type)
update_compression_job_metadata(
db_context,
job_id,
{
"status": CompressionJobStatus.FAILED,
"status_msg": f"Unsupported input type: {input_type}",
},
)
return
paths_to_compress_buffer.flush()

_batch_and_submit_tasks(
if StorageEngine.CLP_S == clp_config.package.storage_engine:
table_prefix = clp_metadata_db_connection_config["table_prefix"]
dataset = clp_io_config.input.dataset
_ensure_dataset_exists(
clp_config,
task_manager,
db_context,
job_id,
paths_to_compress_buffer,
table_prefix,
dataset,
existing_datasets,
)

_batch_and_submit_tasks(
clp_config,
task_manager,
db_context,
job_id,
paths_to_compress_buffer,
)


def poll_running_jobs(
clp_config: ClpConfig, task_manager: TaskManager, db_context: DbContext
Expand Down
Loading