Skip to content
Draft
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
5075547
feat: UploadLogFile command implementation
AcquaDiGiorgio Feb 2, 2026
7a03ef2
chore: improve UploadLogFile tests
AcquaDiGiorgio Feb 4, 2026
fd12496
feat: Change UploadLogFile DataManager Mocks to real DIRAC Classes
AcquaDiGiorgio Feb 11, 2026
8317c9f
chore: Update project name at imports
AcquaDiGiorgio Feb 12, 2026
91cef73
chore: setup lhcbdirac dependency to fork
AcquaDiGiorgio Apr 27, 2026
98ccc37
feat: Migrate BookkeepingReport command to cwl-dirac
AcquaDiGiorgio Apr 27, 2026
1da58a2
chore: set lhcbdirac dependency to https instead of ssh
AcquaDiGiorgio Apr 27, 2026
4586f84
chore: remove all DIRAC import mypy type checking
AcquaDiGiorgio Apr 28, 2026
0ad8e0e
feat: Migrate FailoverRequest command to cwl-dirac
AcquaDiGiorgio May 4, 2026
bd285c3
chore(tests): improve command fixtures
AcquaDiGiorgio May 4, 2026
26de911
feat: Migrate UploadOutputData command to cwl-dirac
AcquaDiGiorgio May 5, 2026
b87c180
feat: Migrate AnalyseXmlSummary command to cwl-dirac
AcquaDiGiorgio May 6, 2026
32e54b4
feat: Migrate WorkflowAccounting command to cwl-dirac
AcquaDiGiorgio May 6, 2026
f02159a
feat: Migrate UploadLogFile command to cwl-dirac
AcquaDiGiorgio May 6, 2026
f4d2821
chore: update pixi.lock
AcquaDiGiorgio May 7, 2026
028ca4f
chore: fix BookkeepingReport typo
AcquaDiGiorgio May 11, 2026
d9e24e9
chore: fix possible None values while saving workflow_commons
AcquaDiGiorgio May 11, 2026
6907021
chore: set proper commands exception catching
AcquaDiGiorgio May 11, 2026
1987844
chore: fix job path not being taken into account
AcquaDiGiorgio May 11, 2026
947bc8b
chore: change workflow commons from dict to a pydantic model
AcquaDiGiorgio May 15, 2026
396645e
chore: fix typos
AcquaDiGiorgio May 15, 2026
a81648b
chore: add logging to commands
AcquaDiGiorgio May 18, 2026
dcb1693
chore: wrap command execute function
AcquaDiGiorgio May 19, 2026
2d16150
chore: use DataStoreClient private registersList attribute
AcquaDiGiorgio May 19, 2026
ed1c6f0
feat: Add step information for executions of multiple steps at a time
AcquaDiGiorgio Jun 4, 2026
3a8908d
chore: fix snake-case convention mismatch
AcquaDiGiorgio Jun 4, 2026
dd0ec27
chore: split test_commands into multiple files
AcquaDiGiorgio Jun 9, 2026
fd454ba
feat: move clients from workflow_commons to CommandBase
AcquaDiGiorgio Jun 10, 2026
f720b9c
chore: fix typing
AcquaDiGiorgio Jun 10, 2026
0adbe67
chore: fix variable naming to snake_case
AcquaDiGiorgio Jun 10, 2026
6089b09
chore: fix returnValueOrRaise codeblocks
AcquaDiGiorgio Jun 10, 2026
c91c839
chore: rename commands to a verb_nouns format
AcquaDiGiorgio Jun 10, 2026
7b3d120
chore: fix command execution when job_path is not cwd
AcquaDiGiorgio Jun 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
918 changes: 897 additions & 21 deletions pixi.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
"diracx-client>=0.0.8",
"diracx-cli>=0.0.8",
"lbprodrun",
"LHCbDIRAC @ git+https://git@gitlab.cern.ch/jlisalab/LHCbDIRAC.git@modules-to-cwl-migration", # Temporary fork dependency
"pydantic",
"pyyaml",
"typer",
Expand Down Expand Up @@ -78,7 +79,7 @@ allow_redefinition = true
enable_error_code = ["import", "attr-defined"]

[[tool.mypy.overrides]]
module = ["requests", "yaml"]
module = ["requests", "yaml", "DIRAC.*", "LHCbDIRAC.*", "DIRACCommon.*"]
ignore_missing_imports = true

[tool.pytest.ini_options]
Expand Down
17 changes: 16 additions & 1 deletion src/dirac_cwl/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
"""Command classes for workflow pre/post-processing operations."""

from .analyze_xml_summary import AnalyseXmlSummary
from .bookkeeping_report import BookeepingReport
Comment thread
AcquaDiGiorgio marked this conversation as resolved.
Outdated
from .core import PostProcessCommand, PreProcessCommand
from .failover_request import FailoverRequest
from .upload_log_file import UploadLogFile
from .upload_output_data import UploadOutputData
from .workflow_accounting import WorkflowAccounting

# Public API of the commands package: the two base command classes plus the
# concrete LHCb commands imported above.
# NOTE(review): the scraped diff also carried a one-line
# ``__all__ = ["PreProcessCommand", "PostProcessCommand"]`` directly before this
# list. It was immediately overwritten (dead store), so only this assignment is kept.
__all__ = [
    "AnalyseXmlSummary",
    "PreProcessCommand",
    "PostProcessCommand",
    "UploadLogFile",
    "BookeepingReport",
    "FailoverRequest",
    "UploadOutputData",
    "WorkflowAccounting",
]
84 changes: 84 additions & 0 deletions src/dirac_cwl/commands/analyze_xml_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""LHCb command for checking the XMLSummary output to ensure that the execution was done correctly."""

import os

from DIRAC.TransformationSystem.Client.FileReport import FileReport
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
from LHCbDIRAC.Core.Utilities.XMLSummaries import XMLSummary
from LHCbDIRAC.Workflow.Modules.AnalyseXMLSummary import _areInputsOK, _isXMLSummaryOK
from LHCbDIRAC.Workflow.Modules.BookkeepingReport import _generate_xml_object

from dirac_cwl.core.exceptions import WorkflowProcessingException

from .core import PostProcessCommand
from .utils import prepare_lhcb_workflow_commons, save_workflow_commons


class AnalyseXmlSummary(PostProcessCommand):
    """Performs a series of checks on the XMLSummary output to make sure the execution was done correctly."""

    def execute(self, job_path, **kwargs):
        """Execute the command.

        Loads the workflow commons, checks the XMLSummary and the input files,
        sets the application status on the job report, and always tries to save
        the workflow commons back (flagged as failed when an exception escaped).

        :param job_path: Path to the job working directory.
        :param kwargs: Additional keyword arguments. ``workflow_commons_path``
            overrides the default ``<job_path>/workflow_commons.json``.
        :raises WorkflowProcessingException: If the XMLSummary or the input checks report an error.
        """
        # Bind both names *before* the ``try`` so the ``finally`` clause can never
        # hit an unbound local and mask the real exception.
        workflow_commons_path = kwargs.get("workflow_commons_path", os.path.join(job_path, "workflow_commons.json"))
        workflow_commons = None
        failed = False
        try:
            # NOTE(review): "bk_step_id" and the default values below are not read
            # anywhere in this command; they mirror the bookkeeping report command.
            # Confirm whether they are still required here.
            workflow_commons = prepare_lhcb_workflow_commons(
                workflow_commons_path,
                extra_mandatory_values=[
                    "bk_step_id",
                ],
                extra_default_values={
                    "bookkeeping_LFNs": [],
                    "size": {},
                    "md5": {},
                    "guid": {},
                    "sim_description": "NoSimConditions",
                },
            )

            # A previous step already failed: nothing to analyse.
            if not workflow_commons["step_status"]["OK"]:
                return

            xf_o = self._load_xml_summary(workflow_commons)

            file_report = FileReport()
            job_report = JobReport(workflow_commons["job_id"])

            file_report.statusDict = workflow_commons["file_report_files_dict"]

            # NOTE(review): a reviewer pointed out that, in CWL, the new Rust
            # implementation of lb-prod-run is expected to be used, and that it
            # already integrates checks around the XML summary
            # (https://gitlab.cern.ch/roneil/lbprodrun-rs/-/blob/main/src/summary.rs).
            # This command could then be reduced to reporting the file status of
            # the problematic files only.
            #
            # The input check only runs when the summary itself is OK (short-circuit).
            job_ok = _isXMLSummaryOK(xf_o) and _areInputsOK(
                xf_o,
                workflow_commons["inputs"],
                workflow_commons["number_of_events"],
                workflow_commons["production_id"],
                file_report,
            )
            if not job_ok:
                job_report.setApplicationStatus("XMLSummary reports error")
                raise WorkflowProcessingException("XMLSummary reports error")

            job_report.setApplicationStatus(f"{workflow_commons['application_name']} Step OK")

        except BaseException:
            # Explicit equivalent of the former bare ``except``: record the
            # failure for the ``finally`` clause and let the exception propagate.
            failed = True
            raise

        finally:
            # If loading the workflow commons itself failed there is nothing to save.
            if workflow_commons is not None:
                save_workflow_commons(workflow_commons, workflow_commons_path, failed=failed)

    @staticmethod
    def _load_xml_summary(workflow_commons):
        """Return the XMLSummary object, from an explicit path if given, else from the job identifiers."""
        if "xml_summary_path" in workflow_commons:
            return XMLSummary(workflow_commons["xml_summary_path"])

        return _generate_xml_object(
            workflow_commons["cleaned_application_name"],
            workflow_commons["production_id"],
            workflow_commons["prod_job_id"],
            workflow_commons["command_number"],
            workflow_commons["command_id"],
        )
168 changes: 168 additions & 0 deletions src/dirac_cwl/commands/bookkeeping_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""LHCb command for bookkeeping report file generation based on the XMLSummary and the XML catalog."""

import os

from DIRAC.Workflow.Utilities.Utils import getStepCPUTimes
from LHCbDIRAC.BookkeepingSystem.Client.BookkeepingClient import BookkeepingClient
from LHCbDIRAC.Core.Utilities.ProductionData import constructProductionLFNs
from LHCbDIRAC.Core.Utilities.XMLSummaries import XMLSummary
from LHCbDIRAC.Workflow.Modules.BookkeepingReport import (
_generate_xml_object,
_generateInputFiles,
_generateOutputFiles,
_prepare_job_info,
_process_time,
)
from LHCbDIRAC.Workflow.Modules.ModulesUtilities import getNumberOfProcessorsToUse

from dirac_cwl.core.exceptions import WorkflowProcessingException

from .core import PostProcessCommand
from .utils import prepare_lhcb_workflow_commons, save_workflow_commons


class BookeepingReport(PostProcessCommand):
    """Generates a bookkeeping report file based on the XMLSummary and the pool XML catalog."""

    def execute(self, job_path, **kwargs):
        """Execute the command.

        Loads the workflow commons, builds the job information object from the
        XMLSummary and the input/output files, writes it as
        ``bookkeeping_<command_id>.xml``, and always tries to save the workflow
        commons back (flagged as failed when an exception escaped).

        :param job_path: Path to the job working directory.
        :param kwargs: Additional keyword arguments. ``workflow_commons_path``
            overrides the default ``<job_path>/workflow_commons.json``.
        :raises WorkflowProcessingException: If the production LFNs cannot be constructed.
        """
        # Bind both names *before* the ``try`` so the ``finally`` clause can never
        # hit an unbound local and mask the real exception.
        workflow_commons_path = kwargs.get("workflow_commons_path", os.path.join(job_path, "workflow_commons.json"))
        workflow_commons = None
        failed = False
        try:
            # Obtain Workflow Commons
            workflow_commons = prepare_lhcb_workflow_commons(
                workflow_commons_path,
                extra_mandatory_values=[
                    "bk_step_id",
                ],
                extra_default_values={
                    "bookkeeping_LFNs": [],
                    "size": {},
                    "md5": {},
                    "guid": {},
                    "sim_description": "NoSimConditions",
                },
            )

            # A previous step already failed: no report to generate.
            if not workflow_commons["step_status"]["OK"]:
                return

            # Setup variables
            start_time = workflow_commons.get("start_time")
            exectime, cputime = getStepCPUTimes(self._collect_cpu_times(workflow_commons, start_time))

            number_of_processors = getNumberOfProcessorsToUse(
                workflow_commons["job_id"], workflow_commons["max_number_of_processors"]
            )

            bk_client = BookkeepingClient()

            # Built unconditionally so a missing key is reported even when the
            # LFNs are already available in the workflow commons.
            parameters = {
                "PRODUCTION_ID": workflow_commons["production_id"],
                "JOB_ID": workflow_commons["prod_job_id"],
                "configVersion": workflow_commons["config_version"],
                "outputList": workflow_commons["outputs"],
                "configName": workflow_commons["config_name"],
                "outputDataFileMask": workflow_commons["output_data_file_mask"],
            }
            bk_lfns = self._resolve_bookkeeping_lfns(workflow_commons, parameters, bk_client)

            ldate, ltime, ldatestart, ltimestart = _process_time(start_time)

            # Obtain XMLSummary
            xf_o = self._load_xml_summary(workflow_commons)

            info_dict = {
                "exectime": exectime,
                "cputime": cputime,
                "numberOfProcessors": number_of_processors,
                "production_id": workflow_commons["production_id"],
                "jobID": workflow_commons["job_id"],
                "siteName": workflow_commons["site_name"],
                "jobType": workflow_commons["job_type"],
                "applicationName": workflow_commons["application_name"],
                "applicationVersion": workflow_commons["application_version"],
                "numberOfEvents": workflow_commons["number_of_events"],
            }

            # Generate job_info object
            job_info = _prepare_job_info(
                info_dict,
                ldatestart,
                ltimestart,
                ldate,
                ltime,
                xf_o,
                workflow_commons["inputs"],
                workflow_commons["command_id"],
                workflow_commons["bk_step_id"],
                bk_client,
                workflow_commons["config_name"],
                workflow_commons["config_version"],
            )

            # Add input files to job_info
            _generateInputFiles(job_info, bk_lfns, workflow_commons["inputs"])

            # Add output files to job_info
            _generateOutputFiles(
                job_info,
                bk_lfns,
                workflow_commons["event_type"],
                workflow_commons["application_name"],
                xf_o,
                workflow_commons["outputs"],
                workflow_commons["inputs"],
            )

            # Generate SimulationConditions
            if workflow_commons["application_name"] == "Gauss":
                job_info.simulation_condition = workflow_commons["sim_description"]

            # Convert job_info object to XML
            doc = job_info.to_xml()

            # Write to file
            # NOTE(review): the path is relative to the current working directory,
            # not to ``job_path`` — confirm this is intended when they differ.
            bfilename = f"bookkeeping_{workflow_commons['command_id']}.xml"
            with open(bfilename, "wb") as bfile:
                bfile.write(doc)

        except BaseException:
            # Explicit equivalent of the former bare ``except``: record the
            # failure for the ``finally`` clause and let the exception propagate.
            failed = True
            raise

        finally:
            # If loading the workflow commons itself failed there is nothing to save.
            if workflow_commons is not None:
                save_workflow_commons(workflow_commons, workflow_commons_path, failed=failed)

    @staticmethod
    def _collect_cpu_times(workflow_commons, start_time):
        """Return the timing dict for ``getStepCPUTimes``, holding only the values that are available."""
        cpu_times = {}
        if start_time:
            cpu_times["StartTime"] = start_time
        if "start_stats" in workflow_commons:
            cpu_times["StartStats"] = workflow_commons["start_stats"]
        return cpu_times

    @staticmethod
    def _resolve_bookkeeping_lfns(workflow_commons, parameters, bk_client):
        """Return the bookkeeping LFNs from the workflow commons, or construct them from ``parameters``."""
        if "bookkeeping_LFNs" in workflow_commons and "production_output_data" in workflow_commons:
            bk_lfns = workflow_commons["bookkeeping_LFNs"]
            # The value may be stored as a ";"-separated string instead of a list.
            if not isinstance(bk_lfns, list):
                bk_lfns = [lfn.strip() for lfn in bk_lfns.split(";")]
            return bk_lfns

        result = constructProductionLFNs(parameters, bk_client)
        if not result["OK"]:
            raise WorkflowProcessingException("Could not create production LFNs")
        return result["Value"]["BookkeepingLFNs"]

    @staticmethod
    def _load_xml_summary(workflow_commons):
        """Return the XMLSummary object, from an explicit path if given, else from the job identifiers."""
        if "xml_summary_path" in workflow_commons:
            return XMLSummary(workflow_commons["xml_summary_path"])

        return _generate_xml_object(
            workflow_commons["cleaned_application_name"],
            workflow_commons["production_id"],
            workflow_commons["prod_job_id"],
            workflow_commons["command_number"],
            workflow_commons["command_id"],
        )
Comment thread
AcquaDiGiorgio marked this conversation as resolved.
Outdated
Loading
Loading