Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
5075547
feat: UploadLogFile command implementation
AcquaDiGiorgio Feb 2, 2026
7a03ef2
chore: improve UploadLogFile tests
AcquaDiGiorgio Feb 4, 2026
fd12496
feat: Change UploadLogFile DataManager Mocks to real DIRAC Classes
AcquaDiGiorgio Feb 11, 2026
8317c9f
chore: Update project name at imports
AcquaDiGiorgio Feb 12, 2026
91cef73
chore: setup lhcbdirac dependency to fork
AcquaDiGiorgio Apr 27, 2026
98ccc37
feat: Migrate BookkeepingReport command to cwl-dirac
AcquaDiGiorgio Apr 27, 2026
1da58a2
chore: set lhcbdirac dependency to https instead of ssh
AcquaDiGiorgio Apr 27, 2026
4586f84
chore: remove all DIRAC import mypy type checking
AcquaDiGiorgio Apr 28, 2026
0ad8e0e
feat: Migrate FailoverRequest command to cwl-dirac
AcquaDiGiorgio May 4, 2026
bd285c3
chore(tests): improve command fixtures
AcquaDiGiorgio May 4, 2026
26de911
feat: Migrate UploadOutputData command to cwl-dirac
AcquaDiGiorgio May 5, 2026
b87c180
feat: Migrate AnalyseXmlSummary command to cwl-dirac
AcquaDiGiorgio May 6, 2026
32e54b4
feat: Migrate WorkflowAccounting command to cwl-dirac
AcquaDiGiorgio May 6, 2026
f02159a
feat: Migrate UploadLogFile command to cwl-dirac
AcquaDiGiorgio May 6, 2026
f4d2821
chore: update pixi.lock
AcquaDiGiorgio May 7, 2026
028ca4f
chore: fix BookkeepingReport typo
AcquaDiGiorgio May 11, 2026
d9e24e9
chore: fix possible None values while saving workflow_commons
AcquaDiGiorgio May 11, 2026
6907021
chore: set proper commands exception catching
AcquaDiGiorgio May 11, 2026
1987844
chore: fix job path not being taken into account
AcquaDiGiorgio May 11, 2026
947bc8b
chore: change workflow commons from dict to a pydantic model
AcquaDiGiorgio May 15, 2026
396645e
chore: fix typos
AcquaDiGiorgio May 15, 2026
a81648b
chore: add logging to commands
AcquaDiGiorgio May 18, 2026
dcb1693
chore: wrap command execute function
AcquaDiGiorgio May 19, 2026
2d16150
chore: use DataStoreClient private registersList attribute
AcquaDiGiorgio May 19, 2026
ed1c6f0
feat: Add step information for executions of multiple steps at a time
AcquaDiGiorgio Jun 4, 2026
3a8908d
chore: fix snake-case convention mismatch
AcquaDiGiorgio Jun 4, 2026
dd0ec27
chore: split test_commands into multiple files
AcquaDiGiorgio Jun 9, 2026
fd454ba
feat: move clients from workflow_commons to CommandBase
AcquaDiGiorgio Jun 10, 2026
f720b9c
chore: fix typing
AcquaDiGiorgio Jun 10, 2026
0adbe67
chore: fix variable naming to snake_case
AcquaDiGiorgio Jun 10, 2026
6089b09
chore: fix returnValueOrRaise codeblocks
AcquaDiGiorgio Jun 10, 2026
c91c839
chore: rename commands to a verb_nouns format
AcquaDiGiorgio Jun 10, 2026
ddbbd58
chore: fix command execution when job_path is not cwd
AcquaDiGiorgio Jun 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10,815 changes: 5,844 additions & 4,971 deletions pixi.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
"diracx-client>=0.0.8",
"diracx-cli>=0.0.8",
"lbprodrun",
"LHCbDIRAC @ git+https://git@gitlab.cern.ch/jlisalab/LHCbDIRAC.git@modules-to-cwl-migration", # Temporary fork dependency
"pydantic",
"pyyaml",
"typer",
Expand Down Expand Up @@ -78,7 +79,7 @@ allow_redefinition = true
enable_error_code = ["import", "attr-defined"]

[[tool.mypy.overrides]]
module = ["requests", "yaml"]
module = ["requests", "yaml", "DIRAC.*", "LHCbDIRAC.*", "DIRACCommon.*"]
ignore_missing_imports = true

[tool.pytest.ini_options]
Expand Down
17 changes: 16 additions & 1 deletion src/dirac_cwl/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
"""Command classes for workflow pre/post-processing operations."""

from .analyze_xml_summary import AnalyseXmlSummary
from .core import PostProcessCommand, PreProcessCommand
from .create_failover_request import CreateFailoverRequest
from .register_accounting_report import RegisterAccountingReport
from .report_bookkeeping import ReportBookkeeping
from .upload_log_file import UploadLogFile
from .upload_output_data import UploadOutputData

__all__ = ["PreProcessCommand", "PostProcessCommand"]
__all__ = [
"AnalyseXmlSummary",
"PreProcessCommand",
"PostProcessCommand",
"UploadLogFile",
"ReportBookkeeping",
"CreateFailoverRequest",
"UploadOutputData",
"RegisterAccountingReport",
]
58 changes: 58 additions & 0 deletions src/dirac_cwl/commands/analyze_xml_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""LHCb command for checking the XMLSummary output to ensure that the execution was done correctly."""

import logging
import os

from DIRAC.TransformationSystem.Client.FileReport import FileReport
from LHCbDIRAC.Workflow.Modules.AnalyseXMLSummary import _areInputsOK, _isXMLSummaryOK

from dirac_cwl.core.exceptions import WorkflowProcessingException

from .core import PostProcessCommand
from .workflow_commons import Step, StepStatus, WorkflowCommons

logger = logging.getLogger(__name__)


class AnalyseXmlSummary(PostProcessCommand):
    """Post-process command that validates the XMLSummary output of every workflow step."""

    def _execute(self, job_path: os.PathLike[str], workflow_commons: WorkflowCommons, **kwargs):
        """Run the XMLSummary checks on each step of the workflow, in order.

        :param job_path: Path to the job working directory.
        :param workflow_commons: WorkflowCommons object holding the steps to check.
        :param kwargs: Additional keyword arguments, forwarded to the per-step check.
        """
        for current_step in workflow_commons.steps:
            self._execute_for_step(job_path, workflow_commons, current_step, **kwargs)

    def _execute_for_step(
        self, job_path: os.PathLike[str], workflow_commons: WorkflowCommons, step_commons: Step, **kwargs
    ):
        """Check the XMLSummary of a single step and report the outcome to the job report.

        :param job_path: Path to the job working directory.
        :param workflow_commons: WorkflowCommons object.
        :param step_commons: Step whose XMLSummary is checked.
        :param kwargs: Additional keyword arguments.
        :raises WorkflowProcessingException: If the XMLSummary or its inputs are not OK.
        """
        # The inputs are only checked once the summary itself has been accepted.
        summary_ok = _isXMLSummaryOK(step_commons.xf_o) and _areInputsOK(
            step_commons.xf_o,
            step_commons.inputs,
            step_commons.number_of_events,
            workflow_commons.production_id,
            self.file_report,
        )

        if not summary_ok:
            self.job_report.setApplicationStatus("XMLSummary reports error")
            raise WorkflowProcessingException("XMLSummary reports error")

        # A workflow that already failed must not be reported as "Step OK".
        if workflow_commons.step_status == StepStatus.Failed:
            logger.info("Workflow already failed")
        else:
            self.job_report.setApplicationStatus(f"{step_commons.application_name} Step OK")

    def _resolve_clients(self, workflow_commons: WorkflowCommons):
        """Set up the clients used by this command, keeping a file report that is already set."""
        super()._resolve_clients(workflow_commons)

        if self.file_report:
            return
        self.file_report = FileReport()
69 changes: 67 additions & 2 deletions src/dirac_cwl/commands/core.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
"""Core base classes for workflow processing commands."""

import logging
import os
from abc import ABC, abstractmethod
from pathlib import Path

from DIRAC.AccountingSystem.Client.DataStoreClient import DataStoreClient
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from DIRAC.DataManagementSystem.Client.FailoverTransfer import FailoverTransfer
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.TransformationSystem.Client.FileReport import FileReport
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
from LHCbDIRAC.BookkeepingSystem.Client.BookkeepingClient import BookkeepingClient

from dirac_cwl.core.exceptions import WorkflowProcessingException

from .workflow_commons import WorkflowCommons

logger = logging.getLogger(__name__)


class CommandBase(ABC):
Expand All @@ -12,8 +27,58 @@ class CommandBase(ABC):
:class:`dirac_cwl.commands.base.PostProcessCommand`
"""

request: Request = None
failover_transfer: FailoverTransfer = None
job_report: JobReport = None
file_report: FileReport = None
data_manager: DataManager = None
bk_client: BookkeepingClient = None
dsc: DataStoreClient = None

def execute(self, job_path: os.PathLike[str], **kwargs) -> None:
    """Load the WorkflowCommons, run the command and save the WorkflowCommons back.

    :param job_path: Path to the job working directory.
    :param kwargs: Additional keyword arguments, forwarded to ``_execute``.
    :raises WorkflowProcessingException: Re-raised unchanged when the command raises it;
        any other ``Exception`` is logged, written to the job report when one is set,
        and wrapped into a WorkflowProcessingException.
    """
    commons = None
    has_failed = False

    try:
        commons = WorkflowCommons.load(job_path)
        logger.info("WorkflowCommons:\n%s", commons)

        self._resolve_clients(commons)
        self._execute(job_path, commons, **kwargs)
    except WorkflowProcessingException:
        # Already the exception type callers expect: only record the failure.
        has_failed = True
        raise
    except Exception as exc:
        logger.exception("Exception in %s", self.__class__.__name__, exc_info=exc)

        has_failed = True
        if self.job_report:
            self.job_report.setApplicationStatus(repr(exc))

        raise WorkflowProcessingException(exc) from exc
    finally:
        # Save whenever the commons could be loaded, whether or not the command failed.
        if commons:
            commons.save(job_path, request=self.request, dsc=self.dsc, failed=has_failed)

def _resolve_clients(self, workflow_commons: WorkflowCommons):
    """Create the clients the command needs, unless they are already set.

    Only the JobReport is created here, from the job id stored in the WorkflowCommons.
    JobReport is always needed, so overriding implementations must call this via super().
    """
    if self.job_report:
        return
    self.job_report = JobReport(workflow_commons.job_id)

@abstractmethod
def execute(self, job_path: Path, **kwargs) -> None:
def _execute(self, job_path: os.PathLike[str], workflow_commons: WorkflowCommons, **kwargs) -> None:
"""Execute the command in the given job path.

:param job_path: Path to the job working directory.
Expand Down
127 changes: 127 additions & 0 deletions src/dirac_cwl/commands/create_failover_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""LHCb command for committing the status of the files in the file report.

The status will be "Processed" if everything ended properly or "Unused" if it did not.
"""

import json
import logging
import os
from pathlib import Path

from DIRAC.AccountingSystem.Client.DataStoreClient import DataStoreClient
from DIRAC.Core.Utilities.ReturnValues import SErrorException, returnValueOrRaise
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator
from DIRAC.TransformationSystem.Client.FileReport import FileReport
from LHCbDIRAC.Workflow.Modules.FailoverRequest import _prepareRequest

from .core import PostProcessCommand
from .workflow_commons import StepStatus, WorkflowCommons

logger = logging.getLogger(__name__)


class CreateFailoverRequest(PostProcessCommand):
    """Commits the status of the files in the file report.

    The status will be "Processed" if everything ended properly or "Unused" if it did not.
    Operations that could not be reported directly are added to the request, which is
    then written to a JSON file in the job directory.
    """

    def _execute(self, job_path: os.PathLike[str], workflow_commons: WorkflowCommons, **kwargs):
        """Execute the command.

        :param job_path: Path to the job working directory.
        :param workflow_commons: WorkflowCommons object.
        :param kwargs: Additional keyword arguments.
        """
        # NOTE(review): this is a truthiness test and Request supports len() (see
        # generate_failover_file), so an already-set but empty Request is presumably
        # replaced as well -- confirm this is intended.
        if not self.request:
            self.request = Request(workflow_commons.request_dict)

        _prepareRequest(self.request, workflow_commons.job_id)

        # The workflow outcome is not modified by this command: evaluate it once.
        job_succeeded = workflow_commons.step_status == StepStatus.Done
        default_status = "Processed" if job_succeeded else "Unused"

        files_in_file_report = self.file_report.getFiles()

        # Give a status to every input file that does not have one yet.
        for lfn in workflow_commons.inputs:
            if lfn in files_in_file_report:
                continue

            if job_succeeded:
                logger.debug("No status populated for %s, setting to 'Processed'", lfn)
            else:
                logger.info("Set status of %s to 'Unused' due to workflow failure", lfn)

            self.file_report.setFileStatus(int(workflow_commons.production_id), lfn, default_status)

        try:
            value = returnValueOrRaise(self.file_report.commit())
            if value:
                logger.info("Status of files have been properly updated in the TransformationDB")
            else:
                logger.warning("No file status update reported. There are no input files?")
        except SErrorException as e:
            logger.error("Something went wrong trying fileReport.commit() %s", e)

        # Files still present in the report after the commit were not reported.
        if self.file_report.getFiles():
            logger.error("On first attempt, failed to report file status to TransformationDB")
            try:
                value = returnValueOrRaise(self.file_report.generateForwardDISET())
                if not value:
                    logger.info("On second attempt, files correctly reported to TransformationDB")
                elif job_succeeded:
                    logger.info("Adding a SetFileStatus operation to the request")
                    self.request.addOperation(value)
                else:
                    logger.info("The job should fail: do not set requests, as the DRA will take care")
            except SErrorException as e:
                logger.warning("Could not generate Operation for file report: %s", e)

        if job_succeeded:
            self.job_report.setApplicationStatus("Job Finished Successfully", True)

        self.generate_failover_file(job_path, workflow_commons)

    def _resolve_clients(self, workflow_commons: WorkflowCommons):
        """Create the FileReport and DataStoreClient when they are not already set."""
        super()._resolve_clients(workflow_commons)

        if not self.file_report:
            self.file_report = FileReport()

        if not self.dsc:
            self.dsc = DataStoreClient()

    def generate_failover_file(self, job_path: os.PathLike[str], workflow_commons: WorkflowCommons):
        """Create a request.json file.

        The pending job report operation, if any, is added to the request. A non-empty
        request is optimized (best effort), validated and written to
        ``<production_id>_<prod_job_id>_request.json`` inside ``job_path``. Finally, the
        accounting registers stored in the WorkflowCommons are added to the
        DataStoreClient and committed.

        :param job_path: Path to the job working directory.
        :param workflow_commons: WorkflowCommons object.
        :raises SErrorException: If the request validation or its JSON conversion fails.
        """
        try:
            diset_op = returnValueOrRaise(self.job_report.generateForwardDISET())
            if diset_op:
                logger.info("Populating request with job report information")
                self.request.addOperation(diset_op)
        except SErrorException as e:
            logger.warning("Could not generate Operation for job report", exc_info=e)

        if len(self.request):
            # Try to optimize the request
            try:
                returnValueOrRaise(self.request.optimize())
            except SErrorException as e:
                logger.error("Could not optimize", exc_info=e)
                logger.error("Not failing the job because of that, keep going")
            except Exception:
                # Optimizing is best effort: keep going, but leave a trace of what happened
                # instead of silently discarding the error.
                logger.warning("Unexpected error while optimizing the request, keep going", exc_info=True)

            # Validate self.request
            returnValueOrRaise(RequestValidator().validate(self.request))

            # Get the self.request as a Json
            request_json_content = returnValueOrRaise(self.request.toJSON())

            # Write it
            # NOTE(review): if toJSON() already yields a serialized JSON string, json.dump
            # encodes it a second time -- confirm against whatever reads this file.
            fname = Path(job_path).joinpath(
                f"{workflow_commons.production_id}_{workflow_commons.prod_job_id}_request.json"
            )
            with open(fname, "w", encoding="utf-8") as f:
                json.dump(request_json_content, f)

        if workflow_commons.accounting_registers:
            for register in workflow_commons.accounting_registers:
                self.dsc.addRegister(register)
            self.dsc.commit()
4 changes: 3 additions & 1 deletion src/dirac_cwl/commands/download_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

from dirac_cwl.commands import PreProcessCommand

from .workflow_commons import WorkflowCommons


class DownloadConfig(PreProcessCommand):
"""Example command that creates a file with named 'content.cfg'."""

def execute(self, job_path, **kwargs):
def _execute(self, job_path: os.PathLike, workflow_commons: WorkflowCommons, **kwargs):
"""Execute the configuration download.

:param job_path: Path to the job working directory.
Expand Down
4 changes: 3 additions & 1 deletion src/dirac_cwl/commands/group_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

from dirac_cwl.commands import PostProcessCommand

from .workflow_commons import WorkflowCommons


class GroupOutputs(PostProcessCommand):
"""Example command that merges all of the outputs in a singular file."""

def execute(self, job_path, **kwargs):
def _execute(self, job_path: os.PathLike, workflow_commons: WorkflowCommons, **kwargs):
"""Execute the output file grouping.

:param job_path: Path to the job working directory.
Expand Down
Loading
Loading